From 6b782c10f6ddd39f19b4b8ba3afbd53358ed0e73 Mon Sep 17 00:00:00 2001 From: hao fu Date: Tue, 14 May 2024 15:26:44 -0700 Subject: [PATCH] Fix globalconfig refresh hang issue The CC sets the version to int_max in ClientDBInfo to indicate that a refresh is needed; however, the proxy server rejects this version with a future_version error. This change fixes the issue by not sending int_max, and instead maintaining a lastKnown version in memory and sending it to the GRV proxy to get the latest globalconfig. This change also fixes some Java tests that were used to test the fix. --- .../apple/foundationdb/MultiClientHelper.java | 18 +--- fdbclient/GlobalConfig.actor.cpp | 92 +++++++++++-------- fdbclient/NativeAPI.actor.cpp | 5 +- .../include/fdbclient/CommitProxyInterface.h | 6 +- .../include/fdbclient/GlobalConfig.actor.h | 2 +- fdbserver/ClusterController.actor.cpp | 7 +- fdbserver/GrvProxyServer.actor.cpp | 2 +- 7 files changed, 69 insertions(+), 63 deletions(-) diff --git a/bindings/java/src/integration/com/apple/foundationdb/MultiClientHelper.java b/bindings/java/src/integration/com/apple/foundationdb/MultiClientHelper.java index 671163955f..935b0a9d41 100644 --- a/bindings/java/src/integration/com/apple/foundationdb/MultiClientHelper.java +++ b/bindings/java/src/integration/com/apple/foundationdb/MultiClientHelper.java @@ -21,15 +21,13 @@ package com.apple.foundationdb; import java.util.ArrayList; import java.util.Collection; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; - +import org.junit.jupiter.api.extension.BeforeAllCallback; /** * Callback to help define a multi-client scenario and ensure that * the clients can be configured properly. 
*/ -public class MultiClientHelper implements BeforeAllCallback,AfterEachCallback{ +public class MultiClientHelper implements BeforeAllCallback { private String[] clusterFiles; private Collection openDatabases; @@ -67,16 +65,4 @@ public class MultiClientHelper implements BeforeAllCallback,AfterEachCallback{ public void beforeAll(ExtensionContext arg0) throws Exception { clusterFiles = readClusterFromEnv(); } - - @Override - public void afterEach(ExtensionContext arg0) throws Exception { - //close any databases that have been opened - if(openDatabases!=null){ - for(Database db : openDatabases){ - db.close(); - } - } - openDatabases = null; - } - } diff --git a/fdbclient/GlobalConfig.actor.cpp b/fdbclient/GlobalConfig.actor.cpp index 45db5aa45b..e3ea6468dd 100644 --- a/fdbclient/GlobalConfig.actor.cpp +++ b/fdbclient/GlobalConfig.actor.cpp @@ -154,8 +154,9 @@ void GlobalConfig::erase(KeyRangeRef range) { } // Updates local copy of global configuration by reading the entire key-range -// from storage (proxied through the GrvProxies). -ACTOR Future GlobalConfig::refresh(GlobalConfig* self, Version lastKnown) { +// from storage (proxied through the GrvProxies). Returns the version of the +// refreshed data. 
+ACTOR Future GlobalConfig::refresh(GlobalConfig* self, Version lastKnown, Version largestSeen) { // TraceEvent trace(SevInfo, "GlobalConfigRefresh"); self->erase(KeyRangeRef(""_sr, "\xff"_sr)); @@ -171,7 +172,11 @@ ACTOR Future GlobalConfig::refresh(GlobalConfig* self, Version lastKnown) KeyRef systemKey = kv.key.removePrefix(globalConfigKeysPrefix); self->insert(systemKey, kv.value); } - return Void(); + if (reply.version >= largestSeen || largestSeen == std::numeric_limits::max()) { + return reply.version; + } else { + wait(delay(0.25)); + } } catch (Error& e) { wait(backoff.onError()); } @@ -186,52 +191,61 @@ ACTOR Future GlobalConfig::updater(GlobalConfig* self, const ClientDBInfo* if (self->initialized.canBeSet()) { wait(self->cx->onConnected()); - wait(self->refresh(self, -1)); + Version version = wait(self->refresh(self, -1, 0)); + self->lastUpdate = version; + + self->cx->addref(); self->initialized.send(Void()); + self->cx->delref(); } loop { try { - wait(self->dbInfoChanged.onTrigger()); - - auto& history = dbInfo->history; - if (history.size() == 0) { - continue; - } - - if (self->lastUpdate < history[0].version) { - // This process missed too many global configuration - // history updates or the protocol version changed, so it - // must re-read the entire configuration range. - wait(self->refresh(self, history.back().version)); - if (dbInfo->history.size() > 0) { - self->lastUpdate = dbInfo->history.back().version; - } - } else { - // Apply history in order, from lowest version to highest - // version. Mutation history should already be stored in - // ascending version order. 
- for (const auto& vh : history) { - if (vh.version <= self->lastUpdate) { - continue; // already applied this mutation + // run one iteration at the beginning + wait(delay(0)); + if (dbInfo->history.size() > 0) { + if (self->lastUpdate < dbInfo->history[0].version) { + // This process missed too many global configuration + // history updates or the protocol version changed, so it + // must re-read the entire configuration range. + Version version = + wait(self->refresh(self, self->lastUpdate, dbInfo->history.back().version)); + self->lastUpdate = version; + // DBInfo could have changed after the wait. If + // changes are present, re-run the loop to make + // sure they are applied. + if (dbInfo->history.size() > 0 && + dbInfo->history[0].version != std::numeric_limits::max()) { + continue; } - - for (const auto& mutation : vh.mutations.contents()) { - if (mutation.type == MutationRef::SetValue) { - self->insert(mutation.param1, mutation.param2); - } else if (mutation.type == MutationRef::ClearRange) { - self->erase(KeyRangeRef(mutation.param1, mutation.param2)); - } else { - ASSERT(false); + } else { + // Apply history in order, from lowest version to highest + // version. Mutation history should already be stored in + // ascending version order. 
+ for (const auto& vh : dbInfo->history) { + if (vh.version <= self->lastUpdate) { + continue; // already applied this mutation } + + for (const auto& mutation : vh.mutations.contents()) { + if (mutation.type == MutationRef::SetValue) { + self->insert(mutation.param1, mutation.param2); + } else if (mutation.type == MutationRef::ClearRange) { + self->erase(KeyRangeRef(mutation.param1, mutation.param2)); + } else { + ASSERT(false); + } + } + + ASSERT(vh.version > self->lastUpdate); + self->lastUpdate = vh.version; } - - ASSERT(vh.version > self->lastUpdate); - self->lastUpdate = vh.version; } + self->configChanged.trigger(); } - - self->configChanged.trigger(); + // In case this actor is canceled in the d'tor of GlobalConfig we can exit here. + wait(delay(0)); + wait(self->dbInfoChanged.onTrigger()); } catch (Error& e) { throw; } diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index bb8f45d9b2..2aa6ce5392 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -9075,8 +9075,9 @@ void Transaction::checkDeferredError() const { Reference Transaction::createTrLogInfoProbabilistically(const Database& cx) { if (!cx->isError()) { - double clientSamplingProbability = - cx->globalConfig->get(fdbClientInfoTxnSampleRate, CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY); + double sampleRate = + cx->globalConfig->get(fdbClientInfoTxnSampleRate, std::numeric_limits::infinity()); + double clientSamplingProbability = std::isinf(sampleRate) ? 
CLIENT_KNOBS->CSI_SAMPLING_PROBABILITY : sampleRate; if (((networkOptions.logClientInfo.present() && networkOptions.logClientInfo.get()) || BUGGIFY) && deterministicRandom()->random01() < clientSamplingProbability && (!g_network->isSimulated() || !g_simulator->speedUpSimulation)) { diff --git a/fdbclient/include/fdbclient/CommitProxyInterface.h b/fdbclient/include/fdbclient/CommitProxyInterface.h index e6ad63cd9b..84d962ff0d 100644 --- a/fdbclient/include/fdbclient/CommitProxyInterface.h +++ b/fdbclient/include/fdbclient/CommitProxyInterface.h @@ -727,14 +727,16 @@ struct ExclusionSafetyCheckRequest { struct GlobalConfigRefreshReply { constexpr static FileIdentifier file_identifier = 12680327; Arena arena; + Version version; RangeResultRef result; GlobalConfigRefreshReply() {} - GlobalConfigRefreshReply(Arena const& arena, RangeResultRef result) : arena(arena), result(result) {} + GlobalConfigRefreshReply(Arena const& arena, Version version, RangeResultRef result) + : arena(arena), version(version), result(result) {} template void serialize(Ar& ar) { - serializer(ar, result, arena); + serializer(ar, result, version, arena); } }; diff --git a/fdbclient/include/fdbclient/GlobalConfig.actor.h b/fdbclient/include/fdbclient/GlobalConfig.actor.h index fb03106630..b5f57c9105 100644 --- a/fdbclient/include/fdbclient/GlobalConfig.actor.h +++ b/fdbclient/include/fdbclient/GlobalConfig.actor.h @@ -163,7 +163,7 @@ private: // of the global configuration keyspace. 
void erase(KeyRangeRef range); - ACTOR static Future refresh(GlobalConfig* self, Version lastKnown); + ACTOR static Future refresh(GlobalConfig* self, Version lastKnown, Version largestSeen); ACTOR static Future updater(GlobalConfig* self, const ClientDBInfo* dbInfo); DatabaseContext* cx; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 1b089f3b66..0db1eabffe 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1077,6 +1077,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co .detail("GrvProxies", db->clientInfo->get().grvProxies) .detail("ReqGrvProxies", req.grvProxies) .detail("CommitProxies", db->clientInfo->get().commitProxies) + .detail("GlobalConfigHistorySize", db->clientInfo->get().history.size()) .detail("ReqCPs", req.commitProxies) .detail("TenantMode", db->clientInfo->get().tenantMode.toString()) .detail("ReqTenantMode", db->config.tenantMode.toString()) @@ -1093,6 +1094,7 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co clientInfo.id = deterministicRandom()->randomUniqueID(); clientInfo.commitProxies = req.commitProxies; clientInfo.grvProxies = req.grvProxies; + clientInfo.history = db->clientInfo->get().history; clientInfo.tenantMode = TenantAPI::tenantModeForClusterType(db->clusterType, db->config.tenantMode); clientInfo.clusterId = db->serverInfo->get().client.clusterId; clientInfo.clusterType = db->clusterType; @@ -1742,12 +1744,12 @@ ACTOR Future monitorStorageMetadata(ClusterControllerData* self) { ACTOR Future monitorGlobalConfig(ClusterControllerData::DBInfo* db) { loop { state ReadYourWritesTransaction tr(db->db); + state ClientDBInfo clientInfo; loop { try { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); state Optional globalConfigVersion = wait(tr.get(globalConfigVersionKey)); - state ClientDBInfo clientInfo = 
db->serverInfo->get().client; if (globalConfigVersion.present()) { // Since the history keys end with versionstamps, they @@ -1758,11 +1760,12 @@ ACTOR Future monitorGlobalConfig(ClusterControllerData::DBInfo* db) { // If the global configuration version key has been set, // the history should contain at least one item. ASSERT(globalConfigHistory.size() > 0); + clientInfo = db->serverInfo->get().client; clientInfo.history.clear(); for (const auto& kv : globalConfigHistory) { ObjectReader reader(kv.value.begin(), IncludeVersion()); - if (reader.protocolVersion() != g_network->protocolVersion()) { + if (reader.protocolVersion() != g_network->protocolVersion() || BUGGIFY_WITH_PROB(0.01)) { // If the protocol version has changed, the // GlobalConfig actor should refresh its view by // reading the entire global configuration key diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 91961710af..343590f999 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -366,7 +366,7 @@ ACTOR Future globalConfigRequestServer(GrvProxyData* grvProxyData, GrvProx // point of view. The client learns the version through a // ClientDBInfo update). if (refresh.lastKnown <= cachedVersion) { - refresh.reply.send(GlobalConfigRefreshReply{ cachedData.arena(), cachedData }); + refresh.reply.send(GlobalConfigRefreshReply{ cachedData.arena(), cachedVersion, cachedData }); } else { refresh.reply.sendError(future_version()); }