add consistency-check-urgent-mode to tester process class (#11484)

This commit is contained in:
Zhe Wang 2024-07-09 13:21:37 -07:00 committed by GitHub
parent c1ba8acaab
commit e9505506df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 38 additions and 12 deletions

View File

@ -849,7 +849,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
whitelistBinPaths,
"",
{},
configDBType));
configDBType,
false));
}
if (processRunBackupAgent(processMode)) {
futures.push_back(runBackup(connRecord));

View File

@ -118,7 +118,7 @@ enum {
OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE,
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB, OPT_NO_CONFIG_DB, OPT_FAULT_INJECTION, OPT_PROFILER, OPT_PRINT_SIMTIME,
OPT_FLOW_PROCESS_NAME, OPT_FLOW_PROCESS_ENDPOINT, OPT_IP_TRUSTED_MASK, OPT_KMS_CONN_DISCOVERY_URL_FILE, OPT_KMS_CONNECTOR_TYPE, OPT_KMS_REST_ALLOW_NOT_SECURE_CONECTION, OPT_KMS_CONN_VALIDATION_TOKEN_DETAILS,
OPT_KMS_CONN_GET_ENCRYPTION_KEYS_ENDPOINT, OPT_KMS_CONN_GET_LATEST_ENCRYPTION_KEYS_ENDPOINT, OPT_KMS_CONN_GET_BLOB_METADATA_ENDPOINT, OPT_NEW_CLUSTER_KEY, OPT_AUTHZ_PUBLIC_KEY_FILE, OPT_USE_FUTURE_PROTOCOL_VERSION
OPT_KMS_CONN_GET_ENCRYPTION_KEYS_ENDPOINT, OPT_KMS_CONN_GET_LATEST_ENCRYPTION_KEYS_ENDPOINT, OPT_KMS_CONN_GET_BLOB_METADATA_ENDPOINT, OPT_NEW_CLUSTER_KEY, OPT_AUTHZ_PUBLIC_KEY_FILE, OPT_USE_FUTURE_PROTOCOL_VERSION, OPT_CONSISTENCY_CHECK_URGENT_MODE
};
CSimpleOpt::SOption g_rgOptions[] = {
@ -223,6 +223,7 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_KMS_CONN_GET_LATEST_ENCRYPTION_KEYS_ENDPOINT, "--kms-conn-get-latest-encryption-keys-endpoint", SO_REQ_SEP },
{ OPT_KMS_CONN_GET_BLOB_METADATA_ENDPOINT, "--kms-conn-get-blob-metadata-endpoint", SO_REQ_SEP },
{ OPT_USE_FUTURE_PROTOCOL_VERSION, "--use-future-protocol-version", SO_REQ_SEP },
{ OPT_CONSISTENCY_CHECK_URGENT_MODE, "--consistency-check-urgent-mode", SO_NONE },
TLS_OPTION_FLAGS,
SO_END_OF_OPTIONS
};
@ -1068,6 +1069,7 @@ struct CLIOptions {
LocalityData localities;
int minTesterCount = 1;
bool testOnServers = false;
bool consistencyCheckUrgentMode = false;
TLSConfig tlsConfig = TLSConfig(TLSEndpointType::SERVER);
double fileIoTimeout = 0.0;
@ -1586,6 +1588,9 @@ private:
case OPT_TEST_ON_SERVERS:
testOnServers = true;
break;
case OPT_CONSISTENCY_CHECK_URGENT_MODE:
consistencyCheckUrgentMode = true;
break;
case OPT_METRICSCONNFILE:
metricsConnFile = args.OptionArg();
break;
@ -2341,7 +2346,8 @@ int main(int argc, char* argv[]) {
opts.whitelistBinPaths,
opts.configPath,
opts.manualKnobOverrides,
opts.configDBType));
opts.configDBType,
opts.consistencyCheckUrgentMode));
actors.push_back(histogramReport());
// actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement

View File

@ -124,7 +124,8 @@ struct TesterInterface {
ACTOR Future<Void> testerServerCore(TesterInterface interf,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> serverDBInfo,
LocalityData locality);
LocalityData locality,
Optional<std::string> expectedWorkLoad = Optional<std::string>());
enum test_location_t { TEST_HERE, TEST_ON_SERVERS, TEST_ON_TESTERS };
enum test_type_t {

View File

@ -1200,7 +1200,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> ccr,
std::string whitelistBinPaths,
std::string configPath,
std::map<std::string, std::string> manualKnobOverrides,
ConfigDBType configDBType);
ConfigDBType configDBType,
bool consistencyCheckUrgentMode);
ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,

View File

@ -935,7 +935,8 @@ ACTOR Future<Void> testerServerWorkload(WorkloadRequest work,
ACTOR Future<Void> testerServerCore(TesterInterface interf,
Reference<IClusterConnectionRecord> ccr,
Reference<AsyncVar<struct ServerDBInfo> const> dbInfo,
LocalityData locality) {
LocalityData locality,
Optional<std::string> expectedWorkLoad) {
state PromiseStream<Future<Void>> addWorkload;
state Future<Void> workerFatalError = actorCollection(addWorkload.getFuture());
@ -943,7 +944,8 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
// At any time, at most one ConsistencyCheckUrgent workload is allowed to run on a server
state std::pair<int64_t, Future<Void>> consistencyCheckerUrgentTester = std::make_pair(0, Future<Void>());
TraceEvent("StartingTesterServerCore", interf.id()).log();
TraceEvent("StartingTesterServerCore", interf.id())
.detail("ExpectedWorkload", expectedWorkLoad.present() ? expectedWorkLoad.get() : "[Unset]");
loop choose {
when(wait(workerFatalError)) {}
when(wait(consistencyCheckerUrgentTester.second.isValid() ? consistencyCheckerUrgentTester.second : Never())) {
@ -953,7 +955,14 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
consistencyCheckerUrgentTester = std::make_pair(0, Future<Void>()); // reset
}
when(WorkloadRequest work = waitNext(interf.recruitments.getFuture())) {
if (work.title == "ConsistencyCheckUrgent") {
if (expectedWorkLoad.present() && expectedWorkLoad.get() != work.title) {
TraceEvent(SevError, "StartingTesterServerCoreUnexpectedWorkload", interf.id())
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount)
.detail("ExpectedWorkLoad", expectedWorkLoad.get())
.detail("WorkLoad", work.title);
// Drop the workload
} else if (work.title == "ConsistencyCheckUrgent") {
// The workload is a consistency checker urgent workload
if (work.sharedRandomNumber == consistencyCheckerUrgentTester.first) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_TesterDuplicatedRequest", interf.id())

View File

@ -2083,7 +2083,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ConfigBroadcastInterface configBroadcastInterface,
Reference<ConfigNode> configNode,
Reference<LocalConfiguration> localConfig,
Reference<AsyncVar<Optional<UID>>> clusterId) {
Reference<AsyncVar<Optional<UID>>> clusterId,
bool consistencyCheckUrgentMode) {
state PromiseStream<ErrorInfo> errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
new AsyncVar<Optional<DataDistributorInterface>>());
@ -2182,7 +2183,12 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
errorForwarders.add(loadedPonger(interf.debugPing.getFuture()));
errorForwarders.add(waitFailureServer(interf.waitFailure.getFuture()));
errorForwarders.add(monitorTraceLogIssues(issues));
errorForwarders.add(testerServerCore(interf.testerInterface, connRecord, dbInfo, locality));
errorForwarders.add(
testerServerCore(interf.testerInterface,
connRecord,
dbInfo,
locality,
consistencyCheckUrgentMode ? "ConsistencyCheckUrgent" : Optional<std::string>()));
errorForwarders.add(monitorHighMemory(memoryProfileThreshold));
filesClosed.add(stopping.getFuture());
@ -4139,7 +4145,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
std::string whitelistBinPaths,
std::string configPath,
std::map<std::string, std::string> manualKnobOverrides,
ConfigDBType configDBType) {
ConfigDBType configDBType,
bool consistencyCheckUrgentMode) {
state std::vector<Future<Void>> actors;
state Reference<ConfigNode> configNode;
state Reference<LocalConfiguration> localConfig;
@ -4232,7 +4239,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
configBroadcastInterface,
configNode,
localConfig,
clusterId),
clusterId,
consistencyCheckUrgentMode),
"WorkerServer",
UID(),
&normalWorkerErrors()));