mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-31 10:14:52 +08:00
Check in the prototype of MutationLogReader.
This commit is contained in:
parent
55059b9a02
commit
6e89dc3da0
@ -533,6 +533,7 @@ Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr,
|
||||
CheckBackupUID = CheckBackupUID::False,
|
||||
Version backupUid = 0);
|
||||
Key getApplyKey(Version version, Key backupUid);
|
||||
Key getLogKey(Version version, Key backupUid);
|
||||
Version getLogKeyVersion(Key key);
|
||||
std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key);
|
||||
Future<Void> logError(Database cx, Key keyErrors, const std::string& message);
|
||||
|
@ -229,6 +229,16 @@ Key getApplyKey(Version version, Key backupUid) {
|
||||
return k2.withPrefix(applyLogKeys.begin);
|
||||
}
|
||||
|
||||
Key getLogKey(Version version, Key backupUid) {
|
||||
int64_t vblock = (version - 1) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
|
||||
uint64_t v = bigEndian64(version);
|
||||
uint32_t data = vblock & 0xffffffff;
|
||||
uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0);
|
||||
Key k1 = StringRef((uint8_t*)&v, sizeof(uint64_t)).withPrefix(StringRef(&hash, sizeof(uint8_t)));
|
||||
Key k2 = k1.withPrefix(backupUid);
|
||||
return k2.withPrefix(backupLogKeys.begin);
|
||||
}
|
||||
|
||||
Version getLogKeyVersion(Key key) {
|
||||
return bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t)));
|
||||
}
|
||||
|
@ -70,6 +70,8 @@ set(FDBCLIENT_SRCS
|
||||
MultiVersionTransaction.actor.cpp
|
||||
MultiVersionTransaction.h
|
||||
MutationList.h
|
||||
MutationLogReader.actor.cpp
|
||||
MutationLogReader.actor.h
|
||||
NameLineage.h
|
||||
NameLineage.cpp
|
||||
NativeAPI.actor.cpp
|
||||
|
196
fdbclient/MutationLogReader.actor.cpp
Normal file
196
fdbclient/MutationLogReader.actor.cpp
Normal file
@ -0,0 +1,196 @@
|
||||
/*
|
||||
* MutationLogReader.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/MutationLogReader.actor.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
|
||||
Key versionToKey(Version version, Key prefix) {
|
||||
uint64_t versionBigEndian = bigEndian64(version);
|
||||
return KeyRef((uint8_t*)&versionBigEndian, sizeof(uint64_t)).withPrefix(prefix);
|
||||
}
|
||||
|
||||
Version keyRefToVersion(KeyRef key, int prefixLen) {
|
||||
return (Version)bigEndian64(*((uint64_t*)key.substr(prefixLen).begin()));
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace mutation_log_reader {
|
||||
|
||||
Standalone<RangeResultRef> RangeResultBlock::consume() {
|
||||
Version stopVersion = std::min(lastVersion,
|
||||
(firstVersion + CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE - 1) /
|
||||
CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE) +
|
||||
1; // firstVersion rounded up to the nearest 1M versions, then + 1
|
||||
int startIndex = indexToRead;
|
||||
while (indexToRead < result.size() && keyRefToVersion(result[indexToRead].key, prefixLen) < stopVersion) {
|
||||
++indexToRead;
|
||||
}
|
||||
if (indexToRead < result.size()) {
|
||||
firstVersion = keyRefToVersion(result[indexToRead].key, prefixLen); // the version of result[indexToRead]
|
||||
}
|
||||
return Standalone<RangeResultRef>(
|
||||
RangeResultRef(result.slice(startIndex, indexToRead), result.more, result.readThrough), result.arena());
|
||||
}
|
||||
|
||||
void PipelinedReader::startReading(Database cx) {
|
||||
reader = getNext(cx);
|
||||
}
|
||||
|
||||
Future<Void> PipelinedReader::getNext(Database cx) {
|
||||
return getNext_impl(this, cx);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> PipelinedReader::getNext_impl(PipelinedReader* self, Database cx) {
|
||||
state Transaction tr(cx);
|
||||
|
||||
state GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED,
|
||||
(g_network->isSimulated() && !g_simulator.speedUpSimulation)
|
||||
? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES
|
||||
: CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
|
||||
|
||||
state Key begin = versionToKey(self->currentBeginVersion, self->prefix);
|
||||
state Key end = versionToKey(self->endVersion, self->prefix);
|
||||
|
||||
loop {
|
||||
// Get the lock
|
||||
wait(self->readerLimit.take());
|
||||
|
||||
// Read begin to end forever until successful
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
RangeResult kvs = wait(tr.getRange(KeyRangeRef(begin, end), limits));
|
||||
|
||||
// No more results, send end of stream
|
||||
if (!kvs.empty()) {
|
||||
// Send results to the reads stream
|
||||
self->reads.send(
|
||||
RangeResultBlock{ .result = kvs,
|
||||
.firstVersion = keyRefToVersion(kvs.front().key, self->prefix.size()),
|
||||
.lastVersion = keyRefToVersion(kvs.back().key, self->prefix.size()),
|
||||
.hash = self->hash,
|
||||
.prefixLen = self->prefix.size(),
|
||||
.indexToRead = 0 });
|
||||
}
|
||||
|
||||
if (!kvs.more) {
|
||||
self->reads.sendError(end_of_stream());
|
||||
return Void();
|
||||
}
|
||||
|
||||
begin = kvs.readThrough.present() ? kvs.readThrough.get() : keyAfter(kvs.back().key);
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_transaction_too_old) {
|
||||
// We are using this transaction until it's too old and then resetting to a fresh one,
|
||||
// so we don't need to delay.
|
||||
tr.fullReset();
|
||||
} else {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace mutation_log_reader
|
||||
|
||||
ACTOR Future<Void> MutationLogReader::initializePQ(MutationLogReader* self) {
|
||||
state int h;
|
||||
for (h = 0; h < 256; ++h) {
|
||||
try {
|
||||
mutation_log_reader::RangeResultBlock front = waitNext(self->pipelinedReaders[h]->reads.getFuture());
|
||||
self->priorityQueue.push(front);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_end_of_stream) {
|
||||
throw e;
|
||||
}
|
||||
++self->finished;
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> MutationLogReader::getNext() {
|
||||
return getNext_impl(this);
|
||||
}
|
||||
|
||||
ACTOR Future<Standalone<RangeResultRef>> MutationLogReader::getNext_impl(MutationLogReader* self) {
|
||||
loop {
|
||||
if (self->finished == 256) {
|
||||
state int i;
|
||||
for (i = 0; i < self->pipelinedReaders.size(); ++i) {
|
||||
wait(self->pipelinedReaders[i]->done());
|
||||
}
|
||||
throw end_of_stream();
|
||||
}
|
||||
mutation_log_reader::RangeResultBlock top = self->priorityQueue.top();
|
||||
self->priorityQueue.pop();
|
||||
uint8_t hash = top.hash;
|
||||
state Standalone<RangeResultRef> ret = top.consume();
|
||||
if (top.empty()) {
|
||||
self->pipelinedReaders[(int)hash]->release();
|
||||
try {
|
||||
mutation_log_reader::RangeResultBlock next =
|
||||
waitNext(self->pipelinedReaders[(int)hash]->reads.getFuture());
|
||||
self->priorityQueue.push(next);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_end_of_stream) {
|
||||
++self->finished;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self->priorityQueue.push(top);
|
||||
}
|
||||
if (ret.size() != 0) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
// UNIT TESTS
|
||||
TEST_CASE("/fdbclient/mutationlogreader/VersionKeyRefConversion") {
|
||||
Key prefix = LiteralStringRef("foos");
|
||||
|
||||
ASSERT(keyRefToVersion(versionToKey(0, prefix), prefix.size()) == 0);
|
||||
ASSERT(keyRefToVersion(versionToKey(1, prefix), prefix.size()) == 1);
|
||||
ASSERT(keyRefToVersion(versionToKey(-1, prefix), prefix.size()) == -1);
|
||||
ASSERT(keyRefToVersion(versionToKey(std::numeric_limits<int64_t>::min(), prefix), prefix.size()) ==
|
||||
std::numeric_limits<int64_t>::min());
|
||||
ASSERT(keyRefToVersion(versionToKey(std::numeric_limits<int64_t>::max(), prefix), prefix.size()) ==
|
||||
std::numeric_limits<int64_t>::max());
|
||||
|
||||
return Void();
|
||||
}
|
||||
} // namespace
|
||||
|
||||
void forceLinkMutationLogReaderTests() {}
|
124
fdbclient/MutationLogReader.actor.h
Normal file
124
fdbclient/MutationLogReader.actor.h
Normal file
@ -0,0 +1,124 @@
|
||||
/*
|
||||
* MutationLogReader.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_MUTATION_LOG_READER_ACTOR_G_H)
|
||||
#define FDBCLIENT_MUTATION_LOG_READER_ACTOR_G_H
|
||||
#include "fdbclient/MutationLogReader.actor.g.h"
|
||||
#elif !defined(FDBCLIENT_MUTATION_LOG_READER_ACTOR_H)
|
||||
#define FDBCLIENT_MUTATION_LOG_READER_ACTOR_H
|
||||
|
||||
#include <deque>
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
namespace mutation_log_reader {
|
||||
|
||||
struct RangeResultBlock {
|
||||
RangeResult result;
|
||||
Version firstVersion; // version of first record
|
||||
Version lastVersion; // version of last record
|
||||
uint8_t hash; // points back to the PipelinedReader
|
||||
int prefixLen; // size of keyspace, uid, and hash prefix
|
||||
int indexToRead; // index of first unconsumed record
|
||||
|
||||
Standalone<RangeResultRef> consume();
|
||||
|
||||
bool empty() { return indexToRead == result.size(); }
|
||||
|
||||
bool operator<(const RangeResultBlock& r) const {
|
||||
// We want a min heap. The standard C++ priority queue is a max heap.
|
||||
return firstVersion > r.firstVersion;
|
||||
}
|
||||
};
|
||||
|
||||
class PipelinedReader {
|
||||
public:
|
||||
PipelinedReader(uint8_t h, Version bv, Version ev, unsigned pd, Key p)
|
||||
: readerLimit(pd), hash(h), prefix(StringRef(&hash, sizeof(uint8_t)).withPrefix(p)), beginVersion(bv),
|
||||
endVersion(ev), currentBeginVersion(bv), pipelineDepth(pd) {}
|
||||
|
||||
void startReading(Database cx);
|
||||
Future<Void> getNext(Database cx);
|
||||
ACTOR static Future<Void> getNext_impl(PipelinedReader* self, Database cx);
|
||||
|
||||
void release() { readerLimit.release(); }
|
||||
|
||||
PromiseStream<RangeResultBlock> reads;
|
||||
FlowLock readerLimit;
|
||||
uint8_t hash;
|
||||
Key prefix; // "\xff\x02/alog/UID/hash/" for restore, or "\xff\x02/blog/UID/hash/" for backup
|
||||
|
||||
Future<Void> done() { return reader; }
|
||||
|
||||
private:
|
||||
Version beginVersion, endVersion, currentBeginVersion;
|
||||
unsigned pipelineDepth;
|
||||
Future<Void> reader;
|
||||
};
|
||||
|
||||
} // namespace mutation_log_reader
|
||||
|
||||
class MutationLogReader : public ReferenceCounted<MutationLogReader> {
|
||||
public:
|
||||
MutationLogReader() : finished(256) {}
|
||||
MutationLogReader(Database cx, Version bv, Version ev, Key uid, Key beginKey, unsigned pd)
|
||||
: beginVersion(bv), endVersion(ev), prefix(uid.withPrefix(beginKey)), pipelineDepth(pd), finished(0) {
|
||||
pipelinedReaders.reserve(256);
|
||||
if (pipelineDepth > 0) {
|
||||
for (int h = 0; h < 256; ++h) {
|
||||
pipelinedReaders.emplace_back(new mutation_log_reader::PipelinedReader(
|
||||
(uint8_t)h, beginVersion, endVersion, pipelineDepth, prefix));
|
||||
pipelinedReaders[h]->startReading(cx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Reference<MutationLogReader>> Create(Database cx,
|
||||
Version bv,
|
||||
Version ev,
|
||||
Key uid,
|
||||
Key beginKey,
|
||||
unsigned pd) {
|
||||
state Reference<MutationLogReader> self(new MutationLogReader(cx, bv, ev, uid, beginKey, pd));
|
||||
wait(self->initializePQ(self.getPtr()));
|
||||
return self;
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> getNext();
|
||||
|
||||
private:
|
||||
ACTOR static Future<Void> initializePQ(MutationLogReader* self);
|
||||
ACTOR static Future<Standalone<RangeResultRef>> getNext_impl(MutationLogReader* self);
|
||||
|
||||
std::vector<std::unique_ptr<mutation_log_reader::PipelinedReader>> pipelinedReaders;
|
||||
std::priority_queue<mutation_log_reader::RangeResultBlock> priorityQueue;
|
||||
Version beginVersion, endVersion;
|
||||
Key prefix; // "\xff\x02/alog/UID/" for restore, or "\xff\x02/blog/UID/" for backup
|
||||
unsigned pipelineDepth;
|
||||
unsigned finished;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
@ -198,6 +198,7 @@ set(FDBSERVER_SRCS
|
||||
workloads/MemoryKeyValueStore.h
|
||||
workloads/MemoryLifetime.actor.cpp
|
||||
workloads/MetricLogging.actor.cpp
|
||||
workloads/MutationLogReaderCorrectness.actor.cpp
|
||||
workloads/ParallelRestore.actor.cpp
|
||||
workloads/Performance.actor.cpp
|
||||
workloads/Ping.actor.cpp
|
||||
|
161
fdbserver/workloads/MutationLogReaderCorrectness.actor.cpp
Normal file
161
fdbserver/workloads/MutationLogReaderCorrectness.actor.cpp
Normal file
@ -0,0 +1,161 @@
|
||||
/*
|
||||
* MutationLogReaderCorrectness.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/BackupContainer.h"
|
||||
#include "fdbclient/MutationLogReader.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct MutationLogReaderCorrectnessWorkload : TestWorkload {
|
||||
bool enabled;
|
||||
int records;
|
||||
Version versionRange;
|
||||
Version versionIncrement;
|
||||
Version beginVersion;
|
||||
Version endVersion;
|
||||
Key uid;
|
||||
Key baLogRangePrefix;
|
||||
bool debug = false;
|
||||
|
||||
Version recordVersion(int index) { return beginVersion + versionIncrement * index; }
|
||||
|
||||
Key recordKey(int index) { return getLogKey(recordVersion(index), uid); }
|
||||
|
||||
Value recordValue(int index) {
|
||||
Version v = recordVersion(index);
|
||||
return StringRef(format("%" PRId64 " (%" PRIx64 ")", v, v));
|
||||
}
|
||||
|
||||
MutationLogReaderCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
enabled = !clientId; // only do this on the "first" client
|
||||
uid = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(), Unversioned());
|
||||
baLogRangePrefix = uid.withPrefix(backupLogKeys.begin);
|
||||
|
||||
beginVersion = deterministicRandom()->randomInt64(
|
||||
0, std::numeric_limits<int32_t>::max()); // intentionally not max of int64
|
||||
records = deterministicRandom()->randomInt(0, 1e6);
|
||||
versionRange = deterministicRandom()->randomInt64(records, std::numeric_limits<Version>::max());
|
||||
versionIncrement = versionRange / records;
|
||||
|
||||
// The version immediately after the last actual record version
|
||||
endVersion = recordVersion(records - 1) + 1;
|
||||
}
|
||||
|
||||
std::string description() const override { return "MutationLogReaderCorrectness"; }
|
||||
|
||||
Future<Void> start(Database const& cx) override {
|
||||
if (enabled) {
|
||||
return _start(cx, this);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _start(Database cx, MutationLogReaderCorrectnessWorkload* self) {
|
||||
state Transaction tr(cx);
|
||||
state int iStart = 0;
|
||||
state int batchSize = 1000;
|
||||
printf("Records: %d\n", self->records);
|
||||
printf("BeginVersion: %" PRId64 "\n", self->beginVersion);
|
||||
printf("EndVersion: %" PRId64 "\n", self->endVersion);
|
||||
|
||||
while (iStart < self->records) {
|
||||
loop {
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
int i = iStart;
|
||||
state int iEnd = std::min(iStart + batchSize, self->records);
|
||||
|
||||
for (; i < iEnd; ++i) {
|
||||
Key key = self->recordKey(i);
|
||||
Value value = self->recordValue(i);
|
||||
tr.set(key, value);
|
||||
}
|
||||
|
||||
wait(tr.commit());
|
||||
iStart = iEnd;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state Reference<MutationLogReader> reader = wait(MutationLogReader::Create(
|
||||
cx, self->beginVersion, self->endVersion, self->uid, backupLogKeys.begin, /*pipelineDepth=*/1));
|
||||
|
||||
state int nextExpectedRecord = 0;
|
||||
|
||||
try {
|
||||
loop {
|
||||
state Standalone<RangeResultRef> results = wait(reader->getNext());
|
||||
|
||||
for (const auto& rec : results) {
|
||||
Key expectedKey = self->recordKey(nextExpectedRecord);
|
||||
Value expectedValue = self->recordValue(nextExpectedRecord);
|
||||
|
||||
bool keyMatch = rec.key == expectedKey;
|
||||
bool valueMatch = rec.value == expectedValue;
|
||||
|
||||
if (self->debug) {
|
||||
if (!keyMatch) {
|
||||
printf("key: %s\n", rec.key.printable().c_str());
|
||||
printf("expected key: %s\n", expectedKey.printable().c_str());
|
||||
}
|
||||
if (!valueMatch) {
|
||||
printf("value: %s\n", rec.value.printable().c_str());
|
||||
printf("expected value: %s\n", expectedValue.printable().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(keyMatch);
|
||||
ASSERT(valueMatch);
|
||||
++nextExpectedRecord;
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_end_of_stream) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
printf("records expected: %d\n", self->records);
|
||||
printf("records found: %d\n", nextExpectedRecord);
|
||||
|
||||
ASSERT_EQ(nextExpectedRecord, self->records);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override { return true; }
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<MutationLogReaderCorrectnessWorkload> MutationLogReaderCorrectnessWorkloadFactory(
|
||||
"MutationLogReaderCorrectness");
|
@ -33,6 +33,7 @@ void forceLinkStreamCipherTests();
|
||||
#endif
|
||||
void forceLinkParallelStreamTests();
|
||||
void forceLinkSimExternalConnectionTests();
|
||||
void forceLinkMutationLogReaderTests();
|
||||
void forceLinkIThreadPoolTests();
|
||||
|
||||
struct UnitTestWorkload : TestWorkload {
|
||||
@ -77,6 +78,7 @@ struct UnitTestWorkload : TestWorkload {
|
||||
#endif
|
||||
forceLinkParallelStreamTests();
|
||||
forceLinkSimExternalConnectionTests();
|
||||
forceLinkMutationLogReaderTests();
|
||||
forceLinkIThreadPoolTests();
|
||||
}
|
||||
|
||||
|
@ -140,6 +140,8 @@ if(WITH_PYTHON)
|
||||
add_fdb_test(TEST_FILES fast/LowLatencySingleClog.toml IGNORE)
|
||||
add_fdb_test(TEST_FILES fast/MemoryLifetime.toml)
|
||||
add_fdb_test(TEST_FILES fast/MoveKeysCycle.toml)
|
||||
add_fdb_test(TEST_FILES fast/MutationLogReader.toml)
|
||||
add_fdb_test(TEST_FILES fast/MutationLogReaderCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES fast/ProtocolVersion.toml)
|
||||
add_fdb_test(TEST_FILES fast/RandomSelector.toml)
|
||||
add_fdb_test(TEST_FILES fast/RandomUnitTests.toml)
|
||||
|
9
tests/fast/MutationLogReader.toml
Normal file
9
tests/fast/MutationLogReader.toml
Normal file
@ -0,0 +1,9 @@
|
||||
[[test]]
|
||||
testTitle = 'MutationLogReader Test'
|
||||
useDB = false
|
||||
startDelay = 0
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'UnitTests'
|
||||
maxTestCases = 1
|
||||
testsMatching = '/fdbclient/mutationlogreader/VersionKeyRefConversion'
|
6
tests/fast/MutationLogReaderCorrectness.toml
Normal file
6
tests/fast/MutationLogReaderCorrectness.toml
Normal file
@ -0,0 +1,6 @@
|
||||
[[test]]
|
||||
testTitle = 'MutationLogReaderCorrectness'
|
||||
useDB = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'MutationLogReaderCorrectness'
|
Loading…
x
Reference in New Issue
Block a user