1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-23 23:59:58 +08:00

Use override where applicable in fdbrpc

This commit is contained in:
sfc-gh-tclinkenbeard 2020-10-07 19:55:05 -07:00
parent 8571dcfe28
commit 71d0ef676c
12 changed files with 193 additions and 208 deletions

@ -112,8 +112,8 @@ public:
return statdata.st_mtime;
}
virtual void addref() { ReferenceCounted<AsyncFileEIO>::addref(); }
virtual void delref() { ReferenceCounted<AsyncFileEIO>::delref(); }
void addref() override { ReferenceCounted<AsyncFileEIO>::addref(); }
void delref() override { ReferenceCounted<AsyncFileEIO>::delref(); }
int64_t debugFD() const override { return fd; }

@ -80,12 +80,8 @@ public:
}
}
virtual void addref() {
ReferenceCounted<AsyncFileDetachable>::addref();
}
virtual void delref() {
ReferenceCounted<AsyncFileDetachable>::delref();
}
void addref() override { ReferenceCounted<AsyncFileDetachable>::addref(); }
void delref() override { ReferenceCounted<AsyncFileDetachable>::delref(); }
Future<int> read(void* data, int length, int64_t offset) override {
if( !file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady() )
@ -249,10 +245,8 @@ public:
//TraceEvent("AsyncFileNonDurable_Destroy", id).detail("Filename", filename);
}
virtual void addref() {
ReferenceCounted<AsyncFileNonDurable>::addref();
}
virtual void delref() {
void addref() override { ReferenceCounted<AsyncFileNonDurable>::addref(); }
void delref() override {
if(delref_no_destroy()) {
ASSERT(filesBeingDeleted.count(filename) == 0);
//TraceEvent("AsyncFileNonDurable_StartDelete", id).detail("Filename", filename);

@ -32,10 +32,10 @@
#include "flow/actorcompiler.h" // This must be the last #include.
// Read-only file type that wraps another file instance, reads in large blocks, and reads ahead of the actual range requested
class AsyncFileReadAheadCache : public IAsyncFile, public ReferenceCounted<AsyncFileReadAheadCache> {
class AsyncFileReadAheadCache final : public IAsyncFile, public ReferenceCounted<AsyncFileReadAheadCache> {
public:
virtual void addref() { ReferenceCounted<AsyncFileReadAheadCache>::addref(); }
virtual void delref() { ReferenceCounted<AsyncFileReadAheadCache>::delref(); }
void addref() override { ReferenceCounted<AsyncFileReadAheadCache>::addref(); }
void delref() override { ReferenceCounted<AsyncFileReadAheadCache>::delref(); }
struct CacheBlock : ReferenceCounted<CacheBlock> {
CacheBlock(int size = 0) : data(new uint8_t[size]), len(size) {}
@ -177,7 +177,7 @@ public:
std::string getFilename() const override { return m_f->getFilename(); }
virtual ~AsyncFileReadAheadCache() {
~AsyncFileReadAheadCache() {
for(auto &it : m_blocks) {
it.second.cancel();
}
@ -196,7 +196,6 @@ public:
: m_f(f), m_block_size(blockSize), m_read_ahead_blocks(readAheadBlocks), m_max_concurrent_reads(maxConcurrentReads),
m_cache_block_limit(std::max<int>(1, cacheSizeBlocks)) {
}
};
#include "flow/unactorcompiler.h"

@ -75,13 +75,25 @@ TEST_CASE("/flow/buggifiedDelay") {
}
template <class T, class Func, class ErrFunc, class CallbackType>
class LambdaCallback : public CallbackType, public FastAllocated<LambdaCallback<T,Func,ErrFunc,CallbackType>> {
class LambdaCallback final : public CallbackType, public FastAllocated<LambdaCallback<T, Func, ErrFunc, CallbackType>> {
Func func;
ErrFunc errFunc;
virtual void fire(T const& t) { CallbackType::remove(); func(t); delete this; }
virtual void fire(T && t) { CallbackType::remove(); func(std::move(t)); delete this; }
virtual void error(Error e) { CallbackType::remove(); errFunc(e); delete this; }
void fire(T const& t) override {
CallbackType::remove();
func(t);
delete this;
}
void fire(T&& t) override {
CallbackType::remove();
func(std::move(t));
delete this;
}
void error(Error e) override {
CallbackType::remove();
errFunc(e);
delete this;
}
public:
LambdaCallback(Func&& f, ErrFunc&& e) : func(std::move(f)), errFunc(std::move(e)) {}
@ -192,14 +204,14 @@ ACTOR static Future<Void> testHygeine() {
//bool expectActorCount(int x) { return actorCount == x; }
bool expectActorCount(int) { return true; }
struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
struct YieldMockNetwork final : INetwork, ReferenceCounted<YieldMockNetwork> {
int ticks;
Promise<Void> nextTick;
int nextYield;
INetwork* baseNetwork;
virtual flowGlobalType global(int id) const override { return baseNetwork->global(id); }
virtual void setGlobal(size_t id, flowGlobalType v) override {
flowGlobalType global(int id) const override { return baseNetwork->global(id); }
void setGlobal(size_t id, flowGlobalType v) override {
baseNetwork->setGlobal(id, v);
return;
}
@ -219,35 +231,35 @@ struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
t.send(Void());
}
virtual Future<class Void> delay(double seconds, TaskPriority taskID) override { return nextTick.getFuture(); }
Future<class Void> delay(double seconds, TaskPriority taskID) override { return nextTick.getFuture(); }
virtual Future<class Void> yield(TaskPriority taskID) override {
Future<class Void> yield(TaskPriority taskID) override {
if (check_yield(taskID))
return delay(0,taskID);
return Void();
}
virtual bool check_yield(TaskPriority taskID) override {
bool check_yield(TaskPriority taskID) override {
if (nextYield > 0) --nextYield;
return nextYield == 0;
}
// Delegate everything else. TODO: Make a base class NetworkWrapper for delegating everything in INetwork
virtual TaskPriority getCurrentTask() const override { return baseNetwork->getCurrentTask(); }
virtual void setCurrentTask(TaskPriority taskID) override { baseNetwork->setCurrentTask(taskID); }
virtual double now() const override { return baseNetwork->now(); }
virtual double timer() override { return baseNetwork->timer(); }
virtual void stop() override { return baseNetwork->stop(); }
virtual void addStopCallback(std::function<void()> fn) override {
TaskPriority getCurrentTask() const override { return baseNetwork->getCurrentTask(); }
void setCurrentTask(TaskPriority taskID) override { baseNetwork->setCurrentTask(taskID); }
double now() const override { return baseNetwork->now(); }
double timer() override { return baseNetwork->timer(); }
void stop() override { return baseNetwork->stop(); }
void addStopCallback(std::function<void()> fn) override {
ASSERT(false);
return;
}
virtual bool isSimulated() const override { return baseNetwork->isSimulated(); }
virtual void onMainThread(Promise<Void>&& signal, TaskPriority taskID) override {
bool isSimulated() const override { return baseNetwork->isSimulated(); }
void onMainThread(Promise<Void>&& signal, TaskPriority taskID) override {
return baseNetwork->onMainThread(std::move(signal), taskID);
}
bool isOnMainThread() const override { return baseNetwork->isOnMainThread(); }
virtual THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) override {
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) override {
return baseNetwork->startThread(func, arg);
}
Future<Reference<class IAsyncFile>> open(std::string filename, int64_t flags, int64_t mode) {
@ -256,15 +268,15 @@ struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
Future<Void> deleteFile(std::string filename, bool mustBeDurable) {
return IAsyncFileSystem::filesystem()->deleteFile(filename, mustBeDurable);
}
virtual void run() override { return baseNetwork->run(); }
virtual bool checkRunnable() override { return baseNetwork->checkRunnable(); }
virtual void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) override {
void run() override { return baseNetwork->run(); }
bool checkRunnable() override { return baseNetwork->checkRunnable(); }
void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) override {
return baseNetwork->getDiskBytes(directory, free, total);
}
virtual bool isAddressOnThisHost(NetworkAddress const& addr) const override {
bool isAddressOnThisHost(NetworkAddress const& addr) const override {
return baseNetwork->isAddressOnThisHost(addr);
}
virtual const TLSConfig& getTLSConfig() const override {
const TLSConfig& getTLSConfig() const override {
static TLSConfig emptyConfig;
return emptyConfig;
}

@ -156,14 +156,14 @@ void EndpointMap::remove( Endpoint::Token const& token, NetworkMessageReceiver*
}
}
struct EndpointNotFoundReceiver : NetworkMessageReceiver {
struct EndpointNotFoundReceiver final : NetworkMessageReceiver {
EndpointNotFoundReceiver(EndpointMap& endpoints) {
//endpoints[WLTOKEN_ENDPOINT_NOT_FOUND] = this;
Endpoint::Token e = WLTOKEN_ENDPOINT_NOT_FOUND;
endpoints.insert(this, e, TaskPriority::DefaultEndpoint);
ASSERT( e == WLTOKEN_ENDPOINT_NOT_FOUND );
}
virtual void receive(ArenaObjectReader& reader) override {
void receive(ArenaObjectReader& reader) override {
// Remote machine tells us it doesn't have endpoint e
Endpoint e;
reader.deserialize(e);
@ -171,13 +171,13 @@ struct EndpointNotFoundReceiver : NetworkMessageReceiver {
}
};
struct PingReceiver : NetworkMessageReceiver {
struct PingReceiver final : NetworkMessageReceiver {
PingReceiver(EndpointMap& endpoints) {
Endpoint::Token e = WLTOKEN_PING_PACKET;
endpoints.insert(this, e, TaskPriority::ReadSocket);
ASSERT( e == WLTOKEN_PING_PACKET );
}
virtual void receive(ArenaObjectReader& reader) override {
void receive(ArenaObjectReader& reader) override {
ReplyPromise<Void> reply;
reader.deserialize(reply);
reply.send(Void());

@ -35,18 +35,18 @@ public:
};
// An IRateControl implemenation that allows at most hands out at most windowLimit units of 'credit' in windowSeconds seconds
class SpeedLimit : public IRateControl, ReferenceCounted<SpeedLimit> {
class SpeedLimit final : public IRateControl, ReferenceCounted<SpeedLimit> {
public:
SpeedLimit(int windowLimit, int windowSeconds) : m_limit(windowLimit), m_seconds(windowSeconds), m_last_update(0), m_budget(0) {
m_budget_max = m_limit * m_seconds;
m_last_update = timer();
}
virtual ~SpeedLimit() {}
~SpeedLimit() = default;
virtual void addref() { ReferenceCounted<SpeedLimit>::addref(); }
virtual void delref() { ReferenceCounted<SpeedLimit>::delref(); }
void addref() override { ReferenceCounted<SpeedLimit>::addref(); }
void delref() override { ReferenceCounted<SpeedLimit>::delref(); }
virtual Future<Void> getAllowance(unsigned int n) {
Future<Void> getAllowance(unsigned int n) override {
// Replenish budget based on time since last update
double ts = timer();
// returnUnused happens to do exactly what we want here
@ -60,7 +60,7 @@ public:
return delay(m_seconds * -m_budget / m_limit);
}
virtual void returnUnused(int n) {
void returnUnused(int n) override {
if(n < 0)
return;
m_budget = std::min<int64_t>(m_budget + n, m_budget_max);
@ -75,13 +75,13 @@ private:
};
// An IRateControl implemenation that enforces no limit
class Unlimited : public IRateControl, ReferenceCounted<Unlimited> {
class Unlimited final : public IRateControl, ReferenceCounted<Unlimited> {
public:
Unlimited() {}
virtual ~Unlimited() {}
virtual void addref() { ReferenceCounted<Unlimited>::addref(); }
virtual void delref() { ReferenceCounted<Unlimited>::delref(); }
~Unlimited() = default;
void addref() override { ReferenceCounted<Unlimited>::addref(); }
void delref() override { ReferenceCounted<Unlimited>::delref(); }
virtual Future<Void> getAllowance(unsigned int n) { return Void(); }
virtual void returnUnused(int n) {}
Future<Void> getAllowance(unsigned int n) override { return Void(); }
void returnUnused(int n) override {}
};

@ -118,10 +118,7 @@ PolicyAcross::PolicyAcross(int count, std::string const& attribKey, Reference<IR
PolicyAcross::PolicyAcross() : _policy(new PolicyOne()) {}
PolicyAcross::~PolicyAcross()
{
return;
}
PolicyAcross::~PolicyAcross() {}
// Debug purpose only
// Trace all record entries to help debug

@ -92,41 +92,41 @@ inline void save(Archive& ar, const Reference<IReplicationPolicy>& value) {
}
}
struct PolicyOne : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
struct PolicyOne final : IReplicationPolicy, public ReferenceCounted<PolicyOne> {
PolicyOne(){};
explicit PolicyOne(const PolicyOne& o) {}
virtual ~PolicyOne(){};
virtual std::string name() const { return "One"; }
virtual std::string info() const { return "1"; }
virtual int maxResults() const { return 1; }
virtual int depth() const { return 1; }
virtual bool validate(std::vector<LocalityEntry> const& solutionSet,
Reference<LocalitySet> const& fromServers) const;
virtual bool selectReplicas(Reference<LocalitySet>& fromServers, std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& results);
std::string name() const override { return "One"; }
std::string info() const override { return "1"; }
int maxResults() const override { return 1; }
int depth() const override { return 1; }
bool validate(std::vector<LocalityEntry> const& solutionSet,
Reference<LocalitySet> const& fromServers) const override;
bool selectReplicas(Reference<LocalitySet>& fromServers, std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& results) override;
template <class Ar>
void serialize(Ar& ar) {
static_assert(!is_fb_function<Ar>);
}
virtual void deserializationDone() {}
virtual void attributeKeys(std::set<std::string>* set) const override { return; }
void deserializationDone() override {}
void attributeKeys(std::set<std::string>* set) const override { return; }
};
struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross> {
struct PolicyAcross final : IReplicationPolicy, public ReferenceCounted<PolicyAcross> {
friend struct serializable_traits<PolicyAcross*>;
PolicyAcross(int count, std::string const& attribKey, Reference<IReplicationPolicy> const policy);
explicit PolicyAcross();
explicit PolicyAcross(const PolicyAcross& other) : PolicyAcross(other._count, other._attribKey, other._policy) {}
virtual ~PolicyAcross();
virtual std::string name() const { return "Across"; }
std::string embeddedPolicyName() const { return _policy->name(); }
~PolicyAcross();
std::string name() const override { return "Across"; }
std::string embeddedPolicyName() const override { return _policy->name(); }
int getCount() const { return _count; }
virtual std::string info() const { return format("%s^%d x ", _attribKey.c_str(), _count) + _policy->info(); }
virtual int maxResults() const { return _count * _policy->maxResults(); }
virtual int depth() const { return 1 + _policy->depth(); }
virtual bool validate(std::vector<LocalityEntry> const& solutionSet, Reference<LocalitySet> const& fromServers) const;
virtual bool selectReplicas(Reference<LocalitySet>& fromServers, std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& results);
std::string info() const override { return format("%s^%d x ", _attribKey.c_str(), _count) + _policy->info(); }
int maxResults() const override { return _count * _policy->maxResults(); }
int depth() const override { return 1 + _policy->depth(); }
bool validate(std::vector<LocalityEntry> const& solutionSet,
Reference<LocalitySet> const& fromServers) const override;
bool selectReplicas(Reference<LocalitySet>& fromServers, std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& results) override;
template <class Ar>
void serialize(Ar& ar) {
@ -135,13 +135,13 @@ struct PolicyAcross : IReplicationPolicy, public ReferenceCounted<PolicyAcross>
serializeReplicationPolicy(ar, _policy);
}
virtual void deserializationDone() {}
void deserializationDone() override {}
static bool compareAddedResults(const std::pair<int, int>& rhs, const std::pair<int, int>& lhs) {
return (rhs.first < lhs.first) || (!(lhs.first < rhs.first) && (rhs.second < lhs.second));
}
virtual void attributeKeys(std::set<std::string>* set) const override {
void attributeKeys(std::set<std::string>* set) const override {
set->insert(_attribKey);
_policy->attributeKeys(set);
}
@ -159,7 +159,7 @@ protected:
Arena _arena;
};
struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
struct PolicyAnd final : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
friend struct serializable_traits<PolicyAnd*>;
PolicyAnd(std::vector<Reference<IReplicationPolicy>> policies) : _policies(policies), _sortedPolicies(policies) {
// Sort the policy array
@ -167,9 +167,8 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
}
explicit PolicyAnd(const PolicyAnd& other) : _policies(other._policies), _sortedPolicies(other._sortedPolicies) {}
explicit PolicyAnd() {}
virtual ~PolicyAnd() {}
virtual std::string name() const { return "And"; }
virtual std::string info() const {
std::string name() const override { return "And"; }
std::string info() const override {
std::string infoText;
for (auto& policy : _policies) {
infoText += ((infoText.length()) ? " & (" : "(") + policy->info() + ")";
@ -177,14 +176,14 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
if (_policies.size()) infoText = "(" + infoText + ")";
return infoText;
}
virtual int maxResults() const {
int maxResults() const override {
int resultsMax = 0;
for (auto& policy : _policies) {
resultsMax += policy->maxResults();
}
return resultsMax;
}
virtual int depth() const {
int depth() const override {
int policyDepth, depthMax = 0;
for (auto& policy : _policies) {
policyDepth = policy->depth();
@ -194,11 +193,11 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
}
return depthMax;
}
virtual bool validate(std::vector<LocalityEntry> const& solutionSet,
Reference<LocalitySet> const& fromServers) const;
bool validate(std::vector<LocalityEntry> const& solutionSet,
Reference<LocalitySet> const& fromServers) const override;
virtual bool selectReplicas(Reference<LocalitySet>& fromServers, std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& results);
bool selectReplicas(Reference<LocalitySet>& fromServers, std::vector<LocalityEntry> const& alsoServers,
std::vector<LocalityEntry>& results) override;
static bool comparePolicy(const Reference<IReplicationPolicy>& rhs, const Reference<IReplicationPolicy>& lhs) {
return (lhs->maxResults() < rhs->maxResults()) ||
@ -220,12 +219,12 @@ struct PolicyAnd : IReplicationPolicy, public ReferenceCounted<PolicyAnd> {
}
}
virtual void deserializationDone() {
void deserializationDone() override {
_sortedPolicies = _policies;
std::sort(_sortedPolicies.begin(), _sortedPolicies.end(), PolicyAnd::comparePolicy);
}
virtual void attributeKeys(std::set<std::string>* set) const override {
void attributeKeys(std::set<std::string>* set) const override {
for (const Reference<IReplicationPolicy>& r : _policies) {
r->attributeKeys(set);
}

@ -102,8 +102,8 @@ struct KeyValueMap : public ReferenceCounted<KeyValueMap> {
return ((lower != _keyvaluearray.end()) && (lower->first == indexKey) && (lower->second == indexValue));
}
virtual void addref() { ReferenceCounted<KeyValueMap>::addref(); }
virtual void delref() { ReferenceCounted<KeyValueMap>::delref(); }
void addref() override { ReferenceCounted<KeyValueMap>::addref(); }
void delref() override { ReferenceCounted<KeyValueMap>::delref(); }
static bool compareKeyValue(const AttribRecord& lhs, const AttribRecord& rhs)
{ return (lhs.first < rhs.first) || (!(rhs.first < lhs.first) && (lhs.second < rhs.second)); }
@ -114,20 +114,19 @@ struct KeyValueMap : public ReferenceCounted<KeyValueMap> {
// This class stores the information for each entry within the locality map
struct LocalityRecord : public ReferenceCounted<LocalityRecord> {
struct LocalityRecord final : public ReferenceCounted<LocalityRecord> {
Reference<KeyValueMap> _dataMap;
LocalityEntry _entryIndex;
LocalityRecord(Reference<KeyValueMap> const& dataMap, int arrayIndex): _dataMap(dataMap), _entryIndex(arrayIndex) {}
LocalityRecord(LocalityRecord const& entry) : _dataMap(entry._dataMap), _entryIndex(entry._entryIndex) {}
virtual ~LocalityRecord(){}
LocalityRecord& operator=(LocalityRecord const& source) {
_dataMap = source._dataMap;
_entryIndex = source._entryIndex;
return *this;
}
virtual void addref() { ReferenceCounted<LocalityRecord>::addref(); }
virtual void delref() { ReferenceCounted<LocalityRecord>::delref(); }
void addref() override { ReferenceCounted<LocalityRecord>::addref(); }
void delref() override { ReferenceCounted<LocalityRecord>::delref(); }
Optional<AttribValue> getValue(AttribKey indexKey) const {
return _dataMap->getValue(indexKey);
@ -155,12 +154,11 @@ struct LocalityRecord : public ReferenceCounted<LocalityRecord> {
};
// This class stores the information for string to integer map for keys and values
struct StringToIntMap : public ReferenceCounted<StringToIntMap> {
struct StringToIntMap final : public ReferenceCounted<StringToIntMap> {
std::map<std::string, int> _hashmap;
std::vector<std::string> _lookuparray;
StringToIntMap() {}
StringToIntMap(StringToIntMap const& source):_hashmap(source._hashmap), _lookuparray(source._lookuparray){}
virtual ~StringToIntMap(){}
StringToIntMap& operator=(StringToIntMap const& source) {
_hashmap = source._hashmap;
_lookuparray = source._lookuparray;
@ -206,8 +204,8 @@ struct StringToIntMap : public ReferenceCounted<StringToIntMap> {
}
return memSize;
}
virtual void addref() { ReferenceCounted<StringToIntMap>::addref(); }
virtual void delref() { ReferenceCounted<StringToIntMap>::delref(); }
void addref() override { ReferenceCounted<StringToIntMap>::addref(); }
void delref() override { ReferenceCounted<StringToIntMap>::delref(); }
};
extern const std::vector<LocalityEntry> emptyEntryArray;

@ -126,19 +126,19 @@ struct Traceable<Counter> : std::true_type {
};
template <class F>
struct SpecialCounter : ICounter, FastAllocated<SpecialCounter<F>>, NonCopyable {
struct SpecialCounter final : ICounter, FastAllocated<SpecialCounter<F>>, NonCopyable {
SpecialCounter(CounterCollection& collection, std::string const& name, F && f) : name(name), f(f) { collection.counters.push_back(this); collection.counters_to_remove.push_back(this); }
virtual void remove() { delete this; }
void remove() override { delete this; }
virtual std::string const& getName() const { return name; }
virtual int64_t getValue() const { return f(); }
std::string const& getName() const override { return name; }
int64_t getValue() const override { return f(); }
virtual void resetInterval() {}
void resetInterval() override {}
virtual bool hasRate() const { return false; }
virtual double getRate() const { throw internal_error(); }
virtual bool hasRoughness() const { return false; }
virtual double getRoughness() const { throw internal_error(); }
bool hasRate() const override { return false; }
double getRate() const override { throw internal_error(); }
bool hasRoughness() const override { return false; }
double getRoughness() const override { throw internal_error(); }
std::string name;
F f;

@ -80,7 +80,7 @@ private:
};
template <class T>
struct NetSAV : SAV<T>, FlowReceiver, FastAllocated<NetSAV<T>> {
struct NetSAV final : SAV<T>, FlowReceiver, FastAllocated<NetSAV<T>> {
using FastAllocated<NetSAV<T>>::operator new;
using FastAllocated<NetSAV<T>>::operator delete;
@ -89,8 +89,8 @@ struct NetSAV : SAV<T>, FlowReceiver, FastAllocated<NetSAV<T>> {
: SAV<T>(futures, promises), FlowReceiver(remoteEndpoint, false) {
}
virtual void destroy() { delete this; }
virtual void receive(ArenaObjectReader& reader) {
void destroy() override { delete this; }
void receive(ArenaObjectReader& reader) override {
if (!SAV<T>::canBeSet()) return;
this->addPromiseRef();
ErrorOr<EnsureTable<T>> message;
@ -212,12 +212,8 @@ void setReplyPriority(ReplyPromise<Reply> & p, TaskPriority taskID) { p.getEndpo
template <class Reply>
void setReplyPriority(const ReplyPromise<Reply> & p, TaskPriority taskID) { p.getEndpoint(taskID); }
template <class T>
struct NetNotifiedQueue : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> {
struct NetNotifiedQueue final : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotifiedQueue<T>> {
using FastAllocated<NetNotifiedQueue<T>>::operator new;
using FastAllocated<NetNotifiedQueue<T>>::operator delete;
@ -225,18 +221,17 @@ struct NetNotifiedQueue : NotifiedQueue<T>, FlowReceiver, FastAllocated<NetNotif
NetNotifiedQueue(int futures, int promises, const Endpoint& remoteEndpoint)
: NotifiedQueue<T>(futures, promises), FlowReceiver(remoteEndpoint, true) {}
virtual void destroy() { delete this; }
virtual void receive(ArenaObjectReader& reader) {
void destroy() override { delete this; }
void receive(ArenaObjectReader& reader) override {
this->addPromiseRef();
T message;
reader.deserialize(message);
this->send(std::move(message));
this->delPromiseRef();
}
virtual bool isStream() const { return true; }
bool isStream() const override { return true; }
};
template <class T>
class RequestStream {
public:

@ -158,7 +158,7 @@ private:
SimClogging g_clogging;
struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
Sim2Conn( ISimulator::ProcessInfo* process )
: process(process), dbgid( deterministicRandom()->randomUniqueID() ), opened(false), closedByCaller(false), stopReceive(Never())
{
@ -182,20 +182,21 @@ struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
ASSERT_ABORT( !opened || closedByCaller );
}
virtual void addref() { ReferenceCounted<Sim2Conn>::addref(); }
virtual void delref() { ReferenceCounted<Sim2Conn>::delref(); }
virtual void close() { closedByCaller = true; closeInternal(); }
virtual Future<Void> acceptHandshake() { return delay(0.01*deterministicRandom()->random01()); }
virtual Future<Void> connectHandshake() { return delay(0.01*deterministicRandom()->random01()); }
virtual Future<Void> onWritable() { return whenWritable(this); }
virtual Future<Void> onReadable() { return whenReadable(this); }
bool isPeerGone() {
return !peer || peerProcess->failed;
void addref() override { ReferenceCounted<Sim2Conn>::addref(); }
void delref() override { ReferenceCounted<Sim2Conn>::delref(); }
void close() override {
closedByCaller = true;
closeInternal();
}
Future<Void> acceptHandshake() override { return delay(0.01 * deterministicRandom()->random01()); }
Future<Void> connectHandshake() override { return delay(0.01 * deterministicRandom()->random01()); }
Future<Void> onWritable() override { return whenWritable(this); }
Future<Void> onReadable() override { return whenReadable(this); }
bool isPeerGone() const { return !peer || peerProcess->failed; }
void peerClosed() {
leakedConnectionTracker = trackLeakedConnection(this);
stopReceive = delay(1.0);
@ -203,7 +204,7 @@ struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0)
// (or may throw an error if the connection dies)
virtual int read( uint8_t* begin, uint8_t* end ) {
int read(uint8_t* begin, uint8_t* end) override {
rollRandomClose();
int64_t avail = receivedBytes.get() - readBytes.get(); // SOMEDAY: random?
@ -218,7 +219,7 @@ struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of bytes written (might be 0)
// (or may throw an error if the connection dies)
virtual int write( SendBuffer const* buffer, int limit) {
int write(SendBuffer const* buffer, int limit) override {
rollRandomClose();
ASSERT(limit > 0);
@ -255,8 +256,8 @@ struct Sim2Conn : IConnection, ReferenceCounted<Sim2Conn> {
// Returns the network address and port of the other end of the connection. In the case of an incoming connection, this may not
// be an address we can connect to!
virtual NetworkAddress getPeerAddress() const override { return peerEndpoint; }
virtual UID getDebugID() const override { return dbgid; }
NetworkAddress getPeerAddress() const override { return peerEndpoint; }
UID getDebugID() const override { return dbgid; }
bool opened, closedByCaller;
@ -475,8 +476,8 @@ public:
}
}
virtual void addref() { ReferenceCounted<SimpleFile>::addref(); }
virtual void delref() { ReferenceCounted<SimpleFile>::delref(); }
void addref() override { ReferenceCounted<SimpleFile>::addref(); }
void delref() override { ReferenceCounted<SimpleFile>::delref(); }
int64_t debugFD() const override { return (int64_t)h; }
@ -695,14 +696,12 @@ struct Sim2Listener : IListener, ReferenceCounted<Sim2Listener> {
incoming( Reference<Sim2Listener>::addRef( this ), seconds, conn );
}
virtual void addref() { ReferenceCounted<Sim2Listener>::addref(); }
virtual void delref() { ReferenceCounted<Sim2Listener>::delref(); }
void addref() override { ReferenceCounted<Sim2Listener>::addref(); }
void delref() override { ReferenceCounted<Sim2Listener>::delref(); }
virtual Future<Reference<IConnection>> accept() {
return popOne( nextConnection.getFuture() );
}
Future<Reference<IConnection>> accept() override { return popOne(nextConnection.getFuture()); }
virtual NetworkAddress getListenAddress() const override { return address; }
NetworkAddress getListenAddress() const override { return address; }
private:
ISimulator::ProcessInfo* process;
@ -733,15 +732,15 @@ class Sim2 : public ISimulator, public INetworkConnections {
public:
// Implement INetwork interface
// Everything actually network related is delegated to the Sim2Net class; Sim2 is only concerned with simulating machines and time
virtual double now() const override { return time; }
double now() const override { return time; }
// timer() can be up to 0.1 seconds ahead of now()
virtual double timer() {
double timer() override {
timerTime += deterministicRandom()->random01()*(time+0.1-timerTime)/2.0;
return timerTime;
return timerTime;
}
virtual Future<class Void> delay( double seconds, TaskPriority taskID ) {
Future<class Void> delay(double seconds, TaskPriority taskID) override {
ASSERT(taskID >= TaskPriority::Min && taskID <= TaskPriority::Max);
return delay( seconds, taskID, currentProcess );
}
@ -767,7 +766,7 @@ public:
self->setCurrentTask(taskID);
return Void();
}
virtual Future<class Void> yield( TaskPriority taskID ) {
Future<class Void> yield(TaskPriority taskID) override {
if (taskID == TaskPriority::DefaultYield) taskID = currentTaskID;
if (check_yield(taskID)) {
// We want to check that yielders can handle actual time elapsing (it sometimes will outside simulation), but
@ -777,7 +776,7 @@ public:
setCurrentTask(taskID);
return Void();
}
virtual bool check_yield( TaskPriority taskID ) {
bool check_yield(TaskPriority taskID) override {
if (yielded) return true;
if (--yield_limit <= 0) {
yield_limit = deterministicRandom()->randomInt(1, 150); // If yield returns false *too* many times in a row, there could be a stack overflow, since we can't deterministically check stack size as the real network does
@ -785,12 +784,10 @@ public:
}
return yielded = BUGGIFY_WITH_PROB(0.01);
}
virtual TaskPriority getCurrentTask() const override { return currentTaskID; }
virtual void setCurrentTask(TaskPriority taskID ) {
currentTaskID = taskID;
}
TaskPriority getCurrentTask() const override { return currentTaskID; }
void setCurrentTask(TaskPriority taskID) override { currentTaskID = taskID; }
// Sets the taskID/priority of the current task, without yielding
virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host ) {
Future<Reference<IConnection>> connect(NetworkAddress toAddr, std::string host) override {
ASSERT( host.empty());
if (!addressMap.count( toAddr )) {
return waitForProcessAndConnect( toAddr, this );
@ -814,7 +811,7 @@ public:
((Sim2Listener*)peerp->getListener(toAddr).getPtr())->incomingConnection( 0.5*deterministicRandom()->random01(), Reference<IConnection>(peerc) );
return onConnect( ::delay(0.5*deterministicRandom()->random01()), myc );
}
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service) {
Future<std::vector<NetworkAddress>> resolveTCPEndpoint(std::string host, std::string service) override {
throw lookup_failed();
}
ACTOR static Future<Reference<IConnection>> onConnect( Future<Void> ready, Reference<Sim2Conn> conn ) {
@ -829,7 +826,7 @@ public:
conn->opened = true;
return conn;
}
virtual Reference<IListener> listen( NetworkAddress localAddr ) {
Reference<IListener> listen(NetworkAddress localAddr) override {
Reference<IListener> listener( getCurrentProcess()->getListener(localAddr) );
ASSERT(listener);
return listener;
@ -845,22 +842,16 @@ public:
}
}
}
virtual const TLSConfig& getTLSConfig() const override {
const TLSConfig& getTLSConfig() const override {
static TLSConfig emptyConfig;
return emptyConfig;
}
virtual bool checkRunnable() {
return net2->checkRunnable();
}
bool checkRunnable() override { return net2->checkRunnable(); }
virtual void stop() {
isStopped = true;
}
virtual void addStopCallback( std::function<void()> fn ) {
stopCallbacks.emplace_back(std::move(fn));
}
virtual bool isSimulated() const { return true; }
void stop() override { isStopped = true; }
void addStopCallback(std::function<void()> fn) override { stopCallbacks.emplace_back(std::move(fn)); }
bool isSimulated() const override { return true; }
struct SimThreadArgs {
THREAD_FUNC_RETURN (*func) (void*);
@ -884,12 +875,12 @@ public:
THREAD_RETURN;
}
virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void*), void *arg ) {
THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg) override {
SimThreadArgs *simArgs = new SimThreadArgs(func, arg);
return ::startThread(simStartThread, simArgs);
}
virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total) {
void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) override {
ProcessInfo *proc = getCurrentProcess();
SimDiskSpace &diskSpace = diskSpaceMap[proc->address.ip];
@ -924,7 +915,7 @@ public:
if(free == 0)
TraceEvent(SevWarnAlways, "Sim2NoFreeSpace").detail("TotalSpace", diskSpace.totalSpace).detail("BaseFreeSpace", diskSpace.baseFreeSpace).detail("TotalFileSize", totalFileSize).detail("NumFiles", numFiles);
}
virtual bool isAddressOnThisHost(NetworkAddress const& addr) const override {
bool isAddressOnThisHost(NetworkAddress const& addr) const override {
return addr.ip == getCurrentProcess()->address.ip;
}
@ -990,13 +981,13 @@ public:
}
// Implement ISimulator interface
virtual void run() {
void run() override {
Future<Void> loopFuture = runLoop(this);
net2->run();
}
virtual ProcessInfo* newProcess(const char* name, IPAddress ip, uint16_t port, bool sslEnabled, uint16_t listenPerProcess,
LocalityData locality, ProcessClass startingClass, const char* dataFolder,
const char* coordinationFolder) {
ProcessInfo* newProcess(const char* name, IPAddress ip, uint16_t port, bool sslEnabled, uint16_t listenPerProcess,
LocalityData locality, ProcessClass startingClass, const char* dataFolder,
const char* coordinationFolder) override {
ASSERT( locality.machineId().present() );
MachineInfo& machine = machines[ locality.machineId().get() ];
if (!machine.machineId.present())
@ -1050,8 +1041,7 @@ public:
return m;
}
virtual bool isAvailable() const
{
bool isAvailable() const override {
std::vector<ProcessInfo*> processesLeft, processesDead;
for (auto processInfo : getAllProcesses()) {
if (processInfo->isAvailableClass()) {
@ -1065,8 +1055,7 @@ public:
return canKillProcesses(processesLeft, processesDead, KillInstantly, nullptr);
}
virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const
{
bool datacenterDead(Optional<Standalone<StringRef>> dcId) const override {
if(!dcId.present()) {
return false;
}
@ -1096,8 +1085,9 @@ public:
}
// The following function will determine if the specified configuration of available and dead processes can allow the cluster to survive
virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const
{
bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses,
std::vector<ProcessInfo*> const& deadProcesses, KillType kt,
KillType* newKillType) const override {
bool canSurvive = true;
int nQuorum = ((desiredCoordinators+1)/2)*2-1;
@ -1247,7 +1237,7 @@ public:
return canSurvive;
}
virtual void destroyProcess( ISimulator::ProcessInfo *p ) {
void destroyProcess(ISimulator::ProcessInfo* p) override {
TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detail("MachineId", p->locality.machineId());
currentlyRebootingProcesses.insert(std::pair<NetworkAddress, ProcessInfo*>(p->address, p));
std::vector<ProcessInfo*>& processes = machines[ p->locality.machineId().get() ].processes;
@ -1293,14 +1283,14 @@ public:
}
ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting);
}
virtual void rebootProcess( ProcessInfo* process, KillType kt ) {
void rebootProcess(ProcessInfo* process, KillType kt) override {
if( kt == RebootProcessAndDelete && protectedAddresses.count(process->address) ) {
TraceEvent("RebootChanged").detail("ZoneId", process->locality.describeZone()).detail("KillType", RebootProcess).detail("OrigKillType", kt).detail("Reason", "Protected process");
kt = RebootProcess;
}
doReboot( process, kt );
}
virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses ) {
void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses) override {
if( allProcesses ) {
auto processes = getAllProcesses();
for( int i = 0; i < processes.size(); i++ )
@ -1317,20 +1307,20 @@ public:
doReboot( deterministicRandom()->randomChoice( processes ), RebootProcess );
}
}
virtual void killProcess( ProcessInfo* machine, KillType kt ) {
void killProcess(ProcessInfo* machine, KillType kt) override {
TraceEvent("AttemptingKillProcess").detail("ProcessInfo", machine->toString());
if (kt < RebootAndDelete ) {
killProcess_internal( machine, kt );
}
}
virtual void killInterface( NetworkAddress address, KillType kt ) {
void killInterface(NetworkAddress address, KillType kt) override {
if (kt < RebootAndDelete ) {
std::vector<ProcessInfo*>& processes = machines[ addressMap[address]->locality.machineId() ].processes;
for( int i = 0; i < processes.size(); i++ )
killProcess_internal( processes[i], kt );
}
}
virtual bool killZone(Optional<Standalone<StringRef>> zoneId, KillType kt, bool forceKill, KillType* ktFinal) {
bool killZone(Optional<Standalone<StringRef>> zoneId, KillType kt, bool forceKill, KillType* ktFinal) override {
auto processes = getAllProcesses();
std::set<Optional<Standalone<StringRef>>> zoneMachines;
for (auto& process : processes) {
@ -1346,7 +1336,8 @@ public:
}
return result;
}
virtual bool killMachine(Optional<Standalone<StringRef>> machineId, KillType kt, bool forceKill, KillType* ktFinal) {
bool killMachine(Optional<Standalone<StringRef>> machineId, KillType kt, bool forceKill,
KillType* ktFinal) override {
auto ktOrig = kt;
TEST(true); // Trying to killing a machine
@ -1476,7 +1467,7 @@ public:
return true;
}
virtual bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, bool forceKill, KillType* ktFinal) {
bool killDataCenter(Optional<Standalone<StringRef>> dcId, KillType kt, bool forceKill, KillType* ktFinal) override {
auto ktOrig = kt;
auto processes = getAllProcesses();
std::map<Optional<Standalone<StringRef>>, int> datacenterMachines;
@ -1565,7 +1556,7 @@ public:
return (kt == ktMin);
}
virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) {
void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) override {
if (mode == ClogDefault) {
double a = deterministicRandom()->random01();
if ( a < 0.3 ) mode = ClogSend;
@ -1582,10 +1573,10 @@ public:
if (mode == ClogReceive || mode==ClogAll)
g_clogging.clogRecvFor( ip, seconds );
}
virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) {
void clogPair(const IPAddress& from, const IPAddress& to, double seconds) override {
g_clogging.clogPairFor( from, to, seconds );
}
virtual std::vector<ProcessInfo*> getAllProcesses() const {
std::vector<ProcessInfo*> getAllProcesses() const override {
std::vector<ProcessInfo*> processes;
for( auto& c : machines ) {
processes.insert( processes.end(), c.second.processes.begin(), c.second.processes.end() );
@ -1595,22 +1586,22 @@ public:
}
return processes;
}
virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) {
ProcessInfo* getProcessByAddress(NetworkAddress const& address) override {
NetworkAddress normalizedAddress(address.ip, address.port, true, address.isTLS());
ASSERT( addressMap.count( normalizedAddress ) );
// NOTE: addressMap[normalizedAddress]->address may not equal to normalizedAddress
return addressMap[normalizedAddress];
}
virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) {
MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) override {
return &machines[addressMap[address]->locality.machineId()];
}
virtual MachineInfo* getMachineById(Optional<Standalone<StringRef>> const& machineId) {
MachineInfo* getMachineById(Optional<Standalone<StringRef>> const& machineId) override {
return &machines[machineId];
}
virtual void destroyMachine(Optional<Standalone<StringRef>> const& machineId ) {
void destroyMachine(Optional<Standalone<StringRef>> const& machineId) override {
auto& machine = machines[machineId];
for( auto process : machine.processes ) {
ASSERT( process->failed );
@ -1683,7 +1674,7 @@ public:
}
}
virtual void onMainThread( Promise<Void>&& signal, TaskPriority taskID ) {
void onMainThread(Promise<Void>&& signal, TaskPriority taskID) override {
// This is presumably coming from either a "fake" thread pool thread, i.e. it is actually on this thread
// or a thread created with g_network->startThread
ASSERT(getCurrentProcess());
@ -1696,10 +1687,10 @@ public:
bool isOnMainThread() const override {
return net2->isOnMainThread();
}
virtual Future<Void> onProcess( ISimulator::ProcessInfo *process, TaskPriority taskID ) {
Future<Void> onProcess(ISimulator::ProcessInfo* process, TaskPriority taskID) override {
return delay( 0, taskID, process );
}
virtual Future<Void> onMachine( ISimulator::ProcessInfo *process, TaskPriority taskID ) {
Future<Void> onMachine(ISimulator::ProcessInfo* process, TaskPriority taskID) override {
if( process->machine == 0 )
return Void();
return delay( 0, taskID, process->machine->machineProcess );