Merge pull request #1785 from xumengpanda/mengxu/server-team-remover-PR

Remove redundant server teams
Evan Tschannen 2019-07-19 17:44:16 -07:00 committed by GitHub
commit c70e762f0e
7 changed files with 364 additions and 207 deletions


@ -477,6 +477,8 @@ public:
virtual Reference<StringToIntMap> const& getGroupKeyMap() const
{ return _localitygroup->getGroupKeyMap(); }
Reference<StringToIntMap> _keymap;
protected:
virtual Reference<StringToIntMap>& getGroupValueMap()
{ return _localitygroup->getGroupValueMap(); }
@ -491,7 +493,7 @@ protected:
std::vector<AttribKey> _keyIndexArray;
std::vector<LocalityCacheRecord> _cacheArray;
Reference<StringToIntMap> _keymap;
LocalitySet* _localitygroup;
long long unsigned int _cachehits;
long long unsigned int _cachemisses;


@ -113,18 +113,14 @@ bool PolicyOne::selectReplicas(
int itemsUsed = 0;
if (alsoServers.size()) {
totalUsed ++;
}
else if (fromServers->size()) {
} else if (fromServers->size()) {
auto randomEntry = fromServers->random();
results.push_back(randomEntry);
itemsUsed ++;
totalUsed ++;
if (g_replicationdebug > 5) {
printf("One added:%4d %33s entry: %s\n", itemsUsed, "", fromServers->getEntryInfo(randomEntry).c_str());
}
}
if (g_replicationdebug > 2) {
printf("One used:%5d results:%3d from %3d servers\n", totalUsed, itemsUsed, fromServers->size());
if (g_replicationdebug > 0) {
printf("PolicyOne used:%5d results:%3d from %3d servers\n", totalUsed, itemsUsed, fromServers->size());
}
return (totalUsed > 0);
}
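
For orientation, a minimal standalone sketch of the PolicyOne contract exercised above, using plain STL types instead of the real LocalitySet/LocalityEntry interfaces (every name below is an illustrative assumption, not FoundationDB API):

#include <cstdlib>
#include <vector>

// Illustrative stand-in for FoundationDB's LocalityEntry.
using Entry = int;

// Mirrors PolicyOne: an already-chosen server (alsoServers) satisfies the
// policy; otherwise pick one random candidate from the pool.
bool selectOneReplica(const std::vector<Entry>& fromServers,
                      const std::vector<Entry>& alsoServers,
                      std::vector<Entry>& results) {
    if (!alsoServers.empty()) {
        return true; // an existing server already counts toward the policy
    }
    if (!fromServers.empty()) {
        results.push_back(fromServers[std::rand() % fromServers.size()]);
        return true;
    }
    return false; // no server available at all
}
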
@ -263,6 +259,8 @@ bool PolicyAcross::validate(
// fromServers are the servers that have already been chosen and
// that should be excluded from being selected as replicas.
// FIXME: Simplify this function, such as removing unnecessary printf
// fromServers are the servers that we must have;
// alsoServers are the servers that we can choose.
bool PolicyAcross::selectReplicas(
Reference<LocalitySet> & fromServers,
std::vector<LocalityEntry> const& alsoServers,
@ -279,7 +277,7 @@ bool PolicyAcross::selectReplicas(
_newResults.clear();
_addedResults.resize(_arena, 0);
if ((g_replicationdebug > 3) && (alsoServers.size())) {
if (g_replicationdebug > 0) {
printf("Across !also:%4lu key: %-7s policy: %-10s => %s\n", alsoServers.size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str());
}
for (auto& alsoServer : alsoServers) {
@ -289,14 +287,18 @@ bool PolicyAcross::selectReplicas(
if ((lowerBound == _usedValues.end()) || (*lowerBound != value.get())) {
//_selected is a set of processes that have the same indexKey and indexValue (value)
_selected = fromServers->restrict(indexKey, value.get());
if (_selected->size()) {
// Pass only the alsoServers items which are valid for the value
if (g_replicationdebug > 5) {
if (g_replicationdebug > 0) {
if (_selected->size() > 0) {
// entry is the locality entry info (entryValue) from the to-be-selected team member alsoServer
printf("Across !select key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(),
value.get()._id, fromServers->valueText(value.get()).c_str(),
fromServers->getEntryInfo(alsoServer).c_str());
} else {
printf("Across !select empty\n");
}
}
if (_selected->size()) {
// Pass only the alsoServers items which are valid for the value
resultsSize = _newResults.size();
if (_policy->selectReplicas(_selected, alsoServers, _newResults))
{
@ -307,30 +309,11 @@ bool PolicyAcross::selectReplicas(
else {
_addedResults.push_back(_arena, std::pair<int, int>(resultsAdded, resultsSize));
}
if (g_replicationdebug > 5) {
printf("Across !added:%3d key: %-7s count:%3d of%3d value: (%3d) %-10s entry: %s\n",
resultsAdded, _attribKey.c_str(), count, _count, value.get()._id,
fromServers->valueText(value.get()).c_str(),
fromServers->getEntryInfo(alsoServer).c_str());
}
if (count >= _count) break;
_usedValues.insert(lowerBound, value.get());
}
else if (g_replicationdebug > 5) {
printf("Across !no answer key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
}
}
else if (g_replicationdebug > 5) {
printf("Across !empty set key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
}
}
else if (g_replicationdebug > 5) {
printf("Across !duplicate key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(alsoServer).c_str());
}
}
else if (g_replicationdebug > 5) {
printf("Across !no value key: %-7s %21s entry: %s\n", _attribKey.c_str(), "", fromServers->getEntryInfo(alsoServer).c_str());
}
}
@ -339,11 +322,11 @@ bool PolicyAcross::selectReplicas(
// Sort the added results array
std::sort(_addedResults.begin(), _addedResults.end(), PolicyAcross::compareAddedResults);
if (g_replicationdebug > 2) {
if (g_replicationdebug > 0) {
printf("Across !add sets key: %-7s sets:%3d results:%3lu count:%3d of%3d\n", _attribKey.c_str(), _addedResults.size(), _newResults.size(), count, _count);
}
if (g_replicationdebug > 6) {
if (g_replicationdebug > 0) {
LocalitySet::staticDisplayEntries(fromServers, alsoServers, "also");
LocalitySet::staticDisplayEntries(fromServers, results, "results");
LocalitySet::staticDisplayEntries(fromServers, _newResults, "add items");
@ -351,14 +334,14 @@ bool PolicyAcross::selectReplicas(
for (auto& addedResult : _addedResults) {
count ++;
if (g_replicationdebug > 2) {
if (g_replicationdebug > 0) {
printf("Across !add set key: %-7s count:%3d of%3d results:%3d index:%3d\n", _attribKey.c_str(), count, _count, addedResult.first, addedResult.second);
}
results.reserve(results.size() + addedResult.first);
results.insert(results.end(), _newResults.begin()+addedResult.second, _newResults.begin()+addedResult.second+addedResult.first);
if (count >= _count) break;
}
if (g_replicationdebug > 7) {
if (g_replicationdebug > 0) {
LocalitySet::staticDisplayEntries(fromServers, results, "results");
}
}
@ -366,14 +349,14 @@ bool PolicyAcross::selectReplicas(
// Cannot find replica from the least used alsoServers, now try to find replicas from all servers
// Process the remaining values
if (count < _count) {
if (g_replicationdebug > 3) {
if (g_replicationdebug > 0) {
printf("Across items:%4d key: %-7s policy: %-10s => %s count:%3d of%3d\n", fromServers->size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str(), count, _count);
}
int recordIndex;
// Use a mutable array so that swaps do not affect the actual element array
auto& mutableArray = fromServers->getMutableEntries();
for (int checksLeft = fromServers->size(); checksLeft > 0; checksLeft --) {
if (g_replicationdebug > 6) {
if (g_replicationdebug > 0) {
LocalitySet::staticDisplayEntries(fromServers, mutableArray, "mutable");
}
recordIndex = deterministicRandom()->randomInt(0, checksLeft);
@ -402,36 +385,21 @@ bool PolicyAcross::selectReplicas(
if (count >= _count) break;
_usedValues.insert(lowerBound, value.get());
}
else if (g_replicationdebug > 5) {
printf("Across no answer key: %-7s value: (%3d) %-10s entry: %s\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(entry).c_str());
}
}
else if (g_replicationdebug > 5) {
printf("Across empty set:%3d key: %-7s value: (%3d) %-10s entry: %s index:%4d\n", fromServers->size()-checksLeft+1, _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(entry).c_str(), recordIndex);
}
}
else if (g_replicationdebug > 5) {
printf("Across duplicate key: %-7s value: (%3d) %-10s entry: %s attempt:%3d index:%4d\n", _attribKey.c_str(), value.get()._id, fromServers->valueText(value.get()).c_str(), fromServers->getEntryInfo(entry).c_str(), fromServers->size()-checksLeft+1, recordIndex);
}
}
else if (g_replicationdebug > 5) {
printf("Across no value key: %-7s %21s entry: %s attempt:%3d index:%4d\n", _attribKey.c_str(), "", fromServers->getEntryInfo(entry).c_str(), fromServers->size()-checksLeft+1, recordIndex);
}
if (recordIndex != checksLeft-1) {
if (g_replicationdebug > 5) {
printf("Across swap key: %-7s index:%4d last:%4d entry: %s\n", _attribKey.c_str(), recordIndex, checksLeft-1, fromServers->getEntryInfo(entry).c_str());
}
fromServers->swapMutableRecords(recordIndex, checksLeft-1);
}
}
}
// Clear the return array, if not satisfied
if (count < _count) {
if (g_replicationdebug > 4) printf("Across result count: %d < %d requested\n", count, _count);
if (g_replicationdebug > 0) printf("Across result count: %d < %d requested\n", count, _count);
results.resize(resultsInit);
count = 0;
}
if (g_replicationdebug > 2) {
if (g_replicationdebug > 0) {
printf("Across used:%5lu results:%3d from %3d items key: %-7s policy: %-10s => %s\n", results.size()-resultsInit, count, fromServers->size(), _attribKey.c_str(), _policy->name().c_str(), _policy->info().c_str());
}
return (count >= _count);
@ -465,14 +433,8 @@ bool PolicyAnd::selectReplicas(
}
for (auto& policy : _sortedPolicies) {
if (g_replicationdebug > 3) {
printf("And also:%5lu used: %4lu from %3d items policy: %-10s => %s\n", newResults.size(), newResults.size()-alsoServers.size(), fromServers->size(), policy->name().c_str(), policy->info().c_str());
}
if (!policy->selectReplicas(fromServers, newResults, newResults))
{
if (g_replicationdebug > 3) {
printf("And failed set:%4d policy: %-10s => %s\n", fromServers->size(), policy->name().c_str(), policy->info().c_str());
}
passed = false;
break;
}
@ -482,9 +444,6 @@ bool PolicyAnd::selectReplicas(
results.insert(results.end(), newResults.begin()+alsoServers.size(), newResults.end());
}
if (g_replicationdebug > 2) {
printf("And used:%5lu results:%3lu from %3d items\n", newResults.size()-alsoServers.size(), results.size(), fromServers->size());
}
return passed;
}
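
The PolicyAnd hunk above strips debug output but keeps the core shape: run each sorted sub-policy over the growing selection and fail fast. A hedged toy sketch of that composition (STL types only, not the real IReplicationPolicy interface):

#include <functional>
#include <vector>

using Entry = int;
// A toy policy tries to extend `picked` from `pool` and reports success.
using Policy = std::function<bool(const std::vector<Entry>& pool, std::vector<Entry>& picked)>;

// Every sub-policy must succeed on the accumulated selection; the first
// failure aborts the whole selection, as in PolicyAnd::selectReplicas above.
bool selectAnd(const std::vector<Policy>& policies,
               const std::vector<Entry>& pool,
               std::vector<Entry>& picked) {
    for (const auto& policy : policies) {
        if (!policy(pool, picked)) {
            return false;
        }
    }
    return true;
}
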


@ -580,7 +580,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Future<Void> checkTeamDelay;
Promise<Void> addSubsetComplete;
Future<Void> badTeamRemover;
Future<Void> redundantTeamRemover;
Future<Void> redundantMachineTeamRemover;
Future<Void> redundantServerTeamRemover;
Reference<LocalitySet> storageServerSet;
std::vector<LocalityEntry> forcedEntries, resultEntries;
@ -617,13 +618,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
DatabaseConfiguration configuration, std::vector<Optional<Key>> includedDCs,
Optional<std::vector<Optional<Key>>> otherTrackedDCs,
Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
Optional<std::vector<Optional<Key>>> otherTrackedDCs, Future<Void> readyToStart,
Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
Reference<AsyncVar<bool>> processingUnhealthy)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), teamBuilder(Void()),
badTeamRemover(Void()), redundantTeamRemover(Void()), configuration(configuration),
readyToStart(readyToStart), clearHealthyZoneFuture(Void()),
badTeamRemover(Void()), redundantMachineTeamRemover(Void()), redundantServerTeamRemover(Void()),
configuration(configuration), readyToStart(readyToStart), clearHealthyZoneFuture(Void()),
checkTeamDelay(delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistribution)),
initialFailureReactionDelay(
delayed(readyToStart, SERVER_KNOBS->INITIAL_FAILURE_REACTION_DELAY, TaskPriority::DataDistribution)),
@ -1116,6 +1117,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
teamInfo->machineTeam = machineTeamInfo;
machineTeamInfo->serverTeams.push_back(teamInfo);
if (g_network->isSimulated()) {
// Update server team information for consistency check in simulation
traceTeamCollectionInfo();
}
}
void addTeam(std::set<UID> const& team, bool isInitialTeam) { addTeam(team.begin(), team.end(), isInitialTeam); }
@ -1234,6 +1239,17 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
// A locality string is hashed into an integer, which is used as the KeyIndex.
// To better understand which KeyIndex is used for a locality, we print this info in trace events.
void traceLocalityArrayIndexName() {
TraceEvent("LocalityRecordKeyName").detail("Size", machineLocalityMap._keymap->_lookuparray.size());
for (int i = 0; i < machineLocalityMap._keymap->_lookuparray.size(); ++i) {
TraceEvent("LocalityRecordKeyIndexName")
.detail("KeyIndex", i)
.detail("KeyName", machineLocalityMap._keymap->_lookuparray[i]);
}
}
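
traceLocalityArrayIndexName() above prints the mapping from each KeyIndex to its locality key name. A minimal sketch of the kind of interning map that produces such indices (the real StringToIntMap has more machinery; member names below are assumptions):

#include <string>
#include <unordered_map>
#include <vector>

// Assigns each distinct locality key name a stable small integer index.
struct StringToIndex {
    std::unordered_map<std::string, int> indexOf;
    std::vector<std::string> lookuparray; // KeyIndex -> key name, as traced above

    int intern(const std::string& key) {
        auto it = indexOf.find(key);
        if (it != indexOf.end()) return it->second;
        lookuparray.push_back(key);
        return indexOf[key] = (int)lookuparray.size() - 1;
    }
};
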
void traceMachineLocalityMap() {
int i = 0;
@ -1256,6 +1272,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// To enable verbose debug info, set shouldPrint to true
void traceAllInfo(bool shouldPrint = false) {
if (!shouldPrint) return;
TraceEvent("TraceAllInfo").detail("Primary", primary);
@ -1264,6 +1281,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
traceServerTeamInfo();
traceMachineInfo();
traceMachineTeamInfo();
traceLocalityArrayIndexName();
traceMachineLocalityMap();
}
@ -1298,25 +1316,20 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Five steps to create each machine team, which are documented in the function
// Reuse ReplicationPolicy selectReplicas func to select machine team
// Return the number of added machine teams
int addBestMachineTeams(int targetMachineTeamsToBuild, int remainingMachineTeamBudget) {
int addBestMachineTeams(int machineTeamsToBuild) {
int addedMachineTeams = 0;
int machineTeamsToBuild = 0;
ASSERT(targetMachineTeamsToBuild >= 0);
// Do not build any machine team if asked to build none
if (targetMachineTeamsToBuild == 0) return 0;
machineTeamsToBuild = targetMachineTeamsToBuild;
ASSERT(machineTeamsToBuild >= 0);
// The number of machines is always no smaller than the storageTeamSize in a correct configuration
ASSERT(machine_info.size() >= configuration.storageTeamSize);
// Future: Consider if we should overbuild more machine teams to
// allow machineTeamRemover() to get a more balanced number of machine teams per machine
// Step 1: Create machineLocalityMap which will be used in building machine team
rebuildMachineLocalityMap();
int loopCount = 0;
// Add a team in each iteration
while (addedMachineTeams < machineTeamsToBuild || addedMachineTeams < remainingMachineTeamBudget) {
while (addedMachineTeams < machineTeamsToBuild || notEnoughMachineTeamsForAMachine()) {
// Step 2: Get least used machines from which we choose machines as a machine team
std::vector<Reference<TCMachineInfo>> leastUsedMachines; // A less used machine has fewer teams
int minTeamCount = std::numeric_limits<int>::max();
@ -1343,33 +1356,39 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
std::vector<UID*> team;
std::vector<LocalityEntry> forcedAttributes;
// Step 3: Create a representative process for each machine.
// Construct forcedAttribute from leastUsedMachines.
// We will use forcedAttribute to call existing function to form a team
if (leastUsedMachines.size()) {
// Randomly choose 1 least used machine
Reference<TCMachineInfo> tcMachineInfo = deterministicRandom()->randomChoice(leastUsedMachines);
ASSERT(!tcMachineInfo->serversOnMachine.empty());
LocalityEntry process = tcMachineInfo->localityEntry;
forcedAttributes.push_back(process);
} else {
// When leastUsedMachines is empty, we will never find a team later, so we can simply return.
return addedMachineTeams;
}
// Step 4: Reuse Policy's selectReplicas() to create team for the representative process.
std::vector<UID*> bestTeam;
int bestScore = std::numeric_limits<int>::max();
int maxAttempts = SERVER_KNOBS->BEST_OF_AMT; // BEST_OF_AMT = 4
for (int i = 0; i < maxAttempts && i < 100; ++i) {
// Step 3: Create a representative process for each machine.
// Construct forcedAttribute from leastUsedMachines.
// We will use forcedAttribute to call existing function to form a team
if (leastUsedMachines.size()) {
forcedAttributes.clear();
// Randomly choose 1 least used machine
Reference<TCMachineInfo> tcMachineInfo = deterministicRandom()->randomChoice(leastUsedMachines);
ASSERT(!tcMachineInfo->serversOnMachine.empty());
LocalityEntry process = tcMachineInfo->localityEntry;
forcedAttributes.push_back(process);
TraceEvent("ChosenMachine")
.detail("MachineInfo", tcMachineInfo->machineID)
.detail("LeaseUsedMachinesSize", leastUsedMachines.size())
.detail("ForcedAttributesSize", forcedAttributes.size());
} else {
// When leastUsedMachines is empty, we will never find a team later, so we can simply return.
return addedMachineTeams;
}
// Choose a team that balances the # of teams per server among the teams
// that have the least-utilized server
team.clear();
ASSERT_WE_THINK(forcedAttributes.size() == 1);
auto success = machineLocalityMap.selectReplicas(configuration.storagePolicy, forcedAttributes, team);
// NOTE: selectReplicas() should always return success when storageTeamSize = 1
ASSERT_WE_THINK(configuration.storageTeamSize > 1 || (configuration.storageTeamSize == 1 && success));
if (!success) {
break;
continue; // Try up to maxAttempts, since next time we may choose a different forcedAttributes
}
ASSERT(forcedAttributes.size() > 0);
team.push_back((UID*)machineLocalityMap.getObject(forcedAttributes[0]));
@ -1421,19 +1440,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
addMachineTeam(machines);
addedMachineTeams++;
// Update the remaining machine team budget because the budget may decrease by
// any value between 1 and storageTeamSize
remainingMachineTeamBudget = getRemainingMachineTeamBudget();
} else {
traceAllInfo(true);
TraceEvent(SevWarn, "DataDistributionBuildTeams", distributorId)
.detail("Primary", primary)
.detail("Reason", "Unable to make desired machine Teams");
break;
}
if (++loopCount > 2 * machineTeamsToBuild * (configuration.storageTeamSize + 1)) {
break;
}
}
return addedMachineTeams;
@ -1637,6 +1650,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
if (EXPENSIVE_VALIDATION) {
ASSERT(isServerTeamCountCorrect(mt));
}
if (mt->serverTeams.size() < minNumProcessTeams) {
minNumProcessTeams = mt->serverTeams.size();
retMT = mt;
@ -1646,6 +1660,53 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return std::pair<Reference<TCMachineTeamInfo>, int>(retMT, minNumProcessTeams);
}
// Find the machine team whose members belong to the most machine teams, using the same logic as serverTeamRemover
std::pair<Reference<TCMachineTeamInfo>, int> getMachineTeamWithMostMachineTeams() {
Reference<TCMachineTeamInfo> retMT;
int maxNumMachineTeams = 0;
int targetMachineTeamNumPerMachine =
(SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2;
for (auto& mt : machineTeams) {
// The representative team number for the machine team mt is
// the minimum number of machine teams of a machine in the team mt
int representNumMachineTeams = std::numeric_limits<int>::max();
for (auto& m : mt->machines) {
representNumMachineTeams = std::min<int>(representNumMachineTeams, m->machineTeams.size());
}
if (representNumMachineTeams > targetMachineTeamNumPerMachine &&
representNumMachineTeams > maxNumMachineTeams) {
maxNumMachineTeams = representNumMachineTeams;
retMT = mt;
}
}
return std::pair<Reference<TCMachineTeamInfo>, int>(retMT, maxNumMachineTeams);
}
// Find the server team whose members belong to the most server teams
std::pair<Reference<TCTeamInfo>, int> getServerTeamWithMostProcessTeams() {
Reference<TCTeamInfo> retST;
int maxNumProcessTeams = 0;
int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2;
for (auto& t : teams) {
// The minimum number of teams of a server in a team is the representative team number for the team t
int representNumProcessTeams = std::numeric_limits<int>::max();
for (auto& server : t->getServers()) {
representNumProcessTeams = std::min<int>(representNumProcessTeams, server->teams.size());
}
// We only remove a team whose representNumProcessTeams is larger than targetTeamNumPerServer;
// otherwise, teamBuilder will build the to-be-removed team again
if (representNumProcessTeams > targetTeamNumPerServer && representNumProcessTeams > maxNumProcessTeams) {
maxNumProcessTeams = representNumProcessTeams;
retST = t;
}
}
return std::pair<Reference<TCTeamInfo>, int>(retST, maxNumProcessTeams);
}
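
getMachineTeamWithMostMachineTeams() and getServerTeamWithMostProcessTeams() above share one rule: a team's removal priority is the minimum team count over its members, and only teams above the target are eligible (otherwise teamBuilder would rebuild them). A self-contained sketch of that rule over plain vectors (names are assumptions):

#include <algorithm>
#include <limits>
#include <vector>

// Each inner vector holds the team counts of one team's members. Returns the
// index of the team whose representative count (min over members) is largest
// and above target, or -1 if no team qualifies for removal.
int pickTeamToRemove(const std::vector<std::vector<int>>& teamMemberCounts, int target) {
    int best = -1;
    int bestScore = 0;
    for (int i = 0; i < (int)teamMemberCounts.size(); ++i) {
        int represent = std::numeric_limits<int>::max();
        for (int c : teamMemberCounts[i]) {
            represent = std::min(represent, c);
        }
        if (represent > target && represent > bestScore) {
            bestScore = represent;
            best = i;
        }
    }
    return best; // -1: removing any team would starve some member
}
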
int getHealthyMachineTeamCount() {
int healthyTeamCount = 0;
for (auto mt = machineTeams.begin(); mt != machineTeams.end(); ++mt) {
@ -1659,53 +1720,66 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return healthyTeamCount;
}
// Each machine is expected to have SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER machine teams;
// remainingMachineTeamBudget is the number of machine teams needed to ensure every machine has
// SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER teams
int getRemainingMachineTeamBudget() {
int remainingMachineTeamBudget = 0;
// Each machine is expected to have targetMachineTeamNumPerMachine machine teams.
// Return true if there exists a machine that does not have enough machine teams.
bool notEnoughMachineTeamsForAMachine() {
// If we want to remove the machine team with most machine teams, we use the same logic as
// notEnoughTeamsForAServer
int targetMachineTeamNumPerMachine =
SERVER_KNOBS->TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS
? (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2
: SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER;
for (auto& m : machine_info) {
int machineTeamCount = m.second->machineTeams.size();
remainingMachineTeamBudget += std::max(0, (int)(SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER - machineTeamCount));
// If SERVER_KNOBS->TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS is false,
// the desired machine team number is not the same as the desired server team number
// in notEnoughTeamsForAServer() below, because the machineTeamRemover() does not
// remove the machine team with the most machine teams.
if (m.second->machineTeams.size() < targetMachineTeamNumPerMachine) {
return true;
}
}
// We over-provision the remainingMachineTeamBudget because we do not know, when a new machine team is built,
// how many times it can be counted into the budget. For example, when a new machine is added,
// a new machine team only consumes 1 such budget
return remainingMachineTeamBudget;
return false;
}
// Each server is expected to have SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER teams;
int getRemainingServerTeamBudget() {
// remainingTeamBudget is the number of teams needed to ensure every server has
// SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER teams
int remainingTeamBudget = 0;
// Each server is expected to have targetTeamNumPerServer teams.
// Return true if there exists a server that does not have enough teams.
bool notEnoughTeamsForAServer() {
// We build more teams than we finally want so that we can use serverTeamRemover() actor to remove the teams
// whose members belong to too many teams. This allows us to get a more balanced number of teams per server.
// We want to ensure every server has targetTeamNumPerServer teams.
// The targetTeamNumPerServer is calculated as
// (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER + ideal_num_of_teams_per_server) / 2,
// where ideal_num_of_teams_per_server is (#teams * storageTeamSize) / #servers, which is
// (#servers * DESIRED_TEAMS_PER_SERVER * storageTeamSize) / #servers.
int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2;
ASSERT(targetTeamNumPerServer > 0);
for (auto& s : server_info) {
int numValidTeams = s.second->teams.size();
remainingTeamBudget += std::max(0, (int)(SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER - numValidTeams));
if (s.second->teams.size() < targetTeamNumPerServer) {
return true;
}
}
return remainingTeamBudget;
return false;
}
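
To make the targetTeamNumPerServer formula above concrete, a worked example under assumed knob values (the constants below are illustrative, not defaults asserted by this diff):

#include <cassert>

int main() {
    // Assume DESIRED_TEAMS_PER_SERVER = 5 and storageTeamSize = 3.
    const int desiredTeamsPerServer = 5;
    const int storageTeamSize = 3;
    // Midpoint of the knob (5) and the ideal per-server team count (5 * 3 = 15):
    // (5 + 15) / 2 == 5 * (3 + 1) / 2 == 10.
    const int targetTeamNumPerServer = (desiredTeamsPerServer * (storageTeamSize + 1)) / 2;
    assert(targetTeamNumPerServer == 10);
    return 0;
}
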
// Create server teams based on machine teams
// Before the number of machine teams reaches the threshold, build a machine team for each server team
// When it reaches the threshold, first try to build a server team with existing machine teams; if that fails,
// build an extra machine team and record the event in trace
int addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams, int remainingTeamBudget) {
int addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams) {
ASSERT(teamsToBuild >= 0);
ASSERT_WE_THINK(machine_info.size() > 0 || server_info.size() == 0);
ASSERT_WE_THINK(SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER >= 1 && configuration.storageTeamSize >= 1);
int addedMachineTeams = 0;
int addedTeams = 0;
int loopCount = 0;
// Exclude machine teams that have members in the wrong configuration.
// When we change configuration, we may have machine teams with storageTeamSize in the old configuration.
int healthyMachineTeamCount = getHealthyMachineTeamCount();
int totalMachineTeamCount = machineTeams.size();
int totalHealthyMachineCount = calculateHealthyMachineCount();
int remainingMachineTeamBudget = getRemainingMachineTeamBudget();
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * totalHealthyMachineCount;
int maxMachineTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * totalHealthyMachineCount;
@ -1718,14 +1792,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("HealthyMachineTeamCount", healthyMachineTeamCount)
.detail("DesiredMachineTeams", desiredMachineTeams)
.detail("MaxMachineTeams", maxMachineTeams)
.detail("MachineTeamsToBuild", machineTeamsToBuild)
.detail("RemainingMachineTeamBudget", remainingMachineTeamBudget);
.detail("MachineTeamsToBuild", machineTeamsToBuild);
// Pre-build all machine teams until we have the desired number of machine teams
if (machineTeamsToBuild > 0 || remainingMachineTeamBudget > 0) {
addedMachineTeams = addBestMachineTeams(machineTeamsToBuild, remainingMachineTeamBudget);
if (machineTeamsToBuild > 0 || notEnoughMachineTeamsForAMachine()) {
addedMachineTeams = addBestMachineTeams(machineTeamsToBuild);
}
while (addedTeams < teamsToBuild || addedTeams < remainingTeamBudget) {
while (addedTeams < teamsToBuild || notEnoughTeamsForAServer()) {
// Step 1: Create 1 best machine team
std::vector<UID> bestServerTeam;
int bestScore = std::numeric_limits<int>::max();
@ -1778,6 +1851,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
// Pick the server team with the smallest score among all attempts.
// If we use a different metric here, DD may oscillate infinitely between creating and removing teams.
// SOMEDAY: Improve the code efficiency by using reservoir algorithm
int score = 0;
for (auto& server : serverTeam) {
@ -1802,11 +1876,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// Step 4: Add the server team
addTeam(bestServerTeam.begin(), bestServerTeam.end(), false);
addedTeams++;
remainingTeamBudget = getRemainingServerTeamBudget();
if (++loopCount > 2 * teamsToBuild * (configuration.storageTeamSize + 1)) {
break;
}
}
healthyMachineTeamCount = getHealthyMachineTeamCount();
@ -1818,7 +1887,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("Primary", primary)
.detail("AddedTeams", addedTeams)
.detail("TeamsToBuild", teamsToBuild)
.detail("RemainingTeamBudget", remainingTeamBudget)
.detail("CurrentTeams", teams.size())
.detail("DesiredTeams", desiredTeams)
.detail("MaxTeams", maxTeams)
@ -1856,7 +1924,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("Primary", primary)
.detail("AddedTeams", 0)
.detail("TeamsToBuild", 0)
.detail("RemainingTeamBudget", 0)
.detail("CurrentTeams", teams.size())
.detail("DesiredTeams", desiredServerTeams)
.detail("MaxTeams", maxServerTeams)
@ -1914,8 +1981,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// If there are too few machines to even build teams or there are too few represented datacenters, build no new teams
if( uniqueMachines >= self->configuration.storageTeamSize ) {
desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER*serverCount;
int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER*serverCount;
desiredTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * serverCount;
int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * serverCount;
// Exclude teams that have members in the wrong configuration, since we don't want these teams
int teamCount = 0;
@ -1928,10 +1995,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
totalTeamCount++;
}
}
// Each server is expected to have SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER,
// remainingTeamBudget is the number of teams needed to ensure every server has
// SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER teams
int remainingTeamBudget = self->getRemainingServerTeamBudget();
// teamsToBuild is calculated such that we will not build too many teams in the situation
// when all (or most) teams become unhealthy temporarily and then healthy again
@ -1952,13 +2015,13 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("MachineCount", self->machine_info.size())
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER);
if (teamsToBuild > 0 || remainingTeamBudget > 0) {
if (teamsToBuild > 0 || self->notEnoughTeamsForAServer()) {
state vector<std::vector<UID>> builtTeams;
// addTeamsBestOf() will not add more teams than needed.
// If the team number is more than desired, the extra teams are added in the code path where
// a team is added as an initial team
int addedTeams = self->addTeamsBestOf(teamsToBuild, desiredTeams, maxTeams, remainingTeamBudget);
int addedTeams = self->addTeamsBestOf(teamsToBuild, desiredTeams, maxTeams);
if (addedTeams <= 0 && self->teams.size() == 0) {
TraceEvent(SevWarn, "NoTeamAfterBuildTeam")
@ -1981,7 +2044,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("Primary", self->primary)
.detail("AddedTeams", 0)
.detail("TeamsToBuild", teamsToBuild)
.detail("RemainingTeamBudget", remainingTeamBudget)
.detail("CurrentTeams", self->teams.size())
.detail("DesiredTeams", desiredTeams)
.detail("MaxTeams", maxTeams)
@ -2086,6 +2148,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
ASSERT_WE_THINK(foundInMachineTeam);
team->tracker.cancel();
if (g_network->isSimulated()) {
// Update server team information for consistency check in simulation
traceTeamCollectionInfo();
}
return found;
}
@ -2097,8 +2163,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Standalone<StringRef> machine_id = locality.zoneId().get(); // locality to machine_id with std::string type
Reference<TCMachineInfo> machineInfo;
if (machine_info.find(machine_id) ==
machine_info.end()) { // uid is the first storage server process on the machine
if (machine_info.find(machine_id) == machine_info.end()) {
// uid is the first storage server process on the machine
TEST(true);
// For each machine, store the first server's localityEntry into machineInfo for later use.
LocalityEntry localityEntry = machineLocalityMap.add(locality, &server->id);
@ -2311,15 +2377,24 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
};
ACTOR Future<Void> waitUntilHealthy(DDTeamCollection* self) {
ACTOR Future<Void> waitUntilHealthy(DDTeamCollection* self, double extraDelay = 0) {
state int waitCount = 0;
loop {
while(self->zeroHealthyTeams->get() || self->processingUnhealthy->get()) {
// processingUnhealthy: true when there exists data movement
TraceEvent("WaitUntilHealthyStalled", self->distributorId).detail("Primary", self->primary).detail("ZeroHealthy", self->zeroHealthyTeams->get()).detail("ProcessingUnhealthy", self->processingUnhealthy->get());
wait(self->zeroHealthyTeams->onChange() || self->processingUnhealthy->onChange());
waitCount = 0;
}
wait(delay(SERVER_KNOBS->DD_STALL_CHECK_DELAY, TaskPriority::Low)); //After the team trackers wait on the initial failure reaction delay, they yield. We want to make sure every tracker has had the opportunity to send their relocations to the queue.
if(!self->zeroHealthyTeams->get() && !self->processingUnhealthy->get()) {
return Void();
if (extraDelay <= 0.01 || waitCount >= 1) {
// Return healthy if we do not need extraDelay or when DD is healthy in at least two consecutive checks
return Void();
} else {
wait(delay(extraDelay, TaskPriority::Low));
waitCount++;
}
}
}
}
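
The new extraDelay parameter makes waitUntilHealthy() demand two consecutive healthy observations before returning. A simplified, thread-based sketch of that pattern (the real code is an ACTOR; everything below, including the healthy() polling stub, is an illustrative stand-in):

#include <chrono>
#include <functional>
#include <thread>

// Blocks until healthy() holds across two checks separated by extraDelay,
// or after the first check when no extra delay is requested.
void waitUntilStablyHealthy(const std::function<bool()>& healthy, double extraDelay) {
    int waitCount = 0;
    for (;;) {
        if (!healthy()) {
            waitCount = 0; // any unhealthy observation resets the streak
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            continue;
        }
        if (extraDelay <= 0.01 || waitCount >= 1) {
            return; // no extra delay requested, or second consecutive success
        }
        std::this_thread::sleep_for(std::chrono::duration<double>(extraDelay));
        ++waitCount;
    }
}
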
@ -2336,19 +2411,18 @@ ACTOR Future<Void> removeBadTeams(DDTeamCollection* self) {
return Void();
}
ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
ACTOR Future<Void> machineTeamRemover(DDTeamCollection* self) {
state int numMachineTeamRemoved = 0;
loop {
// In case the teamRemover causes problems in production, we can disable it
if (SERVER_KNOBS->TR_FLAG_DISABLE_TEAM_REMOVER) {
// In case the machineTeamRemover causes problems in production, we can disable it
if (SERVER_KNOBS->TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER) {
return Void(); // Directly return Void()
}
wait(waitUntilHealthy(self));
// To avoid removing machine teams too fast, which is unlikely to happen though
wait( delay(SERVER_KNOBS->TR_REMOVE_MACHINE_TEAM_DELAY) );
wait(waitUntilHealthy(self));
// Wait for the badTeamRemover() to avoid the potential race between adding the bad team (add the team tracker)
// and removing the bad team (cancel the team tracker).
wait(self->badTeamRemover);
@ -2377,18 +2451,21 @@ ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
// In most cases, all machine teams should be healthy teams at this point.
int desiredMachineTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * healthyMachineCount;
int totalMTCount = self->machineTeams.size();
// Pick the machine team to remove. After the release-6.2 version,
// we remove the machine team with the most machine teams, using the same logic as serverTeamRemover
std::pair<Reference<TCMachineTeamInfo>, int> foundMTInfo = SERVER_KNOBS->TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS
? self->getMachineTeamWithMostMachineTeams()
: self->getMachineTeamWithLeastProcessTeams();
if (totalMTCount > desiredMachineTeams) {
// Pick the machine team with the least number of server teams and mark it undesired
state std::pair<Reference<TCMachineTeamInfo>, int> foundMTInfo = self->getMachineTeamWithLeastProcessTeams();
state Reference<TCMachineTeamInfo> mt = foundMTInfo.first;
state int minNumProcessTeams = foundMTInfo.second;
if (totalMTCount > desiredMachineTeams && foundMTInfo.first.isValid()) {
Reference<TCMachineTeamInfo> mt = foundMTInfo.first;
int minNumProcessTeams = foundMTInfo.second;
ASSERT(mt.isValid());
// Pick one process team and mark it as a bad team.
// Remove the machine team by removing its process teams one by one
state Reference<TCTeamInfo> team;
state int teamIndex = 0;
Reference<TCTeamInfo> team;
int teamIndex = 0;
for (teamIndex = 0; teamIndex < mt->serverTeams.size(); ++teamIndex) {
team = mt->serverTeams[teamIndex];
ASSERT(team->machineTeam->machineIDs == mt->machineIDs); // Sanity check
@ -2420,7 +2497,7 @@ ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
self->addActor.send(self->badTeamRemover);
}
TraceEvent("TeamRemover")
TraceEvent("MachineTeamRemover", self->distributorId)
.detail("MachineTeamToRemove", mt->getMachineIDsStr())
.detail("NumProcessTeamsOnTheMachineTeam", minNumProcessTeams)
.detail("CurrentMachineTeams", self->machineTeams.size())
@ -2443,13 +2520,84 @@ ACTOR Future<Void> teamRemover(DDTeamCollection* self) {
.detail("DesiredMachineTeams", desiredMachineTeams)
.detail("NumMachineTeamsRemoved", numMachineTeamRemoved);
self->traceTeamCollectionInfo();
numMachineTeamRemoved = 0; // Reset the counter to avoid repeatedly printing the message
}
}
}
}
// Remove the server team whose members belong to the most process teams
// until the total number of server teams is no larger than the desired number
ACTOR Future<Void> serverTeamRemover(DDTeamCollection* self) {
state int numServerTeamRemoved = 0;
loop {
// In case the serverTeamRemover causes problems in production, we can disable it
if (SERVER_KNOBS->TR_FLAG_DISABLE_SERVER_TEAM_REMOVER) {
return Void(); // Directly return Void()
}
double removeServerTeamDelay = SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_DELAY;
if (g_network->isSimulated()) {
// Speed up the team remover in simulation; otherwise,
// it may time out because we need to remove hundreds of teams
removeServerTeamDelay = removeServerTeamDelay / 100;
}
// To avoid removing server teams too fast, which is unlikely to happen though
wait(delay(removeServerTeamDelay));
wait(waitUntilHealthy(self, SERVER_KNOBS->TR_REMOVE_SERVER_TEAM_EXTRA_DELAY));
// Wait for the badTeamRemover() to avoid the potential race between
// adding the bad team (add the team tracker) and removing the bad team (cancel the team tracker).
wait(self->badTeamRemover);
// From this point, all server teams should be healthy, because we wait above
// until processingUnhealthy is done, and all machines are healthy
int desiredServerTeams = SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * self->server_info.size();
int totalSTCount = self->teams.size();
// Pick the server team whose members are on the most number of server teams, and mark it undesired
std::pair<Reference<TCTeamInfo>, int> foundSTInfo = self->getServerTeamWithMostProcessTeams();
if (totalSTCount > desiredServerTeams && foundSTInfo.first.isValid()) {
ASSERT(foundSTInfo.first.isValid());
Reference<TCTeamInfo> st = foundSTInfo.first;
int maxNumProcessTeams = foundSTInfo.second;
ASSERT(st.isValid());
// The team will be marked as a bad team
bool foundTeam = self->removeTeam(st);
ASSERT(foundTeam == true);
self->addTeam(st->getServers(), true, true);
TEST(true);
self->doBuildTeams = true;
if (self->badTeamRemover.isReady()) {
self->badTeamRemover = removeBadTeams(self);
self->addActor.send(self->badTeamRemover);
}
TraceEvent("ServerTeamRemover", self->distributorId)
.detail("ServerTeamToRemove", st->getServerIDsStr())
.detail("NumProcessTeamsOnTheServerTeam", maxNumProcessTeams)
.detail("CurrentServerTeamNumber", self->teams.size())
.detail("DesiredTeam", desiredServerTeams);
numServerTeamRemoved++;
} else {
if (numServerTeamRemoved > 0) {
// Only trace the information when we remove a server team
TraceEvent("ServerTeamRemoverDone", self->distributorId)
.detail("CurrentServerTeamNumber", self->teams.size())
.detail("DesiredServerTeam", desiredServerTeams)
.detail("NumServerTeamRemoved", numServerTeamRemoved);
self->traceTeamCollectionInfo();
numServerTeamRemoved = 0; // Reset the counter to avoid repeatedly printing the message
}
}
}
}
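
Putting the new actor together: each pass picks the most over-teamed server team and retires it until the total reaches the desired count. A condensed, non-actor sketch of that loop (the real actor also waits on healthiness and badTeamRemover between iterations; types and names below are illustrative):

#include <vector>

struct TeamStats {
    int id;
    int representNumProcessTeams; // min team count over the team's members
};

// Plans which team ids to retire until totals drop to desiredServerTeams.
std::vector<int> planRemovals(std::vector<TeamStats> teams, int desiredServerTeams, int target) {
    std::vector<int> toRemove;
    while ((int)teams.size() > desiredServerTeams) {
        int best = -1;
        for (int i = 0; i < (int)teams.size(); ++i) {
            if (teams[i].representNumProcessTeams > target &&
                (best < 0 || teams[i].representNumProcessTeams > teams[best].representNumProcessTeams)) {
                best = i;
            }
        }
        if (best < 0) break; // nothing removable without starving a server
        toRemove.push_back(teams[best].id);
        teams.erase(teams.begin() + best);
    }
    return toRemove;
}
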
// Track a team and issue RelocateShards when the level of degradation changes
// A badTeam can be unhealthy or just a redundantTeam removed by teamRemover()
// A badTeam can be unhealthy or just a redundantTeam removed by machineTeamRemover() or serverTeamRemover()
ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> team, bool badTeam, bool redundantTeam) {
state int lastServersLeft = team->size();
state bool lastAnyUndesired = false;
@ -2474,6 +2622,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
.detail("Team", team->getDesc())
.detail("Primary", self->primary)
.detail("IsReady", self->initialFailureReactionDelay.isReady());
self->traceTeamCollectionInfo();
}
// Check if the number of degraded machines has changed
state vector<Future<Void>> change;
@ -2679,7 +2828,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
TraceEvent("TeamTrackerStopping", self->distributorId).detail("Team", team->getDesc());
}
self->priority_teams[team->getPriority()]--;
if( team->isHealthy() ) {
if (team->isHealthy()) {
self->healthyTeamCount--;
ASSERT( self->healthyTeamCount >= 0 );
@ -2688,6 +2837,11 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
self->zeroHealthyTeams->set(true);
}
}
if (lastOptimal) {
self->optimalTeamCount--;
ASSERT( self->optimalTeamCount >= 0 );
self->zeroOptimalTeams.set(self->optimalTeamCount == 0);
}
throw;
}
}
@ -3024,7 +3178,9 @@ ACTOR Future<Void> storageServerTracker(
}
if( server->lastKnownClass.machineClassFitness( ProcessClass::Storage ) > ProcessClass::UnsetFit ) {
if( self->optimalTeamCount > 0 ) {
// We saw a corner case in a 3 data_hall configuration
// where optimalTeamCount = 1 and healthyTeamCount = 0.
if (self->optimalTeamCount > 0 && self->healthyTeamCount > 0) {
TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
.detail("Server", server->id)
.detail("OptimalTeamCount", self->optimalTeamCount)
@ -3091,10 +3247,12 @@ ACTOR Future<Void> storageServerTracker(
bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
newInterface.first.locality.zoneId().get();
TraceEvent("StorageServerInterfaceChanged", self->distributorId).detail("ServerID", server->id)
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged);
TraceEvent("StorageServerInterfaceChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged)
.detail("MachineLocalityChanged", machineLocalityChanged);
server->lastKnownInterface = newInterface.first;
server->lastKnownClass = newInterface.second;
@ -3118,6 +3276,7 @@ ACTOR Future<Void> storageServerTracker(
int serverIndex = -1;
for (int i = 0; i < machine->serversOnMachine.size(); ++i) {
if (machine->serversOnMachine[i].getPtr() == server) {
// NOTE: the machine's locality is now wrong. It needs to be updated whenever it is used.
serverIndex = i;
machine->serversOnMachine[i] = machine->serversOnMachine.back();
machine->serversOnMachine.pop_back();
@ -3174,6 +3333,9 @@ ACTOR Future<Void> storageServerTracker(
// self->traceTeamCollectionInfo();
recordTeamCollectionInfo = true;
}
// The locality change of the server will invalidate the server's old teams,
// so we need to rebuild teams for the server
self->doBuildTeams = true;
}
interfaceChanged = server->onInterfaceChanged;
@ -3441,9 +3603,13 @@ ACTOR Future<Void> dataDistributionTeamCollection(
self->addActor.send(self->badTeamRemover);
}
if (self->redundantTeamRemover.isReady()) {
self->redundantTeamRemover = teamRemover(self);
self->addActor.send(self->redundantTeamRemover);
if (self->redundantMachineTeamRemover.isReady()) {
self->redundantMachineTeamRemover = machineTeamRemover(self);
self->addActor.send(self->redundantMachineTeamRemover);
}
if (self->redundantServerTeamRemover.isReady()) {
self->redundantServerTeamRemover = serverTeamRemover(self);
self->addActor.send(self->redundantServerTeamRemover);
}
self->traceTeamCollectionInfo();
@ -3960,7 +4126,7 @@ TEST_CASE("DataDistribution/AddTeamsBestOf/UseMachineID") {
Reference<IReplicationPolicy> policy = Reference<IReplicationPolicy>(new PolicyAcross(teamSize, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
state DDTeamCollection* collection = testMachineTeamCollection(teamSize, policy, processSize);
collection->addTeamsBestOf(30, desiredTeams, maxTeams, 30);
collection->addTeamsBestOf(30, desiredTeams, maxTeams);
ASSERT(collection->sanityCheckTeams() == true);
@ -3985,8 +4151,8 @@ TEST_CASE("DataDistribution/AddTeamsBestOf/NotUseMachineID") {
return Void();
}
collection->addBestMachineTeams(30, 30); // Create machine teams to help debug
collection->addTeamsBestOf(30, desiredTeams, maxTeams, 30);
collection->addBestMachineTeams(30); // Create machine teams to help debug
collection->addTeamsBestOf(30, desiredTeams, maxTeams);
collection->sanityCheckTeams(); // Server team may happen to be on the same machine team, although unlikely
if (collection) delete (collection);
@ -4001,7 +4167,7 @@ TEST_CASE("DataDistribution/AddAllTeams/isExhaustive") {
state int maxTeams = SERVER_KNOBS->MAX_TEAMS_PER_SERVER * processSize;
state DDTeamCollection* collection = testTeamCollection(3, policy, processSize);
int result = collection->addTeamsBestOf(200, desiredTeams, maxTeams, 200);
int result = collection->addTeamsBestOf(200, desiredTeams, maxTeams);
delete(collection);
@ -4021,7 +4187,7 @@ TEST_CASE("/DataDistribution/AddAllTeams/withLimit") {
state DDTeamCollection* collection = testTeamCollection(3, policy, processSize);
int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams, 10);
int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams);
delete(collection);
@ -4041,14 +4207,14 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/SkippingBusyServers") {
collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
collection->addTeam(std::set<UID>({ UID(1, 0), UID(3, 0), UID(4, 0) }), true);
int result = collection->addTeamsBestOf(8, desiredTeams, maxTeams, 8);
state int result = collection->addTeamsBestOf(8, desiredTeams, maxTeams);
ASSERT(result >= 8);
for(auto process = collection->server_info.begin(); process != collection->server_info.end(); process++) {
auto teamCount = process->second->teams.size();
ASSERT(teamCount >= 1);
ASSERT(teamCount <= 5);
// ASSERT(teamCount <= 5);
}
delete(collection);
@ -4071,8 +4237,8 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
collection->addTeam(std::set<UID>({ UID(1, 0), UID(2, 0), UID(3, 0) }), true);
collection->addTeam(std::set<UID>({ UID(1, 0), UID(3, 0), UID(4, 0) }), true);
collection->addBestMachineTeams(10, 10);
int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams, 10);
collection->addBestMachineTeams(10);
int result = collection->addTeamsBestOf(10, desiredTeams, maxTeams);
if (collection->machineTeams.size() != 10 || result != 8) {
collection->traceAllInfo(true); // Debug message


@ -251,8 +251,6 @@ ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);
int64_t getMaxShardSize( double dbSizeEstimate );
struct DDTeamCollection;
ACTOR Future<Void> teamRemover(DDTeamCollection* self);
ACTOR Future<Void> teamRemoverPeriodic(DDTeamCollection* self);
ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(Transaction* tr);
#include "flow/unactorcompiler.h"


@ -180,8 +180,12 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( DD_ZERO_HEALTHY_TEAM_DELAY, 1.0 );
// TeamRemover
TR_FLAG_DISABLE_TEAM_REMOVER = false; if( randomize && BUGGIFY ) TR_FLAG_DISABLE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = false; if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
init( TR_REMOVE_MACHINE_TEAM_DELAY, 60.0 ); if( randomize && BUGGIFY ) TR_REMOVE_MACHINE_TEAM_DELAY = deterministicRandom()->random01() * 60.0;
TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS = true; if( randomize && BUGGIFY ) TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS = deterministicRandom()->random01() < 0.1 ? true : false;
TR_FLAG_DISABLE_SERVER_TEAM_REMOVER = false; if( randomize && BUGGIFY ) TR_FLAG_DISABLE_SERVER_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true
init( TR_REMOVE_SERVER_TEAM_DELAY, 60.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_DELAY = deterministicRandom()->random01() * 60.0;
init( TR_REMOVE_SERVER_TEAM_EXTRA_DELAY, 5.0 ); if( randomize && BUGGIFY ) TR_REMOVE_SERVER_TEAM_EXTRA_DELAY = deterministicRandom()->random01() * 10.0;
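
The knob initializers above all follow the same randomize-under-BUGGIFY pattern: a production default, optionally perturbed in randomized simulation. A minimal sketch of that shape (the stub RNG and struct below are illustrative assumptions; deterministicRandom() and BUGGIFY are FDB facilities not reproduced here):

#include <random>

// Stand-in for deterministicRandom()->random01() in a simulation run.
static std::mt19937 g_rng(42); // fixed seed: deterministic, like simulation
static double random01() {
    return std::uniform_real_distribution<double>(0.0, 1.0)(g_rng);
}

struct TeamRemoverKnobs {
    bool disableServerTeamRemover = false;
    double removeServerTeamDelay = 60.0;

    void maybeBuggify(bool randomize, bool buggify) {
        if (randomize && buggify) {
            disableServerTeamRemover = random01() < 0.1; // rarely disable it
            removeServerTeamDelay = random01() * 60.0;   // shrink the delay
        }
    }
};
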
// Redwood Storage Engine
init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT, 30 );


@ -142,8 +142,13 @@ public:
double DEBOUNCE_RECRUITING_DELAY;
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_TEAM_REMOVER; // disable the teamRemover actor
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor
double TR_REMOVE_MACHINE_TEAM_DELAY; // wait for the specified time before trying to remove the next machine team
bool TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS; // guard to select which machineTeamRemover logic to use
bool TR_FLAG_DISABLE_SERVER_TEAM_REMOVER; // disable the serverTeamRemover actor
double TR_REMOVE_SERVER_TEAM_DELAY; // wait for the specified time before trying to remove the next server team
double TR_REMOVE_SERVER_TEAM_EXTRA_DELAY; // serverTeamRemover waits for the delay and checks DD healthiness again to ensure it runs after machineTeamRemover
double DD_FAILURE_TIME;
double DD_ZERO_HEALTHY_TEAM_DELAY;


@ -274,8 +274,15 @@ ACTOR Future<int64_t> getDataDistributionQueueSize( Database cx, Reference<Async
// Returns whether the number of process and machine teams stays within the maximum allowed number of teams
ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistributorWorker) {
state int attempts = 0;
state bool ret = false;
loop {
try {
if (g_simulator.storagePolicy.isValid() &&
g_simulator.storagePolicy->info().find("data_hall") != std::string::npos) {
// Do not test DD team number for data_hall modes
return true;
}
TraceEvent("GetTeamCollectionValid").detail("Stage", "ContactingMaster");
TraceEventFields teamCollectionInfoMessage = wait(timeoutError(
@ -283,36 +290,51 @@ ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistr
TraceEvent("GetTeamCollectionValid").detail("Stage", "GotString");
int64_t currentTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("CurrentTeams"));
int64_t desiredTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("DesiredTeams"));
int64_t maxTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MaxTeams"));
int64_t currentMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("CurrentMachineTeams"));
int64_t healthyMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("CurrentHealthyMachineTeams"));
int64_t desiredMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("DesiredMachineTeams"));
int64_t maxMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MaxMachineTeams"));
state int64_t currentTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("CurrentTeams"));
state int64_t desiredTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("DesiredTeams"));
state int64_t maxTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MaxTeams"));
state int64_t currentMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("CurrentMachineTeams"));
state int64_t healthyMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("CurrentHealthyMachineTeams"));
state int64_t desiredMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("DesiredMachineTeams"));
state int64_t maxMachineTeams = boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MaxMachineTeams"));
int64_t minServerTeamsOnServer =
state int64_t minServerTeamsOnServer =
boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MinTeamsOnServer"));
int64_t maxServerTeamsOnServer =
state int64_t maxServerTeamsOnServer =
boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MaxTeamsOnServer"));
int64_t minMachineTeamsOnMachine =
state int64_t minMachineTeamsOnMachine =
boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MinMachineTeamsOnMachine"));
int64_t maxMachineTeamsOnMachine =
state int64_t maxMachineTeamsOnMachine =
boost::lexical_cast<int64_t>(teamCollectionInfoMessage.getValue("MaxMachineTeamsOnMachine"));
// Team number is always valid when we disable teamRemover. This avoids false positives in simulation tests
if (SERVER_KNOBS->TR_FLAG_DISABLE_TEAM_REMOVER) {
TraceEvent("GetTeamCollectionValid")
.detail("KnobsTeamRemoverDisabled", SERVER_KNOBS->TR_FLAG_DISABLE_TEAM_REMOVER);
return true;
}
// The if condition should be consistent with the condition in serverTeamRemover() and
// machineTeamRemover() that decides if redundant teams exist.
// Team number is always valid when we disable teamRemover, which avoids false positives in simulation tests.
// The minimum team number per server (and per machine) should be larger than 0 so that a newly added machine
// can host data on it.
//
// If the machineTeamRemover does not remove the machine team with the most machine teams,
// we may oscillate between building more server teams by teamBuilder() and removing those teams by
// teamRemover. To avoid false positives in simulation, we skip the consistency check in this case.
// This is a corner case; the skip is a work-around in case the team number requirements cannot be satisfied.
//
// The check for too many teams is disabled because teamRemover may not remove a team if doing so leads to 0 teams on a server:
//(!SERVER_KNOBS->TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER &&
// healthyMachineTeams > desiredMachineTeams) ||
// (!SERVER_KNOBS->TR_FLAG_DISABLE_SERVER_TEAM_REMOVER && currentTeams > desiredTeams) ||
if ((minMachineTeamsOnMachine <= 0 || minServerTeamsOnServer <= 0) &&
SERVER_KNOBS->TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS) {
ret = false;
// The if condition should be consistent with the condition in teamRemover() that decides
// if redundant teams exist.
if (healthyMachineTeams > desiredMachineTeams ||
(minMachineTeamsOnMachine <= 0 && SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER == 3)) {
// When DESIRED_TEAMS_PER_SERVER == 1, we see minMachineTeamsOnMachine can be 0 in one out of 30k test
if (attempts++ < 10) {
wait(delay(60));
continue; // We may not receive the most recent TeamCollectionInfo
}
// When DESIRED_TEAMS_PER_SERVER == 1, we see minMachineTeamsOnMachine can be 0 in one out of 30k test
// cases. Only check DESIRED_TEAMS_PER_SERVER == 3 for now since it is the most used configuration.
// TODO: Remove the constraint SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER == 3 to ensure that
// the minimum team number per server (and per machine) is always > 0 for any number of replicas
TraceEvent("GetTeamCollectionValid")
.detail("CurrentTeams", currentTeams)
.detail("DesiredTeams", desiredTeams)
@ -326,8 +348,9 @@ ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistr
.detail("MinMachineTeamsOnMachine", minMachineTeamsOnMachine)
.detail("MaxMachineTeamsOnMachine", maxMachineTeamsOnMachine)
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER)
.detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER);
return false;
.detail("MaxTeamsPerServer", SERVER_KNOBS->MAX_TEAMS_PER_SERVER)
.detail("RemoveMTWithMostTeams", SERVER_KNOBS->TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS);
return ret;
} else {
return true;
}