Revert "Merge pull request #3770 from dongxinEric/feature/ss-commit-pipelining"

This reverts commit e258dffc65a2cf3e7768f2c9f833f9f553a52d74, reversing
changes made to 0e96233015c25608df620a8c1b81f9945bbd6a33.

SS commit pipelining seems to be causing write performance degradation.
Author: Xin Dong
Date:   2020-10-19 09:28:38 -07:00
Parent: 944f30484a
Commit: 410d418c3e

9 changed files with 101 additions and 168 deletions
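For context, the reverted change let the storage server stage the next batch of mutations in the storage engine's commit buffer while the previous commit was still in flight; this revert restores the strictly sequential prepare-then-commit loop visible in the storageserver.actor.cpp hunks below. The sketch that follows is only a minimal, self-contained illustration of the two loop shapes, written with plain C++ threads rather than FoundationDB's Flow actors; prepareBatch and commitBatch are hypothetical stand-ins for "stage mutations" and "make them durable", not functions from this codebase.

// Minimal illustration (not FoundationDB/Flow code) of the reverted pipelined
// commit loop versus the restored sequential one.
#include <chrono>
#include <cstdio>
#include <future>
#include <thread>

static void prepareBatch(int batch) {
    std::this_thread::sleep_for(std::chrono::milliseconds(20)); // pretend to stage mutations
    std::printf("prepared batch %d\n", batch);
}

static void commitBatch(int batch) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50)); // pretend to fsync/commit
    std::printf("committed batch %d\n", batch);
}

// Restored behavior: each batch is fully committed before the next one is prepared.
static void sequentialLoop(int batches) {
    for (int b = 0; b < batches; ++b) {
        prepareBatch(b);
        commitBatch(b);
    }
}

// Reverted behavior: preparing batch b overlaps with the still-running commit of batch b-1.
static void pipelinedLoop(int batches) {
    std::future<void> inFlight; // commit of the previous batch, if any
    for (int b = 0; b < batches; ++b) {
        prepareBatch(b);                       // overlaps with the previous commit
        if (inFlight.valid()) inFlight.wait(); // commits themselves stay ordered
        inFlight = std::async(std::launch::async, commitBatch, b);
    }
    if (inFlight.valid()) inFlight.wait();
}

int main() {
    std::puts("sequential:");
    sequentialLoop(3);
    std::puts("pipelined:");
    pipelinedLoop(3);
    return 0;
}

In the pipelined shape the storage engine must accept set()/clear() calls while a commit is outstanding, which is what IKeyValueStore::canPipelineCommits() advertised; the revert removes that capability flag along with the STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD knob.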

@@ -63,8 +63,6 @@ public:
     virtual void enableSnapshot() {}
-    virtual bool canPipelineCommits() const = 0;
     /*
     Concurrency contract
         Causal consistency:

@@ -35,10 +35,10 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
     KeyValueStoreCompressTestData(IKeyValueStore* store) : store(store) {}
-    bool canPipelineCommits() const override {return false;}
-    Future<Void> getError() override { return store->getError(); }
-    Future<Void> onClosed() override { return store->onClosed(); }
-    void dispose() override {
+    virtual Future<Void> getError() override { return store->getError(); }
+    virtual Future<Void> onClosed() override { return store->onClosed(); }
+    virtual void dispose() override {
         store->dispose();
         delete this;
     }

@@ -63,9 +63,7 @@ public:
     // IKeyValueStore
     KeyValueStoreType getType() const override { return type; }
-    virtual bool canPipelineCommits() const override { return false; }
-    std::tuple<size_t, size_t, size_t> getSize() const override { return data.size(); }
+    virtual std::tuple<size_t, size_t, size_t> getSize() const override { return data.size(); }
     int64_t getAvailableSize() const {
         int64_t residentSize = data.sumTo(data.end()) + queue.totalSize() + // doesn't account for overhead in queue

@@ -285,8 +285,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
         return errorPromise.getFuture();
     }
-    bool canPipelineCommits() const override { return true; }
     ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) {
         wait(self->readThreads->stop());
         auto a = new Writer::CloseAction(self->path, deleteOnClose);

@@ -1451,9 +1451,8 @@ public:
     Future<Void> getError() override { return delayed(readThreads->getError() || writeThread->getError()); }
     Future<Void> onClosed() override { return stopped.getFuture(); }
-    KeyValueStoreType getType() const override { return type; }
-    StorageBytes getStorageBytes() const override;
-    bool canPipelineCommits() const override { return false; }
+    virtual KeyValueStoreType getType() const override { return type; }
+    virtual StorageBytes getStorageBytes() const override;
     void set(KeyValueRef keyValue, const Arena* arena = nullptr) override;
     void clear(KeyRangeRef range, const Arena* arena = nullptr) override;

@@ -548,7 +548,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
     init( FETCH_KEYS_LOWER_PRIORITY, 0 );
     init( BUGGIFY_BLOCK_BYTES, 10000 );
     init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
-    init( STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD, 100000 );
     init( STORAGE_DURABILITY_LAG_REJECT_THRESHOLD, 0.25 );
     init( STORAGE_DURABILITY_LAG_MIN_RATE, 0.1 );
     init( STORAGE_COMMIT_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_INTERVAL = 2.0;

@@ -481,7 +481,6 @@ public:
     double STORAGE_DURABILITY_LAG_MIN_RATE;
     int STORAGE_COMMIT_BYTES;
     double STORAGE_COMMIT_INTERVAL;
-    int STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD;
     double UPDATE_SHARD_VERSION_INTERVAL;
     int BYTE_SAMPLING_FACTOR;
     int BYTE_SAMPLING_OVERHEAD;

@@ -5738,8 +5738,6 @@ public:
     KeyValueStoreType getType() const override { return KeyValueStoreType::SSD_REDWOOD_V1; }
-    bool canPipelineCommits() const override { return true; }
     StorageBytes getStorageBytes() const override { return m_tree->getStorageBytes(); }
     Future<Void> getError() { return delayed(m_error.getFuture()); };

@@ -152,11 +152,10 @@ struct ShardInfo : ReferenceCounted<ShardInfo>, NonCopyable {
 };
 struct StorageServerDisk {
-    explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage), _canPipelineCommits(storage->canPipelineCommits()) {}
+    explicit StorageServerDisk(struct StorageServer* data, IKeyValueStore* storage) : data(data), storage(storage) {}
     void makeNewStorageServerDurable();
-    // Asyncronously move data from mutation log into SE's commit buffer for next commit.
-    Future<bool> asyncPrepareVersionsForCommit(Version desiredOldestVersion, Future<Void> durable, Future<Void>durableMinDelay);
+    bool makeVersionMutationsDurable(Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft);
     void makeVersionDurable( Version version );
     Future<bool> restoreDurableState();

@@ -179,15 +178,12 @@ struct StorageServerDisk {
     KeyValueStoreType getKeyValueStoreType() const { return storage->getType(); }
     StorageBytes getStorageBytes() const { return storage->getStorageBytes(); }
     std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
-    bool canPipelineCommits() const {return _canPipelineCommits;}
-    void set(KeyValueRef kv) { storage->set(kv);}
-    void clear(KeyRangeRef kr) { storage->clear(kr);}
 private:
     struct StorageServer* data;
     IKeyValueStore* storage;
-    const bool _canPipelineCommits;
+    void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
     ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
         Standalone<RangeResultRef> r = wait( storage->readRange( range, 1 ) );

@@ -3091,67 +3087,79 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
         wait( delay(0, TaskPriority::UpdateStorage) );
         state Promise<Void> durableInProgress;
-        state Version desiredOldestVersion = data->desiredOldestVersion.get();
-        state Future<Void> durableMinDelay = Void();
-        state Future<Void> durable = Void();
-        state int64_t ssCommitQuotaBytes;
-        state Version pendingCommitVersion;
-        state int64_t bytesWritten = 0;
-        state bool finalCommit = false;
-        state bool done = false;
+        data->durableInProgress = durableInProgress.getFuture();
+        state Version startOldestVersion = data->storageVersion();
+        state Version newOldestVersion = data->storageVersion();
+        state Version desiredVersion = data->desiredOldestVersion.get();
+        state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
+        // Write mutations to storage until we reach the desiredVersion or have written too much (bytesleft)
         loop {
-            // Keep making data from mutation log durable, until no data left whose version is <= desiredOldestVersion
-            pendingCommitVersion = data->storageVersion();
-            ssCommitQuotaBytes = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
-            durableInProgress.reset();
-            data->durableInProgress = durableInProgress.getFuture();
-            durable = data->storage.commit(); // Commit data up to(inclusive) version pendingCommitVersion
-            durableMinDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
-            if (finalCommit) {
-                wait(durable && durableMinDelay);
-                done = true;
-            } else {
-                // Move data start from pendingCommitVersion+1 to SE's commit buffer.
-                bool _finalCommit = wait(data->storage.asyncPrepareVersionsForCommit(desiredOldestVersion, durable, durableMinDelay));
-                finalCommit = _finalCommit;
-            }
-            debug_advanceMinCommittedVersion( data->thisServerID, pendingCommitVersion );
-            if(pendingCommitVersion > data->rebootAfterDurableVersion) {
-                TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("PendingCommitVersion", pendingCommitVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
-                // To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this process)
-                // never sets durableInProgress, we should set durableInProgress before send the please_reboot() error.
-                // Otherwise, in the race situation when storage server receives both reboot and
-                // brokenPromise of durableInProgress, the worker of the storage server will die.
-                // We will eventually end up with no worker for storage server role.
-                // The data distributor's buildTeam() will get stuck in building a team
-                durableInProgress.sendError(please_reboot());
-                throw please_reboot();
-            }
-            durableInProgress.send(Void());
-            wait( delay(0, TaskPriority::UpdateStorage) ); //Setting durableInProgess could cause the storage server to shut down, so delay to check for cancellation
-            // Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
-            // are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion,
-            // because otherwise the latest version of mutableData might be partially loaded.
-            wait( data->durableVersionLock.take() );
-            data->popVersion( data->durableVersion.get() + 1 );
-            // Update durableVersion to pendingCommitVersion, which has been committed.
-            while (!changeDurableVersion( data, pendingCommitVersion )) {
-                if(g_network->check_yield(TaskPriority::UpdateStorage)) {
-                    data->durableVersionLock.release();
-                    wait(delay(0, TaskPriority::UpdateStorage));
-                    wait( data->durableVersionLock.take() );
-                }
-            }
-            data->durableVersionLock.release();
+            state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
+            // We want to forget things from these data structures atomically with changing oldestVersion (and "before",
+            // since oldestVersion.set() may trigger waiting actors) forgetVersionsBeforeAsync visibly forgets
+            // immediately (without waiting) but asynchronously frees memory.
+            Future<Void> finishedForgetting =
+                data->mutableData().forgetVersionsBeforeAsync(newOldestVersion, TaskPriority::UpdateStorage);
+            data->oldestVersion.set(newOldestVersion);
+            wait(finishedForgetting);
+            wait(yield(TaskPriority::UpdateStorage));
             if (done) break;
         }
+
+        // Set the new durable version as part of the outstanding change set, before commit
+        if (startOldestVersion != newOldestVersion) data->storage.makeVersionDurable(newOldestVersion);
+
+        debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
+        state Future<Void> durable = data->storage.commit();
+        state Future<Void> durableDelay = Void();
+
+        if (bytesLeft > 0) {
+            durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
+        }
+
+        wait(durable);
+
+        debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion);
+
+        if (newOldestVersion > data->rebootAfterDurableVersion) {
+            TraceEvent("RebootWhenDurableTriggered", data->thisServerID)
+                .detail("NewOldestVersion", newOldestVersion)
+                .detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
+            // To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this process)
+            // never sets durableInProgress, we should set durableInProgress before send the please_reboot() error.
+            // Otherwise, in the race situation when storage server receives both reboot and
+            // brokenPromise of durableInProgress, the worker of the storage server will die.
+            // We will eventually end up with no worker for storage server role.
+            // The data distributor's buildTeam() will get stuck in building a team
+            durableInProgress.sendError(please_reboot());
+            throw please_reboot();
+        }
+
+        durableInProgress.send(Void());
+        wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgess could cause the storage server to shut
+                                                     // down, so delay to check for cancellation
+
+        // Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was
+        // effective and are applied after we change the durable version. Also ensure that we have to lock while calling
+        // changeDurableVersion, because otherwise the latest version of mutableData might be partially loaded.
+        wait(data->durableVersionLock.take());
+        data->popVersion(data->durableVersion.get() + 1);
+
+        while (!changeDurableVersion(data, newOldestVersion)) {
+            if (g_network->check_yield(TaskPriority::UpdateStorage)) {
+                data->durableVersionLock.release();
+                wait(delay(0, TaskPriority::UpdateStorage));
+                wait(data->durableVersionLock.take());
+            }
+        }
+
+        data->durableVersionLock.release();
+
+        //TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);
+
+        wait(durableDelay);
     }
 }

@@ -3224,101 +3232,37 @@ void StorageServerDisk::writeMutation( MutationRef mutation ) {
     ASSERT(false);
 }
-ACTOR Future<int64_t> asyncWriteMutationsToCommitBuffer(StorageServer* data, VectorRef<MutationRef> mutations, Version debugVersion, const char* debugContext, int64_t ssCommitQuotaBytes) {
-    state int bytesWritten = 0;
-    state int i = 0;
-    for (;i < mutations.size(); i++) {
-        const auto& m = mutations[i];
+void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion,
+                                       const char* debugContext) {
+    for (const auto& m : mutations) {
         DEBUG_MUTATION(debugContext, debugVersion, m).detail("UID", data->thisServerID);
         if (m.type == MutationRef::SetValue) {
-            data->storage.set(KeyValueRef(m.param1, m.param2));
+            storage->set(KeyValueRef(m.param1, m.param2));
         } else if (m.type == MutationRef::ClearRange) {
-            data->storage.clear(KeyRangeRef(m.param1, m.param2));
-        }
-        auto mutationBytes = mvccStorageBytes(m);
-        bytesWritten += mutationBytes;
-        ssCommitQuotaBytes -= mutationBytes;
-        if (data->storage.canPipelineCommits() && bytesWritten >= SERVER_KNOBS->STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD) {
-            bytesWritten = 0;
-            wait(yield());
+            storage->clear(KeyRangeRef(m.param1, m.param2));
         }
     }
-    return ssCommitQuotaBytes;
 }
-ACTOR Future<bool> asyncPrepareVersionsForCommit_impl(StorageServerDisk* self, StorageServer* data, Version desiredOldestVersion, Future<Void> durable, Future<Void>durableMinDelay) {
-    state int64_t ssCommitQuotaBytes = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
-    state bool finalCommit = false;
-    state Version startOldestVersion = data->storageVersion();
-    state Version newOldestVersion = data->storageVersion();
-    state SignalableActorCollection forgetter;
-    loop {
-        // While committing previously written data, keep writting new data from later versions until
-        // 1.) commit is done, or
-        // 2.) ssCommitQuotaBytes <= 0, or
-        // 3.) no data in mutation log to write.
-        if (!data->storage.canPipelineCommits()) {
-            // Don't write version data while a commit is going on if the storage engine does not support pipelining
-            wait(durable && durableMinDelay);
-        }
-        state Future<Void> stopEarly = data->storage.canPipelineCommits() ? (durable && durableMinDelay) : Never();
-        // Apply mutations from the mutationLog
-        auto u = data->getMutationLog().upper_bound(newOldestVersion);
-        if (u != data->getMutationLog().end() && u->first <= desiredOldestVersion) {
-            VerUpdateRef const& v = u->second;
-            newOldestVersion = v.version;
-            ASSERT( newOldestVersion > data->storageVersion() && newOldestVersion <= desiredOldestVersion );
-            // TODO(alexmiller): Update to version tracking.
-            DEBUG_KEY_RANGE("asyncPrepareVersionsForCommit", newOldestVersion, KeyRangeRef());
-            int64_t _ssCommitQuotaBytes = wait(asyncWriteMutationsToCommitBuffer(data, v.mutations, newOldestVersion, "asyncPrepareVersionsForCommit", ssCommitQuotaBytes));
-            ssCommitQuotaBytes = _ssCommitQuotaBytes;
-            // We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
-            // forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
-            forgetter.add(data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage ));
-            data->oldestVersion.set( newOldestVersion );
-            if (ssCommitQuotaBytes <= 0) {
-                // No quota left. Wait for previous commit to finish.
-                wait(durable && durableMinDelay);
-                break;
-            }
-            if (stopEarly.isReady()) {
-                // Previous commit is done.
-                if (stopEarly.isError()) {
-                    throw stopEarly.getError();
-                }
-                break;
-            }
-        } else {
-            // Since there is no data in mutation log, in order to make progress,
-            // advance it to desiredOldestVersion directly
-            newOldestVersion = desiredOldestVersion;
-            // We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
-            // forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
-            forgetter.add(data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage ));
-            data->oldestVersion.set( newOldestVersion );
-            // No more data in mutation log can be written.
-            finalCommit = true;
-            // Wait the previously written data to be committed
-            wait(durable && durableMinDelay);
-            break;
-        }
+bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion, Version newStorageVersion,
+                                                    int64_t& bytesLeft) {
+    if (bytesLeft <= 0) return true;
+    // Apply mutations from the mutationLog
+    auto u = data->getMutationLog().upper_bound(prevStorageVersion);
+    if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
+        VerUpdateRef const& v = u->second;
+        ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion);
+        // TODO(alexmiller): Update to version tracking.
+        DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
+        writeMutations(v.mutations, v.version, "makeVersionDurable");
+        for (const auto& m : v.mutations) bytesLeft -= mvccStorageBytes(m);
+        prevStorageVersion = v.version;
+        return false;
+    } else {
+        prevStorageVersion = newStorageVersion;
+        return true;
     }
-    if (newOldestVersion > startOldestVersion){
-        // Set the new durable version as part of the outstanding change set, before commit
-        data->storage.makeVersionDurable( newOldestVersion );
-    }
-    debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
-    wait(forgetter.signal());
-    return finalCommit;
-}
-Future<bool> StorageServerDisk::asyncPrepareVersionsForCommit(Version desiredOldestVersion, Future<Void> durable, Future<Void>durableMinDelay) {
-    return asyncPrepareVersionsForCommit_impl(this, data, desiredOldestVersion, durable, durableMinDelay);
 }

 // Update data->storage to persist the changes from (data->storageVersion(),version]