mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 18:56:00 +08:00
Fix more -Wreorder-ctor warnings across many files
This commit is contained in:
parent
e006e4fed4
commit
3442ebd3b7
@ -31,7 +31,8 @@ namespace {
|
|||||||
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
|
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
|
||||||
public:
|
public:
|
||||||
BackupFile(const std::string& fileName, Reference<IAsyncFile> file, const std::string& finalFullPath)
|
BackupFile(const std::string& fileName, Reference<IAsyncFile> file, const std::string& finalFullPath)
|
||||||
: IBackupFile(fileName), m_file(file), m_finalFullPath(finalFullPath), m_writeOffset(0), m_blockSize(CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK) {
|
: IBackupFile(fileName), m_file(file), m_writeOffset(0), m_finalFullPath(finalFullPath),
|
||||||
|
m_blockSize(CLIENT_KNOBS->BACKUP_LOCAL_FILE_WRITE_BLOCK) {
|
||||||
if (BUGGIFY) {
|
if (BUGGIFY) {
|
||||||
m_blockSize = deterministicRandom()->randomInt(100, 20000);
|
m_blockSize = deterministicRandom()->randomInt(100, 20000);
|
||||||
}
|
}
|
||||||
|
@ -44,28 +44,29 @@ const Key DatabaseBackupAgent::keyDatabasesInSync = LiteralStringRef("databases_
|
|||||||
const int DatabaseBackupAgent::LATEST_DR_VERSION = 1;
|
const int DatabaseBackupAgent::LATEST_DR_VERSION = 1;
|
||||||
|
|
||||||
DatabaseBackupAgent::DatabaseBackupAgent()
|
DatabaseBackupAgent::DatabaseBackupAgent()
|
||||||
: subspace(Subspace(databaseBackupPrefixRange.begin)), tagNames(subspace.get(BackupAgentBase::keyTagName)),
|
: subspace(Subspace(databaseBackupPrefixRange.begin)), states(subspace.get(BackupAgentBase::keyStates)),
|
||||||
states(subspace.get(BackupAgentBase::keyStates)), config(subspace.get(BackupAgentBase::keyConfig)),
|
config(subspace.get(BackupAgentBase::keyConfig)), errors(subspace.get(BackupAgentBase::keyErrors)),
|
||||||
errors(subspace.get(BackupAgentBase::keyErrors)), ranges(subspace.get(BackupAgentBase::keyRanges)),
|
ranges(subspace.get(BackupAgentBase::keyRanges)), tagNames(subspace.get(BackupAgentBase::keyTagName)),
|
||||||
|
sourceStates(subspace.get(BackupAgentBase::keySourceStates)),
|
||||||
|
sourceTagNames(subspace.get(BackupAgentBase::keyTagName)),
|
||||||
taskBucket(new TaskBucket(subspace.get(BackupAgentBase::keyTasks),
|
taskBucket(new TaskBucket(subspace.get(BackupAgentBase::keyTasks),
|
||||||
AccessSystemKeys::True,
|
AccessSystemKeys::True,
|
||||||
PriorityBatch::False,
|
PriorityBatch::False,
|
||||||
LockAware::True)),
|
LockAware::True)),
|
||||||
futureBucket(new FutureBucket(subspace.get(BackupAgentBase::keyFutures), AccessSystemKeys::True, LockAware::True)),
|
futureBucket(new FutureBucket(subspace.get(BackupAgentBase::keyFutures), AccessSystemKeys::True, LockAware::True)) {
|
||||||
sourceStates(subspace.get(BackupAgentBase::keySourceStates)),
|
}
|
||||||
sourceTagNames(subspace.get(BackupAgentBase::keyTagName)) {}
|
|
||||||
|
|
||||||
DatabaseBackupAgent::DatabaseBackupAgent(Database src)
|
DatabaseBackupAgent::DatabaseBackupAgent(Database src)
|
||||||
: subspace(Subspace(databaseBackupPrefixRange.begin)), tagNames(subspace.get(BackupAgentBase::keyTagName)),
|
: subspace(Subspace(databaseBackupPrefixRange.begin)), states(subspace.get(BackupAgentBase::keyStates)),
|
||||||
states(subspace.get(BackupAgentBase::keyStates)), config(subspace.get(BackupAgentBase::keyConfig)),
|
config(subspace.get(BackupAgentBase::keyConfig)), errors(subspace.get(BackupAgentBase::keyErrors)),
|
||||||
errors(subspace.get(BackupAgentBase::keyErrors)), ranges(subspace.get(BackupAgentBase::keyRanges)),
|
ranges(subspace.get(BackupAgentBase::keyRanges)), tagNames(subspace.get(BackupAgentBase::keyTagName)),
|
||||||
|
sourceStates(subspace.get(BackupAgentBase::keySourceStates)),
|
||||||
|
sourceTagNames(subspace.get(BackupAgentBase::keyTagName)),
|
||||||
taskBucket(new TaskBucket(subspace.get(BackupAgentBase::keyTasks),
|
taskBucket(new TaskBucket(subspace.get(BackupAgentBase::keyTasks),
|
||||||
AccessSystemKeys::True,
|
AccessSystemKeys::True,
|
||||||
PriorityBatch::False,
|
PriorityBatch::False,
|
||||||
LockAware::True)),
|
LockAware::True)),
|
||||||
futureBucket(new FutureBucket(subspace.get(BackupAgentBase::keyFutures), AccessSystemKeys::True, LockAware::True)),
|
futureBucket(new FutureBucket(subspace.get(BackupAgentBase::keyFutures), AccessSystemKeys::True, LockAware::True)) {
|
||||||
sourceStates(subspace.get(BackupAgentBase::keySourceStates)),
|
|
||||||
sourceTagNames(subspace.get(BackupAgentBase::keyTagName)) {
|
|
||||||
taskBucket->src = src;
|
taskBucket->src = src;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,7 +396,7 @@ void loadClientFunction(T* fp, void* lib, std::string libPath, const char* funct
|
|||||||
}
|
}
|
||||||
|
|
||||||
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad)
|
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad)
|
||||||
: api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
|
: fdbCPath(fdbCPath), api(new FdbCApi()), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
|
||||||
|
|
||||||
// Loads client API functions (definitions are in FdbCApi struct)
|
// Loads client API functions (definitions are in FdbCApi struct)
|
||||||
void DLApi::init() {
|
void DLApi::init() {
|
||||||
@ -993,8 +993,8 @@ ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<P
|
|||||||
}
|
}
|
||||||
|
|
||||||
MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb)
|
MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb)
|
||||||
: clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb),
|
: dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))),
|
||||||
dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))) {}
|
clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb) {}
|
||||||
|
|
||||||
// Adds a client (local or externally loaded) that can be used to connect to the cluster
|
// Adds a client (local or externally loaded) that can be used to connect to the cluster
|
||||||
void MultiVersionDatabase::DatabaseState::addClient(Reference<ClientInfo> client) {
|
void MultiVersionDatabase::DatabaseState::addClient(Reference<ClientInfo> client) {
|
||||||
@ -1855,8 +1855,8 @@ void MultiVersionApi::loadEnvironmentVariableNetworkOptions() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
MultiVersionApi::MultiVersionApi()
|
MultiVersionApi::MultiVersionApi()
|
||||||
: bypassMultiClientApi(false), networkStartSetup(false), networkSetup(false), callbackOnMainThread(true),
|
: callbackOnMainThread(true), localClientDisabled(false), networkStartSetup(false), networkSetup(false),
|
||||||
externalClient(false), localClientDisabled(false), apiVersion(0), envOptionsLoaded(false), threadCount(0) {}
|
bypassMultiClientApi(false), externalClient(false), apiVersion(0), threadCount(0), envOptionsLoaded(false) {}
|
||||||
|
|
||||||
MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();
|
MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();
|
||||||
|
|
||||||
|
@ -248,8 +248,9 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks,
|
|||||||
}
|
}
|
||||||
|
|
||||||
SpecialKeySpace::SpecialKeySpace(KeyRef spaceStartKey, KeyRef spaceEndKey, bool testOnly)
|
SpecialKeySpace::SpecialKeySpace(KeyRef spaceStartKey, KeyRef spaceEndKey, bool testOnly)
|
||||||
: range(KeyRangeRef(spaceStartKey, spaceEndKey)), readImpls(nullptr, spaceEndKey), writeImpls(nullptr, spaceEndKey),
|
: readImpls(nullptr, spaceEndKey),
|
||||||
modules(testOnly ? SpecialKeySpace::MODULE::TESTONLY : SpecialKeySpace::MODULE::UNKNOWN, spaceEndKey) {
|
modules(testOnly ? SpecialKeySpace::MODULE::TESTONLY : SpecialKeySpace::MODULE::UNKNOWN, spaceEndKey),
|
||||||
|
writeImpls(nullptr, spaceEndKey), range(KeyRangeRef(spaceStartKey, spaceEndKey)) {
|
||||||
// Default begin of KeyRangeMap is Key(), insert the range to update start key
|
// Default begin of KeyRangeMap is Key(), insert the range to update start key
|
||||||
readImpls.insert(range, nullptr);
|
readImpls.insert(range, nullptr);
|
||||||
writeImpls.insert(range, nullptr);
|
writeImpls.insert(range, nullptr);
|
||||||
|
@ -873,13 +873,14 @@ TaskBucket::TaskBucket(const Subspace& subspace,
|
|||||||
AccessSystemKeys sysAccess,
|
AccessSystemKeys sysAccess,
|
||||||
PriorityBatch priorityBatch,
|
PriorityBatch priorityBatch,
|
||||||
LockAware lockAware)
|
LockAware lockAware)
|
||||||
: prefix(subspace), active(prefix.get(LiteralStringRef("ac"))), available(prefix.get(LiteralStringRef("av"))),
|
: cc("TaskBucket"), dbgid(deterministicRandom()->randomUniqueID()),
|
||||||
|
dispatchSlotChecksStarted("DispatchSlotChecksStarted", cc), dispatchErrors("DispatchErrors", cc),
|
||||||
|
dispatchDoTasks("DispatchDoTasks", cc), dispatchEmptyTasks("DispatchEmptyTasks", cc),
|
||||||
|
dispatchSlotChecksComplete("DispatchSlotChecksComplete", cc), prefix(subspace),
|
||||||
|
active(prefix.get(LiteralStringRef("ac"))), available(prefix.get(LiteralStringRef("av"))),
|
||||||
available_prioritized(prefix.get(LiteralStringRef("avp"))), timeouts(prefix.get(LiteralStringRef("to"))),
|
available_prioritized(prefix.get(LiteralStringRef("avp"))), timeouts(prefix.get(LiteralStringRef("to"))),
|
||||||
pauseKey(prefix.pack(LiteralStringRef("pause"))), timeout(CLIENT_KNOBS->TASKBUCKET_TIMEOUT_VERSIONS),
|
timeout(CLIENT_KNOBS->TASKBUCKET_TIMEOUT_VERSIONS), pauseKey(prefix.pack(LiteralStringRef("pause"))),
|
||||||
system_access(sysAccess), priority_batch(priorityBatch), lockAware(lockAware), cc("TaskBucket"),
|
system_access(sysAccess), priority_batch(priorityBatch), lockAware(lockAware) {}
|
||||||
dbgid(deterministicRandom()->randomUniqueID()), dispatchSlotChecksStarted("DispatchSlotChecksStarted", cc),
|
|
||||||
dispatchErrors("DispatchErrors", cc), dispatchDoTasks("DispatchDoTasks", cc),
|
|
||||||
dispatchEmptyTasks("DispatchEmptyTasks", cc), dispatchSlotChecksComplete("DispatchSlotChecksComplete", cc) {}
|
|
||||||
|
|
||||||
TaskBucket::~TaskBucket() {}
|
TaskBucket::~TaskBucket() {}
|
||||||
|
|
||||||
|
@ -340,9 +340,8 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TransportData::TransportData(uint64_t transportId)
|
TransportData::TransportData(uint64_t transportId)
|
||||||
: endpoints(WLTOKEN_COUNTS), endpointNotFoundReceiver(endpoints), pingReceiver(endpoints),
|
: warnAlwaysForLargePacket(true), endpoints(WLTOKEN_COUNTS), endpointNotFoundReceiver(endpoints),
|
||||||
warnAlwaysForLargePacket(true), lastIncompatibleMessage(0), transportId(transportId),
|
pingReceiver(endpoints), numIncompatibleConnections(0), lastIncompatibleMessage(0), transportId(transportId) {
|
||||||
numIncompatibleConnections(0) {
|
|
||||||
degraded = makeReference<AsyncVar<bool>>(false);
|
degraded = makeReference<AsyncVar<bool>>(false);
|
||||||
pingLogger = pingLatencyLogger(this);
|
pingLogger = pingLatencyLogger(this);
|
||||||
}
|
}
|
||||||
@ -795,13 +794,14 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
|
|||||||
}
|
}
|
||||||
|
|
||||||
Peer::Peer(TransportData* transport, NetworkAddress const& destination)
|
Peer::Peer(TransportData* transport, NetworkAddress const& destination)
|
||||||
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
|
: transport(transport), destination(destination), compatible(true), outgoingConnectionIdle(true),
|
||||||
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
|
lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), peerReferences(-1),
|
||||||
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
|
outstandingReplies(0), pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1),
|
||||||
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0),
|
lastLoggedTime(0.0), lastLoggedBytesReceived(0), lastLoggedBytesSent(0), timeoutCount(0),
|
||||||
bytesSent(0), lastLoggedBytesSent(0), timeoutCount(0), lastLoggedTime(0.0), connectOutgoingCount(0), connectIncomingCount(0),
|
incompatibleProtocolVersionNewer(false), bytesReceived(0), bytesSent(0), lastDataPacketSentTime(now()),
|
||||||
connectFailedCount(0), connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1),
|
protocolVersion(Reference<AsyncVar<Optional<ProtocolVersion>>>(new AsyncVar<Optional<ProtocolVersion>>())),
|
||||||
protocolVersion(Reference<AsyncVar<Optional<ProtocolVersion>>>(new AsyncVar<Optional<ProtocolVersion>>())) {
|
connectOutgoingCount(0), connectIncomingCount(0), connectFailedCount(0),
|
||||||
|
connectLatencies(destination.isPublic() ? FLOW_KNOBS->NETWORK_CONNECT_SAMPLE_AMOUNT : 1) {
|
||||||
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
|
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,8 +241,8 @@ struct BackupData {
|
|||||||
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
|
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
|
||||||
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
|
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
|
||||||
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
|
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
|
||||||
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false),
|
pulledVersion(0), paused(false), lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)),
|
||||||
lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)) {
|
cc("BackupWorker", myId.toString()) {
|
||||||
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
|
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
|
||||||
|
|
||||||
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
|
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
|
||||||
|
@ -128,15 +128,15 @@ public:
|
|||||||
std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus;
|
std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus;
|
||||||
|
|
||||||
DBInfo()
|
DBInfo()
|
||||||
: masterRegistrationCount(0), recoveryStalled(false), forceRecovery(false), unfinishedRecoveries(0),
|
: clientInfo(new AsyncVar<ClientDBInfo>()), serverInfo(new AsyncVar<ServerDBInfo>()),
|
||||||
logGenerations(0), cachePopulated(false), clientInfo(new AsyncVar<ClientDBInfo>()), dbInfoCount(0),
|
masterRegistrationCount(0), dbInfoCount(0), recoveryStalled(false), forceRecovery(false),
|
||||||
serverInfo(new AsyncVar<ServerDBInfo>()), db(DatabaseContext::create(clientInfo,
|
db(DatabaseContext::create(clientInfo,
|
||||||
Future<Void>(),
|
Future<Void>(),
|
||||||
LocalityData(),
|
LocalityData(),
|
||||||
EnableLocalityLoadBalance::True,
|
EnableLocalityLoadBalance::True,
|
||||||
TaskPriority::DefaultEndpoint,
|
TaskPriority::DefaultEndpoint,
|
||||||
LockAware::True)) // SOMEDAY: Locality!
|
LockAware::True)), // SOMEDAY: Locality!
|
||||||
{}
|
unfinishedRecoveries(0), logGenerations(0), cachePopulated(false) {}
|
||||||
|
|
||||||
void setDistributor(const DataDistributorInterface& interf) {
|
void setDistributor(const DataDistributorInterface& interf) {
|
||||||
auto newInfo = serverInfo->get();
|
auto newInfo = serverInfo->get();
|
||||||
@ -1431,12 +1431,12 @@ public:
|
|||||||
bool degraded = false;
|
bool degraded = false;
|
||||||
|
|
||||||
RoleFitness(int bestFit, int worstFit, int count, ProcessClass::ClusterRole role)
|
RoleFitness(int bestFit, int worstFit, int count, ProcessClass::ClusterRole role)
|
||||||
: bestFit((ProcessClass::Fitness)bestFit), worstFit((ProcessClass::Fitness)worstFit), count(count),
|
: bestFit((ProcessClass::Fitness)bestFit), worstFit((ProcessClass::Fitness)worstFit), role(role),
|
||||||
role(role) {}
|
count(count) {}
|
||||||
|
|
||||||
RoleFitness(int fitness, int count, ProcessClass::ClusterRole role)
|
RoleFitness(int fitness, int count, ProcessClass::ClusterRole role)
|
||||||
: bestFit((ProcessClass::Fitness)fitness), worstFit((ProcessClass::Fitness)fitness), count(count),
|
: bestFit((ProcessClass::Fitness)fitness), worstFit((ProcessClass::Fitness)fitness), role(role),
|
||||||
role(role) {}
|
count(count) {}
|
||||||
|
|
||||||
RoleFitness()
|
RoleFitness()
|
||||||
: bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), role(ProcessClass::NoRole),
|
: bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), role(ProcessClass::NoRole),
|
||||||
@ -3059,9 +3059,9 @@ public:
|
|||||||
ClusterControllerData(ClusterControllerFullInterface const& ccInterface,
|
ClusterControllerData(ClusterControllerFullInterface const& ccInterface,
|
||||||
LocalityData const& locality,
|
LocalityData const& locality,
|
||||||
ServerCoordinators const& coordinators)
|
ServerCoordinators const& coordinators)
|
||||||
: clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()), id(ccInterface.id()),
|
: gotProcessClasses(false), gotFullyRecoveredConfig(false), clusterControllerProcessId(locality.processId()),
|
||||||
ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), gotProcessClasses(false),
|
clusterControllerDcId(locality.dcId()), id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()),
|
||||||
gotFullyRecoveredConfig(false), startTime(now()), goodRecruitmentTime(Never()),
|
outstandingRemoteRequestChecker(Void()), startTime(now()), goodRecruitmentTime(Never()),
|
||||||
goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false),
|
goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false),
|
||||||
recruitingDistributor(false), recruitRatekeeper(false),
|
recruitingDistributor(false), recruitRatekeeper(false),
|
||||||
clusterControllerMetrics("ClusterController", id.toString()),
|
clusterControllerMetrics("ClusterController", id.toString()),
|
||||||
|
@ -203,7 +203,7 @@ class ConfigBroadcasterImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ConfigBroadcasterImpl()
|
ConfigBroadcasterImpl()
|
||||||
: id(deterministicRandom()->randomUniqueID()), lastCompactedVersion(0), mostRecentVersion(0),
|
: mostRecentVersion(0), lastCompactedVersion(0), id(deterministicRandom()->randomUniqueID()),
|
||||||
cc("ConfigBroadcaster"), compactRequest("CompactRequest", cc),
|
cc("ConfigBroadcaster"), compactRequest("CompactRequest", cc),
|
||||||
successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc),
|
successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc),
|
||||||
snapshotRequest("SnapshotRequest", cc) {
|
snapshotRequest("SnapshotRequest", cc) {
|
||||||
|
@ -241,8 +241,8 @@ class BroadcasterToLocalConfigEnvironment {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
BroadcasterToLocalConfigEnvironment(std::string const& dataDir, std::string const& configPath)
|
BroadcasterToLocalConfigEnvironment(std::string const& dataDir, std::string const& configPath)
|
||||||
: broadcaster(ConfigFollowerInterface{}), cbfi(makeReference<AsyncVar<ConfigBroadcastFollowerInterface>>()),
|
: readFrom(dataDir, configPath, {}), cbfi(makeReference<AsyncVar<ConfigBroadcastFollowerInterface>>()),
|
||||||
readFrom(dataDir, configPath, {}) {}
|
broadcaster(ConfigFollowerInterface{}) {}
|
||||||
|
|
||||||
Future<Void> setup() { return setup(this); }
|
Future<Void> setup() { return setup(this); }
|
||||||
|
|
||||||
@ -371,8 +371,9 @@ class TransactionToLocalConfigEnvironment {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
TransactionToLocalConfigEnvironment(std::string const& dataDir, std::string const& configPath)
|
TransactionToLocalConfigEnvironment(std::string const& dataDir, std::string const& configPath)
|
||||||
: writeTo(dataDir), readFrom(dataDir, configPath, {}), broadcaster(writeTo.getFollowerInterface()),
|
: writeTo(dataDir), readFrom(dataDir, configPath, {}),
|
||||||
cbfi(makeReference<AsyncVar<ConfigBroadcastFollowerInterface>>()) {}
|
cbfi(makeReference<AsyncVar<ConfigBroadcastFollowerInterface>>()), broadcaster(writeTo.getFollowerInterface()) {
|
||||||
|
}
|
||||||
|
|
||||||
Future<Void> setup() { return setup(this); }
|
Future<Void> setup() { return setup(this); }
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ struct RelocateData {
|
|||||||
TraceInterval interval;
|
TraceInterval interval;
|
||||||
|
|
||||||
RelocateData()
|
RelocateData()
|
||||||
: startTime(-1), priority(-1), boundaryPriority(-1), healthPriority(-1), workFactor(0), wantsNewServers(false),
|
: priority(-1), boundaryPriority(-1), healthPriority(-1), startTime(-1), workFactor(0), wantsNewServers(false),
|
||||||
interval("QueuedRelocation") {}
|
interval("QueuedRelocation") {}
|
||||||
explicit RelocateData(RelocateShard const& rs)
|
explicit RelocateData(RelocateShard const& rs)
|
||||||
: keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1),
|
: keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1),
|
||||||
@ -448,14 +448,14 @@ struct DDQueueData {
|
|||||||
FutureStream<RelocateShard> input,
|
FutureStream<RelocateShard> input,
|
||||||
PromiseStream<GetMetricsRequest> getShardMetrics,
|
PromiseStream<GetMetricsRequest> getShardMetrics,
|
||||||
double* lastLimited)
|
double* lastLimited)
|
||||||
: activeRelocations(0), queuedRelocations(0), bytesWritten(0), teamCollections(teamCollections),
|
: distributorId(mid), lock(lock), cx(cx), teamCollections(teamCollections), shardsAffectedByTeamFailure(sABTF),
|
||||||
shardsAffectedByTeamFailure(sABTF), getAverageShardBytes(getAverageShardBytes), distributorId(mid), lock(lock),
|
getAverageShardBytes(getAverageShardBytes),
|
||||||
cx(cx), teamSize(teamSize), singleRegionTeamSize(singleRegionTeamSize), output(output), input(input),
|
startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
|
||||||
getShardMetrics(getShardMetrics), startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
|
|
||||||
finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
|
finishMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
|
||||||
fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), lastLimited(lastLimited),
|
fetchSourceLock(new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM)), activeRelocations(0),
|
||||||
suppressIntervals(0), lastInterval(0), unhealthyRelocations(0),
|
queuedRelocations(0), bytesWritten(0), teamSize(teamSize), singleRegionTeamSize(singleRegionTeamSize),
|
||||||
rawProcessingUnhealthy(new AsyncVar<bool>(false)) {}
|
output(output), input(input), getShardMetrics(getShardMetrics), lastLimited(lastLimited), lastInterval(0),
|
||||||
|
suppressIntervals(0), rawProcessingUnhealthy(new AsyncVar<bool>(false)), unhealthyRelocations(0) {}
|
||||||
|
|
||||||
void validate() {
|
void validate() {
|
||||||
if (EXPENSIVE_VALIDATION) {
|
if (EXPENSIVE_VALIDATION) {
|
||||||
|
@ -123,10 +123,10 @@ struct DataDistributionTracker {
|
|||||||
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
|
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
|
||||||
KeyRangeMap<ShardTrackedData>& shards,
|
KeyRangeMap<ShardTrackedData>& shards,
|
||||||
bool& trackerCancelled)
|
bool& trackerCancelled)
|
||||||
: cx(cx), distributorId(distributorId), dbSizeEstimate(new AsyncVar<int64_t>()), systemSizeEstimate(0),
|
: cx(cx), distributorId(distributorId), shards(shards), sizeChanges(false), systemSizeEstimate(0),
|
||||||
maxShardSize(new AsyncVar<Optional<int64_t>>()), sizeChanges(false), readyToStart(readyToStart), output(output),
|
dbSizeEstimate(new AsyncVar<int64_t>()), maxShardSize(new AsyncVar<Optional<int64_t>>()), output(output),
|
||||||
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams),
|
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), readyToStart(readyToStart),
|
||||||
shards(shards), trackerCancelled(trackerCancelled) {}
|
anyZeroHealthyTeams(anyZeroHealthyTeams), trackerCancelled(trackerCancelled) {}
|
||||||
|
|
||||||
~DataDistributionTracker() {
|
~DataDistributionTracker() {
|
||||||
trackerCancelled = true;
|
trackerCancelled = true;
|
||||||
|
@ -168,11 +168,11 @@ private:
|
|||||||
class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
|
class RawDiskQueue_TwoFiles : public Tracked<RawDiskQueue_TwoFiles> {
|
||||||
public:
|
public:
|
||||||
RawDiskQueue_TwoFiles(std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit)
|
RawDiskQueue_TwoFiles(std::string basename, std::string fileExtension, UID dbgid, int64_t fileSizeWarningLimit)
|
||||||
: basename(basename), fileExtension(fileExtension), onError(delayed(error.getFuture())),
|
: basename(basename), fileExtension(fileExtension), dbgid(dbgid), dbg_file0BeginSeq(0),
|
||||||
onStopped(stopped.getFuture()), readingFile(-1), readingPage(-1), writingPos(-1), dbgid(dbgid),
|
fileSizeWarningLimit(fileSizeWarningLimit), onError(delayed(error.getFuture())), onStopped(stopped.getFuture()),
|
||||||
dbg_file0BeginSeq(0), fileExtensionBytes(SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES),
|
readyToPush(Void()), lastCommit(Void()), isFirstCommit(true), readingBuffer(dbgid), readingFile(-1),
|
||||||
fileShrinkBytes(SERVER_KNOBS->DISK_QUEUE_FILE_SHRINK_BYTES), readingBuffer(dbgid), readyToPush(Void()),
|
readingPage(-1), writingPos(-1), fileExtensionBytes(SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES),
|
||||||
fileSizeWarningLimit(fileSizeWarningLimit), lastCommit(Void()), isFirstCommit(true) {
|
fileShrinkBytes(SERVER_KNOBS->DISK_QUEUE_FILE_SHRINK_BYTES) {
|
||||||
if (BUGGIFY)
|
if (BUGGIFY)
|
||||||
fileExtensionBytes = _PAGE_SIZE * deterministicRandom()->randomSkewedUInt32(1, 10 << 10);
|
fileExtensionBytes = _PAGE_SIZE * deterministicRandom()->randomSkewedUInt32(1, 10 << 10);
|
||||||
if (BUGGIFY)
|
if (BUGGIFY)
|
||||||
@ -878,9 +878,9 @@ public:
|
|||||||
DiskQueueVersion diskQueueVersion,
|
DiskQueueVersion diskQueueVersion,
|
||||||
int64_t fileSizeWarningLimit)
|
int64_t fileSizeWarningLimit)
|
||||||
: rawQueue(new RawDiskQueue_TwoFiles(basename, fileExtension, dbgid, fileSizeWarningLimit)), dbgid(dbgid),
|
: rawQueue(new RawDiskQueue_TwoFiles(basename, fileExtension, dbgid, fileSizeWarningLimit)), dbgid(dbgid),
|
||||||
diskQueueVersion(diskQueueVersion), anyPopped(false), nextPageSeq(0), poppedSeq(0), lastPoppedSeq(0),
|
diskQueueVersion(diskQueueVersion), anyPopped(false), warnAlwaysForMemory(true), nextPageSeq(0), poppedSeq(0),
|
||||||
nextReadLocation(-1), readBufPage(nullptr), readBufPos(0), pushed_page_buffer(nullptr), recovered(false),
|
lastPoppedSeq(0), lastCommittedSeq(-1), pushed_page_buffer(nullptr), recovered(false), initialized(false),
|
||||||
initialized(false), lastCommittedSeq(-1), warnAlwaysForMemory(true) {}
|
nextReadLocation(-1), readBufPage(nullptr), readBufPos(0) {}
|
||||||
|
|
||||||
location push(StringRef contents) override {
|
location push(StringRef contents) override {
|
||||||
ASSERT(recovered);
|
ASSERT(recovered);
|
||||||
|
@ -681,7 +681,7 @@ struct SQLiteTransaction {
|
|||||||
struct IntKeyCursor {
|
struct IntKeyCursor {
|
||||||
SQLiteDB& db;
|
SQLiteDB& db;
|
||||||
BtCursor* cursor;
|
BtCursor* cursor;
|
||||||
IntKeyCursor(SQLiteDB& db, int table, bool write) : cursor(0), db(db) {
|
IntKeyCursor(SQLiteDB& db, int table, bool write) : db(db), cursor(nullptr) {
|
||||||
cursor = (BtCursor*)new char[sqlite3BtreeCursorSize()];
|
cursor = (BtCursor*)new char[sqlite3BtreeCursorSize()];
|
||||||
sqlite3BtreeCursorZero(cursor);
|
sqlite3BtreeCursorZero(cursor);
|
||||||
db.checkError("BtreeCursor", sqlite3BtreeCursor(db.btree, table, write, nullptr, cursor));
|
db.checkError("BtreeCursor", sqlite3BtreeCursor(db.btree, table, write, nullptr, cursor));
|
||||||
@ -705,7 +705,7 @@ struct RawCursor {
|
|||||||
|
|
||||||
operator bool() const { return valid; }
|
operator bool() const { return valid; }
|
||||||
|
|
||||||
RawCursor(SQLiteDB& db, int table, bool write) : cursor(0), db(db), valid(false) {
|
RawCursor(SQLiteDB& db, int table, bool write) : db(db), cursor(nullptr), valid(false) {
|
||||||
keyInfo.db = db.db;
|
keyInfo.db = db.db;
|
||||||
keyInfo.enc = db.db->aDb[0].pSchema->enc;
|
keyInfo.enc = db.db->aDb[0].pSchema->enc;
|
||||||
keyInfo.aColl[0] = db.db->pDfltColl;
|
keyInfo.aColl[0] = db.db->pDfltColl;
|
||||||
@ -1732,9 +1732,9 @@ private:
|
|||||||
volatile int64_t& freeListPages,
|
volatile int64_t& freeListPages,
|
||||||
UID dbgid,
|
UID dbgid,
|
||||||
vector<Reference<ReadCursor>>* pReadThreads)
|
vector<Reference<ReadCursor>>* pReadThreads)
|
||||||
: kvs(kvs), conn(kvs->filename, isBtreeV2, isBtreeV2), commits(), setsThisCommit(), freeTableEmpty(false),
|
: kvs(kvs), conn(kvs->filename, isBtreeV2, isBtreeV2), cursor(nullptr), commits(), setsThisCommit(),
|
||||||
writesComplete(writesComplete), springCleaningStats(springCleaningStats), diskBytesUsed(diskBytesUsed),
|
freeTableEmpty(false), writesComplete(writesComplete), springCleaningStats(springCleaningStats),
|
||||||
freeListPages(freeListPages), cursor(nullptr), dbgid(dbgid), readThreads(*pReadThreads),
|
diskBytesUsed(diskBytesUsed), freeListPages(freeListPages), dbgid(dbgid), readThreads(*pReadThreads),
|
||||||
checkAllChecksumsOnOpen(checkAllChecksumsOnOpen), checkIntegrityOnOpen(checkIntegrityOnOpen) {}
|
checkAllChecksumsOnOpen(checkAllChecksumsOnOpen), checkIntegrityOnOpen(checkIntegrityOnOpen) {}
|
||||||
~Writer() override {
|
~Writer() override {
|
||||||
TraceEvent("KVWriterDestroying", dbgid);
|
TraceEvent("KVWriterDestroying", dbgid);
|
||||||
@ -2109,7 +2109,7 @@ KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename,
|
|||||||
KeyValueStoreType storeType,
|
KeyValueStoreType storeType,
|
||||||
bool checkChecksums,
|
bool checkChecksums,
|
||||||
bool checkIntegrity)
|
bool checkIntegrity)
|
||||||
: type(storeType), filename(filename), logID(id), readThreads(CoroThreadPool::createThreadPool()),
|
: type(storeType), logID(id), filename(filename), readThreads(CoroThreadPool::createThreadPool()),
|
||||||
writeThread(CoroThreadPool::createThreadPool()), readsRequested(0), writesRequested(0), writesComplete(0),
|
writeThread(CoroThreadPool::createThreadPool()), readsRequested(0), writesRequested(0), writesComplete(0),
|
||||||
diskBytesUsed(0), freeListPages(0) {
|
diskBytesUsed(0), freeListPages(0) {
|
||||||
TraceEvent(SevDebug, "KeyValueStoreSQLiteCreate").detail("Filename", filename);
|
TraceEvent(SevDebug, "KeyValueStoreSQLiteCreate").detail("Filename", filename);
|
||||||
|
@ -326,9 +326,9 @@ public:
|
|||||||
std::string const& configPath,
|
std::string const& configPath,
|
||||||
std::map<std::string, std::string> const& manualKnobOverrides,
|
std::map<std::string, std::string> const& manualKnobOverrides,
|
||||||
IsTest isTest)
|
IsTest isTest)
|
||||||
: id(deterministicRandom()->randomUniqueID()), kvStore(dataFolder, id, "localconf-"), cc("LocalConfiguration"),
|
: id(deterministicRandom()->randomUniqueID()), kvStore(dataFolder, id, "localconf-"),
|
||||||
broadcasterChanges("BroadcasterChanges", cc), snapshots("Snapshots", cc),
|
configKnobOverrides(configPath), cc("LocalConfiguration"), broadcasterChanges("BroadcasterChanges", cc),
|
||||||
changeRequestsFetched("ChangeRequestsFetched", cc), mutations("Mutations", cc), configKnobOverrides(configPath),
|
snapshots("Snapshots", cc), changeRequestsFetched("ChangeRequestsFetched", cc), mutations("Mutations", cc),
|
||||||
manualKnobOverrides(manualKnobOverrides) {
|
manualKnobOverrides(manualKnobOverrides) {
|
||||||
if (isTest) {
|
if (isTest) {
|
||||||
testKnobCollection =
|
testKnobCollection =
|
||||||
|
@ -29,7 +29,7 @@
|
|||||||
|
|
||||||
struct MetricsRule {
|
struct MetricsRule {
|
||||||
MetricsRule(bool enabled = false, int minLevel = 0, StringRef const& name = StringRef())
|
MetricsRule(bool enabled = false, int minLevel = 0, StringRef const& name = StringRef())
|
||||||
: enabled(enabled), minLevel(minLevel), namePattern(name) {}
|
: namePattern(name), enabled(enabled), minLevel(minLevel) {}
|
||||||
|
|
||||||
Standalone<StringRef> typePattern;
|
Standalone<StringRef> typePattern;
|
||||||
Standalone<StringRef> namePattern;
|
Standalone<StringRef> namePattern;
|
||||||
|
@ -515,8 +515,7 @@ struct RatekeeperLimits {
|
|||||||
int64_t logSpringBytes,
|
int64_t logSpringBytes,
|
||||||
double maxVersionDifference,
|
double maxVersionDifference,
|
||||||
int64_t durabilityLagTargetVersions)
|
int64_t durabilityLagTargetVersions)
|
||||||
: priority(priority), tpsLimit(std::numeric_limits<double>::infinity()),
|
: tpsLimit(std::numeric_limits<double>::infinity()), tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
|
||||||
tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
|
|
||||||
reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes),
|
reasonMetric(StringRef("Ratekeeper.Reason" + context)), storageTargetBytes(storageTargetBytes),
|
||||||
storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes),
|
storageSpringBytes(storageSpringBytes), logTargetBytes(logTargetBytes), logSpringBytes(logSpringBytes),
|
||||||
maxVersionDifference(maxVersionDifference),
|
maxVersionDifference(maxVersionDifference),
|
||||||
@ -524,7 +523,8 @@ struct RatekeeperLimits {
|
|||||||
durabilityLagTargetVersions +
|
durabilityLagTargetVersions +
|
||||||
SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not
|
SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS), // The read transaction life versions are expected to not
|
||||||
// be durable on the storage servers
|
// be durable on the storage servers
|
||||||
durabilityLagLimit(std::numeric_limits<double>::infinity()), lastDurabilityLag(0), context(context) {}
|
lastDurabilityLag(0), durabilityLagLimit(std::numeric_limits<double>::infinity()), priority(priority),
|
||||||
|
context(context) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct GrvProxyInfo {
|
struct GrvProxyInfo {
|
||||||
@ -536,7 +536,7 @@ struct GrvProxyInfo {
|
|||||||
double lastTagPushTime;
|
double lastTagPushTime;
|
||||||
|
|
||||||
GrvProxyInfo()
|
GrvProxyInfo()
|
||||||
: totalTransactions(0), batchTransactions(0), lastUpdateTime(0), lastThrottledTagChangeId(0), lastTagPushTime(0) {
|
: totalTransactions(0), batchTransactions(0), lastThrottledTagChangeId(0), lastUpdateTime(0), lastTagPushTime(0) {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -577,7 +577,7 @@ struct RatekeeperData {
|
|||||||
smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||||
smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||||
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()),
|
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), lastWarning(0), lastSSListFetchedTimestamp(now()),
|
||||||
throttledTagChangeId(0), lastBusiestCommitTagPick(0),
|
lastBusiestCommitTagPick(0), throttledTagChangeId(0),
|
||||||
normalLimits(TransactionPriority::DEFAULT,
|
normalLimits(TransactionPriority::DEFAULT,
|
||||||
"",
|
"",
|
||||||
SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
|
SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
|
||||||
|
@ -92,7 +92,7 @@ struct KeyInfo {
|
|||||||
|
|
||||||
KeyInfo() = default;
|
KeyInfo() = default;
|
||||||
KeyInfo(StringRef key, bool begin, bool write, int transaction, int* pIndex)
|
KeyInfo(StringRef key, bool begin, bool write, int transaction, int* pIndex)
|
||||||
: key(key), begin(begin), write(write), transaction(transaction), pIndex(pIndex) {}
|
: key(key), pIndex(pIndex), begin(begin), write(write), transaction(transaction) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
force_inline int extra_ordering(const KeyInfo& ki) {
|
force_inline int extra_ordering(const KeyInfo& ki) {
|
||||||
@ -343,7 +343,7 @@ public:
|
|||||||
StringRef value;
|
StringRef value;
|
||||||
|
|
||||||
Finger() = default;
|
Finger() = default;
|
||||||
Finger(Node* header, const StringRef& ptr) : value(ptr), x(header) {}
|
Finger(Node* header, const StringRef& ptr) : x(header), value(ptr) {}
|
||||||
|
|
||||||
void init(const StringRef& value, Node* header) {
|
void init(const StringRef& value, Node* header) {
|
||||||
this->value = value;
|
this->value = value;
|
||||||
|
@ -224,14 +224,14 @@ public:
|
|||||||
// LatencyBands readLatencyBands;
|
// LatencyBands readLatencyBands;
|
||||||
|
|
||||||
Counters(StorageCacheData* self)
|
Counters(StorageCacheData* self)
|
||||||
: cc("StorageCacheServer", self->thisServerID.toString()), getKeyQueries("GetKeyQueries", cc),
|
: cc("StorageCacheServer", self->thisServerID.toString()), allQueries("QueryQueue", cc),
|
||||||
getValueQueries("GetValueQueries", cc), getRangeQueries("GetRangeQueries", cc),
|
getKeyQueries("GetKeyQueries", cc), getValueQueries("GetValueQueries", cc),
|
||||||
allQueries("QueryQueue", cc), finishedQueries("FinishedQueries", cc), rowsQueried("RowsQueried", cc),
|
getRangeQueries("GetRangeQueries", cc), finishedQueries("FinishedQueries", cc),
|
||||||
bytesQueried("BytesQueried", cc), bytesInput("BytesInput", cc), bytesFetched("BytesFetched", cc),
|
rowsQueried("RowsQueried", cc), bytesQueried("BytesQueried", cc), bytesInput("BytesInput", cc),
|
||||||
mutationBytes("MutationBytes", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
|
bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
|
||||||
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
|
setMutations("SetMutations", cc), clearRangeMutations("ClearRangeMutations", cc),
|
||||||
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
|
atomicMutations("AtomicMutations", cc), updateBatches("UpdateBatches", cc),
|
||||||
readsRejected("ReadsRejected", cc) {
|
updateVersions("UpdateVersions", cc), loops("Loops", cc), readsRejected("ReadsRejected", cc) {
|
||||||
specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; });
|
specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; });
|
||||||
specialCounter(cc, "Version", [self]() { return self->version.get(); });
|
specialCounter(cc, "Version", [self]() { return self->version.get(); });
|
||||||
specialCounter(cc, "VersionLag", [self]() { return self->versionLag; });
|
specialCounter(cc, "VersionLag", [self]() { return self->versionLag; });
|
||||||
@ -1542,7 +1542,7 @@ ACTOR Future<Void> fetchKeys(StorageCacheData* data, AddingCacheRange* cacheRang
|
|||||||
};
|
};
|
||||||
|
|
||||||
AddingCacheRange::AddingCacheRange(StorageCacheData* server, KeyRangeRef const& keys)
|
AddingCacheRange::AddingCacheRange(StorageCacheData* server, KeyRangeRef const& keys)
|
||||||
: server(server), keys(keys), transferredVersion(invalidVersion), phase(WaitPrevious) {
|
: keys(keys), server(server), transferredVersion(invalidVersion), phase(WaitPrevious) {
|
||||||
fetchClient = fetchKeys(server, this);
|
fetchClient = fetchKeys(server, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1704,9 +1704,9 @@ void cacheWarmup(StorageCacheData* data, const KeyRangeRef& keys, bool nowAssign
|
|||||||
class StorageCacheUpdater {
|
class StorageCacheUpdater {
|
||||||
public:
|
public:
|
||||||
StorageCacheUpdater()
|
StorageCacheUpdater()
|
||||||
: fromVersion(invalidVersion), currentVersion(invalidVersion), processedCacheStartKey(false) {}
|
: currentVersion(invalidVersion), fromVersion(invalidVersion), processedCacheStartKey(false) {}
|
||||||
StorageCacheUpdater(Version currentVersion)
|
StorageCacheUpdater(Version currentVersion)
|
||||||
: fromVersion(currentVersion), currentVersion(currentVersion), processedCacheStartKey(false) {}
|
: currentVersion(invalidVersion), fromVersion(currentVersion), processedCacheStartKey(false) {}
|
||||||
|
|
||||||
void applyMutation(StorageCacheData* data, MutationRef const& m, Version ver) {
|
void applyMutation(StorageCacheData* data, MutationRef const& m, Version ver) {
|
||||||
//TraceEvent("SCNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver);
|
//TraceEvent("SCNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver);
|
||||||
|
@ -95,8 +95,8 @@ public:
|
|||||||
ReusableCoordinatedState(ServerCoordinators const& coordinators,
|
ReusableCoordinatedState(ServerCoordinators const& coordinators,
|
||||||
PromiseStream<Future<Void>> const& addActor,
|
PromiseStream<Future<Void>> const& addActor,
|
||||||
UID const& dbgid)
|
UID const& dbgid)
|
||||||
: coordinators(coordinators), cstate(coordinators), addActor(addActor), dbgid(dbgid), finalWriteStarted(false),
|
: finalWriteStarted(false), previousWrite(Void()), cstate(coordinators), coordinators(coordinators),
|
||||||
previousWrite(Void()) {}
|
addActor(addActor), dbgid(dbgid) {}
|
||||||
|
|
||||||
Future<Void> read() { return _read(this); }
|
Future<Void> read() { return _read(this); }
|
||||||
|
|
||||||
@ -265,12 +265,12 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||||||
|
|
||||||
: dbgid(myInterface.id()), lastEpochEnd(invalidVersion), recoveryTransactionVersion(invalidVersion),
|
: dbgid(myInterface.id()), lastEpochEnd(invalidVersion), recoveryTransactionVersion(invalidVersion),
|
||||||
lastCommitTime(0), liveCommittedVersion(invalidVersion), databaseLocked(false),
|
lastCommitTime(0), liveCommittedVersion(invalidVersion), databaseLocked(false),
|
||||||
minKnownCommittedVersion(invalidVersion), myInterface(myInterface), dbInfo(dbInfo),
|
minKnownCommittedVersion(invalidVersion), hasConfiguration(false), coordinators(coordinators),
|
||||||
cstate(coordinators, addActor, dbgid), coordinators(coordinators), clusterController(clusterController),
|
version(invalidVersion), lastVersionTime(0), txnStateStore(nullptr), memoryLimit(2e9), dbId(dbId),
|
||||||
dbId(dbId), forceRecovery(forceRecovery), safeLocality(tagLocalityInvalid), primaryLocality(tagLocalityInvalid),
|
myInterface(myInterface), clusterController(clusterController), cstate(coordinators, addActor, dbgid),
|
||||||
neverCreated(false), hasConfiguration(false), version(invalidVersion), lastVersionTime(0),
|
dbInfo(dbInfo), registrationCount(0), addActor(addActor),
|
||||||
txnStateStore(nullptr), memoryLimit(2e9), registrationCount(0), addActor(addActor),
|
recruitmentStalled(makeReference<AsyncVar<bool>>(false)), forceRecovery(forceRecovery), neverCreated(false),
|
||||||
recruitmentStalled(makeReference<AsyncVar<bool>>(false)), cc("Master", dbgid.toString()),
|
safeLocality(tagLocalityInvalid), primaryLocality(tagLocalityInvalid), cc("Master", dbgid.toString()),
|
||||||
changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
|
changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
|
||||||
getCommitVersionRequests("GetCommitVersionRequests", cc),
|
getCommitVersionRequests("GetCommitVersionRequests", cc),
|
||||||
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
|
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
|
||||||
|
@ -73,7 +73,7 @@ struct AsyncFileCorrectnessWorkload : public AsyncFileWorkload {
|
|||||||
PerfIntCounter numOperations;
|
PerfIntCounter numOperations;
|
||||||
|
|
||||||
AsyncFileCorrectnessWorkload(WorkloadContext const& wcx)
|
AsyncFileCorrectnessWorkload(WorkloadContext const& wcx)
|
||||||
: AsyncFileWorkload(wcx), success(true), numOperations("Num Operations"), memoryFile(nullptr) {
|
: AsyncFileWorkload(wcx), memoryFile(nullptr), success(true), numOperations("Num Operations") {
|
||||||
maxOperationSize = getOption(options, LiteralStringRef("maxOperationSize"), 4096);
|
maxOperationSize = getOption(options, LiteralStringRef("maxOperationSize"), 4096);
|
||||||
numSimultaneousOperations = getOption(options, LiteralStringRef("numSimultaneousOperations"), 10);
|
numSimultaneousOperations = getOption(options, LiteralStringRef("numSimultaneousOperations"), 10);
|
||||||
targetFileSize = getOption(options, LiteralStringRef("targetFileSize"), (uint64_t)163840);
|
targetFileSize = getOption(options, LiteralStringRef("targetFileSize"), (uint64_t)163840);
|
||||||
|
@ -46,7 +46,7 @@ struct AsyncFileWriteWorkload : public AsyncFileWorkload {
|
|||||||
PerfIntCounter bytesWritten;
|
PerfIntCounter bytesWritten;
|
||||||
|
|
||||||
AsyncFileWriteWorkload(WorkloadContext const& wcx)
|
AsyncFileWriteWorkload(WorkloadContext const& wcx)
|
||||||
: AsyncFileWorkload(wcx), bytesWritten("Bytes Written"), writeBuffer(nullptr) {
|
: AsyncFileWorkload(wcx), writeBuffer(nullptr), bytesWritten("Bytes Written") {
|
||||||
numParallelWrites = getOption(options, LiteralStringRef("numParallelWrites"), 0);
|
numParallelWrites = getOption(options, LiteralStringRef("numParallelWrites"), 0);
|
||||||
writeSize = getOption(options, LiteralStringRef("writeSize"), _PAGE_SIZE);
|
writeSize = getOption(options, LiteralStringRef("writeSize"), _PAGE_SIZE);
|
||||||
fileSize = getOption(options, LiteralStringRef("fileSize"), 10002432);
|
fileSize = getOption(options, LiteralStringRef("fileSize"), 10002432);
|
||||||
|
@ -243,7 +243,7 @@ public:
|
|||||||
struct DelayedTask : OrderedTask {
|
struct DelayedTask : OrderedTask {
|
||||||
double at;
|
double at;
|
||||||
DelayedTask(double at, int64_t priority, TaskPriority taskID, Task* task)
|
DelayedTask(double at, int64_t priority, TaskPriority taskID, Task* task)
|
||||||
: at(at), OrderedTask(priority, taskID, task) {}
|
: OrderedTask(priority, taskID, task), at(at) {}
|
||||||
bool operator<(DelayedTask const& rhs) const { return at > rhs.at; } // Ordering is reversed for priority_queue
|
bool operator<(DelayedTask const& rhs) const { return at > rhs.at; } // Ordering is reversed for priority_queue
|
||||||
};
|
};
|
||||||
std::priority_queue<DelayedTask, std::vector<DelayedTask>> timers;
|
std::priority_queue<DelayedTask, std::vector<DelayedTask>> timers;
|
||||||
@ -1169,19 +1169,16 @@ struct PromiseTask : public Task, public FastAllocated<PromiseTask> {
|
|||||||
// 5MB for loading files into memory
|
// 5MB for loading files into memory
|
||||||
|
|
||||||
Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
|
Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
|
||||||
: useThreadPool(useThreadPool), network(this), reactor(this), stopped(false), tasksIssued(0),
|
: useThreadPool(useThreadPool), reactor(this),
|
||||||
ready(FLOW_KNOBS->READY_QUEUE_RESERVED_SIZE),
|
|
||||||
// Until run() is called, yield() will always yield
|
|
||||||
tscBegin(0), tscEnd(0), taskBegin(0), currentTaskID(TaskPriority::DefaultYield), numYields(0),
|
|
||||||
lastPriorityStats(nullptr), tlsInitializedState(ETLSInitState::NONE), tlsConfig(tlsConfig), started(false)
|
|
||||||
#ifndef TLS_DISABLED
|
#ifndef TLS_DISABLED
|
||||||
,
|
|
||||||
sslContextVar({ ReferencedObject<boost::asio::ssl::context>::from(
|
sslContextVar({ ReferencedObject<boost::asio::ssl::context>::from(
|
||||||
boost::asio::ssl::context(boost::asio::ssl::context::tls)) }),
|
boost::asio::ssl::context(boost::asio::ssl::context::tls)) }),
|
||||||
sslPoolHandshakesInProgress(0), sslHandshakerThreadsStarted(0)
|
sslHandshakerThreadsStarted(0), sslPoolHandshakesInProgress(0),
|
||||||
#endif
|
#endif
|
||||||
|
tlsConfig(tlsConfig), network(this), tscBegin(0), tscEnd(0), taskBegin(0),
|
||||||
{
|
currentTaskID(TaskPriority::DefaultYield), tasksIssued(0), stopped(false), started(false), numYields(0),
|
||||||
|
lastPriorityStats(nullptr), ready(FLOW_KNOBS->READY_QUEUE_RESERVED_SIZE), tlsInitializedState(ETLSInitState::NONE) {
|
||||||
|
// Until run() is called, yield() will always yield
|
||||||
TraceEvent("Net2Starting");
|
TraceEvent("Net2Starting");
|
||||||
|
|
||||||
// Set the global members
|
// Set the global members
|
||||||
@ -1908,7 +1905,7 @@ void Net2::getDiskBytes(std::string const& directory, int64_t& free, int64_t& to
|
|||||||
#include <sched.h>
|
#include <sched.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
ASIOReactor::ASIOReactor(Net2* net) : network(net), firstTimer(ios), do_not_stop(ios) {
|
ASIOReactor::ASIOReactor(Net2* net) : do_not_stop(ios), network(net), firstTimer(ios) {
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
// Reactor flags are used only for experimentation, and are platform-specific
|
// Reactor flags are used only for experimentation, and are platform-specific
|
||||||
if (FLOW_KNOBS->REACTOR_FLAGS & 1) {
|
if (FLOW_KNOBS->REACTOR_FLAGS & 1) {
|
||||||
|
@ -128,7 +128,7 @@ struct Profiler {
|
|||||||
bool timerInitialized;
|
bool timerInitialized;
|
||||||
|
|
||||||
Profiler(int period, std::string const& outfn, INetwork* network)
|
Profiler(int period, std::string const& outfn, INetwork* network)
|
||||||
: environmentInfoWriter(Unversioned()), signalClosure(signal_handler_for_closure, this), network(network),
|
: signalClosure(signal_handler_for_closure, this), environmentInfoWriter(Unversioned()), network(network),
|
||||||
timerInitialized(false) {
|
timerInitialized(false) {
|
||||||
actor = profile(this, period, outfn);
|
actor = profile(this, period, outfn);
|
||||||
}
|
}
|
||||||
|
@ -205,7 +205,7 @@ public:
|
|||||||
WriterThread(Reference<BarrierList> barriers,
|
WriterThread(Reference<BarrierList> barriers,
|
||||||
Reference<ITraceLogWriter> logWriter,
|
Reference<ITraceLogWriter> logWriter,
|
||||||
Reference<ITraceLogFormatter> formatter)
|
Reference<ITraceLogFormatter> formatter)
|
||||||
: barriers(barriers), logWriter(logWriter), formatter(formatter) {}
|
: logWriter(logWriter), formatter(formatter), barriers(barriers) {}
|
||||||
|
|
||||||
void init() override {}
|
void init() override {}
|
||||||
|
|
||||||
@ -277,8 +277,8 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
TraceLog()
|
TraceLog()
|
||||||
: bufferLength(0), loggedLength(0), opened(false), preopenOverflowCount(0), barriers(new BarrierList),
|
: formatter(new XmlTraceLogFormatter()), loggedLength(0), bufferLength(0), opened(false), preopenOverflowCount(0),
|
||||||
logTraceEventMetrics(false), formatter(new XmlTraceLogFormatter()), issues(new IssuesList) {}
|
logTraceEventMetrics(false), issues(new IssuesList), barriers(new BarrierList) {}
|
||||||
|
|
||||||
bool isOpen() const { return opened; }
|
bool isOpen() const { return opened; }
|
||||||
|
|
||||||
@ -835,28 +835,27 @@ Future<Void> pingTraceLogWriterThread() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent::TraceEvent(const char* type, UID id)
|
TraceEvent::TraceEvent(const char* type, UID id)
|
||||||
: id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) {
|
: initialized(false), enabled(true), logged(false), severity(SevInfo), type(type), id(id) {
|
||||||
setMaxFieldLength(0);
|
setMaxFieldLength(0);
|
||||||
setMaxEventLength(0);
|
setMaxEventLength(0);
|
||||||
}
|
}
|
||||||
TraceEvent::TraceEvent(Severity severity, const char* type, UID id)
|
TraceEvent::TraceEvent(Severity severity, const char* type, UID id)
|
||||||
: id(id), type(type), severity(severity), initialized(false), logged(false),
|
: initialized(false), enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity), logged(false),
|
||||||
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
|
severity(severity), type(type), id(id) {
|
||||||
setMaxFieldLength(0);
|
setMaxFieldLength(0);
|
||||||
setMaxEventLength(0);
|
setMaxEventLength(0);
|
||||||
}
|
}
|
||||||
TraceEvent::TraceEvent(TraceInterval& interval, UID id)
|
TraceEvent::TraceEvent(TraceInterval& interval, UID id)
|
||||||
: id(id), type(interval.type), severity(interval.severity), initialized(false), logged(false),
|
: initialized(false), enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= interval.severity),
|
||||||
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= interval.severity) {
|
logged(false), type(interval.type), id(id), severity(interval.severity) {
|
||||||
|
|
||||||
setMaxFieldLength(0);
|
setMaxFieldLength(0);
|
||||||
setMaxEventLength(0);
|
setMaxEventLength(0);
|
||||||
|
|
||||||
init(interval);
|
init(interval);
|
||||||
}
|
}
|
||||||
TraceEvent::TraceEvent(Severity severity, TraceInterval& interval, UID id)
|
TraceEvent::TraceEvent(Severity severity, TraceInterval& interval, UID id)
|
||||||
: id(id), type(interval.type), severity(severity), initialized(false), logged(false),
|
: initialized(false), logged(false), enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity),
|
||||||
enabled(g_network == nullptr || FLOW_KNOBS->MIN_TRACE_SEVERITY <= severity) {
|
type(interval.type), id(id), severity(severity) {
|
||||||
|
|
||||||
setMaxFieldLength(0);
|
setMaxFieldLength(0);
|
||||||
setMaxEventLength(0);
|
setMaxEventLength(0);
|
||||||
|
@ -276,7 +276,7 @@ ACTOR Future<Void> fastTraceLogger(int* unreadyMessages, int* failedMessages, in
|
|||||||
|
|
||||||
struct FastUDPTracer : public UDPTracer {
|
struct FastUDPTracer : public UDPTracer {
|
||||||
FastUDPTracer()
|
FastUDPTracer()
|
||||||
: socket_fd_(-1), unready_socket_messages_(0), failed_messages_(0), total_messages_(0), send_error_(false) {
|
: unready_socket_messages_(0), failed_messages_(0), total_messages_(0), socket_fd_(-1), send_error_(false) {
|
||||||
request_ = TraceRequest{ .buffer = std::make_unique<uint8_t[]>(kTraceBufferSize),
|
request_ = TraceRequest{ .buffer = std::make_unique<uint8_t[]>(kTraceBufferSize),
|
||||||
.data_size = 0,
|
.data_size = 0,
|
||||||
.buffer_size = kTraceBufferSize };
|
.buffer_size = kTraceBufferSize };
|
||||||
|
Loading…
x
Reference in New Issue
Block a user