mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 01:42:37 +08:00
Refactor resolution balancing into separate files
This commit is contained in:
parent
437e7d27c6
commit
0c88be0393
@ -96,6 +96,8 @@ set(FDBSERVER_SRCS
|
||||
Ratekeeper.h
|
||||
RatekeeperInterface.h
|
||||
RecoveryState.h
|
||||
ResolutionBalancer.actor.cpp
|
||||
ResolutionBalancer.actor.h
|
||||
Resolver.actor.cpp
|
||||
ResolverInterface.h
|
||||
RestoreApplier.actor.cpp
|
||||
|
187
fdbserver/ResolutionBalancer.actor.cpp
Normal file
187
fdbserver/ResolutionBalancer.actor.cpp
Normal file
@ -0,0 +1,187 @@
|
||||
/*
|
||||
* ResolutionBalancer.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/ResolutionBalancer.actor.h"
|
||||
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbserver/MasterInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/flow.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
void ResolutionBalancer::setResolvers(const std::vector<ResolverInterface>& v) {
|
||||
resolvers = v;
|
||||
if (resolvers.size() > 1)
|
||||
triggerResolution.trigger();
|
||||
}
|
||||
|
||||
void ResolutionBalancer::setChangesInReply(UID requestingProxy, GetCommitVersionReply& rep) {
|
||||
if (resolverNeedingChanges.count(requestingProxy)) {
|
||||
rep.resolverChanges = resolverChanges.get();
|
||||
rep.resolverChangesVersion = resolverChangesVersion;
|
||||
resolverNeedingChanges.erase(requestingProxy);
|
||||
|
||||
TEST(!rep.resolverChanges.empty()); // resolution balancing moves keyranges
|
||||
if (resolverNeedingChanges.empty())
|
||||
resolverChanges.set(Standalone<VectorRef<ResolverMoveRef>>());
|
||||
}
|
||||
}
|
||||
|
||||
static std::pair<KeyRangeRef, bool> findRange(CoalescedKeyRangeMap<int>& key_resolver,
|
||||
Standalone<VectorRef<ResolverMoveRef>>& movedRanges,
|
||||
int src,
|
||||
int dest) {
|
||||
auto ranges = key_resolver.ranges();
|
||||
auto prev = ranges.begin();
|
||||
auto it = ranges.begin();
|
||||
++it;
|
||||
if (it == ranges.end()) {
|
||||
if (ranges.begin().value() != src ||
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(ranges.begin()->range(), dest)) !=
|
||||
movedRanges.end())
|
||||
throw operation_failed();
|
||||
return std::make_pair(ranges.begin().range(), true);
|
||||
}
|
||||
|
||||
std::set<int> borders;
|
||||
// If possible expand an existing boundary between the two resolvers
|
||||
for (; it != ranges.end(); ++it) {
|
||||
if (it->value() == src && prev->value() == dest &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(it->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(it->range(), true);
|
||||
}
|
||||
if (it->value() == dest && prev->value() == src &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(prev->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(prev->range(), false);
|
||||
}
|
||||
if (it->value() == dest)
|
||||
borders.insert(prev->value());
|
||||
if (prev->value() == dest)
|
||||
borders.insert(it->value());
|
||||
++prev;
|
||||
}
|
||||
|
||||
prev = ranges.begin();
|
||||
it = ranges.begin();
|
||||
++it;
|
||||
// If possible create a new boundry which doesn't exist yet
|
||||
for (; it != ranges.end(); ++it) {
|
||||
if (it->value() == src && !borders.count(prev->value()) &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(it->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(it->range(), true);
|
||||
}
|
||||
if (prev->value() == src && !borders.count(it->value()) &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(prev->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(prev->range(), false);
|
||||
}
|
||||
++prev;
|
||||
}
|
||||
|
||||
it = ranges.begin();
|
||||
for (; it != ranges.end(); ++it) {
|
||||
if (it->value() == src &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(it->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(it->range(), true);
|
||||
}
|
||||
}
|
||||
throw operation_failed(); // we are already attempting to move all of the data one resolver is assigned, so do not
|
||||
// move anything
|
||||
}
|
||||
|
||||
// Balance key ranges among resolvers so that their load are evenly distributed.
|
||||
ACTOR Future<Void> ResolutionBalancer::resolutionBalancing_impl(ResolutionBalancer* self) {
|
||||
wait(self->triggerResolution.onTrigger());
|
||||
|
||||
state CoalescedKeyRangeMap<int> key_resolver;
|
||||
key_resolver.insert(allKeys, 0);
|
||||
loop {
|
||||
wait(delay(SERVER_KNOBS->MIN_BALANCE_TIME, TaskPriority::ResolutionMetrics));
|
||||
while (self->resolverChanges.get().size())
|
||||
wait(self->resolverChanges.onChange());
|
||||
state std::vector<Future<ResolutionMetricsReply>> futures;
|
||||
for (auto& p : self->resolvers)
|
||||
futures.push_back(
|
||||
brokenPromiseToNever(p.metrics.getReply(ResolutionMetricsRequest(), TaskPriority::ResolutionMetrics)));
|
||||
wait(waitForAll(futures));
|
||||
state IndexedSet<std::pair<int64_t, int>, NoMetric> metrics;
|
||||
|
||||
int64_t total = 0;
|
||||
for (int i = 0; i < futures.size(); i++) {
|
||||
total += futures[i].get().value;
|
||||
metrics.insert(std::make_pair(futures[i].get().value, i), NoMetric());
|
||||
//TraceEvent("ResolverMetric").detail("I", i).detail("Metric", futures[i].get());
|
||||
}
|
||||
if (metrics.lastItem()->first - metrics.begin()->first > SERVER_KNOBS->MIN_BALANCE_DIFFERENCE) {
|
||||
try {
|
||||
state int src = metrics.lastItem()->second;
|
||||
state int dest = metrics.begin()->second;
|
||||
state int64_t amount = std::min(metrics.lastItem()->first - total / self->resolvers.size(),
|
||||
total / self->resolvers.size() - metrics.begin()->first) /
|
||||
2;
|
||||
state Standalone<VectorRef<ResolverMoveRef>> movedRanges;
|
||||
|
||||
loop {
|
||||
state std::pair<KeyRangeRef, bool> range = findRange(key_resolver, movedRanges, src, dest);
|
||||
|
||||
ResolutionSplitRequest req;
|
||||
req.front = range.second;
|
||||
req.offset = amount;
|
||||
req.range = range.first;
|
||||
|
||||
ResolutionSplitReply split =
|
||||
wait(brokenPromiseToNever(self->resolvers[metrics.lastItem()->second].split.getReply(
|
||||
req, TaskPriority::ResolutionMetrics)));
|
||||
KeyRangeRef moveRange = range.second ? KeyRangeRef(range.first.begin, split.key)
|
||||
: KeyRangeRef(split.key, range.first.end);
|
||||
movedRanges.push_back_deep(movedRanges.arena(), ResolverMoveRef(moveRange, dest));
|
||||
TraceEvent("MovingResolutionRange")
|
||||
.detail("Src", src)
|
||||
.detail("Dest", dest)
|
||||
.detail("Amount", amount)
|
||||
.detail("StartRange", range.first)
|
||||
.detail("MoveRange", moveRange)
|
||||
.detail("Used", split.used)
|
||||
.detail("KeyResolverRanges", key_resolver.size());
|
||||
amount -= split.used;
|
||||
if (moveRange != range.first || amount <= 0)
|
||||
break;
|
||||
}
|
||||
for (auto& it : movedRanges)
|
||||
key_resolver.insert(it.range, it.dest);
|
||||
// for(auto& it : key_resolver.ranges())
|
||||
// TraceEvent("KeyResolver").detail("Range", it.range()).detail("Value", it.value());
|
||||
|
||||
self->resolverChangesVersion = *self->pVersion + 1;
|
||||
for (auto& p : self->commitProxies)
|
||||
self->resolverNeedingChanges.insert(p.id());
|
||||
self->resolverChanges.set(movedRanges);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_operation_failed)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
64
fdbserver/ResolutionBalancer.actor.h
Normal file
64
fdbserver/ResolutionBalancer.actor.h
Normal file
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* ResolutionBalancer.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
#include "fdbserver/ResolverInterface.h"
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_RESOLUTION_BALANCER_G_H)
|
||||
#define FDBSERVER_RESOLUTION_BALANCER_G_H
|
||||
#include "fdbserver/ResolutionBalancer.actor.g.h"
|
||||
#elif !defined(FDBSERVER_RESOLUTION_BALANCER_H)
|
||||
#define FDBSERVER_RESOLUTION_BALANCER_H
|
||||
|
||||
#include <set>
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbserver/MasterInterface.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
|
||||
struct ResolutionBalancer {
|
||||
AsyncVar<Standalone<VectorRef<ResolverMoveRef>>> resolverChanges;
|
||||
Version resolverChangesVersion = invalidVersion;
|
||||
std::set<UID> resolverNeedingChanges;
|
||||
|
||||
Version* pVersion; // points to MasterData::version
|
||||
|
||||
std::vector<CommitProxyInterface> commitProxies;
|
||||
std::vector<ResolverInterface> resolvers;
|
||||
AsyncTrigger triggerResolution;
|
||||
|
||||
ResolutionBalancer(Version* version) : pVersion(version) {}
|
||||
|
||||
Future<Void> resolutionBalancing() { return resolutionBalancing_impl(this); }
|
||||
|
||||
ACTOR static Future<Void> resolutionBalancing_impl(ResolutionBalancer* self);
|
||||
|
||||
// Sets resolver interfaces. Trigger resolutionBalancing() actor if more
|
||||
// than one resolvers are present.
|
||||
void setResolvers(const std::vector<ResolverInterface>& resolvers);
|
||||
|
||||
void setCommitProxies(const std::vector<CommitProxyInterface>& proxies) { commitProxies = proxies; }
|
||||
|
||||
void setChangesInReply(UID requestingProxy, GetCommitVersionReply& rep);
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
@ -25,10 +25,10 @@
|
||||
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/MasterInterface.h"
|
||||
#include "fdbserver/ResolutionBalancer.actor.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
@ -48,16 +48,11 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||
Version version; // The last version assigned to a proxy by getVersion()
|
||||
double lastVersionTime;
|
||||
|
||||
std::vector<CommitProxyInterface> commitProxies;
|
||||
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
|
||||
std::vector<ResolverInterface> resolvers;
|
||||
|
||||
MasterInterface myInterface;
|
||||
|
||||
AsyncVar<Standalone<VectorRef<ResolverMoveRef>>> resolverChanges;
|
||||
Version resolverChangesVersion;
|
||||
std::set<UID> resolverNeedingChanges;
|
||||
AsyncTrigger triggerResolution;
|
||||
ResolutionBalancer resolutionBalancer;
|
||||
|
||||
bool forceRecovery;
|
||||
|
||||
@ -67,6 +62,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||
Counter reportLiveCommittedVersionRequests;
|
||||
|
||||
Future<Void> logger;
|
||||
Future<Void> balancer;
|
||||
|
||||
MasterData(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
|
||||
MasterInterface const& myInterface,
|
||||
@ -78,7 +74,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||
: dbgid(myInterface.id()), lastEpochEnd(invalidVersion), recoveryTransactionVersion(invalidVersion),
|
||||
liveCommittedVersion(invalidVersion), databaseLocked(false), minKnownCommittedVersion(invalidVersion),
|
||||
coordinators(coordinators), version(invalidVersion), lastVersionTime(0), myInterface(myInterface),
|
||||
forceRecovery(forceRecovery), cc("Master", dbgid.toString()),
|
||||
resolutionBalancer(&version), forceRecovery(forceRecovery), cc("Master", dbgid.toString()),
|
||||
getCommitVersionRequests("GetCommitVersionRequests", cc),
|
||||
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
|
||||
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc) {
|
||||
@ -87,151 +83,11 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
||||
TraceEvent(SevError, "ForcedRecoveryRequiresDcID").log();
|
||||
forceRecovery = false;
|
||||
}
|
||||
balancer = resolutionBalancer.resolutionBalancing();
|
||||
}
|
||||
~MasterData() = default;
|
||||
};
|
||||
|
||||
static std::pair<KeyRangeRef, bool> findRange(CoalescedKeyRangeMap<int>& key_resolver,
|
||||
Standalone<VectorRef<ResolverMoveRef>>& movedRanges,
|
||||
int src,
|
||||
int dest) {
|
||||
auto ranges = key_resolver.ranges();
|
||||
auto prev = ranges.begin();
|
||||
auto it = ranges.begin();
|
||||
++it;
|
||||
if (it == ranges.end()) {
|
||||
if (ranges.begin().value() != src ||
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(ranges.begin()->range(), dest)) !=
|
||||
movedRanges.end())
|
||||
throw operation_failed();
|
||||
return std::make_pair(ranges.begin().range(), true);
|
||||
}
|
||||
|
||||
std::set<int> borders;
|
||||
// If possible expand an existing boundary between the two resolvers
|
||||
for (; it != ranges.end(); ++it) {
|
||||
if (it->value() == src && prev->value() == dest &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(it->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(it->range(), true);
|
||||
}
|
||||
if (it->value() == dest && prev->value() == src &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(prev->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(prev->range(), false);
|
||||
}
|
||||
if (it->value() == dest)
|
||||
borders.insert(prev->value());
|
||||
if (prev->value() == dest)
|
||||
borders.insert(it->value());
|
||||
++prev;
|
||||
}
|
||||
|
||||
prev = ranges.begin();
|
||||
it = ranges.begin();
|
||||
++it;
|
||||
// If possible create a new boundry which doesn't exist yet
|
||||
for (; it != ranges.end(); ++it) {
|
||||
if (it->value() == src && !borders.count(prev->value()) &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(it->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(it->range(), true);
|
||||
}
|
||||
if (prev->value() == src && !borders.count(it->value()) &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(prev->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(prev->range(), false);
|
||||
}
|
||||
++prev;
|
||||
}
|
||||
|
||||
it = ranges.begin();
|
||||
for (; it != ranges.end(); ++it) {
|
||||
if (it->value() == src &&
|
||||
std::find(movedRanges.begin(), movedRanges.end(), ResolverMoveRef(it->range(), dest)) ==
|
||||
movedRanges.end()) {
|
||||
return std::make_pair(it->range(), true);
|
||||
}
|
||||
}
|
||||
throw operation_failed(); // we are already attempting to move all of the data one resolver is assigned, so do not
|
||||
// move anything
|
||||
}
|
||||
|
||||
// Balance key ranges among resolvers so that their load are evenly distributed.
|
||||
ACTOR Future<Void> resolutionBalancing(Reference<MasterData> self) {
|
||||
wait(self->triggerResolution.onTrigger());
|
||||
|
||||
state CoalescedKeyRangeMap<int> key_resolver;
|
||||
key_resolver.insert(allKeys, 0);
|
||||
loop {
|
||||
wait(delay(SERVER_KNOBS->MIN_BALANCE_TIME, TaskPriority::ResolutionMetrics));
|
||||
while (self->resolverChanges.get().size())
|
||||
wait(self->resolverChanges.onChange());
|
||||
state std::vector<Future<ResolutionMetricsReply>> futures;
|
||||
for (auto& p : self->resolvers)
|
||||
futures.push_back(
|
||||
brokenPromiseToNever(p.metrics.getReply(ResolutionMetricsRequest(), TaskPriority::ResolutionMetrics)));
|
||||
wait(waitForAll(futures));
|
||||
state IndexedSet<std::pair<int64_t, int>, NoMetric> metrics;
|
||||
|
||||
int64_t total = 0;
|
||||
for (int i = 0; i < futures.size(); i++) {
|
||||
total += futures[i].get().value;
|
||||
metrics.insert(std::make_pair(futures[i].get().value, i), NoMetric());
|
||||
//TraceEvent("ResolverMetric").detail("I", i).detail("Metric", futures[i].get());
|
||||
}
|
||||
if (metrics.lastItem()->first - metrics.begin()->first > SERVER_KNOBS->MIN_BALANCE_DIFFERENCE) {
|
||||
try {
|
||||
state int src = metrics.lastItem()->second;
|
||||
state int dest = metrics.begin()->second;
|
||||
state int64_t amount = std::min(metrics.lastItem()->first - total / self->resolvers.size(),
|
||||
total / self->resolvers.size() - metrics.begin()->first) /
|
||||
2;
|
||||
state Standalone<VectorRef<ResolverMoveRef>> movedRanges;
|
||||
|
||||
loop {
|
||||
state std::pair<KeyRangeRef, bool> range = findRange(key_resolver, movedRanges, src, dest);
|
||||
|
||||
ResolutionSplitRequest req;
|
||||
req.front = range.second;
|
||||
req.offset = amount;
|
||||
req.range = range.first;
|
||||
|
||||
ResolutionSplitReply split =
|
||||
wait(brokenPromiseToNever(self->resolvers[metrics.lastItem()->second].split.getReply(
|
||||
req, TaskPriority::ResolutionMetrics)));
|
||||
KeyRangeRef moveRange = range.second ? KeyRangeRef(range.first.begin, split.key)
|
||||
: KeyRangeRef(split.key, range.first.end);
|
||||
movedRanges.push_back_deep(movedRanges.arena(), ResolverMoveRef(moveRange, dest));
|
||||
TraceEvent("MovingResolutionRange")
|
||||
.detail("Src", src)
|
||||
.detail("Dest", dest)
|
||||
.detail("Amount", amount)
|
||||
.detail("StartRange", range.first)
|
||||
.detail("MoveRange", moveRange)
|
||||
.detail("Used", split.used)
|
||||
.detail("KeyResolverRanges", key_resolver.size());
|
||||
amount -= split.used;
|
||||
if (moveRange != range.first || amount <= 0)
|
||||
break;
|
||||
}
|
||||
for (auto& it : movedRanges)
|
||||
key_resolver.insert(it.range, it.dest);
|
||||
// for(auto& it : key_resolver.ranges())
|
||||
// TraceEvent("KeyResolver").detail("Range", it.range()).detail("Value", it.value());
|
||||
|
||||
self->resolverChangesVersion = self->version + 1;
|
||||
for (auto& p : self->commitProxies)
|
||||
self->resolverNeedingChanges.insert(p.id());
|
||||
self->resolverChanges.set(movedRanges);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_operation_failed)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionRequest req) {
|
||||
state Span span("M:getVersion"_loc, { req.spanContext });
|
||||
state std::map<UID, CommitProxyVersionReplies>::iterator proxyItr =
|
||||
@ -281,15 +137,7 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
|
||||
TEST(maxVersionGap); // Maximum possible version gap
|
||||
self->lastVersionTime = t1;
|
||||
|
||||
if (self->resolverNeedingChanges.count(req.requestingProxy)) {
|
||||
rep.resolverChanges = self->resolverChanges.get();
|
||||
rep.resolverChangesVersion = self->resolverChangesVersion;
|
||||
self->resolverNeedingChanges.erase(req.requestingProxy);
|
||||
|
||||
TEST(!rep.resolverChanges.empty()); // resolution balancing moves keyranges
|
||||
if (self->resolverNeedingChanges.empty())
|
||||
self->resolverChanges.set(Standalone<VectorRef<ResolverMoveRef>>());
|
||||
}
|
||||
self->resolutionBalancer.setChangesInReply(req.requestingProxy, rep);
|
||||
}
|
||||
|
||||
rep.version = self->version;
|
||||
@ -311,16 +159,11 @@ ACTOR Future<Void> getVersion(Reference<MasterData> self, GetCommitVersionReques
|
||||
ACTOR Future<Void> provideVersions(Reference<MasterData> self) {
|
||||
state ActorCollection versionActors(false);
|
||||
|
||||
for (auto& p : self->commitProxies)
|
||||
self->lastCommitProxyVersionReplies[p.id()] = CommitProxyVersionReplies();
|
||||
|
||||
loop {
|
||||
choose {
|
||||
when(GetCommitVersionRequest req = waitNext(self->myInterface.getCommitVersion.getFuture())) {
|
||||
versionActors.add(getVersion(self, req));
|
||||
}
|
||||
when(wait(versionActors.getResult())) {}
|
||||
loop choose {
|
||||
when(GetCommitVersionRequest req = waitNext(self->myInterface.getCommitVersion.getFuture())) {
|
||||
versionActors.add(getVersion(self, req));
|
||||
}
|
||||
when(wait(versionActors.getResult())) {}
|
||||
}
|
||||
}
|
||||
|
||||
@ -381,17 +224,15 @@ ACTOR Future<Void> updateRecoveryData(Reference<MasterData> self) {
|
||||
self->lastEpochEnd = req.lastEpochEnd;
|
||||
}
|
||||
if (req.commitProxies.size() > 0) {
|
||||
self->commitProxies = req.commitProxies;
|
||||
self->lastCommitProxyVersionReplies.clear();
|
||||
|
||||
for (auto& p : self->commitProxies) {
|
||||
for (auto& p : req.commitProxies) {
|
||||
self->lastCommitProxyVersionReplies[p.id()] = CommitProxyVersionReplies();
|
||||
}
|
||||
}
|
||||
|
||||
self->resolvers = req.resolvers;
|
||||
if (req.resolvers.size() > 1)
|
||||
self->triggerResolution.trigger();
|
||||
self->resolutionBalancer.setCommitProxies(req.commitProxies);
|
||||
self->resolutionBalancer.setResolvers(req.resolvers);
|
||||
|
||||
req.reply.send(Void());
|
||||
}
|
||||
@ -446,7 +287,6 @@ ACTOR Future<Void> masterServer(MasterInterface mi,
|
||||
addActor.send(provideVersions(self));
|
||||
addActor.send(serveLiveCommittedVersion(self));
|
||||
addActor.send(updateRecoveryData(self));
|
||||
addActor.send(resolutionBalancing(self));
|
||||
|
||||
TEST(!lifetime.isStillValid(db->get().masterLifetime, mi.id() == db->get().master.id())); // Master born doomed
|
||||
TraceEvent("MasterLifetime", self->dbgid).detail("LifetimeToken", lifetime.toString());
|
||||
|
Loading…
x
Reference in New Issue
Block a user