Merge branch 'main' into block-down

This commit is contained in:
Bharadwaj V.R 2022-04-22 14:53:09 -07:00 committed by GitHub
commit 588b2fa509
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 80 additions and 2 deletions

View File

@ -695,6 +695,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"cluster_controller_timestamp":1415650089,
"protocol_version":"fdb00a400050001",
"newest_protocol_version":"fdb00a500040001",
"lowest_compatible_protocol_version":"fdb00a500040001",
"connection_string":"a:a@127.0.0.1:4000",
"full_replication":true,
"maintenance_zone":"0ccb4e0fdbdb5583010f6b77d9d10ece",

View File

@ -25,6 +25,7 @@
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WaitFailure.h"
#include "flow/ProtocolVersion.h"
#include "flow/actorcompiler.h" // This must be the last #include.
static std::set<int> const& normalClusterRecoveryErrors() {
@ -1407,6 +1408,11 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
wait(self->cstate.read());
if (self->cstate.prevDBState.lowestCompatibleProtocolVersion > currentProtocolVersion) {
TraceEvent(SevWarnAlways, "IncompatibleProtocolVersion", self->dbgid).log();
throw internal_error();
}
self->recoveryState = RecoveryState::LOCKING_CSTATE;
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME).c_str(), self->dbgid)
.detail("StatusCode", RecoveryStatus::locking_coordinated_state)
@ -1462,8 +1468,21 @@ ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self) {
DBCoreState newState = self->cstate.myDBState;
newState.recoveryCount++;
newState.recoveryCount++;
if (self->cstate.prevDBState.newestProtocolVersion.isInvalid() ||
self->cstate.prevDBState.newestProtocolVersion < currentProtocolVersion) {
ASSERT(self->cstate.myDBState.lowestCompatibleProtocolVersion.isInvalid() ||
!self->cstate.myDBState.newestProtocolVersion.isInvalid());
newState.newestProtocolVersion = currentProtocolVersion;
newState.lowestCompatibleProtocolVersion = minCompatibleProtocolVersion;
}
wait(self->cstate.write(newState) || recoverAndEndEpoch);
TraceEvent("ProtocolVersionCompatibilityChecked", self->dbgid)
.detail("NewestProtocolVersion", self->cstate.myDBState.newestProtocolVersion)
.detail("LowestCompatibleProtocolVersion", self->cstate.myDBState.lowestCompatibleProtocolVersion)
.trackLatest(self->swVersionCheckedEventHolder->trackingKey);
self->recoveryState = RecoveryState::RECRUITING;
state std::vector<StorageServerInterface> seedServers;

View File

@ -22,6 +22,7 @@
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#include "flow/Trace.h"
#include <utility>
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_CLUSTERRECOVERY_ACTOR_G_H)
@ -244,6 +245,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
Future<Void> logger;
Reference<EventCacheHolder> swVersionCheckedEventHolder;
Reference<EventCacheHolder> recoveredConfigEventHolder;
Reference<EventCacheHolder> clusterRecoveryStateEventHolder;
Reference<EventCacheHolder> clusterRecoveryGenerationsEventHolder;
@ -273,6 +275,7 @@ struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData>
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc),
swVersionCheckedEventHolder(makeReference<EventCacheHolder>("SWVersionCompatibilityChecked")),
recoveredConfigEventHolder(makeReference<EventCacheHolder>("RecoveredConfig")) {
clusterRecoveryStateEventHolder = makeReference<EventCacheHolder>(
getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_STATE_EVENT_NAME));

View File

@ -141,8 +141,13 @@ struct DBCoreState {
DBRecoveryCount recoveryCount; // Increases with sequential successful recoveries.
LogSystemType logSystemType;
std::set<int8_t> pseudoLocalities;
ProtocolVersion newestProtocolVersion;
ProtocolVersion lowestCompatibleProtocolVersion;
DBCoreState() : logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty) {}
DBCoreState()
: logRouterTags(0), txsTags(0), recoveryCount(0), logSystemType(LogSystemType::empty),
newestProtocolVersion(ProtocolVersion::invalidProtocolVersion),
lowestCompatibleProtocolVersion(ProtocolVersion::invalidProtocolVersion) {}
std::vector<UID> getPriorCommittedLogServers() {
std::vector<UID> priorCommittedLogServers;
@ -180,6 +185,9 @@ struct DBCoreState {
if (ar.protocolVersion().hasShardedTxsTags()) {
serializer(ar, txsTags);
}
if (ar.protocolVersion().hasSWVersionTracking()) {
serializer(ar, newestProtocolVersion, lowestCompatibleProtocolVersion);
}
} else if (ar.isDeserializing) {
tLogs.push_back(CoreTLogSet());
serializer(ar,

View File

@ -24,6 +24,7 @@
#include "fdbclient/KeyBackedTypes.h"
#include "fdbserver/Status.h"
#include "flow/ITrace.h"
#include "flow/ProtocolVersion.h"
#include "flow/Trace.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
@ -1527,6 +1528,41 @@ ACTOR static Future<Void> logRangeWarningFetcher(Database cx,
return Void();
}
struct ProtocolVersionData {
ProtocolVersion runningProtocolVersion;
ProtocolVersion newestProtocolVersion;
ProtocolVersion lowestCompatibleProtocolVersion;
ProtocolVersionData() : runningProtocolVersion(currentProtocolVersion) {}
ProtocolVersionData(uint64_t newestProtocolVersionValue, uint64_t lowestCompatibleProtocolVersionValue)
: runningProtocolVersion(currentProtocolVersion), newestProtocolVersion(newestProtocolVersionValue),
lowestCompatibleProtocolVersion(lowestCompatibleProtocolVersionValue) {}
};
ACTOR Future<ProtocolVersionData> getNewestProtocolVersion(Database cx, WorkerDetails ccWorker) {
try {
state Future<TraceEventFields> swVersionF = timeoutError(
ccWorker.interf.eventLogRequest.getReply(EventLogRequest("SWVersionCompatibilityChecked"_sr)), 1.0);
wait(success(swVersionF));
const TraceEventFields& swVersionTrace = swVersionF.get();
int64_t newestProtocolVersionValue =
std::stoull(swVersionTrace.getValue("NewestProtocolVersion").c_str(), nullptr, 16);
int64_t lowestCompatibleProtocolVersionValue =
std::stoull(swVersionTrace.getValue("LowestCompatibleProtocolVersion").c_str(), nullptr, 16);
return ProtocolVersionData(newestProtocolVersionValue, lowestCompatibleProtocolVersionValue);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
TraceEvent(SevWarnAlways, "SWVersionStatusFailed").error(e);
return ProtocolVersionData();
}
}
struct LoadConfigurationResult {
bool fullReplication;
Optional<Key> healthyZone;
@ -2880,6 +2916,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
messages.push_back(message);
}
state ProtocolVersionData protocolVersion = wait(getNewestProtocolVersion(cx, ccWorker));
// construct status information for cluster subsections
state int statusCode = (int)RecoveryStatus::END;
state JsonBuilderObject recoveryStateStatus = wait(
@ -2917,6 +2955,9 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["protocol_version"] = format("%" PRIx64, g_network->protocolVersion().version());
statusObj["connection_string"] = coordinators.ccr->getConnectionString().toString();
statusObj["bounce_impact"] = getBounceImpactInfo(statusCode);
statusObj["newest_protocol_version"] = format("%" PRIx64, protocolVersion.newestProtocolVersion.version());
statusObj["lowest_compatible_protocol_version"] =
format("%" PRIx64, protocolVersion.lowestCompatibleProtocolVersion.version());
state Optional<DatabaseConfiguration> configuration;
state Optional<LoadConfigurationResult> loadResult;

View File

@ -19,8 +19,8 @@
*/
#pragma once
#include <cstdint>
#include "flow/Trace.h"
#include <cstdint>
// This version impacts both communications and the deserialization of certain database and IKeyValueStore keys.
//
@ -62,6 +62,7 @@ public: // constants
static constexpr uint64_t objectSerializerFlag = 0x1000000000000000LL;
static constexpr uint64_t compatibleProtocolVersionMask = 0xFFFFFFFFFFFF0000LL;
static constexpr uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
static constexpr uint64_t invalidProtocolVersion = 0x0FDB00A100000000LL;
public:
constexpr explicit ProtocolVersion(uint64_t version) : _version(version) {}
@ -77,6 +78,8 @@ public:
}
constexpr bool isValid() const { return version() >= minValidProtocolVersion; }
constexpr bool isInvalid() const { return version() == invalidProtocolVersion; }
constexpr uint64_t version() const { return _version & versionFlagMask; }
constexpr uint64_t versionWithFlags() const { return _version; }
@ -168,6 +171,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageInterfaceReadiness);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
};
template <>
@ -242,3 +246,4 @@ struct Traceable<SWVersion> : std::true_type {
swVersion.lowestCompatibleProtocolVersion());
}
};