Avoid duplicate snapshots on one process if it serves multiple roles (#7294)

* Fix comments

* Add simulation value for SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT

* A working version that passes correctness tests

* Remove unnecessary comments and debugging symbols

* Only check the secondary address for coordinators, same as before

* Change the trace to SevError and remove the ASSERT(false)

* Remove TLogSnapRequest handling from the TLog server, which now uses WorkerSnapRequest instead

* Add retry for network failures

* Add a retry limit for network failures; still allow duplicate snapshots on processes that are both tlog and storage, to avoid a race

* Add the retry limit as a knob and make the backoff exponential

* Add getDatabaseConfiguration(Transaction* tr)

* Revert to sending one request per role

* Update some comments
Chaoguang Lin 2022-06-29 11:23:07 -07:00 committed by GitHub
parent db769667ae
commit 29f98f3654
9 changed files with 329 additions and 130 deletions
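
The retry bullets in the commit message above (network-failure retries, a retry-limit knob, exponential backoff) reduce to one pattern: retry only when delivery is uncertain, cap the number of attempts, double the wait between attempts, and keep the same snapshot UID so the receiver can deduplicate. A minimal standalone C++ sketch of that pattern; `SnapRequest`, `sendOnce`, and `SendResult` are hypothetical stand-ins, not the actual Flow/FDB types:

```cpp
#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <thread>

// Hypothetical stand-ins for illustration only; the real code uses Flow
// futures, ErrorOr<T>, error_code_request_maybe_delivered, and the
// SNAP_NETWORK_FAILURE_RETRY_LIMIT knob.
struct SnapRequest {
    std::string snapUID;   // kept identical across retries so the receiver can dedup
    std::string payload;
};

enum class SendResult { Ok, MaybeDelivered, HardError };

// Retry only while delivery is uncertain, with a capped, exponentially growing backoff.
bool sendWithRetry(const SnapRequest& req,
                   const std::function<SendResult(const SnapRequest&)>& sendOnce,
                   int retryLimit,
                   double initialBackoffSec) {
    double backoff = initialBackoffSec;
    for (int attempt = 0;; ++attempt) {
        SendResult r = sendOnce(req);
        if (r == SendResult::Ok) return true;
        if (r == SendResult::HardError) return false; // real failure: surface it, no retry
        if (attempt >= retryLimit) return false;      // MaybeDelivered, but limit exhausted
        // Delivery was uncertain: back off, then resend the SAME snapUID so a
        // duplicate arrival can be recognized instead of being snapshotted twice.
        std::this_thread::sleep_for(std::chrono::duration<double>(backoff));
        backoff *= 2; // exponential backoff
    }
}

int main() {
    int attempts = 0;
    auto flaky = [&](const SnapRequest&) {
        return ++attempts < 3 ? SendResult::MaybeDelivered : SendResult::Ok;
    };
    std::cout << sendWithRetry({ "uid-1", "payload" }, flaky, /*retryLimit=*/10, 0.01) << "\n";
}
```

In the diff itself, the initial backoff is FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY and the attempt cap is the new SNAP_NETWORK_FAILURE_RETRY_LIMIT knob.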

@ -450,16 +450,21 @@ bool isCompleteConfiguration(std::map<std::string, std::string> const& options)
options.count(p + "storage_engine") == 1;
}
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Transaction* tr) {
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
RangeResult res = wait(tr->getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(res.size() < CLIENT_KNOBS->TOO_MANY);
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>)res);
return config;
}
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Database cx) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
RangeResult res = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(res.size() < CLIENT_KNOBS->TOO_MANY);
DatabaseConfiguration config;
config.fromKeyValues((VectorRef<KeyValueRef>)res);
DatabaseConfiguration config = wait(getDatabaseConfiguration(&tr));
return config;
} catch (Error& e) {
wait(tr.onError(e));

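The hunk above splits getDatabaseConfiguration into a core that runs against a caller-supplied Transaction* and a Database-level wrapper that owns the retry loop, so later code in this diff can read the configuration inside its own transaction. A rough non-Flow C++ analog of that split, with hypothetical Txn/Config types standing in for Transaction and DatabaseConfiguration:

```cpp
#include <exception>
#include <functional>
#include <map>
#include <string>

// Hypothetical stand-ins; the real code uses Transaction, Database, RangeResult
// and DatabaseConfiguration::fromKeyValues().
struct Config { std::map<std::string, std::string> kv; };

struct Txn {
    std::function<std::map<std::string, std::string>()> readConfigRange;
    void onError() {} // analog of wait(tr.onError(e)): reset/back off, then retry
};

// Core: runs against a caller-supplied transaction, so it can be combined with
// other reads in the same transaction (one consistent snapshot), like the new
// getDatabaseConfiguration(Transaction* tr).
Config getConfiguration(Txn* tr) {
    Config c;
    c.kv = tr->readConfigRange(); // analog of tr->getRange(configKeys, ...)
    return c;
}

// Wrapper: owns the transaction and the retry loop, like the Database overload.
Config getConfiguration(const std::function<Txn()>& makeTxn) {
    Txn tr = makeTxn();
    for (;;) {
        try {
            return getConfiguration(&tr);
        } catch (const std::exception&) {
            tr.onError(); // the real code lets tr.onError(e) rethrow non-retryable errors
        }
    }
}

int main() {
    auto makeTxn = [] {
        return Txn{ [] { return std::map<std::string, std::string>{ { "storage_engine", "ssd-2" } }; } };
    };
    return getConfiguration(makeTxn).kv.count("storage_engine") == 1 ? 0 : 1;
}
```
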
@ -117,7 +117,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( MAX_FORKED_PROCESS_OUTPUT, 1024 );
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
init( SNAP_CREATE_MAX_TIMEOUT, isSimulated ? 70.0 : 300.0 );
init( SNAP_MINIMUM_TIME_GAP, 5.0 );
init( SNAP_NETWORK_FAILURE_RETRY_LIMIT, 10 );
init( MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE, 1 );
init( MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE, 1 );

@ -41,6 +41,7 @@ standard API and some knowledge of the contents of the system key space.
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // has to be last include
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Transaction* tr);
ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Database cx);
ACTOR Future<Void> waitForFullReplication(Database cx);

@ -615,7 +615,12 @@ public:
// disk snapshot
int64_t MAX_FORKED_PROCESS_OUTPUT;
// retry limit after network failures
int64_t SNAP_NETWORK_FAILURE_RETRY_LIMIT;
// time limit for creating snapshot
double SNAP_CREATE_MAX_TIMEOUT;
// minimum gap time between two snapshot requests for the same process
double SNAP_MINIMUM_TIME_GAP;
// Maximum number of storage servers a snapshot can fail to
// capture while still succeeding
int64_t MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE;

@ -2086,21 +2086,32 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
throw snap_log_anti_quorum_unsupported();
}
// send a snap request to DD
if (!commitData->db->get().distributor.present()) {
TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest");
throw dd_not_found();
}
state Future<ErrorOr<Void>> ddSnapReq = commitData->db->get().distributor.get().distributorSnapReq.tryGetReply(
DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID));
try {
wait(throwErrorOr(ddSnapReq));
} catch (Error& e) {
TraceEvent("SnapCommitProxy_DDSnapResponseError")
.errorUnsuppressed(e)
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
throw e;
state int snapReqRetry = 0;
state double snapRetryBackoff = FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY;
loop {
// send a snap request to DD
if (!commitData->db->get().distributor.present()) {
TraceEvent(SevWarnAlways, "DataDistributorNotPresent").detail("Operation", "SnapRequest");
throw dd_not_found();
}
try {
Future<ErrorOr<Void>> ddSnapReq =
commitData->db->get().distributor.get().distributorSnapReq.tryGetReply(
DistributorSnapRequest(snapReq.snapPayload, snapReq.snapUID));
wait(throwErrorOr(ddSnapReq));
break;
} catch (Error& e) {
TraceEvent("SnapCommitProxy_DDSnapResponseError")
.errorUnsuppressed(e)
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// Retry if we have network issues
if (e.code() != error_code_request_maybe_delivered ||
++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT)
throw e;
wait(delay(snapRetryBackoff));
snapRetryBackoff = snapRetryBackoff * 2; // exponential backoff
}
}
snapReq.reply.send(Void());
} catch (Error& e) {

@ -876,14 +876,26 @@ Future<Void> sendSnapReq(RequestStream<Req> stream, Req req, Error e) {
return Void();
}
ACTOR template <class Req>
Future<ErrorOr<Void>> trySendSnapReq(RequestStream<Req> stream, Req req) {
ErrorOr<REPLY_TYPE(Req)> reply = wait(stream.tryGetReply(req));
if (reply.isError()) {
TraceEvent("SnapDataDistributor_ReqError")
.errorUnsuppressed(reply.getError())
.detail("Peer", stream.getEndpoint().getPrimaryAddress());
return ErrorOr<Void>(reply.getError());
ACTOR Future<ErrorOr<Void>> trySendSnapReq(RequestStream<WorkerSnapRequest> stream, WorkerSnapRequest req) {
state int snapReqRetry = 0;
state double snapRetryBackoff = FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY;
loop {
ErrorOr<REPLY_TYPE(WorkerSnapRequest)> reply = wait(stream.tryGetReply(req));
if (reply.isError()) {
TraceEvent("SnapDataDistributor_ReqError")
.errorUnsuppressed(reply.getError())
.detail("Peer", stream.getEndpoint().getPrimaryAddress());
if (reply.getError().code() != error_code_request_maybe_delivered ||
++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT)
return ErrorOr<Void>(reply.getError());
else {
// retry for network failures with same snap UID to avoid snapshot twice
req = WorkerSnapRequest(req.snapPayload, req.snapUID, req.role);
wait(delay(snapRetryBackoff));
snapRetryBackoff = snapRetryBackoff * 2;
}
} else
break;
}
return ErrorOr<Void>(Void());
}
@ -906,6 +918,124 @@ ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures
return Void();
}
ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>> getStatefulWorkers(
Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::vector<TLogInterface>* tlogs,
int* storageFaultTolerance) {
state std::map<NetworkAddress, std::pair<WorkerInterface, std::string>> result;
state std::map<NetworkAddress, WorkerInterface> workersMap;
state Transaction tr(cx);
state DatabaseConfiguration configuration;
loop {
try {
// necessary options
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
// get database configuration
DatabaseConfiguration _configuration = wait(getDatabaseConfiguration(&tr));
configuration = _configuration;
// get storages
RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
state std::vector<StorageServerInterface> storageServers;
storageServers.reserve(serverList.size());
for (int i = 0; i < serverList.size(); i++)
storageServers.push_back(decodeServerListValue(serverList[i].value));
// get workers
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
for (const auto& worker : workers) {
workersMap[worker.interf.address()] = worker.interf;
}
Optional<Value> regionsValue =
wait(tr.get(LiteralStringRef("usable_regions").withPrefix(configKeysPrefix)));
int usableRegions = 1;
if (regionsValue.present()) {
usableRegions = atoi(regionsValue.get().toString().c_str());
}
auto masterDcId = dbInfo->get().master.locality.dcId();
int storageFailures = 0;
for (const auto& server : storageServers) {
TraceEvent(SevDebug, "StorageServerDcIdInfo")
.detail("Address", server.address().toString())
.detail("ServerLocalityID", server.locality.dcId())
.detail("MasterDcID", masterDcId);
if (usableRegions == 1 || server.locality.dcId() == masterDcId) {
auto itr = workersMap.find(server.address());
if (itr == workersMap.end()) {
TraceEvent(SevWarn, "GetStorageWorkers")
.detail("Reason", "Could not find worker for storage server")
.detail("SS", server.id());
++storageFailures;
} else {
if (result.count(server.address())) {
ASSERT(itr->second.id() == result[server.address()].first.id());
if (result[server.address()].second.find("storage") == std::string::npos)
result[server.address()].second.append(",storage");
} else {
result[server.address()] = std::make_pair(itr->second, "storage");
}
}
}
}
// calculate fault tolerance
*storageFaultTolerance = std::min(static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE),
configuration.storageTeamSize - 1) -
storageFailures;
if (*storageFaultTolerance < 0) {
TEST(true); // Too many failed storage servers to complete snapshot
throw snap_storage_failed();
}
// tlogs
for (const auto& tlog : *tlogs) {
TraceEvent(SevDebug, "GetStatefulWorkersTlog").detail("Addr", tlog.address());
if (workersMap.find(tlog.address()) == workersMap.end()) {
TraceEvent(SevError, "MissingTlogWorkerInterface").detail("TlogAddress", tlog.address());
throw snap_tlog_failed();
}
if (result.count(tlog.address())) {
ASSERT(workersMap[tlog.address()].id() == result[tlog.address()].first.id());
result[tlog.address()].second.append(",tlog");
} else {
result[tlog.address()] = std::make_pair(workersMap[tlog.address()], "tlog");
}
}
// get coordinators
Optional<Value> coordinators = wait(tr.get(coordinatorsKey));
if (!coordinators.present()) {
throw operation_failed();
}
ClusterConnectionString ccs(coordinators.get().toString());
std::vector<NetworkAddress> coordinatorsAddr = wait(ccs.tryResolveHostnames());
std::set<NetworkAddress> coordinatorsAddrSet(coordinatorsAddr.begin(), coordinatorsAddr.end());
for (const auto& worker : workers) {
// Note : only considers second address for coordinators,
// as we use primary addresses from storage and tlog interfaces above
NetworkAddress primary = worker.interf.address();
Optional<NetworkAddress> secondary = worker.interf.tLog.getEndpoint().addresses.secondaryAddress;
if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end() ||
(secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) {
if (result.count(primary)) {
ASSERT(workersMap[primary].id() == result[primary].first.id());
result[primary].second.append(",coord");
} else {
result[primary] = std::make_pair(workersMap[primary], "coord");
}
}
}
return result;
} catch (Error& e) {
wait(tr.onError(e));
result.clear();
}
}
}
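
getStatefulWorkers above collapses storage servers, tlogs, and coordinators into a single map keyed by process address, appending a role tag per entry; this is what lets the snapshot sender issue exactly one request per (process, role). A simplified C++ sketch of the accumulation step, with hypothetical Address/WorkerInterf stand-ins for NetworkAddress/WorkerInterface:

```cpp
#include <iostream>
#include <map>
#include <string>

// Hypothetical stand-ins for NetworkAddress / WorkerInterface.
using Address = std::string;
struct WorkerInterf { std::string id; };

// One entry per process: its interface plus a comma-separated role list, mirroring the
// std::map<NetworkAddress, std::pair<WorkerInterface, std::string>> built in the diff.
using StatefulWorkers = std::map<Address, std::pair<WorkerInterf, std::string>>;

void addRole(StatefulWorkers& out, const Address& addr, const WorkerInterf& interf,
             const std::string& role) {
    auto it = out.find(addr);
    if (it == out.end()) {
        out[addr] = { interf, role };                     // first role seen for this process
    } else if (it->second.second.find(role) == std::string::npos) {
        it->second.second += "," + role;                  // same process, additional role
    }                                                      // duplicate role: ignored
}

int main() {
    StatefulWorkers workers;
    WorkerInterf w{ "worker-1" };
    addRole(workers, "10.0.0.1:4500", w, "storage");
    addRole(workers, "10.0.0.1:4500", w, "tlog");
    addRole(workers, "10.0.0.1:4500", w, "storage");      // ignored duplicate
    std::cout << workers["10.0.0.1:4500"].second << "\n"; // prints: storage,tlog
}
```
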
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
@ -942,47 +1072,44 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
TraceEvent("SnapDataDistributor_AfterDisableTLogPop")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap local storage nodes
// TODO: Atomically read configuration and storage worker list in a single transaction
state DatabaseConfiguration configuration = wait(getDatabaseConfiguration(cx));
std::pair<std::vector<WorkerInterface>, int> storageWorkersAndFailures =
wait(transformErrors(getStorageWorkers(cx, db, true /* localOnly */), snap_storage_failed()));
const auto& [storageWorkers, storageFailures] = storageWorkersAndFailures;
auto const storageFaultTolerance =
std::min(static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE),
configuration.storageTeamSize - 1) -
storageFailures;
if (storageFaultTolerance < 0) {
TEST(true); // Too many failed storage servers to complete snapshot
throw snap_storage_failed();
}
TraceEvent("SnapDataDistributor_GotStorageWorkers")
state int storageFaultTolerance;
// snap stateful nodes
state std::map<NetworkAddress, std::pair<WorkerInterface, std::string>> statefulWorkers =
wait(transformErrors(getStatefulWorkers(cx, db, &tlogs, &storageFaultTolerance), snap_storage_failed()));
TraceEvent("SnapDataDistributor_GotStatefulWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// we need to snapshot storage nodes before snapshot any tlogs
std::vector<Future<ErrorOr<Void>>> storageSnapReqs;
storageSnapReqs.reserve(storageWorkers.size());
for (const auto& worker : storageWorkers) {
storageSnapReqs.push_back(trySendSnapReq(
worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr)));
for (const auto& [addr, entry] : statefulWorkers) {
auto& [interf, role] = entry;
if (role.find("storage") != std::string::npos)
storageSnapReqs.push_back(trySendSnapReq(
interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr)));
}
wait(waitForMost(storageSnapReqs, storageFaultTolerance, snap_storage_failed()));
TraceEvent("SnapDataDistributor_AfterSnapStorage")
.detail("FaultTolerance", storageFaultTolerance)
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap local tlog nodes
std::vector<Future<Void>> tLogSnapReqs;
std::vector<Future<ErrorOr<Void>>> tLogSnapReqs;
tLogSnapReqs.reserve(tlogs.size());
for (const auto& tlog : tlogs) {
tLogSnapReqs.push_back(sendSnapReq(tlog.snapRequest,
TLogSnapRequest{ snapReq.snapPayload, snapReq.snapUID, "tlog"_sr },
snap_tlog_failed()));
for (const auto& [addr, entry] : statefulWorkers) {
auto& [interf, role] = entry;
if (role.find("tlog") != std::string::npos)
tLogSnapReqs.push_back(trySendSnapReq(
interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "tlog"_sr)));
}
wait(waitForAll(tLogSnapReqs));
wait(waitForMost(tLogSnapReqs, 0, snap_tlog_failed()));
TraceEvent("SnapDataDistributor_AfterTLogStorage")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// enable tlog pop on local tlog nodes
std::vector<Future<Void>> enablePops;
enablePops.reserve(tlogs.size());
@ -995,20 +1122,18 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
TraceEvent("SnapDataDistributor_AfterEnableTLogPops")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
// snap the coordinators
std::vector<WorkerInterface> coordWorkers = wait(getCoordWorkers(cx, db));
TraceEvent("SnapDataDistributor_GotCoordWorkers")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
std::vector<Future<ErrorOr<Void>>> coordSnapReqs;
coordSnapReqs.reserve(coordWorkers.size());
for (const auto& worker : coordWorkers) {
coordSnapReqs.push_back(trySendSnapReq(
worker.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr)));
for (const auto& [addr, entry] : statefulWorkers) {
auto& [interf, role] = entry;
if (role.find("coord") != std::string::npos)
coordSnapReqs.push_back(trySendSnapReq(
interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr)));
}
auto const coordFaultTolerance = std::min<int>(std::max<int>(0, coordSnapReqs.size() / 2 - 1),
SERVER_KNOBS->MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE);
wait(waitForMost(coordSnapReqs, coordFaultTolerance, snap_coord_failed()));
TraceEvent("SnapDataDistributor_AfterSnapCoords")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
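
The three waitForMost calls above implement "all but K may fail": the storage tolerance is derived from the MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE knob and the storage team size, tlogs tolerate zero failures, and coordinators tolerate min(max(0, n/2 - 1), MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE). A synchronous C++ sketch of that check; the real helper operates on std::vector<Future<ErrorOr<Void>>> and throws the supplied error:

```cpp
#include <algorithm>
#include <stdexcept>
#include <vector>

// Synchronous analog of waitForMost(): succeed unless more than
// `faultTolerance` of the per-worker results failed.
void waitForMost(const std::vector<bool>& results, int faultTolerance) {
    int failures = static_cast<int>(std::count(results.begin(), results.end(), false));
    if (failures > faultTolerance)
        throw std::runtime_error("too many workers failed to snapshot");
}

int main() {
    waitForMost({ true, true, false }, /*faultTolerance=*/1); // storage: one failure allowed
    waitForMost({ true, true, true },  /*faultTolerance=*/0); // tlogs: no failures allowed
}
```
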
@ -1056,37 +1181,48 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
return Void();
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState) {
ACTOR Future<Void> ddSnapCreate(
DistributorSnapRequest snapReq,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState,
std::map<UID, DistributorSnapRequest>* ddSnapMap /* ongoing snapshot requests */,
std::map<UID, ErrorOr<Void>>*
ddSnapResultMap /* finished snapshot requests, expired in SNAP_MINIMUM_TIME_GAP seconds */) {
state Future<Void> dbInfoChange = db->onChange();
if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
// disable DD before doing snapCreate, if previous snap req has already disabled DD then this operation fails
// here
TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck").log();
snapReq.reply.sendError(operation_failed());
TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck").detail("SnapUID", snapReq.snapUID);
ddSnapMap->at(snapReq.snapUID).reply.sendError(operation_failed());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(operation_failed());
return Void();
}
double delayTime = g_network->isSimulated() ? 70.0 : SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT;
try {
choose {
when(wait(dbInfoChange)) {
TraceEvent("SnapDDCreateDBInfoChanged")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.sendError(snap_with_recovery_unsupported());
ddSnapMap->at(snapReq.snapUID).reply.sendError(snap_with_recovery_unsupported());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(snap_with_recovery_unsupported());
}
when(wait(ddSnapCreateCore(snapReq, db))) {
TraceEvent("SnapDDCreateSuccess")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.send(Void());
ddSnapMap->at(snapReq.snapUID).reply.send(Void());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(Void());
}
when(wait(delay(delayTime))) {
when(wait(delay(SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT))) {
TraceEvent("SnapDDCreateTimedOut")
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
snapReq.reply.sendError(timed_out());
ddSnapMap->at(snapReq.snapUID).reply.sendError(timed_out());
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(timed_out());
}
}
} catch (Error& e) {
@ -1095,7 +1231,9 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
ddSnapMap->at(snapReq.snapUID).reply.sendError(e);
ddSnapMap->erase(snapReq.snapUID);
(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(e);
} else {
// enable DD should always succeed
bool success = ddEnabledState->setDDEnabled(true, snapReq.snapUID);
@ -1246,6 +1384,8 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
state ActorCollection actors(false);
state DDEnabledState ddEnabledState;
state std::map<UID, DistributorSnapRequest> ddSnapReqMap;
state std::map<UID, ErrorOr<Void>> ddSnapReqResultMap;
self->addActor.send(actors.getResult());
self->addActor.send(traceRole(Role::DATA_DISTRIBUTOR, di.id()));
@ -1273,7 +1413,30 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
actors.add(ddGetMetrics(req, getShardMetricsList));
}
when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
actors.add(ddSnapCreate(snapReq, db, &ddEnabledState));
auto& snapUID = snapReq.snapUID;
if (ddSnapReqResultMap.count(snapUID)) {
TEST(true); // Data distributor received a duplicate finished snap request
auto result = ddSnapReqResultMap[snapUID];
result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get());
TraceEvent("RetryFinishedDistributorSnapRequest")
.detail("SnapUID", snapUID)
.detail("Result", result.isError() ? result.getError().code() : 0);
} else if (ddSnapReqMap.count(snapReq.snapUID)) {
TEST(true); // Data distributor received a duplicate ongoing snap request
TraceEvent("RetryOngoingDistributorSnapRequest").detail("SnapUID", snapUID);
ASSERT(snapReq.snapPayload == ddSnapReqMap[snapUID].snapPayload);
ddSnapReqMap[snapUID] = snapReq;
} else {
ddSnapReqMap[snapUID] = snapReq;
actors.add(ddSnapCreate(snapReq, db, &ddEnabledState, &ddSnapReqMap, &ddSnapReqResultMap));
auto* ddSnapReqResultMapPtr = &ddSnapReqResultMap;
actors.add(fmap(
[ddSnapReqResultMapPtr, snapUID](Void _) {
ddSnapReqResultMapPtr->erase(snapUID);
return Void();
},
delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
}
}
when(DistributorExclusionSafetyCheckRequest exclCheckReq =
waitNext(di.distributorExclCheckReq.getFuture())) {

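The data distributor above, like the worker later in this diff, deduplicates retried snapshot requests with two maps: ongoing requests, where a retry only replaces the stored reply handle, and recently finished results, which are replayed to late retries and then expired after SNAP_MINIMUM_TIME_GAP. A simplified single-threaded C++ sketch of that bookkeeping, with hypothetical SnapRequest/SnapDeduper types:

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <string>

// Hypothetical, single-threaded stand-ins: the real code keys by UID (data
// distributor) or UID+role (worker), replies through Flow reply promises, and
// expires finished results with delay(SNAP_MINIMUM_TIME_GAP).
struct SnapRequest {
    std::string uid;
    std::function<void(bool /*success*/)> reply;
};

struct SnapDeduper {
    std::map<std::string, SnapRequest> ongoing; // snapshots currently in flight
    std::map<std::string, bool> finished;       // recent results, erased on expiry

    void onRequest(SnapRequest req, const std::function<void(const SnapRequest&)>& startWork) {
        const std::string uid = req.uid;
        auto done = finished.find(uid);
        if (done != finished.end()) {           // duplicate of a finished request
            req.reply(done->second);            // replay the recorded outcome
        } else if (ongoing.count(uid)) {        // duplicate of an ongoing request
            ongoing[uid] = std::move(req);      // keep only the newest reply handle
        } else {
            ongoing[uid] = req;                 // remember it, then start the snapshot
            startWork(req);
        }
    }

    // Reply through the map, not a captured request, so the latest retry is answered.
    void onFinished(const std::string& uid, bool success) {
        ongoing.at(uid).reply(success);
        ongoing.erase(uid);
        finished[uid] = success;                // cached until the expiry timer fires
    }

    void onExpiry(const std::string& uid) { finished.erase(uid); }
};

int main() {
    SnapDeduper d;
    auto print = [](bool ok) { std::cout << (ok ? "ok\n" : "fail\n"); };
    auto start = [](const SnapRequest&) { /* kick off the actual snapshot work */ };
    d.onRequest({ "snap-1", print }, start);    // starts work
    d.onRequest({ "snap-1", print }, start);    // retry while ongoing: handle replaced
    d.onFinished("snap-1", true);               // prints "ok" once
    d.onRequest({ "snap-1", print }, start);    // late retry: result replayed, prints "ok"
    d.onExpiry("snap-1");                       // after SNAP_MINIMUM_TIME_GAP
}
```

Replying through the map rather than the originally captured request is what lets a retry that replaced the ongoing entry still receive the final answer.
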
@ -426,14 +426,12 @@ ACTOR Future<int> execHelper(ExecCmdValueString* execArg, UID snapUID, std::stri
} else {
// copy the files
state std::string folderFrom = folder + "/.";
state std::string folderTo = folder + "-snap-" + uidStr.toString();
double maxSimDelayTime = 10.0;
folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
state std::string folderTo = folder + "-snap-" + uidStr.toString() + "-" + role;
std::vector<std::string> paramList;
std::string mkdirBin = "/bin/mkdir";
paramList.push_back(mkdirBin);
paramList.push_back(folderTo);
cmdErr = spawnProcess(mkdirBin, paramList, maxWaitTime, false /*isSync*/, maxSimDelayTime);
cmdErr = spawnProcess(mkdirBin, paramList, maxWaitTime, false /*isSync*/, 10.0);
wait(success(cmdErr));
err = cmdErr.get();
if (err == 0) {

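execHelper above now appends the role to the snapshot directory name, so a process snapshotting as several roles writes each snapshot into its own folder. A tiny C++ sketch of the resulting path construction (the example paths are illustrative):

```cpp
#include <iostream>
#include <string>

// The snapshot output directory carries the role suffix, so a process
// snapshotting as both "storage" and "tlog" writes into distinct folders.
std::string snapFolder(const std::string& dataFolder, const std::string& uid,
                       const std::string& role) {
    return dataFolder + "-snap-" + uid + "-" + role; // e.g. /data/4500-snap-<uid>-storage
}

int main() {
    std::cout << snapFolder("/data/4500", "a1b2c3", "storage") << "\n";
    std::cout << snapFolder("/data/4500", "a1b2c3", "tlog") << "\n";
}
```
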
@ -366,9 +366,9 @@ struct TLogData : NonCopyable {
// the set and for callers that unset will
// be able to match it up
std::string dataFolder; // folder where data is stored
Reference<AsyncVar<bool>> degraded;
// End of fields used by snapshot based backup and restore
Reference<AsyncVar<bool>> degraded;
std::vector<TagsAndMessage> tempTagMessages;
Reference<Histogram> commitLatencyDist;
@ -2569,42 +2569,6 @@ void getQueuingMetrics(TLogData* self, Reference<LogData> logData, TLogQueuingMe
req.reply.send(reply);
}
ACTOR Future<Void> tLogSnapCreate(TLogSnapRequest snapReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != snapReq.snapUID.toString()) {
snapReq.reply.sendError(operation_failed());
return Void();
}
ExecCmdValueString snapArg(snapReq.snapPayload);
try {
int err = wait(execHelper(&snapArg, snapReq.snapUID, self->dataFolder, snapReq.role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceTLog")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", self->dataFolder)
.detail("ExecPayload", snapReq.snapPayload)
.detail("PersistentDataVersion", logData->persistentDataVersion)
.detail("PersistentDatadurableVersion", logData->persistentDataDurableVersion)
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
.detail("Version", logData->version.get());
if (err != 0) {
throw operation_failed();
}
snapReq.reply.send(Void());
} catch (Error& e) {
TraceEvent("TLogExecHelperError").errorUnsuppressed(e);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
} else {
throw e;
}
}
return Void();
}
ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData* self, Reference<LogData> logData) {
if (self->ignorePopUid != enablePopReq.snapUID.toString()) {
TraceEvent(SevWarn, "TLogPopDisableEnableUidMismatch")
@ -2731,9 +2695,6 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
when(TLogEnablePopRequest enablePopReq = waitNext(tli.enablePopRequest.getFuture())) {
logData->addActor.send(tLogEnablePopReq(enablePopReq, self, logData));
}
when(TLogSnapRequest snapReq = waitNext(tli.snapRequest.getFuture())) {
logData->addActor.send(tLogSnapCreate(snapReq, self, logData));
}
}
}

@ -1415,10 +1415,16 @@ ACTOR Future<Void> traceRole(Role role, UID roleId) {
}
}
ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, Standalone<StringRef> snapFolder) {
ACTOR Future<Void> workerSnapCreate(
WorkerSnapRequest snapReq,
std::string snapFolder,
std::map<std::string, WorkerSnapRequest>* snapReqMap /* ongoing snapshot requests */,
std::map<std::string, ErrorOr<Void>>*
snapReqResultMap /* finished snapshot requests, expired in SNAP_MINIMUM_TIME_GAP seconds */) {
state ExecCmdValueString snapArg(snapReq.snapPayload);
state std::string snapReqKey = snapReq.snapUID.toString() + snapReq.role.toString();
try {
int err = wait(execHelper(&snapArg, snapReq.snapUID, snapFolder.toString(), snapReq.role.toString()));
int err = wait(execHelper(&snapArg, snapReq.snapUID, snapFolder, snapReq.role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceWorker")
.detail("Uid", uidStr)
@ -1432,11 +1438,15 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, Standalone<String
if (snapReq.role.toString() == "storage") {
printStorageVersionInfo();
}
snapReq.reply.send(Void());
snapReqMap->at(snapReqKey).reply.send(Void());
snapReqMap->erase(snapReqKey);
(*snapReqResultMap)[snapReqKey] = ErrorOr<Void>(Void());
} catch (Error& e) {
TraceEvent("ExecHelperError").errorUnsuppressed(e);
if (e.code() != error_code_operation_cancelled) {
snapReq.reply.sendError(e);
snapReqMap->at(snapReqKey).reply.sendError(e);
snapReqMap->erase(snapReqKey);
(*snapReqResultMap)[snapReqKey] = ErrorOr<Void>(e);
} else {
throw e;
}
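
workerSnapCreate above keys its request and result maps by snapUID plus role: a process serving as both tlog and storage still performs one snapshot per role, while network retries of the same (UID, role) pair are collapsed. A small C++ sketch of that keying, with hypothetical names:

```cpp
#include <cassert>
#include <set>
#include <string>

// The worker keys its maps by snapUID + role, so a process that is both a tlog
// and a storage server still performs one snapshot per role, while retries of
// the same (UID, role) pair are deduplicated.
std::string snapKey(const std::string& snapUID, const std::string& role) {
    return snapUID + role; // same concatenation the diff uses for its map keys
}

int main() {
    std::set<std::string> seen;
    assert(seen.insert(snapKey("uid-1", "storage")).second); // first storage snapshot: run it
    assert(seen.insert(snapKey("uid-1", "tlog")).second);    // same process, tlog role: also run
    assert(!seen.insert(snapKey("uid-1", "tlog")).second);   // network retry: deduplicated
}
```
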
@ -1584,6 +1594,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state WorkerCache<InitializeBackupReply> backupWorkerCache;
state WorkerCache<InitializeBlobWorkerReply> blobWorkerCache;
state WorkerSnapRequest lastSnapReq;
// Here the key is UID+role, as we still send duplicate requests to a process which is both storage and tlog
state std::map<std::string, WorkerSnapRequest> snapReqMap;
state std::map<std::string, ErrorOr<Void>> snapReqResultMap;
state double lastSnapTime = -SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP; // always successful for the first Snap Request
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf(locality);
@ -2497,11 +2512,49 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
loggingTrigger = delay(loggingDelay, TaskPriority::FlushTrace);
}
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
Standalone<StringRef> snapFolder = StringRef(folder);
if (snapReq.role.toString() == "coord") {
snapFolder = coordFolder;
std::string snapUID = snapReq.snapUID.toString() + snapReq.role.toString();
if (snapReqResultMap.count(snapUID)) {
TEST(true); // Worker received a duplicate finished snap request
auto result = snapReqResultMap[snapUID];
result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get());
TraceEvent("RetryFinishedWorkerSnapRequest")
.detail("SnapUID", snapUID)
.detail("Role", snapReq.role)
.detail("Result", result.isError() ? result.getError().code() : 0);
} else if (snapReqMap.count(snapUID)) {
TEST(true); // Worker received a duplicate ongoing snap request
TraceEvent("RetryOngoingWorkerSnapRequest").detail("SnapUID", snapUID).detail("Role", snapReq.role);
ASSERT(snapReq.role == snapReqMap[snapUID].role);
ASSERT(snapReq.snapPayload == snapReqMap[snapUID].snapPayload);
snapReqMap[snapUID] = snapReq;
} else {
snapReqMap[snapUID] = snapReq; // set map point to the request
if (g_network->isSimulated() && (now() - lastSnapTime) < SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP) {
// only allow duplicate snapshots on same process in a short time for different roles
auto okay = (lastSnapReq.snapUID == snapReq.snapUID) && lastSnapReq.role != snapReq.role;
TraceEvent(okay ? SevInfo : SevError, "RapidSnapRequestsOnSameProcess")
.detail("CurrSnapUID", snapUID)
.detail("PrevSnapUID", lastSnapReq.snapUID)
.detail("CurrRole", snapReq.role)
.detail("PrevRole", lastSnapReq.role)
.detail("GapTime", now() - lastSnapTime);
}
errorForwarders.add(workerSnapCreate(snapReq,
snapReq.role.toString() == "coord" ? coordFolder : folder,
&snapReqMap,
&snapReqResultMap));
auto* snapReqResultMapPtr = &snapReqResultMap;
errorForwarders.add(fmap(
[snapReqResultMapPtr, snapUID](Void _) {
snapReqResultMapPtr->erase(snapUID);
return Void();
},
delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP)));
if (g_network->isSimulated()) {
lastSnapReq = snapReq;
lastSnapTime = now();
}
}
errorForwarders.add(workerSnapCreate(snapReq, snapFolder));
}
when(wait(errorForwarders.getResult())) {}
when(wait(handleErrors)) {}