Fix another build team bug

buildTeam() can create teams that contain undesired storage servers, which
are considered unhealthy. As a result, data movement can become stuck.

Fix this by adding an ACTOR, monitorHealthyTeams, that triggers a team
build once per second (DD_ZERO_HEALTHY_TEAM_DELAY) for as long as there
are no healthy teams.

Also clean up the storageServerTracker() interface: state that is already
reachable through the DDTeamCollection* self pointer is no longer passed
as separate arguments.
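In sketch form, the new actor is a knob-driven polling loop: wake up every
DD_ZERO_HEALTHY_TEAM_DELAY (1.0 s by default), and if the healthy team count
is still zero, set doBuildTeams and kick off checkBuildTeams(). The code
below matches the diff; the comments are added here for exposition:

    ACTOR Future<Void> monitorHealthyTeams( DDTeamCollection* self ) {
        // Hold the latest checkBuildTeams() future so it is not dropped
        // immediately; each new call replaces (cancels) the previous one.
        state Future<Void> checkHealth;
        loop choose {
            // Poll on a knob-controlled delay rather than waiting on a
            // trigger, so a stuck zero-healthy-team state is always retried.
            when ( wait( delay( SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY ) ) ) {
                if ( self->healthyTeamCount == 0 ) {
                    self->doBuildTeams = true;
                    checkHealth = DDTeamCollection::checkBuildTeams(self);
                }
            }
        }
    }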
Jingyu Zhou 2019-02-12 14:02:21 -08:00
parent 8afe84d31b
commit fc3a784963
3 changed files with 51 additions and 54 deletions

fdbserver/DataDistribution.actor.cpp

@@ -531,11 +531,6 @@ Future<Void> storageServerTracker(
 struct DDTeamCollection* const& self,
 Database const& cx,
 TCServerInfo* const& server,
-ServerStatusMap* const& statusMap,
-MoveKeysLock const& lock,
-UID const& distributorId,
-std::map<UID, Reference<TCServerInfo>>* const& other_servers,
-Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> const& changes,
 Promise<Void> const& errorOut,
 Version const& addedVersion);
@@ -550,7 +545,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
 DatabaseConfiguration configuration;
 bool doBuildTeams;
-Future<Void> checkBuildTeam;
 Future<Void> teamBuilder;
 AsyncTrigger restartTeamBuilder;
@@ -1769,7 +1763,6 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
 desc += i->first.shortString() + " (" + i->second->lastKnownInterface.toString() + "), ";
 }
 }
-checkBuildTeam = DDTeamCollection::checkBuildTeams(this);
 TraceEvent(SevWarn, "NoHealthyTeams", distributorId)
 .detail("CurrentTeamCount", teams.size())
@@ -1796,7 +1789,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
 // Establish the relation between server and machine
 checkAndCreateMachine(r);
-r->tracker = storageServerTracker( this, cx, r.getPtr(), &server_status, lock, distributorId, &server_info, serverChanges, errorOut, addedVersion );
+r->tracker = storageServerTracker( this, cx, r.getPtr(), errorOut, addedVersion );
 doBuildTeams = true; // Adding a new server triggers to build new teams
 restartTeamBuilder.trigger();
 }
@@ -2107,7 +2100,6 @@ ACTOR Future<Void> teamTracker( DDTeamCollection* self, Reference<TCTeamInfo> te
 TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->distributorId)
 .detail("SignallingTeam", team->getDesc())
 .detail("Primary", self->primary);
-self->checkBuildTeam = DDTeamCollection::checkBuildTeams(self);
 }
 TraceEvent("TeamHealthDifference", self->distributorId)
@@ -2426,15 +2418,14 @@ ACTOR Future<Void> storageServerFailureTracker(
 DDTeamCollection* self,
 TCServerInfo *server,
 Database cx,
-ServerStatusMap *statusMap,
 ServerStatus *status,
 Version addedVersion )
 {
 state StorageServerInterface interf = server->lastKnownInterface;
 state bool doBuildTeam = false;
 loop {
-if( statusMap->get(interf.id()).initialized ) {
-bool unhealthy = statusMap->get(interf.id()).isUnhealthy();
+if( self->server_status.get(interf.id()).initialized ) {
+bool unhealthy = self->server_status.get(interf.id()).isUnhealthy();
 if(unhealthy && !status->isUnhealthy()) {
 self->unhealthyServers--;
 }
@@ -2445,11 +2436,10 @@ ACTOR Future<Void> storageServerFailureTracker(
 self->unhealthyServers++;
 }
-statusMap->set( interf.id(), *status );
+self->server_status.set( interf.id(), *status );
 if (doBuildTeam) {
 doBuildTeam = false;
 self->doBuildTeams = true;
-self->checkBuildTeam = DDTeamCollection::checkBuildTeams(self);
 }
 if( status->isFailed )
 self->restartRecruiting.trigger();
@@ -2485,11 +2475,6 @@ ACTOR Future<Void> storageServerTracker(
 DDTeamCollection* self,
 Database cx,
 TCServerInfo *server, //This actor is owned by this TCServerInfo
-ServerStatusMap *statusMap,
-MoveKeysLock lock,
-UID distributorId,
-std::map<UID, Reference<TCServerInfo>>* other_servers,
-Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> changes,
 Promise<Void> errorOut,
 Version addedVersion)
 {
@@ -2502,8 +2487,8 @@ ACTOR Future<Void> storageServerTracker(
 state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
 state bool hasWrongStoreTypeOrDC = false;
-if(changes.present()) {
-changes.get().send( std::make_pair(server->id, server->lastKnownInterface) );
+if(self->serverChanges.present()) {
+self->serverChanges.get().send( std::make_pair(server->id, server->lastKnownInterface) );
 }
 try {
@@ -2514,33 +2499,33 @@ ACTOR Future<Void> storageServerTracker(
 // If there is any other server on this exact NetworkAddress, this server is undesired and will eventually be eliminated
 state std::vector<Future<Void>> otherChanges;
 std::vector<Promise<Void>> wakeUpTrackers;
-for(auto i = other_servers->begin(); i != other_servers->end(); ++i) {
-if (i->second.getPtr() != server && i->second->lastKnownInterface.address() == server->lastKnownInterface.address()) {
-auto& statusInfo = statusMap->get( i->first );
-TraceEvent("SameAddress", distributorId)
+for(const auto& i : self->server_info) {
+if (i.second.getPtr() != server && i.second->lastKnownInterface.address() == server->lastKnownInterface.address()) {
+auto& statusInfo = self->server_status.get( i.first );
+TraceEvent("SameAddress", self->distributorId)
 .detail("Failed", statusInfo.isFailed)
 .detail("Undesired", statusInfo.isUndesired)
-.detail("Server", server->id).detail("OtherServer", i->second->id)
+.detail("Server", server->id).detail("OtherServer", i.second->id)
 .detail("Address", server->lastKnownInterface.address())
 .detail("NumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(server->id))
-.detail("OtherNumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(i->second->id))
-.detail("OtherHealthy", !statusMap->get( i->second->id ).isUnhealthy());
+.detail("OtherNumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id))
+.detail("OtherHealthy", !self->server_status.get( i.second->id ).isUnhealthy());
 // wait for the server's ip to be changed
-otherChanges.push_back(statusMap->onChange(i->second->id));
-if(!statusMap->get( i->second->id ).isUnhealthy()) {
-if(self->shardsAffectedByTeamFailure->getNumberOfShards(i->second->id) >= self->shardsAffectedByTeamFailure->getNumberOfShards(server->id))
+otherChanges.push_back(self->server_status.onChange(i.second->id));
+if(!self->server_status.get( i.second->id ).isUnhealthy()) {
+if(self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id) >= self->shardsAffectedByTeamFailure->getNumberOfShards(server->id))
 {
-TraceEvent(SevWarn, "UndesiredStorageServer", distributorId)
+TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
 .detail("Server", server->id)
 .detail("Address", server->lastKnownInterface.address())
-.detail("OtherServer", i->second->id)
+.detail("OtherServer", i.second->id)
 .detail("NumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(server->id))
-.detail("OtherNumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(i->second->id));
+.detail("OtherNumShards", self->shardsAffectedByTeamFailure->getNumberOfShards(i.second->id));
 status.isUndesired = true;
 }
 else
-wakeUpTrackers.push_back(i->second->wakeUpTracker);
+wakeUpTrackers.push_back(i.second->wakeUpTracker);
 }
 }
 }
@@ -2552,7 +2537,7 @@ ACTOR Future<Void> storageServerTracker(
 if( server->lastKnownClass.machineClassFitness( ProcessClass::Storage ) > ProcessClass::UnsetFit ) {
 if( self->optimalTeamCount > 0 ) {
-TraceEvent(SevWarn, "UndesiredStorageServer", distributorId)
+TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
 .detail("Server", server->id)
 .detail("OptimalTeamCount", self->optimalTeamCount)
 .detail("Fitness", server->lastKnownClass.machineClassFitness(ProcessClass::Storage));
@@ -2563,7 +2548,7 @@ ACTOR Future<Void> storageServerTracker(
 //If this storage server has the wrong key-value store type, then mark it undesired so it will be replaced with a server having the correct type
 if(hasWrongStoreTypeOrDC) {
-TraceEvent(SevWarn, "UndesiredStorageServer", distributorId).detail("Server", server->id).detail("StoreType", "?");
+TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId).detail("Server", server->id).detail("StoreType", "?");
 status.isUndesired = true;
 status.isWrongConfiguration = true;
 }
@@ -2573,7 +2558,7 @@ ACTOR Future<Void> storageServerTracker(
 AddressExclusion addr( a.ip, a.port );
 AddressExclusion ipaddr( a.ip );
 if (self->excludedServers.get( addr ) || self->excludedServers.get( ipaddr )) {
-TraceEvent(SevWarn, "UndesiredStorageServer", distributorId).detail("Server", server->id)
+TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId).detail("Server", server->id)
 .detail("Excluded", self->excludedServers.get( addr ) ? addr.toString() : ipaddr.toString());
 status.isUndesired = true;
 status.isWrongConfiguration = true;
@@ -2581,25 +2566,25 @@ ACTOR Future<Void> storageServerTracker(
 otherChanges.push_back( self->excludedServers.onChange( addr ) );
 otherChanges.push_back( self->excludedServers.onChange( ipaddr ) );
-failureTracker = storageServerFailureTracker( self, server, cx, statusMap, &status, addedVersion );
+failureTracker = storageServerFailureTracker( self, server, cx, &status, addedVersion );
 //We need to recruit new storage servers if the key value store type has changed
 if(hasWrongStoreTypeOrDC)
 self->restartRecruiting.trigger();
-TraceEvent("StatusMapChange", distributorId).detail("Status", status.toString()).detail("LastIsUnhealthy", lastIsUnhealthy);
+TraceEvent("StatusMapChange", self->distributorId).detail("Status", status.toString())
+.detail("Server", server->id).detail("LastIsUnhealthy", lastIsUnhealthy);
 if ( lastIsUnhealthy && !status.isUnhealthy() && (!server->teams.size() || self->zeroHealthyTeams->get()) ) {
 self->doBuildTeams = true;
-self->checkBuildTeam = DDTeamCollection::checkBuildTeams(self);
 }
 lastIsUnhealthy = status.isUnhealthy();
 choose {
 when( wait( failureTracker ) ) {
 // The server is failed AND all data has been removed from it, so permanently remove it.
-TraceEvent("StatusMapChange", distributorId).detail("ServerID", server->id).detail("Status", "Removing");
-if(changes.present()) {
-changes.get().send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
+TraceEvent("StatusMapChange", self->distributorId).detail("ServerID", server->id).detail("Status", "Removing");
+if(self->serverChanges.present()) {
+self->serverChanges.get().send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
 }
 if(server->updated.canBeSet()) {
@@ -2607,9 +2592,9 @@ ACTOR Future<Void> storageServerTracker(
 }
 // Remove server from FF/serverList
-wait( removeStorageServer( cx, server->id, lock ) );
+wait( removeStorageServer( cx, server->id, self->lock ) );
-TraceEvent("StatusMapChange", distributorId).detail("ServerID", server->id).detail("Status", "Removed");
+TraceEvent("StatusMapChange", self->distributorId).detail("ServerID", server->id).detail("Status", "Removed");
 // Sets removeSignal (alerting dataDistributionTeamCollection to remove the storage server from its own data structures)
 server->removed.send( Void() );
 self->removedServers.send( server->id );
@@ -2620,7 +2605,7 @@ ACTOR Future<Void> storageServerTracker(
 bool localityChanged = server->lastKnownInterface.locality != newInterface.first.locality;
 bool machineLocalityChanged = server->lastKnownInterface.locality.zoneId().get() !=
 newInterface.first.locality.zoneId().get();
-TraceEvent("StorageServerInterfaceChanged", distributorId).detail("ServerID", server->id)
+TraceEvent("StorageServerInterfaceChanged", self->distributorId).detail("ServerID", server->id)
 .detail("NewWaitFailureToken", newInterface.first.waitFailure.getEndpoint().token)
 .detail("OldWaitFailureToken", server->lastKnownInterface.waitFailure.getEndpoint().token)
 .detail("LocalityChanged", localityChanged);
@@ -2702,8 +2687,8 @@ ACTOR Future<Void> storageServerTracker(
 }
 interfaceChanged = server->onInterfaceChanged;
-if(changes.present()) {
-changes.get().send( std::make_pair(server->id, server->lastKnownInterface) );
+if(self->serverChanges.present()) {
+self->serverChanges.get().send( std::make_pair(server->id, server->lastKnownInterface) );
 }
 // We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location
 status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality );
@@ -2716,10 +2701,10 @@ ACTOR Future<Void> storageServerTracker(
 self->restartRecruiting.trigger();
 }
 when( wait( otherChanges.empty() ? Never() : quorum( otherChanges, 1 ) ) ) {
-TraceEvent("SameAddressChangedStatus", distributorId).detail("ServerID", server->id);
+TraceEvent("SameAddressChangedStatus", self->distributorId).detail("ServerID", server->id);
 }
 when( KeyValueStoreType type = wait( storeTracker ) ) {
-TraceEvent("KeyValueStoreTypeChanged", distributorId)
+TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
 .detail("ServerID", server->id)
 .detail("StoreType", type.toString())
 .detail("DesiredType", self->configuration.storageServerStoreType.toString());
@@ -2808,9 +2793,6 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection* self, RecruitStorageRepl
 TraceEvent(SevWarn, "DDRecruitmentError").detail("Reason", "Server ID already recruited");
 self->doBuildTeams = true;
-if( self->healthyTeamCount == 0 ) {
-wait( self->checkBuildTeams( self ) );
-}
 }
 self->restartRecruiting.trigger();
@@ -2938,6 +2920,18 @@ ACTOR Future<Void> remoteRecovered( Reference<AsyncVar<struct ServerDBInfo>> db
 return Void();
 }
+ACTOR Future<Void> monitorHealthyTeams( DDTeamCollection* self ) {
+state Future<Void> checkHealth;
+loop choose {
+when ( wait( delay( SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY ) ) ) {
+if ( self->healthyTeamCount == 0 ) {
+self->doBuildTeams = true;
+checkHealth = DDTeamCollection::checkBuildTeams(self);
+}
+}
+}
+}
 // Keep track of servers and teams -- serves requests for getRandomTeam
 ACTOR Future<Void> dataDistributionTeamCollection(
 Reference<DDTeamCollection> teamCollection,
@@ -2973,6 +2967,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(
 self->addActor.send(monitorStorageServerRecruitment( self ));
 self->addActor.send(waitServerListChange( self, serverRemoved.getFuture() ));
 self->addActor.send(trackExcludedServers( self ));
+self->addActor.send(monitorHealthyTeams( self ));
 // SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them

fdbserver/Knobs.cpp

@@ -166,6 +166,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 init( MOVEKEYS_LOCK_POLLING_DELAY, 5.0 );
 init( DEBOUNCE_RECRUITING_DELAY, 5.0 );
 init( DD_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) DD_FAILURE_TIME = 10.0;
+init( DD_ZERO_HEALTHY_TEAM_DELAY, 1.0 );
 // Redwood Storage Engine
 init( PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT, 30 );

fdbserver/Knobs.h

@@ -129,6 +129,7 @@ public:
 double MOVEKEYS_LOCK_POLLING_DELAY;
 double DEBOUNCE_RECRUITING_DELAY;
 double DD_FAILURE_TIME;
+double DD_ZERO_HEALTHY_TEAM_DELAY;
 // Redwood Storage Engine
 int PREFIX_TREE_IMMEDIATE_KEY_SIZE_LIMIT;
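Note on the knob plumbing above: a server knob takes three hops to reach its
use site, all visible in this commit: the member declaration in Knobs.h, the
init() default in ServerKnobs::ServerKnobs(), and the read through the global
SERVER_KNOBS at the call site. A minimal sketch of the same pattern, using a
hypothetical DD_EXAMPLE_DELAY knob rather than any real one:

    // Knobs.h: declare the member on ServerKnobs.
    double DD_EXAMPLE_DELAY;

    // Knobs.cpp: set the default in ServerKnobs::ServerKnobs(); BUGGIFY may
    // randomize it in simulation, as DD_FAILURE_TIME does above.
    init( DD_EXAMPLE_DELAY, 1.0 ); if( randomize && BUGGIFY ) DD_EXAMPLE_DELAY = 10.0;

    // Use site: read it through SERVER_KNOBS inside an actor.
    wait( delay( SERVER_KNOBS->DD_EXAMPLE_DELAY ) );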