Initial draft of RedwoodMetrics changes

This commit is contained in:
Fuheng Zhao 2021-06-23 13:46:17 -07:00
parent c808b314a6
commit 9d1aeb3e1e
4 changed files with 224 additions and 67 deletions

View File

@ -40,6 +40,9 @@ typedef uint32_t PhysicalPageID;
typedef uint32_t QueueID;
#define invalidQueueID std::numeric_limits<QueueID>::max()
// Reasons for page-level events, used to attribute pager cache/disk activity in RedwoodMetrics.
enum class pagerEventReasons{ pointRead, rangeRead, rangePrefetch, commit, lazyClear, metaData};
// Represents a block of memory in a 4096-byte aligned location held by an Arena.
class ArenaPage : public ReferenceCounted<ArenaPage>, public FastAllocated<ArenaPage> {
public:
@ -128,7 +131,7 @@ public:
class IPagerSnapshot {
public:
virtual Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool nohit) = 0;
virtual Future<Reference<const ArenaPage>> getPhysicalPage(pagerEventReasons r, LogicalPageID pageID, bool cacheable, bool nohit) = 0;
virtual bool tryEvictPage(LogicalPageID id) = 0;
virtual Version getVersion() const = 0;
@ -188,8 +191,8 @@ public:
// Cacheable indicates that the page should be added to the page cache (if applicable?) as a result of this read.
// NoHit indicates that the read should not be considered a cache hit, such as when preloading pages that are
// considered likely to be needed soon.
virtual Future<Reference<ArenaPage>> readPage(LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
virtual Future<Reference<ArenaPage>> readExtent(LogicalPageID pageID) = 0;
virtual Future<Reference<ArenaPage>> readPage(pagerEventReasons r, LogicalPageID pageID, bool cacheable = true, bool noHit = false) = 0;
virtual Future<Reference<ArenaPage>> readExtent(pagerEventReasons r, LogicalPageID pageID) = 0;
virtual void releaseExtentReadLock() = 0;
// Temporary methods for testing

View File

@ -19,6 +19,9 @@
*/
#include "flow/flow.h"
#include "flow/Histogram.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbserver/IPager.h"
#include "fdbclient/Tuple.h"
#include "flow/serialize.h"
#include <chrono>
#include <limits>
#include <random>
@ -466,13 +469,13 @@ public:
nextPageID = id;
debug_printf(
"FIFOQueue::Cursor(%s) loadPage start id=%s\n", toString().c_str(), ::toString(nextPageID).c_str());
nextPageReader = waitOrError(queue->pager->readPage(nextPageID, true), queue->pagerError);
nextPageReader = waitOrError(queue->pager->readPage(pagerEventReasons::rangePrefetch, nextPageID, true), queue->pagerError);
}
Future<Void> loadExtent() {
ASSERT(mode == POP | mode == READONLY);
debug_printf("FIFOQueue::Cursor(%s) loadExtent\n", toString().c_str());
return map(queue->pager->readExtent(pageID), [=](Reference<ArenaPage> p) {
return map(queue->pager->readExtent(pagerEventReasons::metaData, pageID), [=](Reference<ArenaPage> p) {
page = p;
debug_printf("FIFOQueue::Cursor(%s) loadExtent done. Page: %p\n", toString().c_str(), page->begin());
return Void();
@ -1291,12 +1294,35 @@ struct RedwoodMetrics {
// Reset all metrics counters and (re)create the per-level and KV-size histograms.
// NOTE(review): memset zeroes the whole struct, including the Reference<Histogram>
// smart-pointer members, WITHOUT releasing them. On any call after the first this
// leaks the previously-held histogram references — confirm clear() is only invoked
// on freshly-zeroed state, or reset fields individually instead.
void clear() {
memset(this, 0, sizeof(RedwoodMetrics));
// Level index used only to build per-level histogram names ("0", "1", ...).
int levelCounter = 0;
for (auto& level : levels) {
level = {};
// NOTE(review): LiteralStringRef is intended for string literals; applying it to
// std::to_string(...).c_str() wraps a temporary — safe only if getHistogram copies
// the StringRef contents before the full expression ends. Confirm, or pass a
// properly-owned string instead. Designated initializers (.member =) are a C++20
// feature / compiler extension — confirm this builds on all supported toolchains.
level = {
.buildFillPctSketch = Histogram::getHistogram(LiteralStringRef("buildFillPct"), LiteralStringRef(std::to_string(levelCounter).c_str()), Histogram::Unit::bytes),
.modifyFillPctSketch = Histogram::getHistogram(LiteralStringRef("modifyFillPct"), LiteralStringRef(std::to_string(levelCounter).c_str()), Histogram::Unit::bytes),
.buildStoredPctSketch = Histogram::getHistogram(LiteralStringRef("buildStoredPct"), LiteralStringRef(std::to_string(levelCounter).c_str()), Histogram::Unit::bytes),
.modifyStoredPctSketch = Histogram::getHistogram(LiteralStringRef("modifyStoredPct"), LiteralStringRef(std::to_string(levelCounter).c_str()), Histogram::Unit::bytes),
.buildItemCountSketch = Histogram::getHistogram(LiteralStringRef("buildItemCount"), LiteralStringRef(std::to_string(levelCounter).c_str()), Histogram::Unit::bytes),
.modifyItemCountSketch = Histogram::getHistogram(LiteralStringRef("modifyItemCount"), LiteralStringRef(std::to_string(levelCounter).c_str()), Histogram::Unit::bytes)
};
++levelCounter;
}
// Zero the event-by-reason counter matrix for the four pager events.
const events eventsVector[] = {events::pagerCacheLookup, events::pagerCacheHit, events::pagerCacheMiss, events::pagerWrite};
const pagerEventReasons reasonsVector[] = {pagerEventReasons::pointRead, pagerEventReasons::rangeRead, pagerEventReasons::rangePrefetch, pagerEventReasons::commit, pagerEventReasons::lazyClear, pagerEventReasons::metaData};
for(events e : eventsVector){
for(pagerEventReasons r: reasonsVector){
eventsReasons[getIndex(e)][getIndex(r)] = 0;
}
}
// KV-size distributions for writes, point reads, and range reads.
// NOTE(review): the "ReadByGet " op name carries a trailing space — confirm intentional.
kvSizeWritten = Histogram::getHistogram(LiteralStringRef("kvSize"), LiteralStringRef("Written"), Histogram::Unit::bytes);
kvSizeReadByGet = Histogram::getHistogram(LiteralStringRef("kvSize"), LiteralStringRef("ReadByGet "), Histogram::Unit::bytes);
kvSizeReadByRangeGet = Histogram::getHistogram(LiteralStringRef("kvSize"), LiteralStringRef("ReadByRangeGet"), Histogram::Unit::bytes);
// 0 when no network is up (e.g. some unit-test contexts), so elapsed-time math stays defined.
startTime = g_network ? now() : 0;
}
// Page-level events tracked per reason in RedwoodMetrics::eventsReasons.
// NOTE(review): getIndex() maps Update/Build into a separate 0/1 index space from the
// four pager events — confirm the two groups are never mixed in the same counter row.
enum class events{ pagerCacheLookup, pagerCacheHit, pagerCacheMiss, pagerWrite, Update, Build};
struct Level {
unsigned int pageRead;
unsigned int pageReadExt;
@ -1317,6 +1343,13 @@ struct RedwoodMetrics {
double modifyStoredPct;
double modifyFillPct;
unsigned int modifyItemCount;
Reference<Histogram> buildFillPctSketch;
Reference<Histogram> modifyFillPctSketch;
Reference<Histogram> buildStoredPctSketch;
Reference<Histogram> modifyStoredPctSketch;
Reference<Histogram> buildItemCountSketch;
Reference<Histogram> modifyItemCountSketch;
};
Level levels[btreeLevels];
@ -1343,6 +1376,28 @@ struct RedwoodMetrics {
unsigned int btreeLeafPreload;
unsigned int btreeLeafPreloadExt;
unsigned int eventsReasons[4][6];
Reference<Histogram> kvSizeWritten;
Reference<Histogram> kvSizeReadByGet;
Reference<Histogram> kvSizeReadByRangeGet;
// Human-readable name of a pager event, for metrics reporting.
// Replaces the previous implementation, which constructed and populated a
// std::map on every call (allocations per lookup) and, via operator[],
// silently returned "" for unmapped values. A switch is allocation-free.
std::string getName(events e){
	switch (e) {
	case events::pagerCacheLookup:
		return "pagerCacheLookup";
	case events::pagerCacheHit:
		return "pagerCacheHit";
	case events::pagerCacheMiss:
		return "pagerCacheMiss";
	case events::pagerWrite:
		return "pagerWrite";
	default:
		// Update/Build have no pager-event name; preserve the old map behavior of "".
		return "";
	}
}
// Row index of an event in the eventsReasons counter matrix.
// Note: Update/Build intentionally reuse indices 0/1 — they belong to a separate
// index space from the pager events (matches the original map's values).
// Replaces the previous implementation, which built a std::map on every call.
int getIndex(events e){
	switch (e) {
	case events::pagerCacheLookup:
		return 0;
	case events::pagerCacheHit:
		return 1;
	case events::pagerCacheMiss:
		return 2;
	case events::pagerWrite:
		return 3;
	case events::Update:
		return 0;
	case events::Build:
		return 1;
	default:
		return 0;
	}
}
// Human-readable name of a page-event reason, for metrics reporting.
// Replaces the previous implementation, which constructed a std::map on every call.
std::string getName(pagerEventReasons r){
	switch (r) {
	case pagerEventReasons::pointRead:
		return "pointRead";
	case pagerEventReasons::rangeRead:
		return "rangeRead";
	case pagerEventReasons::rangePrefetch:
		return "rangePrefetch";
	case pagerEventReasons::commit:
		return "commit";
	case pagerEventReasons::lazyClear:
		return "lazyClear";
	case pagerEventReasons::metaData:
		return "metaData";
	default:
		// Preserve the old map operator[] behavior for unmapped values.
		return "";
	}
}
// Column index of a reason in the eventsReasons counter matrix (declaration order).
// Replaces the previous implementation, which built a std::map on every call.
int getIndex(pagerEventReasons r){
	switch (r) {
	case pagerEventReasons::pointRead:
		return 0;
	case pagerEventReasons::rangeRead:
		return 1;
	case pagerEventReasons::rangePrefetch:
		return 2;
	case pagerEventReasons::commit:
		return 3;
	case pagerEventReasons::lazyClear:
		return 4;
	case pagerEventReasons::metaData:
		return 5;
	default:
		return 0;
	}
}
// Return number of pages read or written, from cache or disk
unsigned int pageOps() const {
// All page reads are either a cache hit, probe hit, or a disk read
@ -1408,6 +1463,24 @@ struct RedwoodMetrics {
}
}
}
const events eventsVector[] = {events::pagerCacheLookup, events::pagerCacheHit, events::pagerCacheMiss, events::pagerWrite};
const vector<pagerEventReasons> reasonsVector = {pagerEventReasons::pointRead, pagerEventReasons::rangeRead, pagerEventReasons::rangePrefetch, pagerEventReasons::commit, pagerEventReasons::lazyClear, pagerEventReasons::metaData};
for(events e : eventsVector){
std::cout<<"\nevents: "+getName(e)+" {";
for(auto r = reasonsVector.begin() ; r != reasonsVector.end(); ++r){
std::string temp = ""+getName(*r)+": "+std::to_string(eventsReasons[getIndex(e)][getIndex(*r)]);
temp += (std::next(r) != reasonsVector.end() ? ", " : "}");
std::cout<<temp;
}
}
std::cout<<'\n';
for(int i = 0; i<32; i++){
std::cout<<"Bucket "<<i<<": ";
std::cout<<"kvSizeReadByGet: "<<kvSizeReadByGet->buckets[i]<<"; ";
std::cout<<"kvSizeReadByRangeGet: "<<kvSizeReadByRangeGet->buckets[i]<<"; ";
std::cout<<"kvSizeWritten: "<<kvSizeWritten->buckets[i]<<";\n";
}
for (int i = 0; i < btreeLevels; ++i) {
auto& level = levels[i];
@ -1527,14 +1600,16 @@ public:
// Get the object for i if it exists, else return nullptr.
// If the object exists, its eviction order will NOT change as this is not a cache hit.
ObjectType* getIfExists(const IndexType& index) {
// Get the object for index if it exists, else return nullptr.
// If the object exists, its eviction order will NOT change as this is not a cache hit.
// r attributes the probe to a pager event reason for metrics.
ObjectType* getIfExists(pagerEventReasons r, const IndexType& index) {
// Every probe counts as a cache lookup for the given reason, hit or miss;
// hoisted here to remove the duplicated increment from both branches.
g_redwoodMetrics.eventsReasons[g_redwoodMetrics.getIndex(RedwoodMetrics::events::pagerCacheLookup)][g_redwoodMetrics.getIndex(r)] += 1;
auto i = cache.find(index);
if (i != cache.end()) {
++i->second.hits;
++g_redwoodMetrics.pagerProbeHit;
return &i->second.item;
}
++g_redwoodMetrics.pagerProbeMiss;
return nullptr;
}
@ -1568,7 +1643,7 @@ public:
// After a get(), the object for i is the last in evictionOrder.
// If noHit is set, do not consider this access to be cache hit if the object is present
// If noMiss is set, do not consider this access to be a cache miss if the object is not present
ObjectType& get(const IndexType& index, bool noHit = false, bool noMiss = false) {
ObjectType& get(pagerEventReasons r, const IndexType& index, bool noHit = false, bool noMiss = false) {
Entry& entry = cache[index];
// If entry is linked into evictionOrder then move it to the back of the order
@ -1576,6 +1651,8 @@ public:
if (!noHit) {
++entry.hits;
++g_redwoodMetrics.pagerCacheHit;
g_redwoodMetrics.eventsReasons[g_redwoodMetrics.getIndex(RedwoodMetrics::events::pagerCacheHit)][g_redwoodMetrics.getIndex(r)] += 1;
g_redwoodMetrics.eventsReasons[g_redwoodMetrics.getIndex(RedwoodMetrics::events::pagerCacheLookup)][g_redwoodMetrics.getIndex(r)] += 1;
// Move the entry to the back of the eviction order
evictionOrder.erase(evictionOrder.iterator_to(entry));
@ -1585,6 +1662,8 @@ public:
// Otherwise it was a cache miss
if (!noMiss) {
++g_redwoodMetrics.pagerCacheMiss;
g_redwoodMetrics.eventsReasons[g_redwoodMetrics.getIndex(RedwoodMetrics::events::pagerCacheMiss)][g_redwoodMetrics.getIndex(r)] += 1;
g_redwoodMetrics.eventsReasons[g_redwoodMetrics.getIndex(RedwoodMetrics::events::pagerCacheLookup)][g_redwoodMetrics.getIndex(r)] += 1;
}
// Finish initializing entry
entry.index = index;
@ -1921,7 +2000,7 @@ public:
if (extents[i].queueID == remapQueueID) {
LogicalPageID extID = extents[i].extentID;
debug_printf("DWALPager Extents: ID: %s ", toString(extID).c_str());
self->readExtent(extID);
self->readExtent(pagerEventReasons::metaData, extID);
}
}
}
@ -1962,7 +2041,7 @@ public:
// If this fails, the backup header is still in tact for the next recovery attempt.
if (recoveredHeader) {
// Write the header to page 0
wait(self->writeHeaderPage(0, self->headerPage));
wait(self->writeHeaderPage(pagerEventReasons::metaData, 0, self->headerPage));
// Wait for all outstanding writes to complete
wait(self->operations.signalAndCollapse());
@ -2185,7 +2264,7 @@ public:
Future<LogicalPageID> newExtentPageID(QueueID queueID) override { return newExtentPageID_impl(this, queueID); }
Future<Void> writePhysicalPage(PhysicalPageID pageID, Reference<ArenaPage> page, bool header = false) {
Future<Void> writePhysicalPage(pagerEventReasons r, PhysicalPageID pageID, Reference<ArenaPage> page, bool header = false) {
debug_printf("DWALPager(%s) op=%s %s ptr=%p\n",
filename.c_str(),
(header ? "writePhysicalHeader" : "writePhysical"),
@ -2193,6 +2272,8 @@ public:
page->begin());
++g_redwoodMetrics.pagerDiskWrite;
g_redwoodMetrics.eventsReasons[g_redwoodMetrics.getIndex(RedwoodMetrics::events::pagerWrite)][g_redwoodMetrics.getIndex(r)] += 1;
VALGRIND_MAKE_MEM_DEFINED(page->begin(), page->size());
page->updateChecksum(pageID);
debug_printf("DWALPager(%s) writePhysicalPage %s CalculatedChecksum=%d ChecksumInPage=%d\n",
@ -2221,14 +2302,15 @@ public:
return f;
}
Future<Void> writeHeaderPage(PhysicalPageID pageID, Reference<ArenaPage> page) {
return writePhysicalPage(pageID, page, true);
// Write a header page (physical page 0 or 1): same as writePhysicalPage with header = true.
// r attributes the resulting disk write to a pager event reason for metrics.
Future<Void> writeHeaderPage(pagerEventReasons r, PhysicalPageID pageID, Reference<ArenaPage> page) {
return writePhysicalPage(r, pageID, page, true);
}
void updatePage(LogicalPageID pageID, Reference<ArenaPage> data) override {
// Get the cache entry for this page, without counting it as a cache hit as we're replacing its contents now
// or as a cache miss because there is no benefit to the page already being in cache
PageCacheEntry& cacheEntry = pageCache.get(pageID, true, true);
// this metaData reason will not be accounted since its not a cache hit or cache miss
PageCacheEntry& cacheEntry = pageCache.get(pagerEventReasons::metaData, pageID, true, true);
debug_printf("DWALPager(%s) op=write %s cached=%d reading=%d writing=%d\n",
filename.c_str(),
toString(pageID).c_str(),
@ -2244,11 +2326,11 @@ public:
// future reads of the version are not allowed) and the write of the next newest version over top
// of the original page begins.
if (!cacheEntry.initialized()) {
cacheEntry.writeFuture = writePhysicalPage(pageID, data);
cacheEntry.writeFuture = writePhysicalPage(pagerEventReasons::metaData, pageID, data);
} else if (cacheEntry.reading()) {
// Wait for the read to finish, then start the write.
cacheEntry.writeFuture = map(success(cacheEntry.readFuture), [=](Void) {
writePhysicalPage(pageID, data);
writePhysicalPage(pagerEventReasons::metaData, pageID, data);
return Void();
});
}
@ -2256,11 +2338,11 @@ public:
// writes happen in the correct order
else if (cacheEntry.writing()) {
cacheEntry.writeFuture = map(cacheEntry.writeFuture, [=](Void) {
writePhysicalPage(pageID, data);
writePhysicalPage(pagerEventReasons::metaData, pageID, data);
return Void();
});
} else {
cacheEntry.writeFuture = writePhysicalPage(pageID, data);
cacheEntry.writeFuture = writePhysicalPage(pagerEventReasons::metaData, pageID, data);
}
// Always update the page contents immediately regardless of what happened above.
@ -2448,12 +2530,12 @@ public:
// Reads the most recent version of pageID, either previously committed or written using updatePage()
// in the current commit
Future<Reference<ArenaPage>> readPage(LogicalPageID pageID, bool cacheable, bool noHit = false) override {
Future<Reference<ArenaPage>> readPage(pagerEventReasons r, LogicalPageID pageID, bool cacheable, bool noHit = false) override {
// Use cached page if present, without triggering a cache hit.
// Otherwise, read the page and return it but don't add it to the cache
if (!cacheable) {
debug_printf("DWALPager(%s) op=readUncached %s\n", filename.c_str(), toString(pageID).c_str());
PageCacheEntry* pCacheEntry = pageCache.getIfExists(pageID);
PageCacheEntry* pCacheEntry = pageCache.getIfExists(r, pageID);
if (pCacheEntry != nullptr) {
debug_printf("DWALPager(%s) op=readUncachedHit %s\n", filename.c_str(), toString(pageID).c_str());
@ -2464,7 +2546,7 @@ public:
return forwardError(readPhysicalPage(this, (PhysicalPageID)pageID), errorPromise);
}
PageCacheEntry& cacheEntry = pageCache.get(pageID, noHit);
PageCacheEntry& cacheEntry = pageCache.get(r, pageID, noHit);
debug_printf("DWALPager(%s) op=read %s cached=%d reading=%d writing=%d noHit=%d\n",
filename.c_str(),
toString(pageID).c_str(),
@ -2511,9 +2593,9 @@ public:
return (PhysicalPageID)pageID;
}
Future<Reference<ArenaPage>> readPageAtVersion(LogicalPageID logicalID, Version v, bool cacheable, bool noHit) {
Future<Reference<ArenaPage>> readPageAtVersion(pagerEventReasons r, LogicalPageID logicalID, Version v, bool cacheable, bool noHit) {
PhysicalPageID physicalID = getPhysicalPageID(logicalID, v);
return readPage(physicalID, cacheable, noHit);
return readPage(r, physicalID, cacheable, noHit);
}
void releaseExtentReadLock() override { concurrentExtentReads->release(); }
@ -2592,9 +2674,9 @@ public:
return extent;
}
Future<Reference<ArenaPage>> readExtent(LogicalPageID pageID) override {
Future<Reference<ArenaPage>> readExtent(pagerEventReasons r, LogicalPageID pageID) override {
debug_printf("DWALPager(%s) op=readExtent %s\n", filename.c_str(), toString(pageID).c_str());
PageCacheEntry* pCacheEntry = extentCache.getIfExists(pageID);
PageCacheEntry* pCacheEntry = extentCache.getIfExists(r, pageID);
if (pCacheEntry != nullptr) {
debug_printf("DWALPager(%s) Cache Entry exists for %s\n", filename.c_str(), toString(pageID).c_str());
return pCacheEntry->readFuture;
@ -2621,7 +2703,7 @@ public:
else if (tailExt)
readSize = (tailPageID - pageID + 1) * physicalPageSize;
PageCacheEntry& cacheEntry = extentCache.get(pageID);
PageCacheEntry& cacheEntry = extentCache.get(r, pageID);
if (!cacheEntry.initialized()) {
cacheEntry.writeFuture = Void();
cacheEntry.readFuture =
@ -2741,7 +2823,7 @@ public:
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
// Read the data from the page that the original was mapped to
Reference<ArenaPage> data = wait(self->readPage(p.newPageID, false, true));
Reference<ArenaPage> data = wait(self->readPage(pagerEventReasons::metaData, p.newPageID, false, true));
// Write the data to the original page so it can be read using its original pageID
self->updatePage(p.originalPageID, data);
@ -2889,7 +2971,7 @@ public:
debug_printf("DWALPager(%s) commit begin\n", self->filename.c_str());
// Write old committed header to Page 1
self->writeHeaderPage(1, self->lastCommittedHeaderPage);
self->writeHeaderPage(pagerEventReasons::commit, 1, self->lastCommittedHeaderPage);
// Trigger the remap eraser to stop and then wait for it.
self->remapCleanupStop = true;
@ -2921,7 +3003,7 @@ public:
}
// Update header on disk and sync again.
wait(self->writeHeaderPage(0, self->headerPage));
wait(self->writeHeaderPage(pagerEventReasons::commit, 0, self->headerPage));
if (g_network->getCurrentTask() > TaskPriority::DiskWrite) {
wait(delay(0, TaskPriority::DiskWrite));
}
@ -3214,11 +3296,11 @@ public:
: pager(pager), metaKey(meta), version(version), expired(expiredFuture) {}
~DWALPagerSnapshot() override {}
Future<Reference<const ArenaPage>> getPhysicalPage(LogicalPageID pageID, bool cacheable, bool noHit) override {
Future<Reference<const ArenaPage>> getPhysicalPage(pagerEventReasons r, LogicalPageID pageID, bool cacheable, bool noHit) override {
if (expired.isError()) {
throw expired.getError();
}
return map(pager->readPageAtVersion(pageID, version, cacheable, noHit),
return map(pager->readPageAtVersion(r, pageID, version, cacheable, noHit),
[=](Reference<ArenaPage> p) { return Reference<const ArenaPage>(std::move(p)); });
}
@ -4155,7 +4237,7 @@ public:
break;
}
// Start reading the page, without caching
entries.push_back(std::make_pair(q.get(), self->readPage(snapshot, q.get().pageID, true, false)));
entries.push_back(std::make_pair(q.get(), self->readPage(pagerEventReasons::lazyClear, snapshot, q.get().pageID, true, false)));
--toPop;
}
@ -4856,6 +4938,8 @@ private:
btPage->height = height;
btPage->kvBytes = p.kvBytes;
g_redwoodMetrics.kvSizeWritten->sample(p.kvBytes);
debug_printf("Building tree for %s\nlower: %s\nupper: %s\n",
p.toString().c_str(),
pageLowerBound.toString(false).c_str(),
@ -4883,6 +4967,10 @@ private:
metrics.buildStoredPct += p.kvFraction();
metrics.buildItemCount += p.count;
metrics.buildFillPctSketch->sample(p.usedFraction());
metrics.buildStoredPctSketch->sample(p.kvFraction());
metrics.buildItemCountSketch->sample(p.count);
// Create chunked pages
// TODO: Avoid copying page bytes, but this is not trivial due to how pager checksums are currently handled.
if (p.blockCount != 1) {
@ -4986,7 +5074,8 @@ private:
return pager->tryEvictPage(id.front());
}
ACTOR static Future<Reference<const ArenaPage>> readPage(Reference<IPagerSnapshot> snapshot,
ACTOR static Future<Reference<const ArenaPage>> readPage(pagerEventReasons r,
Reference<IPagerSnapshot> snapshot,
BTreePageIDRef id,
bool forLazyClear = false,
bool cacheable = true) {
@ -4999,13 +5088,13 @@ private:
state Reference<const ArenaPage> page;
if (id.size() == 1) {
Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(id.front(), cacheable, false));
Reference<const ArenaPage> p = wait(snapshot->getPhysicalPage(r, id.front(), cacheable, false));
page = std::move(p);
} else {
ASSERT(!id.empty());
std::vector<Future<Reference<const ArenaPage>>> reads;
for (auto& pageID : id) {
reads.push_back(snapshot->getPhysicalPage(pageID, cacheable, false));
reads.push_back(snapshot->getPhysicalPage(r, pageID, cacheable, false));
}
std::vector<Reference<const ArenaPage>> pages = wait(getAll(reads));
// TODO: Cache reconstituted super pages somehow, perhaps with help from the Pager.
@ -5056,7 +5145,7 @@ private:
g_redwoodMetrics.btreeLeafPreloadExt += (id.size() - 1);
for (auto pageID : id) {
snapshot->getPhysicalPage(pageID, true, true);
snapshot->getPhysicalPage(pagerEventReasons::rangePrefetch, pageID, true, true);
}
}
@ -5203,6 +5292,12 @@ private:
metrics.modifyStoredPct += (double)btPage->kvBytes / capacity;
metrics.modifyItemCount += btPage->tree()->numItems;
metrics.modifyFillPctSketch->sample((double)btPage->size() / capacity);
metrics.modifyStoredPctSketch->sample((double)btPage->kvBytes / capacity);
metrics.modifyItemCountSketch->sample(btPage->tree()->numItems);
g_redwoodMetrics.kvSizeWritten->sample(btPage->kvBytes);
// The boundaries can't have changed, but the child page link may have.
if (maybeNewID != decodeLowerBound.getChildPage()) {
// Add page's decode lower bound to newLinks set without its child page, intially
@ -5458,7 +5553,7 @@ private:
debug_printf("%s -------------------------------------\n", context.c_str());
}
state Reference<const ArenaPage> page = wait(readPage(snapshot, rootID, false, false));
state Reference<const ArenaPage> page = wait(readPage(pagerEventReasons::commit, snapshot, rootID, false, false));
state Version writeVersion = self->getLastCommittedVersion() + 1;
// If the page exists in the cache, it must be copied before modification.
@ -6298,9 +6393,9 @@ public:
PathEntry& back() { return path.back(); }
void popPath() { path.pop_back(); }
Future<Void> pushPage(const BTreePage::BinaryTree::Cursor& link) {
Future<Void> pushPage(pagerEventReasons r, const BTreePage::BinaryTree::Cursor& link) {
debug_printf("pushPage(link=%s)\n", link.get().toString(false).c_str());
return map(readPage(pager, link.get().getChildPage()), [=](Reference<const ArenaPage> p) {
return map(readPage(r, pager, link.get().getChildPage()), [=](Reference<const ArenaPage> p) {
#if REDWOOD_DEBUG
path.push_back({ p, getCursor(p, link), link.get().getChildPage() });
#else
@ -6310,9 +6405,9 @@ public:
});
}
Future<Void> pushPage(BTreePageIDRef id) {
Future<Void> pushPage(pagerEventReasons r, BTreePageIDRef id) {
debug_printf("pushPage(root=%s)\n", ::toString(id).c_str());
return map(readPage(pager, id), [=](Reference<const ArenaPage> p) {
return map(readPage(r, pager, id), [=](Reference<const ArenaPage> p) {
#if REDWOOD_DEBUG
path.push_back({ p, getCursor(p, dbBegin, dbEnd), id });
#else
@ -6329,7 +6424,7 @@ public:
path.clear();
path.reserve(6);
valid = false;
return pushPage(root);
return pushPage(pagerEventReasons::commit, root);
}
// Seeks cursor to query if it exists, the record before or after it, or an undefined and invalid
@ -6341,7 +6436,7 @@ public:
// If there is a record in the tree > query then moveNext() will move to it.
// If non-zero is returned then the cursor is valid and the return value is logically equivalent
// to query.compare(cursor.get())
ACTOR Future<int> seek_impl(BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
ACTOR Future<int> seek_impl(pagerEventReasons r, BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
state RedwoodRecordRef internalPageQuery = query.withMaxPageID();
self->path.resize(1);
debug_printf(
@ -6369,7 +6464,7 @@ public:
query.toString().c_str(),
prefetchBytes,
self->toString().c_str());
Future<Void> f = self->pushPage(entry.cursor);
Future<Void> f = self->pushPage(r, entry.cursor);
// Prefetch siblings, at least prefetchBytes, at level 2 but without jumping to another level 2
// sibling
@ -6400,32 +6495,32 @@ public:
}
}
Future<int> seek(RedwoodRecordRef query, int prefetchBytes) { return seek_impl(this, query, prefetchBytes); }
Future<int> seek(pagerEventReasons r, RedwoodRecordRef query, int prefetchBytes) { return seek_impl(r, this, query, prefetchBytes); }
ACTOR Future<Void> seekGTE_impl(BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
ACTOR Future<Void> seekGTE_impl(pagerEventReasons r, BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
debug_printf("seekGTE(%s, %d) start\n", query.toString().c_str(), prefetchBytes);
int cmp = wait(self->seek(query, prefetchBytes));
int cmp = wait(self->seek(r, query, prefetchBytes));
if (cmp > 0 || (cmp == 0 && !self->isValid())) {
wait(self->moveNext());
}
return Void();
}
Future<Void> seekGTE(RedwoodRecordRef query, int prefetchBytes) {
return seekGTE_impl(this, query, prefetchBytes);
Future<Void> seekGTE(pagerEventReasons r, RedwoodRecordRef query, int prefetchBytes) {
return seekGTE_impl(r, this, query, prefetchBytes);
}
ACTOR Future<Void> seekLT_impl(BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
ACTOR Future<Void> seekLT_impl(pagerEventReasons r, BTreeCursor* self, RedwoodRecordRef query, int prefetchBytes) {
debug_printf("seekLT(%s, %d) start\n", query.toString().c_str(), prefetchBytes);
int cmp = wait(self->seek(query, prefetchBytes));
int cmp = wait(self->seek(r, query, prefetchBytes));
if (cmp <= 0) {
wait(self->movePrev());
}
return Void();
}
Future<Void> seekLT(RedwoodRecordRef query, int prefetchBytes) {
return seekLT_impl(this, query, -prefetchBytes);
Future<Void> seekLT(pagerEventReasons r, RedwoodRecordRef query, int prefetchBytes) {
return seekLT_impl(r, this, query, -prefetchBytes);
}
ACTOR Future<Void> move_impl(BTreeCursor* self, bool forward) {
@ -6476,7 +6571,7 @@ public:
ASSERT(entry.cursor.get().value.present());
}
wait(self->pushPage(entry.cursor));
wait(self->pushPage(pagerEventReasons::metaData, entry.cursor));
auto& newEntry = self->path.back();
ASSERT(forward ? newEntry.cursor.moveFirst() : newEntry.cursor.moveLast());
}
@ -6623,7 +6718,7 @@ public:
state int prefetchBytes = 0;
if (rowLimit > 0) {
wait(cur.seekGTE(keys.begin, prefetchBytes));
wait(cur.seekGTE(pagerEventReasons::rangeRead, keys.begin, prefetchBytes));
while (cur.isValid()) {
// Read page contents without using waits
BTreePage::BinaryTree::Cursor leafCursor = cur.back().cursor;
@ -6665,7 +6760,7 @@ public:
wait(cur.moveNext());
}
} else {
wait(cur.seekLT(keys.end, prefetchBytes));
wait(cur.seekLT(pagerEventReasons::rangeRead, keys.end, prefetchBytes));
while (cur.isValid()) {
// Read page contents without using waits
BTreePage::BinaryTree::Cursor leafCursor = cur.back().cursor;
@ -6713,6 +6808,7 @@ public:
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
g_redwoodMetrics.kvSizeReadByRangeGet->sample(accumulatedBytes);
return result;
}
@ -6726,12 +6822,14 @@ public:
state FlowLock::Releaser releaser(self->m_concurrentReads);
++g_redwoodMetrics.opGet;
wait(cur.seekGTE(key, 0));
wait(cur.seekGTE(pagerEventReasons::pointRead, key, 0));
if (cur.isValid() && cur.get().key == key) {
// Return a Value whose arena depends on the source page arena
Value v;
v.arena().dependsOn(cur.back().page->getArena());
v.contents() = cur.get().value.get();
//std::cout<<"kvBytes for readValue impl: "<<cur.get().kvBytes()<<std::endl;
g_redwoodMetrics.kvSizeReadByGet->sample(cur.get().kvBytes());
return v;
}
@ -6842,12 +6940,12 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
start.printable().c_str(),
end.printable().c_str(),
randomKey.toString().c_str());
wait(success(cur.seek(randomKey, 0)));
wait(success(cur.seek(pagerEventReasons::rangeRead, randomKey, 0)));
}
debug_printf(
"VerifyRange(@%" PRId64 ", %s, %s): Actual seek\n", v, start.printable().c_str(), end.printable().c_str());
wait(cur.seekGTE(start, 0));
wait(cur.seekGTE(pagerEventReasons::rangeRead, start, 0));
state Standalone<VectorRef<KeyValueRef>> results;
@ -6947,7 +7045,7 @@ ACTOR Future<int> verifyRangeBTreeCursor(VersionedBTree* btree,
}
// Now read the range from the tree in reverse order and compare to the saved results
wait(cur.seekLT(end, 0));
wait(cur.seekLT(pagerEventReasons::rangeRead, end, 0));
state std::reverse_iterator<const KeyValueRef*> r = results.rbegin();
@ -7024,7 +7122,7 @@ ACTOR Future<int> seekAllBTreeCursor(VersionedBTree* btree,
state Optional<std::string> val = i->second;
debug_printf("Verifying @%" PRId64 " '%s'\n", ver, key.c_str());
state Arena arena;
wait(cur.seekGTE(RedwoodRecordRef(KeyRef(arena, key)), 0));
wait(cur.seekGTE(pagerEventReasons::metaData, RedwoodRecordRef(KeyRef(arena, key)), 0));
bool foundKey = cur.isValid() && cur.get().key == key;
bool hasValue = foundKey && cur.get().value.present();
@ -7149,7 +7247,7 @@ ACTOR Future<Void> randomReader(VersionedBTree* btree) {
}
state KeyValue kv = randomKV(10, 0);
wait(cur.seekGTE(kv.key, 0));
wait(cur.seekGTE(pagerEventReasons::pointRead, kv.key, 0));
state int c = deterministicRandom()->randomInt(0, 100);
state bool direction = deterministicRandom()->coinflip();
while (cur.isValid() && c-- > 0) {
@ -8729,7 +8827,7 @@ ACTOR Future<Void> randomSeeks(VersionedBTree* btree, int count, char firstChar,
wait(btree->initBTreeCursor(&cur, readVer));
while (c < count) {
state Key k = randomString(20, firstChar, lastChar);
wait(cur.seekGTE(k, 0));
wait(cur.seekGTE(pagerEventReasons::pointRead, k, 0));
++c;
}
double elapsed = timer() - readStart;
@ -8753,7 +8851,7 @@ ACTOR Future<Void> randomScans(VersionedBTree* btree,
state int totalScanBytes = 0;
while (c++ < count) {
state Key k = randomString(20, firstChar, lastChar);
wait(cur.seekGTE(k, readAhead));
wait(cur.seekGTE(pagerEventReasons::pointRead, k, readAhead));
if (adaptive) {
readAhead = totalScanBytes / c;
}
@ -8792,7 +8890,7 @@ TEST_CASE(":/redwood/correctness/pager/cow") {
pager->updatePage(id, p);
pager->setMetaKey(LiteralStringRef("asdfasdf"));
wait(pager->commit());
Reference<ArenaPage> p2 = wait(pager->readPage(id, true));
Reference<ArenaPage> p2 = wait(pager->readPage(pagerEventReasons::pointRead, id, true));
printf("%s\n", StringRef(p2->begin(), p2->size()).toHexString().c_str());
// TODO: Verify reads, do more writes and reads to make this a real pager validator
@ -8922,7 +9020,7 @@ TEST_CASE(":/redwood/performance/extentQueue") {
state int i;
for (i = 1; i < extentIDs.size() - 1; i++) {
LogicalPageID extID = extentIDs[i];
pager->readExtent(extID);
pager->readExtent(pagerEventReasons::rangeRead, extID);
}
state PromiseStream<Standalone<VectorRef<ExtentQueueEntry<16>>>> resultStream;
@ -9632,3 +9730,54 @@ TEST_CASE("!/redwood/performance/randomRangeScans") {
return Void();
}
// Micro-benchmark: throughput of Histogram::sample over 10^8 uniform 32-bit values.
TEST_CASE(":/redwood/performance/histogramThroughput") {
	std::default_random_engine generator;
	// Draw over the full uint32_t range. The previous int-typed distribution with an
	// upper bound of pow(2, 32) narrowed 2^32 into an int (overflow) and produced an
	// invalid [a, b] range for uniform_int_distribution, which is undefined behavior.
	std::uniform_int_distribution<uint32_t> distribution(0, std::numeric_limits<uint32_t>::max());
	state size_t inputSize = pow(10, 8);
	state vector<uint32_t> uniform;
	uniform.reserve(inputSize); // one allocation instead of repeated growth for 10^8 pushes
	for (size_t i = 0; i < inputSize; i++) {
		uniform.push_back(distribution(generator));
	}
	{
		auto t_start = std::chrono::high_resolution_clock::now();
		Reference<Histogram> h =
		    Histogram::getHistogram(LiteralStringRef("histogramTest"), LiteralStringRef("counts"), Histogram::Unit::bytes);
		std::cout << "histogramTest is okay" << std::endl;
		ASSERT(uniform.size() == inputSize);
		for (size_t i = 0; i < uniform.size(); i++) {
			h->sample(uniform[i]);
		}
		GetHistogramRegistry().logReport();
		auto t_end = std::chrono::high_resolution_clock::now();
		double elapsed_time_ms = std::chrono::duration<double, std::milli>(t_end - t_start).count();
		std::cout << "size of input: " << uniform.size() << std::endl;
		std::cout << "Time in millisecond: " << elapsed_time_ms << std::endl;
	}
	return Void();
}
// Micro-benchmark: throughput of ContinuousSample::addSample over 10^8 uniform 32-bit values.
// NOTE(review): "Smaple" in the registered test name looks like a typo for "Sample";
// left unchanged because it is the string used to invoke the test.
TEST_CASE(":/redwood/performance/continuousSmapleThroughput") {
	std::default_random_engine generator;
	// Full uint32_t range; the previous int-typed distribution with bound pow(2, 32)
	// overflowed int and made the distribution's [a, b] invalid (undefined behavior).
	std::uniform_int_distribution<uint32_t> distribution(0, std::numeric_limits<uint32_t>::max());
	state size_t inputSize = pow(10, 8);
	state vector<uint32_t> uniform;
	uniform.reserve(inputSize); // avoid repeated reallocation for 10^8 pushes
	for (size_t i = 0; i < inputSize; i++) {
		uniform.push_back(distribution(generator));
	}
	{
		// Sample reservoir of size 10^3, timed over the whole input.
		ContinuousSample<uint32_t> s = ContinuousSample<uint32_t>(pow(10, 3));
		auto t_start = std::chrono::high_resolution_clock::now();
		ASSERT(uniform.size() == inputSize);
		for (size_t i = 0; i < uniform.size(); i++) {
			s.addSample(uniform[i]);
		}
		auto t_end = std::chrono::high_resolution_clock::now();
		double elapsed_time_ms = std::chrono::duration<double, std::milli>(t_end - t_start).count();
		std::cout << "size of input: " << uniform.size() << std::endl;
		std::cout << "Time in millisecond: " << elapsed_time_ms << std::endl;
	}
	return Void();
}

View File

@ -115,6 +115,7 @@ void Histogram::writeToLog() {
}
TraceEvent e(SevInfo, "Histogram");
ASSERT(UnitToStringMapper.find(unit) != UnitToStringMapper.end());
e.detail("Group", group).detail("Op", op).detail("Unit", UnitToStringMapper.at(unit));
for (uint32_t i = 0; i < 32; i++) {

View File

@ -27,6 +27,7 @@
#include <string>
#include <map>
#include <unordered_map>
#include <iostream>
#ifdef _WIN32
#include <intrin.h>
@ -66,7 +67,10 @@ private:
// Private constructor; instances are obtained via Histogram::getHistogram through the registry.
// Restores the unit-validity ASSERT and removes the leftover debug loop that printed
// every UnitToStringMapper entry to stdout on each construction.
Histogram(std::string const& group, std::string const& op, Unit unit, HistogramRegistry& registry)
  : group(group), op(op), unit(unit), registry(registry), ReferenceCounted<Histogram>() {
	// Fail fast if the unit has no string mapping; writeToLog() relies on it.
	ASSERT(UnitToStringMapper.find(unit) != UnitToStringMapper.end());
	clear();
}