diff --git a/fdbcli/ConsistencyScanCommand.actor.cpp b/fdbcli/ConsistencyScanCommand.actor.cpp new file mode 100644 index 0000000000..c94258045d --- /dev/null +++ b/fdbcli/ConsistencyScanCommand.actor.cpp @@ -0,0 +1,122 @@ +/* + * ConsistencyScanCommand.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbcli/fdbcli.actor.h" + +#include "fdbclient/FDBOptions.g.h" +#include "fdbclient/IClientApi.h" + +#include "flow/Arena.h" +#include "flow/FastRef.h" +#include "flow/ThreadHelper.actor.h" +#include "fdbclient/ConsistencyScanInterface.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +namespace fdb_cli { + +ACTOR Future consistencyScanCommandActor(Database db, std::vector tokens) { + state Reference tr = makeReference(db); + // Here we do not proceed in a try-catch loop since the transaction is always supposed to succeed. + // If not, the outer loop catch block(fdbcli.actor.cpp) will handle the error and print out the error message + state int usageError = 0; + state ConsistencyScanInfo csInfo = ConsistencyScanInfo(); + tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + // Get the exisiting consistencyScanInfo object if present + state Optional consistencyScanInfo = wait(ConsistencyScanInfo::getInfo(tr)); + wait(tr->commit()); + if (consistencyScanInfo.present()) + csInfo = ObjectReader::fromStringRef(consistencyScanInfo.get(), IncludeVersion()); + tr->reset(); + + if (tokens.size() == 1) { + printf("Consistency Scan Info: %s\n", csInfo.toString().c_str()); + } else if ((tokens.size() == 2) && tokencmp(tokens[1], "off")) { + csInfo.consistency_scan_enabled = false; + wait(ConsistencyScanInfo::setInfo(tr, csInfo)); + wait(tr->commit()); + } else if ((tokencmp(tokens[1], "on") && tokens.size() > 2)) { + csInfo.consistency_scan_enabled = true; + state std::vector::iterator t; + for (t = tokens.begin() + 2; t != tokens.end(); ++t) { + if (tokencmp(t->toString(), "restart")) { + if (++t != tokens.end()) { + if (tokencmp(t->toString(), "0")) { + csInfo.restart = false; + } else if (tokencmp(t->toString(), "1")) { + csInfo.restart = true; + } else { + usageError = 1; + } + } else { + usageError = 1; + } + } else if (tokencmp(t->toString(), "maxRate")) { + if (++t != tokens.end()) { + char* end; + csInfo.max_rate = std::strtod(t->toString().data(), &end); + if (!std::isspace(*end) && (*end != '\0')) { + fprintf(stderr, "ERROR: %s failed to parse.\n", t->toString().c_str()); + return false; + } + } else { + usageError = 1; + } + } else if (tokencmp(t->toString(), "targetInterval")) { + if (++t != tokens.end()) { + char* end; + csInfo.target_interval = std::strtod(t->toString().data(), &end); + if (!std::isspace(*end) && (*end != '\0')) { + fprintf(stderr, "ERROR: %s failed to parse.\n", 
t->toString().c_str()); + return false; + } + } else { + usageError = 1; + } + } else { + usageError = 1; + } + } + + if (!usageError) { + wait(ConsistencyScanInfo::setInfo(tr, csInfo)); + wait(tr->commit()); + } + } else { + usageError = 1; + } + + if (usageError) { + printUsage(tokens[0]); + return false; + } + return true; +} + +CommandFactory consistencyScanFactory( + "consistencyscan", + CommandHelp("consistencyscan ", + "enables or disables consistency scan", + "Calling this command with `on' enables the consistency scan process to run the scan with given " + "arguments and `off' will halt the scan. " + "Calling this command with no arguments will display if consistency scan is currently enabled.\n")); + +} // namespace fdb_cli \ No newline at end of file diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 5cd27f9780..b4be0bc2e1 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -1582,6 +1582,13 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise, Reference configureTenantCommandActor(Reference db, std::vec ACTOR Future consistencyCheckCommandActor(Reference tr, std::vector tokens, bool intrans); +// consistency scan command +ACTOR Future consistencyScanCommandActor(Database localDb, std::vector tokens); // coordinators command ACTOR Future coordinatorsCommandActor(Reference db, std::vector tokens); // createtenant command diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index 2f337af490..ee8b84adb4 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -66,6 +66,16 @@ void parse(int* i, ValueRef const& v) { *i = atoi(v.toString().c_str()); } +void parse(int64_t* i, ValueRef const& v) { + // FIXME: Sanity checking + *i = atoll(v.toString().c_str()); +} + +void parse(double* i, ValueRef const& v) { + // FIXME: Sanity checking + *i = atof(v.toString().c_str()); +} + void parseReplicationPolicy(Reference* policy, ValueRef const& v) { BinaryReader reader(v, IncludeVersion()); serializeReplicationPolicy(reader, *policy); diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 80436c87d1..8f7474f961 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -137,6 +137,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "blob_manager", "blob_worker", "encrypt_key_proxy", + "consistency_scan", "storage_cache", "router", "coordinator" @@ -561,6 +562,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "unreachable_ratekeeper_worker", "unreachable_blobManager_worker", "unreachable_encryptKeyProxy_worker", + "unreachable_consistencyScan_worker", "unreadable_configuration", "full_replication_timeout", "client_issues", @@ -855,6 +857,19 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "aes_256_ctr" ]} }, + "consistency_scan_info":{ + "consistency_scan_enabled":false, + "restart":false, + "max_rate":0, + "target_interval":0, + "bytes_read_prev_round":0, + "last_round_start_datetime":"2022-04-20 00:05:05.123 +0000", + "last_round_finish_datetime":"1970-01-01 00:00:00.000 +0000", + "last_round_start_timestamp":1648857905.123, + "last_round_finish_timestamp":0, + "smoothed_round_seconds":1, + "finished_rounds":1 + }, "data":{ "least_operating_space_bytes_log_server":0, "average_partition_size_bytes":0, diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index d4bce101c3..7edd39b834 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -546,6 +546,7 @@ 
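// Illustrative sketch (not part of this patch): the consistencyscan fdbcli command above
// accepts "maxRate" and "targetInterval" as decimal tokens and rejects input with trailing
// garbage by inspecting the end pointer returned by std::strtod. The standalone variant
// below walks the whole tail, which is slightly stricter than the single-character check in
// consistencyScanCommandActor; parseStrictDouble is a hypothetical helper name.
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <string>

// Returns true and stores the parsed value only when the entire token is a number,
// optionally followed by whitespace.
bool parseStrictDouble(const std::string& token, double* out) {
    char* end = nullptr;
    double v = std::strtod(token.c_str(), &end);
    if (end == token.c_str())
        return false; // nothing was parsed at all
    while (*end != '\0') {
        if (!std::isspace(static_cast<unsigned char>(*end)))
            return false; // trailing junk, e.g. "100abc"
        ++end;
    }
    *out = v;
    return true;
}

int main() {
    double rate = 0;
    std::printf("%d\n", parseStrictDouble("12345", &rate)); // 1
    std::printf("%d\n", parseStrictDouble("12e3 ", &rate)); // 1
    std::printf("%d\n", parseStrictDouble("12x", &rate));   // 0
}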
void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( ATTEMPT_RECRUITMENT_DELAY, 0.035 ); init( WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 1.0 ); init( WAIT_FOR_RATEKEEPER_JOIN_DELAY, 1.0 ); + init( WAIT_FOR_CONSISTENCYSCAN_JOIN_DELAY, 1.0 ); init( WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 1.0 ); init( WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY, 1.0 ); init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0; @@ -556,6 +557,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( CHECK_REMOTE_HEALTH_INTERVAL, 60 ); init( FORCE_RECOVERY_CHECK_DELAY, 5.0 ); init( RATEKEEPER_FAILURE_TIME, 1.0 ); + init( CONSISTENCYSCAN_FAILURE_TIME, 1.0 ); init( BLOB_MANAGER_FAILURE_TIME, 1.0 ); init( REPLACE_INTERFACE_DELAY, 60.0 ); init( REPLACE_INTERFACE_CHECK_DELAY, 5.0 ); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index b92c87923e..5e1123eaa7 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -852,6 +852,8 @@ const KeyRef perpetualStorageWiggleStatsPrefix( const KeyRef triggerDDTeamInfoPrintKey(LiteralStringRef("\xff/triggerDDTeamInfoPrint")); +const KeyRef consistencyScanInfoKey = "\xff/consistencyScanInfo"_sr; + const KeyRef encryptionAtRestModeConfKey(LiteralStringRef("\xff/conf/encryption_at_rest_mode")); const KeyRangeRef excludedServersKeys(LiteralStringRef("\xff/conf/excluded/"), LiteralStringRef("\xff/conf/excluded0")); diff --git a/fdbclient/include/fdbclient/ConsistencyScanInterface.h b/fdbclient/include/fdbclient/ConsistencyScanInterface.h new file mode 100644 index 0000000000..a5ef1e3dd0 --- /dev/null +++ b/fdbclient/include/fdbclient/ConsistencyScanInterface.h @@ -0,0 +1,189 @@ +/* + * ConsistencyScanInterface.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef FDBCLIENT_CONSISTENCYSCANINTERFACE_H +#define FDBCLIENT_CONSISTENCYSCANINTERFACE_H + +#include "fdbclient/CommitProxyInterface.h" +#include "fdbclient/DatabaseConfiguration.h" +#include "fdbclient/FDBTypes.h" +#include "fdbclient/RunTransaction.actor.h" +#include "fdbrpc/fdbrpc.h" +#include "fdbrpc/Locality.h" + +struct ConsistencyScanInterface { + constexpr static FileIdentifier file_identifier = 4983265; + RequestStream> waitFailure; + RequestStream haltConsistencyScan; + struct LocalityData locality; + UID myId; + + ConsistencyScanInterface() {} + explicit ConsistencyScanInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {} + + void initEndpoints() {} + UID id() const { return myId; } + NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); } + bool operator==(const ConsistencyScanInterface& r) const { return id() == r.id(); } + bool operator!=(const ConsistencyScanInterface& r) const { return !(*this == r); } + + template + void serialize(Archive& ar) { + serializer(ar, waitFailure, haltConsistencyScan, locality, myId); + } +}; + +struct HaltConsistencyScanRequest { + constexpr static FileIdentifier file_identifier = 2323417; + UID requesterID; + ReplyPromise reply; + + HaltConsistencyScanRequest() {} + explicit HaltConsistencyScanRequest(UID uid) : requesterID(uid) {} + + template + void serialize(Ar& ar) { + serializer(ar, requesterID, reply); + } +}; + +// consistency scan configuration and metrics +struct ConsistencyScanInfo { + constexpr static FileIdentifier file_identifier = 732125; + bool consistency_scan_enabled = false; + bool restart = false; + int64_t max_rate = 0; + int64_t target_interval = CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME; + int64_t bytes_read_prev_round = 0; + KeyRef progress_key = KeyRef(); + + // Round Metrics - one round of complete validation across all SSs + // Start and finish are in epoch seconds + double last_round_start = 0; + double last_round_finish = 0; + TimerSmoother smoothed_round_duration; + int finished_rounds = 0; + + ConsistencyScanInfo() : smoothed_round_duration(20.0 * 60) {} + ConsistencyScanInfo(bool enabled, bool r, uint64_t rate, uint64_t interval) + : consistency_scan_enabled(enabled), restart(r), max_rate(rate), target_interval(interval), + smoothed_round_duration(20.0 * 60) {} + + template + void serialize(Ar& ar) { + double round_total; + if (!ar.isDeserializing) { + round_total = smoothed_round_duration.getTotal(); + } + serializer(ar, + consistency_scan_enabled, + restart, + max_rate, + target_interval, + bytes_read_prev_round, + last_round_start, + last_round_finish, + round_total, + finished_rounds); + if (ar.isDeserializing) { + smoothed_round_duration.reset(round_total); + } + } + + static Future setInfo(Reference tr, ConsistencyScanInfo info) { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + tr->set(consistencyScanInfoKey, ObjectWriter::toValue(info, IncludeVersion())); + return Void(); + } + + static Future setInfo(Database cx, ConsistencyScanInfo info) { + return runRYWTransaction( + cx, [=](Reference tr) -> Future { return setInfo(tr, info); }); + } + + static Future> getInfo(Reference tr) { + tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); + return tr->get(consistencyScanInfoKey); + } + + static Future> getInfo(Database cx) { + return runRYWTransaction( + cx, [=](Reference tr) -> Future> { return 
getInfo(tr); }); + } + + StatusObject toJSON() const { + StatusObject result; + result["consistency_scan_enabled"] = consistency_scan_enabled; + result["restart"] = restart; + result["max_rate"] = max_rate; + result["target_interval"] = target_interval; + result["bytes_read_prev_round"] = bytes_read_prev_round; + result["last_round_start_datetime"] = epochsToGMTString(last_round_start); + result["last_round_finish_datetime"] = epochsToGMTString(last_round_finish); + result["last_round_start_timestamp"] = last_round_start; + result["last_round_finish_timestamp"] = last_round_finish; + result["smoothed_round_seconds"] = smoothed_round_duration.smoothTotal(); + result["finished_rounds"] = finished_rounds; + return result; + } + + std::string toString() const { + return format("consistency_scan_enabled = %d, restart = %d, max_rate = %ld, target_interval = %ld", + consistency_scan_enabled, + restart, + max_rate, + target_interval); + } +}; + +Future getVersion(Database const& cx); +Future getKeyServers( + Database const& cx, + Promise>>> const& keyServersPromise, + KeyRangeRef const& kr, + bool const& performQuiescentChecks); +Future getKeyLocations(Database const& cx, + std::vector>> const& shards, + Promise>> const& keyLocationPromise, + bool const& performQuiescentChecks); +Future checkDataConsistency(Database const& cx, + VectorRef const& keyLocations, + DatabaseConfiguration const& configuration, + std::map const& tssMapping, + bool const& performQuiescentChecks, + bool const& performTSSCheck, + bool const& firstClient, + bool const& failureIsError, + int const& clientId, + int const& clientCount, + bool const& distributed, + bool const& shuffleShards, + int const& shardSampleFactor, + int64_t const& sharedRandomNumber, + int64_t const& repetitions, + int64_t* const& bytesReadInPreviousRound, + int const& restart, + int64_t const& maxRate, + int64_t const& targetInterval, + KeyRef const& progressKey); + +#endif // FDBCLIENT_CONSISTENCYSCANINTERFACE_H \ No newline at end of file diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index cb922cc90b..a3d59000ff 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -457,6 +457,7 @@ public: double ATTEMPT_RECRUITMENT_DELAY; double WAIT_FOR_DISTRIBUTOR_JOIN_DELAY; double WAIT_FOR_RATEKEEPER_JOIN_DELAY; + double WAIT_FOR_CONSISTENCYSCAN_JOIN_DELAY; double WAIT_FOR_BLOB_MANAGER_JOIN_DELAY; double WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY; double WORKER_FAILURE_TIME; @@ -470,6 +471,7 @@ public: double CHECK_REMOTE_HEALTH_INTERVAL; // Remote DC health refresh interval. 
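// Illustrative sketch (not part of this patch): ConsistencyScanInfo::serialize() above
// cannot serialize its TimerSmoother member directly, so it writes the smoother's running
// total as a plain double and rebuilds the smoother from that total when deserializing.
// The same pattern, reduced to a self-contained example with a MiniSmoother stand-in and a
// raw byte buffer instead of flow's serializer():
#include <cstring>
#include <vector>

struct MiniSmoother { // stand-in for TimerSmoother
    double total = 0;
    void reset(double t) { total = t; }
    double getTotal() const { return total; }
};

struct ScanState {
    bool enabled = false;
    MiniSmoother roundDuration;

    // Flatten the smoother to one double on the way out...
    void save(std::vector<char>& out) const {
        double total = roundDuration.getTotal();
        out.resize(sizeof(bool) + sizeof(double));
        std::memcpy(out.data(), &enabled, sizeof(bool));
        std::memcpy(out.data() + sizeof(bool), &total, sizeof(double));
    }
    // ...and reconstruct it on the way in (assumes `in` was produced by save()).
    void load(const std::vector<char>& in) {
        double total = 0;
        std::memcpy(&enabled, in.data(), sizeof(bool));
        std::memcpy(&total, in.data() + sizeof(bool), sizeof(double));
        roundDuration.reset(total);
    }
};

int main() {
    ScanState a;
    a.enabled = true;
    a.roundDuration.reset(1200.0);
    std::vector<char> bytes;
    a.save(bytes);
    ScanState b;
    b.load(bytes);
    return (b.enabled && b.roundDuration.getTotal() == 1200.0) ? 0 : 1;
}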
double FORCE_RECOVERY_CHECK_DELAY; double RATEKEEPER_FAILURE_TIME; + double CONSISTENCYSCAN_FAILURE_TIME; double BLOB_MANAGER_FAILURE_TIME; double REPLACE_INTERFACE_DELAY; double REPLACE_INTERFACE_CHECK_DELAY; diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 2c3d8e89a4..068d1d9d37 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -163,6 +163,9 @@ extern const KeyRef cacheChangePrefix; const Key cacheChangeKeyFor(uint16_t idx); uint16_t cacheChangeKeyDecodeIndex(const KeyRef& key); +// For persisting the consistency scan configuration and metrics +extern const KeyRef consistencyScanInfoKey; + // "\xff/tss/[[serverId]]" := "[[tssId]]" extern const KeyRangeRef tssMappingKeys; diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp index bd4c3a9461..79cde1bc54 100644 --- a/fdbrpc/Locality.cpp +++ b/fdbrpc/Locality.cpp @@ -240,6 +240,24 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const default: return ProcessClass::WorstFit; } + case ProcessClass::ConsistencyScan: + switch (_class) { + case ProcessClass::ConsistencyScanClass: + return ProcessClass::BestFit; + case ProcessClass::StatelessClass: + return ProcessClass::GoodFit; + case ProcessClass::UnsetClass: + return ProcessClass::UnsetFit; + case ProcessClass::MasterClass: + return ProcessClass::OkayFit; + case ProcessClass::CoordinatorClass: + case ProcessClass::TesterClass: + case ProcessClass::StorageCacheClass: + case ProcessClass::BlobWorkerClass: + return ProcessClass::NeverAssign; + default: + return ProcessClass::WorstFit; + } case ProcessClass::BlobManager: switch (_class) { case ProcessClass::BlobManagerClass: diff --git a/fdbrpc/include/fdbrpc/Locality.h b/fdbrpc/include/fdbrpc/Locality.h index 3e8d07f6f4..20269b6d05 100644 --- a/fdbrpc/include/fdbrpc/Locality.h +++ b/fdbrpc/include/fdbrpc/Locality.h @@ -43,6 +43,7 @@ struct ProcessClass { DataDistributorClass, CoordinatorClass, RatekeeperClass, + ConsistencyScanClass, StorageCacheClass, BackupClass, GrvProxyClass, @@ -72,6 +73,7 @@ struct ProcessClass { ClusterController, DataDistributor, Ratekeeper, + ConsistencyScan, BlobManager, BlobWorker, StorageCache, @@ -110,6 +112,7 @@ public: else if (s=="data_distributor") _class = DataDistributorClass; else if (s=="coordinator") _class = CoordinatorClass; else if (s=="ratekeeper") _class = RatekeeperClass; + else if (s=="consistency_scan") _class = ConsistencyScanClass; else if (s=="blob_manager") _class = BlobManagerClass; else if (s=="blob_worker") _class = BlobWorkerClass; else if (s=="storage_cache") _class = StorageCacheClass; @@ -140,6 +143,7 @@ public: else if (classStr=="data_distributor") _class = DataDistributorClass; else if (classStr=="coordinator") _class = CoordinatorClass; else if (classStr=="ratekeeper") _class = RatekeeperClass; + else if (classStr=="consistency_scan") _class = ConsistencyScanClass; else if (classStr=="blob_manager") _class = BlobManagerClass; else if (classStr=="blob_worker") _class = BlobWorkerClass; else if (classStr=="storage_cache") _class = StorageCacheClass; @@ -180,6 +184,7 @@ public: case DataDistributorClass: return "data_distributor"; case CoordinatorClass: return "coordinator"; case RatekeeperClass: return "ratekeeper"; + case ConsistencyScanClass: return "consistency_scan"; case BlobManagerClass: return "blob_manager"; case BlobWorkerClass: return "blob_worker"; case StorageCacheClass: return "storage_cache"; diff --git a/fdbrpc/include/fdbrpc/simulator.h 
b/fdbrpc/include/fdbrpc/simulator.h index e8749ee658..5228ff7b92 100644 --- a/fdbrpc/include/fdbrpc/simulator.h +++ b/fdbrpc/include/fdbrpc/simulator.h @@ -186,6 +186,8 @@ public: return false; case ProcessClass::RatekeeperClass: return false; + case ProcessClass::ConsistencyScanClass: + return false; case ProcessClass::BlobManagerClass: return false; case ProcessClass::StorageCacheClass: diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index a8c2175794..fa06d08c5e 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -26,6 +26,7 @@ #include #include "fdbclient/SystemData.h" +#include "fdbclient/DatabaseContext.h" #include "fdbrpc/FailureMonitor.h" #include "fdbclient/EncryptKeyProxyInterface.h" #include "fdbserver/Knobs.h" @@ -138,6 +139,31 @@ struct DataDistributorSingleton : Singleton { } }; +struct ConsistencyScanSingleton : Singleton { + + ConsistencyScanSingleton(const Optional& interface) : Singleton(interface) {} + + Role getRole() const { return Role::CONSISTENCYSCAN; } + ProcessClass::ClusterRole getClusterRole() const { return ProcessClass::ConsistencyScan; } + + void setInterfaceToDbInfo(ClusterControllerData* cc) const { + if (interface.present()) { + TraceEvent("CCCK_SetInf", cc->id).detail("Id", interface.get().id()); + cc->db.setConsistencyScan(interface.get()); + } + } + void halt(ClusterControllerData* cc, Optional> pid) const { + if (interface.present()) { + cc->id_worker[pid].haltConsistencyScan = + brokenPromiseToNever(interface.get().haltConsistencyScan.getReply(HaltConsistencyScanRequest(cc->id))); + } + } + void recruit(ClusterControllerData* cc) const { + cc->lastRecruitTime = now(); + cc->recruitConsistencyScan.set(true); + } +}; + struct BlobManagerSingleton : Singleton { BlobManagerSingleton(const Optional& interface) : Singleton(interface) {} @@ -248,6 +274,7 @@ ACTOR Future clusterWatchDatabase(ClusterControllerData* cluster, dbInfo.ratekeeper = db->serverInfo->get().ratekeeper; dbInfo.blobManager = db->serverInfo->get().blobManager; dbInfo.encryptKeyProxy = db->serverInfo->get().encryptKeyProxy; + dbInfo.consistencyScan = db->serverInfo->get().consistencyScan; dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig; dbInfo.myLocality = db->serverInfo->get().myLocality; dbInfo.client = ClientDBInfo(); @@ -622,6 +649,7 @@ void checkBetterSingletons(ClusterControllerData* self) { // Try to find a new process for each singleton. WorkerDetails newRKWorker = findNewProcessForSingleton(self, ProcessClass::Ratekeeper, id_used); WorkerDetails newDDWorker = findNewProcessForSingleton(self, ProcessClass::DataDistributor, id_used); + WorkerDetails newCSWorker = findNewProcessForSingleton(self, ProcessClass::ConsistencyScan, id_used); WorkerDetails newBMWorker; if (self->db.blobGranulesEnabled.get()) { @@ -636,6 +664,7 @@ void checkBetterSingletons(ClusterControllerData* self) { // Find best possible fitnesses for each singleton. 
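// Illustrative sketch (not part of this patch): checkBetterSingletons() below only moves a
// singleton (ratekeeper, data distributor, consistency scan, ...) when the proposed
// placement does not increase the number of singletons colocated on any process and
// strictly decreases it for at least one. A simplified standalone model of that
// "no worse anywhere" rule; ProcessId and the helper names are hypothetical.
#include <map>
#include <string>
#include <vector>

using ProcessId = std::string;

// Count how many singletons each process would host under a given assignment
// (assignment[i] is the process chosen for singleton i).
std::map<ProcessId, int> colocationCounts(const std::vector<ProcessId>& assignment) {
    std::map<ProcessId, int> counts;
    for (const auto& pid : assignment)
        ++counts[pid];
    return counts;
}

// True if every singleton's proposed process is at least as lightly loaded as its current
// one, mirroring the chained "newColocMap[..] <= currColocMap[..]" comparisons.
// Assumes curr and next have the same length.
bool nextPlacementIsNoWorse(const std::vector<ProcessId>& curr, const std::vector<ProcessId>& next) {
    auto currCounts = colocationCounts(curr);
    auto nextCounts = colocationCounts(next);
    for (size_t i = 0; i < curr.size(); ++i) {
        if (nextCounts[next[i]] > currCounts[curr[i]])
            return false;
    }
    return true;
}

int main() {
    // Ratekeeper, data distributor and consistency scan currently share one process;
    // spreading them across three processes is never worse for any of them.
    std::vector<ProcessId> curr = { "p1", "p1", "p1" };
    std::vector<ProcessId> next = { "p1", "p2", "p3" };
    return nextPlacementIsNoWorse(curr, next) ? 0 : 1;
}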
auto bestFitnessForRK = findBestFitnessForSingleton(self, newRKWorker, ProcessClass::Ratekeeper); auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor); + auto bestFitnessForCS = findBestFitnessForSingleton(self, newCSWorker, ProcessClass::ConsistencyScan); ProcessClass::Fitness bestFitnessForBM; if (self->db.blobGranulesEnabled.get()) { @@ -650,6 +679,7 @@ void checkBetterSingletons(ClusterControllerData* self) { auto& db = self->db.serverInfo->get(); auto rkSingleton = RatekeeperSingleton(db.ratekeeper); auto ddSingleton = DataDistributorSingleton(db.distributor); + ConsistencyScanSingleton csSingleton(db.consistencyScan); BlobManagerSingleton bmSingleton(db.blobManager); EncryptKeyProxySingleton ekpSingleton(db.encryptKeyProxy); @@ -661,6 +691,9 @@ void checkBetterSingletons(ClusterControllerData* self) { bool ddHealthy = isHealthySingleton( self, newDDWorker, ddSingleton, bestFitnessForDD, self->recruitingDistributorID); + bool csHealthy = isHealthySingleton( + self, newCSWorker, csSingleton, bestFitnessForCS, self->recruitingConsistencyScanID); + bool bmHealthy = true; if (self->db.blobGranulesEnabled.get()) { bmHealthy = isHealthySingleton( @@ -674,7 +707,7 @@ void checkBetterSingletons(ClusterControllerData* self) { } // if any of the singletons are unhealthy (rerecruited or not stable), then do not // consider any further re-recruitments - if (!(rkHealthy && ddHealthy && bmHealthy && ekpHealthy)) { + if (!(rkHealthy && ddHealthy && bmHealthy && ekpHealthy && csHealthy)) { return; } @@ -682,8 +715,10 @@ void checkBetterSingletons(ClusterControllerData* self) { // check if we can colocate the singletons in a more optimal way Optional> currRKProcessId = rkSingleton.interface.get().locality.processId(); Optional> currDDProcessId = ddSingleton.interface.get().locality.processId(); + Optional> currCSProcessId = csSingleton.interface.get().locality.processId(); Optional> newRKProcessId = newRKWorker.interf.locality.processId(); Optional> newDDProcessId = newDDWorker.interf.locality.processId(); + Optional> newCSProcessId = newCSWorker.interf.locality.processId(); Optional> currBMProcessId, newBMProcessId; if (self->db.blobGranulesEnabled.get()) { @@ -697,8 +732,8 @@ void checkBetterSingletons(ClusterControllerData* self) { newEKPProcessId = newEKPWorker.interf.locality.processId(); } - std::vector>> currPids = { currRKProcessId, currDDProcessId }; - std::vector>> newPids = { newRKProcessId, newDDProcessId }; + std::vector>> currPids = { currRKProcessId, currDDProcessId, currCSProcessId }; + std::vector>> newPids = { newRKProcessId, newDDProcessId, newCSProcessId }; if (self->db.blobGranulesEnabled.get()) { currPids.emplace_back(currBMProcessId); newPids.emplace_back(newBMProcessId); @@ -728,7 +763,8 @@ void checkBetterSingletons(ClusterControllerData* self) { if (newColocMap[newRKProcessId] <= currColocMap[currRKProcessId] && newColocMap[newDDProcessId] <= currColocMap[currDDProcessId] && newColocMap[newBMProcessId] <= currColocMap[currBMProcessId] && - newColocMap[newEKPProcessId] <= currColocMap[currEKPProcessId]) { + newColocMap[newEKPProcessId] <= currColocMap[currEKPProcessId] && + newColocMap[newCSProcessId] <= currColocMap[currCSProcessId]) { // rerecruit the singleton for which we have found a better process, if any if (newColocMap[newRKProcessId] < currColocMap[currRKProcessId]) { rkSingleton.recruit(self); @@ -738,6 +774,8 @@ void checkBetterSingletons(ClusterControllerData* self) { bmSingleton.recruit(self); } else if 
(SERVER_KNOBS->ENABLE_ENCRYPTION && newColocMap[newEKPProcessId] < currColocMap[currEKPProcessId]) { ekpSingleton.recruit(self); + } else if (newColocMap[newCSProcessId] < currColocMap[currCSProcessId]) { + csSingleton.recruit(self); } } } @@ -1302,6 +1340,13 @@ ACTOR Future registerWorker(RegisterWorkerRequest req, self, w, currSingleton, registeringSingleton, self->recruitingEncryptKeyProxyID); } + if (req.consistencyScanInterf.present()) { + auto currSingleton = ConsistencyScanSingleton(self->db.serverInfo->get().consistencyScan); + auto registeringSingleton = ConsistencyScanSingleton(req.consistencyScanInterf); + haltRegisteringOrCurrentSingleton( + self, w, currSingleton, registeringSingleton, self->recruitingConsistencyScanID); + } + // Notify the worker to register again with new process class/exclusive property if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) { req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo)); @@ -2163,6 +2208,101 @@ ACTOR Future monitorRatekeeper(ClusterControllerData* self) { } } +ACTOR Future startConsistencyScan(ClusterControllerData* self) { + wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID. + TraceEvent("CCStartConsistencyScan", self->id).log(); + loop { + try { + state bool no_consistencyScan = !self->db.serverInfo->get().consistencyScan.present(); + while (!self->masterProcessId.present() || + self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || + self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); + } + if (no_consistencyScan && self->db.serverInfo->get().consistencyScan.present()) { + // Existing consistencyScan registers while waiting, so skip. 
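// Illustrative sketch (not part of this patch): startConsistencyScan() retries recruitment
// indefinitely, swallowing only no_more_servers and backing off by
// ATTEMPT_RECRUITMENT_DELAY between attempts (see the catch block further below). The same
// retry policy as plain C++; recruitOnce and sleepSeconds are hypothetical stand-ins for
// the actor's waits.
#include <functional>
#include <stdexcept>

struct NoMoreServers : std::runtime_error {
    NoMoreServers() : std::runtime_error("no_more_servers") {}
};

// Keep attempting recruitment; a shortage of candidate workers is retried after a delay,
// any other error propagates to the caller.
void recruitWithRetry(const std::function<bool()>& recruitOnce,
                      const std::function<void(double)>& sleepSeconds,
                      double retryDelaySeconds) {
    for (;;) {
        try {
            if (recruitOnce())
                return; // a consistency scan interface registered successfully
        } catch (const NoMoreServers&) {
            // no eligible worker right now; fall through and retry after the delay
        }
        sleepSeconds(retryDelaySeconds);
    }
}

int main() {
    int attempts = 0;
    recruitWithRetry([&] { return ++attempts >= 3; }, [](double) {}, 0.035);
    return attempts == 3 ? 0 : 1;
}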
+ return Void(); + } + + std::map>, int> id_used = self->getUsedIds(); + WorkerFitnessInfo csWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, + ProcessClass::ConsistencyScan, + ProcessClass::NeverAssign, + self->db.config, + id_used); + + InitializeConsistencyScanRequest req(deterministicRandom()->randomUniqueID()); + state WorkerDetails worker = csWorker.worker; + if (self->onMasterIsBetter(worker, ProcessClass::ConsistencyScan)) { + worker = self->id_worker[self->masterProcessId.get()].details; + } + + self->recruitingConsistencyScanID = req.reqId; + TraceEvent("CCRecruitConsistencyScan", self->id) + .detail("Addr", worker.interf.address()) + .detail("CSID", req.reqId); + + ErrorOr interf = wait(worker.interf.consistencyScan.getReplyUnlessFailedFor( + req, SERVER_KNOBS->WAIT_FOR_CONSISTENCYSCAN_JOIN_DELAY, 0)); + if (interf.present()) { + self->recruitConsistencyScan.set(false); + self->recruitingConsistencyScanID = interf.get().id(); + const auto& consistencyScan = self->db.serverInfo->get().consistencyScan; + TraceEvent("CCConsistencyScanRecruited", self->id) + .detail("Addr", worker.interf.address()) + .detail("CKID", interf.get().id()); + if (consistencyScan.present() && consistencyScan.get().id() != interf.get().id() && + self->id_worker.count(consistencyScan.get().locality.processId())) { + TraceEvent("CCHaltConsistencyScanAfterRecruit", self->id) + .detail("CKID", consistencyScan.get().id()) + .detail("DcID", printable(self->clusterControllerDcId)); + ConsistencyScanSingleton(consistencyScan).halt(self, consistencyScan.get().locality.processId()); + } + if (!consistencyScan.present() || consistencyScan.get().id() != interf.get().id()) { + self->db.setConsistencyScan(interf.get()); + } + checkOutstandingRequests(self); + return Void(); + } else { + TraceEvent("CCConsistencyScanRecruitEmpty", self->id).log(); + } + } catch (Error& e) { + TraceEvent("CCConsistencyScanRecruitError", self->id).error(e); + if (e.code() != error_code_no_more_servers) { + throw; + } + } + wait(lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY)); + } +} + +ACTOR Future monitorConsistencyScan(ClusterControllerData* self) { + while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + TraceEvent("CCMonitorConsistencyScanWaitingForRecovery", self->id).log(); + wait(self->db.serverInfo->onChange()); + } + + TraceEvent("CCMonitorConsistencyScan", self->id).log(); + loop { + if (self->db.serverInfo->get().consistencyScan.present() && !self->recruitConsistencyScan.get()) { + state Future wfClient = + waitFailureClient(self->db.serverInfo->get().consistencyScan.get().waitFailure, + SERVER_KNOBS->CONSISTENCYSCAN_FAILURE_TIME); + choose { + when(wait(wfClient)) { + TraceEvent("CCMonitorConsistencyScanDied", self->id) + .detail("CKID", self->db.serverInfo->get().consistencyScan.get().id()); + self->db.clearInterf(ProcessClass::ConsistencyScanClass); + } + when(wait(self->recruitConsistencyScan.onChange())) {} + } + } else { + TraceEvent("CCMonitorConsistencyScanStarting", self->id).log(); + wait(startConsistencyScan(self)); + } + } +} + ACTOR Future startEncryptKeyProxy(ClusterControllerData* self, double waitTime) { // If master fails at the same time, give it a chance to clear master PID. // Also wait to avoid too many consecutive recruits in a small time window. 
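// Illustrative sketch (not part of this patch): monitorConsistencyScan() above alternates
// between two states: when a scan interface is registered it waits for either a failure
// report or a forced re-recruit, and when none is registered it runs recruitment. A
// simplified state model of that loop; the enum and struct names are hypothetical.
enum class ScanEvent { InterfaceFailed, RerecruitRequested, Recruited };

struct ScanMonitorModel {
    bool interfacePresent = false;
    bool recruitRequested = false;

    // Mirrors the loop condition: recruitment runs when no interface is registered or a
    // re-recruit has been requested; otherwise the monitor just watches for failure.
    bool shouldRecruit() const { return !interfacePresent || recruitRequested; }

    void onEvent(ScanEvent e) {
        switch (e) {
        case ScanEvent::InterfaceFailed:
            interfacePresent = false; // corresponds to clearInterf(ConsistencyScanClass)
            break;
        case ScanEvent::RerecruitRequested:
            recruitRequested = true; // corresponds to recruitConsistencyScan.set(true)
            break;
        case ScanEvent::Recruited:
            interfacePresent = true;  // corresponds to db.setConsistencyScan(interf)
            recruitRequested = false; // corresponds to recruitConsistencyScan.set(false)
            break;
        }
    }
};

int main() {
    ScanMonitorModel m;
    if (!m.shouldRecruit()) return 1; // nothing registered yet: recruit
    m.onEvent(ScanEvent::Recruited);
    if (m.shouldRecruit()) return 1;  // healthy: just monitor
    m.onEvent(ScanEvent::InterfaceFailed);
    return m.shouldRecruit() ? 0 : 1; // failure detected: recruit again
}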
@@ -2580,6 +2720,7 @@ ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, self.addActor.send(monitorRatekeeper(&self)); self.addActor.send(monitorBlobManager(&self)); self.addActor.send(watchBlobGranulesConfigKey(&self)); + self.addActor.send(monitorConsistencyScan(&self)); self.addActor.send(dbInfoUpdater(&self)); self.addActor.send(traceCounters("ClusterControllerMetrics", self.id, diff --git a/fdbserver/ConsistencyScan.actor.cpp b/fdbserver/ConsistencyScan.actor.cpp new file mode 100644 index 0000000000..1a6046c659 --- /dev/null +++ b/fdbserver/ConsistencyScan.actor.cpp @@ -0,0 +1,1132 @@ +/* + * ConsistencyScan.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbrpc/TenantInfo.h" +#include "fdbserver/WorkerInterface.actor.h" +#include "flow/IRandom.h" +#include "flow/IndexedSet.h" +#include "fdbrpc/FailureMonitor.h" +#include "fdbrpc/Smoother.h" +#include "fdbrpc/simulator.h" +#include "fdbclient/DatabaseContext.h" +#include "fdbclient/ReadYourWrites.h" +#include "fdbclient/TagThrottle.actor.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/StorageMetrics.h" +#include "fdbserver/DataDistribution.actor.h" +#include "fdbserver/RatekeeperInterface.h" +#include "fdbserver/ServerDBInfo.h" +#include "fdbserver/WaitFailure.h" +#include "fdbserver/TesterInterface.actor.h" +#include "flow/DeterministicRandom.h" +#include "flow/Trace.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +// Core of the data consistency checking (checkDataConsistency) and many of the supporting functions are shared between +// the ConsistencyScan role and the ConsistencyCheck workload. They are currently part of this file. ConsistencyScan +// role's main goal is to simply validate data across all shards, while ConsistencyCheck workload does more than that. +// Potentially a re-factor candidate! + +struct ConsistencyScanData { + UID id; + Database db; + + DatabaseConfiguration configuration; + PromiseStream> addActor; + + // TODO: Consider holding a ConsistencyScanInfo object to use as its state, as many of the members are the same. + int64_t restart = 1; + int64_t maxRate = 0; + int64_t targetInterval = 0; + int64_t bytesReadInPrevRound = 0; + int finishedRounds = 0; + KeyRef progressKey; + AsyncVar consistencyScanEnabled = false; + + ConsistencyScanData(UID id, Database db) : id(id), db(db) {} +}; + +// Gets a version at which to read from the storage servers +ACTOR Future getVersion(Database cx) { + loop { + state Transaction tr(cx); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + try { + Version version = wait(tr.getReadVersion()); + return version; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +void testFailure(std::string message, bool performQuiescentChecks, bool isError) { + TraceEvent failEvent(isError ? 
SevError : SevWarn, "TestFailure"); + if (performQuiescentChecks) + failEvent.detail("Workload", "QuiescentCheck"); + else + failEvent.detail("Workload", "ConsistencyCheck"); + + failEvent.detail("Reason", "Consistency check: " + message); +} + +// Get a list of storage servers(persisting keys within range "kr") from the master and compares them with the +// TLogs. If this is a quiescent check, then each commit proxy needs to respond, otherwise only one needs to +// respond. Returns false if there is a failure (in this case, keyServersPromise will never be set) +ACTOR Future getKeyServers( + Database cx, + Promise>>> keyServersPromise, + KeyRangeRef kr, + bool performQuiescentChecks) { + state std::vector>> keyServers; + + // Try getting key server locations from the master proxies + state std::vector>> keyServerLocationFutures; + state Key begin = kr.begin; + state Key end = kr.end; + state int limitKeyServers = BUGGIFY ? 1 : 100; + state Span span(SpanContext(deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64()), + "WL:ConsistencyCheck"_loc); + + while (begin < end) { + state Reference commitProxyInfo = + wait(cx->getCommitProxiesFuture(UseProvisionalProxies::False)); + keyServerLocationFutures.clear(); + for (int i = 0; i < commitProxyInfo->size(); i++) + keyServerLocationFutures.push_back( + commitProxyInfo->get(i, &CommitProxyInterface::getKeyServersLocations) + .getReplyUnlessFailedFor( + GetKeyServerLocationsRequest( + span.context, TenantInfo(), begin, end, limitKeyServers, false, latestVersion, Arena()), + 2, + 0)); + + state bool keyServersInsertedForThisIteration = false; + choose { + when(wait(waitForAll(keyServerLocationFutures))) { + // Read the key server location results + for (int i = 0; i < keyServerLocationFutures.size(); i++) { + ErrorOr shards = keyServerLocationFutures[i].get(); + + // If performing quiescent check, then all master proxies should be reachable. Otherwise, only + // one needs to be reachable + if (performQuiescentChecks && !shards.present()) { + TraceEvent("ConsistencyCheck_CommitProxyUnavailable") + .detail("CommitProxyID", commitProxyInfo->getId(i)); + testFailure("Commit proxy unavailable", performQuiescentChecks, true); + return false; + } + + // Get the list of shards if one was returned. If not doing a quiescent check, we can break if + // it is. If we are doing a quiescent check, then we only need to do this for the first shard. 
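// Illustrative sketch (not part of this patch): getKeyServers() below treats commit proxy
// replies differently depending on the mode: a quiescent check requires every proxy to
// answer, while a normal scan only needs one usable reply. The same decision reduced to a
// standalone predicate; ShardReply and repliesSufficient are hypothetical names.
#include <optional>
#include <vector>

struct ShardReply {}; // placeholder for GetKeyServerLocationsReply

// Returns true if this set of replies is enough to proceed with the scan/check.
bool repliesSufficient(const std::vector<std::optional<ShardReply>>& replies, bool performQuiescentChecks) {
    int present = 0;
    for (const auto& r : replies)
        if (r.has_value())
            ++present;
    if (performQuiescentChecks)
        return present == static_cast<int>(replies.size()); // every proxy must be reachable
    return present > 0; // any single reachable proxy is enough
}

int main() {
    std::vector<std::optional<ShardReply>> replies = { ShardReply{}, std::nullopt };
    bool quiescentOk = repliesSufficient(replies, /*performQuiescentChecks=*/true); // false
    bool scanOk = repliesSufficient(replies, /*performQuiescentChecks=*/false);     // true
    return (!quiescentOk && scanOk) ? 0 : 1;
}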
+ if (shards.present() && !keyServersInsertedForThisIteration) { + keyServers.insert(keyServers.end(), shards.get().results.begin(), shards.get().results.end()); + keyServersInsertedForThisIteration = true; + begin = shards.get().results.back().first.end; + + if (!performQuiescentChecks) + break; + } + } // End of For + } + when(wait(cx->onProxiesChanged())) {} + } // End of choose + + if (!keyServersInsertedForThisIteration) // Retry the entire workflow + wait(delay(1.0)); + + } // End of while + + keyServersPromise.send(keyServers); + return true; +} + +// Retrieves the locations of all shards in the database +// Returns false if there is a failure (in this case, keyLocationPromise will never be set) +ACTOR Future getKeyLocations(Database cx, + std::vector>> shards, + Promise>> keyLocationPromise, + bool performQuiescentChecks) { + state Standalone> keyLocations; + state Key beginKey = allKeys.begin.withPrefix(keyServersPrefix); + state Key endKey = allKeys.end.withPrefix(keyServersPrefix); + state int i = 0; + state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior + + // If the responses are too big, we may use multiple requests to get the key locations. Each request begins + // where the last left off + for (; i < shards.size(); i++) { + while (beginKey < std::min(shards[i].first.end, endKey)) { + try { + Version version = wait(getVersion(cx)); + + GetKeyValuesRequest req; + req.begin = firstGreaterOrEqual(beginKey); + req.end = firstGreaterOrEqual(std::min(shards[i].first.end, endKey)); + req.limit = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT; + req.limitBytes = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES; + req.version = version; + req.tags = TagSet(); + + // Try getting the shard locations from the key servers + state std::vector>> keyValueFutures; + for (const auto& kv : shards[i].second) { + resetReply(req); + keyValueFutures.push_back(kv.getKeyValues.getReplyUnlessFailedFor(req, 2, 0)); + } + + wait(waitForAll(keyValueFutures)); + + int firstValidStorageServer = -1; + + // Read the shard location results + for (int j = 0; j < keyValueFutures.size(); j++) { + ErrorOr reply = keyValueFutures[j].get(); + + if (!reply.present() || reply.get().error.present()) { + // If no storage servers replied, then throw all_alternatives_failed to force a retry + if (firstValidStorageServer < 0 && j == keyValueFutures.size() - 1) + throw all_alternatives_failed(); + } + + // If this is the first storage server, store the locations to send back to the caller + else if (firstValidStorageServer < 0) { + firstValidStorageServer = j; + + // Otherwise, compare the data to the results from the first storage server. 
If they are + // different, then the check fails + } else if (reply.get().data != keyValueFutures[firstValidStorageServer].get().get().data || + reply.get().more != keyValueFutures[firstValidStorageServer].get().get().more) { + TraceEvent("ConsistencyCheck_InconsistentKeyServers") + .detail("StorageServer1", shards[i].second[firstValidStorageServer].id()) + .detail("StorageServer2", shards[i].second[j].id()); + testFailure("Key servers inconsistent", performQuiescentChecks, true); + return false; + } + } + + auto keyValueResponse = keyValueFutures[firstValidStorageServer].get().get(); + RangeResult currentLocations = krmDecodeRanges( + keyServersPrefix, + KeyRangeRef(beginKey.removePrefix(keyServersPrefix), + std::min(shards[i].first.end, endKey).removePrefix(keyServersPrefix)), + RangeResultRef(keyValueResponse.data, keyValueResponse.more)); + + if (keyValueResponse.data.size() && beginKey == keyValueResponse.data[0].key) { + keyLocations.push_back_deep(keyLocations.arena(), currentLocations[0]); + } + + if (currentLocations.size() > 2) { + keyLocations.append_deep(keyLocations.arena(), ¤tLocations[1], currentLocations.size() - 2); + } + + // Next iteration should pick up where we left off + ASSERT(currentLocations.size() > 1); + if (!keyValueResponse.more) { + beginKey = shards[i].first.end; + } else { + beginKey = keyValueResponse.data.end()[-1].key; + } + + // If this is the last iteration, then push the allKeys.end KV pair + if (beginKey >= endKey) + keyLocations.push_back_deep(keyLocations.arena(), currentLocations.end()[-1]); + } catch (Error& e) { + state Error err = e; + wait(onErrorTr.onError(err)); + TraceEvent("ConsistencyCheck_RetryGetKeyLocations").error(err); + } + } + } + + keyLocationPromise.send(keyLocations); + return true; +} + +// Retrieves a vector of the storage servers' estimates for the size of a particular shard +// If a storage server can't be reached, its estimate will be -1 +// If there is an error, then the returned vector will have 0 size +ACTOR Future> getStorageSizeEstimate(std::vector storageServers, + KeyRangeRef shard) { + state std::vector estimatedBytes; + + state WaitMetricsRequest req; + req.keys = shard; + req.max.bytes = -1; + req.min.bytes = 0; + + state std::vector>> metricFutures; + + try { + // Check the size of the shard on each storage server + for (int i = 0; i < storageServers.size(); i++) { + resetReply(req); + metricFutures.push_back(storageServers[i].waitMetrics.getReplyUnlessFailedFor(req, 2, 0)); + } + + // Wait for the storage servers to respond + wait(waitForAll(metricFutures)); + + int firstValidStorageServer = -1; + + // Retrieve the size from the storage server responses + for (int i = 0; i < storageServers.size(); i++) { + ErrorOr reply = metricFutures[i].get(); + + // If the storage server doesn't reply, then return -1 + if (!reply.present()) { + TraceEvent("ConsistencyCheck_FailedToFetchMetrics") + .error(reply.getError()) + .detail("Begin", printable(shard.begin)) + .detail("End", printable(shard.end)) + .detail("StorageServer", storageServers[i].id()) + .detail("IsTSS", storageServers[i].isTss() ? 
"True" : "False"); + estimatedBytes.push_back(-1); + } + + // Add the result to the list of estimates + else if (reply.present()) { + int64_t numBytes = reply.get().bytes; + estimatedBytes.push_back(numBytes); + if (firstValidStorageServer < 0) + firstValidStorageServer = i; + else if (estimatedBytes[firstValidStorageServer] != numBytes) { + TraceEvent("ConsistencyCheck_InconsistentStorageMetrics") + .detail("ByteEstimate1", estimatedBytes[firstValidStorageServer]) + .detail("ByteEstimate2", numBytes) + .detail("Begin", shard.begin) + .detail("End", shard.end) + .detail("StorageServer1", storageServers[firstValidStorageServer].id()) + .detail("StorageServer2", storageServers[i].id()) + .detail("IsTSS", + storageServers[i].isTss() || storageServers[firstValidStorageServer].isTss() ? "True" + : "False"); + } + } + } + } catch (Error& e) { + TraceEvent("ConsistencyCheck_ErrorFetchingMetrics") + .error(e) + .detail("Begin", printable(shard.begin)) + .detail("End", printable(shard.end)); + estimatedBytes.clear(); + } + + return estimatedBytes; +} + +ACTOR Future getDatabaseSize(Database cx) { + state Transaction tr(cx); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + loop { + try { + StorageMetrics metrics = + wait(tr.getDatabase()->getStorageMetrics(KeyRangeRef(allKeys.begin, keyServersPrefix), 100000)); + return metrics.bytes; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +// Checks that the data in each shard is the same on each storage server that it resides on. Also performs some +// sanity checks on the sizes of shards and storage servers. Returns false if there is a failure +// TODO: Future optimization: Use streaming reads +ACTOR Future checkDataConsistency(Database cx, + VectorRef keyLocations, + DatabaseConfiguration configuration, + std::map tssMapping, + bool performQuiescentChecks, + bool performTSSCheck, + bool firstClient, + bool failureIsError, + int clientId, + int clientCount, + bool distributed, + bool shuffleShards, + int shardSampleFactor, + int64_t sharedRandomNumber, + int64_t repetitions, + int64_t* bytesReadInPrevRound, + int restart, + int64_t maxRate, + int64_t targetInterval, + KeyRef progressKey) { + // Stores the total number of bytes on each storage server + // In a distributed test, this will be an estimated size + state std::map storageServerSizes; + + // Iterate through each shard, checking its values on all of its storage servers + // If shardSampleFactor > 1, then not all shards are processed + // Also, in a distributed data consistency check, each client processes a subset of the shards + // Note: this may cause some shards to be processed more than once or not at all in a non-quiescent database + state int effectiveClientCount = distributed ? clientCount : 1; + state int i = clientId * (shardSampleFactor + 1); + state int increment = (distributed && !firstClient) ? effectiveClientCount * shardSampleFactor : 1; + state int64_t rateLimitForThisRound = + *bytesReadInPrevRound == 0 + ? 
maxRate + : std::min(maxRate, static_cast(ceil(*bytesReadInPrevRound / (float)targetInterval))); + ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= maxRate); + TraceEvent("ConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound); + state Reference rateLimiter = Reference(new SpeedLimit(rateLimitForThisRound, 1)); + state double rateLimiterStartTime = now(); + state int64_t bytesReadInthisRound = 0; + state bool resume = !(restart || shuffleShards); + + state double dbSize = 100e12; + if (g_network->isSimulated()) { + // This call will get all shard ranges in the database, which is too expensive on real clusters. + int64_t _dbSize = wait(getDatabaseSize(cx)); + dbSize = _dbSize; + } + + state std::vector ranges; + + for (int k = 0; k < keyLocations.size() - 1; k++) { + // TODO: check if this is sufficient + if (resume && keyLocations[k].key < progressKey) { + TraceEvent("ConsistencyCheck_SkippingRange") + .detail("KeyBegin", keyLocations[k].key.toString()) + .detail("KeyEnd", keyLocations[k + 1].key.toString()) + .detail("PrevKey", progressKey.toString()); + continue; + } + KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key); + ranges.push_back(range); + } + + state std::vector shardOrder; + shardOrder.reserve(ranges.size()); + for (int k = 0; k < ranges.size(); k++) + shardOrder.push_back(k); + if (shuffleShards) { + uint32_t seed = sharedRandomNumber + repetitions; + DeterministicRandom sharedRandom(seed == 0 ? 1 : seed); + sharedRandom.randomShuffle(shardOrder); + } + + for (; i < ranges.size(); i++) { + state int shard = shardOrder[i]; + + state KeyRangeRef range = ranges[shard]; + state std::vector sourceStorageServers; + state std::vector destStorageServers; + state Transaction tr(cx); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + state int bytesReadInRange = 0; + + RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + decodeKeyServersValue(UIDtoTagMap, keyLocations[shard].value, sourceStorageServers, destStorageServers, false); + + // If the destStorageServers is non-empty, then this shard is being relocated + state bool isRelocating = destStorageServers.size() > 0; + + // In a quiescent database, check that the team size is the same as the desired team size + if (firstClient && performQuiescentChecks && + sourceStorageServers.size() != configuration.usableRegions * configuration.storageTeamSize) { + TraceEvent("ConsistencyCheck_InvalidTeamSize") + .detail("ShardBegin", printable(range.begin)) + .detail("ShardEnd", printable(range.end)) + .detail("SourceTeamSize", sourceStorageServers.size()) + .detail("DestServerSize", destStorageServers.size()) + .detail("ConfigStorageTeamSize", configuration.storageTeamSize) + .detail("UsableRegions", configuration.usableRegions); + // Record the server reponsible for the problematic shards + int k = 0; + for (auto& id : sourceStorageServers) { + TraceEvent("IncorrectSizeTeamInfo").detail("ServerUID", id).detail("TeamIndex", k++); + } + testFailure("Invalid team size", performQuiescentChecks, failureIsError); + return false; + } + + state std::vector storageServers = (isRelocating) ? 
destStorageServers : sourceStorageServers; + state std::vector storageServerInterfaces; + + loop { + try { + std::vector>> serverListEntries; + serverListEntries.reserve(storageServers.size()); + for (int s = 0; s < storageServers.size(); s++) + serverListEntries.push_back(tr.get(serverListKeyFor(storageServers[s]))); + state std::vector> serverListValues = wait(getAll(serverListEntries)); + for (int s = 0; s < serverListValues.size(); s++) { + if (serverListValues[s].present()) + storageServerInterfaces.push_back(decodeServerListValue(serverListValues[s].get())); + else if (performQuiescentChecks) + testFailure( + "/FF/serverList changing in a quiescent database", performQuiescentChecks, failureIsError); + } + + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + // add TSS to end of list, if configured and if not relocating + if (!isRelocating && performTSSCheck) { + int initialSize = storageServers.size(); + for (int i = 0; i < initialSize; i++) { + auto tssPair = tssMapping.find(storageServers[i]); + if (tssPair != tssMapping.end()) { + CODE_PROBE(true, "TSS checked in consistency check"); + storageServers.push_back(tssPair->second.id()); + storageServerInterfaces.push_back(tssPair->second); + } + } + } + + state std::vector estimatedBytes = wait(getStorageSizeEstimate(storageServerInterfaces, range)); + + // Gets permitted size range of shard + int64_t maxShardSize = getMaxShardSize(dbSize); + state ShardSizeBounds shardBounds = getShardSizeBounds(range, maxShardSize); + + if (firstClient) { + // If there was an error retrieving shard estimated size + if (performQuiescentChecks && estimatedBytes.size() == 0) + testFailure("Error fetching storage metrics", performQuiescentChecks, failureIsError); + + // If running a distributed test, storage server size is an accumulation of shard estimates + else if (distributed && firstClient) + for (int j = 0; j < storageServers.size(); j++) + storageServerSizes[storageServers[j]] += std::max(estimatedBytes[j], (int64_t)0); + } + + // The first client may need to skip the rest of the loop contents if it is just processing this shard to + // get a size estimate + if (!firstClient || shard % (effectiveClientCount * shardSampleFactor) == 0) { + state int shardKeys = 0; + state int shardBytes = 0; + state int sampledBytes = 0; + state int splitBytes = 0; + state int firstKeySampledBytes = 0; + state int sampledKeys = 0; + state int sampledKeysWithProb = 0; + state double shardVariance = 0; + state bool canSplit = false; + state Key lastSampleKey; + state Key lastStartSampleKey; + state int64_t totalReadAmount = 0; + + state KeySelector begin = firstGreaterOrEqual(range.begin); + state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior + + // Read a limited number of entries at a time, repeating until all keys in the shard have been read + loop { + try { + lastSampleKey = lastStartSampleKey; + + // Get the min version of the storage servers + Version version = wait(getVersion(cx)); + + state GetKeyValuesRequest req; + req.begin = begin; + req.end = firstGreaterOrEqual(range.end); + req.limit = 1e4; + req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT; + req.version = version; + req.tags = TagSet(); + + // Try getting the entries in the specified range + state std::vector>> keyValueFutures; + state int j = 0; + TraceEvent("ConsistencyCheck_StoringGetFutures").detail("SSISize", storageServerInterfaces.size()); + for (j = 0; j < storageServerInterfaces.size(); j++) { + resetReply(req); + 
keyValueFutures.push_back( + storageServerInterfaces[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0)); + } + + wait(waitForAll(keyValueFutures)); + + // Read the resulting entries + state int firstValidServer = -1; + totalReadAmount = 0; + for (j = 0; j < storageServerInterfaces.size(); j++) { + ErrorOr rangeResult = keyValueFutures[j].get(); + + // Compare the results with other storage servers + if (rangeResult.present() && !rangeResult.get().error.present()) { + state GetKeyValuesReply current = rangeResult.get(); + TraceEvent("ConsistencyCheck_GetKeyValuesStream") + .detail("DataSize", current.data.size()) + .detail(format("StorageServer%d", j).c_str(), storageServers[j].toString()); + totalReadAmount += current.data.expectedSize(); + // If we haven't encountered a valid storage server yet, then mark this as the baseline + // to compare against + if (firstValidServer == -1) { + TraceEvent("ConsistencyCheck_FirstValidServer").detail("Iter", j); + firstValidServer = j; + // Compare this shard against the first + } else { + GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get(); + + if (current.data != reference.data || current.more != reference.more) { + // Be especially verbose if in simulation + if (g_network->isSimulated()) { + int invalidIndex = -1; + printf("\n%sSERVER %d (%s); shard = %s - %s:\n", + "", + j, + storageServerInterfaces[j].address().toString().c_str(), + printable(req.begin.getKey()).c_str(), + printable(req.end.getKey()).c_str()); + for (int k = 0; k < current.data.size(); k++) { + printf("%d. %s => %s\n", + k, + printable(current.data[k].key).c_str(), + printable(current.data[k].value).c_str()); + if (invalidIndex < 0 && (k >= reference.data.size() || + current.data[k].key != reference.data[k].key || + current.data[k].value != reference.data[k].value)) + invalidIndex = k; + } + + printf("\n%sSERVER %d (%s); shard = %s - %s:\n", + "", + firstValidServer, + storageServerInterfaces[firstValidServer].address().toString().c_str(), + printable(req.begin.getKey()).c_str(), + printable(req.end.getKey()).c_str()); + for (int k = 0; k < reference.data.size(); k++) { + printf("%d. 
%s => %s\n", + k, + printable(reference.data[k].key).c_str(), + printable(reference.data[k].value).c_str()); + if (invalidIndex < 0 && (k >= current.data.size() || + reference.data[k].key != current.data[k].key || + reference.data[k].value != current.data[k].value)) + invalidIndex = k; + } + + printf("\nMISMATCH AT %d\n\n", invalidIndex); + } + + // Data for trace event + // The number of keys unique to the current shard + int currentUniques = 0; + // The number of keys unique to the reference shard + int referenceUniques = 0; + // The number of keys in both shards with conflicting values + int valueMismatches = 0; + // The number of keys in both shards with matching values + int matchingKVPairs = 0; + // Last unique key on the current shard + KeyRef currentUniqueKey; + // Last unique key on the reference shard + KeyRef referenceUniqueKey; + // Last value mismatch + KeyRef valueMismatchKey; + + // Loop indeces + int currentI = 0; + int referenceI = 0; + while (currentI < current.data.size() || referenceI < reference.data.size()) { + if (currentI >= current.data.size()) { + referenceUniqueKey = reference.data[referenceI].key; + referenceUniques++; + referenceI++; + } else if (referenceI >= reference.data.size()) { + currentUniqueKey = current.data[currentI].key; + currentUniques++; + currentI++; + } else { + KeyValueRef currentKV = current.data[currentI]; + KeyValueRef referenceKV = reference.data[referenceI]; + + if (currentKV.key == referenceKV.key) { + if (currentKV.value == referenceKV.value) + matchingKVPairs++; + else { + valueMismatchKey = currentKV.key; + valueMismatches++; + } + + currentI++; + referenceI++; + } else if (currentKV.key < referenceKV.key) { + currentUniqueKey = currentKV.key; + currentUniques++; + currentI++; + } else { + referenceUniqueKey = referenceKV.key; + referenceUniques++; + referenceI++; + } + } + } + + TraceEvent("ConsistencyCheck_DataInconsistent") + .detail(format("StorageServer%d", j).c_str(), storageServers[j].toString()) + .detail(format("StorageServer%d", firstValidServer).c_str(), + storageServers[firstValidServer].toString()) + .detail("ShardBegin", req.begin.getKey()) + .detail("ShardEnd", req.end.getKey()) + .detail("VersionNumber", req.version) + .detail(format("Server%dUniques", j).c_str(), currentUniques) + .detail(format("Server%dUniqueKey", j).c_str(), currentUniqueKey) + .detail(format("Server%dUniques", firstValidServer).c_str(), referenceUniques) + .detail(format("Server%dUniqueKey", firstValidServer).c_str(), + referenceUniqueKey) + .detail("ValueMismatches", valueMismatches) + .detail("ValueMismatchKey", valueMismatchKey) + .detail("MatchingKVPairs", matchingKVPairs) + .detail("IsTSS", + storageServerInterfaces[j].isTss() || + storageServerInterfaces[firstValidServer].isTss() + ? "True" + : "False"); + + if ((g_network->isSimulated() && + g_simulator->tssMode != ISimulator::TSSMode::EnabledDropMutations) || + (!storageServerInterfaces[j].isTss() && + !storageServerInterfaces[firstValidServer].isTss())) { + testFailure("Data inconsistent", performQuiescentChecks, true); + return false; + } + } + } + } + + // If the data is not available and we aren't relocating this shard + else if (!isRelocating) { + Error e = rangeResult.isError() ? 
rangeResult.getError() : rangeResult.get().error.get(); + + TraceEvent("ConsistencyCheck_StorageServerUnavailable") + .errorUnsuppressed(e) + .suppressFor(1.0) + .detail("StorageServer", storageServers[j]) + .detail("ShardBegin", printable(range.begin)) + .detail("ShardEnd", printable(range.end)) + .detail("Address", storageServerInterfaces[j].address()) + .detail("UID", storageServerInterfaces[j].id()) + .detail("GetKeyValuesToken", + storageServerInterfaces[j].getKeyValues.getEndpoint().token) + .detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False"); + + // All shards should be available in quiscence + if (performQuiescentChecks && !storageServerInterfaces[j].isTss()) { + testFailure("Storage server unavailable", performQuiescentChecks, failureIsError); + return false; + } + } + } + + if (firstValidServer >= 0) { + state VectorRef data = keyValueFutures[firstValidServer].get().get().data; + + // Persist the last key of the range we just verified as the progressKey + if (data.size()) { + state Reference csInfoTr = + makeReference(cx); + progressKey = data[data.size() - 1].key; + loop { + try { + csInfoTr->reset(); + csInfoTr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + state Optional val = wait(ConsistencyScanInfo::getInfo(csInfoTr)); + wait(csInfoTr->commit()); + if (val.present()) { + ConsistencyScanInfo consistencyScanInfo = + ObjectReader::fromStringRef(val.get(), + IncludeVersion()); + consistencyScanInfo.progress_key = progressKey; + csInfoTr->reset(); + csInfoTr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + wait(ConsistencyScanInfo::setInfo(csInfoTr, consistencyScanInfo)); + wait(csInfoTr->commit()); + } + break; + } catch (Error& e) { + wait(csInfoTr->onError(e)); + } + } + } + + // Calculate the size of the shard, the variance of the shard size estimate, and the correct + // shard size estimate + for (int k = 0; k < data.size(); k++) { + ByteSampleInfo sampleInfo = isKeyValueInSample(data[k]); + shardBytes += sampleInfo.size; + double itemProbability = ((double)sampleInfo.size) / sampleInfo.sampledSize; + if (itemProbability < 1) + shardVariance += + itemProbability * (1 - itemProbability) * pow((double)sampleInfo.sampledSize, 2); + + if (sampleInfo.inSample) { + sampledBytes += sampleInfo.sampledSize; + if (!canSplit && sampledBytes >= shardBounds.min.bytes && + data[k].key.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT && + sampledBytes <= + shardBounds.max.bytes * CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT / 2) { + canSplit = true; + splitBytes = sampledBytes; + } + + /*TraceEvent("ConsistencyCheck_ByteSample").detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end)) + .detail("SampledBytes", sampleInfo.sampledSize).detail("Key", + printable(data[k].key)).detail("KeySize", data[k].key.size()).detail("ValueSize", + data[k].value.size());*/ + + // In data distribution, the splitting process ignores the first key in a shard. 
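Aside: the mismatch-classification loop above (currentI / referenceI) is a plain two-pointer merge over two key-sorted replies. A minimal standalone sketch of the same bookkeeping, using std::vector and std::string in place of GetKeyValuesReply (the function and type names here are illustrative, not part of this patch):

#include <string>
#include <utility>
#include <vector>

struct MismatchStats {
	int currentUniques = 0;   // keys only in the current server's reply
	int referenceUniques = 0; // keys only in the reference server's reply
	int valueMismatches = 0;  // keys in both replies with different values
	int matchingKVPairs = 0;  // keys in both replies with identical values
};

// Both inputs must be sorted by key, as storage server range replies are.
inline MismatchStats classifyMismatches(const std::vector<std::pair<std::string, std::string>>& current,
                                        const std::vector<std::pair<std::string, std::string>>& reference) {
	MismatchStats s;
	size_t ci = 0, ri = 0;
	while (ci < current.size() || ri < reference.size()) {
		if (ci >= current.size()) {
			s.referenceUniques++, ri++;
		} else if (ri >= reference.size()) {
			s.currentUniques++, ci++;
		} else if (current[ci].first == reference[ri].first) {
			if (current[ci].second == reference[ri].second)
				s.matchingKVPairs++;
			else
				s.valueMismatches++;
			ci++, ri++;
		} else if (current[ci].first < reference[ri].first) {
			s.currentUniques++, ci++;
		} else {
			s.referenceUniques++, ri++;
		}
	}
	return s;
}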
+ // Thus, we shouldn't consider it when validating the upper bound of estimated shard + // sizes + if (k == 0) + firstKeySampledBytes += sampleInfo.sampledSize; + + sampledKeys++; + if (itemProbability < 1) { + sampledKeysWithProb++; + } + } + } + + // Accumulate number of keys in this shard + shardKeys += data.size(); + } + // after requesting each shard, enforce rate limit based on how much data will likely be read + if (rateLimitForThisRound > 0) { + TraceEvent("ConsistencyCheck_RateLimit") + .detail("RateLimitForThisRound", rateLimitForThisRound) + .detail("TotalAmountRead", totalReadAmount); + wait(rateLimiter->getAllowance(totalReadAmount)); + TraceEvent("ConsistencyCheck_AmountRead1").detail("TotalAmountRead", totalReadAmount); + // Set ratelimit to max allowed if current round has been going on for a while + if (now() - rateLimiterStartTime > 1.1 * targetInterval && rateLimitForThisRound != maxRate) { + rateLimitForThisRound = maxRate; + rateLimiter = Reference(new SpeedLimit(rateLimitForThisRound, 1)); + rateLimiterStartTime = now(); + TraceEvent(SevInfo, "ConsistencyCheck_RateLimitSetMaxForThisRound") + .detail("RateLimit", rateLimitForThisRound); + } + } + bytesReadInRange += totalReadAmount; + bytesReadInthisRound += totalReadAmount; + TraceEvent("ConsistencyCheck_BytesRead") + .detail("BytesReadInRange", bytesReadInRange) + .detail("BytesReadInthisRound", bytesReadInthisRound); + + // Advance to the next set of entries + if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) { + VectorRef result = keyValueFutures[firstValidServer].get().get().data; + ASSERT(result.size() > 0); + begin = firstGreaterThan(result[result.size() - 1].key); + ASSERT(begin.getKey() != allKeys.end); + lastStartSampleKey = lastSampleKey; + } else + break; + } catch (Error& e) { + state Error err = e; + wait(onErrorTr.onError(err)); + TraceEvent("ConsistencyCheck_RetryDataConsistency").error(err); + } + } + + canSplit = canSplit && sampledBytes - splitBytes >= shardBounds.min.bytes && sampledBytes > splitBytes; + + // Update the size of all storage servers containing this shard + // This is only done in a non-distributed consistency check; the distributed check uses shard size + // estimates + if (!distributed) + for (int j = 0; j < storageServers.size(); j++) + storageServerSizes[storageServers[j]] += shardBytes; + + // If the storage servers' sampled estimate of shard size is different from ours + if (performQuiescentChecks) { + for (int j = 0; j < estimatedBytes.size(); j++) { + if (estimatedBytes[j] >= 0 && estimatedBytes[j] != sampledBytes) { + TraceEvent("ConsistencyCheck_IncorrectEstimate") + .detail("EstimatedBytes", estimatedBytes[j]) + .detail("CorrectSampledBytes", sampledBytes) + .detail("StorageServer", storageServers[j]) + .detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False"); + + if (!storageServerInterfaces[j].isTss()) { + testFailure("Storage servers had incorrect sampled estimate", + performQuiescentChecks, + failureIsError); + } + + break; + } else if (estimatedBytes[j] < 0 && ((g_network->isSimulated() && + g_simulator->tssMode <= ISimulator::TSSMode::EnabledNormal) || + !storageServerInterfaces[j].isTss())) { + // Ignore a non-responding TSS outside of simulation, or if tss fault injection is enabled + break; + } + } + } + + // Compute the difference between the shard size estimate and its actual size. 
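Aside: the rate-limit handling just above sizes each round's read budget from the previous round and then lifts the cap if the round overruns: the per-second budget is derived from how much the previous round read relative to the target completion time, capped at maxRate (the removed workload copy later in this diff shows the same computation against CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME), and once a round has run longer than 1.1 * targetInterval the limiter is reset to maxRate so the round can finish. A small sketch of that policy in isolation (function names are illustrative; the real code feeds the result into a SpeedLimit rate controller):

#include <algorithm>
#include <cmath>
#include <cstdint>

// Pick this round's bytes/sec budget. With nothing read in the previous round
// (e.g. the first round), fall back to the configured maximum rate.
inline int64_t rateLimitForRound(int64_t bytesReadInPrevRound, double targetIntervalSeconds, int64_t maxRate) {
	if (bytesReadInPrevRound <= 0)
		return maxRate;
	int64_t derived = static_cast<int64_t>(std::ceil(bytesReadInPrevRound / targetIntervalSeconds));
	return std::min(maxRate, derived);
}

// If the current round has been running well past its target interval,
// stop throttling and let it finish at the maximum rate.
inline int64_t maybeLiftLimit(int64_t currentLimit,
                              double roundElapsedSeconds,
                              double targetIntervalSeconds,
                              int64_t maxRate) {
	if (roundElapsedSeconds > 1.1 * targetIntervalSeconds && currentLimit != maxRate)
		return maxRate;
	return currentLimit;
}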
If it is sufficiently + // large, then fail + double stdDev = sqrt(shardVariance); + + double failErrorNumStdDev = 7; + int estimateError = abs(shardBytes - sampledBytes); + + // Only perform the check if there are sufficient keys to get a distribution that should resemble a + // normal distribution + if (sampledKeysWithProb > 30 && estimateError > failErrorNumStdDev * stdDev) { + double numStdDev = estimateError / sqrt(shardVariance); + TraceEvent("ConsistencyCheck_InaccurateShardEstimate") + .detail("Min", shardBounds.min.bytes) + .detail("Max", shardBounds.max.bytes) + .detail("Estimate", sampledBytes) + .detail("Actual", shardBytes) + .detail("NumStdDev", numStdDev) + .detail("Variance", shardVariance) + .detail("StdDev", stdDev) + .detail("ShardBegin", printable(range.begin)) + .detail("ShardEnd", printable(range.end)) + .detail("NumKeys", shardKeys) + .detail("NumSampledKeys", sampledKeys) + .detail("NumSampledKeysWithProb", sampledKeysWithProb); + + testFailure(format("Shard size is more than %f std dev from estimate", failErrorNumStdDev), + performQuiescentChecks, + failureIsError); + } + + // In a quiescent database, check that the (estimated) size of the shard is within permitted bounds + // Min and max shard sizes have a 3 * shardBounds.permittedError.bytes cushion for error since shard + // sizes are not precise Shard splits ignore the first key in a shard, so its size shouldn't be + // considered when checking the upper bound 0xff shards are not checked + if (canSplit && sampledKeys > 5 && performQuiescentChecks && !range.begin.startsWith(keyServersPrefix) && + (sampledBytes < shardBounds.min.bytes - 3 * shardBounds.permittedError.bytes || + sampledBytes - firstKeySampledBytes > shardBounds.max.bytes + 3 * shardBounds.permittedError.bytes)) { + TraceEvent("ConsistencyCheck_InvalidShardSize") + .detail("Min", shardBounds.min.bytes) + .detail("Max", shardBounds.max.bytes) + .detail("Size", shardBytes) + .detail("EstimatedSize", sampledBytes) + .detail("ShardBegin", printable(range.begin)) + .detail("ShardEnd", printable(range.end)) + .detail("ShardCount", ranges.size()) + .detail("SampledKeys", sampledKeys); + testFailure(format("Shard size in quiescent database is too %s", + (sampledBytes < shardBounds.min.bytes) ? 
"small" : "large"), + performQuiescentChecks, + failureIsError); + return false; + } + } + + if (bytesReadInRange > 0) { + TraceEvent("ConsistencyCheck_ReadRange") + .suppressFor(1.0) + .detail("Range", range) + .detail("BytesRead", bytesReadInRange); + } + } + + *bytesReadInPrevRound = bytesReadInthisRound; + return true; +} + +ACTOR Future runDataValidationCheck(ConsistencyScanData* self) { + state Reference tr = makeReference(self->db); + state ConsistencyScanInfo csInfo = ConsistencyScanInfo(); + csInfo.consistency_scan_enabled = true; + csInfo.restart = self->restart; + csInfo.max_rate = self->maxRate; + csInfo.target_interval = self->targetInterval; + csInfo.last_round_start = StorageMetadataType::currentTime(); + try { + // Get a list of key servers; verify that the TLogs and master all agree about who the key servers are + state Promise>>> keyServerPromise; + state std::map tssMapping; + bool keyServerResult = wait(getKeyServers(self->db, keyServerPromise, keyServersKeys, false)); + if (keyServerResult) { + state std::vector>> keyServers = + keyServerPromise.getFuture().get(); + + // Get the locations of all the shards in the database + state Promise>> keyLocationPromise; + bool keyLocationResult = wait(getKeyLocations(self->db, keyServers, keyLocationPromise, false)); + if (keyLocationResult) { + state Standalone> keyLocations = keyLocationPromise.getFuture().get(); + + // Check that each shard has the same data on all storage servers that it resides on + wait(::success(checkDataConsistency(self->db, + keyLocations, + self->configuration, + tssMapping, + false /* quiescentCheck */, + false /* tssCheck */, + true /* firstClient */, + false /* failureIsError */, + 0 /* clientId */, + 1 /* clientCount */, + false /* distributed */, + false /* shuffleShards */, + 1 /* shardSampleFactor */, + deterministicRandom()->randomInt64(0, 10000000), + self->finishedRounds /* repetitions */, + &(self->bytesReadInPrevRound), + self->restart, + self->maxRate, + self->targetInterval, + self->progressKey))); + } + } + } catch (Error& e) { + if (e.code() == error_code_transaction_too_old || e.code() == error_code_future_version || + e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || + e.code() == error_code_process_behind || e.code() == error_code_actor_cancelled) + TraceEvent("ConsistencyScan_Retry").error(e); // FIXME: consistency check does not retry in this case + else + throw; + } + + TraceEvent("ConsistencyScan_FinishedCheck"); + + // Update the ConsistencyScanInfo object and persist to the database + csInfo.last_round_finish = StorageMetadataType::currentTime(); + csInfo.finished_rounds = self->finishedRounds + 1; + auto duration = csInfo.last_round_finish - csInfo.last_round_start; + csInfo.smoothed_round_duration.setTotal((double)duration); + csInfo.progress_key = self->progressKey; + csInfo.bytes_read_prev_round = self->bytesReadInPrevRound; + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + wait(ConsistencyScanInfo::setInfo(tr, csInfo)); + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + return Void(); +} + +ACTOR Future watchConsistencyScanInfoKey(ConsistencyScanData* self) { + state Reference tr = makeReference(self->db); + + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + state Optional val = wait(ConsistencyScanInfo::getInfo(tr)); + if (val.present()) { + ConsistencyScanInfo consistencyScanInfo = + 
ObjectReader::fromStringRef(val.get(), IncludeVersion()); + self->restart = consistencyScanInfo.restart; + self->maxRate = consistencyScanInfo.max_rate; + self->targetInterval = consistencyScanInfo.target_interval; + self->progressKey = consistencyScanInfo.progress_key; + self->bytesReadInPrevRound = consistencyScanInfo.bytes_read_prev_round; + self->finishedRounds = consistencyScanInfo.finished_rounds; + self->consistencyScanEnabled.set(consistencyScanInfo.consistency_scan_enabled); + //TraceEvent("ConsistencyScan_WatchGotVal", self->id) + // .detail("Enabled", consistencyScanInfo.consistency_scan_enabled) + // .detail("MaxRateRead", consistencyScanInfo.max_rate) + // .detail("MaxRateSelf", self->maxRate); + } + state Future watch = tr->watch(consistencyScanInfoKey); + wait(tr->commit()); + wait(watch); + } catch (Error& e) { + wait(tr->onError(e)); + } + } +} + +ACTOR Future consistencyScan(ConsistencyScanInterface csInterf, Reference const> dbInfo) { + state ConsistencyScanData self(csInterf.id(), + openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); + state Promise err; + state Future collection = actorCollection(self.addActor.getFuture()); + state ConsistencyScanInfo csInfo = ConsistencyScanInfo(); + + TraceEvent("ConsistencyScan_Starting", csInterf.id()).log(); + + // Randomly enable consistencyScan in simulation + if (g_network->isSimulated()) { + if (deterministicRandom()->random01() < 0.5) { + csInfo.consistency_scan_enabled = false; + } else { + csInfo.consistency_scan_enabled = true; + csInfo.restart = false; + csInfo.max_rate = 50e6; + csInfo.target_interval = 24 * 7 * 60 * 60; + } + TraceEvent("SimulatedConsistencyScanConfigRandom") + .detail("ConsistencyScanEnabled", csInfo.consistency_scan_enabled) + .detail("MaxRate", csInfo.max_rate) + .detail("Interval", csInfo.target_interval); + state Reference tr = makeReference(self.db); + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + wait(ConsistencyScanInfo::setInfo(tr, csInfo)); + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + } + + self.addActor.send(waitFailureServer(csInterf.waitFailure.getFuture())); + self.addActor.send(traceRole(Role::CONSISTENCYSCAN, csInterf.id())); + self.addActor.send(watchConsistencyScanInfoKey(&self)); + + loop { + if (self.consistencyScanEnabled.get()) { + try { + loop choose { + when(wait(runDataValidationCheck(&self))) { + TraceEvent("ConsistencyScan_Done", csInterf.id()).log(); + return Void(); + } + when(HaltConsistencyScanRequest req = waitNext(csInterf.haltConsistencyScan.getFuture())) { + req.reply.send(Void()); + TraceEvent("ConsistencyScan_Halted", csInterf.id()).detail("ReqID", req.requesterID); + break; + } + when(wait(err.getFuture())) {} + when(wait(collection)) { + ASSERT(false); + throw internal_error(); + } + } + } catch (Error& err) { + if (err.code() == error_code_actor_cancelled) { + TraceEvent("ConsistencyScan_ActorCanceled", csInterf.id()).errorUnsuppressed(err); + return Void(); + } + TraceEvent("ConsistencyScan_Died", csInterf.id()).errorUnsuppressed(err); + } + } else { + TraceEvent("ConsistencyScan_WaitingForConfigChange", self.id).log(); + wait(self.consistencyScanEnabled.onChange()); + } + } +} \ No newline at end of file diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index deb0aec396..89d9e4cb30 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -34,6 +34,7 @@ #include "fdbserver/ClusterRecovery.actor.h" #include 
"fdbserver/CoordinationInterface.h" #include "fdbserver/DataDistribution.actor.h" +#include "fdbclient/ConsistencyScanInterface.h" #include "flow/UnitTest.h" #include "fdbserver/QuietDatabase.h" #include "fdbserver/RecoveryState.h" @@ -809,6 +810,10 @@ ACTOR static Future processStatusFetcher( roles.addRole("blob_manager", db->get().blobManager.get()); } + if (db->get().consistencyScan.present()) { + roles.addRole("consistency_scan", db->get().consistencyScan.get()); + } + if (SERVER_KNOBS->ENABLE_ENCRYPTION && db->get().encryptKeyProxy.present()) { roles.addRole("encrypt_key_proxy", db->get().encryptKeyProxy.get()); } @@ -2871,6 +2876,26 @@ ACTOR Future storageWigglerStatsFetcher(Optional> consistencyScanInfoFetcher(Database cx) { + state Reference tr(new ReadYourWritesTransaction(cx)); + state Optional val; + loop { + try { + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + wait(store(val, ConsistencyScanInfo::getInfo(tr))); + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + TraceEvent("ConsistencyScanInfoFetcher").log(); + return val.get(); +} + // constructs the cluster section of the json status output ACTOR Future clusterGetStatus( Reference> db, @@ -2890,6 +2915,7 @@ ACTOR Future clusterGetStatus( state WorkerDetails ccWorker; // Cluster-Controller worker state WorkerDetails ddWorker; // DataDistributor worker state WorkerDetails rkWorker; // Ratekeeper worker + state WorkerDetails csWorker; // ConsistencyScan worker try { // Get the master Worker interface @@ -2936,6 +2962,19 @@ ACTOR Future clusterGetStatus( rkWorker = _rkWorker.get(); } + // Get the ConsistencyScan worker interface + Optional _csWorker; + if (db->get().consistencyScan.present()) { + _csWorker = getWorker(workers, db->get().consistencyScan.get().address()); + } + + if (!db->get().consistencyScan.present() || !_csWorker.present()) { + messages.push_back(JsonString::makeMessage("unreachable_consistencyScan_worker", + "Unable to locate the consistencyScan worker.")); + } else { + csWorker = _csWorker.get(); + } + // Get latest events for various event types from ALL workers // WorkerEvents is a map of worker's NetworkAddress to its event string // The pair represents worker responses and a set of worker NetworkAddress strings which did not respond. @@ -3350,6 +3389,26 @@ ACTOR Future clusterGetStatus( messages.push_back(clientIssueMessage); } + // Fetch Consistency Scan Information + state Reference tr(new ReadYourWritesTransaction(cx)); + state Optional val; + loop { + try { + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + wait(store(val, ConsistencyScanInfo::getInfo(tr))); + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + if (val.present()) { + ConsistencyScanInfo consistencyScanInfo = + ObjectReader::fromStringRef(val.get(), IncludeVersion()); + TraceEvent("StatusConsistencyScanGotVal").log(); + statusObj["consistency_scan_info"] = consistencyScanInfo.toJSON(); + } + // Create the status_incomplete message if there were any reasons that the status is incomplete. 
if (!status_incomplete_reasons.empty()) { JsonBuilderObject incomplete_message = diff --git a/fdbserver/include/fdbserver/ClusterController.actor.h b/fdbserver/include/fdbserver/ClusterController.actor.h index f90d17f9c8..72f4e92d32 100644 --- a/fdbserver/include/fdbserver/ClusterController.actor.h +++ b/fdbserver/include/fdbserver/ClusterController.actor.h @@ -51,6 +51,7 @@ struct WorkerInfo : NonCopyable { Future haltDistributor; Future haltBlobManager; Future haltEncryptKeyProxy; + Future haltConsistencyScan; Standalone> issues; WorkerInfo() @@ -73,7 +74,7 @@ struct WorkerInfo : NonCopyable { : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), reboots(r.reboots), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)), haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), haltBlobManager(r.haltBlobManager), - haltEncryptKeyProxy(r.haltEncryptKeyProxy), issues(r.issues) {} + haltEncryptKeyProxy(r.haltEncryptKeyProxy), haltConsistencyScan(r.haltConsistencyScan), issues(r.issues) {} void operator=(WorkerInfo&& r) noexcept { watcher = std::move(r.watcher); reply = std::move(r.reply); @@ -188,6 +189,14 @@ public: serverInfo->set(newInfo); } + void setConsistencyScan(const ConsistencyScanInterface& interf) { + auto newInfo = serverInfo->get(); + newInfo.id = deterministicRandom()->randomUniqueID(); + newInfo.infoGeneration = ++dbInfoCount; + newInfo.consistencyScan = interf; + serverInfo->set(newInfo); + } + void clearInterf(ProcessClass::ClassType t) { auto newInfo = serverInfo->get(); newInfo.id = deterministicRandom()->randomUniqueID(); @@ -200,6 +209,8 @@ public: newInfo.blobManager = Optional(); } else if (t == ProcessClass::EncryptKeyProxyClass) { newInfo.encryptKeyProxy = Optional(); + } else if (t == ProcessClass::ConsistencyScanClass) { + newInfo.consistencyScan = Optional(); } serverInfo->set(newInfo); } @@ -294,7 +305,9 @@ public: (db.serverInfo->get().blobManager.present() && db.serverInfo->get().blobManager.get().locality.processId() == processId) || (db.serverInfo->get().encryptKeyProxy.present() && - db.serverInfo->get().encryptKeyProxy.get().locality.processId() == processId); + db.serverInfo->get().encryptKeyProxy.get().locality.processId() == processId) || + (db.serverInfo->get().consistencyScan.present() && + db.serverInfo->get().consistencyScan.get().locality.processId() == processId); } WorkerDetails getStorageWorker(RecruitStorageRequest const& req) { @@ -2880,7 +2893,8 @@ public: ASSERT(masterProcessId.present()); const auto& pid = worker.interf.locality.processId(); if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper && - role != ProcessClass::BlobManager && role != ProcessClass::EncryptKeyProxy) || + role != ProcessClass::BlobManager && role != ProcessClass::EncryptKeyProxy && + role != ProcessClass::ConsistencyScan) || pid == masterProcessId.get()) { return false; } @@ -3263,6 +3277,8 @@ public: Optional recruitingBlobManagerID; AsyncVar recruitEncryptKeyProxy; Optional recruitingEncryptKeyProxyID; + AsyncVar recruitConsistencyScan; + Optional recruitingConsistencyScanID; // Stores the health information from a particular worker's perspective. 
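Aside: setConsistencyScan above is the cluster controller's usual copy-modify-publish update of ServerDBInfo: copy the current value out of the AsyncVar, stamp a new id and generation, fill in the new interface, and set it back so every onChange() waiter (workers, and the status code above that checks db->get().consistencyScan.present()) observes it. A toy sketch of that pattern (ToyDBInfo and publishConsistencyScan are illustrative stand-ins, not the real types):

// Sketch only; assumes the usual flow headers (AsyncVar, UID, Optional, deterministicRandom) are in scope.
struct ToyDBInfo {
	UID id;
	int64_t infoGeneration = 0;
	Optional<ConsistencyScanInterface> consistencyScan;
};

void publishConsistencyScan(AsyncVar<ToyDBInfo>* serverInfo,
                            int64_t* dbInfoCount,
                            ConsistencyScanInterface const& interf) {
	ToyDBInfo newInfo = serverInfo->get();                // copy the current snapshot
	newInfo.id = deterministicRandom()->randomUniqueID(); // new snapshot identity
	newInfo.infoGeneration = ++*dbInfoCount;              // monotonically increasing generation
	newInfo.consistencyScan = interf;                     // advertise the newly recruited role
	serverInfo->set(newInfo);                             // wakes every onChange() waiter
}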
struct WorkerHealth { @@ -3300,7 +3316,7 @@ public: goodRecruitmentTime(Never()), goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false), remoteDCMonitorStarted(false), remoteTransactionSystemDegraded(false), recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), recruitEncryptKeyProxy(false), - clusterControllerMetrics("ClusterController", id.toString()), + recruitConsistencyScan(false), clusterControllerMetrics("ClusterController", id.toString()), openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics), registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics), getWorkersRequests("GetWorkersRequests", clusterControllerMetrics), diff --git a/fdbserver/include/fdbserver/ServerDBInfo.actor.h b/fdbserver/include/fdbserver/ServerDBInfo.actor.h index 1a7a9a4211..3a86fcc0d2 100644 --- a/fdbserver/include/fdbserver/ServerDBInfo.actor.h +++ b/fdbserver/include/fdbserver/ServerDBInfo.actor.h @@ -31,6 +31,7 @@ #include "fdbserver/LogSystemConfig.h" #include "fdbserver/RatekeeperInterface.h" #include "fdbserver/BlobManagerInterface.h" +#include "fdbclient/ConsistencyScanInterface.h" #include "fdbserver/RecoveryState.h" #include "fdbserver/LatencyBandConfig.h" #include "fdbserver/WorkerInterface.actor.h" @@ -50,6 +51,7 @@ struct ServerDBInfo { Optional ratekeeper; Optional blobManager; Optional encryptKeyProxy; + Optional consistencyScan; std::vector resolvers; DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful cluster recovery increments it twice; @@ -83,6 +85,7 @@ struct ServerDBInfo { ratekeeper, blobManager, encryptKeyProxy, + consistencyScan, resolvers, recoveryCount, recoveryState, diff --git a/fdbserver/include/fdbserver/WorkerInterface.actor.h b/fdbserver/include/fdbserver/WorkerInterface.actor.h index afbe301dec..e8f40990e6 100644 --- a/fdbserver/include/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/include/fdbserver/WorkerInterface.actor.h @@ -31,6 +31,7 @@ #include "fdbserver/MasterInterface.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/RatekeeperInterface.h" +#include "fdbclient/ConsistencyScanInterface.h" #include "fdbserver/BlobManagerInterface.h" #include "fdbserver/ResolverInterface.h" #include "fdbclient/BlobWorkerInterface.h" @@ -57,6 +58,7 @@ struct WorkerInterface { RequestStream ratekeeper; RequestStream blobManager; RequestStream blobWorker; + RequestStream consistencyScan; RequestStream resolver; RequestStream storage; RequestStream logRouter; @@ -112,6 +114,7 @@ struct WorkerInterface { ratekeeper, blobManager, blobWorker, + consistencyScan, resolver, storage, logRouter, @@ -428,6 +431,7 @@ struct RegisterWorkerRequest { Optional ratekeeperInterf; Optional blobManagerInterf; Optional encryptKeyProxyInterf; + Optional consistencyScanInterf; Standalone> issues; std::vector incompatiblePeers; ReplyPromise reply; @@ -449,6 +453,7 @@ struct RegisterWorkerRequest { Optional rkInterf, Optional bmInterf, Optional ekpInterf, + Optional csInterf, bool degraded, Optional lastSeenKnobVersion, Optional knobConfigClassSet, @@ -456,9 +461,9 @@ struct RegisterWorkerRequest { ConfigBroadcastInterface configBroadcastInterface) : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf), - encryptKeyProxyInterf(ekpInterf), degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), - 
knobConfigClassSet(knobConfigClassSet), requestDbInfo(false), recoveredDiskFiles(recoveredDiskFiles), - configBroadcastInterface(configBroadcastInterface) {} + encryptKeyProxyInterf(ekpInterf), consistencyScanInterf(csInterf), degraded(degraded), + lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet), requestDbInfo(false), + recoveredDiskFiles(recoveredDiskFiles), configBroadcastInterface(configBroadcastInterface) {} template void serialize(Ar& ar) { @@ -472,6 +477,7 @@ struct RegisterWorkerRequest { ratekeeperInterf, blobManagerInterf, encryptKeyProxyInterf, + consistencyScanInterf, issues, incompatiblePeers, reply, @@ -728,6 +734,19 @@ struct InitializeRatekeeperRequest { } }; +struct InitializeConsistencyScanRequest { + constexpr static FileIdentifier file_identifier = 3104275; + UID reqId; + ReplyPromise reply; + + InitializeConsistencyScanRequest() {} + explicit InitializeConsistencyScanRequest(UID uid) : reqId(uid) {} + template + void serialize(Ar& ar) { + serializer(ar, reqId, reply); + } +}; + struct InitializeBlobManagerRequest { constexpr static FileIdentifier file_identifier = 2567474; UID reqId; @@ -990,6 +1009,7 @@ struct Role { static const Role COORDINATOR; static const Role BACKUP; static const Role ENCRYPT_KEY_PROXY; + static const Role CONSISTENCYSCAN; std::string roleName; std::string abbreviation; @@ -1027,6 +1047,8 @@ struct Role { return BACKUP; case ProcessClass::EncryptKeyProxy: return ENCRYPT_KEY_PROXY; + case ProcessClass::ConsistencyScan: + return CONSISTENCYSCAN; case ProcessClass::Worker: return WORKER; case ProcessClass::NoRole: @@ -1148,6 +1170,7 @@ ACTOR Future logRouter(TLogInterface interf, Reference const> db); ACTOR Future dataDistributor(DataDistributorInterface ddi, Reference const> db); ACTOR Future ratekeeper(RatekeeperInterface rki, Reference const> db); +ACTOR Future consistencyScan(ConsistencyScanInterface csInterf, Reference const> dbInfo); ACTOR Future blobManager(BlobManagerInterface bmi, Reference const> db, int64_t epoch); ACTOR Future storageCacheServer(StorageServerInterface interf, uint16_t id, diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 003de0107a..455b9efd0a 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -4843,6 +4843,11 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe !data->isTss() && !data->isSSWithTSSPair()) ? 1 : CLIENT_KNOBS->REPLY_BYTE_LIMIT; + TraceEvent(SevDebug, "SSGetKeyValueStreamLimits") + .detail("ByteLimit", byteLimit) + .detail("ReqLimit", req.limit) + .detail("Begin", begin.printable()) + .detail("End", end.printable()); GetKeyValuesReply _r = wait(readRange(data, version, KeyRangeRef(begin, end), diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 76365aa79c..3c92ae2205 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -562,6 +562,7 @@ ACTOR Future registrationClient( Reference> const> rkInterf, Reference>> const> bmInterf, Reference> const> ekpInterf, + Reference> const> csInterf, Reference const> degraded, Reference connRecord, Reference> const> issues, @@ -602,6 +603,7 @@ ACTOR Future registrationClient( rkInterf->get(), bmInterf->get().present() ? bmInterf->get().get().second : Optional(), ekpInterf->get(), + csInterf->get(), degraded->get(), localConfig.isValid() ? localConfig->lastSeenVersion() : Optional(), localConfig.isValid() ? 
localConfig->configClassSet() : Optional(), @@ -670,6 +672,7 @@ ACTOR Future registrationClient( when(wait(ccInterface->onChange())) { break; } when(wait(ddInterf->onChange())) { break; } when(wait(rkInterf->onChange())) { break; } + when(wait(csInterf->onChange())) { break; } when(wait(bmInterf->onChange())) { break; } when(wait(ekpInterf->onChange())) { break; } when(wait(degraded->onChange())) { break; } @@ -696,6 +699,10 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference workerServer(Reference connRecord, state UID lastBMRecruitRequestId; state Reference>> ekpInterf( new AsyncVar>()); + state Reference>> csInterf( + new AsyncVar>()); state Future handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last state ActorCollection errorForwarders(false); state Future loggingTrigger = Void(); @@ -1942,6 +1951,7 @@ ACTOR Future workerServer(Reference connRecord, rkInterf, bmEpochAndInterf, ekpInterf, + csInterf, degraded, connRecord, issues, @@ -2136,6 +2146,31 @@ ACTOR Future workerServer(Reference connRecord, TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id()); req.reply.send(recruited); } + when(InitializeConsistencyScanRequest req = waitNext(interf.consistencyScan.getFuture())) { + LocalLineage _; + getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::ConsistencyScan; + ConsistencyScanInterface recruited(locality, req.reqId); + recruited.initEndpoints(); + + if (csInterf->get().present()) { + recruited = csInterf->get().get(); + CODE_PROBE(true, "Recovered while already a consistencyscan"); + } else { + startRole(Role::CONSISTENCYSCAN, recruited.id(), interf.id()); + DUMPTOKEN(recruited.waitFailure); + DUMPTOKEN(recruited.haltConsistencyScan); + + Future consistencyScanProcess = consistencyScan(recruited, dbInfo); + errorForwarders.add(forwardError( + errors, + Role::CONSISTENCYSCAN, + recruited.id(), + setWhenDoneOrError(consistencyScanProcess, csInterf, Optional()))); + csInterf->set(Optional(recruited)); + } + TraceEvent("ConsistencyScanReceived", req.reqId).detail("ConsistencyScanId", recruited.id()); + req.reply.send(recruited); + } when(InitializeBlobManagerRequest req = waitNext(interf.blobManager.getFuture())) { LocalLineage _; getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::BlobManager; @@ -3459,3 +3494,4 @@ const Role Role::STORAGE_CACHE("StorageCache", "SC"); const Role Role::COORDINATOR("Coordinator", "CD"); const Role Role::BACKUP("Backup", "BK"); const Role Role::ENCRYPT_KEY_PROXY("EncryptKeyProxy", "EP"); +const Role Role::CONSISTENCYSCAN("ConsistencyScan", "CS"); diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index 212d705bd5..eb435fa513 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -321,19 +321,41 @@ struct ConsistencyCheckWorkload : TestWorkload { // Get a list of key servers; verify that the TLogs and master all agree about who the key servers are state Promise>>> keyServerPromise; - bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise, keyServersKeys)); + bool keyServerResult = + wait(getKeyServers(cx, keyServerPromise, keyServersKeys, self->performQuiescentChecks)); if (keyServerResult) { state std::vector>> keyServers = keyServerPromise.getFuture().get(); // Get the locations of all the shards in the database state Promise>> keyLocationPromise; - bool keyLocationResult = 
wait(self->getKeyLocations(cx, keyServers, self, keyLocationPromise)); + bool keyLocationResult = + wait(getKeyLocations(cx, keyServers, keyLocationPromise, self->performQuiescentChecks)); if (keyLocationResult) { state Standalone> keyLocations = keyLocationPromise.getFuture().get(); // Check that each shard has the same data on all storage servers that it resides on - wait(::success(self->checkDataConsistency(cx, keyLocations, configuration, tssMapping, self))); + wait(::success( + checkDataConsistency(cx, + keyLocations, + configuration, + tssMapping, + self->performQuiescentChecks, + self->performTSSCheck, + self->firstClient, + self->failureIsError, + self->clientId, + self->clientCount, + self->distributed, + self->shuffleShards, + self->shardSampleFactor, + self->sharedRandomNumber, + self->repetitions, + &(self->bytesReadInPreviousRound), + true, + self->rateLimitMax, + CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, + KeyRef()))); // Cache consistency check if (self->performCacheCheck) @@ -343,11 +365,12 @@ struct ConsistencyCheckWorkload : TestWorkload { } catch (Error& e) { if (e.code() == error_code_transaction_too_old || e.code() == error_code_future_version || e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || - e.code() == error_code_process_behind) + e.code() == error_code_process_behind || e.code() == error_code_actor_cancelled) { TraceEvent("ConsistencyCheck_Retry") .error(e); // FIXME: consistency check does not retry in this case - else + } else { self->testFailure(format("Error %d - %s", e.code(), e.name())); + } } } @@ -526,7 +549,7 @@ struct ConsistencyCheckWorkload : TestWorkload { lastSampleKey = lastStartSampleKey; // Get the min version of the storage servers - Version version = wait(self->getVersion(cx, self)); + Version version = wait(getVersion(cx)); state GetKeyValuesRequest req; req.begin = begin; @@ -744,7 +767,7 @@ struct ConsistencyCheckWorkload : TestWorkload { bool removePrefix) { // get shards paired with corresponding storage servers state Promise>>> keyServerPromise; - bool keyServerResult = wait(self->getKeyServers(cx, self, keyServerPromise, range)); + bool keyServerResult = wait(getKeyServers(cx, keyServerPromise, range, self->performQuiescentChecks)); if (!keyServerResult) return false; state std::vector>> shards = @@ -762,7 +785,7 @@ struct ConsistencyCheckWorkload : TestWorkload { for (i = 0; i < shards.size(); i++) { while (beginKey < std::min(shards[i].first.end, endKey)) { try { - Version version = wait(self->getVersion(cx, self)); + Version version = wait(getVersion(cx)); GetKeyValuesRequest req; req.begin = firstGreaterOrEqual(beginKey); @@ -846,879 +869,12 @@ struct ConsistencyCheckWorkload : TestWorkload { return true; } - // Gets a version at which to read from the storage servers - ACTOR Future getVersion(Database cx, ConsistencyCheckWorkload* self) { - loop { - state Transaction tr(cx); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - try { - Version version = wait(tr.getReadVersion()); - return version; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - } - - // Get a list of storage servers(persisting keys within range "kr") from the master and compares them with the - // TLogs. If this is a quiescent check, then each commit proxy needs to respond, otherwise only one needs to - // respond. 
Returns false if there is a failure (in this case, keyServersPromise will never be set) - ACTOR Future getKeyServers( - Database cx, - ConsistencyCheckWorkload* self, - Promise>>> keyServersPromise, - KeyRangeRef kr) { - state std::vector>> keyServers; - - // Try getting key server locations from the master proxies - state std::vector>> keyServerLocationFutures; - state Key begin = kr.begin; - state Key end = kr.end; - state int limitKeyServers = BUGGIFY ? 1 : 100; - state Span span(SpanContext(deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64()), - "WL:ConsistencyCheck"_loc); - - while (begin < end) { - state Reference commitProxyInfo = - wait(cx->getCommitProxiesFuture(UseProvisionalProxies::False)); - keyServerLocationFutures.clear(); - for (int i = 0; i < commitProxyInfo->size(); i++) - keyServerLocationFutures.push_back( - commitProxyInfo->get(i, &CommitProxyInterface::getKeyServersLocations) - .getReplyUnlessFailedFor( - GetKeyServerLocationsRequest( - span.context, TenantInfo(), begin, end, limitKeyServers, false, latestVersion, Arena()), - 2, - 0)); - - state bool keyServersInsertedForThisIteration = false; - choose { - when(wait(waitForAll(keyServerLocationFutures))) { - // Read the key server location results - for (int i = 0; i < keyServerLocationFutures.size(); i++) { - ErrorOr shards = keyServerLocationFutures[i].get(); - - // If performing quiescent check, then all master proxies should be reachable. Otherwise, only - // one needs to be reachable - if (self->performQuiescentChecks && !shards.present()) { - TraceEvent("ConsistencyCheck_CommitProxyUnavailable") - .detail("CommitProxyID", commitProxyInfo->getId(i)); - self->testFailure("Commit proxy unavailable"); - return false; - } - - // Get the list of shards if one was returned. If not doing a quiescent check, we can break if - // it is. If we are doing a quiescent check, then we only need to do this for the first shard. - if (shards.present() && !keyServersInsertedForThisIteration) { - keyServers.insert( - keyServers.end(), shards.get().results.begin(), shards.get().results.end()); - keyServersInsertedForThisIteration = true; - begin = shards.get().results.back().first.end; - - if (!self->performQuiescentChecks) - break; - } - } // End of For - } - when(wait(cx->onProxiesChanged())) {} - } // End of choose - - if (!keyServersInsertedForThisIteration) // Retry the entire workflow - wait(delay(1.0)); - - } // End of while - - keyServersPromise.send(keyServers); - return true; - } - - // Retrieves the locations of all shards in the database - // Returns false if there is a failure (in this case, keyLocationPromise will never be set) - ACTOR Future getKeyLocations(Database cx, - std::vector>> shards, - ConsistencyCheckWorkload* self, - Promise>> keyLocationPromise) { - state Standalone> keyLocations; - state Key beginKey = allKeys.begin.withPrefix(keyServersPrefix); - state Key endKey = allKeys.end.withPrefix(keyServersPrefix); - state int i = 0; - state Transaction onErrorTr(cx); // This transaction exists only to access onError and its backoff behavior - - // If the responses are too big, we may use multiple requests to get the key locations. 
Each request begins - // where the last left off - for (; i < shards.size(); i++) { - while (beginKey < std::min(shards[i].first.end, endKey)) { - try { - Version version = wait(self->getVersion(cx, self)); - - GetKeyValuesRequest req; - req.begin = firstGreaterOrEqual(beginKey); - req.end = firstGreaterOrEqual(std::min(shards[i].first.end, endKey)); - req.limit = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT; - req.limitBytes = SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES; - req.version = version; - req.tags = TagSet(); - - // Try getting the shard locations from the key servers - state std::vector>> keyValueFutures; - for (const auto& kv : shards[i].second) { - resetReply(req); - keyValueFutures.push_back(kv.getKeyValues.getReplyUnlessFailedFor(req, 2, 0)); - } - - wait(waitForAll(keyValueFutures)); - - int firstValidStorageServer = -1; - - // Read the shard location results - for (int j = 0; j < keyValueFutures.size(); j++) { - ErrorOr reply = keyValueFutures[j].get(); - - if (!reply.present() || reply.get().error.present()) { - // If the storage server didn't reply in a quiescent database, then the check fails - if (self->performQuiescentChecks) { - TraceEvent("ConsistencyCheck_KeyServerUnavailable") - .detail("StorageServer", shards[i].second[j].id().toString().c_str()); - self->testFailure("Key server unavailable"); - return false; - } - - // If no storage servers replied, then throw all_alternatives_failed to force a retry - else if (firstValidStorageServer < 0 && j == keyValueFutures.size() - 1) - throw all_alternatives_failed(); - } - - // If this is the first storage server, store the locations to send back to the caller - else if (firstValidStorageServer < 0) { - firstValidStorageServer = j; - - // Otherwise, compare the data to the results from the first storage server. 
If they are - // different, then the check fails - } else if (reply.get().data != keyValueFutures[firstValidStorageServer].get().get().data || - reply.get().more != keyValueFutures[firstValidStorageServer].get().get().more) { - TraceEvent("ConsistencyCheck_InconsistentKeyServers") - .detail("StorageServer1", shards[i].second[firstValidStorageServer].id()) - .detail("StorageServer2", shards[i].second[j].id()); - self->testFailure("Key servers inconsistent", true); - return false; - } - } - - auto keyValueResponse = keyValueFutures[firstValidStorageServer].get().get(); - RangeResult currentLocations = krmDecodeRanges( - keyServersPrefix, - KeyRangeRef(beginKey.removePrefix(keyServersPrefix), - std::min(shards[i].first.end, endKey).removePrefix(keyServersPrefix)), - RangeResultRef(keyValueResponse.data, keyValueResponse.more)); - - if (keyValueResponse.data.size() && beginKey == keyValueResponse.data[0].key) { - keyLocations.push_back_deep(keyLocations.arena(), currentLocations[0]); - } - - if (currentLocations.size() > 2) { - keyLocations.append_deep( - keyLocations.arena(), ¤tLocations[1], currentLocations.size() - 2); - } - - // Next iteration should pick up where we left off - ASSERT(currentLocations.size() > 1); - if (!keyValueResponse.more) { - beginKey = shards[i].first.end; - } else { - beginKey = keyValueResponse.data.end()[-1].key; - } - - // If this is the last iteration, then push the allKeys.end KV pair - if (beginKey >= endKey) - keyLocations.push_back_deep(keyLocations.arena(), currentLocations.end()[-1]); - } catch (Error& e) { - state Error err = e; - wait(onErrorTr.onError(err)); - TraceEvent("ConsistencyCheck_RetryGetKeyLocations").error(err); - } - } - } - - keyLocationPromise.send(keyLocations); - return true; - } - - // Retrieves a vector of the storage servers' estimates for the size of a particular shard - // If a storage server can't be reached, its estimate will be -1 - // If there is an error, then the returned vector will have 0 size - ACTOR Future> getStorageSizeEstimate(std::vector storageServers, - KeyRangeRef shard) { - state std::vector estimatedBytes; - - state WaitMetricsRequest req; - req.keys = shard; - req.max.bytes = -1; - req.min.bytes = 0; - - state std::vector>> metricFutures; - - try { - // Check the size of the shard on each storage server - for (int i = 0; i < storageServers.size(); i++) { - resetReply(req); - metricFutures.push_back(storageServers[i].waitMetrics.getReplyUnlessFailedFor(req, 2, 0)); - } - - // Wait for the storage servers to respond - wait(waitForAll(metricFutures)); - - int firstValidStorageServer = -1; - - // Retrieve the size from the storage server responses - for (int i = 0; i < storageServers.size(); i++) { - ErrorOr reply = metricFutures[i].get(); - - // If the storage server doesn't reply, then return -1 - if (!reply.present()) { - TraceEvent("ConsistencyCheck_FailedToFetchMetrics") - .error(reply.getError()) - .detail("Begin", printable(shard.begin)) - .detail("End", printable(shard.end)) - .detail("StorageServer", storageServers[i].id()) - .detail("IsTSS", storageServers[i].isTss() ? 
"True" : "False"); - estimatedBytes.push_back(-1); - } - - // Add the result to the list of estimates - else if (reply.present()) { - int64_t numBytes = reply.get().bytes; - estimatedBytes.push_back(numBytes); - if (firstValidStorageServer < 0) - firstValidStorageServer = i; - else if (estimatedBytes[firstValidStorageServer] != numBytes) { - TraceEvent("ConsistencyCheck_InconsistentStorageMetrics") - .detail("ByteEstimate1", estimatedBytes[firstValidStorageServer]) - .detail("ByteEstimate2", numBytes) - .detail("Begin", shard.begin) - .detail("End", shard.end) - .detail("StorageServer1", storageServers[firstValidStorageServer].id()) - .detail("StorageServer2", storageServers[i].id()) - .detail("IsTSS", - storageServers[i].isTss() || storageServers[firstValidStorageServer].isTss() - ? "True" - : "False"); - } - } - } - } catch (Error& e) { - TraceEvent("ConsistencyCheck_ErrorFetchingMetrics") - .error(e) - .detail("Begin", printable(shard.begin)) - .detail("End", printable(shard.end)); - estimatedBytes.clear(); - } - - return estimatedBytes; - } - // Comparison function used to compare map elements by value template static bool compareByValue(std::pair a, std::pair b) { return a.second < b.second; } - ACTOR Future getDatabaseSize(Database cx) { - state Transaction tr(cx); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - loop { - try { - StorageMetrics metrics = - wait(tr.getDatabase()->getStorageMetrics(KeyRangeRef(allKeys.begin, keyServersPrefix), 100000)); - return metrics.bytes; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - } - - // Checks that the data in each shard is the same on each storage server that it resides on. Also performs some - // sanity checks on the sizes of shards and storage servers. Returns false if there is a failure - ACTOR Future checkDataConsistency(Database cx, - VectorRef keyLocations, - DatabaseConfiguration configuration, - std::map tssMapping, - ConsistencyCheckWorkload* self) { - // Stores the total number of bytes on each storage server - // In a distributed test, this will be an estimated size - state std::map storageServerSizes; - - // Iterate through each shard, checking its values on all of its storage servers - // If shardSampleFactor > 1, then not all shards are processed - // Also, in a distributed data consistency check, each client processes a subset of the shards - // Note: this may cause some shards to be processed more than once or not at all in a non-quiescent database - state int effectiveClientCount = (self->distributed) ? self->clientCount : 1; - state int i = self->clientId * (self->shardSampleFactor + 1); - state int increment = - (self->distributed && !self->firstClient) ? effectiveClientCount * self->shardSampleFactor : 1; - state int rateLimitForThisRound = - self->bytesReadInPreviousRound == 0 - ? self->rateLimitMax - : std::min( - self->rateLimitMax, - static_cast(ceil(self->bytesReadInPreviousRound / - (float)CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME))); - ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= self->rateLimitMax); - TraceEvent("ConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound); - state Reference rateLimiter = Reference(new SpeedLimit(rateLimitForThisRound, 1)); - state double rateLimiterStartTime = now(); - state int64_t bytesReadInthisRound = 0; - - state double dbSize = 100e12; - if (g_network->isSimulated()) { - // This call will get all shard ranges in the database, which is too expensive on real clusters. 
- int64_t _dbSize = wait(self->getDatabaseSize(cx)); - dbSize = _dbSize; - } - - state std::vector ranges; - - for (int k = 0; k < keyLocations.size() - 1; k++) { - KeyRangeRef range(keyLocations[k].key, keyLocations[k + 1].key); - ranges.push_back(range); - } - - state std::vector shardOrder; - shardOrder.reserve(ranges.size()); - for (int k = 0; k < ranges.size(); k++) - shardOrder.push_back(k); - if (self->shuffleShards) { - uint32_t seed = self->sharedRandomNumber + self->repetitions; - DeterministicRandom sharedRandom(seed == 0 ? 1 : seed); - sharedRandom.randomShuffle(shardOrder); - } - - for (; i < ranges.size(); i += increment) { - state int shard = shardOrder[i]; - - state KeyRangeRef range = ranges[shard]; - state std::vector sourceStorageServers; - state std::vector destStorageServers; - state Transaction tr(cx); - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - state int bytesReadInRange = 0; - - RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); - ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); - decodeKeyServersValue( - UIDtoTagMap, keyLocations[shard].value, sourceStorageServers, destStorageServers, false); - - // If the destStorageServers is non-empty, then this shard is being relocated - state bool isRelocating = destStorageServers.size() > 0; - - // This check was disabled because we now disable data distribution during the consistency check, - // which can leave shards with dest storage servers. - - // Disallow relocations in a quiescent database - /*if(self->firstClient && self->performQuiescentChecks && isRelocating) - { - TraceEvent("ConsistencyCheck_QuiescentShardRelocation").detail("ShardBegin", printable(range.start)).detail("ShardEnd", printable(range.end)); - self->testFailure("Shard is being relocated in quiescent database"); - return false; - }*/ - - // In a quiescent database, check that the team size is the same as the desired team size - if (self->firstClient && self->performQuiescentChecks && - sourceStorageServers.size() != configuration.usableRegions * configuration.storageTeamSize) { - TraceEvent("ConsistencyCheck_InvalidTeamSize") - .detail("ShardBegin", printable(range.begin)) - .detail("ShardEnd", printable(range.end)) - .detail("SourceTeamSize", sourceStorageServers.size()) - .detail("DestServerSize", destStorageServers.size()) - .detail("ConfigStorageTeamSize", configuration.storageTeamSize) - .detail("UsableRegions", configuration.usableRegions); - // Record the server reponsible for the problematic shards - int i = 0; - for (auto& id : sourceStorageServers) { - TraceEvent("IncorrectSizeTeamInfo").detail("ServerUID", id).detail("TeamIndex", i++); - } - self->testFailure("Invalid team size"); - return false; - } - - state std::vector storageServers = (isRelocating) ? 
destStorageServers : sourceStorageServers; - state std::vector storageServerInterfaces; - - //TraceEvent("ConsistencyCheck_GetStorageInfo").detail("StorageServers", storageServers.size()); - loop { - try { - std::vector>> serverListEntries; - serverListEntries.reserve(storageServers.size()); - for (int s = 0; s < storageServers.size(); s++) - serverListEntries.push_back(tr.get(serverListKeyFor(storageServers[s]))); - state std::vector> serverListValues = wait(getAll(serverListEntries)); - for (int s = 0; s < serverListValues.size(); s++) { - if (serverListValues[s].present()) - storageServerInterfaces.push_back(decodeServerListValue(serverListValues[s].get())); - else if (self->performQuiescentChecks) - self->testFailure("/FF/serverList changing in a quiescent database"); - } - - break; - } catch (Error& e) { - wait(tr.onError(e)); - } - } - - // add TSS to end of list, if configured and if not relocating - if (!isRelocating && self->performTSSCheck) { - int initialSize = storageServers.size(); - for (int i = 0; i < initialSize; i++) { - auto tssPair = tssMapping.find(storageServers[i]); - if (tssPair != tssMapping.end()) { - CODE_PROBE(true, "TSS checked in consistency check"); - storageServers.push_back(tssPair->second.id()); - storageServerInterfaces.push_back(tssPair->second); - } - } - } - - state std::vector estimatedBytes = - wait(self->getStorageSizeEstimate(storageServerInterfaces, range)); - - // Gets permitted size range of shard - int64_t maxShardSize = getMaxShardSize(dbSize); - state ShardSizeBounds shardBounds = getShardSizeBounds(range, maxShardSize); - - if (self->firstClient) { - // If there was an error retrieving shard estimated size - if (self->performQuiescentChecks && estimatedBytes.size() == 0) - self->testFailure("Error fetching storage metrics"); - - // If running a distributed test, storage server size is an accumulation of shard estimates - else if (self->distributed && self->firstClient) - for (int j = 0; j < storageServers.size(); j++) - storageServerSizes[storageServers[j]] += std::max(estimatedBytes[j], (int64_t)0); - } - - // The first client may need to skip the rest of the loop contents if it is just processing this shard to - // get a size estimate - if (!self->firstClient || shard % (effectiveClientCount * self->shardSampleFactor) == 0) { - state int shardKeys = 0; - state int shardBytes = 0; - state int sampledBytes = 0; - state int splitBytes = 0; - state int firstKeySampledBytes = 0; - state int sampledKeys = 0; - state int sampledKeysWithProb = 0; - state double shardVariance = 0; - state bool canSplit = false; - state Key lastSampleKey; - state Key lastStartSampleKey; - state int64_t totalReadAmount = 0; - - state KeySelector begin = firstGreaterOrEqual(range.begin); - state Transaction onErrorTr( - cx); // This transaction exists only to access onError and its backoff behavior - - // Read a limited number of entries at a time, repeating until all keys in the shard have been read - loop { - try { - lastSampleKey = lastStartSampleKey; - - // Get the min version of the storage servers - Version version = wait(self->getVersion(cx, self)); - - state GetKeyValuesRequest req; - req.begin = begin; - req.end = firstGreaterOrEqual(range.end); - req.limit = 1e4; - req.limitBytes = CLIENT_KNOBS->REPLY_BYTE_LIMIT; - req.version = version; - req.tags = TagSet(); - - // Try getting the entries in the specified range - state std::vector>> keyValueFutures; - state int j = 0; - for (j = 0; j < storageServerInterfaces.size(); j++) { - resetReply(req); - 
keyValueFutures.push_back(
-                            storageServerInterfaces[j].getKeyValues.getReplyUnlessFailedFor(req, 2, 0));
-                    }
-
-                    wait(waitForAll(keyValueFutures));
-
-                    // Read the resulting entries
-                    state int firstValidServer = -1;
-                    totalReadAmount = 0;
-                    for (j = 0; j < keyValueFutures.size(); j++) {
-                        ErrorOr<GetKeyValuesReply> rangeResult = keyValueFutures[j].get();
-
-                        // Compare the results with other storage servers
-                        if (rangeResult.present() && !rangeResult.get().error.present()) {
-                            state GetKeyValuesReply current = rangeResult.get();
-                            totalReadAmount += current.data.expectedSize();
-                            // If we haven't encountered a valid storage server yet, then mark this as the baseline
-                            // to compare against
-                            if (firstValidServer == -1)
-                                firstValidServer = j;
-
-                            // Compare this shard against the first
-                            else {
-                                GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
-
-                                if (current.data != reference.data || current.more != reference.more) {
-                                    // Be especially verbose if in simulation
-                                    if (g_network->isSimulated()) {
-                                        int invalidIndex = -1;
-                                        printf("\n%sSERVER %d (%s); shard = %s - %s:\n",
-                                               storageServerInterfaces[j].isTss() ? "TSS " : "",
-                                               j,
-                                               storageServerInterfaces[j].address().toString().c_str(),
-                                               printable(req.begin.getKey()).c_str(),
-                                               printable(req.end.getKey()).c_str());
-                                        for (int k = 0; k < current.data.size(); k++) {
-                                            printf("%d. %s => %s\n",
-                                                   k,
-                                                   printable(current.data[k].key).c_str(),
-                                                   printable(current.data[k].value).c_str());
-                                            if (invalidIndex < 0 &&
-                                                (k >= reference.data.size() ||
-                                                 current.data[k].key != reference.data[k].key ||
-                                                 current.data[k].value != reference.data[k].value))
-                                                invalidIndex = k;
-                                        }
-
-                                        printf(
-                                            "\n%sSERVER %d (%s); shard = %s - %s:\n",
-                                            storageServerInterfaces[firstValidServer].isTss() ? "TSS " : "",
-                                            firstValidServer,
-                                            storageServerInterfaces[firstValidServer].address().toString().c_str(),
-                                            printable(req.begin.getKey()).c_str(),
-                                            printable(req.end.getKey()).c_str());
-                                        for (int k = 0; k < reference.data.size(); k++) {
-                                            printf("%d. %s => %s\n",
-                                                   k,
-                                                   printable(reference.data[k].key).c_str(),
-                                                   printable(reference.data[k].value).c_str());
-                                            if (invalidIndex < 0 &&
-                                                (k >= current.data.size() ||
-                                                 reference.data[k].key != current.data[k].key ||
-                                                 reference.data[k].value != current.data[k].value))
-                                                invalidIndex = k;
-                                        }
-
-                                        printf("\nMISMATCH AT %d\n\n", invalidIndex);
-                                    }
-
-                                    // Data for trace event
-                                    // The number of keys unique to the current shard
-                                    int currentUniques = 0;
-                                    // The number of keys unique to the reference shard
-                                    int referenceUniques = 0;
-                                    // The number of keys in both shards with conflicting values
-                                    int valueMismatches = 0;
-                                    // The number of keys in both shards with matching values
-                                    int matchingKVPairs = 0;
-                                    // Last unique key on the current shard
-                                    KeyRef currentUniqueKey;
-                                    // Last unique key on the reference shard
-                                    KeyRef referenceUniqueKey;
-                                    // Last value mismatch
-                                    KeyRef valueMismatchKey;
-
-                                    // Loop indices
-                                    int currentI = 0;
-                                    int referenceI = 0;
-                                    while (currentI < current.data.size() || referenceI < reference.data.size()) {
-                                        if (currentI >= current.data.size()) {
-                                            referenceUniqueKey = reference.data[referenceI].key;
-                                            referenceUniques++;
-                                            referenceI++;
-                                        } else if (referenceI >= reference.data.size()) {
-                                            currentUniqueKey = current.data[currentI].key;
-                                            currentUniques++;
-                                            currentI++;
-                                        } else {
-                                            KeyValueRef currentKV = current.data[currentI];
-                                            KeyValueRef referenceKV = reference.data[referenceI];
-
-                                            if (currentKV.key == referenceKV.key) {
-                                                if (currentKV.value == referenceKV.value)
-                                                    matchingKVPairs++;
-                                                else {
-                                                    valueMismatchKey = currentKV.key;
-                                                    valueMismatches++;
-                                                }
-
-                                                currentI++;
-                                                referenceI++;
-                                            } else if (currentKV.key < referenceKV.key) {
-                                                currentUniqueKey = currentKV.key;
-                                                currentUniques++;
-                                                currentI++;
-                                            } else {
-                                                referenceUniqueKey = referenceKV.key;
-                                                referenceUniques++;
-                                                referenceI++;
-                                            }
-                                        }
-                                    }
-
-                                    TraceEvent("ConsistencyCheck_DataInconsistent")
-                                        .detail(format("StorageServer%d", j).c_str(), storageServers[j].toString())
-                                        .detail(format("StorageServer%d", firstValidServer).c_str(),
-                                                storageServers[firstValidServer].toString())
-                                        .detail("ShardBegin", req.begin.getKey())
-                                        .detail("ShardEnd", req.end.getKey())
-                                        .detail("VersionNumber", req.version)
-                                        .detail(format("Server%dUniques", j).c_str(), currentUniques)
-                                        .detail(format("Server%dUniqueKey", j).c_str(), currentUniqueKey)
-                                        .detail(format("Server%dUniques", firstValidServer).c_str(),
-                                                referenceUniques)
-                                        .detail(format("Server%dUniqueKey", firstValidServer).c_str(),
-                                                referenceUniqueKey)
-                                        .detail("ValueMismatches", valueMismatches)
-                                        .detail("ValueMismatchKey", valueMismatchKey)
-                                        .detail("MatchingKVPairs", matchingKVPairs)
-                                        .detail("IsTSS",
-                                                storageServerInterfaces[j].isTss() ||
-                                                        storageServerInterfaces[firstValidServer].isTss()
-                                                    ? "True"
-                                                    : "False");
-
-                                    if ((g_network->isSimulated() &&
-                                         g_simulator->tssMode != ISimulator::TSSMode::EnabledDropMutations) ||
-                                        (!storageServerInterfaces[j].isTss() &&
-                                         !storageServerInterfaces[firstValidServer].isTss())) {
-                                        self->testFailure("Data inconsistent", true);
-                                        return false;
-                                    }
-                                }
-                            }
-                        }
-
-                        // If the data is not available and we aren't relocating this shard
-                        else if (!isRelocating) {
-                            Error e =
-                                rangeResult.isError() ? rangeResult.getError() : rangeResult.get().error.get();
-
-                            TraceEvent("ConsistencyCheck_StorageServerUnavailable")
-                                .errorUnsuppressed(e)
-                                .suppressFor(1.0)
-                                .detail("StorageServer", storageServers[j])
-                                .detail("ShardBegin", printable(range.begin))
-                                .detail("ShardEnd", printable(range.end))
-                                .detail("Address", storageServerInterfaces[j].address())
-                                .detail("UID", storageServerInterfaces[j].id())
-                                .detail("GetKeyValuesToken",
-                                        storageServerInterfaces[j].getKeyValues.getEndpoint().token)
-                                .detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False");
-
-                            // All shards should be available in quiescence
-                            if (self->performQuiescentChecks && !storageServerInterfaces[j].isTss()) {
-                                self->testFailure("Storage server unavailable");
-                                return false;
-                            }
-                        }
-                    }
-
-                    if (firstValidServer >= 0) {
-                        VectorRef<KeyValueRef> data = keyValueFutures[firstValidServer].get().get().data;
-                        // Calculate the size of the shard, the variance of the shard size estimate, and the correct
-                        // shard size estimate
-                        for (int k = 0; k < data.size(); k++) {
-                            ByteSampleInfo sampleInfo = isKeyValueInSample(data[k]);
-                            shardBytes += sampleInfo.size;
-                            double itemProbability = ((double)sampleInfo.size) / sampleInfo.sampledSize;
-                            if (itemProbability < 1)
-                                shardVariance += itemProbability * (1 - itemProbability) *
-                                                 pow((double)sampleInfo.sampledSize, 2);
-
-                            if (sampleInfo.inSample) {
-                                sampledBytes += sampleInfo.sampledSize;
-                                if (!canSplit && sampledBytes >= shardBounds.min.bytes &&
-                                    data[k].key.size() <= CLIENT_KNOBS->SPLIT_KEY_SIZE_LIMIT &&
-                                    sampledBytes <= shardBounds.max.bytes *
-                                                        CLIENT_KNOBS->STORAGE_METRICS_UNFAIR_SPLIT_LIMIT / 2) {
-                                    canSplit = true;
-                                    splitBytes = sampledBytes;
-                                }
-
-                                /*TraceEvent("ConsistencyCheck_ByteSample").detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end))
-                                    .detail("SampledBytes", sampleInfo.sampledSize).detail("Key",
-                                printable(data[k].key)).detail("KeySize", data[k].key.size()).detail("ValueSize",
-                                data[k].value.size());*/
-
-                                // In data distribution, the splitting process ignores the first key in a shard.
-                                // Thus, we shouldn't consider it when validating the upper bound of estimated shard
-                                // sizes
-                                if (k == 0)
-                                    firstKeySampledBytes += sampleInfo.sampledSize;
-
-                                sampledKeys++;
-                                if (itemProbability < 1) {
-                                    sampledKeysWithProb++;
-                                }
-                            }
-                        }
-
-                        // Accumulate number of keys in this shard
-                        shardKeys += data.size();
-                    }
-                    // after requesting each shard, enforce rate limit based on how much data will likely be read
-                    if (rateLimitForThisRound > 0) {
-                        wait(rateLimiter->getAllowance(totalReadAmount));
-                        // Set ratelimit to max allowed if current round has been going on for a while
-                        if (now() - rateLimiterStartTime >
-                                1.1 * CLIENT_KNOBS->CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME &&
-                            rateLimitForThisRound != self->rateLimitMax) {
-                            rateLimitForThisRound = self->rateLimitMax;
-                            rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
-                            rateLimiterStartTime = now();
-                            TraceEvent(SevInfo, "ConsistencyCheck_RateLimitSetMaxForThisRound")
-                                .detail("RateLimit", rateLimitForThisRound);
-                        }
-                    }
-                    bytesReadInRange += totalReadAmount;
-                    bytesReadInthisRound += totalReadAmount;
-
-                    // Advance to the next set of entries
-                    if (firstValidServer >= 0 && keyValueFutures[firstValidServer].get().get().more) {
-                        VectorRef<KeyValueRef> result = keyValueFutures[firstValidServer].get().get().data;
-                        ASSERT(result.size() > 0);
-                        begin = firstGreaterThan(result[result.size() - 1].key);
-                        ASSERT(begin.getKey() != allKeys.end);
-                        lastStartSampleKey = lastSampleKey;
-                    } else
-                        break;
-                } catch (Error& e) {
-                    state Error err = e;
-                    wait(onErrorTr.onError(err));
-                    TraceEvent("ConsistencyCheck_RetryDataConsistency").error(err);
-                }
-            }
-
-            canSplit = canSplit && sampledBytes - splitBytes >= shardBounds.min.bytes && sampledBytes > splitBytes;
-
-            // Update the size of all storage servers containing this shard
-            // This is only done in a non-distributed consistency check; the distributed check uses shard size
-            // estimates
-            if (!self->distributed)
-                for (int j = 0; j < storageServers.size(); j++)
-                    storageServerSizes[storageServers[j]] += shardBytes;
-
-            // FIXME: Where is this intended to be used?
-            [[maybe_unused]] bool hasValidEstimate = estimatedBytes.size() > 0;
-
-            // If the storage servers' sampled estimate of shard size is different from ours
-            if (self->performQuiescentChecks) {
-                for (int j = 0; j < estimatedBytes.size(); j++) {
-                    if (estimatedBytes[j] >= 0 && estimatedBytes[j] != sampledBytes) {
-                        TraceEvent("ConsistencyCheck_IncorrectEstimate")
-                            .detail("EstimatedBytes", estimatedBytes[j])
-                            .detail("CorrectSampledBytes", sampledBytes)
-                            .detail("StorageServer", storageServers[j])
-                            .detail("IsTSS", storageServerInterfaces[j].isTss() ? "True" : "False");
-
-                        if (!storageServerInterfaces[j].isTss()) {
-                            self->testFailure("Storage servers had incorrect sampled estimate");
-                        }
-
-                        hasValidEstimate = false;
-
-                        break;
-                    } else if (estimatedBytes[j] < 0 &&
-                               ((g_network->isSimulated() &&
-                                 g_simulator->tssMode <= ISimulator::TSSMode::EnabledNormal) ||
-                                !storageServerInterfaces[j].isTss())) {
-                        // Ignore a non-responding TSS outside of simulation, or if tss fault injection is enabled
-                        hasValidEstimate = false;
-                        break;
-                    }
-                }
-            }
-
-            // Compute the difference between the shard size estimate and its actual size. If it is sufficiently
-            // large, then fail
-            double stdDev = sqrt(shardVariance);
-
-            double failErrorNumStdDev = 7;
-            int estimateError = abs(shardBytes - sampledBytes);
-
-            // Only perform the check if there are sufficient keys to get a distribution that should resemble a
-            // normal distribution
-            if (sampledKeysWithProb > 30 && estimateError > failErrorNumStdDev * stdDev) {
-                double numStdDev = estimateError / sqrt(shardVariance);
-                TraceEvent("ConsistencyCheck_InaccurateShardEstimate")
-                    .detail("Min", shardBounds.min.bytes)
-                    .detail("Max", shardBounds.max.bytes)
-                    .detail("Estimate", sampledBytes)
-                    .detail("Actual", shardBytes)
-                    .detail("NumStdDev", numStdDev)
-                    .detail("Variance", shardVariance)
-                    .detail("StdDev", stdDev)
-                    .detail("ShardBegin", printable(range.begin))
-                    .detail("ShardEnd", printable(range.end))
-                    .detail("NumKeys", shardKeys)
-                    .detail("NumSampledKeys", sampledKeys)
-                    .detail("NumSampledKeysWithProb", sampledKeysWithProb);
-
-                self->testFailure(format("Shard size is more than %f std dev from estimate", failErrorNumStdDev));
-            }
-
-            // In a quiescent database, check that the (estimated) size of the shard is within permitted bounds.
-            // Min and max shard sizes have a 3 * shardBounds.permittedError.bytes cushion for error since shard
-            // sizes are not precise. Shard splits ignore the first key in a shard, so its size shouldn't be
-            // considered when checking the upper bound. 0xff shards are not checked.
-            if (canSplit && sampledKeys > 5 && self->performQuiescentChecks &&
-                !range.begin.startsWith(keyServersPrefix) &&
-                (sampledBytes < shardBounds.min.bytes - 3 * shardBounds.permittedError.bytes ||
-                 sampledBytes - firstKeySampledBytes >
-                     shardBounds.max.bytes + 3 * shardBounds.permittedError.bytes)) {
-                TraceEvent("ConsistencyCheck_InvalidShardSize")
-                    .detail("Min", shardBounds.min.bytes)
-                    .detail("Max", shardBounds.max.bytes)
-                    .detail("Size", shardBytes)
-                    .detail("EstimatedSize", sampledBytes)
-                    .detail("ShardBegin", printable(range.begin))
-                    .detail("ShardEnd", printable(range.end))
-                    .detail("ShardCount", ranges.size())
-                    .detail("SampledKeys", sampledKeys);
-                self->testFailure(format("Shard size in quiescent database is too %s",
-                                         (sampledBytes < shardBounds.min.bytes) ? "small" : "large"));
-                return false;
-            }
-        }
-
-        if (bytesReadInRange > 0) {
-            TraceEvent("ConsistencyCheck_ReadRange")
-                .suppressFor(1.0)
-                .detail("Range", range)
-                .detail("BytesRead", bytesReadInRange);
-        }
-    }
-
-    // SOMEDAY: when background data distribution is implemented, include this test
-    // In a quiescent database, check that the sizes of storage servers are roughly the same
-    /*if(self->performQuiescentChecks)
-    {
-        auto minStorageServer = std::min_element(storageServerSizes.begin(), storageServerSizes.end(),
-    ConsistencyCheckWorkload::compareByValue); auto maxStorageServer =
-    std::max_element(storageServerSizes.begin(), storageServerSizes.end(),
-    ConsistencyCheckWorkload::compareByValue);
-
-        int bias = SERVER_KNOBS->MIN_SHARD_BYTES;
-        if(1.1 * (minStorageServer->second + SERVER_KNOBS->MIN_SHARD_BYTES) < maxStorageServer->second +
-    SERVER_KNOBS->MIN_SHARD_BYTES)
-        {
-            TraceEvent("ConsistencyCheck_InconsistentStorageServerSizes").detail("MinSize", minStorageServer->second).detail("MaxSize", maxStorageServer->second)
-                .detail("MinStorageServer", minStorageServer->first).detail("MaxStorageServer",
-    maxStorageServer->first);
-
-            self->testFailure(format("Storage servers differ significantly in size by a factor of %f",
-                ((double)maxStorageServer->second) / minStorageServer->second)); return false;
-        }
-    }*/
-
-    self->bytesReadInPreviousRound = bytesReadInthisRound;
-    return true;
-}
-
 // Returns true if any storage servers have the exact same network address or are not using the correct key value
 // store type
 ACTOR Future<bool> checkForUndesirableServers(Database cx,