Revert "Properly set simulation test for perpetual storage wiggle and bug fixing"

This commit is contained in:
Xiaoxi Wang 2021-06-11 09:07:45 -07:00 committed by GitHub
parent 5faf082f83
commit ad576e8c20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 68 additions and 162 deletions

View File

@ -52,7 +52,6 @@ class TCMachineTeamInfo;
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self); ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self);
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self); ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self);
ACTOR Future<Void> waitForAllDataRemoved(Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams); ACTOR Future<Void> waitForAllDataRemoved(Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams);
bool _exclusionSafetyCheck(vector<UID>& excludeServerIDs, DDTeamCollection* teamCollection);
struct TCServerInfo : public ReferenceCounted<TCServerInfo> { struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
UID id; UID id;
@ -376,16 +375,14 @@ struct ServerStatus {
LocalityData locality; LocalityData locality;
ServerStatus() ServerStatus()
: isWiggling(false), isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {} : isWiggling(false), isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {}
ServerStatus(bool isFailed, bool isUndesired, bool isWiggling, LocalityData const& locality) ServerStatus(bool isFailed, bool isUndesired, LocalityData const& locality)
: isFailed(isFailed), isUndesired(isUndesired), locality(locality), isWrongConfiguration(false), : isFailed(isFailed), isUndesired(isUndesired), locality(locality), isWrongConfiguration(false),
initialized(true), isWiggling(isWiggling) {} initialized(true), isWiggling(false) {}
bool isUnhealthy() const { return isFailed || isUndesired; } bool isUnhealthy() const { return isFailed || isUndesired; }
const char* toString() const { const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : "Healthy"; }
return isFailed ? "Failed" : isUndesired ? "Undesired" : isWiggling ? "Wiggling" : "Healthy";
}
bool operator==(ServerStatus const& r) const { bool operator==(ServerStatus const& r) const {
return isFailed == r.isFailed && isUndesired == r.isUndesired && isWiggling == r.isWiggling && return isFailed == r.isFailed && isUndesired == r.isUndesired &&
isWrongConfiguration == r.isWrongConfiguration && locality == r.locality && initialized == r.initialized; isWrongConfiguration == r.isWrongConfiguration && locality == r.locality && initialized == r.initialized;
} }
bool operator!=(ServerStatus const& r) const { return !(*this == r); } bool operator!=(ServerStatus const& r) const { return !(*this == r); }
@ -624,7 +621,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
std::map<int,int> priority_teams; std::map<int,int> priority_teams;
std::map<UID, Reference<TCServerInfo>> server_info; std::map<UID, Reference<TCServerInfo>> server_info;
std::map<Key, std::vector<Reference<TCServerInfo>>> pid2server_info; // some process may serve as multiple storage servers std::map<Key, std::vector<Reference<TCServerInfo>>> pid2server_info; // some process may serve as multiple storage servers
std::vector<AddressExclusion> wiggle_addresses; // collection of wiggling servers' address
std::map<UID, Reference<TCServerInfo>> tss_info_by_pair; std::map<UID, Reference<TCServerInfo>> tss_info_by_pair;
std::map<UID, Reference<TCServerInfo>> server_and_tss_info; // TODO could replace this with an efficient way to do a read-only concatenation of 2 data structures? std::map<UID, Reference<TCServerInfo>> server_and_tss_info; // TODO could replace this with an efficient way to do a read-only concatenation of 2 data structures?
std::map<Key, int> lagging_zones; // zone to number of storage servers lagging std::map<Key, int> lagging_zones; // zone to number of storage servers lagging
@ -2830,7 +2826,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) { this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) {
continue; // don't overwrite the value set by actor trackExcludedServer continue; // don't overwrite the value set by actor trackExcludedServer
} }
this->wiggle_addresses.push_back(addr);
this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING); this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
moveFutures.push_back( moveFutures.push_back(
waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this)); waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this));
@ -2842,19 +2837,19 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
return moveFutures; return moveFutures;
} }
// Include wiggled storage servers by setting their status from `WIGGLING` // Include storage servers held on process of which the Process Id is “pid” by setting their status from `WIGGLING`
// to `NONE`. The storage recruiter will recruit them as new storage servers // to `NONE`. The storage recruiter will recruit them as new storage servers
void includeStorageServersForWiggle() { void includeStorageServersForWiggle(const Value& pid) {
bool included = false; bool included = false;
for (auto& address : this->wiggle_addresses) { for (auto& info : this->pid2server_info[pid]) {
if (!this->excludedServers.count(address) || AddressExclusion addr(info->lastKnownInterface.address().ip);
this->excludedServers.get(address) != DDTeamCollection::Status::WIGGLING) { if (!this->excludedServers.count(addr) ||
this->excludedServers.get(addr) != DDTeamCollection::Status::WIGGLING) {
continue; continue;
} }
included = true; included = true;
this->excludedServers.set(address, DDTeamCollection::Status::NONE); this->excludedServers.set(addr, DDTeamCollection::Status::NONE);
} }
this->wiggle_addresses.clear();
if (included) { if (included) {
this->restartRecruiting.trigger(); this->restartRecruiting.trigger();
} }
@ -3536,7 +3531,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
} }
change.push_back(self->zeroHealthyTeams->onChange()); change.push_back(self->zeroHealthyTeams->onChange());
bool healthy = !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize; bool healthy =
!badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize && !anyWigglingServer;
team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam
bool optimal = team->isOptimal() && healthy; bool optimal = team->isOptimal() && healthy;
bool containsFailed = teamContainsFailedServer(self, team); bool containsFailed = teamContainsFailedServer(self, team);
@ -3833,12 +3829,10 @@ ACTOR Future<Void> trackExcludedServers(DDTeamCollection* self) {
// Reset and reassign self->excludedServers based on excluded, but we only // Reset and reassign self->excludedServers based on excluded, but we only
// want to trigger entries that are different // want to trigger entries that are different
// Do not retrigger and double-overwrite failed or wiggling servers // Do not retrigger and double-overwrite failed servers
auto old = self->excludedServers.getKeys(); auto old = self->excludedServers.getKeys();
for (const auto& o : old) { for (const auto& o : old) {
if (!excluded.count(o) && !failed.count(o) && if (!excluded.count(o) && !failed.count(o)) {
!(self->excludedServers.count(o) &&
self->excludedServers.get(o) == DDTeamCollection::Status::WIGGLING)) {
self->excludedServers.set(o, DDTeamCollection::Status::NONE); self->excludedServers.set(o, DDTeamCollection::Status::NONE);
} }
} }
@ -3890,7 +3884,6 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
// to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0. // to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0.
ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) { ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) {
state ReadYourWritesTransaction tr(teamCollection->cx); state ReadYourWritesTransaction tr(teamCollection->cx);
state Value writeValue = LiteralStringRef("0");
loop { loop {
try { try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -3903,14 +3896,11 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
auto nextIt = teamCollection->pid2server_info.upper_bound(value.get()); auto nextIt = teamCollection->pid2server_info.upper_bound(value.get());
if (nextIt == teamCollection->pid2server_info.end()) { if (nextIt == teamCollection->pid2server_info.end()) {
tr.set(wigglingStorageServerKey, pid); tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
} else { } else {
tr.set(wigglingStorageServerKey, nextIt->first); tr.set(wigglingStorageServerKey, nextIt->first);
writeValue = nextIt->first;
} }
} else { } else {
tr.set(wigglingStorageServerKey, pid); tr.set(wigglingStorageServerKey, pid);
writeValue = pid;
} }
} }
wait(tr.commit()); wait(tr.commit());
@ -3919,9 +3909,6 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
wait(tr.onError(e)); wait(tr.onError(e));
} }
} }
TraceEvent(SevDebug, "PerpetualNextWigglingStoragePID", teamCollection->distributorId)
.detail("WriteValue", writeValue);
return Void(); return Void();
} }
@ -3931,6 +3918,9 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal, ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
FutureStream<Void> finishStorageWiggleSignal, FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) { DDTeamCollection* teamCollection) {
// initialize PID
wait(updateNextWigglingStoragePID(teamCollection));
loop choose { loop choose {
when(wait(stopSignal->onTrigger())) { break; } when(wait(stopSignal->onTrigger())) { break; }
when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); } when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); }
@ -3941,8 +3931,8 @@ ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
// Watch the value change of `wigglingStorageServerKey`. // Watch the value change of `wigglingStorageServerKey`.
// Return the watch future and the current value of `wigglingStorageServerKey`. // Return the watch future and the current value of `wigglingStorageServerKey`.
ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(DDTeamCollection* self) { ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(Database cx) {
state ReadYourWritesTransaction tr(self->cx); state ReadYourWritesTransaction tr(cx);
state Future<Void> watchFuture; state Future<Void> watchFuture;
state Value ret; state Value ret;
loop { loop {
@ -3970,7 +3960,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
PromiseStream<Void> finishStorageWiggleSignal, PromiseStream<Void> finishStorageWiggleSignal,
DDTeamCollection* self, DDTeamCollection* self,
const DDEnabledState* ddEnabledState) { const DDEnabledState* ddEnabledState) {
state Future<Void> watchFuture = Never(); state Future<Void> watchFuture;
state Future<Void> moveFinishFuture = Never(); state Future<Void> moveFinishFuture = Never();
state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY); state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY);
state AsyncTrigger restart; state AsyncTrigger restart;
@ -3978,16 +3968,13 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow); delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
state int movingCount = 0; state int movingCount = 0;
state bool isPaused = false; state bool isPaused = false;
state vector<UID> excludedServerIds;
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self)); state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self->cx));
ASSERT(!self->wigglingPid.present()); // only single process wiggle is allowed watchFuture = res.first;
self->wigglingPid = Optional<Key>(res.second); self->wigglingPid = Optional<Key>(res.second);
// start with the initial pid // start with the initial pid
for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) { if (self->healthyTeamCount > 1) { // pre-check health status
excludedServerIds.push_back(info->id);
}
if (self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { // pre-check health status
TEST(true); // start the first wiggling TEST(true); // start the first wiggling
auto fv = self->excludeStorageServersForWiggle(self->wigglingPid.get()); auto fv = self->excludeStorageServersForWiggle(self->wigglingPid.get());
@ -4006,20 +3993,15 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
choose { choose {
when(wait(stopSignal->onTrigger())) { break; } when(wait(stopSignal->onTrigger())) { break; }
when(wait(watchFuture)) { when(wait(watchFuture)) {
ASSERT(!self->wigglingPid.present()); // the previous wiggle must be finished
watchFuture = Never();
// read new pid and set the next watch Future // read new pid and set the next watch Future
wait(store(res, watchPerpetualStoragePIDChange(self))); wait(store(res, watchPerpetualStoragePIDChange(self->cx)));
watchFuture = res.first;
self->wigglingPid = Optional<Key>(res.second); self->wigglingPid = Optional<Key>(res.second);
StringRef pid = self->wigglingPid.get(); StringRef pid = self->wigglingPid.get();
// pre-check health status if (self->healthyTeamCount <= 1) { // pre-check health status
excludedServerIds.clear(); pauseWiggle.trigger();
for (const auto& info : self->pid2server_info[self->wigglingPid.get()]) { } else {
excludedServerIds.push_back(info->id);
}
if (self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) {
TEST(true); // start wiggling TEST(true); // start wiggling
auto fv = self->excludeStorageServersForWiggle(pid); auto fv = self->excludeStorageServersForWiggle(pid);
@ -4028,8 +4010,6 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
TraceEvent("PerpetualStorageWiggleStart", self->distributorId) TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("ProcessId", pid) .detail("ProcessId", pid)
.detail("StorageCount", movingCount); .detail("StorageCount", movingCount);
} else {
pauseWiggle.trigger();
} }
} }
when(wait(restart.onTrigger())) { when(wait(restart.onTrigger())) {
@ -4050,13 +4030,12 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
StringRef pid = self->wigglingPid.get(); StringRef pid = self->wigglingPid.get();
moveFinishFuture = Never(); moveFinishFuture = Never();
self->includeStorageServersForWiggle(); self->includeStorageServersForWiggle(pid);
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId) TraceEvent("PerpetualStorageWiggleFinish", self->distributorId)
.detail("ProcessId", pid.toString()) .detail("ProcessId", pid.toString())
.detail("StorageCount", movingCount); .detail("StorageCount", movingCount);
self->wigglingPid.reset(); self->wigglingPid.reset();
watchFuture = res.first;
finishStorageWiggleSignal.send(Void()); finishStorageWiggleSignal.send(Void());
} }
when(wait(self->zeroHealthyTeams->onChange())) { when(wait(self->zeroHealthyTeams->onChange())) {
@ -4071,11 +4050,11 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) { if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) {
pauseWiggle.trigger(); pauseWiggle.trigger();
} else if (isPaused && count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && } else if (count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && self->healthyTeamCount > 1 &&
self->teams.size() > 1 && _exclusionSafetyCheck(excludedServerIds, self)) { isPaused) {
restart.trigger(); restart.trigger();
} }
ddQueueCheck = delay(SERVER_KNOBS->CHECK_TEAM_DELAY, TaskPriority::DataDistributionLow); ddQueueCheck = delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
} }
when(wait(pauseWiggle.onTrigger())) { when(wait(pauseWiggle.onTrigger())) {
if (self->wigglingPid.present()) { if (self->wigglingPid.present()) {
@ -4083,7 +4062,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
StringRef pid = self->wigglingPid.get(); StringRef pid = self->wigglingPid.get();
isPaused = true; isPaused = true;
moveFinishFuture = Never(); moveFinishFuture = Never();
self->includeStorageServersForWiggle(); self->includeStorageServersForWiggle(pid);
TraceEvent("PerpetualStorageWigglePause", self->distributorId) TraceEvent("PerpetualStorageWigglePause", self->distributorId)
.detail("ProcessId", pid) .detail("ProcessId", pid)
.detail("StorageCount", movingCount); .detail("StorageCount", movingCount);
@ -4093,9 +4072,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
} }
if (self->wigglingPid.present()) { if (self->wigglingPid.present()) {
self->includeStorageServersForWiggle(); self->includeStorageServersForWiggle(self->wigglingPid.get());
TraceEvent("PerpetualStorageWiggleExitingPause", self->distributorId)
.detail("ProcessId", self->wigglingPid.get());
self->wigglingPid.reset(); self->wigglingPid.reset();
} }
@ -4111,7 +4088,7 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
state AsyncTrigger stopWiggleSignal; state AsyncTrigger stopWiggleSignal;
state PromiseStream<Void> finishStorageWiggleSignal; state PromiseStream<Void> finishStorageWiggleSignal;
state SignalableActorCollection collection; state SignalableActorCollection collection;
state bool started = false;
loop { loop {
state ReadYourWritesTransaction tr(teamCollection->cx); state ReadYourWritesTransaction tr(teamCollection->cx);
loop { loop {
@ -4126,18 +4103,16 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
wait(tr.commit()); wait(tr.commit());
ASSERT(speed == 1 || speed == 0); ASSERT(speed == 1 || speed == 0);
if (speed == 1 && !started) { if (speed == 1) {
collection.add(perpetualStorageWiggleIterator( collection.add(perpetualStorageWiggleIterator(
&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection)); &stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
collection.add(perpetualStorageWiggler( collection.add(perpetualStorageWiggler(
&stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState)); &stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId); TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
started = true; } else {
} else if (speed == 0 && started) {
stopWiggleSignal.trigger(); stopWiggleSignal.trigger();
wait(collection.signalAndReset()); wait(collection.signalAndReset());
TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId); TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId);
started = false;
} }
wait(watchFuture); wait(watchFuture);
break; break;
@ -4435,7 +4410,7 @@ ACTOR Future<Void> storageServerTracker(
bool isTss) { bool isTss) {
state Future<Void> failureTracker; state Future<Void> failureTracker;
state ServerStatus status(false, false, false, server->lastKnownInterface.locality); state ServerStatus status(false, false, server->lastKnownInterface.locality);
state bool lastIsUnhealthy = false; state bool lastIsUnhealthy = false;
state Future<Void> metricsTracker = serverMetricsPolling(server); state Future<Void> metricsTracker = serverMetricsPolling(server);
@ -4452,7 +4427,6 @@ ACTOR Future<Void> storageServerTracker(
loop { loop {
status.isUndesired = !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get(); status.isUndesired = !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get();
status.isWrongConfiguration = false; status.isWrongConfiguration = false;
status.isWiggling = false;
hasWrongDC = !isCorrectDC(self, server); hasWrongDC = !isCorrectDC(self, server);
hasInvalidLocality = hasInvalidLocality =
!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality); !self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality);
@ -4532,21 +4506,10 @@ ACTOR Future<Void> storageServerTracker(
status.isWrongConfiguration = true; status.isWrongConfiguration = true;
} }
// An invalid wiggle server should set itself the right status. Otherwise, it cannot be re-included by
// wiggler.
auto invalidWiggleServer =
[](const AddressExclusion& addr, const DDTeamCollection* tc, const TCServerInfo* server) {
return server->lastKnownInterface.locality.processId() != tc->wigglingPid;
};
// If the storage server is in the excluded servers list, it is undesired // If the storage server is in the excluded servers list, it is undesired
NetworkAddress a = server->lastKnownInterface.address(); NetworkAddress a = server->lastKnownInterface.address();
AddressExclusion worstAddr(a.ip, a.port); AddressExclusion worstAddr(a.ip, a.port);
DDTeamCollection::Status worstStatus = self->excludedServers.get(worstAddr); DDTeamCollection::Status worstStatus = self->excludedServers.get(worstAddr);
if (worstStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(worstAddr, self, server)) {
self->excludedServers.set(worstAddr, DDTeamCollection::Status::NONE);
worstStatus = DDTeamCollection::Status::NONE;
}
otherChanges.push_back(self->excludedServers.onChange(worstAddr)); otherChanges.push_back(self->excludedServers.onChange(worstAddr));
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
@ -4562,12 +4525,6 @@ ACTOR Future<Void> storageServerTracker(
else if (i == 2) else if (i == 2)
testAddr = AddressExclusion(server->lastKnownInterface.secondaryAddress().get().ip); testAddr = AddressExclusion(server->lastKnownInterface.secondaryAddress().get().ip);
DDTeamCollection::Status testStatus = self->excludedServers.get(testAddr); DDTeamCollection::Status testStatus = self->excludedServers.get(testAddr);
if (testStatus == DDTeamCollection::Status::WIGGLING && invalidWiggleServer(testAddr, self, server)) {
self->excludedServers.set(testAddr, DDTeamCollection::Status::NONE);
testStatus = DDTeamCollection::Status::NONE;
}
if (testStatus > worstStatus) { if (testStatus > worstStatus) {
worstStatus = testStatus; worstStatus = testStatus;
worstAddr = testAddr; worstAddr = testAddr;
@ -4586,7 +4543,6 @@ ACTOR Future<Void> storageServerTracker(
status.isWiggling = true; status.isWiggling = true;
TraceEvent("PerpetualWigglingStorageServer", self->distributorId) TraceEvent("PerpetualWigglingStorageServer", self->distributorId)
.detail("Server", server->id) .detail("Server", server->id)
.detail("ProcessId", server->lastKnownInterface.locality.processId())
.detail("Address", worstAddr.toString()); .detail("Address", worstAddr.toString());
} else if (worstStatus == DDTeamCollection::Status::FAILED && !isTss) { } else if (worstStatus == DDTeamCollection::Status::FAILED && !isTss) {
TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId) TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
@ -4651,14 +4607,11 @@ ACTOR Future<Void> storageServerTracker(
bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality; bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() != bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
newInterface.first.locality.zoneId().get(); newInterface.first.locality.zoneId().get();
bool processIdChanged = server->lastKnownInterface.locality.processId().get() !=
newInterface.first.locality.processId().get();
TraceEvent("StorageServerInterfaceChanged", self->distributorId) TraceEvent("StorageServerInterfaceChanged", self->distributorId)
.detail("ServerID", server->id) .detail("ServerID", server->id)
.detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token) .detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
.detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token) .detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
.detail("LocalityChanged", localityChanged) .detail("LocalityChanged", localityChanged)
.detail("ProcessIdChanged", processIdChanged)
.detail("MachineLocalityChanged", machineLocalityChanged); .detail("MachineLocalityChanged", machineLocalityChanged);
server->lastKnownInterface = newInterface.first; server->lastKnownInterface = newInterface.first;
@ -4703,20 +4656,6 @@ ACTOR Future<Void> storageServerTracker(
ASSERT(destMachine.isValid()); ASSERT(destMachine.isValid());
} }
// update pid2server_info if the process id has changed
if (processIdChanged) {
self->pid2server_info[newInterface.first.locality.processId().get()].push_back(
self->server_info[server->id]);
// delete the old one
auto& old_infos =
self->pid2server_info[server->lastKnownInterface.locality.processId().get()];
for (int i = 0; i < old_infos.size(); ++i) {
if (old_infos[i].getPtr() == server) {
std::swap(old_infos[i--], old_infos.back());
old_infos.pop_back();
}
}
}
// Ensure the server's server team belong to a machine team, and // Ensure the server's server team belong to a machine team, and
// Get the newBadTeams due to the locality change // Get the newBadTeams due to the locality change
vector<Reference<TCTeamInfo>> newBadTeams; vector<Reference<TCTeamInfo>> newBadTeams;
@ -4763,8 +4702,7 @@ ACTOR Future<Void> storageServerTracker(
interfaceChanged = server->onInterfaceChanged; interfaceChanged = server->onInterfaceChanged;
// Old failureTracker for the old interface will be actorCancelled since the handler of the old // Old failureTracker for the old interface will be actorCancelled since the handler of the old
// actor now points to the new failure monitor actor. // actor now points to the new failure monitor actor.
status = ServerStatus( status = ServerStatus(status.isFailed, status.isUndesired, server->lastKnownInterface.locality);
status.isFailed, status.isUndesired, status.isWiggling, server->lastKnownInterface.locality);
// self->traceTeamCollectionInfo(); // self->traceTeamCollectionInfo();
recordTeamCollectionInfo = true; recordTeamCollectionInfo = true;
@ -5524,10 +5462,8 @@ ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> te
self->addActor.send(trackExcludedServers(self)); self->addActor.send(trackExcludedServers(self));
self->addActor.send(monitorHealthyTeams(self)); self->addActor.send(monitorHealthyTeams(self));
self->addActor.send(waitHealthyZoneChange(self)); self->addActor.send(waitHealthyZoneChange(self));
if (self->primary) { // the primary dc also handle the satellite dc's perpetual wiggling
self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState)); self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState));
}
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them // SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
loop choose { loop choose {
@ -6279,30 +6215,6 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
return Void(); return Void();
} }
// Find size of set intersection of excludeServerIDs and serverIDs on each team and see if the leftover team is valid
bool _exclusionSafetyCheck(vector<UID>& excludeServerIDs, DDTeamCollection* teamCollection) {
std::sort(excludeServerIDs.begin(), excludeServerIDs.end());
for (const auto& team : teamCollection->teams) {
vector<UID> teamServerIDs = team->getServerIDs();
std::sort(teamServerIDs.begin(), teamServerIDs.end());
TraceEvent(SevDebug, "DDExclusionSafetyCheck", teamCollection->distributorId)
.detail("Excluding", describe(excludeServerIDs))
.detail("Existing", team->getDesc());
// Find size of set intersection of both vectors and see if the leftover team is valid
vector<UID> intersectSet(teamServerIDs.size());
auto it = std::set_intersection(excludeServerIDs.begin(),
excludeServerIDs.end(),
teamServerIDs.begin(),
teamServerIDs.end(),
intersectSet.begin());
intersectSet.resize(it - intersectSet.begin());
if (teamServerIDs.size() - intersectSet.size() < SERVER_KNOBS->DD_EXCLUDE_MIN_REPLICAS) {
return false;
}
}
return true;
}
ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req, ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req,
Reference<DataDistributorData> self, Reference<DataDistributorData> self,
Database cx) { Database cx) {
@ -6332,7 +6244,26 @@ ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest
} }
} }
} }
reply.safe = _exclusionSafetyCheck(excludeServerIDs, self->teamCollection); std::sort(excludeServerIDs.begin(), excludeServerIDs.end());
for (const auto& team : self->teamCollection->teams) {
vector<UID> teamServerIDs = team->getServerIDs();
std::sort(teamServerIDs.begin(), teamServerIDs.end());
TraceEvent(SevDebug, "DDExclusionSafetyCheck", self->ddId)
.detail("Excluding", describe(excludeServerIDs))
.detail("Existing", team->getDesc());
// Find size of set intersection of both vectors and see if the leftover team is valid
vector<UID> intersectSet(teamServerIDs.size());
auto it = std::set_intersection(excludeServerIDs.begin(),
excludeServerIDs.end(),
teamServerIDs.begin(),
teamServerIDs.end(),
intersectSet.begin());
intersectSet.resize(it - intersectSet.begin());
if (teamServerIDs.size() - intersectSet.size() < SERVER_KNOBS->DD_EXCLUDE_MIN_REPLICAS) {
reply.safe = false;
break;
}
}
TraceEvent("DDExclusionSafetyCheckFinish", self->ddId); TraceEvent("DDExclusionSafetyCheckFinish", self->ddId);
req.reply.send(reply); req.reply.send(reply);
return Void(); return Void();
@ -6509,7 +6440,7 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3))); interface.locality.set(LiteralStringRef("data_hall"), Standalone<StringRef>(std::to_string(id % 3)));
collection->server_info[uid] = makeReference<TCServerInfo>( collection->server_info[uid] = makeReference<TCServerInfo>(
interface, collection.get(), ProcessClass(), true, collection->storageServerSet); interface, collection.get(), ProcessClass(), true, collection->storageServerSet);
collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
collection->checkAndCreateMachine(collection->server_info[uid]); collection->checkAndCreateMachine(collection->server_info[uid]);
} }
@ -6566,7 +6497,7 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
collection->server_info[uid] = makeReference<TCServerInfo>( collection->server_info[uid] = makeReference<TCServerInfo>(
interface, collection.get(), ProcessClass(), true, collection->storageServerSet); interface, collection.get(), ProcessClass(), true, collection->storageServerSet);
collection->server_status.set(uid, ServerStatus(false, false, false, interface.locality)); collection->server_status.set(uid, ServerStatus(false, false, interface.locality));
} }
int totalServerIndex = collection->constructMachinesFromServers(); int totalServerIndex = collection->constructMachinesFromServers();

View File

@ -993,7 +993,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
allHealthy = true; allHealthy = true;
anyWithSource = false; anyWithSource = false;
bestTeams.clear(); bestTeams.clear();
// Get team from teamCollections in different DCs and find the best one // Get team from teamCollections in diffrent DCs and find the best one
while (tciIndex < self->teamCollections.size()) { while (tciIndex < self->teamCollections.size()) {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY; double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || if (rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||

View File

@ -133,7 +133,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PRIORITY_RECOVER_MOVE, 110 ); init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 ); init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 ); init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 139 ); init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 140 );
init( PRIORITY_TEAM_HEALTHY, 140 ); init( PRIORITY_TEAM_HEALTHY, 140 );
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 ); init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
init( PRIORITY_TEAM_REDUNDANT, 200 ); init( PRIORITY_TEAM_REDUNDANT, 200 );

View File

@ -890,7 +890,6 @@ ACTOR Future<Void> checkConsistency(Database cx,
StringRef performTSSCheck = LiteralStringRef("false"); StringRef performTSSCheck = LiteralStringRef("false");
if (doQuiescentCheck) { if (doQuiescentCheck) {
performQuiescent = LiteralStringRef("true"); performQuiescent = LiteralStringRef("true");
spec.restorePerpetualWiggleSetting = false;
} }
if (doCacheCheck) { if (doCacheCheck) {
performCacheCheck = LiteralStringRef("true"); performCacheCheck = LiteralStringRef("true");
@ -1386,8 +1385,6 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
state bool useDB = false; state bool useDB = false;
state bool waitForQuiescenceBegin = false; state bool waitForQuiescenceBegin = false;
state bool waitForQuiescenceEnd = false; state bool waitForQuiescenceEnd = false;
state bool restorePerpetualWiggleSetting = false;
state bool perpetualWiggleEnabled = false;
state double startDelay = 0.0; state double startDelay = 0.0;
state double databasePingDelay = 1e9; state double databasePingDelay = 1e9;
state ISimulator::BackupAgentType simBackupAgents = ISimulator::BackupAgentType::NoBackupAgents; state ISimulator::BackupAgentType simBackupAgents = ISimulator::BackupAgentType::NoBackupAgents;
@ -1402,8 +1399,6 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
waitForQuiescenceBegin = true; waitForQuiescenceBegin = true;
if (iter->waitForQuiescenceEnd) if (iter->waitForQuiescenceEnd)
waitForQuiescenceEnd = true; waitForQuiescenceEnd = true;
if (iter->restorePerpetualWiggleSetting)
restorePerpetualWiggleSetting = true;
startDelay = std::max(startDelay, iter->startDelay); startDelay = std::max(startDelay, iter->startDelay);
databasePingDelay = std::min(databasePingDelay, iter->databasePingDelay); databasePingDelay = std::min(databasePingDelay, iter->databasePingDelay);
if (iter->simBackupAgents != ISimulator::BackupAgentType::NoBackupAgents) if (iter->simBackupAgents != ISimulator::BackupAgentType::NoBackupAgents)
@ -1442,15 +1437,6 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
} catch (Error& e) { } catch (Error& e) {
TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to set starting configuration"); TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to set starting configuration");
} }
if (restorePerpetualWiggleSetting) {
std::string_view confView(reinterpret_cast<const char*>(startingConfiguration.begin()),
startingConfiguration.size());
const std::string setting = "perpetual_storage_wiggle:=";
auto pos = confView.find(setting);
if (pos != confView.npos && confView.at(pos + setting.size()) == '1') {
perpetualWiggleEnabled = true;
}
}
} }
if (useDB && waitForQuiescenceBegin) { if (useDB && waitForQuiescenceBegin) {
@ -1466,10 +1452,6 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
TraceEvent("QuietDatabaseStartExternalError").error(e); TraceEvent("QuietDatabaseStartExternalError").error(e);
throw; throw;
} }
if (perpetualWiggleEnabled) { // restore the enabled perpetual storage wiggle setting
wait(setPerpetualStorageWiggle(cx, true, true));
}
} }
TraceEvent("TestsExpectedToPass").detail("Count", tests.size()); TraceEvent("TestsExpectedToPass").detail("Count", tests.size());

View File

@ -1777,7 +1777,6 @@ struct ConsistencyCheckWorkload : TestWorkload {
if (!found) { if (!found) {
TraceEvent("ConsistencyCheck_NoStorage") TraceEvent("ConsistencyCheck_NoStorage")
.detail("Address", addr) .detail("Address", addr)
.detail("ProcessId", workers[i].interf.locality.processId())
.detail("ProcessClassEqualToStorageClass", .detail("ProcessClassEqualToStorageClass",
(int)(workers[i].processClass == ProcessClass::StorageClass)); (int)(workers[i].processClass == ProcessClass::StorageClass));
missingStorage.push_back(workers[i].interf.locality.dcId()); missingStorage.push_back(workers[i].interf.locality.dcId());

View File

@ -159,7 +159,6 @@ public:
simConnectionFailuresDisableDuration = 0; simConnectionFailuresDisableDuration = 0;
simBackupAgents = ISimulator::BackupAgentType::NoBackupAgents; simBackupAgents = ISimulator::BackupAgentType::NoBackupAgents;
simDrAgents = ISimulator::BackupAgentType::NoBackupAgents; simDrAgents = ISimulator::BackupAgentType::NoBackupAgents;
restorePerpetualWiggleSetting = true;
} }
TestSpec(StringRef title, TestSpec(StringRef title,
bool dump, bool dump,
@ -170,8 +169,8 @@ public:
: title(title), dumpAfterTest(dump), clearAfterTest(clear), startDelay(startDelay), useDB(useDB), timeout(600), : title(title), dumpAfterTest(dump), clearAfterTest(clear), startDelay(startDelay), useDB(useDB), timeout(600),
databasePingDelay(databasePingDelay), runConsistencyCheck(g_network->isSimulated()), databasePingDelay(databasePingDelay), runConsistencyCheck(g_network->isSimulated()),
runConsistencyCheckOnCache(false), runConsistencyCheckOnTSS(false), waitForQuiescenceBegin(true), runConsistencyCheckOnCache(false), runConsistencyCheckOnTSS(false), waitForQuiescenceBegin(true),
waitForQuiescenceEnd(true), restorePerpetualWiggleSetting(true), simCheckRelocationDuration(false), waitForQuiescenceEnd(true), simCheckRelocationDuration(false), simConnectionFailuresDisableDuration(0),
simConnectionFailuresDisableDuration(0), simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents), simBackupAgents(ISimulator::BackupAgentType::NoBackupAgents),
simDrAgents(ISimulator::BackupAgentType::NoBackupAgents) { simDrAgents(ISimulator::BackupAgentType::NoBackupAgents) {
phases = TestWorkload::SETUP | TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS; phases = TestWorkload::SETUP | TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS;
if (databasePingDelay < 0) if (databasePingDelay < 0)
@ -192,11 +191,6 @@ public:
bool runConsistencyCheckOnTSS; bool runConsistencyCheckOnTSS;
bool waitForQuiescenceBegin; bool waitForQuiescenceBegin;
bool waitForQuiescenceEnd; bool waitForQuiescenceEnd;
bool restorePerpetualWiggleSetting; // whether set perpetual_storage_wiggle as the value after run
// QuietDatabase. QuietDatabase always disables perpetual storage wiggle on
// purpose. If waitForQuiescenceBegin == true and we want to keep perpetual
// storage wiggle the same setting as before during testing, this value should
// be set true.
bool simCheckRelocationDuration; // If set to true, then long duration relocations generate SevWarnAlways messages. bool simCheckRelocationDuration; // If set to true, then long duration relocations generate SevWarnAlways messages.
// Once any workload sets this to true, it will be true for the duration of the // Once any workload sets this to true, it will be true for the duration of the