mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 02:18:39 +08:00
Merge pull request #4965 from sfc-gh-jslocum/tss_mapping_changes
TSS Mismatch Changes
This commit is contained in:
commit
ab0ab765c8
@ -275,7 +275,7 @@ public:
|
|||||||
Future<Void> monitorProxiesInfoChange;
|
Future<Void> monitorProxiesInfoChange;
|
||||||
Future<Void> monitorTssInfoChange;
|
Future<Void> monitorTssInfoChange;
|
||||||
Future<Void> tssMismatchHandler;
|
Future<Void> tssMismatchHandler;
|
||||||
PromiseStream<UID> tssMismatchStream;
|
PromiseStream<std::pair<UID, std::vector<DetailedTSSMismatch>>> tssMismatchStream;
|
||||||
Reference<CommitProxyInfo> commitProxies;
|
Reference<CommitProxyInfo> commitProxies;
|
||||||
Reference<GrvProxyInfo> grvProxies;
|
Reference<GrvProxyInfo> grvProxies;
|
||||||
bool proxyProvisional; // Provisional commit proxy and grv proxy are used at the same time.
|
bool proxyProvisional; // Provisional commit proxy and grv proxy are used at the same time.
|
||||||
@ -430,12 +430,12 @@ public:
|
|||||||
static const std::vector<std::string> debugTransactionTagChoices;
|
static const std::vector<std::string> debugTransactionTagChoices;
|
||||||
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
|
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
|
||||||
|
|
||||||
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
|
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
|
||||||
// Requests to the storage server will be duplicated to the TSS.
|
// Requests to the storage server will be duplicated to the TSS.
|
||||||
void addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi);
|
void addTssMapping(StorageServerInterface const& ssi, StorageServerInterface const& tssi);
|
||||||
|
|
||||||
// Removes the storage server and its TSS pair from the TSS mapping (if present).
|
// Removes the storage server and its TSS pair from the TSS mapping (if present).
|
||||||
// Requests to the storage server will no longer be duplicated to its pair TSS.
|
// Requests to the storage server will no longer be duplicated to its pair TSS.
|
||||||
void removeTssMapping(StorageServerInterface const& ssi);
|
void removeTssMapping(StorageServerInterface const& ssi);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -384,10 +384,11 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
|
|||||||
cx->bytesPerCommit.clear();
|
cx->bytesPerCommit.clear();
|
||||||
|
|
||||||
for (const auto& it : cx->tssMetrics) {
|
for (const auto& it : cx->tssMetrics) {
|
||||||
// TODO could skip this tss if request counter is zero? would potentially complicate elapsed calculation
|
// TODO could skip this whole thing if tss if request counter is zero?
|
||||||
// though
|
// That would potentially complicate elapsed calculation though
|
||||||
if (it.second->mismatches.getIntervalDelta()) {
|
if (it.second->mismatches.getIntervalDelta()) {
|
||||||
cx->tssMismatchStream.send(it.first);
|
cx->tssMismatchStream.send(
|
||||||
|
std::pair<UID, std::vector<DetailedTSSMismatch>>(it.first, it.second->detailedMismatches));
|
||||||
}
|
}
|
||||||
|
|
||||||
// do error histograms as separate event
|
// do error histograms as separate event
|
||||||
@ -826,13 +827,15 @@ ACTOR Future<Void> monitorCacheList(DatabaseContext* self) {
|
|||||||
ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
|
ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
|
||||||
state Reference<ReadYourWritesTransaction> tr;
|
state Reference<ReadYourWritesTransaction> tr;
|
||||||
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
|
state KeyBackedMap<UID, UID> tssMapDB = KeyBackedMap<UID, UID>(tssMappingKeys.begin);
|
||||||
|
state KeyBackedMap<Tuple, std::string> tssMismatchDB = KeyBackedMap<Tuple, std::string>(tssMismatchKeys.begin);
|
||||||
loop {
|
loop {
|
||||||
state UID tssID = waitNext(cx->tssMismatchStream.getFuture());
|
// <tssid, list of detailed mismatch data>
|
||||||
|
state std::pair<UID, std::vector<DetailedTSSMismatch>> data = waitNext(cx->tssMismatchStream.getFuture());
|
||||||
// find ss pair id so we can remove it from the mapping
|
// find ss pair id so we can remove it from the mapping
|
||||||
state UID tssPairID;
|
state UID tssPairID;
|
||||||
bool found = false;
|
bool found = false;
|
||||||
for (const auto& it : cx->tssMapping) {
|
for (const auto& it : cx->tssMapping) {
|
||||||
if (it.second.id() == tssID) {
|
if (it.second.id() == data.first) {
|
||||||
tssPairID = it.first;
|
tssPairID = it.first;
|
||||||
found = true;
|
found = true;
|
||||||
break;
|
break;
|
||||||
@ -841,7 +844,7 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
|
|||||||
if (found) {
|
if (found) {
|
||||||
state bool quarantine = CLIENT_KNOBS->QUARANTINE_TSS_ON_MISMATCH;
|
state bool quarantine = CLIENT_KNOBS->QUARANTINE_TSS_ON_MISMATCH;
|
||||||
TraceEvent(SevWarnAlways, quarantine ? "TSS_QuarantineMismatch" : "TSS_KillMismatch")
|
TraceEvent(SevWarnAlways, quarantine ? "TSS_QuarantineMismatch" : "TSS_KillMismatch")
|
||||||
.detail("TSSID", tssID.toString());
|
.detail("TSSID", data.first.toString());
|
||||||
TEST(quarantine); // Quarantining TSS because it got mismatch
|
TEST(quarantine); // Quarantining TSS because it got mismatch
|
||||||
TEST(!quarantine); // Killing TSS because it got mismatch
|
TEST(!quarantine); // Killing TSS because it got mismatch
|
||||||
|
|
||||||
@ -851,14 +854,21 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
|
|||||||
try {
|
try {
|
||||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
|
||||||
if (quarantine) {
|
if (quarantine) {
|
||||||
tr->set(tssQuarantineKeyFor(tssID), LiteralStringRef(""));
|
tr->set(tssQuarantineKeyFor(data.first), LiteralStringRef(""));
|
||||||
} else {
|
} else {
|
||||||
tr->clear(serverTagKeyFor(tssID));
|
tr->clear(serverTagKeyFor(data.first));
|
||||||
}
|
}
|
||||||
tssMapDB.erase(tr, tssPairID);
|
tssMapDB.erase(tr, tssPairID);
|
||||||
|
|
||||||
|
for (const DetailedTSSMismatch& d : data.second) {
|
||||||
|
// <tssid, time, mismatchid> -> mismatch data
|
||||||
|
tssMismatchDB.set(
|
||||||
|
tr,
|
||||||
|
Tuple().append(data.first.toString()).append(d.timestamp).append(d.mismatchId.toString()),
|
||||||
|
d.traceString);
|
||||||
|
}
|
||||||
|
|
||||||
wait(tr->commit());
|
wait(tr->commit());
|
||||||
|
|
||||||
break;
|
break;
|
||||||
@ -868,7 +878,7 @@ ACTOR static Future<Void> handleTssMismatches(DatabaseContext* cx) {
|
|||||||
tries++;
|
tries++;
|
||||||
if (tries > 10) {
|
if (tries > 10) {
|
||||||
// Give up, it'll get another mismatch or a human will investigate eventually
|
// Give up, it'll get another mismatch or a human will investigate eventually
|
||||||
TraceEvent("TSS_MismatchGaveUp").detail("TSSID", tssID.toString());
|
TraceEvent("TSS_MismatchGaveUp").detail("TSSID", data.first.toString());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,32 +30,31 @@ std::string traceChecksumValue(ValueRef s) {
|
|||||||
return s.size() > 12 ? format("(%d)%08x", s.size(), crc32c_append(0, s.begin(), s.size())) : s.toString();
|
return s.size() > 12 ? format("(%d)%08x", s.size(), crc32c_append(0, s.begin(), s.size())) : s.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// point reads
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const GetValueRequest& req,
|
bool TSS_doCompare(const GetValueReply& src, const GetValueReply& tss) {
|
||||||
const GetValueReply& src,
|
return src.value.present() == tss.value.present() && (!src.value.present() || src.value.get() == tss.value.get());
|
||||||
const GetValueReply& tss,
|
|
||||||
Severity traceSeverity,
|
|
||||||
UID tssId) {
|
|
||||||
if (src.value.present() != tss.value.present() || (src.value.present() && src.value.get() != tss.value.get())) {
|
|
||||||
TraceEvent(traceSeverity, "TSSMismatchGetValue")
|
|
||||||
.suppressFor(1.0)
|
|
||||||
.detail("TSSID", tssId)
|
|
||||||
.detail("Key", req.key.printable())
|
|
||||||
.detail("Version", req.version)
|
|
||||||
.detail("SSReply", src.value.present() ? traceChecksumValue(src.value.get()) : "missing")
|
|
||||||
.detail("TSSReply", tss.value.present() ? traceChecksumValue(tss.value.get()) : "missing");
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const GetKeyRequest& req,
|
const char* TSS_mismatchTraceName(const GetValueRequest& req) {
|
||||||
const GetKeyReply& src,
|
return "TSSMismatchGetValue";
|
||||||
const GetKeyReply& tss,
|
}
|
||||||
Severity traceSeverity,
|
|
||||||
UID tssId) {
|
template <>
|
||||||
|
void TSS_traceMismatch(TraceEvent& event,
|
||||||
|
const GetValueRequest& req,
|
||||||
|
const GetValueReply& src,
|
||||||
|
const GetValueReply& tss) {
|
||||||
|
event.detail("Key", req.key.printable())
|
||||||
|
.detail("Version", req.version)
|
||||||
|
.detail("SSReply", src.value.present() ? traceChecksumValue(src.value.get()) : "missing")
|
||||||
|
.detail("TSSReply", tss.value.present() ? traceChecksumValue(tss.value.get()) : "missing");
|
||||||
|
}
|
||||||
|
|
||||||
|
// key selector reads
|
||||||
|
template <>
|
||||||
|
bool TSS_doCompare(const GetKeyReply& src, const GetKeyReply& tss) {
|
||||||
// This process is a bit complicated. Since the tss and ss can return different results if neighboring shards to
|
// This process is a bit complicated. Since the tss and ss can return different results if neighboring shards to
|
||||||
// req.sel.key are currently being moved, We validate that the results are the same IF the returned key selectors
|
// req.sel.key are currently being moved, We validate that the results are the same IF the returned key selectors
|
||||||
// are final. Otherwise, we only mark the request as a mismatch if the difference between the two returned key
|
// are final. Otherwise, we only mark the request as a mismatch if the difference between the two returned key
|
||||||
@ -92,107 +91,170 @@ bool TSS_doCompare(const GetKeyRequest& req,
|
|||||||
bool tssOffsetLarger = (src.sel.offset == tss.sel.offset) ? tss.sel.orEqual : src.sel.offset < tss.sel.offset;
|
bool tssOffsetLarger = (src.sel.offset == tss.sel.offset) ? tss.sel.orEqual : src.sel.offset < tss.sel.offset;
|
||||||
matches = tssKeyLarger != tssOffsetLarger;
|
matches = tssKeyLarger != tssOffsetLarger;
|
||||||
}
|
}
|
||||||
if (!matches) {
|
|
||||||
TraceEvent(traceSeverity, "TSSMismatchGetKey")
|
|
||||||
.suppressFor(1.0)
|
|
||||||
.detail("TSSID", tssId)
|
|
||||||
.detail("KeySelector",
|
|
||||||
format("%s%s:%d", req.sel.orEqual ? "=" : "", req.sel.getKey().printable().c_str(), req.sel.offset))
|
|
||||||
.detail("Version", req.version)
|
|
||||||
.detail("SSReply",
|
|
||||||
format("%s%s:%d", src.sel.orEqual ? "=" : "", src.sel.getKey().printable().c_str(), src.sel.offset))
|
|
||||||
.detail(
|
|
||||||
"TSSReply",
|
|
||||||
format("%s%s:%d", tss.sel.orEqual ? "=" : "", tss.sel.getKey().printable().c_str(), tss.sel.offset));
|
|
||||||
}
|
|
||||||
return matches;
|
return matches;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const GetKeyValuesRequest& req,
|
const char* TSS_mismatchTraceName(const GetKeyRequest& req) {
|
||||||
const GetKeyValuesReply& src,
|
return "TSSMismatchGetKey";
|
||||||
const GetKeyValuesReply& tss,
|
}
|
||||||
Severity traceSeverity,
|
|
||||||
UID tssId) {
|
|
||||||
if (src.more != tss.more || src.data != tss.data) {
|
|
||||||
|
|
||||||
std::string ssResultsString = format("(%d)%s:\n", src.data.size(), src.more ? "+" : "");
|
template <>
|
||||||
for (auto& it : src.data) {
|
void TSS_traceMismatch(TraceEvent& event, const GetKeyRequest& req, const GetKeyReply& src, const GetKeyReply& tss) {
|
||||||
ssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
|
event
|
||||||
}
|
.detail("KeySelector",
|
||||||
|
format("%s%s:%d", req.sel.orEqual ? "=" : "", req.sel.getKey().printable().c_str(), req.sel.offset))
|
||||||
|
.detail("Version", req.version)
|
||||||
|
.detail("SSReply",
|
||||||
|
format("%s%s:%d", src.sel.orEqual ? "=" : "", src.sel.getKey().printable().c_str(), src.sel.offset))
|
||||||
|
.detail("TSSReply",
|
||||||
|
format("%s%s:%d", tss.sel.orEqual ? "=" : "", tss.sel.getKey().printable().c_str(), tss.sel.offset));
|
||||||
|
}
|
||||||
|
|
||||||
std::string tssResultsString = format("(%d)%s:\n", tss.data.size(), tss.more ? "+" : "");
|
// range reads
|
||||||
for (auto& it : tss.data) {
|
template <>
|
||||||
tssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
|
bool TSS_doCompare(const GetKeyValuesReply& src, const GetKeyValuesReply& tss) {
|
||||||
}
|
return src.more == tss.more && src.data == tss.data;
|
||||||
|
}
|
||||||
|
|
||||||
TraceEvent(traceSeverity, "TSSMismatchGetKeyValues")
|
template <>
|
||||||
.suppressFor(1.0)
|
const char* TSS_mismatchTraceName(const GetKeyValuesRequest& req) {
|
||||||
.detail("TSSID", tssId)
|
return "TSSMismatchGetKeyValues";
|
||||||
.detail(
|
}
|
||||||
"Begin",
|
|
||||||
format(
|
|
||||||
"%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
|
|
||||||
.detail("End",
|
|
||||||
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
|
|
||||||
.detail("Version", req.version)
|
|
||||||
.detail("Limit", req.limit)
|
|
||||||
.detail("LimitBytes", req.limitBytes)
|
|
||||||
.detail("SSReply", ssResultsString)
|
|
||||||
.detail("TSSReply", tssResultsString);
|
|
||||||
|
|
||||||
return false;
|
template <>
|
||||||
|
void TSS_traceMismatch(TraceEvent& event,
|
||||||
|
const GetKeyValuesRequest& req,
|
||||||
|
const GetKeyValuesReply& src,
|
||||||
|
const GetKeyValuesReply& tss) {
|
||||||
|
std::string ssResultsString = format("(%d)%s:\n", src.data.size(), src.more ? "+" : "");
|
||||||
|
for (auto& it : src.data) {
|
||||||
|
ssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string tssResultsString = format("(%d)%s:\n", tss.data.size(), tss.more ? "+" : "");
|
||||||
|
for (auto& it : tss.data) {
|
||||||
|
tssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
|
||||||
|
}
|
||||||
|
event
|
||||||
|
.detail(
|
||||||
|
"Begin",
|
||||||
|
format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
|
||||||
|
.detail("End",
|
||||||
|
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
|
||||||
|
.detail("Version", req.version)
|
||||||
|
.detail("Limit", req.limit)
|
||||||
|
.detail("LimitBytes", req.limitBytes)
|
||||||
|
.detail("SSReply", ssResultsString)
|
||||||
|
.detail("TSSReply", tssResultsString);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
bool TSS_doCompare(const WatchValueReply& src, const WatchValueReply& tss) {
|
||||||
|
// We duplicate watches just for load, no need to validate replies.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const WatchValueRequest& req,
|
const char* TSS_mismatchTraceName(const WatchValueRequest& req) {
|
||||||
const WatchValueReply& src,
|
ASSERT(false);
|
||||||
const WatchValueReply& tss,
|
return "";
|
||||||
Severity traceSeverity,
|
|
||||||
UID tssId) {
|
|
||||||
// We duplicate watches just for load, no need to validte replies.
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// no-op template specializations for metrics replies
|
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const WaitMetricsRequest& req,
|
void TSS_traceMismatch(TraceEvent& event,
|
||||||
const StorageMetrics& src,
|
const WatchValueRequest& req,
|
||||||
const StorageMetrics& tss,
|
const WatchValueReply& src,
|
||||||
Severity traceSeverity,
|
const WatchValueReply& tss) {
|
||||||
UID tssId) {
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// template specializations for metrics replies that should never be called because these requests aren't duplicated
|
||||||
|
|
||||||
|
// storage metrics
|
||||||
|
template <>
|
||||||
|
bool TSS_doCompare(const StorageMetrics& src, const StorageMetrics& tss) {
|
||||||
|
ASSERT(false);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const SplitMetricsRequest& req,
|
const char* TSS_mismatchTraceName(const WaitMetricsRequest& req) {
|
||||||
const SplitMetricsReply& src,
|
ASSERT(false);
|
||||||
const SplitMetricsReply& tss,
|
return "";
|
||||||
Severity traceSeverity,
|
}
|
||||||
UID tssId) {
|
|
||||||
|
template <>
|
||||||
|
void TSS_traceMismatch(TraceEvent& event,
|
||||||
|
const WaitMetricsRequest& req,
|
||||||
|
const StorageMetrics& src,
|
||||||
|
const StorageMetrics& tss) {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// split metrics
|
||||||
|
template <>
|
||||||
|
bool TSS_doCompare(const SplitMetricsReply& src, const SplitMetricsReply& tss) {
|
||||||
|
ASSERT(false);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const ReadHotSubRangeRequest& req,
|
const char* TSS_mismatchTraceName(const SplitMetricsRequest& req) {
|
||||||
const ReadHotSubRangeReply& src,
|
ASSERT(false);
|
||||||
const ReadHotSubRangeReply& tss,
|
return "";
|
||||||
Severity traceSeverity,
|
}
|
||||||
UID tssId) {
|
|
||||||
|
template <>
|
||||||
|
void TSS_traceMismatch(TraceEvent& event,
|
||||||
|
const SplitMetricsRequest& req,
|
||||||
|
const SplitMetricsReply& src,
|
||||||
|
const SplitMetricsReply& tss) {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// read hot sub range
|
||||||
|
template <>
|
||||||
|
bool TSS_doCompare(const ReadHotSubRangeReply& src, const ReadHotSubRangeReply& tss) {
|
||||||
|
ASSERT(false);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
bool TSS_doCompare(const SplitRangeRequest& req,
|
const char* TSS_mismatchTraceName(const ReadHotSubRangeRequest& req) {
|
||||||
const SplitRangeReply& src,
|
ASSERT(false);
|
||||||
const SplitRangeReply& tss,
|
return "";
|
||||||
Severity traceSeverity,
|
}
|
||||||
UID tssId) {
|
|
||||||
|
template <>
|
||||||
|
void TSS_traceMismatch(TraceEvent& event,
|
||||||
|
const ReadHotSubRangeRequest& req,
|
||||||
|
const ReadHotSubRangeReply& src,
|
||||||
|
const ReadHotSubRangeReply& tss) {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// split range
|
||||||
|
template <>
|
||||||
|
bool TSS_doCompare(const SplitRangeReply& src, const SplitRangeReply& tss) {
|
||||||
|
ASSERT(false);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
const char* TSS_mismatchTraceName(const SplitRangeRequest& req) {
|
||||||
|
ASSERT(false);
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
void TSS_traceMismatch(TraceEvent& event,
|
||||||
|
const SplitRangeRequest& req,
|
||||||
|
const SplitRangeReply& src,
|
||||||
|
const SplitRangeReply& tss) {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
// only record metrics for data reads
|
// only record metrics for data reads
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
@ -240,32 +302,20 @@ TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {
|
|||||||
std::string s_d = "d";
|
std::string s_d = "d";
|
||||||
std::string s_e = "e";
|
std::string s_e = "e";
|
||||||
|
|
||||||
// test getValue
|
|
||||||
GetValueRequest gvReq;
|
|
||||||
gvReq.key = StringRef(s_a);
|
|
||||||
gvReq.version = 5;
|
|
||||||
|
|
||||||
UID tssId;
|
UID tssId;
|
||||||
|
|
||||||
GetValueReply gvReplyMissing;
|
GetValueReply gvReplyMissing;
|
||||||
GetValueReply gvReplyA(Optional<Value>(StringRef(s_a)), false);
|
GetValueReply gvReplyA(Optional<Value>(StringRef(s_a)), false);
|
||||||
GetValueReply gvReplyB(Optional<Value>(StringRef(s_b)), false);
|
GetValueReply gvReplyB(Optional<Value>(StringRef(s_b)), false);
|
||||||
ASSERT(TSS_doCompare(gvReq, gvReplyMissing, gvReplyMissing, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gvReplyMissing, gvReplyMissing));
|
||||||
ASSERT(TSS_doCompare(gvReq, gvReplyA, gvReplyA, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gvReplyA, gvReplyA));
|
||||||
ASSERT(TSS_doCompare(gvReq, gvReplyB, gvReplyB, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gvReplyB, gvReplyB));
|
||||||
|
|
||||||
ASSERT(!TSS_doCompare(gvReq, gvReplyMissing, gvReplyA, SevInfo, tssId));
|
ASSERT(!TSS_doCompare(gvReplyMissing, gvReplyA));
|
||||||
ASSERT(!TSS_doCompare(gvReq, gvReplyA, gvReplyB, SevInfo, tssId));
|
ASSERT(!TSS_doCompare(gvReplyA, gvReplyB));
|
||||||
|
|
||||||
// test GetKeyValues
|
// test GetKeyValues
|
||||||
Arena a; // for all of the refs. ASAN complains if this isn't done. Could also make them all standalone i guess
|
Arena a;
|
||||||
GetKeyValuesRequest gkvReq;
|
|
||||||
gkvReq.begin = firstGreaterOrEqual(StringRef(a, s_a));
|
|
||||||
gkvReq.end = firstGreaterOrEqual(StringRef(a, s_b));
|
|
||||||
gkvReq.version = 5;
|
|
||||||
gkvReq.limit = 100;
|
|
||||||
gkvReq.limitBytes = 1000;
|
|
||||||
|
|
||||||
GetKeyValuesReply gkvReplyEmpty;
|
GetKeyValuesReply gkvReplyEmpty;
|
||||||
GetKeyValuesReply gkvReplyOne;
|
GetKeyValuesReply gkvReplyOne;
|
||||||
KeyValueRef v;
|
KeyValueRef v;
|
||||||
@ -276,16 +326,11 @@ TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {
|
|||||||
gkvReplyOneMore.data.push_back_deep(gkvReplyOneMore.arena, v);
|
gkvReplyOneMore.data.push_back_deep(gkvReplyOneMore.arena, v);
|
||||||
gkvReplyOneMore.more = true;
|
gkvReplyOneMore.more = true;
|
||||||
|
|
||||||
ASSERT(TSS_doCompare(gkvReq, gkvReplyEmpty, gkvReplyEmpty, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkvReplyEmpty, gkvReplyEmpty));
|
||||||
ASSERT(TSS_doCompare(gkvReq, gkvReplyOne, gkvReplyOne, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkvReplyOne, gkvReplyOne));
|
||||||
ASSERT(TSS_doCompare(gkvReq, gkvReplyOneMore, gkvReplyOneMore, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkvReplyOneMore, gkvReplyOneMore));
|
||||||
ASSERT(!TSS_doCompare(gkvReq, gkvReplyEmpty, gkvReplyOne, SevInfo, tssId));
|
ASSERT(!TSS_doCompare(gkvReplyEmpty, gkvReplyOne));
|
||||||
ASSERT(!TSS_doCompare(gkvReq, gkvReplyOne, gkvReplyOneMore, SevInfo, tssId));
|
ASSERT(!TSS_doCompare(gkvReplyOne, gkvReplyOneMore));
|
||||||
|
|
||||||
// test GetKey
|
|
||||||
GetKeyRequest gkReq;
|
|
||||||
gkReq.sel = KeySelectorRef(StringRef(a, s_a), false, 1);
|
|
||||||
gkReq.version = 5;
|
|
||||||
|
|
||||||
GetKeyReply gkReplyA(KeySelectorRef(StringRef(a, s_a), false, 20), false);
|
GetKeyReply gkReplyA(KeySelectorRef(StringRef(a, s_a), false, 20), false);
|
||||||
GetKeyReply gkReplyB(KeySelectorRef(StringRef(a, s_b), false, 10), false);
|
GetKeyReply gkReplyB(KeySelectorRef(StringRef(a, s_b), false, 10), false);
|
||||||
@ -294,85 +339,58 @@ TEST_CASE("/StorageServerInterface/TSSCompare/TestComparison") {
|
|||||||
GetKeyReply gkReplyE(KeySelectorRef(StringRef(a, s_e), false, -20), false);
|
GetKeyReply gkReplyE(KeySelectorRef(StringRef(a, s_e), false, -20), false);
|
||||||
|
|
||||||
// identical cases
|
// identical cases
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyA, gkReplyA, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyA, gkReplyA));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyB, gkReplyB, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyB, gkReplyB));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyC, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyC, gkReplyC));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyD, gkReplyD, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyD, gkReplyD));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyE, gkReplyE, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyE, gkReplyE));
|
||||||
|
|
||||||
// relative offset cases
|
// relative offset cases
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyA, gkReplyB, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyA, gkReplyB));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyB, gkReplyA, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyB, gkReplyA));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyA, gkReplyC, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyA, gkReplyC));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyA, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyC, gkReplyA));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyB, gkReplyC, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyB, gkReplyC));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyB, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyC, gkReplyB));
|
||||||
|
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyD, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyC, gkReplyD));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyD, gkReplyC, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyD, gkReplyC));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyC, gkReplyE, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyC, gkReplyE));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyE, gkReplyC, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyE, gkReplyC));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyD, gkReplyE, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyD, gkReplyE));
|
||||||
ASSERT(TSS_doCompare(gkReq, gkReplyE, gkReplyD, SevInfo, tssId));
|
ASSERT(TSS_doCompare(gkReplyE, gkReplyD));
|
||||||
|
|
||||||
// test same offset/orEqual wrong key
|
// test same offset/orEqual wrong key
|
||||||
ASSERT(!TSS_doCompare(gkReq,
|
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false)));
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false),
|
|
||||||
SevInfo,
|
|
||||||
tssId));
|
|
||||||
// this could be from different shard boundaries, so don't say it's a mismatch
|
// this could be from different shard boundaries, so don't say it's a mismatch
|
||||||
ASSERT(TSS_doCompare(gkReq,
|
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 10), false),
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 10), false),
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false)));
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false),
|
|
||||||
SevInfo,
|
|
||||||
tssId));
|
|
||||||
|
|
||||||
// test offsets and key difference don't match
|
// test offsets and key difference don't match
|
||||||
ASSERT(!TSS_doCompare(gkReq,
|
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false)));
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 10), false),
|
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, -10), false),
|
||||||
SevInfo,
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false)));
|
||||||
tssId));
|
|
||||||
ASSERT(!TSS_doCompare(gkReq,
|
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, -10), false),
|
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false),
|
|
||||||
SevInfo,
|
|
||||||
tssId));
|
|
||||||
|
|
||||||
// test key is next over in one shard, one found it and other didn't
|
// test key is next over in one shard, one found it and other didn't
|
||||||
// positive
|
// positive
|
||||||
// one that didn't find is +1
|
// one that didn't find is +1
|
||||||
ASSERT(TSS_doCompare(gkReq,
|
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false),
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false),
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false)));
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false),
|
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
||||||
SevInfo,
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 1), false)));
|
||||||
tssId));
|
|
||||||
ASSERT(!TSS_doCompare(gkReq,
|
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 1), false),
|
|
||||||
SevInfo,
|
|
||||||
tssId));
|
|
||||||
|
|
||||||
// negative will have zero offset but not equal set
|
// negative will have zero offset but not equal set
|
||||||
ASSERT(TSS_doCompare(gkReq,
|
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false)));
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), false, 0), false),
|
ASSERT(!TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
|
||||||
SevInfo,
|
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false)));
|
||||||
tssId));
|
|
||||||
ASSERT(!TSS_doCompare(gkReq,
|
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 0), false),
|
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_b), true, 0), false),
|
|
||||||
SevInfo,
|
|
||||||
tssId));
|
|
||||||
|
|
||||||
// test shard boundary key returned by incomplete query is the same as the key found by the other (only possible in
|
// test shard boundary key returned by incomplete query is the same as the key found by the other (only possible in
|
||||||
// positive direction)
|
// positive direction)
|
||||||
ASSERT(TSS_doCompare(gkReq,
|
ASSERT(TSS_doCompare(GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), true, 0), false),
|
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false)));
|
||||||
GetKeyReply(KeySelectorRef(StringRef(a, s_a), false, 1), false),
|
|
||||||
SevInfo,
|
|
||||||
tssId));
|
|
||||||
|
|
||||||
// explictly test checksum function
|
// explictly test checksum function
|
||||||
std::string s12 = "ABCDEFGHIJKL";
|
std::string s12 = "ABCDEFGHIJKL";
|
||||||
|
@ -364,6 +364,8 @@ UID decodeTssQuarantineKey(KeyRef const& key) {
|
|||||||
return serverID;
|
return serverID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const KeyRangeRef tssMismatchKeys(LiteralStringRef("\xff/tssMismatch/"), LiteralStringRef("\xff/tssMismatch0"));
|
||||||
|
|
||||||
const KeyRangeRef serverTagKeys(LiteralStringRef("\xff/serverTag/"), LiteralStringRef("\xff/serverTag0"));
|
const KeyRangeRef serverTagKeys(LiteralStringRef("\xff/serverTag/"), LiteralStringRef("\xff/serverTag0"));
|
||||||
|
|
||||||
const KeyRef serverTagPrefix = serverTagKeys.begin;
|
const KeyRef serverTagPrefix = serverTagKeys.begin;
|
||||||
|
@ -124,6 +124,10 @@ extern const KeyRangeRef tssQuarantineKeys;
|
|||||||
const Key tssQuarantineKeyFor(UID serverID);
|
const Key tssQuarantineKeyFor(UID serverID);
|
||||||
UID decodeTssQuarantineKey(KeyRef const&);
|
UID decodeTssQuarantineKey(KeyRef const&);
|
||||||
|
|
||||||
|
// \xff/tssMismatch/[[Tuple<TSSStorageUID, timestamp, mismatchUID>]] := [[TraceEventString]]
|
||||||
|
// For recording tss mismatch details in the system keyspace
|
||||||
|
extern const KeyRangeRef tssMismatchKeys;
|
||||||
|
|
||||||
// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
|
// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
|
||||||
// Provides the Tag for the given serverID. Used to access a
|
// Provides the Tag for the given serverID. Used to access a
|
||||||
// storage server's corresponding TLog in order to apply mutations.
|
// storage server's corresponding TLog in order to apply mutations.
|
||||||
|
@ -79,16 +79,22 @@ struct LoadBalancedReply {
|
|||||||
Optional<LoadBalancedReply> getLoadBalancedReply(const LoadBalancedReply* reply);
|
Optional<LoadBalancedReply> getLoadBalancedReply(const LoadBalancedReply* reply);
|
||||||
Optional<LoadBalancedReply> getLoadBalancedReply(const void*);
|
Optional<LoadBalancedReply> getLoadBalancedReply(const void*);
|
||||||
|
|
||||||
ACTOR template <class Req, class Resp>
|
ACTOR template <class Req, class Resp, class Interface, class Multi>
|
||||||
Future<Void> tssComparison(Req req,
|
Future<Void> tssComparison(Req req,
|
||||||
Future<ErrorOr<Resp>> fSource,
|
Future<ErrorOr<Resp>> fSource,
|
||||||
Future<ErrorOr<Resp>> fTss,
|
Future<ErrorOr<Resp>> fTss,
|
||||||
TSSEndpointData tssData) {
|
TSSEndpointData tssData,
|
||||||
|
uint64_t srcEndpointId,
|
||||||
|
Reference<MultiInterface<Multi>> ssTeam,
|
||||||
|
RequestStream<Req> Interface::*channel) {
|
||||||
state double startTime = now();
|
state double startTime = now();
|
||||||
state Future<Optional<ErrorOr<Resp>>> fTssWithTimeout = timeout(fTss, FLOW_KNOBS->LOAD_BALANCE_TSS_TIMEOUT);
|
state Future<Optional<ErrorOr<Resp>>> fTssWithTimeout = timeout(fTss, FLOW_KNOBS->LOAD_BALANCE_TSS_TIMEOUT);
|
||||||
state int finished = 0;
|
state int finished = 0;
|
||||||
state double srcEndTime;
|
state double srcEndTime;
|
||||||
state double tssEndTime;
|
state double tssEndTime;
|
||||||
|
// we want to record ss/tss errors to metrics
|
||||||
|
state int srcErrorCode = error_code_success;
|
||||||
|
state int tssErrorCode = error_code_success;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
choose {
|
choose {
|
||||||
@ -110,11 +116,6 @@ Future<Void> tssComparison(Req req,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// we want to record ss/tss errors to metrics
|
|
||||||
int srcErrorCode = error_code_success;
|
|
||||||
int tssErrorCode = error_code_success;
|
|
||||||
|
|
||||||
++tssData.metrics->requests;
|
++tssData.metrics->requests;
|
||||||
|
|
||||||
if (src.isError()) {
|
if (src.isError()) {
|
||||||
@ -139,15 +140,82 @@ Future<Void> tssComparison(Req req,
|
|||||||
// apples
|
// apples
|
||||||
tssData.metrics->recordLatency(req, srcEndTime - startTime, tssEndTime - startTime);
|
tssData.metrics->recordLatency(req, srcEndTime - startTime, tssEndTime - startTime);
|
||||||
|
|
||||||
// expect mismatches in drop mutations mode.
|
if (!TSS_doCompare(src.get(), tss.get().get())) {
|
||||||
Severity traceSeverity =
|
|
||||||
(g_network->isSimulated() && g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
|
|
||||||
? SevWarnAlways
|
|
||||||
: SevError;
|
|
||||||
|
|
||||||
if (!TSS_doCompare(req, src.get(), tss.get().get(), traceSeverity, tssData.tssId)) {
|
|
||||||
TEST(true); // TSS Mismatch
|
TEST(true); // TSS Mismatch
|
||||||
++tssData.metrics->mismatches;
|
state TraceEvent mismatchEvent(
|
||||||
|
(g_network->isSimulated() && g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
|
||||||
|
? SevWarnAlways
|
||||||
|
: SevError,
|
||||||
|
TSS_mismatchTraceName(req));
|
||||||
|
mismatchEvent.detail("TSSID", tssData.tssId);
|
||||||
|
|
||||||
|
if (FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS && ssTeam->size() > 1) {
|
||||||
|
TEST(true); // checking TSS mismatch against rest of storage team
|
||||||
|
|
||||||
|
// if there is more than 1 SS in the team, attempt to verify that the other SS servers have the same
|
||||||
|
// data
|
||||||
|
state std::vector<Future<ErrorOr<Resp>>> restOfTeamFutures;
|
||||||
|
restOfTeamFutures.reserve(ssTeam->size() - 1);
|
||||||
|
for (int i = 0; i < ssTeam->size(); i++) {
|
||||||
|
RequestStream<Req> const* si = &ssTeam->get(i, channel);
|
||||||
|
if (si->getEndpoint().token.first() !=
|
||||||
|
srcEndpointId) { // don't re-request to SS we already have a response from
|
||||||
|
resetReply(req);
|
||||||
|
restOfTeamFutures.push_back(si->tryGetReply(req));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wait(waitForAllReady(restOfTeamFutures));
|
||||||
|
|
||||||
|
int numError = 0;
|
||||||
|
int numMatchSS = 0;
|
||||||
|
int numMatchTSS = 0;
|
||||||
|
int numMatchNeither = 0;
|
||||||
|
for (Future<ErrorOr<Resp>> f : restOfTeamFutures) {
|
||||||
|
if (!f.canGet() || f.get().isError()) {
|
||||||
|
numError++;
|
||||||
|
} else {
|
||||||
|
Optional<LoadBalancedReply> fLB = getLoadBalancedReply(&f.get().get());
|
||||||
|
if (fLB.present() && fLB.get().error.present()) {
|
||||||
|
numError++;
|
||||||
|
} else if (TSS_doCompare(src.get(), f.get().get())) {
|
||||||
|
numMatchSS++;
|
||||||
|
} else if (TSS_doCompare(tss.get().get(), f.get().get())) {
|
||||||
|
numMatchTSS++;
|
||||||
|
} else {
|
||||||
|
numMatchNeither++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mismatchEvent.detail("TeamCheckErrors", numError)
|
||||||
|
.detail("TeamCheckMatchSS", numMatchSS)
|
||||||
|
.detail("TeamCheckMatchTSS", numMatchTSS)
|
||||||
|
.detail("TeamCheckMatchNeither", numMatchNeither);
|
||||||
|
}
|
||||||
|
if (tssData.metrics->shouldRecordDetailedMismatch()) {
|
||||||
|
TSS_traceMismatch(mismatchEvent, req, src.get(), tss.get().get());
|
||||||
|
|
||||||
|
TEST(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL); // Tracing Full TSS Mismatch
|
||||||
|
TEST(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL); // Tracing Partial TSS Mismatch and storing
|
||||||
|
// the rest in FDB
|
||||||
|
|
||||||
|
if (!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL) {
|
||||||
|
mismatchEvent.disable();
|
||||||
|
UID mismatchUID = deterministicRandom()->randomUniqueID();
|
||||||
|
tssData.metrics->recordDetailedMismatchData(mismatchUID, mismatchEvent.getFields().toString());
|
||||||
|
|
||||||
|
// record a summarized trace event instead
|
||||||
|
TraceEvent summaryEvent((g_network->isSimulated() &&
|
||||||
|
g_simulator.tssMode == ISimulator::TSSMode::EnabledDropMutations)
|
||||||
|
? SevWarnAlways
|
||||||
|
: SevError,
|
||||||
|
TSS_mismatchTraceName(req));
|
||||||
|
summaryEvent.detail("TSSID", tssData.tssId).detail("MismatchId", mismatchUID);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// don't record trace event
|
||||||
|
mismatchEvent.disable();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (tssLB.present() && tssLB.get().error.present()) {
|
} else if (tssLB.present() && tssLB.get().error.present()) {
|
||||||
tssErrorCode = tssLB.get().error.get().code();
|
tssErrorCode = tssLB.get().error.get().code();
|
||||||
@ -171,7 +239,7 @@ Future<Void> tssComparison(Req req,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stores state for a request made by the load balancer
|
// Stores state for a request made by the load balancer
|
||||||
template <class Request>
|
template <class Request, class Interface, class Multi>
|
||||||
struct RequestData : NonCopyable {
|
struct RequestData : NonCopyable {
|
||||||
typedef ErrorOr<REPLY_TYPE(Request)> Reply;
|
typedef ErrorOr<REPLY_TYPE(Request)> Reply;
|
||||||
|
|
||||||
@ -189,7 +257,9 @@ struct RequestData : NonCopyable {
|
|||||||
static void maybeDuplicateTSSRequest(RequestStream<Request> const* stream,
|
static void maybeDuplicateTSSRequest(RequestStream<Request> const* stream,
|
||||||
Request& request,
|
Request& request,
|
||||||
QueueModel* model,
|
QueueModel* model,
|
||||||
Future<Reply> ssResponse) {
|
Future<Reply> ssResponse,
|
||||||
|
Reference<MultiInterface<Multi>> alternatives,
|
||||||
|
RequestStream<Request> Interface::*channel) {
|
||||||
if (model) {
|
if (model) {
|
||||||
// Send parallel request to TSS pair, if it exists
|
// Send parallel request to TSS pair, if it exists
|
||||||
Optional<TSSEndpointData> tssData = model->getTssData(stream->getEndpoint().token.first());
|
Optional<TSSEndpointData> tssData = model->getTssData(stream->getEndpoint().token.first());
|
||||||
@ -200,34 +270,43 @@ struct RequestData : NonCopyable {
|
|||||||
// FIXME: optimize to avoid creating new netNotifiedQueue for each message
|
// FIXME: optimize to avoid creating new netNotifiedQueue for each message
|
||||||
RequestStream<Request> tssRequestStream(tssData.get().endpoint);
|
RequestStream<Request> tssRequestStream(tssData.get().endpoint);
|
||||||
Future<ErrorOr<REPLY_TYPE(Request)>> fTssResult = tssRequestStream.tryGetReply(request);
|
Future<ErrorOr<REPLY_TYPE(Request)>> fTssResult = tssRequestStream.tryGetReply(request);
|
||||||
model->addActor.send(tssComparison(request, ssResponse, fTssResult, tssData.get()));
|
model->addActor.send(tssComparison(request,
|
||||||
|
ssResponse,
|
||||||
|
fTssResult,
|
||||||
|
tssData.get(),
|
||||||
|
stream->getEndpoint().token.first(),
|
||||||
|
alternatives,
|
||||||
|
channel));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initializes the request state and starts it, possibly after a backoff delay
|
// Initializes the request state and starts it, possibly after a backoff delay
|
||||||
void startRequest(double backoff,
|
void startRequest(
|
||||||
bool triedAllOptions,
|
double backoff,
|
||||||
RequestStream<Request> const* stream,
|
bool triedAllOptions,
|
||||||
Request& request,
|
RequestStream<Request> const* stream,
|
||||||
QueueModel* model) {
|
Request& request,
|
||||||
|
QueueModel* model,
|
||||||
|
Reference<MultiInterface<Multi>> alternatives, // alternatives and channel passed through for TSS check
|
||||||
|
RequestStream<Request> Interface::*channel) {
|
||||||
modelHolder = Reference<ModelHolder>();
|
modelHolder = Reference<ModelHolder>();
|
||||||
requestStarted = false;
|
requestStarted = false;
|
||||||
|
|
||||||
if (backoff > 0) {
|
if (backoff > 0) {
|
||||||
response = mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
|
response = mapAsync<Void, std::function<Future<Reply>(Void)>, Reply>(
|
||||||
delay(backoff), [this, stream, &request, model](Void _) {
|
delay(backoff), [this, stream, &request, model, alternatives, channel](Void _) {
|
||||||
requestStarted = true;
|
requestStarted = true;
|
||||||
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
|
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
|
||||||
Future<Reply> resp = stream->tryGetReply(request);
|
Future<Reply> resp = stream->tryGetReply(request);
|
||||||
maybeDuplicateTSSRequest(stream, request, model, resp);
|
maybeDuplicateTSSRequest(stream, request, model, resp, alternatives, channel);
|
||||||
return resp;
|
return resp;
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
requestStarted = true;
|
requestStarted = true;
|
||||||
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
|
modelHolder = Reference<ModelHolder>(new ModelHolder(model, stream->getEndpoint().token.first()));
|
||||||
response = stream->tryGetReply(request);
|
response = stream->tryGetReply(request);
|
||||||
maybeDuplicateTSSRequest(stream, request, model, response);
|
maybeDuplicateTSSRequest(stream, request, model, response, alternatives, channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
requestProcessed = false;
|
requestProcessed = false;
|
||||||
@ -365,8 +444,8 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
|||||||
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
|
bool atMostOnce = false, // if true, throws request_maybe_delivered() instead of retrying automatically
|
||||||
QueueModel* model = nullptr) {
|
QueueModel* model = nullptr) {
|
||||||
|
|
||||||
state RequestData<Request> firstRequestData;
|
state RequestData<Request, Interface, Multi> firstRequestData;
|
||||||
state RequestData<Request> secondRequestData;
|
state RequestData<Request, Interface, Multi> secondRequestData;
|
||||||
|
|
||||||
state Optional<uint64_t> firstRequestEndpoint;
|
state Optional<uint64_t> firstRequestEndpoint;
|
||||||
state Future<Void> secondDelay = Never();
|
state Future<Void> secondDelay = Never();
|
||||||
@ -553,7 +632,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
|||||||
firstRequestEndpoint = Optional<uint64_t>();
|
firstRequestEndpoint = Optional<uint64_t>();
|
||||||
} else if (firstRequestData.isValid()) {
|
} else if (firstRequestData.isValid()) {
|
||||||
// Issue a second request, the first one is taking a long time.
|
// Issue a second request, the first one is taking a long time.
|
||||||
secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
|
secondRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
|
||||||
state bool firstFinished = false;
|
state bool firstFinished = false;
|
||||||
|
|
||||||
loop choose {
|
loop choose {
|
||||||
@ -582,7 +661,7 @@ Future<REPLY_TYPE(Request)> loadBalance(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Issue a request, if it takes too long to get a reply, go around the loop
|
// Issue a request, if it takes too long to get a reply, go around the loop
|
||||||
firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model);
|
firstRequestData.startRequest(backoff, triedAllOptions, stream, request, model, alternatives, channel);
|
||||||
firstRequestEndpoint = stream->getEndpoint().token.first();
|
firstRequestEndpoint = stream->getEndpoint().token.first();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -29,6 +29,15 @@
|
|||||||
#include "fdbrpc/Stats.h"
|
#include "fdbrpc/Stats.h"
|
||||||
|
|
||||||
// refcounted + noncopyable because both DatabaseContext and individual endpoints share ownership
|
// refcounted + noncopyable because both DatabaseContext and individual endpoints share ownership
|
||||||
|
struct DetailedTSSMismatch {
|
||||||
|
UID mismatchId;
|
||||||
|
double timestamp;
|
||||||
|
std::string traceString;
|
||||||
|
|
||||||
|
DetailedTSSMismatch(UID mismatchId, double timestamp, std::string traceString)
|
||||||
|
: mismatchId(mismatchId), timestamp(timestamp), traceString(traceString) {}
|
||||||
|
};
|
||||||
|
|
||||||
struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
||||||
CounterCollection cc;
|
CounterCollection cc;
|
||||||
Counter requests;
|
Counter requests;
|
||||||
@ -49,6 +58,8 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||||||
std::unordered_map<int, uint64_t> ssErrorsByCode;
|
std::unordered_map<int, uint64_t> ssErrorsByCode;
|
||||||
std::unordered_map<int, uint64_t> tssErrorsByCode;
|
std::unordered_map<int, uint64_t> tssErrorsByCode;
|
||||||
|
|
||||||
|
std::vector<DetailedTSSMismatch> detailedMismatches;
|
||||||
|
|
||||||
void ssError(int code) {
|
void ssError(int code) {
|
||||||
++ssErrors;
|
++ssErrors;
|
||||||
ssErrorsByCode[code]++;
|
ssErrorsByCode[code]++;
|
||||||
@ -62,6 +73,16 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||||||
template <class Req>
|
template <class Req>
|
||||||
void recordLatency(const Req& req, double ssLatency, double tssLatency);
|
void recordLatency(const Req& req, double ssLatency, double tssLatency);
|
||||||
|
|
||||||
|
// only record a small number of the detailed mismatches per client per metrics window
|
||||||
|
bool shouldRecordDetailedMismatch() {
|
||||||
|
++mismatches;
|
||||||
|
return (mismatches.getIntervalDelta() < 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
void recordDetailedMismatchData(UID mismatchUID, std::string traceString) {
|
||||||
|
detailedMismatches.push_back(DetailedTSSMismatch(mismatchUID, now(), traceString));
|
||||||
|
}
|
||||||
|
|
||||||
void clear() {
|
void clear() {
|
||||||
SSgetValueLatency.clear();
|
SSgetValueLatency.clear();
|
||||||
SSgetKeyLatency.clear();
|
SSgetKeyLatency.clear();
|
||||||
@ -73,6 +94,8 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||||||
|
|
||||||
tssErrorsByCode.clear();
|
tssErrorsByCode.clear();
|
||||||
ssErrorsByCode.clear();
|
ssErrorsByCode.clear();
|
||||||
|
|
||||||
|
detailedMismatches.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
TSSMetrics()
|
TSSMetrics()
|
||||||
@ -81,9 +104,13 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
|
|||||||
SSgetKeyValuesLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {}
|
SSgetKeyValuesLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
// part of the contract of this function is that if there is a mismatch, the implementation needs to record a trace
|
template <class Rep>
|
||||||
// event with the specified severity and tssId in the event.
|
bool TSS_doCompare(const Rep& src, const Rep& tss);
|
||||||
|
|
||||||
|
template <class Req>
|
||||||
|
const char* TSS_mismatchTraceName(const Req& req);
|
||||||
|
|
||||||
template <class Req, class Rep>
|
template <class Req, class Rep>
|
||||||
bool TSS_doCompare(const Req& req, const Rep& src, const Rep& tss, Severity traceSeverity, UID tssId);
|
void TSS_traceMismatch(TraceEvent& event, const Req& req, const Rep& src, const Rep& tss);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -237,6 +237,8 @@ void FlowKnobs::initialize(Randomize _randomize, IsSimulated _isSimulated) {
|
|||||||
init( BASIC_LOAD_BALANCE_BUCKETS, 40 ); //proxies bin recent GRV requests into 40 time bins
|
init( BASIC_LOAD_BALANCE_BUCKETS, 40 ); //proxies bin recent GRV requests into 40 time bins
|
||||||
init( BASIC_LOAD_BALANCE_COMPUTE_PRECISION, 10000 ); //determines how much of the LB usage is holding the CPU usage of the proxy
|
init( BASIC_LOAD_BALANCE_COMPUTE_PRECISION, 10000 ); //determines how much of the LB usage is holding the CPU usage of the proxy
|
||||||
init( LOAD_BALANCE_TSS_TIMEOUT, 5.0 );
|
init( LOAD_BALANCE_TSS_TIMEOUT, 5.0 );
|
||||||
|
init( LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS, true ); if( randomize && BUGGIFY ) LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS = false; // Whether the client should validate the SS teams all agree on TSS mismatch
|
||||||
|
init( LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL, false ); if( randomize && BUGGIFY ) LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL = true; // If true, saves the full details of the mismatch in a trace event. If false, saves them in the DB and the trace event references the DB row.
|
||||||
|
|
||||||
// Health Monitor
|
// Health Monitor
|
||||||
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;
|
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;
|
||||||
|
@ -278,6 +278,8 @@ public:
|
|||||||
double BASIC_LOAD_BALANCE_MIN_REQUESTS;
|
double BASIC_LOAD_BALANCE_MIN_REQUESTS;
|
||||||
double BASIC_LOAD_BALANCE_MIN_CPU;
|
double BASIC_LOAD_BALANCE_MIN_CPU;
|
||||||
double LOAD_BALANCE_TSS_TIMEOUT;
|
double LOAD_BALANCE_TSS_TIMEOUT;
|
||||||
|
bool LOAD_BALANCE_TSS_MISMATCH_VERIFY_SS;
|
||||||
|
bool LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL;
|
||||||
|
|
||||||
// Health Monitor
|
// Health Monitor
|
||||||
int FAILURE_DETECTION_DELAY;
|
int FAILURE_DETECTION_DELAY;
|
||||||
|
@ -463,12 +463,14 @@ public:
|
|||||||
|
|
||||||
bool isEnabled() const { return enabled; }
|
bool isEnabled() const { return enabled; }
|
||||||
|
|
||||||
TraceEvent &setErrorKind(ErrorKind errorKind);
|
TraceEvent& setErrorKind(ErrorKind errorKind);
|
||||||
|
|
||||||
explicit operator bool() const { return enabled; }
|
explicit operator bool() const { return enabled; }
|
||||||
|
|
||||||
void log();
|
void log();
|
||||||
|
|
||||||
|
void disable() { enabled = false; } // Disables the trace event so it doesn't get
|
||||||
|
|
||||||
~TraceEvent(); // Actually logs the event
|
~TraceEvent(); // Actually logs the event
|
||||||
|
|
||||||
// Return the number of invocations of TraceEvent() at the specified logging level.
|
// Return the number of invocations of TraceEvent() at the specified logging level.
|
||||||
@ -476,6 +478,8 @@ public:
|
|||||||
|
|
||||||
std::unique_ptr<DynamicEventMetric> tmpEventMetric; // This just just a place to store fields
|
std::unique_ptr<DynamicEventMetric> tmpEventMetric; // This just just a place to store fields
|
||||||
|
|
||||||
|
const TraceEventFields& getFields() const { return fields; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool initialized;
|
bool initialized;
|
||||||
bool enabled;
|
bool enabled;
|
||||||
@ -491,7 +495,7 @@ private:
|
|||||||
int maxFieldLength;
|
int maxFieldLength;
|
||||||
int maxEventLength;
|
int maxEventLength;
|
||||||
int timeIndex;
|
int timeIndex;
|
||||||
int errorKindIndex { -1 };
|
int errorKindIndex{ -1 };
|
||||||
|
|
||||||
void setSizeLimits();
|
void setSizeLimits();
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user