Checkpointing incomplete and correctness-breaking progress on adding single-version mode to VersionedBTree.

This commit is contained in:
Stephen Atherton 2019-04-29 17:00:29 -07:00
parent 64554e90d4
commit 2801298ae8
2 changed files with 397 additions and 174 deletions

View File

@ -44,6 +44,8 @@ public:
virtual void addref() = 0;
virtual void delref() = 0;
virtual std::string toString() const = 0;
};
class IVersionedStore : public IClosable {

View File

@ -38,17 +38,10 @@
#define STR(x) LiteralStringRef(x)
struct RedwoodRecordRef {
#pragma pack(push,1)
struct ValuePart {
int32_t total;
int32_t start;
};
#pragma pack(pop)
RedwoodRecordRef(KeyRef key = KeyRef(), Version ver = std::numeric_limits<Version>::max(), Optional<ValueRef> value = {}, Optional<ValuePart> part = {})
: key(key), version(ver), value(value), valuePart(part)
RedwoodRecordRef(KeyRef key = KeyRef(), Version ver = 0, Optional<ValueRef> value = {}, uint32_t chunkTotal = 0, uint32_t chunkStart = 0)
: key(key), version(ver), value(value), chunk({chunkTotal, chunkStart})
{
ASSERT(!part.present() || value.present());
ASSERT(value.present() || !isMultiPart());
}
RedwoodRecordRef(Arena &arena, const RedwoodRecordRef &toCopy) {
@ -62,30 +55,32 @@ struct RedwoodRecordRef {
KeyRef key;
Version version;
Optional<ValueRef> value;
Optional<ValuePart> valuePart;
struct {
uint32_t total;
uint32_t start;
} chunk;
int expectedSize() const {
return key.expectedSize() + value.expectedSize() + sizeof(version) + sizeof(valuePart);
return key.expectedSize() + value.expectedSize();
}
bool isMultiPart() const {
return valuePart.present();
return value.present() && chunk.total != 0;
}
// Generate a kv shard from a complete kv
RedwoodRecordRef split(int start, int len) {
ASSERT(!isMultiPart() && value.present());
return RedwoodRecordRef(key, version, value.get().substr(start, len), ValuePart({value.get().size(), start}));
ASSERT(!isMultiPart());
return RedwoodRecordRef(key, version, value.get().substr(start, len), value.get().size(), start);
}
#pragma pack(push,1)
struct Delta {
// TODO: Make this actually a delta
enum EFlags {HAS_VALUE = 1, HAS_VALUE_PART = 4};
enum EFlags {HAS_VALUE = 1, HAS_VERSION = 2, IS_MULTIPART = 4};
uint8_t flags;
uint16_t keySize;
Version version;
uint8_t bytes[];
RedwoodRecordRef apply(const RedwoodRecordRef &prev, const RedwoodRecordRef &next, Arena arena) {
@ -93,14 +88,26 @@ struct RedwoodRecordRef {
const uint8_t *rptr = bytes;
r.key = StringRef(rptr, keySize);
rptr += keySize;
r.version = version;
if(flags & HAS_VERSION) {
r.version = (*(Version *)rptr);
rptr += sizeof(Version);
}
else {
r.version = 0;
}
if(flags & HAS_VALUE) {
uint16_t valueSize = *(uint16_t *)rptr;
rptr += 2;
r.value = StringRef(rptr, valueSize);
rptr += valueSize;
if(flags & HAS_VALUE_PART) {
r.valuePart = *(ValuePart *)rptr;
if(flags & IS_MULTIPART) {
r.chunk.total = *(uint32_t *)rptr;
rptr += sizeof(uint32_t);
r.chunk.start = *(uint32_t *)rptr;
}
else {
r.chunk.total = 0;
r.chunk.start = 0;
}
}
return r;
@ -108,11 +115,14 @@ struct RedwoodRecordRef {
int size() const {
int s = sizeof(Delta) + keySize;
if(flags & HAS_VERSION) {
s += sizeof(Version);
}
if(flags & HAS_VALUE) {
s += 2;
s += *(uint16_t *)(bytes + keySize);
if(flags & HAS_VALUE_PART) {
s += sizeof(ValuePart);
s += *(uint16_t *)((uint8_t *)this + s);
s += sizeof(uint16_t);
if(flags & IS_MULTIPART) {
s += (2 * sizeof(uint32_t));
}
}
return s;
@ -133,18 +143,15 @@ struct RedwoodRecordRef {
if(cmp == 0) {
cmp = version - rhs.version;
if(cmp == 0) {
// Absent value is greater than present (for reasons)
cmp = (value.present() ? 0 : 1) - (rhs.value.present() ? 0 : 1);
if(cmp == 0) {
// Chunked is greater than whole
cmp = (valuePart.present() ? 1 : 0) - (rhs.valuePart.present() ? 1 : 0);
if(cmp == 0 && valuePart.present()) {
// Larger total size is greater
cmp = valuePart.get().total - rhs.valuePart.get().total;
if(cmp == 0) {
// Order by start
cmp = valuePart.get().start - rhs.valuePart.get().start;
}
// Absent value sorts higher than present (for reasons)
if(value.present() != rhs.value.present()) {
cmp = value.present() ? -1 : 1;
}
else {
// Chunked (represented by chunk.total > 0) sorts higher than whole
cmp = chunk.total - rhs.chunk.total;
if(cmp == 0) {
cmp = chunk.start - rhs.chunk.start;
}
}
}
@ -174,11 +181,14 @@ struct RedwoodRecordRef {
int deltaSize(const RedwoodRecordRef &base) const {
int s = sizeof(Delta) + key.size();
if(version != 0) {
s += sizeof(Version);
}
if(value.present()) {
s += 2;
s += value.get().size();
if(valuePart.present()) {
s += sizeof(ValuePart);
if(isMultiPart()) {
s += (2 * sizeof(uint32_t));
}
}
return s;
@ -186,20 +196,25 @@ struct RedwoodRecordRef {
void writeDelta(Delta &d, const RedwoodRecordRef &prev, const RedwoodRecordRef &next) const {
d.flags = value.present() ? Delta::EFlags::HAS_VALUE : 0;
if(valuePart.present())
d.flags |= Delta::EFlags::HAS_VALUE_PART;
d.keySize = key.size();
d.version = version;
uint8_t *wptr = d.bytes;
memcpy(wptr, key.begin(), key.size());
wptr += key.size();
if(version != 0) {
d.flags |= Delta::EFlags::HAS_VERSION;
*(Version *)wptr = (version);
wptr += sizeof(Version);
}
if(value.present()) {
*(uint16_t *)wptr = value.get().size();
wptr += 2;
memcpy(wptr, value.get().begin(), value.get().size());
wptr += value.get().size();
if(valuePart.present()) {
*(ValuePart *)wptr = valuePart.get();
if(isMultiPart()) {
d.flags |= Delta::EFlags::IS_MULTIPART;
*(uint32_t *)wptr = chunk.total;
wptr += sizeof(uint32_t);
*(uint32_t *)wptr = chunk.start;
}
}
}
@ -220,8 +235,8 @@ struct RedwoodRecordRef {
std::string toString(int hexLimit = 15) const {
std::string r;
r += format("'%s' @%lld ", kvformat(key, hexLimit).c_str(), version);
if(valuePart.present()) {
r += format("[%d/%d] ", valuePart.get().start, valuePart.get().total);
if(isMultiPart()) {
r += format("[%d/%d] ", chunk.start, chunk.total);
}
if(value.present()) {
r += format("-> '%s'", kvformat(value.get(), hexLimit).c_str());
@ -600,9 +615,19 @@ public:
++counts.sets;
SingleKeyMutationsByVersion &changes = insertMutationBoundary(keyValue.key)->second.startKeyMutations;
// Add the set if the changes set is empty or the last entry isn't a set to exactly the same value
if(changes.empty() || !changes.rbegin()->second.equalToSet(keyValue.value)) {
changes[m_writeVersion] = SingleKeyMutation(keyValue.value);
if(singleVersion) {
if(changes.empty()) {
changes[0] = SingleKeyMutation(keyValue.value);
}
else {
changes.begin()->second.value = keyValue.value;
}
}
else {
// Add the set if the changes set is empty or the last entry isn't a set to exactly the same value
if(changes.empty() || !changes.rbegin()->second.equalToSet(keyValue.value)) {
changes[m_writeVersion] = SingleKeyMutation(keyValue.value);
}
}
}
virtual void clear(KeyRangeRef range) {
@ -610,19 +635,29 @@ public:
MutationBufferT::iterator iBegin = insertMutationBoundary(range.begin);
MutationBufferT::iterator iEnd = insertMutationBoundary(range.end);
// For each boundary in the cleared range
while(iBegin != iEnd) {
// In single version mode, clear all pending updates in the affected range
if(singleVersion) {
RangeMutation &range = iBegin->second;
// Set the rangeClearedVersion if not set
if(!range.rangeClearVersion.present())
range.rangeClearVersion = m_writeVersion;
// Add a clear to the startKeyMutations map if it's empty or the last item is not a clear
if(range.startKeyMutations.empty() || !range.startKeyMutations.rbegin()->second.isClear())
range.startKeyMutations[m_writeVersion] = SingleKeyMutation();
range.startKeyMutations.clear();
range.rangeClearVersion = 0;
++iBegin;
m_pBuffer->erase(iBegin, iEnd);
}
else {
// For each boundary in the cleared range
while(iBegin != iEnd) {
RangeMutation &range = iBegin->second;
// Set the rangeClearedVersion if not set
if(!range.rangeClearVersion.present())
range.rangeClearVersion = m_writeVersion;
// Add a clear to the startKeyMutations map if it's empty or the last item is not a clear
if(range.startKeyMutations.empty() || !range.startKeyMutations.rbegin()->second.isClear())
range.startKeyMutations[m_writeVersion] = SingleKeyMutation();
++iBegin;
}
}
}
@ -645,13 +680,14 @@ public:
return m_lastCommittedVersion;
}
VersionedBTree(IPager *pager, std::string name, int target_page_size = -1)
VersionedBTree(IPager *pager, std::string name, bool singleVersion = false, int target_page_size = -1)
: m_pager(pager),
m_writeVersion(invalidVersion),
m_usablePageSizeOverride(pager->getUsablePageSize()),
m_lastCommittedVersion(invalidVersion),
m_pBuffer(nullptr),
m_name(name)
m_name(name),
singleVersion(singleVersion)
{
if(target_page_size > 0 && target_page_size < m_usablePageSizeOverride)
m_usablePageSizeOverride = target_page_size;
@ -693,8 +729,12 @@ public:
virtual Reference<IStoreCursor> readAtVersion(Version v) {
// TODO: Use the buffer to return uncommitted data
// For now, only committed versions can be read.
Version recordVersion = singleVersion ? 0 : v;
ASSERT(v <= m_lastCommittedVersion);
return Reference<IStoreCursor>(new Cursor(m_pager->getReadSnapshot(v), m_root, m_usablePageSizeOverride));
if(singleVersion) {
ASSERT(v == m_lastCommittedVersion);
}
return Reference<IStoreCursor>(new Cursor(m_pager->getReadSnapshot(v), m_root, recordVersion, m_usablePageSizeOverride));
}
// Must be nondecreasing
@ -723,6 +763,10 @@ public:
return commit_impl(this);
}
// Returns the singleVersion flag this tree was constructed with.
// When the flag is set, set() stores pending mutations under version 0 and
// readAtVersion() asserts that the requested version is the last committed one.
bool isSingleVersion() const {
return singleVersion;
}
private:
void writePage(LogicalPageID id, Reference<IPage> page, Version ver, const RedwoodRecordRef *pageLowerBound, const RedwoodRecordRef *pageUpperBound) {
debug_printf("writePage(): %s\n", ((const BTreePage *)page->begin())->toString(true, id, ver, pageLowerBound, pageUpperBound).c_str());
@ -735,6 +779,26 @@ private:
typedef std::pair<Version, std::vector<KeyPagePairT>> VersionedKeyToPageSetT;
typedef std::vector<VersionedKeyToPageSetT> VersionedChildrenT;
// Debug helper: renders one (boundary record, page id) child entry as
// "(<record>, Page <id>)". The record is printed via its own toString(),
// called with an argument of -1.
static std::string toString(const KeyPagePairT &c) {
    const std::string boundary = c.first.toString(-1);
    return format("(%s, Page %u)", boundary.c_str(), c.second);
}
// Debug helper: renders one versioned child set as
// "Version <v> => [<child> <child> ]", where every child entry is followed
// by a single space before the closing bracket.
static std::string toString(const VersionedKeyToPageSetT &c) {
    std::string out = format("Version %lld => [", c.first);
    for(const auto &child : c.second) {
        out += toString(child);
        out += " ";
    }
    out += "]";
    return out;
}
// Debug helper: renders a list of versioned child sets as
// "{ <set>, <set>,  }". Every set is followed by ", ", so the text
// ends with a trailing separator before the closing brace.
static std::string toString(const VersionedChildrenT &c) {
    std::string out = "{ ";
    for(const auto &versionedSet : c) {
        out += toString(versionedSet);
        out += ", ";
    }
    out += " }";
    return out;
}
// Represents a change to a single key - set, clear, or atomic op
struct SingleKeyMutation {
// Clear
@ -800,9 +864,13 @@ private:
/* Mutation Buffer Overview
*
* MutationBuffer maps the start of a range to a RangeMutation. The end of the range is
* the next range start in the map.
*
* This structure's organization is meant to put pending updates for the btree in an order
* that makes it efficient to query all pending mutations across all pending versions which are
* relevant to a particular subtree of the btree.
*
* At the top level, it is a map of the start of a range being modified to a RangeMutation.
* The end of the range is the next map key (which is the next range start in the map).
*
* - The buffer starts out with keys '' and endKVV.key already populated.
*
* - When a new key is inserted into the buffer map, it is by definition
@ -837,7 +905,11 @@ private:
* but it would also be valid to see if the last key before startKey is equal to
* keyBefore(startKey), and if so that mutation buffer boundary key can be used instead
* without adding an additional key to the buffer.
*/
* TODO: A possible optimization here could be to only use existing btree leaf page boundaries as keys,
* with mutation point keys being stored in an unsorted structure under those boundary map keys,
* to be sorted later just before being merged into the existing leaf page.
*/
IPager *m_pager;
MutationBufferT *m_pBuffer;
@ -849,6 +921,7 @@ private:
int m_usablePageSizeOverride;
Future<Void> m_init;
std::string m_name;
bool singleVersion;
void printMutationBuffer(MutationBufferT::const_iterator begin, MutationBufferT::const_iterator end) const {
#if REDWOOD_DEBUG
@ -1058,16 +1131,18 @@ private:
// Returns list of (version, list of (lower_bound, list of children) )
// TODO: Probably should pass prev/next records by pointer in many places
ACTOR static Future<VersionedChildrenT> commitSubtree(VersionedBTree *self, MutationBufferT *mutationBuffer, Reference<IPagerSnapshot> snapshot, LogicalPageID root, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound) {
ACTOR static Future<VersionedChildrenT> commitSubtree(VersionedBTree *self, MutationBufferT *mutationBuffer, Reference<IPagerSnapshot> snapshot, LogicalPageID root, const RedwoodRecordRef *lowerBound, const RedwoodRecordRef *upperBound, const RedwoodRecordRef *newLowerBound = nullptr, const RedwoodRecordRef *newUpperBound = nullptr) {
debug_printf("%p commitSubtree: root=%d lower='%s' upper='%s'\n", THIS, root, lowerBound->toString().c_str(), upperBound->toString().c_str());
self->counts.commitToPageStart++;
// If the lower bound key and the upper bound key are the same then there can't be any changes to
// this subtree since changes would happen after the upper bound key as the mutated versions would
// necessarily be higher than all previous versions
// TODO: Avoid calling commitSubtree() when this is true to avoid creating the rather large state of this actor
if(lowerBound->key == upperBound->key) {
debug_printf("%p no changes, lower and upper bound keys are the same.\n", THIS);
return VersionedChildrenT({ {0,{{*lowerBound,root}}} });
VersionedChildrenT c({ {0,{{*lowerBound,root}}} });
debug_printf("%p id=%u no changes, lower and upper bound keys are the same, returning %s\n", THIS, root, toString(c).c_str());
return c;
}
// Find the slice of the mutation buffer that is relevant to this subtree
@ -1083,8 +1158,9 @@ private:
else {
// If there are no mutations, we're done
if(iMutationBoundary == iMutationBoundaryEnd) {
debug_printf("%p no changes, mutation buffer start/end are the same\n", THIS);
return VersionedChildrenT({ {0,{{*lowerBound,root}}} });
VersionedChildrenT c({ {0,{{*lowerBound,root}}} });
debug_printf("%p id=%d no changes, mutation buffer start/end are the same, returning %s\n", THIS, root, toString(c).c_str());
return c;
}
}
@ -1101,8 +1177,9 @@ private:
iMutationBoundary->first < lowerBound->key)
)
) {
debug_printf("%p no changes because sole mutation range was not cleared\n", THIS);
return VersionedChildrenT({ {0,{{*lowerBound,root}}} });
VersionedChildrenT c({ {0,{{*lowerBound,root}}} });
debug_printf("%p no changes because sole mutation range was not cleared, returning %s\n", THIS, toString(c).c_str());
return c;
}
self->counts.commitToPage++;
@ -1138,21 +1215,32 @@ private:
// If the mutation boundary key is less than the lower bound key then skip startKeyMutations for
// this boundary, we're only processing this mutation range here to apply any clears to existing data.
if(iMutationBoundary->first < lowerBound->key)
if(iMutationBoundary->first < lowerBound->key) {
iMutations = iMutationBoundary->second.startKeyMutations.end();
}
// If the mutation boundary key is the same as the page lowerBound key then start reading single
// key mutations at the first version greater than the lowerBound key's version.
else if(iMutationBoundary->first == lowerBound->key)
else if(!self->singleVersion && iMutationBoundary->first == lowerBound->key) {
iMutations = iMutationBoundary->second.startKeyMutations.upper_bound(lowerBound->version);
else
}
else {
iMutations = iMutationBoundary->second.startKeyMutations.begin();
}
SingleKeyMutationsByVersion::const_iterator iMutationsEnd = iMutationBoundary->second.startKeyMutations.end();
// Output old versions of the mutation boundary key
// Iterate over old versions of the mutation boundary key, outputting if necessary
while(cursor.valid() && cursor.get().key == iMutationBoundary->first) {
merged.push_back(cursor.get());
debug_printf("%p: Added %s [existing, boundary start]\n", THIS, merged.back().toString().c_str());
// If not in single version mode or there were no changes to the key
if(!self->singleVersion || iMutationBoundary->second.noChanges()) {
merged.push_back(cursor.get());
debug_printf("%p: Added %s [existing, boundary start]\n", THIS, merged.back().toString().c_str());
}
else {
ASSERT(self->singleVersion);
debug_printf("%p: Skipped %s [existing, boundary start, singleVersion mode]\n", THIS, cursor.get().toString().c_str());
minVersion = 0;
}
cursor.moveNext();
}
@ -1196,24 +1284,42 @@ private:
// Write existing keys which are less than the next mutation boundary key, clearing if needed.
while(cursor.valid() && cursor.get().key < iMutationBoundary->first) {
merged.push_back(cursor.get());
debug_printf("%p: Added %s [existing, middle]\n", THIS, merged.back().toString().c_str());
// TODO: Remove old versions that are too old
// Write a clear of this key if needed. A clear is required if clearRangeVersion is set and the next cursor
// key is different than the current one. If the last cursor key in the page is different from the
// first key in the right sibling page then the page's upper bound will reflect that.
auto nextCursor = cursor;
nextCursor.moveNext();
if(clearRangeVersion.present() && cursor.get().key != nextCursor.getOrUpperBound().key) {
bool remove = self->singleVersion && clearRangeVersion.present();
if(!remove) {
merged.push_back(cursor.get());
debug_printf("%p: Added %s [existing, middle]\n", THIS, merged.back().toString().c_str());
}
else {
ASSERT(self->singleVersion);
debug_printf("%p: Skipped %s [existing, boundary start, singleVersion mode]\n", THIS, cursor.get().toString().c_str());
Version clearVersion = clearRangeVersion.get();
if(clearVersion < minVersion || minVersion == invalidVersion)
minVersion = clearVersion;
++changes;
merged.push_back(RedwoodRecordRef(cursor.get().key, clearVersion));
debug_printf("%p: Added %s [existing, middle clear]\n", THIS, merged.back().toString().c_str());
}
cursor = nextCursor;
// If keeping version history, write clears for records that exist in this range if the range was cleared
if(!self->singleVersion) {
// Write a clear of this key if needed. A clear is required if clearRangeVersion is set and the next cursor
// key is different than the current one. If the last cursor key in the page is different from the
// first key in the right sibling page then the page's upper bound will reflect that.
auto nextCursor = cursor;
nextCursor.moveNext();
if(clearRangeVersion.present() && cursor.get().key != nextCursor.getOrUpperBound().key) {
Version clearVersion = clearRangeVersion.get();
if(clearVersion < minVersion || minVersion == invalidVersion)
minVersion = clearVersion;
++changes;
merged.push_back(RedwoodRecordRef(cursor.get().key, clearVersion));
debug_printf("%p: Added %s [existing, middle clear]\n", THIS, merged.back().toString().c_str());
}
cursor = nextCursor;
}
else {
cursor.moveNext();
}
}
}
@ -1228,45 +1334,61 @@ private:
// No changes were actually made. This could happen if the only mutations are clear ranges which do not match any records.
if(minVersion == invalidVersion) {
debug_printf("%p No changes were made during mutation merge\n", THIS);
VersionedChildrenT c({ {0,{{*lowerBound,root}}} });
debug_printf("%p No changes were made during mutation merge, returning %s\n", THIS, toString(c).c_str());
ASSERT(changes == 0);
return VersionedChildrenT({ {0,{{*lowerBound,root}}} });
return c;
}
// TODO: Make version and key splits based on contents of merged list, if keeping history
// If everything in the page was deleted then this page should be deleted as of the new version
// Note that if a single range clear covered the entire page then we should not get this far
if(merged.empty()) {
// TODO: For multi version mode only delete this page as of the new version
VersionedChildrenT c({});
debug_printf("%p All leaf page contents were cleared, returning %s\n", THIS, toString(c).c_str());
return c;
}
IPager *pager = self->m_pager;
std::vector<BoundaryAndPage> pages = buildPages(true, *lowerBound, *upperBound, merged, BTreePage::IS_LEAF, [pager](){ return pager->newPageBuffer(); }, self->m_usablePageSizeOverride);
// If there isn't still just a single page of data then this page became too large and was split.
// The new split pages will be valid as of minVersion, but the old page remains valid at the old version
// (TODO: unless history isn't being kept at all)
if(pages.size() != 1) {
results.push_back( {0, {{*lowerBound, root}}} );
if(!self->singleVersion) {
// If there isn't still just a single page of data then this page became too large and was split.
// The new split pages will be valid as of minVersion, but the old page remains valid at the old version
if(pages.size() != 1) {
results.push_back( {0, {{*lowerBound, root}}} );
debug_printf("%p Added versioned child set: %s\n", THIS, toString(results.back()).c_str());
}
else {
// The page was updated but not size-split or version-split so the last page version's data
// can be replaced with the new page contents
if(pages.size() == 1)
minVersion = 0;
}
}
if(pages.size() == 1)
minVersion = 0;
// Write page(s), get new page IDs
std::vector<LogicalPageID> newPageIDs = self->writePages(pages, minVersion, root, page, upperBound, THIS);
Version writeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : minVersion;
std::vector<LogicalPageID> newPageIDs = self->writePages(pages, writeVersion, root, page, upperBound, THIS);
// If this commitSubtree() is operating on the root, write new levels if needed until we're returning a single page
if(root == self->m_root && pages.size() > 1) {
debug_printf("%p Building new root\n", THIS);
self->buildNewRoot(minVersion, pages, newPageIDs, page);
self->buildNewRoot(writeVersion, pages, newPageIDs, page);
}
results.push_back({minVersion, {}});
// TODO: Can this be moved into writePages?
// TODO: This can probably be skipped for root
results.push_back({writeVersion, {}});
for(int i=0; i<pages.size(); i++) {
// The lower bound of the first page is the lower bound of the subtree, not the first entry in the page
const RedwoodRecordRef &lower = (i == 0) ? *lowerBound : pages[i].lowerBound;
debug_printf("%p Adding page to results: %s => %d\n", THIS, lower.toString().c_str(), newPageIDs[i]);
results.back().second.push_back( {lower, newPageIDs[i]} );
}
debug_printf("%p Added versioned child set: %s\n", THIS, toString(results.back()).c_str());
debug_printf("%p DONE.\n", THIS);
return results;
@ -1290,10 +1412,40 @@ private:
const RedwoodRecordRef &childUpperBound = cursor.moveNext() ? cursor.get() : *upperBound;
debug_printf("lower '%s'\n", childLowerBound.toString().c_str());
debug_printf("upper '%s'\n", childUpperBound.toString().c_str());
debug_printf("%p lower '%s'\n", THIS, childLowerBound.toString().c_str());
debug_printf("%p upper '%s'\n", THIS, childUpperBound.toString().c_str());
ASSERT(childLowerBound <= childUpperBound);
/*
// TODO: If lower bound and upper bound have the same key, do something intelligent if possible
//
if(childLowerBound.key == childUpperBound.key) {
if(key is modified or cleared) {
if(self->singleVersion) {
// In single version mode, don't keep any records with the old key if the key is modified, so return
// an empty page set to replace the child page
futureChildren.push_back(VersionedChildrenT({ {0,{} } }));
}
else {
// In versioned mode, there is no need to recurse to this page because new versions of key
// will go in the right most page that has the same lowerBound key, but since the key is
// being changed the new version of this page should exclude the old subtree
}
else {
// Return the child page as-is, no need to visit it
futureChildren.push_back(VersionedChildrenT({ {0,{{childLowerBound, pageID}}} }));
}
}
else {
// No changes
futureChildren.push_back(VersionedChildrenT({ {0,{{childLowerBound, pageID}}} }));
}
}
else {
futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound));
}
*/
futureChildren.push_back(self->commitSubtree(self, mutationBuffer, snapshot, pageID, &childLowerBound, &childUpperBound));
childPageIDs.push_back(pageID);
}
@ -1306,23 +1458,37 @@ private:
}
bool modified = false;
if(REDWOOD_DEBUG) {
debug_printf("%p Subtree update results for id=%d\n", THIS, root);
for(int i = 0; i < futureChildren.size(); ++i) {
const VersionedChildrenT &children = futureChildren[i].get();
debug_printf("%p subtree for child page id=%u: %s\n", THIS, childPageIDs[i], toString(children).c_str());
}
}
for(int i = 0; i < futureChildren.size(); ++i) {
const VersionedChildrenT &children = futureChildren[i].get();
// Handle multipages
if(children.size() != 1 || children[0].second.size() != 1) {
// If the merge resulted in 1 versioned child set with exactly one child
// page, and its id is the same as the original, then no changes were made.
if(!(children.size() == 1 && children.front().second.size() == 1 && children.front().second.front().second == childPageIDs[i])) {
modified = true;
break;
}
}
if(!modified) {
debug_printf("%p not modified.\n", THIS);
return VersionedChildrenT({{0, {{*lowerBound, root}}}});
VersionedChildrenT c({{0, {{*lowerBound, root}}}});
debug_printf("%p not modified, returning %s\n", THIS, toString(c).c_str());
return c;
}
Version version = 0;
VersionedChildrenT result;
debug_printf("%p: Internal page %u modified, creating replacements.\n", THIS, root);
loop { // over version splits of this page
Version nextVersion = std::numeric_limits<Version>::max();
@ -1337,40 +1503,57 @@ private:
for(int i = 0; i < futureChildren.size(); ++i) {
LogicalPageID pageID = childPageIDs[i];
const VersionedChildrenT &children = futureChildren[i].get();
debug_printf("%p Versioned page set that replaced Page id=%d: %lu versions\n", THIS, pageID, children.size());
for(auto &versionedPageSet : children) {
debug_printf("%p version: Page id=%lld\n", THIS, versionedPageSet.first);
for(auto &boundaryPage : versionedPageSet.second) {
debug_printf("%p '%s' -> Page id=%u\n", THIS, boundaryPage.first.toString().c_str(), boundaryPage.second);
}
}
// Find the first version greater than the current version we are writing
auto cv = std::upper_bound( children.begin(), children.end(), version, [](Version a, VersionedChildrenT::value_type const &b) { return a < b.first; } );
// If there are no versions before the one we found, just update nextVersion and continue.
if(cv == children.begin()) {
debug_printf("%p First version (%lld) in set is greater than current, setting nextVersion and continuing\n", THIS, cv->first);
nextVersion = std::min(nextVersion, cv->first);
debug_printf("%p curr %lld next %lld\n", THIS, version, nextVersion);
if(children.empty()) {
modified = true;
continue;
}
// If a version greater than the current version being written was found, update nextVersion
if(cv != children.end()) {
nextVersion = std::min(nextVersion, cv->first);
debug_printf("%p curr %lld next %lld\n", THIS, version, nextVersion);
debug_printf("%p Versioned page set that replaced Page id=%d: %lu versions\n", THIS, pageID, children.size());
if(REDWOOD_DEBUG) {
for(auto &versionedPageSet : children) {
debug_printf("%p version %lld\n", THIS, versionedPageSet.first);
for(auto &boundaryPage : versionedPageSet.second) {
debug_printf("%p '%s' -> Page id=%u\n", THIS, boundaryPage.first.toString().c_str(), boundaryPage.second);
}
}
}
// Go back one to the last version that was valid prior to or at the current version we are writing
--cv;
vector<VersionedBTree::VersionedKeyToPageSetT>::const_iterator cv;
if(self->singleVersion) {
ASSERT(children.size() == 1);
cv = children.begin();
nextVersion = std::numeric_limits<Version>::max();
}
else {
// Find the first version greater than the current version we are writing
cv = std::upper_bound( children.begin(), children.end(), version, [](Version a, VersionedChildrenT::value_type const &b) { return a < b.first; } );
// If there are no versions before the one we found, just update nextVersion and continue.
if(cv == children.begin()) {
debug_printf("%p First version (%lld) in set is greater than current, setting nextVersion and continuing\n", THIS, cv->first);
nextVersion = std::min(nextVersion, cv->first);
debug_printf("%p curr %lld next %lld\n", THIS, version, nextVersion);
continue;
}
// If a version greater than the current version being written was found, update nextVersion
if(cv != children.end()) {
nextVersion = std::min(nextVersion, cv->first);
debug_printf("%p curr %lld next %lld\n", THIS, version, nextVersion);
}
// Go back one to the last version that was valid prior to or at the current version we are writing
--cv;
}
debug_printf("%p Using children for version %lld from this set, building version %lld\n", THIS, cv->first, version);
// If page count isn't 1 then the root is definitely modified
modified = modified || cv->second.size() != 1;
// TODO: If page ID changed in singleVersion mode then root is modified
// Add the children at this version to the child entries list for the current version being built.
for (auto &childPage : cv->second) {
debug_printf("%p Adding child page %s\n", THIS, childPage.first.toString().c_str());
@ -1383,6 +1566,15 @@ private:
debug_printf("%p Finished pass through futurechildren. childEntries=%lu version=%lld nextVersion=%lld\n", THIS, childEntries.size(), version, nextVersion);
if(modified) {
// If all children were deleted then this page should be deleted as of the new version
// Note that if a single range clear covered the entire page then we should not get this far
if(childEntries.empty()) {
// TODO: delete page as of new version
VersionedChildrenT c({});
debug_printf("%p All internal page children were deleted, returning %s\n", THIS, toString(c).c_str());
return c;
}
// TODO: Track split points across iterations of this loop, so that they don't shift unnecessarily and
// cause unnecessary path copying
@ -1390,14 +1582,15 @@ private:
std::vector<BoundaryAndPage> pages = buildPages(false, *lowerBound, *upperBound, childEntries, 0, [pager](){ return pager->newPageBuffer(); }, self->m_usablePageSizeOverride);
// Write page(s), use version 0 to replace latest version if only writing one page
std::vector<LogicalPageID> newPageIDs = self->writePages(pages, version, root, page, upperBound, THIS);
Version writeVersion = self->singleVersion ? self->getLastCommittedVersion() + 1 : version;
std::vector<LogicalPageID> newPageIDs = self->writePages(pages, writeVersion, root, page, upperBound, THIS);
// If this commitSubtree() is operating on the root, write new levels if needed until we're returning a single page
if(root == self->m_root)
self->buildNewRoot(version, pages, newPageIDs, page);
self->buildNewRoot(writeVersion, pages, newPageIDs, page);
result.resize(result.size()+1);
result.back().first = version;
result.back().first = writeVersion;
for(int i=0; i<pages.size(); i++)
result.back().second.push_back( {pages[i].lowerBound, newPageIDs[i]} );
@ -1407,10 +1600,12 @@ private:
debug_printf("%p Output same as last version, popping it.\n", THIS);
result.pop_back();
}
debug_printf("%p Added versioned child set: %s\n", THIS, toString(result.back()).c_str());
}
else {
debug_printf("%p Version 0 has no changes\n", THIS);
result.push_back({0, {{*lowerBound, root}}});
debug_printf("%p Added versioned child set: %s\n", THIS, toString(result.back()).c_str());
}
if (nextVersion == std::numeric_limits<Version>::max())
@ -1538,9 +1733,18 @@ private:
std::string toString() const {
std::string r;
Reference<PageCursor> c = pageCursor;
int maxDepth = 0;
while(c) {
r = format("[%s] ", c->toString().c_str()) + r;
c = c->parent;
++maxDepth;
}
c = pageCursor;
int depth = maxDepth;
while(c) {
r = format("[%d/%d: %s] ", depth--, maxDepth, c->toString().c_str()) + r;
c = c->parent;
}
return r;
@ -1710,8 +1914,8 @@ private:
// KeyValueRefs returned become invalid once the cursor is moved
class Cursor : public IStoreCursor, public ReferenceCounted<Cursor>, public FastAllocated<Cursor>, NonCopyable {
public:
Cursor(Reference<IPagerSnapshot> pageSource, LogicalPageID root, int usablePageSizeOverride)
: m_version(pageSource->getVersion()),
Cursor(Reference<IPagerSnapshot> pageSource, LogicalPageID root, Version recordVersion, int usablePageSizeOverride)
: m_version(recordVersion),
m_cur1(pageSource, root, usablePageSizeOverride),
m_cur2(m_cur1)
{
@ -1768,7 +1972,7 @@ private:
r += format(" KV: '%s' -> '%s'\n", m_kv.get().key.printable().c_str(), m_kv.get().value.printable().c_str());
}
else {
r += " KV: <np>";
r += " KV: <np>\n";
}
r += format(" Cur1: %s\n", m_cur1.toString().c_str());
r += format(" Cur2: %s\n", m_cur2.toString().c_str());
@ -1924,23 +2128,21 @@ private:
// Split value, need to coalesce split value parts into a buffer in arena,
// after which cur1 will point to the first part and kv.key will reference its key
const RedwoodRecordRef::ValuePart &part = rec.valuePart.get();
ASSERT(part.start + rec.value.get().size() == part.total);
ASSERT(rec.chunk.start + rec.value.get().size() == rec.chunk.total);
debug_printf("readFullKVPair: Split, totalsize %d %s\n", part.total, self->toString().c_str());
debug_printf("readFullKVPair: Split, totalsize %d %s\n", rec.chunk.total, self->toString().c_str());
// Allocate space for the entire value in the same arena as the key
state int bytesLeft = part.total;
state int bytesLeft = rec.chunk.total;
state StringRef dst = makeString(bytesLeft, self->m_arena);
loop {
const RedwoodRecordRef &rec = self->m_cur1.get();
const RedwoodRecordRef::ValuePart &part = rec.valuePart.get();
debug_printf("readFullKVPair: Adding chunk %s\n", rec.toString().c_str());
int partSize = rec.value.get().size();
memcpy(mutateString(dst) + part.start, rec.value.get().begin(), partSize);
memcpy(mutateString(dst) + rec.chunk.start, rec.value.get().begin(), partSize);
bytesLeft -= partSize;
if(bytesLeft == 0) {
self->m_kv = KeyValueRef(rec.key, dst);
@ -1977,7 +2179,7 @@ public:
// Constructs the store for the given file prefix: records the prefix in m_filePrefix,
// allocates an IndirectShadowPager on it, builds a VersionedBTree over that pager, and
// stores the result of catchError(init_impl(this)) in m_init.
// The logID parameter is not referenced anywhere in this body.
KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) : m_filePrefix(filePrefix) {
// TODO: This constructor should really just take an IVersionedStore
IPager *pager = new IndirectShadowPager(filePrefix);
// NOTE(review): the two consecutive m_tree assignments below look like the removed/added
// pair of a diff hunk whose +/- markers were lost, not two statements meant to run in
// sequence. The second one passes `true` as an extra third argument, which matches the
// (pager, file, singleVersion, pageSize) call shape used by the tests in this change, so
// it is presumably the intended line -- confirm and keep only one.
m_tree = new VersionedBTree(pager, filePrefix, pager->getUsablePageSize());
m_tree = new VersionedBTree(pager, filePrefix, true, pager->getUsablePageSize());
// Start initialization; init_impl is defined outside this view.
m_init = catchError(init_impl(this));
}
@ -2164,28 +2366,26 @@ KeyValue randomKV(int keySize = 10, int valueSize = 5) {
return kv;
}
ACTOR Future<int> verifyRandomRange(VersionedBTree *btree, Version v, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount) {
ACTOR Future<int> verifyRange(VersionedBTree *btree, Key start, Key end, Version v, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount) {
state int errors = 0;
state Key start = randomKV().key;
state Key end = randomKV().key;
if(end <= start)
end = keyAfter(start);
debug_printf("VerifyRange '%s' to '%s' @%lld\n", printable(start).c_str(), printable(end).c_str(), v);
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator i = written->lower_bound(std::make_pair(start.toString(), 0));
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iEnd = written->upper_bound(std::make_pair(end.toString(), 0));
state std::map<std::pair<std::string, Version>, Optional<std::string>>::const_iterator iLast;
state Reference<IStoreCursor> cur = btree->readAtVersion(v);
debug_printf("VerifyRange(@%lld, %s, %s): Start cur=%p\n", v, start.toString().c_str(), end.toString().c_str(), cur.getPtr());
// Randomly use the cursor for something else first.
if(g_random->coinflip()) {
debug_printf("VerifyRange: Dummy seek\n");
debug_printf("VerifyRange(@%lld, %s, %s): Dummy seek\n", v, start.toString().c_str(), end.toString().c_str());
state Key randomKey = randomKV().key;
wait(g_random->coinflip() ? cur->findFirstEqualOrGreater(randomKey, true, 0) : cur->findLastLessOrEqual(randomKey, true, 0));
}
debug_printf("VerifyRange: Actual seek\n");
debug_printf("VerifyRange(@%lld, %s, %s): Actual seek\n", v, start.toString().c_str(), end.toString().c_str());
wait(cur->findFirstEqualOrGreater(start, true, 0));
state std::vector<KeyValue> results;
@ -2255,9 +2455,10 @@ ACTOR Future<int> verifyRandomRange(VersionedBTree *btree, Version v, std::map<s
printf("VerifyRange(@%lld, %s, %s) ERROR: Tree range ended but written has @%lld '%s'\n", v, start.toString().c_str(), end.toString().c_str(), iLast->first.second, iLast->first.first.c_str());
}
debug_printf("VerifyRangeReverse '%s' to '%s' @%lld\n", printable(start).c_str(), printable(end).c_str(), v);
// Randomly use a new cursor for the revere range read
if(g_random->coinflip()) {
debug_printf("VerifyRangeReverse(@%lld, %s, %s): start\n", v, start.toString().c_str(), end.toString().c_str());
// Randomly use a new cursor for the reverse range read but only if version history is available
if(!btree->isSingleVersion() && g_random->coinflip()) {
cur = btree->readAtVersion(v);
}
@ -2345,13 +2546,24 @@ ACTOR Future<int> verifyAll(VersionedBTree *btree, Version maxCommittedVersion,
}
ACTOR Future<Void> verify(VersionedBTree *btree, FutureStream<Version> vStream, std::map<std::pair<std::string, Version>, Optional<std::string>> *written, int *pErrorCount) {
state Future<int> vall;
state Future<int> vrange;
try {
loop {
state Version v = waitNext(vStream);
debug_printf("Verifying through version %lld\n", v);
state Future<int> vall = verifyAll(btree, v, written, pErrorCount);
state Future<int> vrange = verifyRandomRange(btree, g_random->randomInt(1, v + 1), written, pErrorCount);
if(btree->isSingleVersion()) {
v = btree->getLastCommittedVersion();
debug_printf("Verifying at latest committed version %lld\n", v);
vall = verifyRange(btree, LiteralStringRef(""), LiteralStringRef("\xff\xff"), v, written, pErrorCount);
vrange = verifyRange(btree, randomKV().key, randomKV().key, v, written, pErrorCount);
}
else {
debug_printf("Verifying through version %lld\n", v);
vall = verifyAll(btree, v, written, pErrorCount);
vrange = verifyRange(btree, randomKV().key, randomKV().key, g_random->randomInt(1, v + 1), written, pErrorCount);
}
wait(success(vall) && success(vrange));
int errors = vall.get() + vrange.get();
@ -2375,7 +2587,10 @@ ACTOR Future<Void> randomReader(VersionedBTree *btree) {
loop {
wait(yield());
if(!cur || g_random->random01() > .1) {
Version v = g_random->randomInt(1, btree->getLastCommittedVersion() + 1);
Version v = btree->getLastCommittedVersion();
if(!btree->isSingleVersion()) {
v = g_random->randomInt(1, v + 1);
}
cur = btree->readAtVersion(v);
}
@ -2513,10 +2728,8 @@ TEST_CASE("!/redwood/mutableDeltaTreeCorrectnessRedwoodRecord") {
if(g_random->coinflip()) {
rec.value = StringRef(arena, v);
if(g_random->coinflip()) {
RedwoodRecordRef::ValuePart part;
part.start = g_random->randomInt(0, 5000);
part.total = part.start + v.size() + g_random->randomInt(0, 5000);
rec.valuePart = part;
rec.chunk.start = g_random->randomInt(0, 5000);
rec.chunk.total = rec.chunk.start + v.size() + g_random->randomInt(0, 5000);
}
}
items.push_back(rec);
@ -2679,7 +2892,8 @@ TEST_CASE("!/redwood/correctness") {
printf("Initializing...\n");
state int pageSize = g_random->coinflip() ? pager->getUsablePageSize() : g_random->randomInt(200, 400);
state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, pageSize);
state bool singleVersion = g_random->random01() < .05;
state VersionedBTree *btree = new VersionedBTree(pager, pagerFile, singleVersion, pageSize);
wait(btree->init());
// We must be able to fit at least any two keys plus overhead in a page to prevent
@ -2847,7 +3061,7 @@ TEST_CASE("!/redwood/correctness") {
debug_printf("Reopening btree\n");
IPager *pager = new IndirectShadowPager(pagerFile);
btree = new VersionedBTree(pager, pagerFile, pageSize);
btree = new VersionedBTree(pager, pagerFile, singleVersion, pageSize);
wait(btree->init());
Version v = wait(btree->getLatestVersion());
@ -2895,7 +3109,7 @@ ACTOR Future<Void> pointReads(VersionedBTree *btree, int count, int nodeCount) {
state Reference<IStoreCursor> cur = btree->readAtVersion(readVer);
while(c < count) {
//cur = btree->readAtVersion(readVer);
*(uint32_t *)k.begin() = g_random->randomInt(0, nodeCount);
*(uint32_t *)k.begin() = bigEndian32(g_random->randomInt(0, nodeCount));
wait(success(cur->findFirstEqualOrGreater(k, true, 0)));
++c;
}
@ -2911,15 +3125,18 @@ TEST_CASE("!/redwood/performance/set") {
deleteFile(pagerFile);
deleteFile(pagerFile + "0.pagerlog");
deleteFile(pagerFile + "1.pagerlog");
IPager *pager = new IndirectShadowPager(pagerFile);
state VersionedBTree *btree = new VersionedBTree(pager, "unittest_pageFile");
state bool singleVersion = true;
state VersionedBTree *btree = new VersionedBTree(pager, "unittest_pageFile", singleVersion);
wait(btree->init());
state int nodeCount = 1e9;
state int maxChangesPerVersion = 1000;
state int64_t kvBytesTarget = 100e6;
int maxKeySize = 19;
int maxValueSize = 1000;
state int64_t kvBytesTarget = 200e6;
int maxKeySize = 50;
int maxValueSize = 100;
state int maxConsecutiveRun = 6;
state std::string key(maxKeySize, 'k');
state std::string value(maxValueSize, 'v');
@ -2936,10 +3153,14 @@ TEST_CASE("!/redwood/performance/set") {
state Version version = lastVer + 1;
btree->setWriteVersion(version);
int changes = g_random->randomInt(0, maxChangesPerVersion);
int baseKey = g_random->randomInt(0, nodeCount);
while(changes--) {
KeyValue kv;
// Change first 4 bytes of key to an int
*(uint32_t *)key.data() = g_random->randomInt(0, nodeCount);
*(uint32_t *)key.data() = bigEndian32(baseKey++);
if(g_random->randomInt(0, maxConsecutiveRun) == 0) {
baseKey = g_random->randomInt(0, nodeCount);
}
kv.key = StringRef((uint8_t *)key.data(), g_random->randomInt(10, key.size()));
kv.value = StringRef((uint8_t *)value.data(), g_random->randomInt(0, value.size()));
btree->set(kv);