/* * ConsistencyCheck.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include "flow/IRandom.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbrpc/IRateControl.h" #include "fdbrpc/simulator.h" #include "fdbserver/Knobs.h" #include "fdbserver/StorageMetrics.h" #include "fdbserver/DataDistribution.actor.h" #include "fdbserver/QuietDatabase.h" #include "flow/DeterministicRandom.h" #include "fdbclient/ManagementAPI.actor.h" #include "flow/actorcompiler.h" // This must be the last #include. struct ConsistencyCheckWorkload : TestWorkload { //Whether or not we should perform checks that will only pass if the database is in a quiescent state bool performQuiescentChecks; //How long to wait for the database to go quiet before failing (if doing quiescent checks) double quiescentWaitTimeout; //If true, then perform all checks on this client. The first client is the only one to perform all of the fast checks //All other clients will perform slow checks if this test is distributed bool firstClient; //If true, then the expensive checks will be distributed to multiple clients bool distributed; //Determines how many shards are checked for consistency: out of every shards, 1 will be checked int shardSampleFactor; //The previous data distribution mode int oldDataDistributionMode; //If true, then any failure of the consistency check will be logged as SevError. Otherwise, it will be logged as SevWarn bool failureIsError; //Max number of bytes per second to read from each storage server int rateLimitMax; // DataSet Size int64_t bytesReadInPreviousRound; //Randomize shard order with each iteration if true bool shuffleShards; bool success; //Number of times this client has run its portion of the consistency check int64_t repetitions; //Whether to continuously perfom the consistency check bool indefinite; ConsistencyCheckWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { performQuiescentChecks = getOption(options, LiteralStringRef("performQuiescentChecks"), false); quiescentWaitTimeout = getOption(options, LiteralStringRef("quiescentWaitTimeout"), 600.0); distributed = getOption(options, LiteralStringRef("distributed"), true); shardSampleFactor = std::max(getOption(options, LiteralStringRef("shardSampleFactor"), 1), 1); failureIsError = getOption(options, LiteralStringRef("failureIsError"), false); rateLimitMax = getOption(options, LiteralStringRef("rateLimitMax"), 0); shuffleShards = getOption(options, LiteralStringRef("shuffleShards"), false); indefinite = getOption(options, LiteralStringRef("indefinite"), false); success = true; firstClient = clientId == 0; repetitions = 0; bytesReadInPreviousRound = 0; } virtual std::string description() { return "ConsistencyCheck"; } virtual Future setup(Database const& cx) { return _setup(cx, this); } ACTOR Future _setup(Database cx, ConsistencyCheckWorkload *self) { //If performing quiescent checks, wait for the database to go quiet if(self->firstClient && self->performQuiescentChecks) { if(g_network->isSimulated()) { wait( timeKeeperSetDisable(cx) ); } try { wait(timeoutError(quietDatabase(cx, self->dbInfo, "ConsistencyCheckStart", 0, 1e5, 0, 0), self->quiescentWaitTimeout)); // FIXME: should be zero? } catch (Error& e) { TraceEvent("ConsistencyCheck_QuietDatabaseError").error(e); self->testFailure("Unable to achieve a quiet database"); self->performQuiescentChecks = false; } } return Void(); } virtual Future start(Database const& cx) { TraceEvent("ConsistencyCheck"); return _start(cx, this); } virtual Future check(Database const& cx) { return success; } virtual void getMetrics( vector& m ) { } void testFailure(std::string message, bool isError = false) { success = false; TraceEvent failEvent((failureIsError || isError) ? SevError : SevWarn, "TestFailure"); if(performQuiescentChecks) failEvent.detail("Workload", "QuiescentCheck"); else failEvent.detail("Workload", "ConsistencyCheck"); failEvent.detail("Reason", "Consistency check: " + message); } ACTOR Future _start(Database cx, ConsistencyCheckWorkload *self) { loop { wait(self->runCheck(cx, self)); if(!self->indefinite) break; self->repetitions++; wait(delay(5.0)); } return Void(); } ACTOR Future runCheck(Database cx, ConsistencyCheckWorkload *self) { TEST(self->performQuiescentChecks); //Quiescent consistency check TEST(!self->performQuiescentChecks); //Non-quiescent consistency check if(self->firstClient || self->distributed) { try { state DatabaseConfiguration configuration; state Transaction tr(cx); tr.setOption(FDBTransactionOptions::LOCK_AWARE); loop { try { Standalone res = wait( tr.getRange(configKeys, 1000) ); if( res.size() == 1000 ) { TraceEvent("ConsistencyCheck_TooManyConfigOptions"); self->testFailure("Read too many configuration options"); } for( int i = 0; i < res.size(); i++ ) configuration.set(res[i].key,res[i].value); break; } catch( Error &e ) { wait( tr.onError(e) ); } } //Perform quiescence-only checks if(self->firstClient && self->performQuiescentChecks) { //Check for undesirable servers (storage servers with exact same network address or using the wrong key value store type) state bool hasUndesirableServers = wait(self->checkForUndesirableServers(cx, configuration, self)); //Check that nothing is in-flight or in queue in data distribution int64_t inDataDistributionQueue = wait(getDataDistributionQueueSize(cx, self->dbInfo, true)); if(inDataDistributionQueue > 0) { TraceEvent("ConsistencyCheck_NonZeroDataDistributionQueue").detail("QueueSize", inDataDistributionQueue); self->testFailure("Non-zero data distribution queue/in-flight size"); } // Check that the number of process (and machine) teams is no larger than // the allowed maximum number of teams bool teamCollectionValid = wait(getTeamCollectionValid(cx, self->dbInfo)); if (!teamCollectionValid) { TraceEvent(SevError, "ConsistencyCheck_TooManyTeams"); self->testFailure("The number of process or machine teams is larger than the allowed maximum number of teams"); } //Check that nothing is in the TLog queues std::pair maxTLogQueueInfo = wait(getTLogQueueInfo(cx, self->dbInfo)); if(maxTLogQueueInfo.first > 1e5) // FIXME: Should be zero? { TraceEvent("ConsistencyCheck_NonZeroTLogQueue").detail("MaxQueueSize", maxTLogQueueInfo.first); self->testFailure("Non-zero tlog queue size"); } if(maxTLogQueueInfo.second > 30e6) { TraceEvent("ConsistencyCheck_PoppedVersionLag").detail("PoppedVersionLag", maxTLogQueueInfo.second); self->testFailure("large popped version lag"); } //Check that nothing is in the storage server queues try { int64_t maxStorageServerQueueSize = wait(getMaxStorageServerQueueSize(cx, self->dbInfo)); if(maxStorageServerQueueSize > 0) { TraceEvent("ConsistencyCheck_NonZeroStorageServerQueue").detail("MaxQueueSize", maxStorageServerQueueSize); self->testFailure("Non-zero storage server queue size"); } } catch(Error& e) { if(e.code() == error_code_attribute_not_found) { TraceEvent("ConsistencyCheck_StorageQueueSizeError").error(e).detail("Reason", "Could not read queue size"); //This error occurs if we have undesirable servers; in that case just report the undesirable servers error if(!hasUndesirableServers) self->testFailure("Could not read storage queue size"); } else throw; } wait(::success(self->checkForStorage(cx, configuration, self))); wait(::success(self->checkForExtraDataStores(cx, self))); //Check that each machine is operating as its desired class bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self)); if(!usingDesiredClasses) self->testFailure("Cluster has machine(s) not using requested classes"); bool workerListCorrect = wait( self->checkWorkerList(cx, self) ); if(!workerListCorrect) self->testFailure("Worker list incorrect"); bool coordinatorsCorrect = wait( self->checkCoordinators(cx) ); if(!coordinatorsCorrect) self->testFailure("Coordinators incorrect"); } //Get a list of key servers; verify that the TLogs and master all agree about who the key servers are state Promise>>> keyServerPromise; bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise)); if(keyServerResult) { state std::vector>> keyServers = keyServerPromise.getFuture().get(); //Get the locations of all the shards in the database state Promise>> keyLocationPromise; bool keyLocationResult = wait(self->getKeyLocations(cx, keyServers, self, keyLocationPromise)); if(keyLocationResult) { state Standalone> keyLocations = keyLocationPromise.getFuture().get(); //Check that each shard has the same data on all storage servers that it resides on wait(::success(self->checkDataConsistency(cx, keyLocations, configuration, self))); } } } catch(Error &e) { if (e.code() == error_code_transaction_too_old || e.code() == error_code_future_version || e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || e.code() == error_code_process_behind) TraceEvent("ConsistencyCheck_Retry").error(e); // FIXME: consistency check does not retry in this case else self->testFailure(format("Error %d - %s", e.code(), e.name())); } } TraceEvent("ConsistencyCheck_FinishedCheck").detail("Repetitions", self->repetitions); return Void(); } //Gets a version at which to read from the storage servers ACTOR Future getVersion(Database cx, ConsistencyCheckWorkload *self) { loop { state Transaction tr(cx); tr.setOption(FDBTransactionOptions::LOCK_AWARE); try { Version version = wait(tr.getReadVersion()); return version; } catch(Error &e) { tr.onError(e); } } } //Get a list of storage servers from the master and compares them with the TLogs. //If this is a quiescent check, then each master proxy needs to respond, otherwise only one needs to respond. //Returns false if there is a failure (in this case, keyServersPromise will never be set) ACTOR Future getKeyServers(Database cx, ConsistencyCheckWorkload *self, Promise>>> keyServersPromise) { state std::vector>> keyServers; //Try getting key server locations from the master proxies state vector>> keyServerLocationFutures; state Key begin = keyServersKeys.begin; state Key end = keyServersKeys.end; state int limitKeyServers = BUGGIFY ? 1 : 100; while (begin < end) { state Reference proxyInfo = wait(cx->getMasterProxiesFuture(false)); keyServerLocationFutures.clear(); for (int i = 0; i < proxyInfo->size(); i++) keyServerLocationFutures.push_back(proxyInfo->get(i, &MasterProxyInterface::getKeyServersLocations).getReplyUnlessFailedFor(GetKeyServerLocationsRequest(begin, end, limitKeyServers, false, Arena()), 2, 0)); state bool keyServersInsertedForThisIteration = false; choose { when(wait(waitForAll(keyServerLocationFutures))) { //Read the key server location results for (int i = 0; i < keyServerLocationFutures.size(); i++) { ErrorOr shards = keyServerLocationFutures[i].get(); //If performing quiescent check, then all master proxies should be reachable. Otherwise, only one needs to be reachable if (self->performQuiescentChecks && (!shards.present() || shards.get().newClientInfo.present())) { TraceEvent("ConsistencyCheck_MasterProxyUnavailable").detail("MasterProxyID", proxyInfo->getId(i)); self->testFailure("Master proxy unavailable"); return false; } //Get the list of shards if one was returned. If not doing a quiescent check, we can break if it is. //If we are doing a quiescent check, then we only need to do this for the first shard. if (shards.present() && !shards.get().newClientInfo.present() && !keyServersInsertedForThisIteration) { keyServers.insert(keyServers.end(), shards.get().results.begin(), shards.get().results.end()); keyServersInsertedForThisIteration = true; begin = shards.get().results.back().first.end; if (!self->performQuiescentChecks) break; } } // End of For } when(wait(cx->onMasterProxiesChanged())) { } } // End of choose if (!keyServersInsertedForThisIteration) // Retry the entire workflow wait(delay(1.0)); } // End of while keyServersPromise.send(keyServers); return true; } //Retrieves the locations of all shards in the database //Returns false if there is a failure (in this case, keyLocationPromise will never be set) ACTOR Future getKeyLocations(Database cx, std::vector>> shards, ConsistencyCheckWorkload *self, Promise>> keyLocationPromise) { state Standalone> keyLocations; state Key beginKey = allKeys.begin.withPrefix(keyServersPrefix); state Key endKey = allKeys.end.withPrefix(keyServersPrefix); state int i = 0; state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior //If the responses are too big, we may use multiple requests to get the key locations. Each request begins where the last left off for ( ; i < shards.size(); i++) { while (beginKey < std::min(shards[i].first.end, endKey)) { try { Version version = wait(self->getVersion(cx, self)); GetKeyValuesRequest req; req.begin = firstGreaterOrEqual(beginKey); req.end = firstGreaterOrEqual(std::min(shards[i].first.end, endKey)); req.limit = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT; req.limitBytes = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES; req.version = version; //Try getting the shard locations from the key servers state vector>> keyValueFutures; for (const auto& kv : shards[i].second) { resetReply(req); keyValueFutures.push_back(kv.getKeyValues.getReplyUnlessFailedFor(req, 2, 0)); } wait(waitForAll(keyValueFutures)); int firstValidStorageServer = -1; //Read the shard location results for (int j = 0; j < keyValueFutures.size(); j++) { ErrorOr reply = keyValueFutures[j].get(); if (!reply.present() || reply.get().error.present()) { //If the storage server didn't reply in a quiescent database, then the check fails if(self->performQuiescentChecks) { TraceEvent("ConsistencyCheck_KeyServerUnavailable").detail("StorageServer", shards[i].second[j].id().toString().c_str()); self->testFailure("Key server unavailable"); return false; } //If no storage servers replied, then throw all_alternatives_failed to force a retry else if(firstValidStorageServer < 0 && j == keyValueFutures.size() - 1) throw all_alternatives_failed(); } //If this is the first storage server, store the locations to send back to the caller else if(firstValidStorageServer < 0) { firstValidStorageServer = j; //Otherwise, compare the data to the results from the first storage server. If they are different, then the check fails } else if(reply.get().data != keyValueFutures[firstValidStorageServer].get().get().data || reply.get().more != keyValueFutures[firstValidStorageServer].get().get().more) { TraceEvent("ConsistencyCheck_InconsistentKeyServers").detail("StorageServer1", shards[i].second[firstValidStorageServer].id()) .detail("StorageServer2", shards[i].second[j].id()); self->testFailure("Key servers inconsistent", true); return false; } } auto keyValueResponse = keyValueFutures[firstValidStorageServer].get().get(); Standalone currentLocations = krmDecodeRanges( keyServersPrefix, KeyRangeRef(beginKey.removePrefix(keyServersPrefix), std::min(shards[i].first.end, endKey).removePrefix(keyServersPrefix)), RangeResultRef( keyValueResponse.data, keyValueResponse.more) ); if(keyValueResponse.data.size() && beginKey == keyValueResponse.data[0].key) { keyLocations.push_back_deep(keyLocations.arena(), currentLocations[0]); } if(currentLocations.size() > 2) { keyLocations.append_deep(keyLocations.arena(), ¤tLocations[1], currentLocations.size() - 2); } //Next iteration should pick up where we left off ASSERT(currentLocations.size() > 1); if(!keyValueResponse.more) { beginKey = shards[i].first.end; } else { beginKey = keyValueResponse.data.end()[-1].key; } //If this is the last iteration, then push the allKeys.end KV pair if(beginKey >= endKey) keyLocations.push_back_deep(keyLocations.arena(), currentLocations.end()[-1]); } catch (Error& e) { state Error err = e; wait(onErrorTr.onError(err)); TraceEvent("ConsistencyCheck_RetryGetKeyLocations").error(err); } } } keyLocationPromise.send(keyLocations); return true; } //Retrieves a vector of the storage servers' estimates for the size of a particular shard //If a storage server can't be reached, its estimate will be -1 //If there is an error, then the returned vector will have 0 size ACTOR Future> getStorageSizeEstimate(vector storageServers, KeyRangeRef shard) { state vector estimatedBytes; state WaitMetricsRequest req; req.keys = shard; req.max.bytes = -1; req.min.bytes = 0; state vector>> metricFutures; try { //Check the size of the shard on each storage server for(int i = 0; i < storageServers.size(); i++) { resetReply(req); metricFutures.push_back(storageServers[i].waitMetrics.getReplyUnlessFailedFor(req, 2, 0)); } //Wait for the storage servers to respond wait(waitForAll(metricFutures)); int firstValidStorageServer = -1; //Retrieve the size from the storage server responses for(int i = 0; i < storageServers.size(); i++) { ErrorOr reply = metricFutures[i].get(); //If the storage server doesn't reply, then return -1 if(!reply.present()) { TraceEvent("ConsistencyCheck_FailedToFetchMetrics").detail("Begin", printable(shard.begin)).detail("End", printable(shard.end)).detail("StorageServer", storageServers[i].id()); estimatedBytes.push_back(-1); } //Add the result to the list of estimates else if(reply.present()) { int64_t numBytes = reply.get().bytes; estimatedBytes.push_back(numBytes); if(firstValidStorageServer < 0) firstValidStorageServer = i; else if(estimatedBytes[firstValidStorageServer] != numBytes) { TraceEvent("ConsistencyCheck_InconsistentStorageMetrics").detail("ByteEstimate1", estimatedBytes[firstValidStorageServer]).detail("ByteEstimate2", numBytes) .detail("Begin", printable(shard.begin)).detail("End", printable(shard.end)).detail("StorageServer1", storageServers[firstValidStorageServer].id()) .detail("StorageServer2", storageServers[i].id()); } } } } catch(Error& e) { TraceEvent("ConsistencyCheck_ErrorFetchingMetrics").error(e).detail("Begin", printable(shard.begin)).detail("End", printable(shard.end)); estimatedBytes.clear(); } return estimatedBytes; } //Comparison function used to compare map elements by value template static bool compareByValue(std::pair a, std::pair b) { return a.second < b.second; } ACTOR Future getDatabaseSize(Database cx) { state Transaction tr( cx ); tr.setOption(FDBTransactionOptions::LOCK_AWARE); loop { try { StorageMetrics metrics = wait( tr.getStorageMetrics( KeyRangeRef(allKeys.begin, keyServersPrefix), 100000 ) ); return metrics.bytes; } catch( Error &e ) { wait( tr.onError( e ) ); } } } //Checks that the data in each shard is the same on each storage server that it resides on. Also performs some sanity checks on the sizes of shards and storage servers. //Returns false if there is a failure ACTOR Future checkDataConsistency(Database cx, VectorRef keyLocations, DatabaseConfiguration configuration, ConsistencyCheckWorkload *self) { //Stores the total number of bytes on each storage server //In a distributed test, this will be an estimated size state std::map storageServerSizes; //Iterate through each shard, checking its values on all of its storage servers //If shardSampleFactor > 1, then not all shards are processed //Also, in a distributed data consistency check, each client processes a subset of the shards //Note: this may cause some shards to be processed more than once or not at all in a non-quiescent database state int effectiveClientCount = (self->distributed) ? self->clientCount : 1; state int i = self->clientId * (self->shardSampleFactor + 1); state int increment = (self->distributed && !self->firstClient) ? effectiveClientCount * self->shardSampleFactor : 1; state int rateLimitForThisRound = self->bytesReadInPreviousRound == 0 ? self->rateLimitMax : std::min(self->rateLimitMax, static_cast(ceil(self->bytesReadInPreviousRound / (float) CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME))); ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= self->rateLimitMax); TraceEvent("ConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound); state Reference rateLimiter = Reference( new SpeedLimit(rateLimitForThisRound, 1) ); state double rateLimiterStartTime = now(); state int64_t bytesReadInthisRound = 0; state double dbSize = 100e12; if(g_network->isSimulated()) { //This call will get all shard ranges in the database, which is too expensive on real clusters. int64_t _dbSize = wait( self->getDatabaseSize( cx ) ); dbSize = _dbSize; } state vector ranges; for(int k = 0; k < keyLocations.size() - 1; k++) { KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key); ranges.push_back(range); } state vector shardOrder; for(int k = 0; k < ranges.size(); k++) shardOrder.push_back(k); if(self->shuffleShards) { uint32_t seed = self->sharedRandomNumber + self->repetitions; DeterministicRandom sharedRandom( seed == 0 ? 1 : seed ); sharedRandom.randomShuffle(shardOrder); } for(; i < ranges.size(); i += increment) { state int shard = shardOrder[i]; state KeyRangeRef range = ranges[shard]; state vector sourceStorageServers; state vector destStorageServers; state Transaction tr(cx); tr.setOption(FDBTransactionOptions::LOCK_AWARE); state int bytesReadInRange = 0; decodeKeyServersValue(keyLocations[shard].value, sourceStorageServers, destStorageServers); //If the destStorageServers is non-empty, then this shard is being relocated state bool isRelocating = destStorageServers.size() > 0; //This check was disabled because we now disable data distribution during the consistency check, //which can leave shards with dest storage servers. //Disallow relocations in a quiescent database /*if(self->firstClient && self->performQuiescentChecks && isRelocating) { TraceEvent("ConsistencyCheck_QuiescentShardRelocation").detail("ShardBegin", printable(range.start)).detail("ShardEnd", printable(range.end)); self->testFailure("Shard is being relocated in quiescent database"); return false; }*/ //In a quiescent database, check that the team size is the same as the desired team size if(self->firstClient && self->performQuiescentChecks && sourceStorageServers.size() != configuration.usableRegions*configuration.storageTeamSize) { TraceEvent("ConsistencyCheck_InvalidTeamSize") .detail("ShardBegin", printable(range.begin)) .detail("ShardEnd", printable(range.end)) .detail("SourceTeamSize", sourceStorageServers.size()) .detail("DestServerSize", destStorageServers.size()) .detail("ConfigStorageTeamSize", configuration.storageTeamSize) .detail("UsableRegions", configuration.usableRegions); // Record the server reponsible for the problematic shards int i = 0; for (auto& id : sourceStorageServers) { TraceEvent("IncorrectSizeTeamInfo").detail("ServerUID", id).detail("TeamIndex", i++); } self->testFailure("Invalid team size"); return false; } state vector storageServers = (isRelocating) ? destStorageServers : sourceStorageServers; state vector storageServerInterfaces; loop { try { vector< Future< Optional > > serverListEntries; for(int s=0; s> serverListValues = wait( getAll(serverListEntries) ); for(int s=0; sperformQuiescentChecks) self->testFailure("/FF/serverList changing in a quiescent database"); } break; } catch(Error &e) { wait( tr.onError(e) ); } } state vector estimatedBytes = wait(self->getStorageSizeEstimate(storageServerInterfaces, range)); //Gets permitted size range of shard int64_t maxShardSize = getMaxShardSize( dbSize ); state ShardSizeBounds shardBounds = getShardSizeBounds(range, maxShardSize); if(self->firstClient) { //If there was an error retrieving shard estimated size if(self->performQuiescentChecks && estimatedBytes.size() == 0) self->testFailure("Error fetching storage metrics"); //If running a distributed test, storage server size is an accumulation of shard estimates else if(self->distributed && self->firstClient) for(int j = 0; j < storageServers.size(); j++) storageServerSizes[storageServers[j]] += std::max(estimatedBytes[j], (int64_t)0); } //The first client may need to skip the rest of the loop contents if it is just processing this shard to get a size estimate if(!self->firstClient || shard % (effectiveClientCount * self->shardSampleFactor) == 0) { state int shardKeys = 0; state int shardBytes = 0; state int sampledBytes = 0; state int splitBytes = 0; state int firstKeySampledBytes = 0; state int sampledKeys = 0; state int sampledKeysWithProb = 0; state double shardVariance = 0; state bool canSplit = false; state Key lastSampleKey; state Key lastStartSampleKey; state int64_t totalReadAmount = 0; state KeySelector begin = firstGreaterOrEqual(range.begin); state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior //Read a limited number of entries at a time, repeating until all keys in the shard have been read loop { try { lastSampleKey = lastStartSampleKey; //Get the min version of the storage servers Version version = wait(self->getVersion(cx, self)); state GetKeyValuesRequest req; req.begin = begin; req.end = firstGreaterOrEqual(range.end); req.limit = 1e4; req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT; req.version = version; //Try getting the entries in the specified range state vector>> keyValueFutures; state int j = 0; for(j = 0; j < storageServerInterfaces.size(); j++) { resetReply(req); keyValueFutures.push_back(storageServerInterfaces[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0)); } wait(waitForAll(keyValueFutures)); //Read the resulting entries state int firstValidServer = -1; totalReadAmount = 0; for(j = 0 ; j < keyValueFutures.size(); j++) { ErrorOr rangeResult = keyValueFutures[j].get(); //Compare the results with other storage servers if(rangeResult.present() && !rangeResult.get().error.present()) { state GetKeyValuesReply current = rangeResult.get(); totalReadAmount += current.data.expectedSize(); //If we haven't encountered a valid storage server yet, then mark this as the baseline to compare against if(firstValidServer == -1) firstValidServer = j; //Compare this shard against the first else { GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get(); if(current.data != reference.data || current.more != reference.more) { //Be especially verbose if in simulation if(g_network->isSimulated()) { int invalidIndex = -1; printf("\nSERVER %d (%s); shard = %s - %s:\n", j, storageServerInterfaces[j].address().toString().c_str(), printable(req.begin.getKey()).c_str(), printable(req.end.getKey()).c_str()); for(int k = 0; k < current.data.size(); k++) { printf("%d. %s => %s\n", k, printable(current.data[k].key).c_str(), printable(current.data[k].value).c_str()); if(invalidIndex < 0 && (k >= reference.data.size() || current.data[k].key != reference.data[k].key || current.data[k].value != reference.data[k].value)) invalidIndex = k; } printf("\nSERVER %d (%s); shard = %s - %s:\n", firstValidServer, storageServerInterfaces[firstValidServer].address().toString().c_str(), printable(req.begin.getKey()).c_str(), printable(req.end.getKey()).c_str()); for(int k = 0; k < reference.data.size(); k++) { printf("%d. %s => %s\n", k, printable(reference.data[k].key).c_str(), printable(reference.data[k].value).c_str()); if(invalidIndex < 0 && (k >= current.data.size() || reference.data[k].key != current.data[k].key || reference.data[k].value != current.data[k].value)) invalidIndex = k; } printf("\nMISMATCH AT %d\n\n", invalidIndex); } //Data for trace event //The number of keys unique to the current shard int currentUniques = 0; //The number of keys unique to the reference shard int referenceUniques = 0; //The number of keys in both shards with conflicting values int valueMismatches = 0; //The number of keys in both shards with matching values int matchingKVPairs = 0; //Last unique key on the current shard KeyRef currentUniqueKey; //Last unique key on the reference shard KeyRef referenceUniqueKey; //Last value mismatch KeyRef valueMismatchKey; //Loop indeces int currentI = 0; int referenceI = 0; while(currentI < current.data.size() || referenceI < reference.data.size()) { if(currentI >= current.data.size()) { referenceUniqueKey = reference.data[referenceI].key; referenceUniques++; referenceI++; } else if(referenceI >= reference.data.size()) { currentUniqueKey = current.data[currentI].key; currentUniques++; currentI++; } else { KeyValueRef currentKV = current.data[currentI]; KeyValueRef referenceKV = reference.data[referenceI]; if(currentKV.key == referenceKV.key) { if(currentKV.value == referenceKV.value) matchingKVPairs++; else { valueMismatchKey = currentKV.key; valueMismatches++; } currentI++; referenceI++; } else if(currentKV.key < referenceKV.key) { currentUniqueKey = currentKV.key; currentUniques++; currentI++; } else { referenceUniqueKey = referenceKV.key; referenceUniques++; referenceI++; } } } TraceEvent("ConsistencyCheck_DataInconsistent").detail(format("StorageServer%d", j).c_str(), storageServers[j].toString()) .detail(format("StorageServer%d",firstValidServer).c_str(), storageServers[firstValidServer].toString()) .detail("ShardBegin", printable(req.begin.getKey())) .detail("ShardEnd", printable(req.end.getKey())) .detail("VersionNumber", req.version) .detail(format("Server%dUniques",j).c_str(), currentUniques) .detail(format("Server%dUniqueKey",j).c_str(), printable(currentUniqueKey)) .detail(format("Server%dUniques",firstValidServer).c_str(), referenceUniques) .detail(format("Server%dUniqueKey",firstValidServer).c_str(), printable(referenceUniqueKey)) .detail("ValueMismatches", valueMismatches) .detail("ValueMismatchKey", printable(valueMismatchKey)) .detail("MatchingKVPairs", matchingKVPairs); self->testFailure("Data inconsistent", true); return false; } } } //If the data is not available and we aren't relocating this shard else if(!isRelocating) { TraceEvent("ConsistencyCheck_StorageServerUnavailable").suppressFor(1.0).detail("StorageServer", storageServers[j]).detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end)) .detail("Address", storageServerInterfaces[j].address()).detail("GetKeyValuesToken", storageServerInterfaces[j].getKeyValues.getEndpoint().token); //All shards should be available in quiscence if(self->performQuiescentChecks) { self->testFailure("Storage server unavailable"); return false; } } } if(firstValidServer >= 0) { VectorRef data = keyValueFutures[firstValidServer].get().get().data; //Calculate the size of the shard, the variance of the shard size estimate, and the correct shard size estimate for(int k = 0; k < data.size(); k++) { ByteSampleInfo sampleInfo = isKeyValueInSample(data[k]); shardBytes += sampleInfo.size; double itemProbability = ((double)sampleInfo.size) / sampleInfo.sampledSize; if(itemProbability < 1) shardVariance += itemProbability * (1 - itemProbability) * pow((double)sampleInfo.sampledSize, 2); if(sampleInfo.inSample) { sampledBytes += sampleInfo.sampledSize; if(!canSplit && sampledBytes >= shardBounds.min.bytes && data[k].key.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT && sampledBytes <= shardBounds.max.bytes*CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT/2 ) { canSplit = true; splitBytes = sampledBytes; } /*TraceEvent("ConsistencyCheck_ByteSample").detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end)) .detail("SampledBytes", sampleInfo.sampledSize).detail("Key", printable(data[k].key)).detail("KeySize", data[k].key.size()).detail("ValueSize", data[k].value.size());*/ //In data distribution, the splitting process ignores the first key in a shard. Thus, we shouldn't consider it when validating the upper bound of estimated shard sizes if(k == 0) firstKeySampledBytes += sampleInfo.sampledSize; sampledKeys++; if(itemProbability < 1) { sampledKeysWithProb++; } } } //Accumulate number of keys in this shard shardKeys += data.size(); } //after requesting each shard, enforce rate limit based on how much data will likely be read if(rateLimitForThisRound > 0) { wait(rateLimiter->getAllowance(totalReadAmount)); // Set ratelimit to max allowed if current round has been going on for a while if(now() - rateLimiterStartTime > 1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME && rateLimitForThisRound != self->rateLimitMax) { rateLimitForThisRound = self->rateLimitMax; rateLimiter = Reference( new SpeedLimit(rateLimitForThisRound, 1) ); rateLimiterStartTime = now(); TraceEvent(SevInfo, "ConsistencyCheck_RateLimitSetMaxForThisRound").detail("RateLimit", rateLimitForThisRound); } } bytesReadInRange += totalReadAmount; bytesReadInthisRound += totalReadAmount; //Advance to the next set of entries if(firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) { VectorRef result = keyValueFutures[firstValidServer].get().get().data; ASSERT(result.size() > 0); begin = firstGreaterThan(result[result.size() - 1].key); ASSERT(begin.getKey() != allKeys.end); lastStartSampleKey = lastSampleKey; } else break; } catch(Error &e) { state Error err = e; wait(onErrorTr.onError(err)); TraceEvent("ConsistencyCheck_RetryDataConsistency").error(err); } } canSplit = canSplit && sampledBytes - splitBytes >= shardBounds.min.bytes && sampledBytes > splitBytes; //Update the size of all storage servers containing this shard //This is only done in a non-distributed consistency check; the distributed check uses shard size estimates if(!self->distributed) for(int j = 0; j < storageServers.size(); j++) storageServerSizes[storageServers[j]] += shardBytes; bool hasValidEstimate = estimatedBytes.size() > 0; //If the storage servers' sampled estimate of shard size is different from ours if(self->performQuiescentChecks) { for(int j = 0; j < estimatedBytes.size(); j++) { if(estimatedBytes[j] >= 0 && estimatedBytes[j] != sampledBytes) { TraceEvent("ConsistencyCheck_IncorrectEstimate").detail("EstimatedBytes", estimatedBytes[j]).detail("CorrectSampledBytes", sampledBytes) .detail("StorageServer", storageServers[j]); self->testFailure("Storage servers had incorrect sampled estimate"); hasValidEstimate = false; break; } else if(estimatedBytes[j] < 0) { self->testFailure("Could not get storage metrics from server"); hasValidEstimate = false; break; } } } //Compute the difference between the shard size estimate and its actual size. If it is sufficiently large, then fail double stdDev = sqrt(shardVariance); double failErrorNumStdDev = 7; int estimateError = abs(shardBytes - sampledBytes); //Only perform the check if there are sufficient keys to get a distribution that should resemble a normal distribution if(sampledKeysWithProb > 30 && estimateError > failErrorNumStdDev * stdDev) { double numStdDev = estimateError / sqrt(shardVariance); TraceEvent("ConsistencyCheck_InaccurateShardEstimate").detail("Min", shardBounds.min.bytes).detail("Max", shardBounds.max.bytes).detail("Estimate", sampledBytes) .detail("Actual", shardBytes).detail("NumStdDev", numStdDev).detail("Variance", shardVariance).detail("StdDev", stdDev) .detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end)).detail("NumKeys", shardKeys).detail("NumSampledKeys", sampledKeys) .detail("NumSampledKeysWithProb", sampledKeysWithProb); self->testFailure(format("Shard size is more than %f std dev from estimate", failErrorNumStdDev)); } //In a quiescent database, check that the (estimated) size of the shard is within permitted bounds //Min and max shard sizes have a 3 * shardBounds.permittedError.bytes cushion for error since shard sizes are not precise //Shard splits ignore the first key in a shard, so its size shouldn't be considered when checking the upper bound //0xff shards are not checked if( canSplit && sampledKeys > 5 && self->performQuiescentChecks && !range.begin.startsWith(keyServersPrefix) && (sampledBytes < shardBounds.min.bytes - 3 * shardBounds.permittedError.bytes || sampledBytes - firstKeySampledBytes > shardBounds.max.bytes + 3 * shardBounds.permittedError.bytes)) { TraceEvent("ConsistencyCheck_InvalidShardSize").detail("Min", shardBounds.min.bytes).detail("Max", shardBounds.max.bytes).detail("Size", shardBytes) .detail("EstimatedSize", sampledBytes).detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end)).detail("ShardCount", ranges.size()) .detail("SampledKeys", sampledKeys); self->testFailure(format("Shard size in quiescent database is too %s", (sampledBytes < shardBounds.min.bytes) ? "small" : "large")); return false; } } if(bytesReadInRange > 0) { TraceEvent("ConsistencyCheck_ReadRange").suppressFor(1.0).detail("Range", printable(range)).detail("BytesRead", bytesReadInRange); } } //SOMEDAY: when background data distribution is implemented, include this test //In a quiescent database, check that the sizes of storage servers are roughly the same /*if(self->performQuiescentChecks) { auto minStorageServer = std::min_element(storageServerSizes.begin(), storageServerSizes.end(), ConsistencyCheckWorkload::compareByValue); auto maxStorageServer = std::max_element(storageServerSizes.begin(), storageServerSizes.end(), ConsistencyCheckWorkload::compareByValue); int bias = SERVER_KNOBS->MIN_SHARD_BYTES; if(1.1 * (minStorageServer->second + SERVER_KNOBS->MIN_SHARD_BYTES) < maxStorageServer->second + SERVER_KNOBS->MIN_SHARD_BYTES) { TraceEvent("ConsistencyCheck_InconsistentStorageServerSizes").detail("MinSize", minStorageServer->second).detail("MaxSize", maxStorageServer->second) .detail("MinStorageServer", minStorageServer->first).detail("MaxStorageServer", maxStorageServer->first); self->testFailure(format("Storage servers differ significantly in size by a factor of %f", ((double)maxStorageServer->second) / minStorageServer->second)); return false; } }*/ self->bytesReadInPreviousRound = bytesReadInthisRound; return true; } //Returns true if any storage servers have the exact same network address or are not using the correct key value store type ACTOR Future checkForUndesirableServers(Database cx, DatabaseConfiguration configuration, ConsistencyCheckWorkload *self) { state int i; state int j; state vector storageServers = wait( getStorageServers( cx ) ); //Check each pair of storage servers for an address match for(i = 0; i < storageServers.size(); i++) { //Check that each storage server has the correct key value store type ReplyPromise typeReply; ErrorOr keyValueStoreType = wait(storageServers[i].getKeyValueStoreType.getReplyUnlessFailedFor(typeReply, 2, 0)); if(!keyValueStoreType.present()) { TraceEvent("ConsistencyCheck_ServerUnavailable").detail("ServerID", storageServers[i].id()); self->testFailure("Storage server unavailable"); } else if(keyValueStoreType.get() != configuration.storageServerStoreType) { TraceEvent("ConsistencyCheck_WrongKeyValueStoreType").detail("ServerID", storageServers[i].id()).detail("StoreType", keyValueStoreType.get().toString()).detail("DesiredType", configuration.storageServerStoreType.toString()); self->testFailure("Storage server has wrong key-value store type"); return true; } //Check each pair of storage servers for an address match for(j = i + 1; j < storageServers.size(); j++) { if(storageServers[i].address() == storageServers[j].address()) { TraceEvent("ConsistencyCheck_UndesirableServer").detail("StorageServer1", storageServers[i].id()).detail("StorageServer2", storageServers[j].id()) .detail("Address", storageServers[i].address()); self->testFailure("Multiple storage servers have the same address"); return true; } } } return false; } //Returns false if any worker that should have a storage server does not have one ACTOR Future checkForStorage(Database cx, DatabaseConfiguration configuration, ConsistencyCheckWorkload *self) { state vector workers = wait( getWorkers( self->dbInfo ) ); state vector storageServers = wait( getStorageServers( cx ) ); std::set> missingStorage; for( int i = 0; i < workers.size(); i++ ) { if( !configuration.isExcludedServer(workers[i].interf.address()) && ( workers[i].processClass == ProcessClass::StorageClass || workers[i].processClass == ProcessClass::UnsetClass ) ) { bool found = false; for( int j = 0; j < storageServers.size(); j++ ) { if( storageServers[j].address() == workers[i].interf.address() ) { found = true; break; } } if( !found ) { TraceEvent("ConsistencyCheck_NoStorage") .detail("Address", workers[i].interf.address()) .detail("ProcessClassEqualToStorageClass", (int)(workers[i].processClass == ProcessClass::StorageClass)); missingStorage.insert(workers[i].interf.locality.dcId()); } } } if(( configuration.regions.size() == 0 && missingStorage.size()) || (configuration.regions.size() == 1 && missingStorage.count(configuration.regions[0].dcId)) || (configuration.regions.size() == 2 && configuration.usableRegions == 1 && missingStorage.count(configuration.regions[0].dcId) && missingStorage.count(configuration.regions[1].dcId)) || (configuration.regions.size() == 2 && configuration.usableRegions > 1 && (missingStorage.count(configuration.regions[0].dcId) || missingStorage.count(configuration.regions[1].dcId)))) { self->testFailure("No storage server on worker"); return false; } return true; } ACTOR Future checkForExtraDataStores(Database cx, ConsistencyCheckWorkload *self) { state vector workers = wait( getWorkers( self->dbInfo ) ); state vector storageServers = wait( getStorageServers( cx ) ); auto& db = self->dbInfo->get(); state std::vector logs = db.logSystemConfig.allPresentLogs(); state std::vector::iterator itr; state bool foundExtraDataStore = false; state std::map> statefulProcesses; for (const auto& ss : storageServers) { statefulProcesses[ss.address()].insert(ss.id()); } for (const auto& log : logs) { statefulProcesses[log.address()].insert(log.id()); } for(itr = workers.begin(); itr != workers.end(); ++itr) { ErrorOr>> stores = wait(itr->interf.diskStoreRequest.getReplyUnlessFailedFor(DiskStoreRequest(false), 2, 0)); if(stores.isError()) { TraceEvent("ConsistencyCheck_GetDataStoreFailure").error(stores.getError()).detail("Address", itr->interf.address()); self->testFailure("Failed to get data stores"); return false; } for (const auto& id : stores.get()) { if(!statefulProcesses[itr->interf.address()].count(id)) { TraceEvent("ConsistencyCheck_ExtraDataStore").detail("Address", itr->interf.address()).detail("DataStoreID", id); if(g_network->isSimulated()) { TraceEvent("ConsistencyCheck_RebootProcess").detail("Address", itr->interf.address()).detail("DataStoreID", id); g_simulator.rebootProcess(g_simulator.getProcessByAddress(itr->interf.address()), ISimulator::RebootProcess); } foundExtraDataStore = true; } } } if(foundExtraDataStore) { self->testFailure("Extra data stores present on workers"); return false; } return true; } ACTOR Future checkWorkerList( Database cx, ConsistencyCheckWorkload *self ) { if(g_simulator.extraDB) return true; vector workers = wait( getWorkers( self->dbInfo ) ); std::set workerAddresses; for (const auto& it : workers) { ISimulator::ProcessInfo* info = g_simulator.getProcessByAddress(it.interf.address()); if(!info || info->failed) { TraceEvent("ConsistencyCheck_FailedWorkerInList").detail("Addr", it.interf.address()); return false; } workerAddresses.insert( NetworkAddress(it.interf.address().ip, it.interf.address().port, true, false) ); } vector all = g_simulator.getAllProcesses(); for(int i = 0; i < all.size(); i++) { if( all[i]->isReliable() && all[i]->name == std::string("Server") && all[i]->startingClass != ProcessClass::TesterClass ) { if(!workerAddresses.count(all[i]->address)) { TraceEvent("ConsistencyCheck_WorkerMissingFromList").detail("Addr", all[i]->address); return false; } } } return true; } static ProcessClass::Fitness getBestAvailableFitness(const std::vector& availableClassTypes, ProcessClass::ClusterRole role) { ProcessClass::Fitness bestAvailableFitness = ProcessClass::NeverAssign; for (auto classType : availableClassTypes) { bestAvailableFitness = std::min(bestAvailableFitness, ProcessClass(classType, ProcessClass::InvalidSource).machineClassFitness(role)); } return bestAvailableFitness; } template static std::string getOptionalString(Optional opt) { if (opt.present()) return opt.get().toString(); return "NotSet"; } ACTOR Future checkCoordinators(Database cx) { state Transaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::LOCK_AWARE ); Optional currentKey = wait( tr.get( coordinatorsKey ) ); if (!currentKey.present()) { TraceEvent("ConsistencyCheck_NoCoordinatorKey"); return false; } state ClusterConnectionString old( currentKey.get().toString() ); vector workers = wait(::getWorkers(&tr)); std::map addr_locality; for(auto w : workers) { addr_locality[w.address] = w.locality; } std::set>> checkDuplicates; for (const auto& addr : old.coordinators()) { auto findResult = addr_locality.find(addr); if (findResult != addr_locality.end()) { if(checkDuplicates.count(findResult->second.zoneId())) { TraceEvent("ConsistencyCheck_BadCoordinator").detail("Addr", addr).detail("NotFound", findResult == addr_locality.end()); return false; } checkDuplicates.insert(findResult->second.zoneId()); } } return true; } catch( Error &e ) { wait( tr.onError(e) ); } } } //Returns true if all machines in the cluster that specified a desired class are operating in that class ACTOR Future checkUsingDesiredClasses(Database cx, ConsistencyCheckWorkload *self) { state Optional expectedPrimaryDcId; state Optional expectedRemoteDcId; state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx)); state vector allWorkers = wait(getWorkers(self->dbInfo)); state vector nonExcludedWorkers = wait(getWorkers(self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY)); auto& db = self->dbInfo->get(); std::map allWorkerProcessMap; std::map, std::vector> dcToAllClassTypes; for (const auto& worker : allWorkers) { allWorkerProcessMap[worker.interf.address()] = worker; Optional dc = worker.interf.locality.dcId(); if (!dcToAllClassTypes.count(dc)) dcToAllClassTypes.insert({}); dcToAllClassTypes[dc].push_back(worker.processClass.classType()); } std::map nonExcludedWorkerProcessMap; std::map, std::vector> dcToNonExcludedClassTypes; for (const auto& worker : nonExcludedWorkers) { nonExcludedWorkerProcessMap[worker.interf.address()] = worker; Optional dc = worker.interf.locality.dcId(); if (!dcToNonExcludedClassTypes.count(dc)) dcToNonExcludedClassTypes.insert({}); dcToNonExcludedClassTypes[dc].push_back(worker.processClass.classType()); } if (!allWorkerProcessMap.count(db.clusterInterface.clientInterface.address())) { TraceEvent("ConsistencyCheck_CCNotInWorkerList").detail("CCAddress", db.clusterInterface.clientInterface.address().toString()); return false; } if (!allWorkerProcessMap.count(db.master.address())) { TraceEvent("ConsistencyCheck_MasterNotInWorkerList").detail("MasterAddress", db.master.address().toString()); return false; } Optional ccDcId = allWorkerProcessMap[db.clusterInterface.clientInterface.address()].interf.locality.dcId(); Optional masterDcId = allWorkerProcessMap[db.master.address()].interf.locality.dcId(); if (ccDcId != masterDcId) { TraceEvent("ConsistencyCheck_CCAndMasterNotInSameDC").detail("ClusterControllerDcId", getOptionalString(ccDcId)).detail("MasterDcId", getOptionalString(masterDcId)); return false; } // Check if master and cluster controller are in the desired DC for fearless cluster when running under simulation // FIXME: g_simulator.datacenterDead could return false positives. Relaxing checks until it is fixed. if (g_network->isSimulated() && config.usableRegions> 1 && g_simulator.primaryDcId.present() && !g_simulator.datacenterDead(g_simulator.primaryDcId) && !g_simulator.datacenterDead(g_simulator.remoteDcId)) { expectedPrimaryDcId = config.regions[0].dcId; expectedRemoteDcId = config.regions[1].dcId; // If the priorities are equal, either could be the primary if (config.regions[0].priority == config.regions[1].priority) { expectedPrimaryDcId = masterDcId; expectedRemoteDcId = config.regions[0].dcId == expectedPrimaryDcId.get() ? config.regions[1].dcId : config.regions[0].dcId; } if (ccDcId != expectedPrimaryDcId) { TraceEvent("ConsistencyCheck_ClusterControllerDcNotBest").detail("PreferredDcId", getOptionalString(expectedPrimaryDcId)).detail("ExistingDcId", getOptionalString(ccDcId)); return false; } if (masterDcId != expectedPrimaryDcId) { TraceEvent("ConsistencyCheck_MasterDcNotBest").detail("PreferredDcId", getOptionalString(expectedPrimaryDcId)).detail("ExistingDcId", getOptionalString(masterDcId)); return false; } } // Check CC ProcessClass::Fitness bestClusterControllerFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[ccDcId], ProcessClass::ClusterController); if (!nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) || nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].processClass.machineClassFitness(ProcessClass::ClusterController) != bestClusterControllerFitness) { TraceEvent("ConsistencyCheck_ClusterControllerNotBest").detail("BestClusterControllerFitness", bestClusterControllerFitness).detail("ExistingClusterControllerFit", nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) ? nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].processClass.machineClassFitness(ProcessClass::ClusterController) : -1); return false; } // Check Master ProcessClass::Fitness bestMasterFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Master); if (bestMasterFitness == ProcessClass::NeverAssign) { bestMasterFitness = getBestAvailableFitness(dcToAllClassTypes[masterDcId], ProcessClass::Master); if (bestMasterFitness != ProcessClass::NeverAssign) { bestMasterFitness = ProcessClass::ExcludeFit; } } if ((!nonExcludedWorkerProcessMap.count(db.master.address()) && bestMasterFitness != ProcessClass::ExcludeFit) || nonExcludedWorkerProcessMap[db.master.address()].processClass.machineClassFitness(ProcessClass::Master) != bestMasterFitness) { TraceEvent("ConsistencyCheck_MasterNotBest").detail("BestMasterFitness", bestMasterFitness).detail("ExistingMasterFit", nonExcludedWorkerProcessMap.count(db.master.address()) ? nonExcludedWorkerProcessMap[db.master.address()].processClass.machineClassFitness(ProcessClass::Master) : -1); return false; } // Check proxy ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Proxy); for (auto masterProxy : db.client.proxies) { if (!nonExcludedWorkerProcessMap.count(masterProxy.address()) || nonExcludedWorkerProcessMap[masterProxy.address()].processClass.machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) { TraceEvent("ConsistencyCheck_ProxyNotBest").detail("BestMasterProxyFitness", bestMasterProxyFitness).detail("ExistingMasterProxyFitness", nonExcludedWorkerProcessMap.count(masterProxy.address()) ? nonExcludedWorkerProcessMap[masterProxy.address()].processClass.machineClassFitness(ProcessClass::Proxy) : -1); return false; } } // Check resolver ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Resolver); for (const auto& resolver : db.resolvers) { if (!nonExcludedWorkerProcessMap.count(resolver.address()) || nonExcludedWorkerProcessMap[resolver.address()].processClass.machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) { TraceEvent("ConsistencyCheck_ResolverNotBest").detail("BestResolverFitness", bestResolverFitness).detail("ExistingResolverFitness", nonExcludedWorkerProcessMap.count(resolver.address()) ? nonExcludedWorkerProcessMap[resolver.address()].processClass.machineClassFitness(ProcessClass::Resolver) : -1); return false; } } // Check LogRouter if (g_network->isSimulated() && config.usableRegions> 1 && g_simulator.primaryDcId.present() && !g_simulator.datacenterDead(g_simulator.primaryDcId) && !g_simulator.datacenterDead(g_simulator.remoteDcId)) { for (auto &tlogSet : db.logSystemConfig.tLogs) { if (!tlogSet.isLocal && tlogSet.logRouters.size()) { for (auto &logRouter : tlogSet.logRouters) { if (!nonExcludedWorkerProcessMap.count(logRouter.interf().address())) { TraceEvent("ConsistencyCheck_LogRouterNotInNonExcludedWorkers").detail("Id", logRouter.id()); return false; } if (logRouter.interf().locality.dcId() != expectedRemoteDcId) { TraceEvent("ConsistencyCheck_LogRouterNotBestDC").detail("expectedDC", getOptionalString(expectedRemoteDcId)).detail("ActualDC", getOptionalString(logRouter.interf().locality.dcId())); return false; } } } } } // Check DataDistributor ProcessClass::Fitness bestDistributorFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::DataDistributor); if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) != bestDistributorFitness)) { TraceEvent("ConsistencyCheck_DistributorNotBest").detail("BestDataDistributorFitness", bestDistributorFitness) .detail("ExistingDistributorFitness", nonExcludedWorkerProcessMap.count(db.distributor.get().address()) ? nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) : -1); return false; } // Check Ratekeeper ProcessClass::Fitness bestRatekeeperFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Ratekeeper); if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) != bestRatekeeperFitness)) { TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", bestRatekeeperFitness) .detail("ExistingRatekeeperFitness", nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) ? nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) : -1); return false; } // TODO: Check Tlog return true; } }; WorkloadFactory ConsistencyCheckWorkloadFactory("ConsistencyCheck");