diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index b5cd8bd4e4..973f36d742 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -450,16 +450,21 @@ bool isCompleteConfiguration(std::map const& options) options.count(p + "storage_engine") == 1; } +ACTOR Future getDatabaseConfiguration(Transaction* tr) { + tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); + tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + RangeResult res = wait(tr->getRange(configKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(res.size() < CLIENT_KNOBS->TOO_MANY); + DatabaseConfiguration config; + config.fromKeyValues((VectorRef)res); + return config; +} + ACTOR Future getDatabaseConfiguration(Database cx) { state Transaction tr(cx); loop { try { - tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE); - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - RangeResult res = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY)); - ASSERT(res.size() < CLIENT_KNOBS->TOO_MANY); - DatabaseConfiguration config; - config.fromKeyValues((VectorRef)res); + DatabaseConfiguration config = wait(getDatabaseConfiguration(&tr)); return config; } catch (Error& e) { wait(tr.onError(e)); diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index cd8b0a97f1..4aae6fdd2a 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -117,7 +117,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi // disk snapshot max timeout, to be put in TLog, storage and coordinator nodes init( MAX_FORKED_PROCESS_OUTPUT, 1024 ); - init( SNAP_CREATE_MAX_TIMEOUT, 300.0 ); + init( SNAP_CREATE_MAX_TIMEOUT, isSimulated ? 70.0 : 300.0 ); + init( SNAP_MINIMUM_TIME_GAP, 5.0 ); + init( SNAP_NETWORK_FAILURE_RETRY_LIMIT, 10 ); init( MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE, 1 ); init( MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE, 1 ); diff --git a/fdbclient/include/fdbclient/ManagementAPI.actor.h b/fdbclient/include/fdbclient/ManagementAPI.actor.h index d4b551eeaa..63f56242f7 100644 --- a/fdbclient/include/fdbclient/ManagementAPI.actor.h +++ b/fdbclient/include/fdbclient/ManagementAPI.actor.h @@ -41,6 +41,7 @@ standard API and some knowledge of the contents of the system key space. #include "fdbclient/MonitorLeader.h" #include "flow/actorcompiler.h" // has to be last include +ACTOR Future getDatabaseConfiguration(Transaction* tr); ACTOR Future getDatabaseConfiguration(Database cx); ACTOR Future waitForFullReplication(Database cx); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index c3ba0bf8c6..329fc55e08 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -615,7 +615,12 @@ public: // disk snapshot int64_t MAX_FORKED_PROCESS_OUTPUT; + // retry limit after network failures + int64_t SNAP_NETWORK_FAILURE_RETRY_LIMIT; + // time limit for creating snapshot double SNAP_CREATE_MAX_TIMEOUT; + // minimum gap time between two snapshot requests for the same process + double SNAP_MINIMUM_TIME_GAP; // Maximum number of storage servers a snapshot can fail to // capture while still succeeding int64_t MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE; diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 9df56a83aa..5f5c743650 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -2086,21 +2086,32 @@ ACTOR Future proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co throw snap_log_anti_quorum_unsupported(); } - // send a snap request to DD - if (!commitData->db->get().distributor.present()) { - TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest"); - throw dd_not_found(); - } - state Future> ddSnapReq = commitData->db->get().distributor.get().distributorSnapReq.tryGetReply( - DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID)); - try { - wait(throwErrorOr(ddSnapReq)); - } catch (Error& e) { - TraceEvent("SnapCommitProxy_DDSnapResponseError") - .errorUnsuppressed(e) - .detail("SnapPayload", snapReq.snapPayload) - .detail("SnapUID", snapReq.snapUID); - throw e; + state int snapReqRetry = 0; + state double snapRetryBackoff = FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY; + loop { + // send a snap request to DD + if (!commitData->db->get().distributor.present()) { + TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest"); + throw dd_not_found(); + } + try { + Future> ddSnapReq = + commitData->db->get().distributor.get().distributorSnapReq.tryGetReply( + DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID)); + wait(throwErrorOr(ddSnapReq)); + break; + } catch (Error& e) { + TraceEvent("SnapCommitProxy_DDSnapResponseError") + .errorUnsuppressed(e) + .detail("SnapPayload", snapReq.snapPayload) + .detail("SnapUID", snapReq.snapUID); + // Retry if we have network issues + if (e.code() != error_code_request_maybe_delivered || + ++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT) + throw e; + wait(delay(snapRetryBackoff)); + snapRetryBackoff = snapRetryBackoff * 2; // exponential backoff + } } snapReq.reply.send(Void()); } catch (Error& e) { diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index d80d25c6b2..429b3c4b64 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -876,14 +876,26 @@ Future sendSnapReq(RequestStream stream, Req req, Error e) { return Void(); } -ACTOR template -Future> trySendSnapReq(RequestStream stream, Req req) { - ErrorOr reply = wait(stream.tryGetReply(req)); - if (reply.isError()) { - TraceEvent("SnapDataDistributor_ReqError") - .errorUnsuppressed(reply.getError()) - .detail("Peer", stream.getEndpoint().getPrimaryAddress()); - return ErrorOr(reply.getError()); +ACTOR Future> trySendSnapReq(RequestStream stream, WorkerSnapRequest req) { + state int snapReqRetry = 0; + state double snapRetryBackoff = FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY; + loop { + ErrorOr reply = wait(stream.tryGetReply(req)); + if (reply.isError()) { + TraceEvent("SnapDataDistributor_ReqError") + .errorUnsuppressed(reply.getError()) + .detail("Peer", stream.getEndpoint().getPrimaryAddress()); + if (reply.getError().code() != error_code_request_maybe_delivered || + ++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT) + return ErrorOr(reply.getError()); + else { + // retry for network failures with same snap UID to avoid snapshot twice + req = WorkerSnapRequest(req.snapPayload, req.snapUID, req.role); + wait(delay(snapRetryBackoff)); + snapRetryBackoff = snapRetryBackoff * 2; + } + } else + break; } return ErrorOr(Void()); } @@ -906,6 +918,124 @@ ACTOR static Future waitForMost(std::vector>> futures return Void(); } +ACTOR Future>> getStatefulWorkers( + Database cx, + Reference const> dbInfo, + std::vector* tlogs, + int* storageFaultTolerance) { + state std::map> result; + state std::map workersMap; + state Transaction tr(cx); + state DatabaseConfiguration configuration; + loop { + try { + // necessary options + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + + // get database configuration + DatabaseConfiguration _configuration = wait(getDatabaseConfiguration(&tr)); + configuration = _configuration; + + // get storages + RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY); + state std::vector storageServers; + storageServers.reserve(serverList.size()); + for (int i = 0; i < serverList.size(); i++) + storageServers.push_back(decodeServerListValue(serverList[i].value)); + + // get workers + state std::vector workers = wait(getWorkers(dbInfo)); + for (const auto& worker : workers) { + workersMap[worker.interf.address()] = worker.interf; + } + + Optional regionsValue = + wait(tr.get(LiteralStringRef("usable_regions").withPrefix(configKeysPrefix))); + int usableRegions = 1; + if (regionsValue.present()) { + usableRegions = atoi(regionsValue.get().toString().c_str()); + } + auto masterDcId = dbInfo->get().master.locality.dcId(); + int storageFailures = 0; + for (const auto& server : storageServers) { + TraceEvent(SevDebug, "StorageServerDcIdInfo") + .detail("Address", server.address().toString()) + .detail("ServerLocalityID", server.locality.dcId()) + .detail("MasterDcID", masterDcId); + if (usableRegions == 1 || server.locality.dcId() == masterDcId) { + auto itr = workersMap.find(server.address()); + if (itr == workersMap.end()) { + TraceEvent(SevWarn, "GetStorageWorkers") + .detail("Reason", "Could not find worker for storage server") + .detail("SS", server.id()); + ++storageFailures; + } else { + if (result.count(server.address())) { + ASSERT(itr->second.id() == result[server.address()].first.id()); + if (result[server.address()].second.find("storage") == std::string::npos) + result[server.address()].second.append(",storage"); + } else { + result[server.address()] = std::make_pair(itr->second, "storage"); + } + } + } + } + // calculate fault tolerance + *storageFaultTolerance = std::min(static_cast(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE), + configuration.storageTeamSize - 1) - + storageFailures; + if (*storageFaultTolerance < 0) { + TEST(true); // Too many failed storage servers to complete snapshot + throw snap_storage_failed(); + } + // tlogs + for (const auto& tlog : *tlogs) { + TraceEvent(SevDebug, "GetStatefulWorkersTlog").detail("Addr", tlog.address()); + if (workersMap.find(tlog.address()) == workersMap.end()) { + TraceEvent(SevError, "MissingTlogWorkerInterface").detail("TlogAddress", tlog.address()); + throw snap_tlog_failed(); + } + if (result.count(tlog.address())) { + ASSERT(workersMap[tlog.address()].id() == result[tlog.address()].first.id()); + result[tlog.address()].second.append(",tlog"); + } else { + result[tlog.address()] = std::make_pair(workersMap[tlog.address()], "tlog"); + } + } + + // get coordinators + Optional coordinators = wait(tr.get(coordinatorsKey)); + if (!coordinators.present()) { + throw operation_failed(); + } + ClusterConnectionString ccs(coordinators.get().toString()); + std::vector coordinatorsAddr = wait(ccs.tryResolveHostnames()); + std::set coordinatorsAddrSet(coordinatorsAddr.begin(), coordinatorsAddr.end()); + for (const auto& worker : workers) { + // Note : only considers second address for coordinators, + // as we use primary addresses from storage and tlog interfaces above + NetworkAddress primary = worker.interf.address(); + Optional secondary = worker.interf.tLog.getEndpoint().addresses.secondaryAddress; + if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end() || + (secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) { + if (result.count(primary)) { + ASSERT(workersMap[primary].id() == result[primary].first.id()); + result[primary].second.append(",coord"); + } else { + result[primary] = std::make_pair(workersMap[primary], "coord"); + } + } + } + return result; + } catch (Error& e) { + wait(tr.onError(e)); + result.clear(); + } + } +} + ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference const> db) { state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True); @@ -942,47 +1072,44 @@ ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference, int> storageWorkersAndFailures = - wait(transformErrors(getStorageWorkers(cx, db, true /* localOnly */), snap_storage_failed())); - const auto& [storageWorkers, storageFailures] = storageWorkersAndFailures; - auto const storageFaultTolerance = - std::min(static_cast(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE), - configuration.storageTeamSize - 1) - - storageFailures; - if (storageFaultTolerance < 0) { - TEST(true); // Too many failed storage servers to complete snapshot - throw snap_storage_failed(); - } - TraceEvent("SnapDataDistributor_GotStorageWorkers") + + state int storageFaultTolerance; + // snap stateful nodes + state std::map> statefulWorkers = + wait(transformErrors(getStatefulWorkers(cx, db, &tlogs, &storageFaultTolerance), snap_storage_failed())); + + TraceEvent("SnapDataDistributor_GotStatefulWorkers") .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); + + // we need to snapshot storage nodes before snapshot any tlogs std::vector>> storageSnapReqs; - storageSnapReqs.reserve(storageWorkers.size()); - for (const auto& worker : storageWorkers) { - storageSnapReqs.push_back(trySendSnapReq( - worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr))); + for (const auto& [addr, entry] : statefulWorkers) { + auto& [interf, role] = entry; + if (role.find("storage") != std::string::npos) + storageSnapReqs.push_back(trySendSnapReq( + interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr))); } wait(waitForMost(storageSnapReqs, storageFaultTolerance, snap_storage_failed())); - TraceEvent("SnapDataDistributor_AfterSnapStorage") + .detail("FaultTolerance", storageFaultTolerance) .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); - // snap local tlog nodes - std::vector> tLogSnapReqs; + + std::vector>> tLogSnapReqs; tLogSnapReqs.reserve(tlogs.size()); - for (const auto& tlog : tlogs) { - tLogSnapReqs.push_back(sendSnapReq(tlog.snapRequest, - TLogSnapRequest{ snapReq.snapPayload, snapReq.snapUID, "tlog"_sr }, - snap_tlog_failed())); + for (const auto& [addr, entry] : statefulWorkers) { + auto& [interf, role] = entry; + if (role.find("tlog") != std::string::npos) + tLogSnapReqs.push_back(trySendSnapReq( + interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "tlog"_sr))); } - wait(waitForAll(tLogSnapReqs)); + wait(waitForMost(tLogSnapReqs, 0, snap_tlog_failed())); TraceEvent("SnapDataDistributor_AfterTLogStorage") .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); + // enable tlog pop on local tlog nodes std::vector> enablePops; enablePops.reserve(tlogs.size()); @@ -995,20 +1122,18 @@ ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference coordWorkers = wait(getCoordWorkers(cx, db)); - TraceEvent("SnapDataDistributor_GotCoordWorkers") - .detail("SnapPayload", snapReq.snapPayload) - .detail("SnapUID", snapReq.snapUID); + std::vector>> coordSnapReqs; - coordSnapReqs.reserve(coordWorkers.size()); - for (const auto& worker : coordWorkers) { - coordSnapReqs.push_back(trySendSnapReq( - worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr))); + for (const auto& [addr, entry] : statefulWorkers) { + auto& [interf, role] = entry; + if (role.find("coord") != std::string::npos) + coordSnapReqs.push_back(trySendSnapReq( + interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr))); } auto const coordFaultTolerance = std::min(std::max(0, coordSnapReqs.size() / 2 - 1), SERVER_KNOBS->MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE); wait(waitForMost(coordSnapReqs, coordFaultTolerance, snap_coord_failed())); + TraceEvent("SnapDataDistributor_AfterSnapCoords") .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); @@ -1056,37 +1181,48 @@ ACTOR Future ddSnapCreateCore(DistributorSnapRequest snapReq, Reference ddSnapCreate(DistributorSnapRequest snapReq, - Reference const> db, - DDEnabledState* ddEnabledState) { +ACTOR Future ddSnapCreate( + DistributorSnapRequest snapReq, + Reference const> db, + DDEnabledState* ddEnabledState, + std::map* ddSnapMap /* ongoing snapshot requests */, + std::map>* + ddSnapResultMap /* finished snapshot requests, expired in SNAP_MINIMUM_TIME_GAP seconds */) { state Future dbInfoChange = db->onChange(); if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) { // disable DD before doing snapCreate, if previous snap req has already disabled DD then this operation fails // here - TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck").log(); - snapReq.reply.sendError(operation_failed()); + TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck").detail("SnapUID", snapReq.snapUID); + ddSnapMap->at(snapReq.snapUID).reply.sendError(operation_failed()); + ddSnapMap->erase(snapReq.snapUID); + (*ddSnapResultMap)[snapReq.snapUID] = ErrorOr(operation_failed()); return Void(); } - double delayTime = g_network->isSimulated() ? 70.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT; try { choose { when(wait(dbInfoChange)) { TraceEvent("SnapDDCreateDBInfoChanged") .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); - snapReq.reply.sendError(snap_with_recovery_unsupported()); + ddSnapMap->at(snapReq.snapUID).reply.sendError(snap_with_recovery_unsupported()); + ddSnapMap->erase(snapReq.snapUID); + (*ddSnapResultMap)[snapReq.snapUID] = ErrorOr(snap_with_recovery_unsupported()); } when(wait(ddSnapCreateCore(snapReq, db))) { TraceEvent("SnapDDCreateSuccess") .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); - snapReq.reply.send(Void()); + ddSnapMap->at(snapReq.snapUID).reply.send(Void()); + ddSnapMap->erase(snapReq.snapUID); + (*ddSnapResultMap)[snapReq.snapUID] = ErrorOr(Void()); } - when(wait(delay(delayTime))) { + when(wait(delay(SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT))) { TraceEvent("SnapDDCreateTimedOut") .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); - snapReq.reply.sendError(timed_out()); + ddSnapMap->at(snapReq.snapUID).reply.sendError(timed_out()); + ddSnapMap->erase(snapReq.snapUID); + (*ddSnapResultMap)[snapReq.snapUID] = ErrorOr(timed_out()); } } } catch (Error& e) { @@ -1095,7 +1231,9 @@ ACTOR Future ddSnapCreate(DistributorSnapRequest snapReq, .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID); if (e.code() != error_code_operation_cancelled) { - snapReq.reply.sendError(e); + ddSnapMap->at(snapReq.snapUID).reply.sendError(e); + ddSnapMap->erase(snapReq.snapUID); + (*ddSnapResultMap)[snapReq.snapUID] = ErrorOr(e); } else { // enable DD should always succeed bool success = ddEnabledState->setDDEnabled(true, snapReq.snapUID); @@ -1246,6 +1384,8 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Reference ddSnapReqMap; + state std::map> ddSnapReqResultMap; self->addActor.send(actors.getResult()); self->addActor.send(traceRole(Role::DATA_DISTRIBUTOR, di.id())); @@ -1273,7 +1413,30 @@ ACTOR Future dataDistributor(DataDistributorInterface di, Referenceerase(snapUID); + return Void(); + }, + delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP))); + } } when(DistributorExclusionSafetyCheckRequest exclCheckReq = waitNext(di.distributorExclCheckReq.getFuture())) { diff --git a/fdbserver/FDBExecHelper.actor.cpp b/fdbserver/FDBExecHelper.actor.cpp index 6d2ea7fcb2..a8110f438e 100644 --- a/fdbserver/FDBExecHelper.actor.cpp +++ b/fdbserver/FDBExecHelper.actor.cpp @@ -426,14 +426,12 @@ ACTOR Future execHelper(ExecCmdValueString* execArg, UID snapUID, std::stri } else { // copy the files state std::string folderFrom = folder + "/."; - state std::string folderTo = folder + "-snap-" + uidStr.toString(); - double maxSimDelayTime = 10.0; - folderTo = folder + "-snap-" + uidStr.toString() + "-" + role; + state std::string folderTo = folder + "-snap-" + uidStr.toString() + "-" + role; std::vector paramList; std::string mkdirBin = "/bin/mkdir"; paramList.push_back(mkdirBin); paramList.push_back(folderTo); - cmdErr = spawnProcess(mkdirBin, paramList, maxWaitTime, false /*isSync*/, maxSimDelayTime); + cmdErr = spawnProcess(mkdirBin, paramList, maxWaitTime, false /*isSync*/, 10.0); wait(success(cmdErr)); err = cmdErr.get(); if (err == 0) { diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index b04d6247e5..f01fa1861a 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -366,9 +366,9 @@ struct TLogData : NonCopyable { // the set and for callers that unset will // be able to match it up std::string dataFolder; // folder where data is stored - Reference> degraded; // End of fields used by snapshot based backup and restore + Reference> degraded; std::vector tempTagMessages; Reference commitLatencyDist; @@ -2569,42 +2569,6 @@ void getQueuingMetrics(TLogData* self, Reference logData, TLogQueuingMe req.reply.send(reply); } -ACTOR Future tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference logData) { - if (self->ignorePopUid != snapReq.snapUID.toString()) { - snapReq.reply.sendError(operation_failed()); - return Void(); - } - ExecCmdValueString snapArg(snapReq.snapPayload); - try { - int err = wait(execHelper(&snapArg, snapReq.snapUID, self->dataFolder, snapReq.role.toString())); - - std::string uidStr = snapReq.snapUID.toString(); - TraceEvent("ExecTraceTLog") - .detail("Uid", uidStr) - .detail("Status", err) - .detail("Role", snapReq.role) - .detail("Value", self->dataFolder) - .detail("ExecPayload", snapReq.snapPayload) - .detail("PersistentDataVersion", logData->persistentDataVersion) - .detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion) - .detail("QueueCommittedVersion", logData->queueCommittedVersion.get()) - .detail("Version", logData->version.get()); - - if (err != 0) { - throw operation_failed(); - } - snapReq.reply.send(Void()); - } catch (Error& e) { - TraceEvent("TLogExecHelperError").errorUnsuppressed(e); - if (e.code() != error_code_operation_cancelled) { - snapReq.reply.sendError(e); - } else { - throw e; - } - } - return Void(); -} - ACTOR Future tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData* self, Reference logData) { if (self->ignorePopUid != enablePopReq.snapUID.toString()) { TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch") @@ -2731,9 +2695,6 @@ ACTOR Future serveTLogInterface(TLogData* self, when(TLogEnablePopRequest enablePopReq = waitNext(tli.enablePopRequest.getFuture())) { logData->addActor.send(tLogEnablePopReq(enablePopReq, self, logData)); } - when(TLogSnapRequest snapReq = waitNext(tli.snapRequest.getFuture())) { - logData->addActor.send(tLogSnapCreate(snapReq, self, logData)); - } } } diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 314e9e9053..c6fe9a594f 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1415,10 +1415,16 @@ ACTOR Future traceRole(Role role, UID roleId) { } } -ACTOR Future workerSnapCreate(WorkerSnapRequest snapReq, Standalone snapFolder) { +ACTOR Future workerSnapCreate( + WorkerSnapRequest snapReq, + std::string snapFolder, + std::map* snapReqMap /* ongoing snapshot requests */, + std::map>* + snapReqResultMap /* finished snapshot requests, expired in SNAP_MINIMUM_TIME_GAP seconds */) { state ExecCmdValueString snapArg(snapReq.snapPayload); + state std::string snapReqKey = snapReq.snapUID.toString() + snapReq.role.toString(); try { - int err = wait(execHelper(&snapArg, snapReq.snapUID, snapFolder.toString(), snapReq.role.toString())); + int err = wait(execHelper(&snapArg, snapReq.snapUID, snapFolder, snapReq.role.toString())); std::string uidStr = snapReq.snapUID.toString(); TraceEvent("ExecTraceWorker") .detail("Uid", uidStr) @@ -1432,11 +1438,15 @@ ACTOR Future workerSnapCreate(WorkerSnapRequest snapReq, Standaloneat(snapReqKey).reply.send(Void()); + snapReqMap->erase(snapReqKey); + (*snapReqResultMap)[snapReqKey] = ErrorOr(Void()); } catch (Error& e) { TraceEvent("ExecHelperError").errorUnsuppressed(e); if (e.code() != error_code_operation_cancelled) { - snapReq.reply.sendError(e); + snapReqMap->at(snapReqKey).reply.sendError(e); + snapReqMap->erase(snapReqKey); + (*snapReqResultMap)[snapReqKey] = ErrorOr(e); } else { throw e; } @@ -1584,6 +1594,11 @@ ACTOR Future workerServer(Reference connRecord, state WorkerCache backupWorkerCache; state WorkerCache blobWorkerCache; + state WorkerSnapRequest lastSnapReq; + // Here the key is UID+role, as we still send duplicate requests to a process which is both storage and tlog + state std::map snapReqMap; + state std::map> snapReqResultMap; + state double lastSnapTime = -SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP; // always successful for the first Snap Request state std::string coordFolder = abspath(_coordFolder); state WorkerInterface interf(locality); @@ -2497,11 +2512,49 @@ ACTOR Future workerServer(Reference connRecord, loggingTrigger = delay(loggingDelay, TaskPriority::FlushTrace); } when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) { - Standalone snapFolder = StringRef(folder); - if (snapReq.role.toString() == "coord") { - snapFolder = coordFolder; + std::string snapUID = snapReq.snapUID.toString() + snapReq.role.toString(); + if (snapReqResultMap.count(snapUID)) { + TEST(true); // Worker received a duplicate finished snap request + auto result = snapReqResultMap[snapUID]; + result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get()); + TraceEvent("RetryFinishedWorkerSnapRequest") + .detail("SnapUID", snapUID) + .detail("Role", snapReq.role) + .detail("Result", result.isError() ? result.getError().code() : 0); + } else if (snapReqMap.count(snapUID)) { + TEST(true); // Worker received a duplicate ongoing snap request + TraceEvent("RetryOngoingWorkerSnapRequest").detail("SnapUID", snapUID).detail("Role", snapReq.role); + ASSERT(snapReq.role == snapReqMap[snapUID].role); + ASSERT(snapReq.snapPayload == snapReqMap[snapUID].snapPayload); + snapReqMap[snapUID] = snapReq; + } else { + snapReqMap[snapUID] = snapReq; // set map point to the request + if (g_network->isSimulated() && (now() - lastSnapTime) < SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP) { + // only allow duplicate snapshots on same process in a short time for different roles + auto okay = (lastSnapReq.snapUID == snapReq.snapUID) && lastSnapReq.role != snapReq.role; + TraceEvent(okay ? SevInfo : SevError, "RapidSnapRequestsOnSameProcess") + .detail("CurrSnapUID", snapUID) + .detail("PrevSnapUID", lastSnapReq.snapUID) + .detail("CurrRole", snapReq.role) + .detail("PrevRole", lastSnapReq.role) + .detail("GapTime", now() - lastSnapTime); + } + errorForwarders.add(workerSnapCreate(snapReq, + snapReq.role.toString() == "coord" ? coordFolder : folder, + &snapReqMap, + &snapReqResultMap)); + auto* snapReqResultMapPtr = &snapReqResultMap; + errorForwarders.add(fmap( + [snapReqResultMapPtr, snapUID](Void _) { + snapReqResultMapPtr->erase(snapUID); + return Void(); + }, + delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP))); + if (g_network->isSimulated()) { + lastSnapReq = snapReq; + lastSnapTime = now(); + } } - errorForwarders.add(workerSnapCreate(snapReq, snapFolder)); } when(wait(errorForwarders.getResult())) {} when(wait(handleErrors)) {}