Version difference is now the difference between the TLog version and the SS version.

This commit is contained in:
Balachandar Namasivayam 2020-04-03 19:04:43 -07:00
parent a5af31de23
commit 73272fc72e
6 changed files with 25 additions and 29 deletions

View File

@ -394,13 +394,14 @@ struct GetStorageMetricsReply {
StorageMetrics available;
StorageMetrics capacity;
double bytesInputRate;
Version version; // current storage server version
int64_t versionLag;
double lastUpdate;
// Zero-initialize every scalar field so a default-constructed reply never carries indeterminate values.
GetStorageMetricsReply() : bytesInputRate(0), versionLag(0), lastUpdate(0) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, load, available, capacity, bytesInputRate, version);
serializer(ar, load, available, capacity, bytesInputRate, versionLag, lastUpdate);
}
};

View File

@ -111,7 +111,7 @@ struct TCMachineInfo : public ReferenceCounted<TCMachineInfo> {
}
};
ACTOR Future<Void> updateServerMetrics( TCServerInfo *server, Database cx ) {
ACTOR Future<Void> updateServerMetrics( TCServerInfo *server ) {
state StorageServerInterface ssi = server->lastKnownInterface;
state Future<ErrorOr<GetStorageMetricsReply>> metricsRequest = ssi.getStorageMetrics.tryGetReply( GetStorageMetricsRequest(), TaskPriority::DataDistributionLaunch );
state Future<Void> resetRequest = Never();
@ -151,28 +151,21 @@ ACTOR Future<Void> updateServerMetrics( TCServerInfo *server, Database cx ) {
}
}
if(cx.getPtr()) {
Version versionNow = wait(runRYWTransaction(cx, [](Reference<ReadYourWritesTransaction> tr) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
return tr->getReadVersion(); }));
Version versionDiff = versionNow - server->serverMetrics.get().version;
if(versionDiff > SERVER_KNOBS->DD_SS_FAILURE_VERSIONLAG) {
if(server->ssVersionTooFarBehind.get() == false) {
TraceEvent(SevInfo, "SSVersionDiffLarge").detail("ServerId", server->id.toString()).detail("VersionNow", versionNow).detail("SSVersion", server->serverMetrics.get().version).detail("Diff", versionDiff);
server->ssVersionTooFarBehind.set(true);
}
} else if(versionDiff < SERVER_KNOBS->DD_SS_ALLOWED_VERSIONLAG) {
if(server->ssVersionTooFarBehind.get() == true) {
TraceEvent(SevInfo, "SSVersionDiffNormal").detail("ServerId", server->id.toString()).detail("VersionNow", versionNow).detail("SSVersion", server->serverMetrics.get().version).detail("Diff", versionDiff);
server->ssVersionTooFarBehind.set(false);
}
}
// Classify the storage server from its most recent metrics reply. The branches are mutually
// exclusive and evaluated in priority order:
//   1. stuck   - the reported lastUpdate is older than DD_SS_STUCK_TIME_LIMIT
//   2. lagging - the reported versionLag exceeds DD_SS_FAILURE_VERSIONLAG
//   3. healthy - the reported versionLag is below DD_SS_ALLOWED_VERSIONLAG
// The ssVersionTooFarBehind check is nested inside each branch rather than folded into the branch
// condition. Otherwise a server that is still stuck, but already flagged, would fall through to the
// "healthy" branch whenever its last reported versionLag is small, and the flag would flip on every poll.
if ( server->serverMetrics.get().lastUpdate < now() - SERVER_KNOBS->DD_SS_STUCK_TIME_LIMIT ) {
	if ( server->ssVersionTooFarBehind.get() == false ) {
		TraceEvent(SevInfo, "StorageServerStuck").detail("ServerId", server->id.toString()).detail("LastUpdate", server->serverMetrics.get().lastUpdate);
		server->ssVersionTooFarBehind.set(true);
	}
} else if ( server->serverMetrics.get().versionLag > SERVER_KNOBS->DD_SS_FAILURE_VERSIONLAG ) {
	if ( server->ssVersionTooFarBehind.get() == false ) {
		TraceEvent(SevInfo, "SSVersionDiffLarge").detail("ServerId", server->id.toString()).detail("VersionLag", server->serverMetrics.get().versionLag);
		server->ssVersionTooFarBehind.set(true);
	}
} else if ( server->serverMetrics.get().versionLag < SERVER_KNOBS->DD_SS_ALLOWED_VERSIONLAG ) {
	// Lags between the two knobs leave the flag unchanged (hysteresis).
	if ( server->ssVersionTooFarBehind.get() == true ) {
		TraceEvent(SevInfo, "SSVersionDiffNormal").detail("ServerId", server->id.toString()).detail("VersionLag", server->serverMetrics.get().versionLag);
		server->ssVersionTooFarBehind.set(false);
	}
}
return Void();
}
ACTOR Future<Void> updateServerMetrics( Reference<TCServerInfo> server, Database cx = Database() ) {
wait( updateServerMetrics( server.getPtr(), cx ) );
ACTOR Future<Void> updateServerMetrics( Reference<TCServerInfo> server) {
wait( updateServerMetrics( server.getPtr() ) );
return Void();
}
@ -3353,10 +3346,10 @@ ACTOR Future<Void> waitHealthyZoneChange( DDTeamCollection* self ) {
}
}
ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server, Database cx) {
ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server ) {
state double lastUpdate = now();
loop {
wait( updateServerMetrics( server, cx ) );
wait( updateServerMetrics( server ) );
wait( delayUntil( lastUpdate + SERVER_KNOBS->STORAGE_METRICS_POLLING_DELAY + SERVER_KNOBS->STORAGE_METRICS_RANDOM_DELAY * deterministicRandom()->random01(), TaskPriority::DataDistributionLaunch ) );
lastUpdate = now();
}
@ -3499,7 +3492,7 @@ ACTOR Future<Void> storageServerTracker(
state Future<Void> failureTracker;
state ServerStatus status( false, false, server->lastKnownInterface.locality );
state bool lastIsUnhealthy = false;
state Future<Void> metricsTracker = serverMetricsPolling( server, self->cx );
state Future<Void> metricsTracker = serverMetricsPolling( server );
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;

View File

@ -217,7 +217,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( DD_ENABLE_VERBOSE_TRACING, false ); if( randomize && BUGGIFY ) DD_ENABLE_VERBOSE_TRACING = true;
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
init( DD_SS_ALLOWED_VERSIONLAG, 200000000 ); if( randomize && BUGGIFY ) { DD_SS_FAILURE_VERSIONLAG = deterministicRandom()->randomInt(15000000, 500000000); DD_SS_ALLOWED_VERSIONLAG = 0.75 * DD_SS_FAILURE_VERSIONLAG; }
init( DD_SS_STUCK_TIME_LIMIT, 120.0 ); if( randomize && BUGGIFY ) { DD_SS_STUCK_TIME_LIMIT = 60.0 + deterministicRandom()->random01() * 60.0; }
// TeamRemover
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true

View File

@ -174,7 +174,8 @@ public:
int DD_CHECK_INVALID_LOCALITY_DELAY;
bool DD_ENABLE_VERBOSE_TRACING;
int64_t DD_SS_FAILURE_VERSIONLAG; // Allowed SS version lag (difference between the TLog version and the SS version) before marking it as failed.
int64_t DD_SS_ALLOWED_VERSIONLAG; // SS will be marked as healthy if its version lag goes below this value.
int64_t DD_SS_ALLOWED_VERSIONLAG; // SS will be marked as healthy if its version lag goes below this value.
double DD_SS_STUCK_TIME_LIMIT; // If a storage server is not getting new versions for this amount of time, then it becomes undesired.
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor

View File

@ -378,7 +378,7 @@ struct StorageServerMetrics {
}
}
void getStorageMetrics( GetStorageMetricsRequest req, StorageBytes sb, double bytesInputRate, Version version ){
void getStorageMetrics( GetStorageMetricsRequest req, StorageBytes sb, double bytesInputRate, int64_t versionLag, double lastUpdate ){
GetStorageMetricsReply rep;
// SOMEDAY: make bytes dynamic with hard disk space
@ -405,7 +405,8 @@ struct StorageServerMetrics {
rep.bytesInputRate = bytesInputRate;
rep.version = version;
rep.versionLag = versionLag;
rep.lastUpdate = lastUpdate;
req.reply.send(rep);
}

View File

@ -3484,7 +3484,7 @@ ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi
}
when (GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) {
StorageBytes sb = self->storage.getStorageBytes();
self->metrics.getStorageMetrics( req, sb, self->counters.bytesInput.getRate(), self->version.get() );
self->metrics.getStorageMetrics( req, sb, self->counters.bytesInput.getRate(), self->versionLag, self->lastUpdate );
}
when (wait(doPollMetrics) ) {
self->metrics.poll();