Fix some merge conflicts and remove debugging trace events

This commit is contained in:
Young Liu 2020-07-22 23:35:46 -07:00
parent 525f10e30c
commit 229ab0d5f1
14 changed files with 94 additions and 101 deletions

View File

@ -302,14 +302,15 @@ struct GetKeyServerLocationsRequest {
};
struct GetRawCommittedVersionReply {
constexpr static FileIdentifier file_identifier = 61314632;
constexpr static FileIdentifier file_identifier = 1314732;
Optional<UID> debugID;
Version version;
bool locked;
Optional<Value> metadataVersion;
Version minKnownCommittedVersion;
GetRawCommittedVersionReply(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
GetRawCommittedVersionReply(): debugID(Optional<UID>()), version(invalidVersion), locked(false), metadataVersion(Optional<Value>()), minKnownCommittedVersion(invalidVersion) {}
// GetRawCommittedVersionReply(Optional<UID> const& debugID = Optional<UID>()) : debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {

View File

@ -475,9 +475,8 @@ Future< REPLY_TYPE(Request) > basicLoadBalance(
bool atMostOnce = false)
{
setReplyPriority(request, taskID);
if (!alternatives) {
if (!alternatives)
return Never();
}
ASSERT( alternatives->size() && alternatives->alwaysFresh() );
@ -531,10 +530,10 @@ Future< REPLY_TYPE(Request) > basicLoadBalance(
if(loadBalancedReply.present()) {
alternatives->updateRecent( useAlt, loadBalancedReply.get().recentRequests );
}
return result.get();
}
if(result.getError().code() != error_code_broken_promise && result.getError().code() != error_code_request_maybe_delivered) {
throw result.getError();
}

View File

@ -121,7 +121,6 @@ public:
case TransactionClass: return "transaction";
case ResolutionClass: return "resolution";
case ProxyClass: return "proxy";
// case GrvProxyClass: return "grv_proxy";
case MasterClass: return "master";
case TesterClass: return "test";
case StatelessClass: return "stateless";

View File

@ -948,6 +948,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
wait(self->pulledVersion.whenAtLeast(currentVersion));
pullFinished = Future<Void>(); // cancels pullAsyncData()
self->pulling = false;
TraceEvent("BackupWorkerPaused", self->myId).detail("Reson", "NoBackup");
} else {
// Backup key is not present, enter this NOOP POP mode.
state Future<Version> committedVersion = self->getMinKnownCommittedVersion();

View File

@ -460,9 +460,7 @@ public:
return bestFitness;
}
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role,
ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf,
std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, std::pair<vector<WorkerDetails>,vector<WorkerDetails>>> fitness_workers;
for( auto& it : id_worker ) {
@ -493,10 +491,7 @@ public:
throw no_more_servers();
}
vector<WorkerDetails> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role,
int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used,
Optional<std::vector<WorkerFitnessInfo>> minWorkers = Optional<std::vector<WorkerFitnessInfo>>(), bool checkStable = false ) {
vector<WorkerDetails> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<std::vector<WorkerFitnessInfo>> minWorkers = Optional<std::vector<WorkerFitnessInfo>>(), bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, std::pair<vector<WorkerDetails>,vector<WorkerDetails>>> fitness_workers;
vector<WorkerDetails> results;
if(minWorkers.present()) {
@ -737,14 +732,14 @@ public:
auto proxies = getWorkersForRoleInDatacenter( dcId, ProcessClass::Proxy, req.configuration.getDesiredProxies(), req.configuration, id_used, first_two_proxies );
ASSERT(proxies.size() >= 2 && proxies.size() <= req.configuration.getDesiredProxies());
int grvProxiesCount = std::max(1, (int) (CLIENT_KNOBS->GRV_PROXIES_RATIO * proxies.size()));
ASSERT(grvProxiesCount >= 1);
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].interf);
deterministicRandom()->randomShuffle(proxies);
for(int i = 0; i < proxies.size(); i++) {
if (i < proxies.size() - grvProxiesCount) {
result.masterProxies.push_back(proxies[i].interf);
} else {
if (i < grvProxiesCount) {
result.grvProxies.push_back(proxies[i].interf);
} else {
result.masterProxies.push_back(proxies[i].interf);
}
}
@ -895,14 +890,13 @@ public:
result.resolvers.push_back(resolvers[i].interf);
ASSERT(proxies.size() >= 2 && proxies.size() <= req.configuration.getDesiredProxies());
// TODO: Consider shuffle the proxies array before assigning grv proxies and normal proxies.
deterministicRandom()->randomShuffle(proxies);
int grvProxiesCount = std::max(1, (int) (CLIENT_KNOBS->GRV_PROXIES_RATIO * proxies.size()));
ASSERT(grvProxiesCount >= 1);
for(int i = 0; i < proxies.size(); i++) {
if (i < proxies.size() - grvProxiesCount) {
result.masterProxies.push_back(proxies[i].interf);
} else {
if (i < grvProxiesCount) {
result.grvProxies.push_back(proxies[i].interf);
} else {
result.masterProxies.push_back(proxies[i].interf);
}
}
@ -1840,9 +1834,7 @@ ACTOR Future<Void> clusterRecruitFromConfiguration( ClusterControllerData* self,
TEST(true); //ClusterController RecruitTLogsRequest
loop {
try {
// TraceEvent("DatabaseConfigurationRequest").detail("Config", req.configuration.toString());
auto rep = self->findWorkersForConfiguration( req );
// TraceEvent("RecruitResult").detail("Res", rep.toString());
req.reply.send( rep );
return Void();
} catch (Error& e) {
@ -1897,7 +1889,8 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
.detail("Resolvers", req.resolvers.size())
.detail("RecoveryState", (int)req.recoveryState)
.detail("RegistrationCount", req.registrationCount)
.detail("Proxies", req.masterProxies.size())
.detail("MasterProxies", req.masterProxies.size())
.detail("GrvProxies", req.grvProxies.size())
.detail("RecoveryCount", req.recoveryCount)
.detail("Stalled", req.recoveryStalled)
.detail("OldestBackupEpoch", req.logSystemConfig.oldestBackupEpoch);

View File

@ -18,6 +18,7 @@ struct GrvProxyStats {
Counter txnThrottled;
LatencyBands grvLatencyBands;
LatencySample grvLatencySample;
Future<Void> logger;
@ -58,6 +59,7 @@ struct GrvProxyStats {
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc),
txnThrottled("TxnThrottled", cc),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
logger = traceCounters("GrvProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "GrvProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
@ -146,12 +148,10 @@ struct GrvProxyData {
GrvProxyStats stats;
MasterInterface master;
RequestStream<GetReadVersionRequest> getConsistentReadVersion;
LogSystemDiskQueueAdapter* logAdapter;
Reference<ILogSystem> logSystem;
IKeyValueStore* txnStateStore;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db; //maybe
Reference<AsyncVar<ServerDBInfo>> db;
Optional<LatencyBandConfig> latencyBandConfig;
double lastStartCommit;
@ -199,9 +199,13 @@ ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy, GetHea
}
}
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, GrvTransactionRateInfo *transactionRateInfo,
GrvTransactionRateInfo *batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter, PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount,
int64_t* inBatchTransactionCount, GrvTransactionRateInfo* transactionRateInfo,
GrvTransactionRateInfo* batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -225,7 +229,16 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter, detailed)));
TransactionTagMap<uint64_t> tagCounts;
for(auto itr : *throttledTags) {
for(auto priorityThrottles : itr.second) {
tagCounts[priorityThrottles.first] = (*transactionTagCounter)[priorityThrottles.first];
}
}
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(
GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter,
TransactionTagMap<TransactionCommitCostEstimation>(), detailed)));
transactionTagCounter->clear();
expectingDetailedReply = detailed;
}
@ -234,7 +247,6 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
transactionRateInfo->setRate(rep.transactionRate);
batchTransactionRateInfo->setRate(rep.batchTransactionRate);
//TraceEvent("GrvProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
@ -254,18 +266,17 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
transactionRateInfo->disable();
batchTransactionRateInfo->disable();
TraceEvent(SevWarn, "GrvProxyRateLeaseExpired", myID).suppressFor(5.0);
//TraceEvent("GrvProxyRate", myID).detail("Rate", 0.0).detail("BatchRate", 0.0).detail("Lease", 0);
//TraceEvent("MasterProxyRate", myID).detail("Rate", 0.0).detail("BatchRate", 0.0).detail("Lease", 0);
leaseTimeout = Never();
}
}
}
ACTOR Future<Void> queueGetReadVersionRequests(
Reference<AsyncVar<ServerDBInfo>> db,
Deque<GetReadVersionRequest> *systemQueue,
Deque<GetReadVersionRequest> *defaultQueue,
Deque<GetReadVersionRequest> *batchQueue,
SpannedDeque<GetReadVersionRequest> *systemQueue,
SpannedDeque<GetReadVersionRequest> *defaultQueue,
SpannedDeque<GetReadVersionRequest> *batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
@ -302,9 +313,11 @@ ACTOR Future<Void> queueGetReadVersionRequests(
if (req.priority >= TransactionPriority::IMMEDIATE) {
stats->txnSystemPriorityStartIn += req.transactionCount;
systemQueue->push_back(req);
systemQueue->span.addParent(req.spanContext);
} else if (req.priority >= TransactionPriority::DEFAULT) {
stats->txnDefaultPriorityStartIn += req.transactionCount;
defaultQueue->push_back(req);
defaultQueue->span.addParent(req.spanContext);
} else {
// Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.masterProxies.size(), 1);
@ -316,6 +329,7 @@ ACTOR Future<Void> queueGetReadVersionRequests(
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
batchQueue->span.addParent(req.spanContext);
}
}
}
@ -361,26 +375,19 @@ ACTOR Future<Void> lastCommitUpdater(GrvProxyData* self, PromiseStream<Future<Vo
}
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(GrvProxyData* grvProxyData, uint32_t flags, Optional<UID> debugID,
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, GrvProxyData* grvProxyData, uint32_t flags, Optional<UID> debugID,
int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
{
// Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (1) The version returned is the committedVersion of some proxy at some point before the request returns, so it is committed.
// (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
// and no other proxy could have already committed anything without first ending the epoch
state Span span("GP:getLiveCommittedVersion"_loc, parentSpan);
++grvProxyData->stats.txnStartBatch;
state Future<GetRawCommittedVersionReply> replyFromMasterFuture;
replyFromMasterFuture = grvProxyData->master.getLiveCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::GetLiveCommittedVersionReply);
replyFromMasterFuture = grvProxyData->master.getLiveCommittedVersion.getReply(
GetRawCommittedVersionRequest(span.context, debugID), TaskPriority::GetLiveCommittedVersionReply);
// TODO: figure out what's this
// Causal read risky means it has risks to serve stale read versions which mainly happens in recovery stage in which we
// may serve read versions in the last epoch. To minimize that risk, we want to confirm the epoch is still live.
// FLAG_CAUSAL_READ_RISKY means the request can tolerate the stale read version because it may just want a version and doesn't need to read data.
// Here this means if the system is not always causal read risky and the request really wants a causal read, then
// we must need to confirm we can still write to the current epoch of tlogs.
// If not, we also want to make sure the last commit time is less than REQUIRED_MIN_RECOVERY_DURATION ago which decreases
// the risk of stale read versions.
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
wait(updateLastCommit(grvProxyData, debugID));
} else if (SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION > 0 &&
@ -421,6 +428,10 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
double end = g_network->timer();
for(GetReadVersionRequest const& request : requests) {
if(request.priority == TransactionPriority::DEFAULT) {
stats->grvLatencySample.addMeasurement(end - request.requestTime());
}
if(request.priority >= TransactionPriority::DEFAULT) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime());
}
@ -479,14 +490,15 @@ ACTOR static Future<Void> getReadVersionServer(
state GrvTransactionRateInfo normalRateInfo(10);
state GrvTransactionRateInfo batchRateInfo(0);
state Deque<GetReadVersionRequest> systemQueue;
state Deque<GetReadVersionRequest> defaultQueue;
state Deque<GetReadVersionRequest> batchQueue;
state SpannedDeque<GetReadVersionRequest> systemQueue("GP:getReadVersionServerSystemQueue"_loc);
state SpannedDeque<GetReadVersionRequest> defaultQueue("GP:getReadVersionServerDefaultQueue"_loc);
state SpannedDeque<GetReadVersionRequest> batchQueue("GP:getReadVersionServerBatchQueue"_loc);
state TransactionTagMap<uint64_t> transactionTagCounter;
state PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
state PromiseStream<double> replyTimes;
state Span span;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(queueGetReadVersionRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
@ -530,7 +542,7 @@ ACTOR static Future<Void> getReadVersionServer(
int requestsToStart = 0;
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
Deque<GetReadVersionRequest>* transactionQueue;
SpannedDeque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
transactionQueue = &systemQueue;
} else if(!defaultQueue.empty()) {
@ -540,6 +552,7 @@ ACTOR static Future<Void> getReadVersionServer(
} else {
break;
}
transactionQueue->span.swap(span);
auto& req = transactionQueue->front();
int tc = req.transactionCount;
@ -601,7 +614,8 @@ ACTOR static Future<Void> getReadVersionServer(
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(grvProxyData, i, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(
span.context, grvProxyData, i, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
addActor.send(sendGrvReplies(readVersionReply, start[i], &grvProxyData->stats,
grvProxyData->minKnownCommittedVersion, throttledTags));
@ -611,6 +625,7 @@ ACTOR static Future<Void> getReadVersionServer(
}
}
}
span = Span(span.location);
}
}
@ -632,23 +647,14 @@ ACTOR Future<Void> grvProxyServerCore(
// Wait until we can load the "real" logsystem, since we don't support switching them currently
while (!(grvProxyData.db->get().master.id() == master.id() && grvProxyData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
wait(grvProxyData.db->onChange());
}
// Do we need to wait for any db info change? Yes. To update latency band.
state Future<Void> dbInfoChange = grvProxyData.db->onChange();
grvProxyData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), grvProxyData.db->get(), false, addActor);
TraceEvent("LogSystemCreate").detail("Role", "GRV").detail("UID", proxy.id());
// grvProxyData.logAdapter = new LogSystemDiskQueueAdapter(grvProxyData.logSystem, Reference<AsyncVar<PeekTxsInfo>>(), 1, false);
// grvProxyData.txnStateStore = keyValueStoreLogSystem(grvProxyData.logAdapter, proxy.id(), 2e9, true, true, true);
grvProxyData.updateLatencyBandConfig(grvProxyData.db->get().latencyBandConfig);
// // wait for txnStateStore recovery
// wait(success(grvProxyData.txnStateStore->readValue(StringRef())));
addActor.send(getReadVersionServer(proxy, grvProxyData.db, addActor, &grvProxyData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
@ -683,18 +689,15 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t r
ACTOR Future<Void> grvProxyServer(
GrvProxyInterface proxy,
InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db,
std::string whitelistBinPaths)
Reference<AsyncVar<ServerDBInfo>> db)
{
try {
state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
// do we need wait for the recovery?
wait(core || checkRemoved(db, req.recoveryCount, proxy));
}
catch (Error& e) {
TraceEvent("GrvProxyTerminated", proxy.id()).error(e, true);
// Examine all the unnecessary codes.
if (e.code() != error_code_worker_removed && e.code() != error_code_tlog_stopped &&
e.code() != error_code_master_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out) {

View File

@ -1474,20 +1474,20 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, Pro
rep.metadataVersion = commitData->metadataVersion;
rep.version = commitData->committedVersion.get();
GetRawCommittedVersionReply replyFromMaster = wait(replyFromMasterFuture);
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
state GetRawCommittedVersionReply replyFromMaster = wait(replyFromMasterFuture);
if (replyFromMaster.version > rep.version) {
rep.locked = replyFromMaster.locked;
rep.metadataVersion = replyFromMaster.metadataVersion;
rep.version = replyFromMaster.version;
}
} else {
vector<GetRawCommittedVersionReply> versions = wait(getAll(proxyVersions));
for (auto v : versions) {
state vector<GetRawCommittedVersionReply> versionsFromProxies = wait(getAll(proxyVersions));
for (auto v : versionsFromProxies) {
if (v.version > rep.version) {
rep.locked = replyFromMaster.locked;
rep.metadataVersion = replyFromMaster.metadataVersion;
rep.version = replyFromMaster.version;
rep.locked = v.locked;
rep.metadataVersion = v.metadataVersion;
rep.version = v.version;
}
}
}

View File

@ -123,6 +123,7 @@ int64_t getQueueSize( const TraceEventFields& md ) {
sscanf(md.getValue("BytesInput").c_str(), "%lf %lf %" SCNd64, &inputRate, &inputRoughness, &inputBytes);
sscanf(md.getValue("BytesDurable").c_str(), "%lf %lf %" SCNd64, &durableRate, &durableRoughness, &durableBytes);
return inputBytes - durableBytes;
}
@ -566,9 +567,7 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
.detail("StorageServersRecruiting", storageServersRecruiting.get())
.detail("NumSuccesses", numSuccesses);
if (dataInFlight.get() > dataInFlightGate ||
tLogQueueInfo.get().first > maxTLogQueueGate ||
tLogQueueInfo.get().second > maxPoppedVersionLag ||
if (dataInFlight.get() > dataInFlightGate || tLogQueueInfo.get().first > maxTLogQueueGate || tLogQueueInfo.get().second > maxPoppedVersionLag ||
dataDistributionQueueSize.get() > maxDataDistributionQueueSize ||
storageQueueSize.get() > maxStorageServerQueueGate || dataDistributionActive.get() == false ||
storageServersRecruiting.get() == true || teamCollectionValid.get() == false) {

View File

@ -397,6 +397,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Version version = self->versionMessages.front().first;
std::pair<int,int> &sizes = logData->version_sizes[version];
int64_t messagesErased = 0;
while(!self->versionMessages.empty() && self->versionMessages.front().first == version) {
auto const& m = self->versionMessages.front();
++messagesErased;
@ -446,7 +447,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
bool stopped, initialized;
DBRecoveryCount recoveryCount;
VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // ? // The last version number in the portion of the log (written|durable) to persistentData
VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // The last version number in the portion of the log (written|durable) to persistentData
NotifiedVersion version, queueCommittedVersion;
Version queueCommittingVersion;
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
@ -471,7 +472,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
//only callable after getTagData returns a null reference
Reference<TagData> createTagData(Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered) {
if(tag.locality != tagLocalityLogRouter && tag.locality != tagLocalityTxs && tag != txsTag && !allTags.empty() && !allTags.count(tag) && popped <= recoveredAt) {
if(tag.locality != tagLocalityLogRouter && tag.locality != tagLocalityTxs && tag != txsTag && allTags.size() && !allTags.count(tag) && popped <= recoveredAt) {
popped = recoveredAt + 1;
}
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, 0, nothingPersistent, poppedRecently, unpoppedRecovered) );
@ -1052,9 +1053,8 @@ ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere
}
}
if (upTo > logData->persistentDataDurableVersion) {
if (upTo > logData->persistentDataDurableVersion)
wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop));
}
//TraceEvent("TLogPop", logData->logId).detail("Tag", tag.toString()).detail("To", upTo);
}
return Void();
@ -1144,6 +1144,7 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
wait( logData->queueCommittedVersion.whenAtLeast( nextVersion ) );
wait( delay(0, TaskPriority::UpdateStorage) );
//TraceEvent("TlogUpdatePersist", self->dbgid).detail("LogId", logData->logId).detail("NextVersion", nextVersion).detail("Version", logData->version.get()).detail("PersistentDataDurableVer", logData->persistentDataDurableVersion).detail("QueueCommitVer", logData->queueCommittedVersion.get()).detail("PersistDataVer", logData->persistentDataVersion);
if (nextVersion > logData->persistentDataVersion) {
wait( self->persistentDataCommitLock.take() );

View File

@ -448,15 +448,14 @@ struct InitializeMasterProxyRequest {
};
// Request sent by the master during recovery to recruit a GRV proxy worker.
// NOTE(review): this span was a diff rendering that interleaved the old and new
// versions of the struct (two file_identifiers, an extra recoveryTransactionVersion
// member, and two serializer() calls). Resolved here to the post-commit layout:
// recoveryTransactionVersion was removed along with its serializer entry.
struct InitializeGrvProxyRequest {
	constexpr static FileIdentifier file_identifier = 8265613;
	MasterInterface master;        // interface of the recruiting master
	uint64_t recoveryCount;        // master's recovery generation; used to detect stale recruitment
	ReplyPromise<GrvProxyInterface> reply; // fulfilled with the new proxy's interface

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, master, recoveryCount, reply);
	}
};
@ -738,8 +737,7 @@ ACTOR Future<Void> masterServer(MasterInterface mi, Reference<AsyncVar<ServerDBI
ServerCoordinators serverCoordinators, LifetimeToken lifetime, bool forceRecovery);
ACTOR Future<Void> masterProxyServer(MasterProxyInterface proxy, InitializeMasterProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db, std::string whitelistBinPaths);
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy, InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db, std::string whitelistBinPaths);
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy, InitializeGrvProxyRequest req, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, UID workerID,

View File

@ -260,7 +260,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
lastEpochEnd(invalidVersion),
liveCommittedVersion(invalidVersion),
databaseLocked(false),
minKnownCommittedVersion(0),
minKnownCommittedVersion(invalidVersion),
recoveryTransactionVersion(invalidVersion),
lastCommitTime(0),
registrationCount(0),
@ -305,7 +305,6 @@ ACTOR Future<Void> newGrvProxies( Reference<MasterData> self, RecruitFromConfigu
InitializeGrvProxyRequest req;
req.master = self->myInterface;
req.recoveryCount = self->cstate.myDBState.recoveryCount + 1;
req.recoveryTransactionVersion = self->recoveryTransactionVersion; // may not need it
TraceEvent("GrvProxyReplies",self->dbgid).detail("WorkerID", recr.grvProxies[i].id());
initializationReplies.push_back( transformErrors( throwErrorOr( recr.grvProxies[i].grvProxy.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
}
@ -494,13 +493,15 @@ ACTOR Future<Void> updateLogsValue( Reference<MasterData> self, Database cx ) {
}
}
Future<Void> sendMasterRegistration(MasterData* self,
Future<Void> sendMasterRegistration(
MasterData* self,
LogSystemConfig const& logSystemConfig,
vector<MasterProxyInterface> proxies,
vector<GrvProxyInterface> grvProxies,
vector<ResolverInterface> resolvers,
DBRecoveryCount recoveryCount,
vector<UID> priorCommittedLogServers ) {
RegisterMasterRequest masterReq;
masterReq.id = self->myInterface.id();
masterReq.mi = self->myInterface.locality;
@ -1067,11 +1068,9 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
reply.locked = self->databaseLocked;
reply.metadataVersion = self->proxyMetadataVersion;
reply.minKnownCommittedVersion = self->minKnownCommittedVersion;
// TraceEvent("ServerServeGet").detail("Own", self->minKnownCommittedVersion);
req.reply.send(reply);
}
when(ReportRawCommittedVersionRequest req = waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
// TraceEvent("ServerReceiveReport").detail("MV", req.minKnownCommittedVersion).detail("Own", self->minKnownCommittedVersion);
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion);
if (req.version > self->liveCommittedVersion) {
self->liveCommittedVersion = req.version;

View File

@ -1410,7 +1410,7 @@ ACTOR Future<Void> workerServer(
//printf("Recruited as grvProxyServer\n");
errorForwarders.add( zombie(recruited, forwardError( errors, Role::GRV_PROXY, recruited.id(),
grvProxyServer( recruited, req, dbInfo, whitelistBinPaths ) ) ) );
grvProxyServer( recruited, req, dbInfo ) ) ) );
req.reply.send(recruited);
}
when( InitializeResolverRequest req = waitNext(interf.resolver.getFuture()) ) {