mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 10:45:56 +08:00
fix issues
This commit is contained in:
parent
af881298de
commit
29d61b708f
fdbserver
@ -48,35 +48,32 @@ enum class PagerEventReasons { pointRead = 0, rangeRead, rangePrefetch, commit,
|
||||
static const std::string PagerEventReasonsCodes[] = { "Get", "GetR", "GetRPF", "Commit", "LazyClr", "Meta" };
|
||||
|
||||
static const int nonBtreeLevel = 0;
|
||||
static const int rootLevel = 1;
|
||||
static const std::pair<PagerEvents, PagerEventReasons> possibleEventReasonPairs[] = {
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::pointRead },
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::rangeRead },
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::lazyClear },
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::commit },
|
||||
{ PagerEvents::pagerCacheHit, PagerEventReasons::pointRead },
|
||||
{ PagerEvents::pagerCacheHit, PagerEventReasons::rangeRead },
|
||||
{ PagerEvents::pagerCacheHit, PagerEventReasons::lazyClear },
|
||||
{ PagerEvents::pagerCacheHit, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerCacheHit, PagerEventReasons::commit },
|
||||
{ PagerEvents::pagerCacheMiss, PagerEventReasons::pointRead },
|
||||
{ PagerEvents::pagerCacheMiss, PagerEventReasons::rangeRead },
|
||||
{ PagerEvents::pagerCacheMiss, PagerEventReasons::lazyClear },
|
||||
{ PagerEvents::pagerCacheMiss, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerWrite, PagerEventReasons::commit },
|
||||
{ PagerEvents::pagerCacheMiss, PagerEventReasons::commit },
|
||||
{ PagerEvents::pagerWrite, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerWrite, PagerEventReasons::lazyClear },
|
||||
};
|
||||
static const std::pair<PagerEvents, PagerEventReasons> L0PossibleEventReasonPairs[] = {
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::rangePrefetch },
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerCacheLookup, PagerEventReasons::commit },
|
||||
{ PagerEvents::pagerCacheHit, PagerEventReasons::rangePrefetch },
|
||||
{ PagerEvents::pagerCacheHit, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerCacheMiss, PagerEventReasons::rangePrefetch },
|
||||
{ PagerEvents::pagerCacheMiss, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerWrite, PagerEventReasons::metaData },
|
||||
{ PagerEvents::pagerWrite, PagerEventReasons::commit },
|
||||
};
|
||||
|
||||
// Represents a block of memory in a 4096-byte aligned location held by an Arena.
|
||||
@ -167,8 +164,8 @@ public:
|
||||
|
||||
class IPagerSnapshot {
|
||||
public:
|
||||
virtual Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons r,
|
||||
unsigned int l,
|
||||
virtual Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
LogicalPageID pageID,
|
||||
bool cacheable,
|
||||
bool nohit) = 0;
|
||||
@ -207,13 +204,14 @@ public:
|
||||
// Replace the contents of a page with new data across *all* versions.
|
||||
// Existing holders of a page reference for pageID, read from any version,
|
||||
// may see the effects of this write.
|
||||
virtual void updatePage(PagerEventReasons r, unsigned int l, LogicalPageID pageID, Reference<ArenaPage> data) = 0;
|
||||
virtual void updatePage(PagerEventReasons reason, unsigned int level, LogicalPageID pageID, Reference<ArenaPage> data) = 0;
|
||||
|
||||
// Try to atomically update the contents of a page as of version v in the next commit.
|
||||
// If the pager is unable to do this at this time, it may choose to write the data to a new page ID
|
||||
// instead and return the new page ID to the caller. Otherwise the original pageID argument will be returned.
|
||||
// If a new page ID is returned, the old page ID will be freed as of version v
|
||||
virtual Future<LogicalPageID> atomicUpdatePage(unsigned int l,
|
||||
virtual Future<LogicalPageID> atomicUpdatePage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
LogicalPageID pageID,
|
||||
Reference<ArenaPage> data,
|
||||
Version v) = 0;
|
||||
@ -234,8 +232,8 @@ public:
|
||||
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
|
||||
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
|
||||
// considered likely to be needed soon.
|
||||
virtual Future<Reference<ArenaPage>> readPage(PagerEventReasons r,
|
||||
unsigned int l,
|
||||
virtual Future<Reference<ArenaPage>> readPage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
LogicalPageID pageID,
|
||||
bool cacheable = true,
|
||||
bool noHit = false) = 0;
|
||||
|
@ -489,7 +489,7 @@ public:
|
||||
debug_printf("FIFOQueue::Cursor(%s) writePage\n", toString().c_str());
|
||||
VALGRIND_MAKE_MEM_DEFINED(raw()->begin(), offset);
|
||||
VALGRIND_MAKE_MEM_DEFINED(raw()->begin() + offset, queue->dataBytesPerPage - raw()->endOffset);
|
||||
queue->pager->updatePage(PagerEventReasons::commit, nonBtreeLevel, pageID, page);
|
||||
queue->pager->updatePage(PagerEventReasons::metaData, nonBtreeLevel, pageID, page);
|
||||
if (firstPageIDWritten == invalidLogicalPageID) {
|
||||
firstPageIDWritten = pageID;
|
||||
}
|
||||
@ -2112,7 +2112,7 @@ public:
|
||||
// If this fails, the backup header is still in tact for the next recovery attempt.
|
||||
if (recoveredHeader) {
|
||||
// Write the header to page 0
|
||||
wait(self->writeHeaderPage(PagerEventReasons::metaData, nonBtreeLevel, self->headerPage));
|
||||
wait(self->writeHeaderPage(0, self->headerPage));
|
||||
|
||||
// Wait for all outstanding writes to complete
|
||||
wait(self->operations.signalAndCollapse());
|
||||
@ -2335,8 +2335,8 @@ public:
|
||||
|
||||
Future<LogicalPageID> newExtentPageID(QueueID queueID) override { return newExtentPageID_impl(this, queueID); }
|
||||
|
||||
Future<Void> writePhysicalPage(PagerEventReasons r,
|
||||
unsigned int l,
|
||||
Future<Void> writePhysicalPage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
PhysicalPageID pageID,
|
||||
Reference<ArenaPage> page,
|
||||
bool header = false) {
|
||||
@ -2347,8 +2347,8 @@ public:
|
||||
page->begin());
|
||||
|
||||
++g_redwoodMetrics.metric.pagerDiskWrite;
|
||||
auto& eventReasons = g_redwoodMetrics.level(l).metrics.eventReasons;
|
||||
eventReasons.addEventReason(PagerEvents::pagerWrite, r);
|
||||
auto& eventReasons = g_redwoodMetrics.level(level).metrics.eventReasons;
|
||||
eventReasons.addEventReason(PagerEvents::pagerWrite, reason);
|
||||
|
||||
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
|
||||
page->updateChecksum(pageID);
|
||||
@ -2378,11 +2378,11 @@ public:
|
||||
return f;
|
||||
}
|
||||
|
||||
Future<Void> writeHeaderPage(PagerEventReasons r, PhysicalPageID pageID, Reference<ArenaPage> page) {
|
||||
return writePhysicalPage(r, nonBtreeLevel, pageID, page, true);
|
||||
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<ArenaPage> page) {
|
||||
return writePhysicalPage(PagerEventReasons::metaData, nonBtreeLevel, pageID, page, true);
|
||||
}
|
||||
|
||||
void updatePage(PagerEventReasons r, unsigned int l, LogicalPageID pageID, Reference<ArenaPage> data) override {
|
||||
void updatePage(PagerEventReasons reason, unsigned int level, LogicalPageID pageID, Reference<ArenaPage> data) override {
|
||||
// Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now
|
||||
// or as a cache miss because there is no benefit to the page already being in cache
|
||||
// this metaData reason will not be accounted since its not a cache hit or cache miss
|
||||
@ -2402,11 +2402,11 @@ public:
|
||||
// future reads of the version are not allowed) and the write of the next newest version over top
|
||||
// of the original page begins.
|
||||
if (!cacheEntry.initialized()) {
|
||||
cacheEntry.writeFuture = writePhysicalPage(r, l, pageID, data);
|
||||
cacheEntry.writeFuture = writePhysicalPage(reason, level, pageID, data);
|
||||
} else if (cacheEntry.reading()) {
|
||||
// Wait for the read to finish, then start the write.
|
||||
cacheEntry.writeFuture = map(success(cacheEntry.readFuture), [=](Void) {
|
||||
writePhysicalPage(r, l, pageID, data);
|
||||
writePhysicalPage(reason, level, pageID, data);
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
@ -2414,24 +2414,25 @@ public:
|
||||
// writes happen in the correct order
|
||||
else if (cacheEntry.writing()) {
|
||||
cacheEntry.writeFuture = map(cacheEntry.writeFuture, [=](Void) {
|
||||
writePhysicalPage(r, l, pageID, data);
|
||||
writePhysicalPage(reason, level, pageID, data);
|
||||
return Void();
|
||||
});
|
||||
} else {
|
||||
cacheEntry.writeFuture = writePhysicalPage(r, l, pageID, data);
|
||||
cacheEntry.writeFuture = writePhysicalPage(reason, level, pageID, data);
|
||||
}
|
||||
|
||||
// Always update the page contents immediately regardless of what happened above.
|
||||
cacheEntry.readFuture = data;
|
||||
}
|
||||
|
||||
Future<LogicalPageID> atomicUpdatePage(unsigned int l,
|
||||
Future<LogicalPageID> atomicUpdatePage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
LogicalPageID pageID,
|
||||
Reference<ArenaPage> data,
|
||||
Version v) override {
|
||||
debug_printf("DWALPager(%s) op=writeAtomic %s @%" PRId64 "\n", filename.c_str(), toString(pageID).c_str(), v);
|
||||
Future<LogicalPageID> f = map(newPageID(), [=](LogicalPageID newPageID) {
|
||||
updatePage(PagerEventReasons::commit, l, newPageID, data);
|
||||
updatePage(reason, level, newPageID, data);
|
||||
// TODO: Possibly limit size of remap queue since it must be recovered on cold start
|
||||
RemappedPage r{ v, pageID, newPageID };
|
||||
remapQueue.pushBack(r);
|
||||
@ -2609,25 +2610,24 @@ public:
|
||||
|
||||
// Reads the most recent version of pageID, either previously committed or written using updatePage()
|
||||
// in the current commit
|
||||
Future<Reference<ArenaPage>> readPage(PagerEventReasons r,
|
||||
unsigned int l,
|
||||
Future<Reference<ArenaPage>> readPage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
LogicalPageID pageID,
|
||||
bool cacheable,
|
||||
bool noHit = false) override {
|
||||
// Use cached page if present, without triggering a cache hit.
|
||||
// Otherwise, read the page and return it but don't add it to the cache
|
||||
auto& eventReasons = g_redwoodMetrics.level(l).metrics.eventReasons;
|
||||
auto& eventReasons = g_redwoodMetrics.level(level).metrics.eventReasons;
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheLookup, reason);
|
||||
if (!cacheable) {
|
||||
debug_printf("DWALPager(%s) op=readUncached %s\n", filename.c_str(), toString(pageID).c_str());
|
||||
PageCacheEntry* pCacheEntry = pageCache.getIfExists(pageID);
|
||||
if (pCacheEntry != nullptr) {
|
||||
++g_redwoodMetrics.metric.pagerProbeHit;
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheLookup, r);
|
||||
debug_printf("DWALPager(%s) op=readUncachedHit %s\n", filename.c_str(), toString(pageID).c_str());
|
||||
return pCacheEntry->readFuture;
|
||||
}
|
||||
++g_redwoodMetrics.metric.pagerProbeMiss;
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheLookup, r);
|
||||
debug_printf("DWALPager(%s) op=readUncachedMiss %s\n", filename.c_str(), toString(pageID).c_str());
|
||||
return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise);
|
||||
}
|
||||
@ -2646,12 +2646,10 @@ public:
|
||||
cacheEntry.writeFuture = Void();
|
||||
|
||||
++g_redwoodMetrics.metric.pagerCacheMiss;
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheMiss, r);
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheLookup, r);
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheMiss, reason);
|
||||
} else {
|
||||
++g_redwoodMetrics.metric.pagerCacheHit;
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheHit, r);
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheLookup, r);
|
||||
eventReasons.addEventReason(PagerEvents::pagerCacheHit, reason);
|
||||
}
|
||||
return cacheEntry.readFuture;
|
||||
}
|
||||
@ -2685,14 +2683,14 @@ public:
|
||||
return (PhysicalPageID)pageID;
|
||||
}
|
||||
|
||||
Future<Reference<ArenaPage>> readPageAtVersion(PagerEventReasons r,
|
||||
unsigned int l,
|
||||
Future<Reference<ArenaPage>> readPageAtVersion(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
LogicalPageID logicalID,
|
||||
Version v,
|
||||
bool cacheable,
|
||||
bool noHit) {
|
||||
PhysicalPageID physicalID = getPhysicalPageID(logicalID, v);
|
||||
return readPage(r, l, physicalID, cacheable, noHit);
|
||||
return readPage(reason, level, physicalID, cacheable, noHit);
|
||||
}
|
||||
|
||||
void releaseExtentReadLock() override { concurrentExtentReads->release(); }
|
||||
@ -3082,7 +3080,7 @@ public:
|
||||
debug_printf("DWALPager(%s) commit begin\n", self->filename.c_str());
|
||||
|
||||
// Write old committed header to Page 1
|
||||
self->writeHeaderPage(PagerEventReasons::commit, 1, self->lastCommittedHeaderPage);
|
||||
self->writeHeaderPage(1, self->lastCommittedHeaderPage);
|
||||
|
||||
// Trigger the remap eraser to stop and then wait for it.
|
||||
self->remapCleanupStop = true;
|
||||
@ -3114,7 +3112,7 @@ public:
|
||||
}
|
||||
|
||||
// Update header on disk and sync again.
|
||||
wait(self->writeHeaderPage(PagerEventReasons::commit, nonBtreeLevel, self->headerPage));
|
||||
wait(self->writeHeaderPage(0, self->headerPage));
|
||||
if (g_network->getCurrentTask() > TaskPriority::DiskWrite) {
|
||||
wait(delay(0, TaskPriority::DiskWrite));
|
||||
}
|
||||
@ -3407,15 +3405,15 @@ public:
|
||||
: pager(pager), metaKey(meta), version(version), expired(expiredFuture) {}
|
||||
~DWALPagerSnapshot() override {}
|
||||
|
||||
Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons r,
|
||||
unsigned int l,
|
||||
Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
LogicalPageID pageID,
|
||||
bool cacheable,
|
||||
bool noHit) override {
|
||||
if (expired.isError()) {
|
||||
throw expired.getError();
|
||||
}
|
||||
return map(pager->readPageAtVersion(r, l, pageID, version, cacheable, noHit),
|
||||
return map(pager->readPageAtVersion(reason, level, pageID, version, cacheable, noHit),
|
||||
[=](Reference<ArenaPage> p) { return Reference<const ArenaPage>(std::move(p)); });
|
||||
}
|
||||
|
||||
@ -4983,6 +4981,7 @@ private:
|
||||
|
||||
// Writes entries to 1 or more pages and return a vector of boundary keys with their ArenaPage(s)
|
||||
ACTOR static Future<Standalone<VectorRef<RedwoodRecordRef>>> writePages(VersionedBTree* self,
|
||||
PagerEventReasons reason,
|
||||
const RedwoodRecordRef* lowerBound,
|
||||
const RedwoodRecordRef* upperBound,
|
||||
VectorRef<RedwoodRecordRef> entries,
|
||||
@ -5117,7 +5116,7 @@ private:
|
||||
// LogicalPageIDs in previousID and try to update them atomically.
|
||||
if (pagesToBuild.size() == 1 && previousID.size() == pages.size()) {
|
||||
for (k = 0; k < pages.size(); ++k) {
|
||||
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(currLevel, previousID[k], pages[k], v));
|
||||
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(reason, currLevel, previousID[k], pages[k], v));
|
||||
childPageID.push_back(records.arena(), id);
|
||||
}
|
||||
} else {
|
||||
@ -5175,7 +5174,7 @@ private:
|
||||
while (records.size() > 1) {
|
||||
self->m_pHeader->height = ++height;
|
||||
Standalone<VectorRef<RedwoodRecordRef>> newRecords =
|
||||
wait(writePages(self, &dbBegin, &dbEnd, records, height, version, BTreePageIDRef()));
|
||||
wait(writePages(self, PagerEventReasons::metaData, &dbBegin, &dbEnd, records, height, version, BTreePageIDRef()));
|
||||
debug_printf("Wrote a new root level at version %" PRId64 " height %d size %lu pages\n",
|
||||
version,
|
||||
height,
|
||||
@ -5197,8 +5196,8 @@ private:
|
||||
return pager->tryEvictPage(id.front());
|
||||
}
|
||||
|
||||
ACTOR static Future<Reference<const ArenaPage>> readPage(PagerEventReasons r,
|
||||
unsigned int l,
|
||||
ACTOR static Future<Reference<const ArenaPage>> readPage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
Reference<IPagerSnapshot> snapshot,
|
||||
BTreePageIDRef id,
|
||||
bool forLazyClear = false,
|
||||
@ -5212,13 +5211,13 @@ private:
|
||||
state Reference<const ArenaPage> page;
|
||||
|
||||
if (id.size() == 1) {
|
||||
Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(r, l, id.front(), cacheable, false));
|
||||
Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(reason, level, id.front(), cacheable, false));
|
||||
page = std::move(p);
|
||||
} else {
|
||||
ASSERT(!id.empty());
|
||||
std::vector<Future<Reference<const ArenaPage>>> reads;
|
||||
for (auto& pageID : id) {
|
||||
reads.push_back(snapshot->getPhysicalPage(r, l, pageID, cacheable, false));
|
||||
reads.push_back(snapshot->getPhysicalPage(reason, level, pageID, cacheable, false));
|
||||
}
|
||||
std::vector<Reference<const ArenaPage>> pages = wait(getAll(reads));
|
||||
// TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager.
|
||||
@ -5283,6 +5282,7 @@ private:
|
||||
|
||||
// Write new version of pageID at version v using page as its data.
|
||||
// Attempts to reuse original id(s) in btPageID, returns BTreePageID.
|
||||
// UpdateBtreePage is only called from commitSubTree funciton
|
||||
ACTOR static Future<BTreePageIDRef> updateBTreePage(VersionedBTree* self,
|
||||
BTreePageIDRef oldID,
|
||||
Arena* arena,
|
||||
@ -5305,7 +5305,7 @@ private:
|
||||
|
||||
if (oldID.size() == 1) {
|
||||
BTreePage* btPage = (BTreePage*)page->begin();
|
||||
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(btPage->height, oldID.front(), page, writeVersion));
|
||||
LogicalPageID id = wait(self->m_pager->atomicUpdatePage(PagerEventReasons::commit, btPage->height, oldID.front(), page, writeVersion));
|
||||
newID.front() = id;
|
||||
} else {
|
||||
state std::vector<Reference<ArenaPage>> pages;
|
||||
@ -5326,7 +5326,7 @@ private:
|
||||
for (; i < pages.size(); ++i) {
|
||||
BTreePage* btPage = (BTreePage*)page->begin();
|
||||
LogicalPageID id =
|
||||
wait(self->m_pager->atomicUpdatePage(btPage->height, oldID[i], pages[i], writeVersion));
|
||||
wait(self->m_pager->atomicUpdatePage(PagerEventReasons::commit, btPage->height, oldID[i], pages[i], writeVersion));
|
||||
newID[i] = id;
|
||||
}
|
||||
}
|
||||
@ -5648,12 +5648,12 @@ private:
|
||||
};
|
||||
|
||||
ACTOR static Future<Void> commitSubtree(
|
||||
unsigned int l,
|
||||
VersionedBTree* self,
|
||||
Reference<IPagerSnapshot> snapshot,
|
||||
MutationBuffer* mutationBuffer,
|
||||
BTreePageIDRef rootID,
|
||||
bool isLeaf,
|
||||
unsigned int l,
|
||||
MutationBuffer::const_iterator mBegin, // greatest mutation boundary <= subtreeLowerBound->key
|
||||
MutationBuffer::const_iterator mEnd, // least boundary >= subtreeUpperBound->key
|
||||
InternalPageSliceUpdate* update) {
|
||||
@ -5989,6 +5989,7 @@ private:
|
||||
|
||||
// Rebuild new page(s).
|
||||
state Standalone<VectorRef<RedwoodRecordRef>> entries = wait(writePages(self,
|
||||
PagerEventReasons::metaData,
|
||||
&update->subtreeLowerBound,
|
||||
&update->subtreeUpperBound,
|
||||
merged,
|
||||
@ -6165,7 +6166,7 @@ private:
|
||||
|
||||
// If this page has height of 2 then its children are leaf nodes
|
||||
recursions.push_back(self->commitSubtree(
|
||||
btPage->height, self, snapshot, mutationBuffer, pageID, btPage->height == 2, mBegin, mEnd, &u));
|
||||
self, snapshot, mutationBuffer, pageID, btPage->height == 2, btPage->height, mBegin, mEnd, &u));
|
||||
}
|
||||
|
||||
debug_printf(
|
||||
@ -6336,6 +6337,7 @@ private:
|
||||
|
||||
Standalone<VectorRef<RedwoodRecordRef>> newChildEntries =
|
||||
wait(writePages(self,
|
||||
PagerEventReasons::metaData,
|
||||
&update->subtreeLowerBound,
|
||||
&update->subtreeUpperBound,
|
||||
modifier.rebuild,
|
||||
@ -6396,12 +6398,12 @@ private:
|
||||
MutationBuffer::const_iterator mBegin = mutations->upper_bound(all.subtreeLowerBound.key);
|
||||
--mBegin;
|
||||
MutationBuffer::const_iterator mEnd = mutations->lower_bound(all.subtreeUpperBound.key);
|
||||
wait(commitSubtree(0,
|
||||
self,
|
||||
wait(commitSubtree(self,
|
||||
self->m_pager->getReadSnapshot(latestVersion),
|
||||
mutations,
|
||||
rootPageID,
|
||||
self->m_pHeader->height == 1,
|
||||
self->m_pHeader->height,
|
||||
mBegin,
|
||||
mEnd,
|
||||
&all));
|
||||
@ -6414,7 +6416,7 @@ private:
|
||||
Reference<ArenaPage> page = self->m_pager->newPageBuffer();
|
||||
makeEmptyRoot(page);
|
||||
self->m_pHeader->height = 1;
|
||||
self->m_pager->updatePage(PagerEventReasons::commit, rootLevel, newRootID, page);
|
||||
self->m_pager->updatePage(PagerEventReasons::commit, self->m_pHeader->height, newRootID, page);
|
||||
rootPageID = BTreePageIDRef((LogicalPageID*)&newRootID, 1);
|
||||
} else {
|
||||
Standalone<VectorRef<RedwoodRecordRef>> newRootLevel(all.newLinks, all.newLinks.arena());
|
||||
@ -6524,9 +6526,9 @@ public:
|
||||
PathEntry& back() { return path.back(); }
|
||||
void popPath() { path.pop_back(); }
|
||||
|
||||
Future<Void> pushPage(PagerEventReasons r, unsigned int l, const BTreePage::BinaryTree::Cursor& link) {
|
||||
Future<Void> pushPage(PagerEventReasons reason, const BTreePage::BinaryTree::Cursor& link) {
|
||||
debug_printf("pushPage(link=%s)\n", link.get().toString(false).c_str());
|
||||
return map(readPage(r, l, pager, link.get().getChildPage()), [=](Reference<const ArenaPage> p) {
|
||||
return map(readPage(reason, path.back().btPage()->height - 1, pager, link.get().getChildPage()), [=](Reference<const ArenaPage> p) {
|
||||
#if REDWOOD_DEBUG
|
||||
path.push_back({ p, getCursor(p, link), link.get().getChildPage() });
|
||||
#else
|
||||
@ -6536,9 +6538,9 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> pushPage(PagerEventReasons r, unsigned int l, BTreePageIDRef id) {
|
||||
Future<Void> pushPage(PagerEventReasons reason, BTreePageIDRef id) {
|
||||
debug_printf("pushPage(root=%s)\n", ::toString(id).c_str());
|
||||
return map(readPage(r, l, pager, id), [=](Reference<const ArenaPage> p) {
|
||||
return map(readPage(reason, btree->m_pHeader->height, pager, id), [=](Reference<const ArenaPage> p) {
|
||||
#if REDWOOD_DEBUG
|
||||
path.push_back({ p, getCursor(p, dbBegin, dbEnd), id });
|
||||
#else
|
||||
@ -6555,7 +6557,7 @@ public:
|
||||
path.clear();
|
||||
path.reserve(6);
|
||||
valid = false;
|
||||
return pushPage(PagerEventReasons::commit, rootLevel, root);
|
||||
return pushPage(PagerEventReasons::pointRead, root);
|
||||
}
|
||||
|
||||
// Seeks cursor to query if it exists, the record before or after it, or an undefined and invalid
|
||||
@ -6568,7 +6570,7 @@ public:
|
||||
// If non-zero is returned then the cursor is valid and the return value is logically equivalent
|
||||
// to query.compare(cursor.get())
|
||||
|
||||
ACTOR Future<int> seek_impl(PagerEventReasons r, BTreeCursor* self, RedwoodRecordRef query) {
|
||||
ACTOR Future<int> seek_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) {
|
||||
state RedwoodRecordRef internalPageQuery = query.withMaxPageID();
|
||||
self->path.resize(1);
|
||||
debug_printf("seek(%s) start cursor = %s\n", query.toString().c_str(), self->toString().c_str());
|
||||
@ -6592,7 +6594,7 @@ public:
|
||||
if (entry.cursor.seekLessThan(internalPageQuery) && entry.cursor.get().value.present()) {
|
||||
debug_printf(
|
||||
"seek(%s) loop seek success cursor=%s\n", query.toString().c_str(), self->toString().c_str());
|
||||
Future<Void> f = self->pushPage(r, nonBtreeLevel, entry.cursor);
|
||||
Future<Void> f = self->pushPage(reason, entry.cursor);
|
||||
wait(f);
|
||||
} else {
|
||||
self->valid = false;
|
||||
@ -6603,18 +6605,18 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
Future<int> seek(PagerEventReasons r, RedwoodRecordRef query) { return seek_impl(r, this, query); }
|
||||
Future<int> seek(RedwoodRecordRef query, PagerEventReasons reason) { return seek_impl(this, query, reason); }
|
||||
|
||||
ACTOR Future<Void> seekGTE_impl(PagerEventReasons r, BTreeCursor* self, RedwoodRecordRef query) {
|
||||
ACTOR Future<Void> seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) {
|
||||
debug_printf("seekGTE(%s) start\n", query.toString().c_str());
|
||||
int cmp = wait(self->seek(r, query));
|
||||
int cmp = wait(self->seek(query, reason));
|
||||
if (cmp > 0 || (cmp == 0 && !self->isValid())) {
|
||||
wait(self->moveNext());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> seekGTE(PagerEventReasons r, RedwoodRecordRef query) { return seekGTE_impl(r, this, query); }
|
||||
Future<Void> seekGTE(RedwoodRecordRef query, PagerEventReasons reason) { return seekGTE_impl(this, query, reason); }
|
||||
|
||||
// Start fetching sibling nodes in the forward or backward direction, stopping after recordLimit or byteLimit
|
||||
void prefetch(KeyRef rangeEnd, bool directionForward, int recordLimit, int byteLimit) {
|
||||
@ -6670,16 +6672,16 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> seekLT_impl(PagerEventReasons r, BTreeCursor* self, RedwoodRecordRef query) {
|
||||
ACTOR Future<Void> seekLT_impl(BTreeCursor* self, RedwoodRecordRef query, PagerEventReasons reason) {
|
||||
debug_printf("seekLT(%s) start\n", query.toString().c_str());
|
||||
int cmp = wait(self->seek(r, query));
|
||||
int cmp = wait(self->seek(query, reason));
|
||||
if (cmp <= 0) {
|
||||
wait(self->movePrev());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> seekLT(PagerEventReasons r, RedwoodRecordRef query) { return seekLT_impl(r, this, query); }
|
||||
Future<Void> seekLT(RedwoodRecordRef query, PagerEventReasons reason) { return seekLT_impl(this, query, reason); }
|
||||
|
||||
ACTOR Future<Void> move_impl(BTreeCursor* self, bool forward) {
|
||||
// Try to the move cursor at the end of the path in the correct direction
|
||||
@ -6728,7 +6730,7 @@ public:
|
||||
ASSERT(entry.cursor.movePrev());
|
||||
ASSERT(entry.cursor.get().value.present());
|
||||
}
|
||||
wait(self->pushPage(PagerEventReasons::metaData, entry.btPage()->height - 1, entry.cursor));
|
||||
wait(self->pushPage(PagerEventReasons::metaData, entry.cursor));
|
||||
auto& newEntry = self->path.back();
|
||||
ASSERT(forward ? newEntry.cursor.moveFirst() : newEntry.cursor.moveLast());
|
||||
}
|
||||
@ -6873,7 +6875,7 @@ public:
|
||||
}
|
||||
|
||||
if (rowLimit > 0) {
|
||||
wait(cur.seekGTE(PagerEventReasons::rangeRead, keys.begin));
|
||||
wait(cur.seekGTE(keys.begin, PagerEventReasons::rangeRead));
|
||||
|
||||
if (self->prefetch) {
|
||||
cur.prefetch(keys.end, true, rowLimit, byteLimit);
|
||||
@ -6919,7 +6921,7 @@ public:
|
||||
wait(cur.moveNext());
|
||||
}
|
||||
} else {
|
||||
wait(cur.seekLT(PagerEventReasons::rangeRead, keys.end));
|
||||
wait(cur.seekLT(keys.end, PagerEventReasons::rangeRead));
|
||||
|
||||
if (self->prefetch) {
|
||||
cur.prefetch(keys.begin, false, -rowLimit, byteLimit);
|
||||
@ -6986,7 +6988,7 @@ public:
|
||||
state FlowLock::Releaser releaser(self->m_concurrentReads);
|
||||
++g_redwoodMetrics.metric.opGet;
|
||||
|
||||
wait(cur.seekGTE(PagerEventReasons::pointRead, key));
|
||||
wait(cur.seekGTE(key, PagerEventReasons::pointRead));
|
||||
if (cur.isValid() && cur.get().key == key) {
|
||||
// Return a Value whose arena depends on the source page arena
|
||||
Value v;
|
||||
@ -7104,13 +7106,13 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
|
||||
start.printable().c_str(),
|
||||
end.printable().c_str(),
|
||||
randomKey.toString().c_str());
|
||||
wait(success(cur.seek(PagerEventReasons::rangeRead, randomKey)));
|
||||
wait(success(cur.seek(randomKey, PagerEventReasons::rangeRead)));
|
||||
}
|
||||
|
||||
debug_printf(
|
||||
"VerifyRange(@%" PRId64 ", %s, %s): Actual seek\n", v, start.printable().c_str(), end.printable().c_str());
|
||||
|
||||
wait(cur.seekGTE(PagerEventReasons::rangeRead, start));
|
||||
wait(cur.seekGTE(start, PagerEventReasons::rangeRead));
|
||||
|
||||
state Standalone<VectorRef<KeyValueRef>> results;
|
||||
|
||||
@ -7210,7 +7212,7 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
|
||||
}
|
||||
|
||||
// Now read the range from the tree in reverse order and compare to the saved results
|
||||
wait(cur.seekLT(PagerEventReasons::rangeRead, end));
|
||||
wait(cur.seekLT(end, PagerEventReasons::rangeRead));
|
||||
|
||||
state std::reverse_iterator<const KeyValueRef*> r = results.rbegin();
|
||||
|
||||
@ -7287,7 +7289,7 @@ ACTOR Future<int> seekAllBTreeCursor(VersionedBTree* btree,
|
||||
state Optional<std::string> val = i->second;
|
||||
debug_printf("Verifying @%" PRId64 " '%s'\n", ver, key.c_str());
|
||||
state Arena arena;
|
||||
wait(cur.seekGTE(PagerEventReasons::metaData, RedwoodRecordRef(KeyRef(arena, key))));
|
||||
wait(cur.seekGTE(RedwoodRecordRef(KeyRef(arena, key)), PagerEventReasons::metaData));
|
||||
bool foundKey = cur.isValid() && cur.get().key == key;
|
||||
bool hasValue = foundKey && cur.get().value.present();
|
||||
|
||||
@ -7412,7 +7414,7 @@ ACTOR Future<Void> randomReader(VersionedBTree* btree) {
|
||||
}
|
||||
|
||||
state KeyValue kv = randomKV(10, 0);
|
||||
wait(cur.seekGTE(PagerEventReasons::pointRead, kv.key));
|
||||
wait(cur.seekGTE(kv.key, PagerEventReasons::pointRead));
|
||||
state int c = deterministicRandom()->randomInt(0, 100);
|
||||
state bool direction = deterministicRandom()->coinflip();
|
||||
while (cur.isValid() && c-- > 0) {
|
||||
@ -8992,7 +8994,7 @@ ACTOR Future<Void> randomSeeks(VersionedBTree* btree, int count, char firstChar,
|
||||
wait(btree->initBTreeCursor(&cur, readVer));
|
||||
while (c < count) {
|
||||
state Key k = randomString(20, firstChar, lastChar);
|
||||
wait(cur.seekGTE(PagerEventReasons::pointRead, k));
|
||||
wait(cur.seekGTE(k, PagerEventReasons::pointRead));
|
||||
++c;
|
||||
}
|
||||
double elapsed = timer() - readStart;
|
||||
@ -9015,7 +9017,7 @@ ACTOR Future<Void> randomScans(VersionedBTree* btree,
|
||||
state int totalScanBytes = 0;
|
||||
while (c++ < count) {
|
||||
state Key k = randomString(20, firstChar, lastChar);
|
||||
wait(cur.seekGTE(PagerEventReasons::pointRead, k));
|
||||
wait(cur.seekGTE(k, PagerEventReasons::pointRead));
|
||||
state int w = width;
|
||||
state bool directionFwd = deterministicRandom()->coinflip();
|
||||
|
||||
@ -9057,7 +9059,7 @@ TEST_CASE(":/redwood/correctness/pager/cow") {
|
||||
state LogicalPageID id = wait(pager->newPageID());
|
||||
Reference<ArenaPage> p = pager->newPageBuffer();
|
||||
memset(p->mutate(), (char)id, p->size());
|
||||
pager->updatePage(PagerEventReasons::commit, nonBtreeLevel, id, p);
|
||||
pager->updatePage(PagerEventReasons::metaData, nonBtreeLevel, id, p);
|
||||
pager->setMetaKey(LiteralStringRef("asdfasdf"));
|
||||
wait(pager->commit());
|
||||
Reference<ArenaPage> p2 = wait(pager->readPage(PagerEventReasons::pointRead, nonBtreeLevel, id, true));
|
||||
|
Loading…
x
Reference in New Issue
Block a user