From 0613a348451502be3e2162042a625ade3196cb96 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Thu, 18 Oct 2018 13:37:31 -0700 Subject: [PATCH] The storage server would block the main thread when processing a single version with a large amount of data --- fdbserver/Knobs.cpp | 2 ++ fdbserver/Knobs.h | 2 ++ fdbserver/storageserver.actor.cpp | 58 +++++++++++++++++++++++-------- 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 52db1b018f..398c07dafd 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -42,6 +42,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL, 30 ); init( UNFLUSHED_DATA_RATIO, 0.05 ); if( randomize && BUGGIFY ) UNFLUSHED_DATA_RATIO = 0.0; init( DESIRED_TOTAL_BYTES, 150000 ); if( randomize && BUGGIFY ) DESIRED_TOTAL_BYTES = 10000; + init( DESIRED_UPDATE_BYTES, 2*DESIRED_TOTAL_BYTES ); + init( UPDATE_DELAY, 0.001 ); init( MAXIMUM_PEEK_BYTES, 10e6 ); init( APPLY_MUTATION_BYTES, 1e6 ); init( RECOVERY_DATA_BYTE_LIMIT, 100000 ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 74cfb7ee0d..89ed8051e8 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -46,6 +46,8 @@ public: double BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL; double UNFLUSHED_DATA_RATIO; int DESIRED_TOTAL_BYTES; + int DESIRED_UPDATE_BYTES; + double UPDATE_DELAY; int MAXIMUM_PEEK_BYTES; int APPLY_MUTATION_BYTES; int RECOVERY_DATA_BYTE_LIMIT; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index fb9575c2d3..e49229de41 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1824,11 +1824,18 @@ ACTOR Future fetchKeys( StorageServer *data, AddingShard* shard ) { ++data->counters.fetchWaitingCount; data->counters.fetchWaitingMS += 1000*(executeStart - startt); - Void _ = wait(delay(0)); + // Fetch keys gets called while the update actor is processing mutations. data->version will not be updated until all mutations for a version + // have been processed. We need to take the durableVersionLock to ensure data->version is greater than the version of the mutation which caused + // the fetch to be initiated. + Void _ = wait( data->durableVersionLock.take() ); shard->phase = AddingShard::Fetching; state Version fetchVersion = data->version.get(); + data->durableVersionLock.release(); + + Void _ = wait(delay(0)); + TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID).detail("FKID", interval.pairID).detail("Version", fetchVersion); // Get the history @@ -2267,6 +2274,7 @@ bool containsRollback( VersionUpdateRef const& changes, Version& rollbackVersion class StorageUpdater { public: + StorageUpdater() : fromVersion(invalidVersion), currentVersion(invalidVersion), restoredVersion(invalidVersion), processedStartKey(false) {} StorageUpdater(Version fromVersion, Version restoredVersion) : fromVersion(fromVersion), currentVersion(fromVersion), restoredVersion(restoredVersion), processedStartKey(false) {} void applyMutation(StorageServer* data, MutationRef const& m, Version ver) { @@ -2471,26 +2479,43 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) data->updateEagerReads = &eager; data->debug_inApplyUpdate = true; - StorageUpdater updater(data->lastVersionWithData, data->restoredVersion); + state StorageUpdater updater(data->lastVersionWithData, data->restoredVersion); if (EXPENSIVE_VALIDATION) data->data().atLatest().validate(); validate(data); state bool injectedChanges = false; - for(auto& c : fii.changes) { - for(auto& m : c.mutations) { - updater.applyMutation(data, m, c.version); + state int changeNum = 0; + state int mutationBytes = 0; + for(; changeNum < fii.changes.size(); changeNum++) { + state int mutationNum = 0; + state VerUpdateRef* pUpdate = &fii.changes[changeNum]; + for(; mutationNum < pUpdate->mutations.size(); mutationNum++) { + updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version); + mutationBytes += pUpdate->mutations[mutationNum].totalSize(); injectedChanges = true; + if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) { + mutationBytes = 0; + Void _ = wait(delay(SERVER_KNOBS->UPDATE_DELAY)); + } } } - Version ver = invalidVersion; + state Version ver = invalidVersion; cloneCursor2->setProtocolVersion(data->logProtocol); //TraceEvent("SSUpdatePeeked", data->thisServerID).detail("FromEpoch", data->updateEpoch).detail("FromSeq", data->updateSequence).detail("ToEpoch", results.end_epoch).detail("ToSeq", results.end_seq).detail("MsgSize", results.messages.size()); for (;cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) { - auto &rd = *cloneCursor2->reader(); + if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) { + mutationBytes = 0; + //Instead of just yielding, leave time for the storage server to respond to reads + Void _ = wait(delay(SERVER_KNOBS->UPDATE_DELAY)); + } - if (cloneCursor2->version().version > ver) ASSERT(cloneCursor2->version().version > data->version.get()); + if (cloneCursor2->version().version > ver) { + ASSERT(cloneCursor2->version().version > data->version.get()); + } + + auto &rd = *cloneCursor2->reader(); if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) { ++data->counters.updateVersions; @@ -2514,7 +2539,7 @@ ACTOR Future update( StorageServer* data, bool* pReceivedUpdate ) TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString()); updater.applyMutation(data, msg, ver); - + mutationBytes += msg.totalSize(); data->counters.mutationBytes += msg.totalSize(); ++data->counters.mutations; switch(msg.type) { @@ -2645,18 +2670,21 @@ ACTOR Future updateStorage(StorageServer* data) { debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion ); // Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and - // are applied after we change the durable version. + // are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion, + // because otherwise the latest version of mutableData might be partially loaded. Void _ = wait( data->durableVersionLock.take() ); - data->durableVersionLock.release(); - - Void _ = wait( delay(0, TaskUpdateStorage) ); - data->popVersion( data->durableVersion.get() + 1 ); while (!changeDurableVersion( data, newOldestVersion )) { - Void _ = wait( yield(TaskUpdateStorage) ); + if(g_network->check_yield(TaskUpdateStorage)) { + data->durableVersionLock.release(); + Void _ = wait(delay(0, TaskUpdateStorage)); + Void _ = wait( data->durableVersionLock.take() ); + } } + data->durableVersionLock.release(); + //TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion); Void _ = wait( durableDelay );