assert we never get a stale read version

This commit is contained in:
Markus Pilman 2021-10-05 16:34:51 -06:00
parent 424b35de63
commit ffdba4a133
4 changed files with 79 additions and 7 deletions

View File

@ -387,6 +387,7 @@ public:
int snapshotRywEnabled;
int transactionTracingEnabled;
double verifyCausalReadsProp = 0.0;
Future<Void> logger;
Future<Void> throttleExpirer;

View File

@ -22,6 +22,9 @@
#ifndef FDBCLIENT_GRVPROXYINTERFACE_H
#define FDBCLIENT_GRVPROXYINTERFACE_H
#pragma once
#include "flow/FileIdentifier.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbclient/FDBTypes.h"
// GrvProxy is proxy primarily specializing on serving GetReadVersion. It also serves health metrics since it
// communicates with RateKeeper to gather health information of the cluster.

View File

@ -669,19 +669,82 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
}
}
ACTOR static Future<Void> monitorProxiesChange(Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
ACTOR Future<Void> assertFailure(GrvProxyInterface remote, Future<ErrorOr<GetReadVersionReply>> reply) {
try {
ErrorOr<GetReadVersionReply> res = wait(reply);
if (!res.isError()) {
TraceEvent(SevError, "GotStaleReadVersion")
.detail("Remote", remote.getConsistentReadVersion.getEndpoint().addresses.address.toString())
.detail("Provisional", remote.provisional)
.detail("ReadVersion", res.get().version);
ASSERT_WE_THINK(false);
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
// we want this to fail -- so getting here is good, we'll just ignore the error.
}
return Void();
}
Future<Void> attemptGRVFromOldProxies(std::vector<GrvProxyInterface> oldProxies,
std::vector<GrvProxyInterface> newProxies) {
Span span(deterministicRandom()->randomUniqueID(), "VerifyCausalReadRisky"_loc);
std::vector<Future<Void>> replies;
replies.reserve(oldProxies.size());
GetReadVersionRequest req(
span.context, 1, TransactionPriority::DEFAULT, GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY);
TraceEvent evt("AttemptGRVFromOldProxies");
evt.detail("NumOldProxies", oldProxies.size()).detail("NumNewProxies", newProxies.size());
auto traceProxies = [&](std::vector<GrvProxyInterface>& proxies, std::string const& key) {
for (int i = 0; i < proxies.size(); ++i) {
auto k = key + std::to_string(i);
evt.detail(k.c_str(), proxies[i].id());
}
};
traceProxies(oldProxies, "OldProxy"s);
traceProxies(newProxies, "NewProxy"s);
for (auto& i : oldProxies) {
req.reply = ReplyPromise<GetReadVersionReply>();
replies.push_back(assertFailure(i, i.getConsistentReadVersion.tryGetReply(req)));
}
return waitForAll(replies);
}
ACTOR static Future<Void> monitorProxiesChange(DatabaseContext* cx,
Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
AsyncTrigger* triggerVar) {
state std::vector<CommitProxyInterface> curCommitProxies;
state std::vector<GrvProxyInterface> curGrvProxies;
state ActorCollection actors(false);
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
loop {
wait(clientDBInfo->onChange());
if (clientDBInfo->get().commitProxies != curCommitProxies || clientDBInfo->get().grvProxies != curGrvProxies) {
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
triggerVar->trigger();
choose {
when(wait(clientDBInfo->onChange())) {
if (clientDBInfo->get().commitProxies != curCommitProxies ||
clientDBInfo->get().grvProxies != curGrvProxies) {
// This condition is a bit complicated. Here we want to verify that we're unable to receive a read
// version from a proxy of an old generation after a successful recovery. The conditions are:
// 1. We only do this with a configured probability.
// 2. If the old set of Grv proxies is empty, there's nothing to do
// 3. If the new set of Grv proxies is empty, it means the recovery is not complete. So if an old
// Grv proxy still gives out read versions, this would be correct behavior.
// 4. If we see a provisionoal proxy, it means the recovery didn't complete yet, so the same as (3)
// applies.
if (deterministicRandom()->random01() < cx->verifyCausalReadsProp && !curGrvProxies.empty() &&
!curGrvProxies.empty() && !clientDBInfo->get().grvProxies.empty() &&
!clientDBInfo->get().grvProxies[0].provisional) {
actors.add(attemptGRVFromOldProxies(curGrvProxies, clientDBInfo->get().grvProxies));
}
curCommitProxies = clientDBInfo->get().commitProxies;
curGrvProxies = clientDBInfo->get().grvProxies;
triggerVar->trigger();
}
}
when(wait(actors.getResult())) { UNSTOPPABLE_ASSERT(false); }
}
}
}
@ -1167,7 +1230,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
getValueSubmitted.init(LiteralStringRef("NativeAPI.GetValueSubmitted"));
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
monitorProxiesInfoChange = monitorProxiesChange(this, clientInfo, &proxiesChangeTrigger);
tssMismatchHandler = handleTssMismatches(this);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);
@ -1610,6 +1673,9 @@ void DatabaseContext::setOption(FDBDatabaseOptions::Option option, Optional<Stri
validateOptionValueNotPresent(value);
useConfigDatabase = true;
break;
case FDBDatabaseOptions::TEST_CAUSAL_READ_RISKY:
verifyCausalReadsProp = double(extractIntOption(value, 0, 100)) / 100.0;
break;
default:
break;
}

View File

@ -200,6 +200,8 @@ description is not currently required but encouraged.
defaultFor="1100"/>
<Option name="use_config_database" code="800"
description="Use configuration database." />
<Option name="test_causal_read_risky" code="900"
description="An integer between 0 and 100 (default is 0) expression the probability that a client will verify it can't read stale data whenever it detects a recovery." />
</Scope>
<Scope name="TransactionOption">