mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 18:56:00 +08:00
Fix globalconfig refresh hang issue
CC sets a version to int_max in ClientDBInfo indicating a refresh, however, proxy server would reject this version for the error of future_version. This change fixes this issue by not sending int_max, instead maintaining a lastKnown in memory and send it to grvproxy to get latest globalconfig. this change also fixes some java tests that were used to test the fix
This commit is contained in:
parent
ede22972c5
commit
6b782c10f6
@ -21,15 +21,13 @@ package com.apple.foundationdb;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
|
||||||
import org.junit.jupiter.api.extension.BeforeAllCallback;
|
|
||||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||||
|
import org.junit.jupiter.api.extension.BeforeAllCallback;
|
||||||
/**
|
/**
|
||||||
* Callback to help define a multi-client scenario and ensure that
|
* Callback to help define a multi-client scenario and ensure that
|
||||||
* the clients can be configured properly.
|
* the clients can be configured properly.
|
||||||
*/
|
*/
|
||||||
public class MultiClientHelper implements BeforeAllCallback,AfterEachCallback{
|
public class MultiClientHelper implements BeforeAllCallback {
|
||||||
private String[] clusterFiles;
|
private String[] clusterFiles;
|
||||||
private Collection<Database> openDatabases;
|
private Collection<Database> openDatabases;
|
||||||
|
|
||||||
@ -67,16 +65,4 @@ public class MultiClientHelper implements BeforeAllCallback,AfterEachCallback{
|
|||||||
public void beforeAll(ExtensionContext arg0) throws Exception {
|
public void beforeAll(ExtensionContext arg0) throws Exception {
|
||||||
clusterFiles = readClusterFromEnv();
|
clusterFiles = readClusterFromEnv();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterEach(ExtensionContext arg0) throws Exception {
|
|
||||||
//close any databases that have been opened
|
|
||||||
if(openDatabases!=null){
|
|
||||||
for(Database db : openDatabases){
|
|
||||||
db.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
openDatabases = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -154,8 +154,9 @@ void GlobalConfig::erase(KeyRangeRef range) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Updates local copy of global configuration by reading the entire key-range
|
// Updates local copy of global configuration by reading the entire key-range
|
||||||
// from storage (proxied through the GrvProxies).
|
// from storage (proxied through the GrvProxies). Returns the version of the
|
||||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self, Version lastKnown) {
|
// refreshed data.
|
||||||
|
ACTOR Future<Version> GlobalConfig::refresh(GlobalConfig* self, Version lastKnown, Version largestSeen) {
|
||||||
// TraceEvent trace(SevInfo, "GlobalConfigRefresh");
|
// TraceEvent trace(SevInfo, "GlobalConfigRefresh");
|
||||||
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
|
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
|
||||||
|
|
||||||
@ -171,7 +172,11 @@ ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self, Version lastKnown)
|
|||||||
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
|
KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix);
|
||||||
self->insert(systemKey, kv.value);
|
self->insert(systemKey, kv.value);
|
||||||
}
|
}
|
||||||
return Void();
|
if (reply.version >= largestSeen || largestSeen == std::numeric_limits<Version>::max()) {
|
||||||
|
return reply.version;
|
||||||
|
} else {
|
||||||
|
wait(delay(0.25));
|
||||||
|
}
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
wait(backoff.onError());
|
wait(backoff.onError());
|
||||||
}
|
}
|
||||||
@ -186,52 +191,61 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo*
|
|||||||
if (self->initialized.canBeSet()) {
|
if (self->initialized.canBeSet()) {
|
||||||
wait(self->cx->onConnected());
|
wait(self->cx->onConnected());
|
||||||
|
|
||||||
wait(self->refresh(self, -1));
|
Version version = wait(self->refresh(self, -1, 0));
|
||||||
|
self->lastUpdate = version;
|
||||||
|
|
||||||
|
self->cx->addref();
|
||||||
self->initialized.send(Void());
|
self->initialized.send(Void());
|
||||||
|
self->cx->delref();
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
wait(self->dbInfoChanged.onTrigger());
|
// run one iteration at the beginning
|
||||||
|
wait(delay(0));
|
||||||
auto& history = dbInfo->history;
|
if (dbInfo->history.size() > 0) {
|
||||||
if (history.size() == 0) {
|
if (self->lastUpdate < dbInfo->history[0].version) {
|
||||||
continue;
|
// This process missed too many global configuration
|
||||||
}
|
// history updates or the protocol version changed, so it
|
||||||
|
// must re-read the entire configuration range.
|
||||||
if (self->lastUpdate < history[0].version) {
|
Version version =
|
||||||
// This process missed too many global configuration
|
wait(self->refresh(self, self->lastUpdate, dbInfo->history.back().version));
|
||||||
// history updates or the protocol version changed, so it
|
self->lastUpdate = version;
|
||||||
// must re-read the entire configuration range.
|
// DBInfo could have changed after the wait. If
|
||||||
wait(self->refresh(self, history.back().version));
|
// changes are present, re-run the loop to make
|
||||||
if (dbInfo->history.size() > 0) {
|
// sure they are applied.
|
||||||
self->lastUpdate = dbInfo->history.back().version;
|
if (dbInfo->history.size() > 0 &&
|
||||||
}
|
dbInfo->history[0].version != std::numeric_limits<Version>::max()) {
|
||||||
} else {
|
continue;
|
||||||
// Apply history in order, from lowest version to highest
|
|
||||||
// version. Mutation history should already be stored in
|
|
||||||
// ascending version order.
|
|
||||||
for (const auto& vh : history) {
|
|
||||||
if (vh.version <= self->lastUpdate) {
|
|
||||||
continue; // already applied this mutation
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
for (const auto& mutation : vh.mutations.contents()) {
|
// Apply history in order, from lowest version to highest
|
||||||
if (mutation.type == MutationRef::SetValue) {
|
// version. Mutation history should already be stored in
|
||||||
self->insert(mutation.param1, mutation.param2);
|
// ascending version order.
|
||||||
} else if (mutation.type == MutationRef::ClearRange) {
|
for (const auto& vh : dbInfo->history) {
|
||||||
self->erase(KeyRangeRef(mutation.param1, mutation.param2));
|
if (vh.version <= self->lastUpdate) {
|
||||||
} else {
|
continue; // already applied this mutation
|
||||||
ASSERT(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const auto& mutation : vh.mutations.contents()) {
|
||||||
|
if (mutation.type == MutationRef::SetValue) {
|
||||||
|
self->insert(mutation.param1, mutation.param2);
|
||||||
|
} else if (mutation.type == MutationRef::ClearRange) {
|
||||||
|
self->erase(KeyRangeRef(mutation.param1, mutation.param2));
|
||||||
|
} else {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(vh.version > self->lastUpdate);
|
||||||
|
self->lastUpdate = vh.version;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(vh.version > self->lastUpdate);
|
|
||||||
self->lastUpdate = vh.version;
|
|
||||||
}
|
}
|
||||||
|
self->configChanged.trigger();
|
||||||
}
|
}
|
||||||
|
// In case this actor is canceled in the d'tor of GlobalConfig we can exit here.
|
||||||
self->configChanged.trigger();
|
wait(delay(0));
|
||||||
|
wait(self->dbInfoChanged.onTrigger());
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
@ -9075,8 +9075,9 @@ void Transaction::checkDeferredError() const {
|
|||||||
|
|
||||||
Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) {
|
Reference<TransactionLogInfo> Transaction::createTrLogInfoProbabilistically(const Database& cx) {
|
||||||
if (!cx->isError()) {
|
if (!cx->isError()) {
|
||||||
double clientSamplingProbability =
|
double sampleRate =
|
||||||
cx->globalConfig->get<double>(fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY);
|
cx->globalConfig->get<double>(fdbClientInfoTxnSampleRate, std::numeric_limits<double>::infinity());
|
||||||
|
double clientSamplingProbability = std::isinf(sampleRate) ? CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : sampleRate;
|
||||||
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
|
if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) &&
|
||||||
deterministicRandom()->random01() < clientSamplingProbability &&
|
deterministicRandom()->random01() < clientSamplingProbability &&
|
||||||
(!g_network->isSimulated() || !g_simulator->speedUpSimulation)) {
|
(!g_network->isSimulated() || !g_simulator->speedUpSimulation)) {
|
||||||
|
@ -727,14 +727,16 @@ struct ExclusionSafetyCheckRequest {
|
|||||||
struct GlobalConfigRefreshReply {
|
struct GlobalConfigRefreshReply {
|
||||||
constexpr static FileIdentifier file_identifier = 12680327;
|
constexpr static FileIdentifier file_identifier = 12680327;
|
||||||
Arena arena;
|
Arena arena;
|
||||||
|
Version version;
|
||||||
RangeResultRef result;
|
RangeResultRef result;
|
||||||
|
|
||||||
GlobalConfigRefreshReply() {}
|
GlobalConfigRefreshReply() {}
|
||||||
GlobalConfigRefreshReply(Arena const& arena, RangeResultRef result) : arena(arena), result(result) {}
|
GlobalConfigRefreshReply(Arena const& arena, Version version, RangeResultRef result)
|
||||||
|
: arena(arena), version(version), result(result) {}
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
serializer(ar, result, arena);
|
serializer(ar, result, version, arena);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -163,7 +163,7 @@ private:
|
|||||||
// of the global configuration keyspace.
|
// of the global configuration keyspace.
|
||||||
void erase(KeyRangeRef range);
|
void erase(KeyRangeRef range);
|
||||||
|
|
||||||
ACTOR static Future<Void> refresh(GlobalConfig* self, Version lastKnown);
|
ACTOR static Future<Version> refresh(GlobalConfig* self, Version lastKnown, Version largestSeen);
|
||||||
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);
|
ACTOR static Future<Void> updater(GlobalConfig* self, const ClientDBInfo* dbInfo);
|
||||||
|
|
||||||
DatabaseContext* cx;
|
DatabaseContext* cx;
|
||||||
|
@ -1077,6 +1077,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
|
|||||||
.detail("GrvProxies", db->clientInfo->get().grvProxies)
|
.detail("GrvProxies", db->clientInfo->get().grvProxies)
|
||||||
.detail("ReqGrvProxies", req.grvProxies)
|
.detail("ReqGrvProxies", req.grvProxies)
|
||||||
.detail("CommitProxies", db->clientInfo->get().commitProxies)
|
.detail("CommitProxies", db->clientInfo->get().commitProxies)
|
||||||
|
.detail("GlobalConfigHistorySize", db->clientInfo->get().history.size())
|
||||||
.detail("ReqCPs", req.commitProxies)
|
.detail("ReqCPs", req.commitProxies)
|
||||||
.detail("TenantMode", db->clientInfo->get().tenantMode.toString())
|
.detail("TenantMode", db->clientInfo->get().tenantMode.toString())
|
||||||
.detail("ReqTenantMode", db->config.tenantMode.toString())
|
.detail("ReqTenantMode", db->config.tenantMode.toString())
|
||||||
@ -1093,6 +1094,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
|
|||||||
clientInfo.id = deterministicRandom()->randomUniqueID();
|
clientInfo.id = deterministicRandom()->randomUniqueID();
|
||||||
clientInfo.commitProxies = req.commitProxies;
|
clientInfo.commitProxies = req.commitProxies;
|
||||||
clientInfo.grvProxies = req.grvProxies;
|
clientInfo.grvProxies = req.grvProxies;
|
||||||
|
clientInfo.history = db->clientInfo->get().history;
|
||||||
clientInfo.tenantMode = TenantAPI::tenantModeForClusterType(db->clusterType, db->config.tenantMode);
|
clientInfo.tenantMode = TenantAPI::tenantModeForClusterType(db->clusterType, db->config.tenantMode);
|
||||||
clientInfo.clusterId = db->serverInfo->get().client.clusterId;
|
clientInfo.clusterId = db->serverInfo->get().client.clusterId;
|
||||||
clientInfo.clusterType = db->clusterType;
|
clientInfo.clusterType = db->clusterType;
|
||||||
@ -1742,12 +1744,12 @@ ACTOR Future<Void> monitorStorageMetadata(ClusterControllerData* self) {
|
|||||||
ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
||||||
loop {
|
loop {
|
||||||
state ReadYourWritesTransaction tr(db->db);
|
state ReadYourWritesTransaction tr(db->db);
|
||||||
|
state ClientDBInfo clientInfo;
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||||
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
|
state Optional<Value> globalConfigVersion = wait(tr.get(globalConfigVersionKey));
|
||||||
state ClientDBInfo clientInfo = db->serverInfo->get().client;
|
|
||||||
|
|
||||||
if (globalConfigVersion.present()) {
|
if (globalConfigVersion.present()) {
|
||||||
// Since the history keys end with versionstamps, they
|
// Since the history keys end with versionstamps, they
|
||||||
@ -1758,11 +1760,12 @@ ACTOR Future<Void> monitorGlobalConfig(ClusterControllerData::DBInfo* db) {
|
|||||||
// If the global configuration version key has been set,
|
// If the global configuration version key has been set,
|
||||||
// the history should contain at least one item.
|
// the history should contain at least one item.
|
||||||
ASSERT(globalConfigHistory.size() > 0);
|
ASSERT(globalConfigHistory.size() > 0);
|
||||||
|
clientInfo = db->serverInfo->get().client;
|
||||||
clientInfo.history.clear();
|
clientInfo.history.clear();
|
||||||
|
|
||||||
for (const auto& kv : globalConfigHistory) {
|
for (const auto& kv : globalConfigHistory) {
|
||||||
ObjectReader reader(kv.value.begin(), IncludeVersion());
|
ObjectReader reader(kv.value.begin(), IncludeVersion());
|
||||||
if (reader.protocolVersion() != g_network->protocolVersion()) {
|
if (reader.protocolVersion() != g_network->protocolVersion() || BUGGIFY_WITH_PROB(0.01)) {
|
||||||
// If the protocol version has changed, the
|
// If the protocol version has changed, the
|
||||||
// GlobalConfig actor should refresh its view by
|
// GlobalConfig actor should refresh its view by
|
||||||
// reading the entire global configuration key
|
// reading the entire global configuration key
|
||||||
|
@ -366,7 +366,7 @@ ACTOR Future<Void> globalConfigRequestServer(GrvProxyData* grvProxyData, GrvProx
|
|||||||
// point of view. The client learns the version through a
|
// point of view. The client learns the version through a
|
||||||
// ClientDBInfo update).
|
// ClientDBInfo update).
|
||||||
if (refresh.lastKnown <= cachedVersion) {
|
if (refresh.lastKnown <= cachedVersion) {
|
||||||
refresh.reply.send(GlobalConfigRefreshReply{ cachedData.arena(), cachedData });
|
refresh.reply.send(GlobalConfigRefreshReply{ cachedData.arena(), cachedVersion, cachedData });
|
||||||
} else {
|
} else {
|
||||||
refresh.reply.sendError(future_version());
|
refresh.reply.sendError(future_version());
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user