remove header dependency; move getDatabaseConfiguration to txnProcessor

This commit is contained in:
Xiaoxi Wang 2022-07-08 16:01:23 -07:00
parent 9cead35911
commit b48b1e93f9
3 changed files with 53 additions and 39 deletions

View File

@ -20,6 +20,7 @@
#include "fdbserver/DDTxnProcessor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class DDTxnProcessorImpl {
@ -101,4 +102,8 @@ Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> DDTxnProces
Future<MoveKeysLock> DDTxnProcessor::takeMoveKeysLock(UID ddId) const {
	// Thin delegation: acquire the data-distribution move-keys lock through
	// the free function operating on the native database handle.
	auto lockFuture = ::takeMoveKeysLock(cx, ddId);
	return lockFuture;
}
Future<DatabaseConfiguration> DDTxnProcessor::getDatabaseConfiguration() const {
	// Thin delegation: fetch the cluster's DatabaseConfiguration via the
	// free function in ManagementAPI, using our database handle.
	auto configFuture = ::getDatabaseConfiguration(cx);
	return configFuture;
}

View File

@ -565,6 +565,9 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
DDTeamCollection* teamCollection;
std::shared_ptr<IDDTxnProcessor> txnProcessor;
MoveKeysLock lock;
DatabaseConfiguration configuration;
std::vector<Optional<Key>> primaryDcId;
std::vector<Optional<Key>> remoteDcIds;
Reference<EventCacheHolder> initialDDEventHolder;
Reference<EventCacheHolder> movingDataEventHolder;
@ -578,7 +581,23 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
totalDataInFlightEventHolder(makeReference<EventCacheHolder>("TotalDataInFlight")),
totalDataInFlightRemoteEventHolder(makeReference<EventCacheHolder>("TotalDataInFlightRemote")) {}
// bootstrap steps
Future<Void> takeMoveKeysLock() {
	// Bootstrap step: acquire the move-keys lock through the txn processor
	// and stash the result into this->lock for later use.
	return store(lock, txnProcessor->takeMoveKeysLock(ddId));
}
Future<Void> loadDatabaseConfiguration() {
	// Bootstrap step: read the DatabaseConfiguration through the txn
	// processor and stash it into this->configuration.
	return store(configuration, txnProcessor->getDatabaseConfiguration());
}
void initDcInfo() {
primaryDcId.clear();
remoteDcIds.clear();
const std::vector<RegionInfo>& regions = configuration.regions;
if (configuration.regions.size() > 0) {
primaryDcId.push_back(regions[0].dcId);
}
if (configuration.regions.size() > 1) {
remoteDcIds.push_back(regions[1].dcId);
}
}
};
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
@ -596,9 +615,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
// wait(debugCheckCoalescing(cx));
// FIXME: wrap the bootstrap process into class DataDistributor
state std::vector<Optional<Key>> primaryDcId;
state std::vector<Optional<Key>> remoteDcIds;
state DatabaseConfiguration configuration;
state Reference<InitialDataDistribution> initData;
state Reference<DDTeamCollection> primaryTeamCollection;
state Reference<DDTeamCollection> remoteTeamCollection;
@ -617,19 +633,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
wait(self->takeMoveKeysLock());
TraceEvent("DDInitTookMoveKeysLock", self->ddId).log();
DatabaseConfiguration configuration_ = wait(getDatabaseConfiguration(cx));
configuration = configuration_;
primaryDcId.clear();
remoteDcIds.clear();
const std::vector<RegionInfo>& regions = configuration.regions;
if (configuration.regions.size() > 0) {
primaryDcId.push_back(regions[0].dcId);
}
if (configuration.regions.size() > 1) {
remoteDcIds.push_back(regions[1].dcId);
}
TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", configuration.toString());
wait(self->loadDatabaseConfiguration());
self->initDcInfo();
TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", self->configuration.toString());
state Transaction tr(cx);
loop {
@ -642,10 +648,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
for (auto& kv : replicaKeys) {
auto dcId = decodeDatacenterReplicasKey(kv.key);
auto replicas = decodeDatacenterReplicasValue(kv.value);
if ((primaryDcId.size() && primaryDcId[0] == dcId) ||
(remoteDcIds.size() && remoteDcIds[0] == dcId && configuration.usableRegions > 1)) {
if (replicas > configuration.storageTeamSize) {
tr.set(kv.key, datacenterReplicasValue(configuration.storageTeamSize));
if ((self->primaryDcId.size() && self->primaryDcId[0] == dcId) ||
(self->remoteDcIds.size() && self->remoteDcIds[0] == dcId && self->configuration.usableRegions > 1)) {
if (replicas > self->configuration.storageTeamSize) {
tr.set(kv.key, datacenterReplicasValue(self->configuration.storageTeamSize));
}
} else {
tr.clear(kv.key);
@ -664,7 +670,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
cx,
self->ddId,
self->lock,
configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(),
self->configuration.usableRegions > 1 ? self->remoteDcIds : std::vector<Optional<Key>>(),
ddEnabledState));
initData = initData_;
if (initData->shards.size() > 1) {
@ -721,7 +727,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
.detail("Primary", false)
.detail("TotalBytes", 0)
.detail("UnhealthyServers", 0)
.detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1)
.detail("HighestPriority", self->configuration.usableRegions > 1 ? 0 : -1)
.trackLatest(self->totalDataInFlightRemoteEventHolder->trackingKey);
wait(waitForDataDistributionEnabled(cx, ddEnabledState));
@ -735,7 +741,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
}
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
ASSERT(configuration.storageTeamSize > 0);
ASSERT(self->configuration.storageTeamSize > 0);
state PromiseStream<RelocateShard> output;
state PromiseStream<RelocateShard> input;
@ -756,7 +762,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
shardsAffectedByTeamFailure->defineShard(keys);
std::vector<ShardsAffectedByTeamFailure::Team> teams;
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.primarySrc, true));
if (configuration.usableRegions > 1) {
if (self->configuration.usableRegions > 1) {
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false));
}
if (g_network->isSimulated()) {
@ -775,9 +781,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
// This shard is already in flight. Ideally we should use dest in ShardsAffectedByTeamFailure and
// generate a dataDistributionRelocator directly in DataDistributionQueue to track it, but it's
// easier to just (with low priority) schedule it for movement.
bool unhealthy = iShard.primarySrc.size() != configuration.storageTeamSize;
if (!unhealthy && configuration.usableRegions > 1) {
unhealthy = iShard.remoteSrc.size() != configuration.storageTeamSize;
bool unhealthy = iShard.primarySrc.size() != self->configuration.storageTeamSize;
if (!unhealthy && self->configuration.usableRegions > 1) {
unhealthy = iShard.remoteSrc.size() != self->configuration.storageTeamSize;
}
output.send(RelocateShard(keys,
unhealthy ? SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY
@ -829,12 +835,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
std::vector<Reference<AsyncVar<bool>>> zeroHealthyTeams;
tcis.push_back(TeamCollectionInterface());
zeroHealthyTeams.push_back(makeReference<AsyncVar<bool>>(true));
int storageTeamSize = configuration.storageTeamSize;
int storageTeamSize = self->configuration.storageTeamSize;
std::vector<Future<Void>> actors;
if (configuration.usableRegions > 1) {
if (self->configuration.usableRegions > 1) {
tcis.push_back(TeamCollectionInterface());
storageTeamSize = 2 * configuration.storageTeamSize;
storageTeamSize = 2 * self->configuration.storageTeamSize;
zeroHealthyTeams.push_back(makeReference<AsyncVar<bool>>(true));
anyZeroHealthyTeams = makeReference<AsyncVar<bool>>(true);
@ -878,7 +884,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
getUnhealthyRelocationCount.getFuture(),
self->ddId,
storageTeamSize,
configuration.storageTeamSize,
self->configuration.storageTeamSize,
ddEnabledState),
"DDQueue",
self->ddId,
@ -891,9 +897,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
self->lock,
output,
shardsAffectedByTeamFailure,
configuration,
primaryDcId,
configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(),
self->configuration,
self->primaryDcId,
self->configuration.usableRegions > 1 ? self->remoteDcIds : std::vector<Optional<Key>>(),
readyToStart.getFuture(),
zeroHealthyTeams[0],
IsPrimary::True,
@ -905,15 +911,15 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
auto recruitStorage = IAsyncListener<RequestStream<RecruitStorageRequest>>::create(
self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
if (configuration.usableRegions > 1) {
if (self->configuration.usableRegions > 1) {
remoteTeamCollection =
makeReference<DDTeamCollection>(cx,
self->ddId,
self->lock,
output,
shardsAffectedByTeamFailure,
configuration,
remoteDcIds,
self->configuration,
self->remoteDcIds,
Optional<std::vector<Optional<Key>>>(),
readyToStart.getFuture() && remoteRecovered(self->dbInfo),
zeroHealthyTeams[1],
@ -955,7 +961,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
const UID serverID = removeFailedServer.getFuture().get();
std::vector<UID> pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID);
teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end());
if (configuration.usableRegions > 1) {
if (self->configuration.usableRegions > 1) {
std::vector<UID> rTeam = remoteTeamCollection->getRandomHealthyTeam(serverID);
teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end());
}

View File

@ -22,7 +22,6 @@
#define FOUNDATIONDB_DDTXNPROCESSOR_H
#include "fdbserver/Knobs.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/MoveKeys.actor.h"
/* Testability Contract:
@ -44,6 +43,8 @@ public:
virtual ~IDDTxnProcessor() = default;
// Default stub for mock/test implementations: yields an empty MoveKeysLock
// without touching the database. Real implementations override this to
// acquire the lock transactionally.
[[nodiscard]] virtual Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const { return MoveKeysLock(); }
// Default stub for mock/test implementations: yields an empty
// DatabaseConfiguration without touching the database. Marked [[nodiscard]]
// for consistency with the sibling takeMoveKeysLock() stub — ignoring the
// returned future is almost certainly a bug.
[[nodiscard]] virtual Future<DatabaseConfiguration> getDatabaseConfiguration() const { return DatabaseConfiguration(); }
};
class DDTxnProcessorImpl;
@ -63,6 +64,8 @@ public:
Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() override;
Future<MoveKeysLock> takeMoveKeysLock(UID ddId) const override;
Future<DatabaseConfiguration> getDatabaseConfiguration() const override;
};
// run mock transaction