mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-02 03:12:12 +08:00
add exclusion tracker utility and use it in DD (#9669)
This commit is contained in:
parent
0edf34b3b3
commit
03818e94f3
@ -19,6 +19,7 @@
|
||||
*/
|
||||
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "fdbserver/ExclusionTracker.actor.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
#include <climits>
|
||||
@ -1829,99 +1830,38 @@ public:
|
||||
}
|
||||
|
||||
// Actor that keeps self->excludedServers up to date with the excluded / failed servers of the
// cluster, and fires self->restartRecruiting after each update.
//
// NOTE(review): this span is a diff hunk rendered without +/- markers, so two versions of the body
// appear interleaved. Presumably the statements that use `tr` (range reads of the exclusion keys,
// local `excluded` / `failed` sets, watches, commit, onError) are the pre-change body, and the
// statements that use `exclusionTracker` are the post-change body -- confirm against the commit.
// Duplicated statements below (e.g. the two `auto old = ...` loops) are therefore not one function.
ACTOR static Future<Void> trackExcludedServers(DDTeamCollection* self) {
|
||||
// Fetch the list of excluded servers
|
||||
state ReadYourWritesTransaction tr(self->dbContext());
|
||||
// Tracker object whose `changed` trigger and `excluded` / `failed` sets are consumed below.
state ExclusionTracker exclusionTracker(self->dbContext());
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
state Future<RangeResult> fresultsExclude = tr.getRange(excludedServersKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<RangeResult> fresultsFailed = tr.getRange(failedServersKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<RangeResult> flocalitiesExclude =
|
||||
tr.getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<RangeResult> flocalitiesFailed = tr.getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<std::vector<ProcessData>> fworkers = self->db->getWorkers();
|
||||
wait(success(fresultsExclude) && success(fresultsFailed) && success(flocalitiesExclude) &&
|
||||
success(flocalitiesFailed));
|
||||
// wait for new set of excluded servers
|
||||
wait(exclusionTracker.changed.onTrigger());
|
||||
|
||||
state RangeResult excludedResults = fresultsExclude.get();
|
||||
ASSERT(!excludedResults.more && excludedResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state RangeResult failedResults = fresultsFailed.get();
|
||||
ASSERT(!failedResults.more && failedResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state RangeResult excludedLocalityResults = flocalitiesExclude.get();
|
||||
ASSERT(!excludedLocalityResults.more && excludedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state RangeResult failedLocalityResults = flocalitiesFailed.get();
|
||||
ASSERT(!failedLocalityResults.more && failedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state std::set<AddressExclusion> excluded;
|
||||
state std::set<AddressExclusion> failed;
|
||||
for (const auto& r : excludedResults) {
|
||||
AddressExclusion addr = decodeExcludedServersKey(r.key);
|
||||
if (addr.isValid()) {
|
||||
excluded.insert(addr);
|
||||
}
|
||||
// Reset and reassign self->excludedServers based on excluded, but we only
|
||||
// want to trigger entries that are different
|
||||
// Do not retrigger and double-overwrite failed or wiggling servers
|
||||
auto old = self->excludedServers.getKeys();
|
||||
for (const auto& o : old) {
|
||||
// Tracker-based variant: reset to NONE only keys that are in neither tracker set and are not WIGGLING.
if (!exclusionTracker.excluded.count(o) && !exclusionTracker.failed.count(o) &&
|
||||
!(self->excludedServers.count(o) &&
|
||||
self->excludedServers.get(o) == DDTeamCollection::Status::WIGGLING)) {
|
||||
self->excludedServers.set(o, DDTeamCollection::Status::NONE);
|
||||
}
|
||||
for (const auto& r : failedResults) {
|
||||
AddressExclusion addr = decodeFailedServersKey(r.key);
|
||||
if (addr.isValid()) {
|
||||
failed.insert(addr);
|
||||
}
|
||||
}
|
||||
|
||||
wait(success(fworkers));
|
||||
std::vector<ProcessData> workers = fworkers.get();
|
||||
for (const auto& r : excludedLocalityResults) {
|
||||
std::string locality = decodeExcludedLocalityKey(r.key);
|
||||
std::set<AddressExclusion> localityExcludedAddresses = getAddressesByLocality(workers, locality);
|
||||
excluded.insert(localityExcludedAddresses.begin(), localityExcludedAddresses.end());
|
||||
}
|
||||
for (const auto& r : failedLocalityResults) {
|
||||
std::string locality = decodeFailedLocalityKey(r.key);
|
||||
std::set<AddressExclusion> localityFailedAddresses = getAddressesByLocality(workers, locality);
|
||||
failed.insert(localityFailedAddresses.begin(), localityFailedAddresses.end());
|
||||
}
|
||||
|
||||
// Reset and reassign self->excludedServers based on excluded, but we only
|
||||
// want to trigger entries that are different
|
||||
// Do not retrigger and double-overwrite failed or wiggling servers
|
||||
auto old = self->excludedServers.getKeys();
|
||||
for (const auto& o : old) {
|
||||
if (!excluded.count(o) && !failed.count(o) &&
|
||||
!(self->excludedServers.count(o) &&
|
||||
self->excludedServers.get(o) == DDTeamCollection::Status::WIGGLING)) {
|
||||
self->excludedServers.set(o, DDTeamCollection::Status::NONE);
|
||||
}
|
||||
}
|
||||
for (const auto& n : excluded) {
|
||||
if (!failed.count(n)) {
|
||||
self->excludedServers.set(n, DDTeamCollection::Status::EXCLUDED);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& f : failed) {
|
||||
self->excludedServers.set(f, DDTeamCollection::Status::FAILED);
|
||||
}
|
||||
|
||||
TraceEvent("DDExcludedServersChanged", self->distributorId)
|
||||
.detail("AddressesExcluded", excludedResults.size())
|
||||
.detail("AddressesFailed", failedResults.size())
|
||||
.detail("LocalitiesExcluded", excludedLocalityResults.size())
|
||||
.detail("Primary", self->isPrimary())
|
||||
.detail("LocalitiesFailed", failedLocalityResults.size());
|
||||
|
||||
self->restartRecruiting.trigger();
|
||||
state Future<Void> watchFuture =
|
||||
tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey) ||
|
||||
tr.watch(excludedLocalityVersionKey) || tr.watch(failedLocalityVersionKey);
|
||||
wait(tr.commit());
|
||||
wait(watchFuture);
|
||||
tr.reset();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
// Tracker-based variant: an address that is both excluded and failed is skipped here and marked FAILED below.
for (const auto& n : exclusionTracker.excluded) {
|
||||
if (!exclusionTracker.failed.count(n)) {
|
||||
self->excludedServers.set(n, DDTeamCollection::Status::EXCLUDED);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& f : exclusionTracker.failed) {
|
||||
self->excludedServers.set(f, DDTeamCollection::Status::FAILED);
|
||||
}
|
||||
|
||||
// Tracker-based variant of the trace event: reports the sizes of the tracker's address sets.
TraceEvent("DDExcludedServersChanged", self->distributorId)
|
||||
.detail("AddressesExcluded", exclusionTracker.excluded.size())
|
||||
.detail("AddressesFailed", exclusionTracker.failed.size())
|
||||
.detail("Primary", self->isPrimary());
|
||||
|
||||
self->restartRecruiting.trigger();
|
||||
}
|
||||
}
|
||||
|
||||
@ -3251,7 +3191,6 @@ public:
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
}; // class DDTeamCollectionImpl
|
||||
|
||||
int32_t DDTeamCollection::getTargetTSSInDC() const {
|
||||
|
124
fdbserver/include/fdbserver/ExclusionTracker.actor.h
Normal file
124
fdbserver/include/fdbserver/ExclusionTracker.actor.h
Normal file
@ -0,0 +1,124 @@
|
||||
/*
|
||||
* ExclusionTracker.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
|
||||
// version.
|
||||
#if defined(NO_INTELLISENSE) && !defined(EXCLUSION_TRACKER_ACTOR_G_H)
|
||||
#define EXCLUSION_TRACKER_ACTOR_G_H
|
||||
#include "fdbserver/ExclusionTracker.actor.g.h"
|
||||
#elif !defined(EXCLUSION_TRACKER_ACTOR_H)
|
||||
#define EXCLUSION_TRACKER_ACTOR_H
|
||||
|
||||
#include <set>
#include <utility>
#include "flow/flow.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct ExclusionTracker {
|
||||
std::set<AddressExclusion> excluded;
|
||||
std::set<AddressExclusion> failed;
|
||||
|
||||
AsyncTrigger changed;
|
||||
|
||||
Database db;
|
||||
Future<Void> trackerFuture;
|
||||
|
||||
ExclusionTracker() {}
|
||||
ExclusionTracker(Database db) : db(db) { trackerFuture = tracker(this); }
|
||||
|
||||
ACTOR static Future<Void> tracker(ExclusionTracker* self) {
|
||||
// Fetch the list of excluded servers
|
||||
state ReadYourWritesTransaction tr(self->db);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state Future<RangeResult> fresultsExclude = tr.getRange(excludedServersKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<RangeResult> fresultsFailed = tr.getRange(failedServersKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<RangeResult> flocalitiesExclude =
|
||||
tr.getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<RangeResult> flocalitiesFailed = tr.getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
|
||||
state Future<std::vector<ProcessData>> fworkers = getWorkers(&tr.getTransaction());
|
||||
wait(success(fresultsExclude) && success(fresultsFailed) && success(flocalitiesExclude) &&
|
||||
success(flocalitiesFailed));
|
||||
|
||||
state RangeResult excludedResults = fresultsExclude.get();
|
||||
ASSERT(!excludedResults.more && excludedResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state RangeResult failedResults = fresultsFailed.get();
|
||||
ASSERT(!failedResults.more && failedResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state RangeResult excludedLocalityResults = flocalitiesExclude.get();
|
||||
ASSERT(!excludedLocalityResults.more && excludedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state RangeResult failedLocalityResults = flocalitiesFailed.get();
|
||||
ASSERT(!failedLocalityResults.more && failedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY);
|
||||
|
||||
state std::set<AddressExclusion> newExcluded;
|
||||
state std::set<AddressExclusion> newFailed;
|
||||
for (const auto& r : excludedResults) {
|
||||
AddressExclusion addr = decodeExcludedServersKey(r.key);
|
||||
if (addr.isValid()) {
|
||||
newExcluded.insert(addr);
|
||||
}
|
||||
}
|
||||
for (const auto& r : failedResults) {
|
||||
AddressExclusion addr = decodeFailedServersKey(r.key);
|
||||
if (addr.isValid()) {
|
||||
newFailed.insert(addr);
|
||||
}
|
||||
}
|
||||
|
||||
wait(success(fworkers));
|
||||
std::vector<ProcessData> workers = fworkers.get();
|
||||
for (const auto& r : excludedLocalityResults) {
|
||||
std::string locality = decodeExcludedLocalityKey(r.key);
|
||||
std::set<AddressExclusion> localityExcludedAddresses = getAddressesByLocality(workers, locality);
|
||||
newExcluded.insert(localityExcludedAddresses.begin(), localityExcludedAddresses.end());
|
||||
}
|
||||
for (const auto& r : failedLocalityResults) {
|
||||
std::string locality = decodeFailedLocalityKey(r.key);
|
||||
std::set<AddressExclusion> localityFailedAddresses = getAddressesByLocality(workers, locality);
|
||||
newFailed.insert(localityFailedAddresses.begin(), localityFailedAddresses.end());
|
||||
}
|
||||
|
||||
self->excluded = newExcluded;
|
||||
self->failed = newFailed;
|
||||
self->changed.trigger();
|
||||
|
||||
state Future<Void> watchFuture =
|
||||
tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey) ||
|
||||
tr.watch(excludedLocalityVersionKey) || tr.watch(failedLocalityVersionKey);
|
||||
wait(tr.commit());
|
||||
wait(watchFuture);
|
||||
tr.reset();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
Loading…
x
Reference in New Issue
Block a user