mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 10:22:20 +08:00
Update type of coordinators hash
This fixes some serialization issues due to `BinaryReader` not being able to deserialize types of size_t.
This commit is contained in:
parent
9fd22546ea
commit
8c50f98c00
@ -35,7 +35,8 @@ class CommitQuorum {
|
|||||||
Standalone<VectorRef<ConfigMutationRef>> mutations;
|
Standalone<VectorRef<ConfigMutationRef>> mutations;
|
||||||
ConfigCommitAnnotation annotation;
|
ConfigCommitAnnotation annotation;
|
||||||
|
|
||||||
ConfigTransactionCommitRequest getCommitRequest(ConfigGeneration generation, size_t coordinatorsHash) const {
|
ConfigTransactionCommitRequest getCommitRequest(ConfigGeneration generation,
|
||||||
|
CoordinatorsHash coordinatorsHash) const {
|
||||||
return ConfigTransactionCommitRequest(coordinatorsHash, generation, mutations, annotation);
|
return ConfigTransactionCommitRequest(coordinatorsHash, generation, mutations, annotation);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,7 +64,7 @@ class CommitQuorum {
|
|||||||
|
|
||||||
ACTOR static Future<Void> addRequestActor(CommitQuorum* self,
|
ACTOR static Future<Void> addRequestActor(CommitQuorum* self,
|
||||||
ConfigGeneration generation,
|
ConfigGeneration generation,
|
||||||
size_t coordinatorsHash,
|
CoordinatorsHash coordinatorsHash,
|
||||||
ConfigTransactionInterface cti) {
|
ConfigTransactionInterface cti) {
|
||||||
try {
|
try {
|
||||||
if (cti.hostname.present()) {
|
if (cti.hostname.present()) {
|
||||||
@ -112,7 +113,7 @@ public:
|
|||||||
}
|
}
|
||||||
void setTimestamp() { annotation.timestamp = now(); }
|
void setTimestamp() { annotation.timestamp = now(); }
|
||||||
size_t expectedSize() const { return annotation.expectedSize() + mutations.expectedSize(); }
|
size_t expectedSize() const { return annotation.expectedSize() + mutations.expectedSize(); }
|
||||||
Future<Void> commit(ConfigGeneration generation, size_t coordinatorsHash) {
|
Future<Void> commit(ConfigGeneration generation, CoordinatorsHash coordinatorsHash) {
|
||||||
// Send commit message to all replicas, even those that did not return the used replica.
|
// Send commit message to all replicas, even those that did not return the used replica.
|
||||||
// This way, slow replicas are kept up date.
|
// This way, slow replicas are kept up date.
|
||||||
for (const auto& cti : ctis) {
|
for (const auto& cti : ctis) {
|
||||||
@ -125,7 +126,7 @@ public:
|
|||||||
|
|
||||||
class GetGenerationQuorum {
|
class GetGenerationQuorum {
|
||||||
ActorCollection actors{ false };
|
ActorCollection actors{ false };
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
std::vector<ConfigTransactionInterface> ctis;
|
std::vector<ConfigTransactionInterface> ctis;
|
||||||
std::map<ConfigGeneration, std::vector<ConfigTransactionInterface>> seenGenerations;
|
std::map<ConfigGeneration, std::vector<ConfigTransactionInterface>> seenGenerations;
|
||||||
Promise<ConfigGeneration> result;
|
Promise<ConfigGeneration> result;
|
||||||
@ -241,7 +242,7 @@ class GetGenerationQuorum {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
GetGenerationQuorum() = default;
|
GetGenerationQuorum() = default;
|
||||||
explicit GetGenerationQuorum(size_t coordinatorsHash,
|
explicit GetGenerationQuorum(CoordinatorsHash coordinatorsHash,
|
||||||
std::vector<ConfigTransactionInterface> const& ctis,
|
std::vector<ConfigTransactionInterface> const& ctis,
|
||||||
Future<Void> coordinatorsChangedFuture,
|
Future<Void> coordinatorsChangedFuture,
|
||||||
Optional<Version> const& lastSeenLiveVersion = {})
|
Optional<Version> const& lastSeenLiveVersion = {})
|
||||||
@ -267,7 +268,7 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
class PaxosConfigTransactionImpl {
|
class PaxosConfigTransactionImpl {
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
std::vector<ConfigTransactionInterface> ctis;
|
std::vector<ConfigTransactionInterface> ctis;
|
||||||
GetGenerationQuorum getGenerationQuorum;
|
GetGenerationQuorum getGenerationQuorum;
|
||||||
CommitQuorum commitQuorum;
|
CommitQuorum commitQuorum;
|
||||||
|
@ -25,6 +25,8 @@
|
|||||||
|
|
||||||
#include "fdbclient/FDBTypes.h"
|
#include "fdbclient/FDBTypes.h"
|
||||||
|
|
||||||
|
typedef uint64_t CoordinatorsHash;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* KnobValueRefs are stored in the configuration database, and in local configuration files. They are created from
|
* KnobValueRefs are stored in the configuration database, and in local configuration files. They are created from
|
||||||
* ParsedKnobValue objects, so it is assumed that the value type is correct for the corresponding knob name
|
* ParsedKnobValue objects, so it is assumed that the value type is correct for the corresponding knob name
|
||||||
|
@ -65,12 +65,12 @@ struct ConfigTransactionGetGenerationReply {
|
|||||||
|
|
||||||
struct ConfigTransactionGetGenerationRequest {
|
struct ConfigTransactionGetGenerationRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 138941;
|
static constexpr FileIdentifier file_identifier = 138941;
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
// A hint to catch up lagging nodes:
|
// A hint to catch up lagging nodes:
|
||||||
Optional<Version> lastSeenLiveVersion;
|
Optional<Version> lastSeenLiveVersion;
|
||||||
ReplyPromise<ConfigTransactionGetGenerationReply> reply;
|
ReplyPromise<ConfigTransactionGetGenerationReply> reply;
|
||||||
ConfigTransactionGetGenerationRequest() = default;
|
ConfigTransactionGetGenerationRequest() = default;
|
||||||
explicit ConfigTransactionGetGenerationRequest(size_t coordinatorsHash,
|
explicit ConfigTransactionGetGenerationRequest(CoordinatorsHash coordinatorsHash,
|
||||||
Optional<Version> const& lastSeenLiveVersion)
|
Optional<Version> const& lastSeenLiveVersion)
|
||||||
: coordinatorsHash(coordinatorsHash), lastSeenLiveVersion(lastSeenLiveVersion) {}
|
: coordinatorsHash(coordinatorsHash), lastSeenLiveVersion(lastSeenLiveVersion) {}
|
||||||
|
|
||||||
@ -94,13 +94,13 @@ struct ConfigTransactionGetReply {
|
|||||||
|
|
||||||
struct ConfigTransactionGetRequest {
|
struct ConfigTransactionGetRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 923040;
|
static constexpr FileIdentifier file_identifier = 923040;
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
ConfigGeneration generation;
|
ConfigGeneration generation;
|
||||||
ConfigKey key;
|
ConfigKey key;
|
||||||
ReplyPromise<ConfigTransactionGetReply> reply;
|
ReplyPromise<ConfigTransactionGetReply> reply;
|
||||||
|
|
||||||
ConfigTransactionGetRequest() = default;
|
ConfigTransactionGetRequest() = default;
|
||||||
explicit ConfigTransactionGetRequest(size_t coordinatorsHash, ConfigGeneration generation, ConfigKey key)
|
explicit ConfigTransactionGetRequest(CoordinatorsHash coordinatorsHash, ConfigGeneration generation, ConfigKey key)
|
||||||
: coordinatorsHash(coordinatorsHash), generation(generation), key(key) {}
|
: coordinatorsHash(coordinatorsHash), generation(generation), key(key) {}
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
@ -112,14 +112,14 @@ struct ConfigTransactionGetRequest {
|
|||||||
struct ConfigTransactionCommitRequest {
|
struct ConfigTransactionCommitRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 103841;
|
static constexpr FileIdentifier file_identifier = 103841;
|
||||||
Arena arena;
|
Arena arena;
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
ConfigGeneration generation{ ::invalidVersion, ::invalidVersion };
|
ConfigGeneration generation{ ::invalidVersion, ::invalidVersion };
|
||||||
VectorRef<ConfigMutationRef> mutations;
|
VectorRef<ConfigMutationRef> mutations;
|
||||||
ConfigCommitAnnotationRef annotation;
|
ConfigCommitAnnotationRef annotation;
|
||||||
ReplyPromise<Void> reply;
|
ReplyPromise<Void> reply;
|
||||||
|
|
||||||
ConfigTransactionCommitRequest() = default;
|
ConfigTransactionCommitRequest() = default;
|
||||||
explicit ConfigTransactionCommitRequest(size_t coordinatorsHash,
|
explicit ConfigTransactionCommitRequest(CoordinatorsHash coordinatorsHash,
|
||||||
ConfigGeneration generation,
|
ConfigGeneration generation,
|
||||||
VectorRef<ConfigMutationRef> mutations,
|
VectorRef<ConfigMutationRef> mutations,
|
||||||
ConfigCommitAnnotationRef annotation)
|
ConfigCommitAnnotationRef annotation)
|
||||||
@ -150,12 +150,12 @@ struct ConfigTransactionGetConfigClassesReply {
|
|||||||
|
|
||||||
struct ConfigTransactionGetConfigClassesRequest {
|
struct ConfigTransactionGetConfigClassesRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 7163400;
|
static constexpr FileIdentifier file_identifier = 7163400;
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
ConfigGeneration generation;
|
ConfigGeneration generation;
|
||||||
ReplyPromise<ConfigTransactionGetConfigClassesReply> reply;
|
ReplyPromise<ConfigTransactionGetConfigClassesReply> reply;
|
||||||
|
|
||||||
ConfigTransactionGetConfigClassesRequest() = default;
|
ConfigTransactionGetConfigClassesRequest() = default;
|
||||||
explicit ConfigTransactionGetConfigClassesRequest(size_t coordinatorsHash, ConfigGeneration generation)
|
explicit ConfigTransactionGetConfigClassesRequest(CoordinatorsHash coordinatorsHash, ConfigGeneration generation)
|
||||||
: coordinatorsHash(coordinatorsHash), generation(generation) {}
|
: coordinatorsHash(coordinatorsHash), generation(generation) {}
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
@ -179,13 +179,13 @@ struct ConfigTransactionGetKnobsReply {
|
|||||||
|
|
||||||
struct ConfigTransactionGetKnobsRequest {
|
struct ConfigTransactionGetKnobsRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 987410;
|
static constexpr FileIdentifier file_identifier = 987410;
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
ConfigGeneration generation;
|
ConfigGeneration generation;
|
||||||
Optional<Key> configClass;
|
Optional<Key> configClass;
|
||||||
ReplyPromise<ConfigTransactionGetKnobsReply> reply;
|
ReplyPromise<ConfigTransactionGetKnobsReply> reply;
|
||||||
|
|
||||||
ConfigTransactionGetKnobsRequest() = default;
|
ConfigTransactionGetKnobsRequest() = default;
|
||||||
explicit ConfigTransactionGetKnobsRequest(size_t coordinatorsHash,
|
explicit ConfigTransactionGetKnobsRequest(CoordinatorsHash coordinatorsHash,
|
||||||
ConfigGeneration generation,
|
ConfigGeneration generation,
|
||||||
Optional<Key> configClass)
|
Optional<Key> configClass)
|
||||||
: coordinatorsHash(coordinatorsHash), generation(generation), configClass(configClass) {}
|
: coordinatorsHash(coordinatorsHash), generation(generation), configClass(configClass) {}
|
||||||
|
@ -41,6 +41,7 @@ typedef StringRef KeyRef;
|
|||||||
typedef StringRef ValueRef;
|
typedef StringRef ValueRef;
|
||||||
typedef int64_t Generation;
|
typedef int64_t Generation;
|
||||||
typedef UID SpanID;
|
typedef UID SpanID;
|
||||||
|
typedef uint64_t CoordinatorsHash;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2)
|
tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2)
|
||||||
|
@ -80,7 +80,7 @@ class ConfigBroadcasterImpl {
|
|||||||
Version lastCompactedVersion;
|
Version lastCompactedVersion;
|
||||||
Version largestLiveVersion;
|
Version largestLiveVersion;
|
||||||
Version mostRecentVersion;
|
Version mostRecentVersion;
|
||||||
size_t coordinatorsHash;
|
CoordinatorsHash coordinatorsHash;
|
||||||
std::unique_ptr<IConfigConsumer> consumer;
|
std::unique_ptr<IConfigConsumer> consumer;
|
||||||
Future<Void> consumerFuture;
|
Future<Void> consumerFuture;
|
||||||
ActorCollection actors{ false };
|
ActorCollection actors{ false };
|
||||||
@ -717,7 +717,7 @@ ACTOR static Future<Void> lockConfigNodesImpl(ServerCoordinators coordinators) {
|
|||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t coordinatorsHash = std::hash<std::string>()(coordinators.ccr->getConnectionString().toString());
|
CoordinatorsHash coordinatorsHash = std::hash<std::string>()(coordinators.ccr->getConnectionString().toString());
|
||||||
|
|
||||||
std::vector<Future<Void>> lockRequests;
|
std::vector<Future<Void>> lockRequests;
|
||||||
lockRequests.reserve(coordinators.configServers.size());
|
lockRequests.reserve(coordinators.configServers.size());
|
||||||
|
@ -135,11 +135,11 @@ class ConfigNodeImpl {
|
|||||||
Counter getGenerationRequests;
|
Counter getGenerationRequests;
|
||||||
Future<Void> logger;
|
Future<Void> logger;
|
||||||
|
|
||||||
ACTOR static Future<size_t> getCoordinatorsHash(ConfigNodeImpl* self) {
|
ACTOR static Future<CoordinatorsHash> getCoordinatorsHash(ConfigNodeImpl* self) {
|
||||||
state size_t coordinatorsHash = 0;
|
state CoordinatorsHash coordinatorsHash = 0;
|
||||||
Optional<Value> value = wait(self->kvStore->readValue(coordinatorsHashKey));
|
Optional<Value> value = wait(self->kvStore->readValue(coordinatorsHashKey));
|
||||||
if (value.present()) {
|
if (value.present()) {
|
||||||
coordinatorsHash = BinaryReader::fromStringRef<size_t>(value.get(), IncludeVersion());
|
coordinatorsHash = BinaryReader::fromStringRef<CoordinatorsHash>(value.get(), IncludeVersion());
|
||||||
} else {
|
} else {
|
||||||
self->kvStore->set(
|
self->kvStore->set(
|
||||||
KeyValueRef(coordinatorsHashKey, BinaryWriter::toValue(coordinatorsHash, IncludeVersion())));
|
KeyValueRef(coordinatorsHashKey, BinaryWriter::toValue(coordinatorsHash, IncludeVersion())));
|
||||||
@ -148,14 +148,12 @@ class ConfigNodeImpl {
|
|||||||
return coordinatorsHash;
|
return coordinatorsHash;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The returned value is a hash of the nodes current idea of the
|
ACTOR static Future<Optional<CoordinatorsHash>> getLocked(ConfigNodeImpl* self) {
|
||||||
// coordinators.
|
|
||||||
ACTOR static Future<Optional<size_t>> getLocked(ConfigNodeImpl* self) {
|
|
||||||
Optional<Value> value = wait(self->kvStore->readValue(lockedKey));
|
Optional<Value> value = wait(self->kvStore->readValue(lockedKey));
|
||||||
if (!value.present()) {
|
if (!value.present()) {
|
||||||
return Optional<size_t>();
|
return Optional<CoordinatorsHash>();
|
||||||
}
|
}
|
||||||
return BinaryReader::fromStringRef<Optional<size_t>>(value.get(), IncludeVersion());
|
return BinaryReader::fromStringRef<Optional<CoordinatorsHash>>(value.get(), IncludeVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<ConfigGeneration> getGeneration(ConfigNodeImpl* self) {
|
ACTOR static Future<ConfigGeneration> getGeneration(ConfigNodeImpl* self) {
|
||||||
@ -254,7 +252,7 @@ class ConfigNodeImpl {
|
|||||||
// New transactions increment the database's current live version. This effectively serves as a lock, providing
|
// New transactions increment the database's current live version. This effectively serves as a lock, providing
|
||||||
// serializability
|
// serializability
|
||||||
ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) {
|
ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) {
|
||||||
state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
|
state CoordinatorsHash coordinatorsHash = wait(getCoordinatorsHash(self));
|
||||||
if (req.coordinatorsHash != coordinatorsHash) {
|
if (req.coordinatorsHash != coordinatorsHash) {
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
@ -273,13 +271,13 @@ class ConfigNodeImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> get(ConfigNodeImpl* self, ConfigTransactionGetRequest req) {
|
ACTOR static Future<Void> get(ConfigNodeImpl* self, ConfigTransactionGetRequest req) {
|
||||||
state Optional<size_t> locked = wait(getLocked(self));
|
state Optional<CoordinatorsHash> locked = wait(getLocked(self));
|
||||||
if (locked.present()) {
|
if (locked.present()) {
|
||||||
CODE_PROBE(true, "attempting to read from a locked ConfigNode");
|
CODE_PROBE(true, "attempting to read from a locked ConfigNode");
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
|
state CoordinatorsHash coordinatorsHash = wait(getCoordinatorsHash(self));
|
||||||
if (req.coordinatorsHash != coordinatorsHash) {
|
if (req.coordinatorsHash != coordinatorsHash) {
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
@ -316,13 +314,13 @@ class ConfigNodeImpl {
|
|||||||
// TODO: Currently it is possible that extra configuration classes may be returned, we
|
// TODO: Currently it is possible that extra configuration classes may be returned, we
|
||||||
// may want to fix this to clean up the contract
|
// may want to fix this to clean up the contract
|
||||||
ACTOR static Future<Void> getConfigClasses(ConfigNodeImpl* self, ConfigTransactionGetConfigClassesRequest req) {
|
ACTOR static Future<Void> getConfigClasses(ConfigNodeImpl* self, ConfigTransactionGetConfigClassesRequest req) {
|
||||||
state Optional<size_t> locked = wait(getLocked(self));
|
state Optional<CoordinatorsHash> locked = wait(getLocked(self));
|
||||||
if (locked.present()) {
|
if (locked.present()) {
|
||||||
CODE_PROBE(true, "attempting to read config classes from locked ConfigNode");
|
CODE_PROBE(true, "attempting to read config classes from locked ConfigNode");
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
|
state CoordinatorsHash coordinatorsHash = wait(getCoordinatorsHash(self));
|
||||||
if (req.coordinatorsHash != coordinatorsHash) {
|
if (req.coordinatorsHash != coordinatorsHash) {
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
@ -360,13 +358,13 @@ class ConfigNodeImpl {
|
|||||||
|
|
||||||
// Retrieve all knobs explicitly defined for the specified configuration class
|
// Retrieve all knobs explicitly defined for the specified configuration class
|
||||||
ACTOR static Future<Void> getKnobs(ConfigNodeImpl* self, ConfigTransactionGetKnobsRequest req) {
|
ACTOR static Future<Void> getKnobs(ConfigNodeImpl* self, ConfigTransactionGetKnobsRequest req) {
|
||||||
state Optional<size_t> locked = wait(getLocked(self));
|
state Optional<CoordinatorsHash> locked = wait(getLocked(self));
|
||||||
if (locked.present()) {
|
if (locked.present()) {
|
||||||
CODE_PROBE(true, "attempting to read knobs from locked ConfigNode");
|
CODE_PROBE(true, "attempting to read knobs from locked ConfigNode");
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
|
state CoordinatorsHash coordinatorsHash = wait(getCoordinatorsHash(self));
|
||||||
if (req.coordinatorsHash != coordinatorsHash) {
|
if (req.coordinatorsHash != coordinatorsHash) {
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
@ -453,13 +451,13 @@ class ConfigNodeImpl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ACTOR static Future<Void> commit(ConfigNodeImpl* self, ConfigTransactionCommitRequest req) {
|
ACTOR static Future<Void> commit(ConfigNodeImpl* self, ConfigTransactionCommitRequest req) {
|
||||||
state Optional<size_t> locked = wait(getLocked(self));
|
state Optional<CoordinatorsHash> locked = wait(getLocked(self));
|
||||||
if (locked.present()) {
|
if (locked.present()) {
|
||||||
CODE_PROBE(true, "attempting to write to locked ConfigNode");
|
CODE_PROBE(true, "attempting to write to locked ConfigNode");
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
|
state CoordinatorsHash coordinatorsHash = wait(getCoordinatorsHash(self));
|
||||||
if (req.coordinatorsHash != coordinatorsHash) {
|
if (req.coordinatorsHash != coordinatorsHash) {
|
||||||
req.reply.sendError(coordinators_changed());
|
req.reply.sendError(coordinators_changed());
|
||||||
return Void();
|
return Void();
|
||||||
@ -675,7 +673,7 @@ class ConfigNodeImpl {
|
|||||||
req.reply.send(ConfigBroadcastRegisteredReply{ isRegistered, generation.committedVersion });
|
req.reply.send(ConfigBroadcastRegisteredReply{ isRegistered, generation.committedVersion });
|
||||||
}
|
}
|
||||||
when(state ConfigBroadcastReadyRequest readyReq = waitNext(cbi->ready.getFuture())) {
|
when(state ConfigBroadcastReadyRequest readyReq = waitNext(cbi->ready.getFuture())) {
|
||||||
state Optional<size_t> locked = wait(getLocked(self));
|
state Optional<CoordinatorsHash> locked = wait(getLocked(self));
|
||||||
|
|
||||||
// New ConfigNodes with no previous state should always
|
// New ConfigNodes with no previous state should always
|
||||||
// apply snapshots from the ConfigBroadcaster. Otherwise,
|
// apply snapshots from the ConfigBroadcaster. Otherwise,
|
||||||
@ -715,8 +713,8 @@ class ConfigNodeImpl {
|
|||||||
// Make sure freshly up to date ConfigNode isn't
|
// Make sure freshly up to date ConfigNode isn't
|
||||||
// locked! This is possible if it was a coordinator in
|
// locked! This is possible if it was a coordinator in
|
||||||
// a previous generation.
|
// a previous generation.
|
||||||
self->kvStore->set(
|
self->kvStore->set(KeyValueRef(
|
||||||
KeyValueRef(lockedKey, BinaryWriter::toValue(Optional<size_t>(), IncludeVersion())));
|
lockedKey, BinaryWriter::toValue(Optional<CoordinatorsHash>(), IncludeVersion())));
|
||||||
}
|
}
|
||||||
self->kvStore->set(KeyValueRef(coordinatorsHashKey,
|
self->kvStore->set(KeyValueRef(coordinatorsHashKey,
|
||||||
BinaryWriter::toValue(readyReq.coordinatorsHash, IncludeVersion())));
|
BinaryWriter::toValue(readyReq.coordinatorsHash, IncludeVersion())));
|
||||||
@ -768,13 +766,13 @@ class ConfigNodeImpl {
|
|||||||
}
|
}
|
||||||
when(state ConfigFollowerLockRequest req = waitNext(cfi->lock.getFuture())) {
|
when(state ConfigFollowerLockRequest req = waitNext(cfi->lock.getFuture())) {
|
||||||
++self->lockRequests;
|
++self->lockRequests;
|
||||||
size_t coordinatorsHash = wait(getCoordinatorsHash(self));
|
CoordinatorsHash coordinatorsHash = wait(getCoordinatorsHash(self));
|
||||||
if (coordinatorsHash == 0 || coordinatorsHash == req.coordinatorsHash) {
|
if (coordinatorsHash == 0 || coordinatorsHash == req.coordinatorsHash) {
|
||||||
TraceEvent("ConfigNodeLocking", self->id).log();
|
TraceEvent("ConfigNodeLocking", self->id).log();
|
||||||
self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(false, IncludeVersion())));
|
self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(false, IncludeVersion())));
|
||||||
self->kvStore->set(KeyValueRef(
|
self->kvStore->set(KeyValueRef(
|
||||||
lockedKey,
|
lockedKey,
|
||||||
BinaryWriter::toValue(Optional<size_t>(req.coordinatorsHash), IncludeVersion())));
|
BinaryWriter::toValue(Optional<CoordinatorsHash>(req.coordinatorsHash), IncludeVersion())));
|
||||||
wait(self->kvStore->commit());
|
wait(self->kvStore->commit());
|
||||||
}
|
}
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
|
@ -156,14 +156,14 @@ struct ConfigBroadcastReadyReply {
|
|||||||
|
|
||||||
struct ConfigBroadcastReadyRequest {
|
struct ConfigBroadcastReadyRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 7402862;
|
static constexpr FileIdentifier file_identifier = 7402862;
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
std::map<ConfigKey, KnobValue> snapshot;
|
std::map<ConfigKey, KnobValue> snapshot;
|
||||||
Version snapshotVersion{ 0 };
|
Version snapshotVersion{ 0 };
|
||||||
Version liveVersion{ 0 };
|
Version liveVersion{ 0 };
|
||||||
ReplyPromise<ConfigBroadcastReadyReply> reply;
|
ReplyPromise<ConfigBroadcastReadyReply> reply;
|
||||||
|
|
||||||
ConfigBroadcastReadyRequest() = default;
|
ConfigBroadcastReadyRequest() = default;
|
||||||
ConfigBroadcastReadyRequest(size_t coordinatorsHash,
|
ConfigBroadcastReadyRequest(CoordinatorsHash coordinatorsHash,
|
||||||
std::map<ConfigKey, KnobValue> const& snapshot,
|
std::map<ConfigKey, KnobValue> const& snapshot,
|
||||||
Version snapshotVersion,
|
Version snapshotVersion,
|
||||||
Version liveVersion)
|
Version liveVersion)
|
||||||
|
@ -211,11 +211,11 @@ struct ConfigFollowerGetCommittedVersionRequest {
|
|||||||
|
|
||||||
struct ConfigFollowerLockRequest {
|
struct ConfigFollowerLockRequest {
|
||||||
static constexpr FileIdentifier file_identifier = 1867800;
|
static constexpr FileIdentifier file_identifier = 1867800;
|
||||||
size_t coordinatorsHash{ 0 };
|
CoordinatorsHash coordinatorsHash{ 0 };
|
||||||
ReplyPromise<Void> reply;
|
ReplyPromise<Void> reply;
|
||||||
|
|
||||||
ConfigFollowerLockRequest() = default;
|
ConfigFollowerLockRequest() = default;
|
||||||
explicit ConfigFollowerLockRequest(size_t coordinatorsHash) : coordinatorsHash(coordinatorsHash) {}
|
explicit ConfigFollowerLockRequest(CoordinatorsHash coordinatorsHash) : coordinatorsHash(coordinatorsHash) {}
|
||||||
|
|
||||||
template <class Ar>
|
template <class Ar>
|
||||||
void serialize(Ar& ar) {
|
void serialize(Ar& ar) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user