From 6e89dc3da079414dbcdb1daccd1852296ca2abc0 Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Mon, 9 Aug 2021 14:36:07 -0700 Subject: [PATCH] Check in the prototype of MutationLogReader. --- fdbclient/BackupAgent.actor.h | 1 + fdbclient/BackupAgentBase.actor.cpp | 10 + fdbclient/CMakeLists.txt | 2 + fdbclient/MutationLogReader.actor.cpp | 196 ++++++++++++++++++ fdbclient/MutationLogReader.actor.h | 124 +++++++++++ fdbserver/CMakeLists.txt | 1 + .../MutationLogReaderCorrectness.actor.cpp | 161 ++++++++++++++ fdbserver/workloads/UnitTests.actor.cpp | 2 + tests/CMakeLists.txt | 2 + tests/fast/MutationLogReader.toml | 9 + tests/fast/MutationLogReaderCorrectness.toml | 6 + 11 files changed, 514 insertions(+) create mode 100644 fdbclient/MutationLogReader.actor.cpp create mode 100644 fdbclient/MutationLogReader.actor.h create mode 100644 fdbserver/workloads/MutationLogReaderCorrectness.actor.cpp create mode 100644 tests/fast/MutationLogReader.toml create mode 100644 tests/fast/MutationLogReaderCorrectness.toml diff --git a/fdbclient/BackupAgent.actor.h b/fdbclient/BackupAgent.actor.h index 450bc30c63..5669f5d9d7 100644 --- a/fdbclient/BackupAgent.actor.h +++ b/fdbclient/BackupAgent.actor.h @@ -533,6 +533,7 @@ Future eraseLogData(Reference tr, CheckBackupUID = CheckBackupUID::False, Version backupUid = 0); Key getApplyKey(Version version, Key backupUid); +Key getLogKey(Version version, Key backupUid); Version getLogKeyVersion(Key key); std::pair decodeBKMutationLogKey(Key key); Future logError(Database cx, Key keyErrors, const std::string& message); diff --git a/fdbclient/BackupAgentBase.actor.cpp b/fdbclient/BackupAgentBase.actor.cpp index 484daa0984..d62fdda4aa 100644 --- a/fdbclient/BackupAgentBase.actor.cpp +++ b/fdbclient/BackupAgentBase.actor.cpp @@ -229,6 +229,16 @@ Key getApplyKey(Version version, Key backupUid) { return k2.withPrefix(applyLogKeys.begin); } +Key getLogKey(Version version, Key backupUid) { + int64_t vblock = (version - 1) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; + uint64_t v = bigEndian64(version); + uint32_t data = vblock & 0xffffffff; + uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0); + Key k1 = StringRef((uint8_t*)&v, sizeof(uint64_t)).withPrefix(StringRef(&hash, sizeof(uint8_t))); + Key k2 = k1.withPrefix(backupUid); + return k2.withPrefix(backupLogKeys.begin); +} + Version getLogKeyVersion(Key key) { return bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t))); } diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index cbc60f5df4..1b7e7efcf9 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -70,6 +70,8 @@ set(FDBCLIENT_SRCS MultiVersionTransaction.actor.cpp MultiVersionTransaction.h MutationList.h + MutationLogReader.actor.cpp + MutationLogReader.actor.h NameLineage.h NameLineage.cpp NativeAPI.actor.cpp diff --git a/fdbclient/MutationLogReader.actor.cpp b/fdbclient/MutationLogReader.actor.cpp new file mode 100644 index 0000000000..a8202a928b --- /dev/null +++ b/fdbclient/MutationLogReader.actor.cpp @@ -0,0 +1,196 @@ +/* + * MutationLogReader.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/MutationLogReader.actor.h" +#include "fdbrpc/simulator.h" +#include "flow/UnitTest.h" +#include "flow/flow.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +namespace { + +Key versionToKey(Version version, Key prefix) { + uint64_t versionBigEndian = bigEndian64(version); + return KeyRef((uint8_t*)&versionBigEndian, sizeof(uint64_t)).withPrefix(prefix); +} + +Version keyRefToVersion(KeyRef key, int prefixLen) { + return (Version)bigEndian64(*((uint64_t*)key.substr(prefixLen).begin())); +} + +} // namespace + +namespace mutation_log_reader { + +Standalone RangeResultBlock::consume() { + Version stopVersion = std::min(lastVersion, + (firstVersion + CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE - 1) / + CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE) + + 1; // firstVersion rounded up to the nearest 1M versions, then + 1 + int startIndex = indexToRead; + while (indexToRead < result.size() && keyRefToVersion(result[indexToRead].key, prefixLen) < stopVersion) { + ++indexToRead; + } + if (indexToRead < result.size()) { + firstVersion = keyRefToVersion(result[indexToRead].key, prefixLen); // the version of result[indexToRead] + } + return Standalone( + RangeResultRef(result.slice(startIndex, indexToRead), result.more, result.readThrough), result.arena()); +} + +void PipelinedReader::startReading(Database cx) { + reader = getNext(cx); +} + +Future PipelinedReader::getNext(Database cx) { + return getNext_impl(this, cx); +} + +ACTOR Future PipelinedReader::getNext_impl(PipelinedReader* self, Database cx) { + state Transaction tr(cx); + + state GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED, + (g_network->isSimulated() && !g_simulator.speedUpSimulation) + ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES + : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES); + + state Key begin = versionToKey(self->currentBeginVersion, self->prefix); + state Key end = versionToKey(self->endVersion, self->prefix); + + loop { + // Get the lock + wait(self->readerLimit.take()); + + // Read begin to end forever until successful + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + + RangeResult kvs = wait(tr.getRange(KeyRangeRef(begin, end), limits)); + + // No more results, send end of stream + if (!kvs.empty()) { + // Send results to the reads stream + self->reads.send( + RangeResultBlock{ .result = kvs, + .firstVersion = keyRefToVersion(kvs.front().key, self->prefix.size()), + .lastVersion = keyRefToVersion(kvs.back().key, self->prefix.size()), + .hash = self->hash, + .prefixLen = self->prefix.size(), + .indexToRead = 0 }); + } + + if (!kvs.more) { + self->reads.sendError(end_of_stream()); + return Void(); + } + + begin = kvs.readThrough.present() ? kvs.readThrough.get() : keyAfter(kvs.back().key); + + break; + } catch (Error& e) { + if (e.code() == error_code_transaction_too_old) { + // We are using this transaction until it's too old and then resetting to a fresh one, + // so we don't need to delay. + tr.fullReset(); + } else { + wait(tr.onError(e)); + } + } + } + } +} + +} // namespace mutation_log_reader + +ACTOR Future MutationLogReader::initializePQ(MutationLogReader* self) { + state int h; + for (h = 0; h < 256; ++h) { + try { + mutation_log_reader::RangeResultBlock front = waitNext(self->pipelinedReaders[h]->reads.getFuture()); + self->priorityQueue.push(front); + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw e; + } + ++self->finished; + } + } + return Void(); +} + +Future> MutationLogReader::getNext() { + return getNext_impl(this); +} + +ACTOR Future> MutationLogReader::getNext_impl(MutationLogReader* self) { + loop { + if (self->finished == 256) { + state int i; + for (i = 0; i < self->pipelinedReaders.size(); ++i) { + wait(self->pipelinedReaders[i]->done()); + } + throw end_of_stream(); + } + mutation_log_reader::RangeResultBlock top = self->priorityQueue.top(); + self->priorityQueue.pop(); + uint8_t hash = top.hash; + state Standalone ret = top.consume(); + if (top.empty()) { + self->pipelinedReaders[(int)hash]->release(); + try { + mutation_log_reader::RangeResultBlock next = + waitNext(self->pipelinedReaders[(int)hash]->reads.getFuture()); + self->priorityQueue.push(next); + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + ++self->finished; + } else { + throw e; + } + } + } else { + self->priorityQueue.push(top); + } + if (ret.size() != 0) { + return ret; + } + } +} + +namespace { +// UNIT TESTS +TEST_CASE("/fdbclient/mutationlogreader/VersionKeyRefConversion") { + Key prefix = LiteralStringRef("foos"); + + ASSERT(keyRefToVersion(versionToKey(0, prefix), prefix.size()) == 0); + ASSERT(keyRefToVersion(versionToKey(1, prefix), prefix.size()) == 1); + ASSERT(keyRefToVersion(versionToKey(-1, prefix), prefix.size()) == -1); + ASSERT(keyRefToVersion(versionToKey(std::numeric_limits::min(), prefix), prefix.size()) == + std::numeric_limits::min()); + ASSERT(keyRefToVersion(versionToKey(std::numeric_limits::max(), prefix), prefix.size()) == + std::numeric_limits::max()); + + return Void(); +} +} // namespace + +void forceLinkMutationLogReaderTests() {} diff --git a/fdbclient/MutationLogReader.actor.h b/fdbclient/MutationLogReader.actor.h new file mode 100644 index 0000000000..fa92b7df7a --- /dev/null +++ b/fdbclient/MutationLogReader.actor.h @@ -0,0 +1,124 @@ +/* + * MutationLogReader.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_MUTATION_LOG_READER_ACTOR_G_H) +#define FDBCLIENT_MUTATION_LOG_READER_ACTOR_G_H +#include "fdbclient/MutationLogReader.actor.g.h" +#elif !defined(FDBCLIENT_MUTATION_LOG_READER_ACTOR_H) +#define FDBCLIENT_MUTATION_LOG_READER_ACTOR_H + +#include +#include "fdbclient/FDBTypes.h" +#include "fdbclient/NativeAPI.actor.h" +#include "flow/flow.h" +#include "flow/ActorCollection.h" +#include "flow/actorcompiler.h" // has to be last include + +namespace mutation_log_reader { + +struct RangeResultBlock { + RangeResult result; + Version firstVersion; // version of first record + Version lastVersion; // version of last record + uint8_t hash; // points back to the PipelinedReader + int prefixLen; // size of keyspace, uid, and hash prefix + int indexToRead; // index of first unconsumed record + + Standalone consume(); + + bool empty() { return indexToRead == result.size(); } + + bool operator<(const RangeResultBlock& r) const { + // We want a min heap. The standard C++ priority queue is a max heap. + return firstVersion > r.firstVersion; + } +}; + +class PipelinedReader { +public: + PipelinedReader(uint8_t h, Version bv, Version ev, unsigned pd, Key p) + : readerLimit(pd), hash(h), prefix(StringRef(&hash, sizeof(uint8_t)).withPrefix(p)), beginVersion(bv), + endVersion(ev), currentBeginVersion(bv), pipelineDepth(pd) {} + + void startReading(Database cx); + Future getNext(Database cx); + ACTOR static Future getNext_impl(PipelinedReader* self, Database cx); + + void release() { readerLimit.release(); } + + PromiseStream reads; + FlowLock readerLimit; + uint8_t hash; + Key prefix; // "\xff\x02/alog/UID/hash/" for restore, or "\xff\x02/blog/UID/hash/" for backup + + Future done() { return reader; } + +private: + Version beginVersion, endVersion, currentBeginVersion; + unsigned pipelineDepth; + Future reader; +}; + +} // namespace mutation_log_reader + +class MutationLogReader : public ReferenceCounted { +public: + MutationLogReader() : finished(256) {} + MutationLogReader(Database cx, Version bv, Version ev, Key uid, Key beginKey, unsigned pd) + : beginVersion(bv), endVersion(ev), prefix(uid.withPrefix(beginKey)), pipelineDepth(pd), finished(0) { + pipelinedReaders.reserve(256); + if (pipelineDepth > 0) { + for (int h = 0; h < 256; ++h) { + pipelinedReaders.emplace_back(new mutation_log_reader::PipelinedReader( + (uint8_t)h, beginVersion, endVersion, pipelineDepth, prefix)); + pipelinedReaders[h]->startReading(cx); + } + } + } + + ACTOR static Future> Create(Database cx, + Version bv, + Version ev, + Key uid, + Key beginKey, + unsigned pd) { + state Reference self(new MutationLogReader(cx, bv, ev, uid, beginKey, pd)); + wait(self->initializePQ(self.getPtr())); + return self; + } + + Future> getNext(); + +private: + ACTOR static Future initializePQ(MutationLogReader* self); + ACTOR static Future> getNext_impl(MutationLogReader* self); + + std::vector> pipelinedReaders; + std::priority_queue priorityQueue; + Version beginVersion, endVersion; + Key prefix; // "\xff\x02/alog/UID/" for restore, or "\xff\x02/blog/UID/" for backup + unsigned pipelineDepth; + unsigned finished; +}; + +#include "flow/unactorcompiler.h" +#endif diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 6f416598b0..b2e5ff445f 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -198,6 +198,7 @@ set(FDBSERVER_SRCS workloads/MemoryKeyValueStore.h workloads/MemoryLifetime.actor.cpp workloads/MetricLogging.actor.cpp + workloads/MutationLogReaderCorrectness.actor.cpp workloads/ParallelRestore.actor.cpp workloads/Performance.actor.cpp workloads/Ping.actor.cpp diff --git a/fdbserver/workloads/MutationLogReaderCorrectness.actor.cpp b/fdbserver/workloads/MutationLogReaderCorrectness.actor.cpp new file mode 100644 index 0000000000..832adbd443 --- /dev/null +++ b/fdbserver/workloads/MutationLogReaderCorrectness.actor.cpp @@ -0,0 +1,161 @@ +/* + * MutationLogReaderCorrectness.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "fdbrpc/simulator.h" +#include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/BackupContainer.h" +#include "fdbclient/MutationLogReader.actor.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/Error.h" +#include "flow/IRandom.h" +#include "flow/flow.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +struct MutationLogReaderCorrectnessWorkload : TestWorkload { + bool enabled; + int records; + Version versionRange; + Version versionIncrement; + Version beginVersion; + Version endVersion; + Key uid; + Key baLogRangePrefix; + bool debug = false; + + Version recordVersion(int index) { return beginVersion + versionIncrement * index; } + + Key recordKey(int index) { return getLogKey(recordVersion(index), uid); } + + Value recordValue(int index) { + Version v = recordVersion(index); + return StringRef(format("%" PRId64 " (%" PRIx64 ")", v, v)); + } + + MutationLogReaderCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + enabled = !clientId; // only do this on the "first" client + uid = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(), Unversioned()); + baLogRangePrefix = uid.withPrefix(backupLogKeys.begin); + + beginVersion = deterministicRandom()->randomInt64( + 0, std::numeric_limits::max()); // intentionally not max of int64 + records = deterministicRandom()->randomInt(0, 1e6); + versionRange = deterministicRandom()->randomInt64(records, std::numeric_limits::max()); + versionIncrement = versionRange / records; + + // The version immediately after the last actual record version + endVersion = recordVersion(records - 1) + 1; + } + + std::string description() const override { return "MutationLogReaderCorrectness"; } + + Future start(Database const& cx) override { + if (enabled) { + return _start(cx, this); + } + return Void(); + } + + ACTOR Future _start(Database cx, MutationLogReaderCorrectnessWorkload* self) { + state Transaction tr(cx); + state int iStart = 0; + state int batchSize = 1000; + printf("Records: %d\n", self->records); + printf("BeginVersion: %" PRId64 "\n", self->beginVersion); + printf("EndVersion: %" PRId64 "\n", self->endVersion); + + while (iStart < self->records) { + loop { + try { + tr.reset(); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + int i = iStart; + state int iEnd = std::min(iStart + batchSize, self->records); + + for (; i < iEnd; ++i) { + Key key = self->recordKey(i); + Value value = self->recordValue(i); + tr.set(key, value); + } + + wait(tr.commit()); + iStart = iEnd; + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + + state Reference reader = wait(MutationLogReader::Create( + cx, self->beginVersion, self->endVersion, self->uid, backupLogKeys.begin, /*pipelineDepth=*/1)); + + state int nextExpectedRecord = 0; + + try { + loop { + state Standalone results = wait(reader->getNext()); + + for (const auto& rec : results) { + Key expectedKey = self->recordKey(nextExpectedRecord); + Value expectedValue = self->recordValue(nextExpectedRecord); + + bool keyMatch = rec.key == expectedKey; + bool valueMatch = rec.value == expectedValue; + + if (self->debug) { + if (!keyMatch) { + printf("key: %s\n", rec.key.printable().c_str()); + printf("expected key: %s\n", expectedKey.printable().c_str()); + } + if (!valueMatch) { + printf("value: %s\n", rec.value.printable().c_str()); + printf("expected value: %s\n", expectedValue.printable().c_str()); + } + } + + ASSERT(keyMatch); + ASSERT(valueMatch); + ++nextExpectedRecord; + } + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + throw e; + } + } + + printf("records expected: %d\n", self->records); + printf("records found: %d\n", nextExpectedRecord); + + ASSERT_EQ(nextExpectedRecord, self->records); + + return Void(); + } + + Future check(Database const& cx) override { return true; } + + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory MutationLogReaderCorrectnessWorkloadFactory( + "MutationLogReaderCorrectness"); diff --git a/fdbserver/workloads/UnitTests.actor.cpp b/fdbserver/workloads/UnitTests.actor.cpp index 6bf9eb39dc..b9b792afe4 100644 --- a/fdbserver/workloads/UnitTests.actor.cpp +++ b/fdbserver/workloads/UnitTests.actor.cpp @@ -33,6 +33,7 @@ void forceLinkStreamCipherTests(); #endif void forceLinkParallelStreamTests(); void forceLinkSimExternalConnectionTests(); +void forceLinkMutationLogReaderTests(); void forceLinkIThreadPoolTests(); struct UnitTestWorkload : TestWorkload { @@ -77,6 +78,7 @@ struct UnitTestWorkload : TestWorkload { #endif forceLinkParallelStreamTests(); forceLinkSimExternalConnectionTests(); + forceLinkMutationLogReaderTests(); forceLinkIThreadPoolTests(); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9b0aa0ce55..e91ef08269 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -140,6 +140,8 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/LowLatencySingleClog.toml IGNORE) add_fdb_test(TEST_FILES fast/MemoryLifetime.toml) add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml) + add_fdb_test(TEST_FILES fast/MutationLogReader.toml) + add_fdb_test(TEST_FILES fast/MutationLogReaderCorrectness.toml) add_fdb_test(TEST_FILES fast/ProtocolVersion.toml) add_fdb_test(TEST_FILES fast/RandomSelector.toml) add_fdb_test(TEST_FILES fast/RandomUnitTests.toml) diff --git a/tests/fast/MutationLogReader.toml b/tests/fast/MutationLogReader.toml new file mode 100644 index 0000000000..ed696027ca --- /dev/null +++ b/tests/fast/MutationLogReader.toml @@ -0,0 +1,9 @@ +[[test]] +testTitle = 'MutationLogReader Test' +useDB = false +startDelay = 0 + + [[test.workload]] + testName = 'UnitTests' + maxTestCases = 1 + testsMatching = '/fdbclient/mutationlogreader/VersionKeyRefConversion' diff --git a/tests/fast/MutationLogReaderCorrectness.toml b/tests/fast/MutationLogReaderCorrectness.toml new file mode 100644 index 0000000000..5f171800ba --- /dev/null +++ b/tests/fast/MutationLogReaderCorrectness.toml @@ -0,0 +1,6 @@ +[[test]] +testTitle = 'MutationLogReaderCorrectness' +useDB = true + + [[test.workload]] + testName = 'MutationLogReaderCorrectness'