Test-only bug fixes in Redwood along with debug logging detail improvements. Added clearRemapQueue() to Pager to more cleanly and reliably expire all old data and process the remap queue, fixing a bug where with certain configuration parameters and a lot of data the DestructiveSanityCheck would fail because it would not run cleanup long enough. Added more parameters to performance/set unit test.

This commit is contained in:
Steve Atherton 2021-11-19 01:00:14 -08:00
parent 124e7eb4f2
commit 3901d60548
2 changed files with 101 additions and 32 deletions

View File

@ -201,6 +201,9 @@ public:
virtual int getLogicalPageSize() const = 0;
virtual int getPagesPerExtent() const = 0;
// Write detail fields with pager stats to a trace event
virtual void toTraceEvent(TraceEvent& e) const = 0;
// Allocate a new page ID for a subsequent write. The page will be considered in-use after the next commit
// regardless of whether or not it was written to.
virtual Future<LogicalPageID> newPageID() = 0;
@ -304,6 +307,9 @@ public:
// invalidate them or keep their versions around until the snapshots are no longer in use.
virtual void setOldestReadableVersion(Version v) = 0;
// Advance the commit version and the oldest readble version and commit until the remap queue is empty.
virtual Future<Void> clearRemapQueue() = 0;
protected:
~IPager2() {} // Destruction should be done using close()/dispose() from the IClosable interface
};

View File

@ -2427,20 +2427,8 @@ public:
}
TraceEvent e(SevInfo, "RedwoodRecoveredPager");
e.detail("FileName", self->filename.c_str());
e.detail("LogicalFileSize", self->pHeader->pageCount * self->physicalPageSize);
e.detail("PhysicalFileSize", fileSize);
e.detail("OpenedExisting", exists);
e.detail("CommittedVersion", self->pHeader->committedVersion);
e.detail("LogicalPageSize", self->logicalPageSize);
e.detail("PhysicalPageSize", self->physicalPageSize);
self->remapQueue.toTraceEvent(e, "RemapQueue");
self->delayedFreeList.toTraceEvent(e, "FreeQueue");
self->freeList.toTraceEvent(e, "DelayedFreeQueue");
self->extentUsedList.toTraceEvent(e, "UsedExtentQueue");
self->extentFreeList.toTraceEvent(e, "FreeExtentQueue");
self->getStorageBytes().toTraceEvent(e);
self->toTraceEvent(e);
e.log();
self->recoveryVersion = self->pHeader->committedVersion;
@ -2457,6 +2445,22 @@ public:
return Void();
}
void toTraceEvent(TraceEvent& e) const override {
e.detail("FileName", filename.c_str());
e.detail("LogicalFileSize", pHeader->pageCount * physicalPageSize);
e.detail("PhysicalFileSize", filePageCountPending * physicalPageSize);
e.detail("CommittedVersion", pHeader->committedVersion);
e.detail("LogicalPageSize", logicalPageSize);
e.detail("PhysicalPageSize", physicalPageSize);
remapQueue.toTraceEvent(e, "RemapQueue");
delayedFreeList.toTraceEvent(e, "FreeQueue");
freeList.toTraceEvent(e, "DelayedFreeQueue");
extentUsedList.toTraceEvent(e, "UsedExtentQueue");
extentFreeList.toTraceEvent(e, "FreeExtentQueue");
getStorageBytes().toTraceEvent(e);
}
ACTOR static void extentCacheClear_impl(DWALPager* self) { wait(self->extentCache.clear()); }
void extentCacheClear() override { extentCacheClear_impl(this); }
@ -3432,24 +3436,39 @@ public:
// Cutoff is the version we can pop to
state RemappedPage cutoff(oldestRetainedVersion - self->remapCleanupWindow);
debug_printf("DWALPager(%s) remapCleanup cutoff %s oldestRetailedVersion=%" PRId64 " \n",
self->filename.c_str(),
::toString(cutoff).c_str(),
oldestRetainedVersion);
// Minimum version we must pop to before obeying stop command.
state Version minStopVersion =
cutoff.version - (BUGGIFY ? deterministicRandom()->randomInt(0, 10)
: (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG));
self->remapDestinationsSimOnly.clear();
debug_printf("DWALPager(%s) remapCleanup cutoff.version %" PRId64 " oldestRetainedVersion=%" PRId64
" minStopVersion %" PRId64 " items=%" PRId64 "\n",
self->filename.c_str(),
cutoff.version,
oldestRetainedVersion,
minStopVersion,
self->remapQueue.numEntries);
if (g_network->isSimulated()) {
self->remapDestinationsSimOnly.clear();
}
state int sinceYield = 0;
loop {
state Optional<RemappedPage> p = wait(self->remapQueue.pop(cutoff));
debug_printf("DWALPager(%s) remapCleanup popped %s\n", self->filename.c_str(), ::toString(p).c_str());
debug_printf("DWALPager(%s) remapCleanup popped %s items=%" PRId64 "\n",
self->filename.c_str(),
::toString(p).c_str(),
self->remapQueue.numEntries);
// Stop if we have reached the cutoff version, which is the start of the cleanup coalescing window
if (!p.present()) {
debug_printf("DWALPager(%s) remapCleanup pop failed minVer=%" PRId64 " cutoffVer=%" PRId64
" items=%" PRId64 "\n",
self->filename.c_str(),
minStopVersion,
cutoff.version,
self->remapQueue.numEntries);
break;
}
@ -3471,7 +3490,11 @@ public:
}
}
debug_printf("DWALPager(%s) remapCleanup stopped (stop=%d)\n", self->filename.c_str(), self->remapCleanupStop);
debug_printf("DWALPager(%s) remapCleanup stopped stopSignal=%d free=%lld delayedFree=%lld\n",
self->filename.c_str(),
self->remapCleanupStop,
self->freeList.numEntries,
self->delayedFreeList.numEntries);
signal.send(Void());
wait(tasks.getResult());
return Void();
@ -3772,6 +3795,28 @@ private:
Future<Void> onEvictable() const { return ready(readFuture) && writeFuture; }
};
ACTOR static Future<Void> clearRemapQueue_impl(DWALPager* self) {
// Wait for outstanding commit.
wait(self->commitFuture);
// While the remap queue isn't empty, advance the commit version and oldest readable version
// by the remap cleanup window and commit
while (self->remapQueue.numEntries > 0) {
self->setOldestReadableVersion(self->getLastCommittedVersion());
wait(self->commit(self->getLastCommittedVersion() + self->remapCleanupWindow + 1));
}
// One final commit because the active commit cycle may have popped from the remap queue
wait(self->commit(self->getLastCommittedVersion() + 1));
TraceEvent e("RedwoodClearRemapQueue");
self->toTraceEvent(e);
e.log();
return Void();
}
Future<Void> clearRemapQueue() override { return clearRemapQueue_impl(this); }
// Physical page sizes will always be a multiple of 4k because AsyncFileNonDurable requires
// this in simulation, and it also makes sense for current SSDs.
// Allowing a smaller 'logical' page size is very useful for testing.
@ -4786,6 +4831,11 @@ public:
m_latestCommit = m_init;
}
void toTraceEvent(TraceEvent& e) const {
m_pager->toTraceEvent(e);
m_lazyClearQueue.toTraceEvent(e, "LazyClearQueue");
}
ACTOR static Future<int> incrementalLazyClear(VersionedBTree* self) {
ASSERT(self->m_lazyClearActor.isReady());
self->m_lazyClearStop = false;
@ -4960,8 +5010,10 @@ public:
Future<Void> commit(Version v) { return commit_impl(this, v); }
ACTOR static Future<Void> clearAllAndCheckSanity_impl(VersionedBTree* self) {
// Clear and commit
debug_printf("Clearing tree.\n");
self->clear(KeyRangeRef(dbBegin.key, dbEnd.key));
wait(self->commit(self->getLastCommittedVersion() + 1));
// Loop commits until the the lazy delete queue is completely processed.
loop {
@ -4975,11 +5027,6 @@ public:
}
}
// Forget all but the latest version of the tree.
debug_printf("Discarding all old versions.\n");
self->setOldestReadableVersion(self->getLastCommittedVersion());
wait(self->commit(self->getLastCommittedVersion() + 1));
// The lazy delete queue should now be empty and contain only the new page to start writing to
// on the next commit.
LazyClearQueueT::QueueState s = self->m_lazyClearQueue.getState();
@ -4990,6 +5037,13 @@ public:
ASSERT(self->m_pHeader->height == 1);
ASSERT(self->m_pHeader->root.count == 1);
// Let pager do more commits to finish all cleanup of old pages
wait(self->m_pager->clearRemapQueue());
TraceEvent e("RedwoodDestructiveSanityCheck");
self->toTraceEvent(e);
e.log();
// From the pager's perspective the only pages that should be in use are the btree root and
// the previously mentioned lazy delete queue page.
int64_t userPageCount = wait(self->m_pager->getUserPageCount());
@ -9156,8 +9210,12 @@ TEST_CASE("Lredwood/correctness/btree") {
state int maxCommitSize =
params.getInt("maxCommitSize")
.orDefault(shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6)));
state double setExistingKeyProbability =
params.getDouble("setExistingKeyProbability").orDefault(deterministicRandom()->random01() * .5);
state double clearProbability =
params.getDouble("clearProbability").orDefault(deterministicRandom()->random01() * .1);
state double clearExistingBoundaryProbability =
params.getDouble("clearProbability").orDefault(deterministicRandom()->random01() * .5);
state double clearSingleKeyProbability =
params.getDouble("clearSingleKeyProbability").orDefault(deterministicRandom()->random01());
state double clearPostSetProbability =
@ -9190,7 +9248,9 @@ TEST_CASE("Lredwood/correctness/btree") {
printf("maxKeySize: %d\n", maxKeySize);
printf("maxValueSize: %d\n", maxValueSize);
printf("maxCommitSize: %d\n", maxCommitSize);
printf("setExistingKeyProbability: %f\n", setExistingKeyProbability);
printf("clearProbability: %f\n", clearProbability);
printf("clearExistingBoundaryProbability: %f\n", clearExistingBoundaryProbability);
printf("clearSingleKeyProbability: %f\n", clearSingleKeyProbability);
printf("clearPostSetProbability: %f\n", clearPostSetProbability);
printf("coldStartProbability: %f\n", coldStartProbability);
@ -9252,12 +9312,12 @@ TEST_CASE("Lredwood/correctness/btree") {
Key end = (deterministicRandom()->random01() < .01) ? keyAfter(start) : randomKV(maxKeySize, 1).key;
// Sometimes replace start and/or end with a close actual (previously used) value
if (deterministicRandom()->random01() < .10) {
if (deterministicRandom()->random01() < clearExistingBoundaryProbability) {
auto i = keys.upper_bound(start);
if (i != keys.end())
start = *i;
}
if (deterministicRandom()->random01() < .10) {
if (deterministicRandom()->random01() < clearExistingBoundaryProbability) {
auto i = keys.upper_bound(end);
if (i != keys.end())
end = *i;
@ -9319,7 +9379,7 @@ TEST_CASE("Lredwood/correctness/btree") {
// Set a key
KeyValue kv = randomKV(maxKeySize, maxValueSize);
// Sometimes change key to a close previously used key
if (deterministicRandom()->random01() < .01) {
if (deterministicRandom()->random01() < setExistingKeyProbability) {
auto i = keys.upper_bound(kv.key);
if (i != keys.end())
kv.key = StringRef(kv.arena(), *i);
@ -9812,11 +9872,10 @@ TEST_CASE(":/redwood/performance/set") {
state double intervalStart = timer();
state double start = intervalStart;
state int sinceYield = 0;
state Version version = btree->getLastCommittedVersion();
if (insertRecords) {
while (kvBytesTotal < kvBytesTarget) {
Version lastVer = btree->getLastCommittedVersion();
state Version version = lastVer + 1;
state int changesThisVersion =
deterministicRandom()->randomInt(0, maxRecordsPerCommit - recordsThisCommit + 1);
@ -9852,6 +9911,10 @@ TEST_CASE(":/redwood/performance/set") {
if (kvBytesThisCommit >= maxKVBytesPerCommit || recordsThisCommit >= maxRecordsPerCommit) {
btree->setOldestReadableVersion(btree->getLastCommittedVersion());
wait(commit);
TraceEvent e("RedwoodState");
btree->toTraceEvent(e);
e.log();
printf("Cumulative %.2f MB keyValue bytes written at %.2f MB/s\n",
kvBytesTotal / 1e6,
kvBytesTotal / (timer() - start) / 1e6);
@ -9864,7 +9927,7 @@ TEST_CASE(":/redwood/performance/set") {
// actor state object
double* pIntervalStart = &intervalStart;
commit = map(btree->commit(version), [=](Void result) {
commit = map(btree->commit(++version), [=](Void result) {
if (!traceMetrics) {
printf("%s\n", g_redwoodMetrics.toString(true).c_str());
}