diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 51bed23693..ebca9a7976 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -154,16 +154,25 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe result->second = tssi; } + // data requests duplicated for load and data comparison queueModel.updateTssEndpoint(ssi.getValue.getEndpoint().token.first(), TSSEndpointData(tssi.id(), tssi.getValue.getEndpoint(), metrics)); queueModel.updateTssEndpoint(ssi.getKey.getEndpoint().token.first(), TSSEndpointData(tssi.id(), tssi.getKey.getEndpoint(), metrics)); queueModel.updateTssEndpoint(ssi.getKeyValues.getEndpoint().token.first(), TSSEndpointData(tssi.id(), tssi.getKeyValues.getEndpoint(), metrics)); - queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(), - TSSEndpointData(tssi.id(), tssi.watchValue.getEndpoint(), metrics)); queueModel.updateTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first(), TSSEndpointData(tssi.id(), tssi.getKeyValuesStream.getEndpoint(), metrics)); + + // non-data requests duplicated for load + queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(), + TSSEndpointData(tssi.id(), tssi.watchValue.getEndpoint(), metrics)); + queueModel.updateTssEndpoint(ssi.splitMetrics.getEndpoint().token.first(), + TSSEndpointData(tssi.id(), tssi.splitMetrics.getEndpoint(), metrics)); + queueModel.updateTssEndpoint(ssi.getReadHotRanges.getEndpoint().token.first(), + TSSEndpointData(tssi.id(), tssi.getReadHotRanges.getEndpoint(), metrics)); + queueModel.updateTssEndpoint(ssi.getRangeSplitPoints.getEndpoint().token.first(), + TSSEndpointData(tssi.id(), tssi.getRangeSplitPoints.getEndpoint(), metrics)); } } @@ -175,8 +184,12 @@ void DatabaseContext::removeTssMapping(StorageServerInterface const& ssi) { queueModel.removeTssEndpoint(ssi.getValue.getEndpoint().token.first()); queueModel.removeTssEndpoint(ssi.getKey.getEndpoint().token.first()); 
queueModel.removeTssEndpoint(ssi.getKeyValues.getEndpoint().token.first()); - queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first()); queueModel.removeTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first()); + + queueModel.removeTssEndpoint(ssi.watchValue.getEndpoint().token.first()); + queueModel.removeTssEndpoint(ssi.splitMetrics.getEndpoint().token.first()); + queueModel.removeTssEndpoint(ssi.getReadHotRanges.getEndpoint().token.first()); + queueModel.removeTssEndpoint(ssi.getRangeSplitPoints.getEndpoint().token.first()); } } diff --git a/fdbclient/StorageServerInterface.cpp b/fdbclient/StorageServerInterface.cpp index d379a0fa69..d67df9cf95 100644 --- a/fdbclient/StorageServerInterface.cpp +++ b/fdbclient/StorageServerInterface.cpp @@ -210,6 +210,66 @@ void TSS_traceMismatch(TraceEvent& event, ASSERT(false); } +template <> +bool TSS_doCompare(const SplitMetricsReply& src, const SplitMetricsReply& tss) { + // We duplicate split metrics just for load, no need to validate replies. + return true; +} + +template <> +const char* TSS_mismatchTraceName(const SplitMetricsRequest& req) { + ASSERT(false); + return ""; +} + +template <> +void TSS_traceMismatch(TraceEvent& event, + const SplitMetricsRequest& req, + const SplitMetricsReply& src, + const SplitMetricsReply& tss) { + ASSERT(false); +} + +template <> +bool TSS_doCompare(const ReadHotSubRangeReply& src, const ReadHotSubRangeReply& tss) { + // We duplicate read hot sub range metrics just for load, no need to validate replies. 
+ return true; +} + +template <> +const char* TSS_mismatchTraceName(const ReadHotSubRangeRequest& req) { + ASSERT(false); + return ""; +} + +template <> +void TSS_traceMismatch(TraceEvent& event, + const ReadHotSubRangeRequest& req, + const ReadHotSubRangeReply& src, + const ReadHotSubRangeReply& tss) { + ASSERT(false); +} + +template <> +bool TSS_doCompare(const SplitRangeReply& src, const SplitRangeReply& tss) { + // We duplicate split range requests just for load, no need to validate replies. + return true; +} + +template <> +const char* TSS_mismatchTraceName(const SplitRangeRequest& req) { + ASSERT(false); + return ""; +} + +template <> +void TSS_traceMismatch(TraceEvent& event, + const SplitRangeRequest& req, + const SplitRangeReply& src, + const SplitRangeReply& tss) { + ASSERT(false); +} + // template specializations for metrics replies that should never be called because these requests aren't duplicated // storage metrics @@ -233,69 +293,6 @@ void TSS_traceMismatch(TraceEvent& event, ASSERT(false); } -// split metrics -template <> -bool TSS_doCompare(const SplitMetricsReply& src, const SplitMetricsReply& tss) { - ASSERT(false); - return true; -} - -template <> -const char* TSS_mismatchTraceName(const SplitMetricsRequest& req) { - ASSERT(false); - return ""; -} - -template <> -void TSS_traceMismatch(TraceEvent& event, - const SplitMetricsRequest& req, - const SplitMetricsReply& src, - const SplitMetricsReply& tss) { - ASSERT(false); -} - -// read hot sub range -template <> -bool TSS_doCompare(const ReadHotSubRangeReply& src, const ReadHotSubRangeReply& tss) { - ASSERT(false); - return true; -} - -template <> -const char* TSS_mismatchTraceName(const ReadHotSubRangeRequest& req) { - ASSERT(false); - return ""; -} - -template <> -void TSS_traceMismatch(TraceEvent& event, - const ReadHotSubRangeRequest& req, - const ReadHotSubRangeReply& src, - const ReadHotSubRangeReply& tss) { - ASSERT(false); -} - -// split range -template <> -bool TSS_doCompare(const 
SplitRangeReply& src, const SplitRangeReply& tss) { - ASSERT(false); - return true; -} - -template <> -const char* TSS_mismatchTraceName(const SplitRangeRequest& req) { - ASSERT(false); - return ""; -} - -template <> -void TSS_traceMismatch(TraceEvent& event, - const SplitRangeRequest& req, - const SplitRangeReply& src, - const SplitRangeReply& tss) { - ASSERT(false); -} - // only record metrics for data reads template <> diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 394f2ca8fc..9532f85f04 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -5131,8 +5131,10 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self, if (doRecruit && newServer.isError()) { TraceEvent(SevWarn, "DDRecruitmentError").error(newServer.getError()); if (!newServer.isError(error_code_recruitment_failed) && - !newServer.isError(error_code_request_maybe_delivered)) + !newServer.isError(error_code_request_maybe_delivered)) { + tssState->markComplete(); throw newServer.getError(); + } wait(delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskPriority::DataDistribution)); } @@ -5250,8 +5252,13 @@ ACTOR Future<Void> storageRecruiter(DDTeamCollection* self, } } int newTssToRecruit = targetTSSInDC - self->tss_info_by_pair.size() - inProgressTSSCount; + // FIXME: Should log this if the recruit count stays the same but the other numbers update? 
if (newTssToRecruit != tssToRecruit) { - TraceEvent("TSS_RecruitUpdated", self->distributorId).detail("Count", newTssToRecruit); + TraceEvent("TSS_RecruitUpdated", self->distributorId) + .detail("Desired", targetTSSInDC) + .detail("Existing", self->tss_info_by_pair.size()) + .detail("InProgress", inProgressTSSCount) + .detail("NotStarted", newTssToRecruit); tssToRecruit = newTssToRecruit; // if we need to get rid of some TSS processes, signal to either cancel recruitment or kill existing TSS diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 55706e458f..7a314a27a8 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -635,9 +635,8 @@ ACTOR static Future<Void> finishMoveKeys(Database occ, state int retries = 0; state FlowLock::Releaser releaser; - state std::vector<std::pair<UID, UID>> tssToKill; state std::unordered_set<UID> tssToIgnore; - // try waiting for tss for a 2 loops, give up if they're stuck to not affect the rest of the cluster + // try waiting for tss for a 2 loops, give up if they're behind to not affect the rest of the cluster state int waitForTSSCounter = 2; ASSERT(!destinationTeam.empty()); @@ -657,22 +656,6 @@ ACTOR static Future<Void> finishMoveKeys(Database occ, // printf("finishMoveKeys( '%s'-'%s' )\n", begin.toString().c_str(), keys.end.toString().c_str()); loop { try { - if (tssToKill.size()) { - TEST(true); // killing TSS because they were unavailable for movekeys - - // Kill tss BEFORE committing main txn so that client requests don't make it to the tss when it - // has a different shard set than its pair use a different RYW transaction since i'm too lazy - // (and don't want to add bugs) by changing whole method to RYW. 
Also, using a different - // transaction makes it commit earlier which we may need to guarantee causality of tss getting - // removed before client sends a request to this key range on the new SS - wait(removeTSSPairsFromCluster(occ, tssToKill)); - - for (auto& tssPair : tssToKill) { - TraceEvent(SevWarnAlways, "TSS_KillMoveKeys").detail("TSSID", tssPair.second); - tssToIgnore.insert(tssPair.second); - } - tssToKill.clear(); - } tr.info.taskID = TaskPriority::MoveKeys; tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); @@ -877,8 +860,8 @@ ACTOR static Future<Void> finishMoveKeys(Database occ, TaskPriority::MoveKeys)); // Check to see if we're waiting only on tss. If so, decrement the waiting counter. - // If the waiting counter is zero, kill the slow/non-responsive tss processes before finalizing the - // data move. + // If the waiting counter is zero, ignore the slow/non-responsive tss processes before finalizing + // the data move. if (tssReady.size()) { bool allSSDone = true; for (auto& f : serverReady) { @@ -902,12 +885,9 @@ ACTOR static Future<Void> finishMoveKeys(Database occ, if (anyTssNotDone && waitForTSSCounter == 0) { for (int i = 0; i < tssReady.size(); i++) { if (!tssReady[i].isReady() || tssReady[i].isError()) { - tssToKill.push_back( - std::pair(tssReadyInterfs[i].tssPairID.get(), tssReadyInterfs[i].id())); + tssToIgnore.insert(tssReadyInterfs[i].id()); } } - // repeat loop and go back to start to kill tss' before continuing on - continue; } } } @@ -920,22 +900,11 @@ ACTOR static Future<Void> finishMoveKeys(Database occ, for (int s = 0; s < tssReady.size(); s++) tssCount += tssReady[s].isReady() && !tssReady[s].isError(); - /*if (tssReady.size()) { - printf(" fMK: [%s - %s) moved data to %d/%d servers and %d/%d tss\n", - begin.toString().c_str(), - keys.end.toString().c_str(), - count, - serverReady.size(), - tssCount, - tssReady.size()); - } else { - printf(" fMK: [%s - %s) moved data to %d/%d servers\n", - begin.toString().c_str(), - 
keys.end.toString().c_str(), - count, - serverReady.size()); - }*/ - TraceEvent(SevDebug, waitInterval.end(), relocationIntervalId).detail("ReadyServers", count); + TraceEvent readyServersEv(SevDebug, waitInterval.end(), relocationIntervalId); + readyServersEv.detail("ReadyServers", count); + if (tssReady.size()) { + readyServersEv.detail("ReadyTSS", tssCount); + } if (count == dest.size()) { // update keyServers, serverKeys diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 15a06b5ce5..6470c3900e 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -758,6 +758,7 @@ public: Counter loops; Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount; Counter readsRejected; + Counter wrongShardServer; Counter fetchedVersions; Counter fetchesFromLogs; @@ -778,11 +779,11 @@ public: updateVersions("UpdateVersions", cc), loops("Loops", cc), fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc), fetchExecutingMS("FetchExecutingMS", cc), fetchExecutingCount("FetchExecutingCount", cc), readsRejected("ReadsRejected", cc), - fetchedVersions("FetchedVersions", cc), fetchesFromLogs("FetchesFromLogs", cc), - readLatencySample("ReadLatencyMetrics", - self->thisServerID, - SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, - SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + wrongShardServer("WrongShardServer", cc), fetchedVersions("FetchedVersions", cc), + fetchesFromLogs("FetchesFromLogs", cc), readLatencySample("ReadLatencyMetrics", + self->thisServerID, + SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; }); specialCounter(cc, "Version", [self]() { return self->version.get(); }); @@ -950,8 +951,11 @@ public: using isLoadBalancedReply = 
std::is_base_of<LoadBalancedReply, Reply>; template <class Reply> - static typename std::enable_if<isLoadBalancedReply<Reply>::value, void>::type + typename std::enable_if<isLoadBalancedReply<Reply>::value, void>::type sendErrorWithPenalty(const ReplyPromise<Reply>& promise, const Error& err, double penalty) { + if (err.code() == error_code_wrong_shard_server) { + ++counters.wrongShardServer; + } Reply reply; reply.error = err; reply.penalty = penalty; @@ -959,8 +963,11 @@ public: } template <class Reply> - static typename std::enable_if<!isLoadBalancedReply<Reply>::value, void>::type + typename std::enable_if<!isLoadBalancedReply<Reply>::value, void>::type sendErrorWithPenalty(const ReplyPromise<Reply>& promise, const Error& err, double) { + if (err.code() == error_code_wrong_shard_server) { + ++counters.wrongShardServer; + } promise.sendError(err); } @@ -2117,13 +2124,13 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe "TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValuesStream.AfterVersion"); //.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end); //} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", - //req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", //"None").detail("In", "getKeyValues>getShardKeyRange"); throw e; } if (!selectorInRange(req.end, shard) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == shard.end)) { // TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", - //req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", - //shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents"); + // req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", + // 
shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents"); throw wrong_shard_server(); } @@ -2200,10 +2207,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe } /*for( int i = 0; i < r.data.size(); i++ ) { - StorageMetrics m; - m.bytesPerKSecond = r.data[i].expectedSize(); - m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int - data->metrics.notify(r.data[i].key, m); + StorageMetrics m; + m.bytesPerKSecond = r.data[i].expectedSize(); + m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an + int data->metrics.notify(r.data[i].key, m); }*/ // For performance concerns, the cost of a range read is billed to the start key and end key of the