Various TSS improvements from snowblower testing

Josh Slocum 2021-07-28 09:36:17 -05:00
parent 303449f82f
commit e444d3781c
5 changed files with 115 additions and 122 deletions

View File

@@ -154,16 +154,25 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe
result->second = tssi;
}
// data requests duplicated for load and data comparison
queueModel.updateTssEndpoint(ssi.getValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getValue.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKey.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKey.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKeyValues.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKeyValues.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.watchValue.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKeyValuesStream.getEndpoint(), metrics));
// non-data requests duplicated for load
queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.watchValue.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.splitMetrics.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.splitMetrics.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getReadHotRanges.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getReadHotRanges.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getRangeSplitPoints.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getRangeSplitPoints.getEndpoint(), metrics));
}
}
@@ -175,8 +184,12 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) {
queueModel.removeTssEndpoint(ssi.getValue.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getKey.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getKeyValues.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.splitMetrics.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getReadHotRanges.getEndpoint().token.first());
queueModel.removeTssEndpoint(ssi.getRangeSplitPoints.getEndpoint().token.first());
}
}
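
Note on the pattern in this hunk: every duplicated RequestStream is registered in the client's queue model keyed by the first 64 bits of its endpoint token, so removeTssMapping must erase exactly the tokens that addTssMapping registered. A minimal stand-alone sketch of that token-keyed bookkeeping, using simplified stand-in types (UID, TssEndpointDataSketch, QueueModelSketch are illustrative, not FDB's classes):

#include <cstdint>
#include <unordered_map>

using UID = uint64_t; // stand-in for the first 64 bits of an endpoint token

struct TssEndpointDataSketch {
    UID tssId; // id of the testing storage server paired with this endpoint
};

struct QueueModelSketch {
    std::unordered_map<UID, TssEndpointDataSketch> tssMapping;
    // addTssMapping-style registration: one entry per duplicated request stream
    void updateTssEndpoint(UID ssToken, TssEndpointDataSketch data) { tssMapping[ssToken] = data; }
    // removeTssMapping-style cleanup: erase the same tokens that were registered
    void removeTssEndpoint(UID ssToken) { tssMapping.erase(ssToken); }
};

int main() {
    QueueModelSketch qm;
    UID getValueToken = 1, splitMetricsToken = 2, tssId = 42;
    qm.updateTssEndpoint(getValueToken, { tssId });     // data request: duplicated for load and comparison
    qm.updateTssEndpoint(splitMetricsToken, { tssId }); // non-data request: duplicated for load only
    qm.removeTssEndpoint(getValueToken);
    qm.removeTssEndpoint(splitMetricsToken);
}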

View File

@@ -210,6 +210,66 @@ void TSS_traceMismatch(TraceEvent& event,
ASSERT(false);
}
template <>
bool TSS_doCompare(const SplitMetricsReply& src, const SplitMetricsReply& tss) {
// We duplicate split metrics just for load, no need to validate replies.
return true;
}
template <>
const char* TSS_mismatchTraceName(const SplitMetricsRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const SplitMetricsRequest& req,
const SplitMetricsReply& src,
const SplitMetricsReply& tss) {
ASSERT(false);
}
template <>
bool TSS_doCompare(const ReadHotSubRangeReply& src, const ReadHotSubRangeReply& tss) {
// We duplicate read hot sub range metrics just for load, no need to validate replies.
return true;
}
template <>
const char* TSS_mismatchTraceName(const ReadHotSubRangeRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const ReadHotSubRangeRequest& req,
const ReadHotSubRangeReply& src,
const ReadHotSubRangeReply& tss) {
ASSERT(false);
}
template <>
bool TSS_doCompare(const SplitRangeReply& src, const SplitRangeReply& tss) {
// We duplicate range split point requests just for load, no need to validate replies.
return true;
}
template <>
const char* TSS_mismatchTraceName(const SplitRangeRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const SplitRangeRequest& req,
const SplitRangeReply& src,
const SplitRangeReply& tss) {
ASSERT(false);
}
// template specializations for metrics replies that should never be called because these requests aren't duplicated
// storage metrics
@@ -233,69 +293,6 @@ void TSS_traceMismatch(TraceEvent& event,
ASSERT(false);
}
// split metrics
template <>
bool TSS_doCompare(const SplitMetricsReply& src, const SplitMetricsReply& tss) {
ASSERT(false);
return true;
}
template <>
const char* TSS_mismatchTraceName(const SplitMetricsRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const SplitMetricsRequest& req,
const SplitMetricsReply& src,
const SplitMetricsReply& tss) {
ASSERT(false);
}
// read hot sub range
template <>
bool TSS_doCompare(const ReadHotSubRangeReply& src, const ReadHotSubRangeReply& tss) {
ASSERT(false);
return true;
}
template <>
const char* TSS_mismatchTraceName(const ReadHotSubRangeRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const ReadHotSubRangeRequest& req,
const ReadHotSubRangeReply& src,
const ReadHotSubRangeReply& tss) {
ASSERT(false);
}
// split range
template <>
bool TSS_doCompare(const SplitRangeReply& src, const SplitRangeReply& tss) {
ASSERT(false);
return true;
}
template <>
const char* TSS_mismatchTraceName(const SplitRangeRequest& req) {
ASSERT(false);
return "";
}
template <>
void TSS_traceMismatch(TraceEvent& event,
const SplitRangeRequest& req,
const SplitRangeReply& src,
const SplitRangeReply& tss) {
ASSERT(false);
}
// only record metrics for data reads
template <>
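
For readers unfamiliar with this file: the hunks above move the SplitMetrics, ReadHotSubRange, and SplitRange (range split point) specializations out of the "never called, assert" group and into the "duplicated for load, replies not validated" group, so TSS_doCompare now returns true instead of asserting. A stripped-down, self-contained sketch of that specialization pattern, with invented reply types (DataReplySketch, LoadOnlyReplySketch) standing in for FDB's:

#include <cassert>

struct DataReplySketch { int value = 0; };   // replies that are validated against the TSS
struct LoadOnlyReplySketch {};               // metrics-style replies duplicated only for load

// Primary template: one comparison function per reply type, chosen by specialization.
template <class Rep>
bool doCompareSketch(const Rep& src, const Rep& tss);

template <>
bool doCompareSketch(const DataReplySketch& src, const DataReplySketch& tss) {
    return src.value == tss.value; // data requests are compared field by field
}

template <>
bool doCompareSketch(const LoadOnlyReplySketch&, const LoadOnlyReplySketch&) {
    return true; // duplicated just for load, no need to validate replies
}

int main() {
    assert(doCompareSketch(DataReplySketch{ 1 }, DataReplySketch{ 1 }));
    assert(doCompareSketch(LoadOnlyReplySketch{}, LoadOnlyReplySketch{}));
}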

View File

@@ -5131,8 +5131,10 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
if (doRecruit && newServer.isError()) {
TraceEvent(SevWarn, "DDRecruitmentError").error(newServer.getError());
if (!newServer.isError(error_code_recruitment_failed) &&
!newServer.isError(error_code_request_maybe_delivered))
!newServer.isError(error_code_request_maybe_delivered)) {
tssState->markComplete();
throw newServer.getError();
}
wait(delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskPriority::DataDistribution));
}
@@ -5250,8 +5252,13 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
}
}
int newTssToRecruit = targetTSSInDC - self->tss_info_by_pair.size() - inProgressTSSCount;
// FIXME: Should log this if the recruit count stays the same but the other numbers update?
if (newTssToRecruit != tssToRecruit) {
TraceEvent("TSS_RecruitUpdated", self->distributorId).detail("Count", newTssToRecruit);
TraceEvent("TSS_RecruitUpdated", self->distributorId)
.detail("Desired", targetTSSInDC)
.detail("Existing", self->tss_info_by_pair.size())
.detail("InProgress", inProgressTSSCount)
.detail("NotStarted", newTssToRecruit);
tssToRecruit = newTssToRecruit;
// if we need to get rid of some TSS processes, signal to either cancel recruitment or kill existing TSS
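
Worked example of the recruit count behind the new trace details (the numbers are illustrative, not from the commit; existingTssPairs stands in for self->tss_info_by_pair.size()):

// Desired = 4, Existing = 2, InProgress = 1  =>  NotStarted (newTssToRecruit) = 4 - 2 - 1 = 1
int targetTSSInDC = 4, existingTssPairs = 2, inProgressTSSCount = 1;
int newTssToRecruit = targetTSSInDC - existingTssPairs - inProgressTSSCount; // 1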

View File

@@ -635,9 +635,8 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
state int retries = 0;
state FlowLock::Releaser releaser;
state std::vector<std::pair<UID, UID>> tssToKill;
state std::unordered_set<UID> tssToIgnore;
// try waiting for tss for a 2 loops, give up if they're stuck to not affect the rest of the cluster
// try waiting for tss for 2 loops, give up if they're behind to not affect the rest of the cluster
state int waitForTSSCounter = 2;
ASSERT(!destinationTeam.empty());
@@ -657,22 +656,6 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
// printf("finishMoveKeys( '%s'-'%s' )\n", begin.toString().c_str(), keys.end.toString().c_str());
loop {
try {
if (tssToKill.size()) {
TEST(true); // killing TSS because they were unavailable for movekeys
// Kill tss BEFORE committing main txn so that client requests don't make it to the tss when it
// has a different shard set than its pair use a different RYW transaction since i'm too lazy
// (and don't want to add bugs) by changing whole method to RYW. Also, using a different
// transaction makes it commit earlier which we may need to guarantee causality of tss getting
// removed before client sends a request to this key range on the new SS
wait(removeTSSPairsFromCluster(occ, tssToKill));
for (auto& tssPair : tssToKill) {
TraceEvent(SevWarnAlways, "TSS_KillMoveKeys").detail("TSSID", tssPair.second);
tssToIgnore.insert(tssPair.second);
}
tssToKill.clear();
}
tr.info.taskID = TaskPriority::MoveKeys;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
@@ -877,8 +860,8 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
TaskPriority::MoveKeys));
// Check to see if we're waiting only on tss. If so, decrement the waiting counter.
// If the waiting counter is zero, kill the slow/non-responsive tss processes before finalizing the
// data move.
// If the waiting counter is zero, ignore the slow/non-responsive tss processes before finalizing
// the data move.
if (tssReady.size()) {
bool allSSDone = true;
for (auto& f : serverReady) {
@@ -902,12 +885,9 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
if (anyTssNotDone && waitForTSSCounter == 0) {
for (int i = 0; i < tssReady.size(); i++) {
if (!tssReady[i].isReady() || tssReady[i].isError()) {
tssToKill.push_back(
std::pair(tssReadyInterfs[i].tssPairID.get(), tssReadyInterfs[i].id()));
tssToIgnore.insert(tssReadyInterfs[i].id());
}
}
// repeat loop and go back to start to kill tss' before continuing on
continue;
}
}
}
@@ -920,22 +900,11 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
for (int s = 0; s < tssReady.size(); s++)
tssCount += tssReady[s].isReady() && !tssReady[s].isError();
/*if (tssReady.size()) {
printf(" fMK: [%s - %s) moved data to %d/%d servers and %d/%d tss\n",
begin.toString().c_str(),
keys.end.toString().c_str(),
count,
serverReady.size(),
tssCount,
tssReady.size());
} else {
printf(" fMK: [%s - %s) moved data to %d/%d servers\n",
begin.toString().c_str(),
keys.end.toString().c_str(),
count,
serverReady.size());
}*/
TraceEvent(SevDebug, waitInterval.end(), relocationIntervalId).detail("ReadyServers", count);
TraceEvent readyServersEv(SevDebug, waitInterval.end(), relocationIntervalId);
readyServersEv.detail("ReadyServers", count);
if (tssReady.size()) {
readyServersEv.detail("ReadyTSS", tssCount);
}
if (count == dest.size()) {
// update keyServers, serverKeys
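
Net effect of this hunk: finishMoveKeys no longer kills a TSS that lags behind a data move; once the wait counter runs out it records the TSS id in tssToIgnore and stops waiting for it, and the ready-server summary now goes into a single trace event. A rough, self-contained sketch of the "ignore stragglers after the counter expires" shape (UID and TssStatusSketch are simplified stand-ins for the real interfaces and futures):

#include <cstdint>
#include <unordered_set>
#include <vector>

using UID = uint64_t;

struct TssStatusSketch {
    UID id;
    bool ready; // stand-in for tssReady[i].isReady() && !tssReady[i].isError()
};

// One retry iteration: if some TSSes are still pending and patience has run out,
// record them in tssToIgnore so later iterations stop waiting on them.
void ignoreSlowTss(const std::vector<TssStatusSketch>& tss, int& waitForTSSCounter, std::unordered_set<UID>& tssToIgnore) {
    bool anyTssNotDone = false;
    for (const auto& t : tss)
        anyTssNotDone |= !t.ready;
    if (!anyTssNotDone)
        return; // every TSS caught up, nothing to do
    if (waitForTSSCounter > 0) {
        waitForTSSCounter--; // still willing to wait another loop
        return;
    }
    for (const auto& t : tss)
        if (!t.ready)
            tssToIgnore.insert(t.id); // give up on stragglers instead of killing them
}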

View File

@@ -758,6 +758,7 @@ public:
Counter loops;
Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount;
Counter readsRejected;
Counter wrongShardServer;
Counter fetchedVersions;
Counter fetchesFromLogs;
@@ -778,8 +779,8 @@ public:
updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc),
fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc),
fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc),
readLatencySample("ReadLatencyMetrics",
wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc),
fetchesFromLogs("FetchesFromLogs", cc), readLatencySample("ReadLatencyMetrics",
self->thisServerID,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
@@ -950,8 +951,11 @@ public:
using isLoadBalancedReply = std::is_base_of<LoadBalancedReply, Reply>;
template <class Reply>
static typename std::enable_if<isLoadBalancedReply<Reply>::value, void>::type
typename std::enable_if<isLoadBalancedReply<Reply>::value, void>::type
sendErrorWithPenalty(const ReplyPromise<Reply>& promise, const Error& err, double penalty) {
if (err.code() == error_code_wrong_shard_server) {
++counters.wrongShardServer;
}
Reply reply;
reply.error = err;
reply.penalty = penalty;
@@ -959,8 +963,11 @@ public:
}
template <class Reply>
static typename std::enable_if<!isLoadBalancedReply<Reply>::value, void>::type
typename std::enable_if<!isLoadBalancedReply<Reply>::value, void>::type
sendErrorWithPenalty(const ReplyPromise<Reply>& promise, const Error& err, double) {
if (err.code() == error_code_wrong_shard_server) {
++counters.wrongShardServer;
}
promise.sendError(err);
}
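
Context for the two hunks above: the sendErrorWithPenalty overloads are selected at compile time with std::enable_if on whether the Reply type derives from LoadBalancedReply, and they lose "static" here, presumably so they can reach the non-static counters member when bumping the new WrongShardServer counter. A stripped-down, self-contained illustration of that enable_if dispatch (all types and the error code below are invented for the example, not FDB's):

#include <iostream>
#include <type_traits>

constexpr int WRONG_SHARD_SERVER_CODE = 1; // stand-in error code

struct LoadBalancedReplySketch { int error = 0; double penalty = 1.0; };
struct BalancedGetReplySketch : LoadBalancedReplySketch {};
struct PlainReplySketch {};

struct ErrorSenderSketch {
    int wrongShardServer = 0; // member counter, so the overloads cannot stay static

    // Chosen when Reply derives from LoadBalancedReplySketch: the error and penalty ride inside the reply.
    template <class Reply>
    typename std::enable_if<std::is_base_of<LoadBalancedReplySketch, Reply>::value, void>::type
    sendErrorWithPenalty(int err, double penalty) {
        if (err == WRONG_SHARD_SERVER_CODE)
            ++wrongShardServer;
        Reply reply;
        reply.error = err;
        reply.penalty = penalty;
        std::cout << "load-balanced reply carries penalty " << reply.penalty << "\n";
    }

    // Chosen otherwise: the error is sent on its own and the penalty is dropped.
    template <class Reply>
    typename std::enable_if<!std::is_base_of<LoadBalancedReplySketch, Reply>::value, void>::type
    sendErrorWithPenalty(int err, double) {
        if (err == WRONG_SHARD_SERVER_CODE)
            ++wrongShardServer;
        std::cout << "plain error " << err << "\n";
    }
};

int main() {
    ErrorSenderSketch s;
    s.sendErrorWithPenalty<BalancedGetReplySketch>(WRONG_SHARD_SERVER_CODE, 2.5);
    s.sendErrorWithPenalty<PlainReplySketch>(WRONG_SHARD_SERVER_CODE, 0.0);
    std::cout << "wrongShardServer = " << s.wrongShardServer << "\n"; // 2
}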
@@ -2117,13 +2124,13 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
"TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.AfterVersion");
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin",
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard",
//"None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) {
// TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin",
// req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin",
// shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents");
throw wrong_shard_server();
}
@@ -2202,8 +2209,8 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
/*for( int i = 0; i < r.data.size(); i++ ) {
StorageMetrics m;
m.bytesPerKSecond = r.data[i].expectedSize();
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an
int data->metrics.notify(r.data[i].key, m);
}*/
// For performance concerns, the cost of a range read is billed to the start key and end key of the // For performance concerns, the cost of a range read is billed to the start key and end key of the