1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-23 07:20:33 +08:00

WIP : deployable/stable version since Nov 3. Start rebase to master branch

This commit is contained in:
mengranwo 2019-01-09 18:03:54 -08:00
parent 5d8928d17f
commit f597aa7e18
15 changed files with 1365 additions and 263 deletions

@ -460,7 +460,7 @@ void initHelp() {
"clear a range of keys from the database",
"All keys between BEGINKEY (inclusive) and ENDKEY (exclusive) are cleared from the database. This command will succeed even if the specified range is empty, but may fail because of conflicts." ESCAPINGK);
helpMap["configure"] = CommandHelp(
"configure [new] <single|double|triple|three_data_hall|three_datacenter|ssd|memory|proxies=<PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*",
"configure [new] <single|double|triple|three_data_hall|three_datacenter|ssd|memory|memory-radixtree|proxies=<PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*",
"change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. When used, both a redundancy mode and a storage engine must be specified.\n\nRedundancy mode:\n single - one copy of the data. Not fault tolerant.\n double - two copies of data (survive one failure).\n triple - three copies of data (survive two failures).\n three_data_hall - See the Admin Guide.\n three_datacenter - See the Admin Guide.\n\nStorage engine:\n ssd - B-Tree storage engine optimized for solid state disks.\n memory - Durable in-memory storage engine for small datasets.\n\nproxies=<PROXIES>: Sets the desired number of proxies in the cluster. Must be at least 1, or set to -1 which restores the number of proxies to the default value.\n\nlogs=<LOGS>: Sets the desired number of log servers in the cluster. Must be at least 1, or set to -1 which restores the number of logs to the default value.\n\nresolvers=<RESOLVERS>: Sets the desired number of resolvers in the cluster. Must be at least 1, or set to -1 which restores the number of resolvers to the default value.\n\nSee the FoundationDB Administration Guide for more information.");
helpMap["fileconfigure"] = CommandHelp(
@ -2377,7 +2377,7 @@ void onoff_generator(const char* text, const char *line, std::vector<std::string
}
void configure_generator(const char* text, const char *line, std::vector<std::string>& lc) {
const char* opts[] = {"new", "single", "double", "triple", "three_data_hall", "three_datacenter", "ssd", "ssd-1", "ssd-2", "memory", "memory-1", "memory-2", "proxies=", "logs=", "resolvers=", NULL};
const char* opts[] = {"new", "single", "double", "triple", "three_data_hall", "three_datacenter", "ssd", "ssd-1", "ssd-2", "memory", "memory-1", "memory-2", "memory-radixtree", "proxies=", "logs=", "resolvers=", NULL};
array_generator(text, line, opts, lc);
}

@ -269,6 +269,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["storage_engine"] = "ssd-redwood-experimental";
} else if( tLogDataStoreType == KeyValueStoreType::MEMORY && storageServerStoreType == KeyValueStoreType::MEMORY ) {
result["storage_engine"] = "memory-1";
} else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::MEMORY_RADIXTREE ) {
result["storage_engine"] = "memory-radixtree";
} else if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::MEMORY ) {
result["storage_engine"] = "memory-2";
} else {
@ -409,10 +411,17 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
type = std::min((int)TLogVersion::MAX_SUPPORTED, type);
tLogVersion = (TLogVersion::Version)type;
}
else if (ck == LiteralStringRef("log_engine")) { parse((&type), value); tLogDataStoreType = (KeyValueStoreType::StoreType)type;
else if (ck == LiteralStringRef("log_engine")) {
parse((&type), value);
tLogDataStoreType = (KeyValueStoreType::StoreType)type;
// TODO: Remove this once Redwood works as a log engine
if(tLogDataStoreType == KeyValueStoreType::SSD_REDWOOD_V1)
if(tLogDataStoreType == KeyValueStoreType::SSD_REDWOOD_V1) {
tLogDataStoreType = KeyValueStoreType::SSD_BTREE_V2;
}
// TODO: Remove this once memory radix tree works as a log engine
if(tLogDataStoreType == KeyValueStoreType::MEMORY_RADIXTREE) {
tLogDataStoreType = KeyValueStoreType::SSD_BTREE_V2;
}
}
else if (ck == LiteralStringRef("log_spill")) { parse((&type), value); tLogSpillType = (TLogSpillType::SpillType)type; }
else if (ck == LiteralStringRef("storage_engine")) { parse((&type), value); storageServerStoreType = (KeyValueStoreType::StoreType)type; }

@ -648,6 +648,7 @@ struct KeyValueStoreType {
MEMORY,
SSD_BTREE_V2,
SSD_REDWOOD_V1,
MEMORY_RADIXTREE,
END
};
@ -667,6 +668,7 @@ struct KeyValueStoreType {
case SSD_BTREE_V2: return "ssd-2";
case SSD_REDWOOD_V1: return "ssd-redwood-experimental";
case MEMORY: return "memory";
case MEMORY_RADIXTREE: return "memory-radixtree";
default: return "unknown";
}
}

@ -100,6 +100,9 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
} else if (mode == "memory-1") {
logType = KeyValueStoreType::MEMORY;
storeType= KeyValueStoreType::MEMORY;
} else if (mode == "memory-radixtree") {
logType = KeyValueStoreType::SSD_BTREE_V2;
storeType= KeyValueStoreType::MEMORY_RADIXTREE;
}
// Add any new store types to fdbserver/workloads/ConfigureDatabase, too
@ -474,7 +477,6 @@ ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std:
}
}
}
if (creating) {
tr.setOption( FDBTransactionOptions::INITIALIZE_NEW_DATABASE );
tr.addReadConflictRange( singleKeyRange( initIdKey ) );

@ -68,6 +68,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"kvstore_available_bytes":12341234,
"kvstore_free_bytes":12341234,
"kvstore_total_bytes":12341234,
"kvstore_total_size":12341234,
"kvstore_total_nodes":12341234,
"kvstore_inline_keys":12341234,
"durable_bytes":{
"hz":0.0,
"counter":0,
@ -405,7 +408,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"description":"abc"
}
],
)statusSchema" R"statusSchema(
)statusSchema"
R"statusSchema(
"recovery_state":{
"required_resolvers":1,
"required_proxies":1,
@ -574,7 +578,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"ssd-redwood-experimental",
"memory",
"memory-1",
"memory-2"
"memory-2",
"memory-radixtree"
]},
"coordinators_count":1,
"excluded_servers":[

@ -51,12 +51,18 @@ public:
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) = 0;
//Returns the amount of free and total space for this store, in bytes
// Returns three size counters for this store; the default implementation reports all zeros.
// NOTE(review): presumably key/value pair and node counts used for debugging - confirm the field order against the container's size()
virtual std::tuple<size_t, size_t, size_t> getSize() { return std::make_tuple(0, 0, 0); }
// Returns the amount of free and total space for this store, in bytes
virtual StorageBytes getStorageBytes() = 0;
virtual void resyncLog() {}
virtual void enableSnapshot() {}
// For debug, print out detailed node info
virtual void printData() {}
/*
Concurrency contract
Causal consistency:
@ -81,7 +87,9 @@ protected:
extern IKeyValueStore* keyValueStoreSQLite( std::string const& filename, UID logID, KeyValueStoreType storeType, bool checkChecksums=false, bool checkIntegrity=false );
extern IKeyValueStore* keyValueStoreRedwoodV1( std::string const& filename, UID logID);
extern IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit, std::string ext = "fdq");
extern IKeyValueStore* keyValueStoreMemory(std::string const& basename, UID logID, int64_t memoryLimit,
std::string ext = "fdq",
KeyValueStoreType storeType = KeyValueStoreType::MEMORY);
extern IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery );
inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string const& filename, UID logID, int64_t memoryLimit, bool checkChecksums=false, bool checkIntegrity=false ) {
@ -94,6 +102,8 @@ inline IKeyValueStore* openKVStore( KeyValueStoreType storeType, std::string con
return keyValueStoreMemory( filename, logID, memoryLimit );
case KeyValueStoreType::SSD_REDWOOD_V1:
return keyValueStoreRedwoodV1( filename, logID );
case KeyValueStoreType::MEMORY_RADIXTREE:
return keyValueStoreMemory(filename, logID, memoryLimit, "fdr", KeyValueStoreType::MEMORY_RADIXTREE); // for radixTree type, set file ext to "fdr"
default:
UNREACHABLE();
}

@ -20,7 +20,8 @@
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/IDiskQueue.h"
#include "flow/IndexedSet.h"
#include "flow/IKeyValueContainer.h"
#include "flow/RadixTree.h"
#include "flow/ActorCollection.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
@ -28,50 +29,44 @@
#define OP_DISK_OVERHEAD (sizeof(OpHeader) + 1)
//Stored in the IndexedSets that hold the database.
//Each KeyValueMapPair is 32 bytes, excluding arena memory.
//It is stored in an IndexedSet<KeyValueMapPair, uint64_t>::Node, for a total size of 72 bytes.
struct KeyValueMapPair {
Arena arena; //8 Bytes (excluding arena memory)
KeyRef key; //12 Bytes
ValueRef value; //12 Bytes
void operator= ( KeyValueMapPair const& rhs ) { arena = rhs.arena; key = rhs.key; value = rhs.value; }
KeyValueMapPair( KeyValueMapPair const& rhs ) : arena(rhs.arena), key(rhs.key), value(rhs.value) {}
KeyValueMapPair(KeyRef key, ValueRef value) : arena(key.expectedSize() + value.expectedSize()), key(arena, key), value(arena, value) { }
bool operator<(KeyValueMapPair const& r) const { return key < r.key; }
bool operator==(KeyValueMapPair const& r) const { return key == r.key; }
bool operator!=(KeyValueMapPair const& r) const { return key != r.key; }
};
template <class CompatibleWithKey>
bool operator<(KeyValueMapPair const& l, CompatibleWithKey const& r) { return l.key < r; }
template <class CompatibleWithKey>
bool operator<(CompatibleWithKey const& l, KeyValueMapPair const& r) { return l < r.key; }
extern bool noUnseed;
template <typename Container>
class KeyValueStoreMemory : public IKeyValueStore, NonCopyable {
public:
KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery );
KeyValueStoreMemory(IDiskQueue* log, UID id, int64_t memoryLimit, KeyValueStoreType storeType, bool disableSnapshot,
bool replaceContent, bool exactRecovery);
// IClosable
virtual Future<Void> getError() { return log->getError(); }
virtual Future<Void> onClosed() { return log->onClosed(); }
virtual void dispose() { recovering.cancel(); log->dispose(); delete this; }
virtual void close() { recovering.cancel(); log->close(); delete this; }
// Cancels recovery, disposes the backing log, frees the key scratch buffer and then destroys this store.
virtual void dispose() {
	recovering.cancel();
	log->dispose();
	// delete[] on a null pointer is a no-op, so no explicit guard is required.
	delete[] reserved_buffer;
	reserved_buffer = nullptr;
	delete this;
}
// Cancels recovery, closes the backing log, frees the key scratch buffer and then destroys this store.
virtual void close() {
	recovering.cancel();
	log->close();
	// delete[] on a null pointer is a no-op, so no explicit guard is required.
	delete[] reserved_buffer;
	reserved_buffer = nullptr;
	delete this;
}
// IKeyValueStore
virtual KeyValueStoreType getType() { return KeyValueStoreType::MEMORY; }
virtual KeyValueStoreType getType() { return type; }
virtual std::tuple<size_t, size_t, size_t> getSize() { return data.size(); }
int64_t getAvailableSize() {
int64_t residentSize =
data.sumTo(data.end()) +
queue.totalSize() + // doesn't account for overhead in queue
transactionSize;
int64_t residentSize = data.sumTo(data.end()) + queue.totalSize() + // doesn't account for overhead in queue
transactionSize;
return memoryLimit - residentSize;
}
@ -82,22 +77,25 @@ public:
// Try to bound how many in-memory bytes we might need to write to disk if we commit() now
int64_t uncommittedBytes = queue.totalSize() + transactionSize;
//Check that we have enough space in memory and on disk
// Check that we have enough space in memory and on disk
int64_t freeSize = std::min(getAvailableSize(), diskQueueBytes.free / 4 - uncommittedBytes);
int64_t availableSize = std::min(getAvailableSize(), diskQueueBytes.available / 4 - uncommittedBytes);
int64_t totalSize = std::min(memoryLimit, diskQueueBytes.total / 4 - uncommittedBytes);
return StorageBytes(std::max((int64_t)0, freeSize), std::max((int64_t)0, totalSize), diskQueueBytes.used,
std::max((int64_t)0, availableSize));
std::max((int64_t)0, availableSize));
}
void semiCommit() {
transactionSize += queue.totalSize();
if(transactionSize > 0.5 * committedDataSize) {
if (transactionSize > 0.5 * committedDataSize) {
transactionIsLarge = true;
TraceEvent("KVSMemSwitchingToLargeTransactionMode", id).detail("TransactionSize", transactionSize).detail("DataSize", committedDataSize);
TraceEvent("KVSMemSwitchingToLargeTransactionMode", id)
.detail("TransactionSize", transactionSize)
.detail("DataSize", committedDataSize);
TEST(true); // KeyValueStoreMemory switching to large transaction mode
TEST(committedDataSize > 1e3); // KeyValueStoreMemory switching to large transaction mode with committed data
TEST(committedDataSize >
1e3); // KeyValueStoreMemory switching to large transaction mode with committed data
}
int64_t bytesWritten = commit_queue(queue, true);
@ -105,33 +103,28 @@ public:
}
virtual void set(KeyValueRef keyValue, const Arena* arena) {
//A commit that occurs with no available space returns Never, so we can throw out all modifications
if(getAvailableSize() <= 0)
return;
// A commit that occurs with no available space returns Never, so we can throw out all modifications
if (getAvailableSize() <= 0) return;
if(transactionIsLarge) {
KeyValueMapPair pair(keyValue.key, keyValue.value);
data.insert(pair, pair.arena.getSize() + data.getElementBytes());
}
else {
if (transactionIsLarge) {
data.insert(keyValue.key, keyValue.value);
} else {
queue.set(keyValue, arena);
if(recovering.isReady() && !disableSnapshot) {
if (recovering.isReady() && !disableSnapshot) {
semiCommit();
}
}
}
virtual void clear(KeyRangeRef range, const Arena* arena) {
//A commit that occurs with no available space returns Never, so we can throw out all modifications
if(getAvailableSize() <= 0)
return;
// A commit that occurs with no available space returns Never, so we can throw out all modifications
if (getAvailableSize() <= 0) return;
if(transactionIsLarge) {
if (transactionIsLarge) {
data.erase(data.lower_bound(range.begin), data.lower_bound(range.end));
}
else {
} else {
queue.clear(range, arena);
if(recovering.isReady() && !disableSnapshot) {
if (recovering.isReady() && !disableSnapshot) {
semiCommit();
}
}
@ -143,37 +136,37 @@ public:
return Never();
}
if(recovering.isError()) throw recovering.getError();
if(!recovering.isReady())
return waitAndCommit(this, sequential);
if (recovering.isError()) throw recovering.getError();
if (!recovering.isReady()) return waitAndCommit(this, sequential);
if(!disableSnapshot && replaceContent && !firstCommitWithSnapshot) {
if (!disableSnapshot && replaceContent && !firstCommitWithSnapshot) {
transactionSize += SERVER_KNOBS->REPLACE_CONTENTS_BYTES;
committedWriteBytes += SERVER_KNOBS->REPLACE_CONTENTS_BYTES;
semiCommit();
}
if(transactionIsLarge) {
if (transactionIsLarge) {
fullSnapshot(data);
resetSnapshot = true;
committedWriteBytes = notifiedCommittedWriteBytes.get();
overheadWriteBytes = 0;
if(disableSnapshot) {
if (disableSnapshot) {
return Void();
}
log_op(OpCommit, StringRef(), StringRef());
}
else {
} else {
int64_t bytesWritten = commit_queue(queue, !disableSnapshot, sequential);
if(disableSnapshot) {
if (disableSnapshot) {
return Void();
}
if(bytesWritten > 0 || committedWriteBytes > notifiedCommittedWriteBytes.get()) {
committedWriteBytes += bytesWritten + overheadWriteBytes + OP_DISK_OVERHEAD; //OP_DISK_OVERHEAD is for the following log_op(OpCommit)
notifiedCommittedWriteBytes.set(committedWriteBytes); //This set will cause snapshot items to be written, so it must happen before the OpCommit
if (bytesWritten > 0 || committedWriteBytes > notifiedCommittedWriteBytes.get()) {
committedWriteBytes += bytesWritten + overheadWriteBytes +
OP_DISK_OVERHEAD; // OP_DISK_OVERHEAD is for the following log_op(OpCommit)
notifiedCommittedWriteBytes.set(committedWriteBytes); // This set will cause snapshot items to be
// written, so it must happen before the OpCommit
log_op(OpCommit, StringRef(), StringRef());
overheadWriteBytes = log->getCommitOverhead();
}
@ -186,55 +179,62 @@ public:
transactionIsLarge = false;
firstCommitWithSnapshot = false;
addActor.send( commitAndUpdateVersions( this, c, previousSnapshotEnd ) );
addActor.send(commitAndUpdateVersions(this, c, previousSnapshotEnd));
return c;
}
virtual Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) {
if(recovering.isError()) throw recovering.getError();
virtual Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) {
if (recovering.isError()) throw recovering.getError();
if (!recovering.isReady()) return waitAndReadValue(this, key);
auto it = data.find(key);
if (it == data.end()) return Optional<Value>();
return Optional<Value>(it->value);
return Optional<Value>(it.getValue());
}
virtual Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>() ) {
if(recovering.isError()) throw recovering.getError();
virtual Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength,
Optional<UID> debugID = Optional<UID>()) {
if (recovering.isError()) throw recovering.getError();
if (!recovering.isReady()) return waitAndReadValuePrefix(this, key, maxLength);
auto it = data.find(key);
if (it == data.end()) return Optional<Value>();
auto val = it->value;
if(maxLength < val.size()) {
auto val = it.getValue();
if (maxLength < val.size()) {
return Optional<Value>(val.substr(0, maxLength));
}
else {
} else {
return Optional<Value>(val);
}
}
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
if(recovering.isError()) throw recovering.getError();
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange(KeyRangeRef keys, int rowLimit = 1 << 30,
int byteLimit = 1 << 30) {
if (recovering.isError()) throw recovering.getError();
if (!recovering.isReady()) return waitAndReadRange(this, keys, rowLimit, byteLimit);
Standalone<VectorRef<KeyValueRef>> result;
if (rowLimit >= 0) {
auto it = data.lower_bound(keys.begin);
while (it!=data.end() && it->key < keys.end && rowLimit && byteLimit>=0) {
byteLimit -= sizeof(KeyValueRef) + it->key.size() + it->value.size();
result.push_back_deep( result.arena(), KeyValueRef(it->key, it->value) );
while (it != data.end() && rowLimit && byteLimit >= 0) {
StringRef tempKey = it.getKey(reserved_buffer, CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
if (tempKey >= keys.end) break;
byteLimit -= sizeof(KeyValueRef) + tempKey.size() + it.getValue().size();
result.push_back_deep(result.arena(), KeyValueRef(tempKey, it.getValue()));
++it;
--rowLimit;
}
} else {
rowLimit = -rowLimit;
auto it = data.previous( data.lower_bound(keys.end) );
while (it!=data.end() && it->key >= keys.begin && rowLimit && byteLimit>=0) {
byteLimit -= sizeof(KeyValueRef) + it->key.size() + it->value.size();
result.push_back_deep( result.arena(), KeyValueRef(it->key, it->value) );
auto it = data.previous(data.lower_bound(keys.end));
while (it != data.end() && rowLimit && byteLimit >= 0) {
StringRef tempKey = it.getKey(reserved_buffer, CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
if (tempKey < keys.begin) break;
byteLimit -= sizeof(KeyValueRef) + tempKey.size() + it.getValue().size();
result.push_back_deep(result.arena(), KeyValueRef(tempKey, it.getValue()));
it = data.previous(it);
--rowLimit;
}
@ -243,14 +243,12 @@ public:
}
virtual void resyncLog() {
ASSERT( recovering.isReady() );
ASSERT(recovering.isReady());
resetSnapshot = true;
log_op(OpSnapshotAbort, StringRef(), StringRef());
}
virtual void enableSnapshot() {
disableSnapshot = false;
}
virtual void enableSnapshot() { disableSnapshot = false; }
private:
enum OpType {
@ -260,18 +258,16 @@ private:
OpSnapshotItem,
OpSnapshotEnd,
OpSnapshotAbort, // terminate an in progress snapshot in order to start a full snapshot
OpCommit, // only in log, not in queue
OpRollback // only in log, not in queue
OpCommit, // only in log, not in queue
OpRollback // only in log, not in queue
};
struct OpRef {
OpType op;
StringRef p1, p2;
OpRef() {}
OpRef(Arena& a, OpRef const& o) : op(o.op), p1(a,o.p1), p2(a,o.p2) {}
size_t expectedSize() {
return p1.expectedSize() + p2.expectedSize();
}
OpRef(Arena& a, OpRef const& o) : op(o.op), p1(a, o.p1), p2(a, o.p2) {}
size_t expectedSize() { return p1.expectedSize() + p2.expectedSize(); }
};
struct OpHeader {
int op;
@ -279,7 +275,7 @@ private:
};
struct OpQueue {
OpQueue() : numBytes(0) { }
OpQueue() : numBytes(0) {}
int totalSize() const { return numBytes; }
@ -289,61 +285,60 @@ private:
arenas.clear();
}
void rollback() {
clear();
}
void rollback() { clear(); }
void set( KeyValueRef keyValue, const Arena* arena = NULL ) {
void set(KeyValueRef keyValue, const Arena* arena = NULL) {
queue_op(OpSet, keyValue.key, keyValue.value, arena);
}
void clear( KeyRangeRef range, const Arena* arena = NULL ) {
queue_op(OpClear, range.begin, range.end, arena);
}
void clear(KeyRangeRef range, const Arena* arena = NULL) { queue_op(OpClear, range.begin, range.end, arena); }
void clear_to_end( StringRef fromKey, const Arena* arena = NULL ) {
void clear_to_end(StringRef fromKey, const Arena* arena = NULL) {
queue_op(OpClearToEnd, fromKey, StringRef(), arena);
}
void queue_op( OpType op, StringRef p1, StringRef p2, const Arena* arena ) {
void queue_op(OpType op, StringRef p1, StringRef p2, const Arena* arena) {
numBytes += p1.size() + p2.size() + sizeof(OpHeader) + sizeof(OpRef);
OpRef r; r.op = op; r.p1 = p1; r.p2 = p2;
if(arena == NULL) {
operations.push_back_deep( operations.arena(), r );
OpRef r;
r.op = op;
r.p1 = p1;
r.p2 = p2;
if (arena == NULL) {
operations.push_back_deep(operations.arena(), r);
} else {
operations.push_back( operations.arena(), r );
operations.push_back(operations.arena(), r);
arenas.push_back(*arena);
}
}
const OpRef* begin() {
return operations.begin();
}
const OpRef* begin() { return operations.begin(); }
const OpRef* end() {
return operations.end();
}
const OpRef* end() { return operations.end(); }
private:
Standalone<VectorRef<OpRef>> operations;
uint64_t numBytes;
std::vector<Arena> arenas;
private:
Standalone<VectorRef<OpRef>> operations;
uint64_t numBytes;
std::vector<Arena> arenas;
};
KeyValueStoreType type;
UID id;
IndexedSet< KeyValueMapPair, uint64_t > data;
Container data;
// Scratch buffer passed to the container iterators' getKey() in readRange, snapshot and fullSnapshot; nullptr for the MEMORY store type
uint8_t* reserved_buffer;
OpQueue queue; // mutations not yet commit()ted
IDiskQueue *log;
IDiskQueue* log;
Future<Void> recovering, snapshotting;
int64_t committedWriteBytes;
int64_t overheadWriteBytes;
NotifiedVersion notifiedCommittedWriteBytes;
Key recoveredSnapshotKey; // After recovery, the next key in the currently uncompleted snapshot
IDiskQueue::location currentSnapshotEnd; //The end of the most recently completed snapshot (this snapshot cannot be discarded)
IDiskQueue::location previousSnapshotEnd; //The end of the second most recently completed snapshot (on commit, this snapshot can be discarded)
IDiskQueue::location
currentSnapshotEnd; // The end of the most recently completed snapshot (this snapshot cannot be discarded)
IDiskQueue::location previousSnapshotEnd; // The end of the second most recently completed snapshot (on commit, this
// snapshot can be discarded)
PromiseStream<Future<Void>> addActor;
Future<Void> commitActors;
@ -351,49 +346,47 @@ private:
int64_t transactionSize;
bool transactionIsLarge;
bool resetSnapshot; //Set to true after a fullSnapshot is performed. This causes the regular snapshot mechanism to restart
bool resetSnapshot; // Set to true after a fullSnapshot is performed. This causes the regular snapshot mechanism to
// restart
bool disableSnapshot;
bool replaceContent;
bool firstCommitWithSnapshot;
int snapshotCount;
int64_t memoryLimit; //The upper limit on the memory used by the store (excluding, possibly, some clear operations)
int64_t memoryLimit; // The upper limit on the memory used by the store (excluding, possibly, some clear operations)
std::vector<std::pair<KeyValueMapPair, uint64_t>> dataSets;
int64_t commit_queue(OpQueue &ops, bool log, bool sequential = false) {
int64_t commit_queue(OpQueue& ops, bool log, bool sequential = false) {
int64_t total = 0, count = 0;
IDiskQueue::location log_location = 0;
for(auto o = ops.begin(); o != ops.end(); ++o) {
for (auto o = ops.begin(); o != ops.end(); ++o) {
++count;
total += o->p1.size() + o->p2.size() + OP_DISK_OVERHEAD;
if (o->op == OpSet) {
KeyValueMapPair pair(o->p1, o->p2);
if(sequential) {
if (sequential) {
KeyValueMapPair pair(o->p1, o->p2);
dataSets.push_back(std::make_pair(pair, pair.arena.getSize() + data.getElementBytes()));
} else {
data.insert( pair, pair.arena.getSize() + data.getElementBytes() );
data.insert(o->p1, o->p2);
}
}
else if (o->op == OpClear) {
if(sequential) {
} else if (o->op == OpClear) {
if (sequential) {
data.insert(dataSets);
dataSets.clear();
}
data.erase( data.lower_bound(o->p1), data.lower_bound(o->p2) );
}
else if (o->op == OpClearToEnd) {
if(sequential) {
data.erase(data.lower_bound(o->p1), data.lower_bound(o->p2));
} else if (o->op == OpClearToEnd) {
if (sequential) {
data.insert(dataSets);
dataSets.clear();
}
data.erase( data.lower_bound(o->p1), data.end() );
}
else ASSERT(false);
if ( log )
log_location = log_op( o->op, o->p1, o->p2 );
data.erase(data.lower_bound(o->p1), data.end());
} else
ASSERT(false);
if (log) log_location = log_op(o->op, o->p1, o->p2);
}
if(sequential) {
if (sequential) {
data.insert(dataSets);
dataSets.clear();
}
@ -413,11 +406,11 @@ private:
}
IDiskQueue::location log_op(OpType op, StringRef v1, StringRef v2) {
OpHeader h = {(int)op, v1.size(), v2.size()};
log->push( StringRef((const uint8_t*)&h, sizeof(h)) );
log->push( v1 );
log->push( v2 );
return log->push( LiteralStringRef("\x01") ); // Changes here should be reflected in OP_DISK_OVERHEAD
OpHeader h = { (int)op, v1.size(), v2.size() };
log->push(StringRef((const uint8_t*)&h, sizeof(h)));
log->push(v1);
log->push(v2);
return log->push(LiteralStringRef("\x01")); // Changes here should be reflected in OP_DISK_OVERHEAD
}
ACTOR static Future<Void> recover( KeyValueStoreMemory* self, bool exactRecovery ) {
@ -592,26 +585,27 @@ private:
}
}
//Snapshots an entire data set
void fullSnapshot( IndexedSet< KeyValueMapPair, uint64_t> &snapshotData ) {
// Snapshots an entire data set
void fullSnapshot(Container& snapshotData) {
previousSnapshotEnd = log_op(OpSnapshotAbort, StringRef(), StringRef());
replaceContent = false;
//Clear everything since we are about to write the whole database
// Clear everything since we are about to write the whole database
log_op(OpClearToEnd, allKeys.begin, StringRef());
int count = 0;
int64_t snapshotSize = 0;
for(auto kv = snapshotData.begin(); kv != snapshotData.end(); ++kv) {
log_op(OpSnapshotItem, kv->key, kv->value);
snapshotSize += kv->key.size() + kv->value.size() + OP_DISK_OVERHEAD;
for (auto kv = snapshotData.begin(); kv != snapshotData.end(); ++kv) {
StringRef tempKey = kv.getKey(reserved_buffer, CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
log_op(OpSnapshotItem, tempKey, kv.getValue());
snapshotSize += tempKey.size() + kv.getValue().size() + OP_DISK_OVERHEAD;
++count;
}
TraceEvent("FullSnapshotEnd", id)
.detail("PreviousSnapshotEndLoc", previousSnapshotEnd)
.detail("SnapshotSize", snapshotSize)
.detail("SnapshotElements", count);
.detail("PreviousSnapshotEndLoc", previousSnapshotEnd)
.detail("SnapshotSize", snapshotSize)
.detail("SnapshotElements", count);
currentSnapshotEnd = log_op(OpSnapshotEnd, StringRef(), StringRef());
}
@ -620,7 +614,7 @@ private:
wait(self->recovering);
state Key nextKey = self->recoveredSnapshotKey;
state bool nextKeyAfter = false; //setting this to true is equivalent to setting nextKey = keyAfter(nextKey)
state bool nextKeyAfter = false; // setting this to true is equivalent to setting nextKey = keyAfter(nextKey)
state uint64_t snapshotTotalWrittenBytes = 0;
state int lastDiff = 0;
state int snapItems = 0;
@ -631,7 +625,7 @@ private:
loop {
wait( self->notifiedCommittedWriteBytes.whenAtLeast( snapshotTotalWrittenBytes + 1 ) );
if(self->resetSnapshot) {
if (self->resetSnapshot) {
nextKey = Key();
nextKeyAfter = false;
snapItems = 0;
@ -641,16 +635,16 @@ private:
auto next = nextKeyAfter ? self->data.upper_bound(nextKey) : self->data.lower_bound(nextKey);
int diff = self->notifiedCommittedWriteBytes.get() - snapshotTotalWrittenBytes;
if( diff > lastDiff && diff > 5e7 )
if (diff > lastDiff && diff > 5e7)
TraceEvent(SevWarnAlways, "ManyWritesAtOnce", self->id)
.detail("CommittedWrites", self->notifiedCommittedWriteBytes.get())
.detail("SnapshotWrites", snapshotTotalWrittenBytes)
.detail("Diff", diff)
.detail("LastOperationWasASnapshot", nextKey == Key() && !nextKeyAfter);
.detail("CommittedWrites", self->notifiedCommittedWriteBytes.get())
.detail("SnapshotWrites", snapshotTotalWrittenBytes)
.detail("Diff", diff)
.detail("LastOperationWasASnapshot", nextKey == Key() && !nextKeyAfter);
lastDiff = diff;
if (next == self->data.end()) {
auto thisSnapshotEnd = self->log_op( OpSnapshotEnd, StringRef(), StringRef() );
auto thisSnapshotEnd = self->log_op(OpSnapshotEnd, StringRef(), StringRef());
//TraceEvent("SnapshotEnd", self->id)
// .detail("LastKey", lastKey.present() ? lastKey.get() : LiteralStringRef("<none>"))
// .detail("CurrentSnapshotEndLoc", self->currentSnapshotEnd)
@ -664,7 +658,7 @@ private:
self->previousSnapshotEnd = self->currentSnapshotEnd;
self->currentSnapshotEnd = thisSnapshotEnd;
if(++self->snapshotCount == 2) {
if (++self->snapshotCount == 2) {
self->replaceContent = false;
}
nextKey = Key();
@ -675,11 +669,12 @@ private:
snapshotTotalWrittenBytes += OP_DISK_OVERHEAD;
} else {
self->log_op( OpSnapshotItem, next->key, next->value );
nextKey = next->key;
StringRef tempKey = next.getKey(self->reserved_buffer, CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
self->log_op(OpSnapshotItem, tempKey, next.getValue());
nextKey = tempKey;
nextKeyAfter = true;
snapItems++;
uint64_t opBytes = next->key.size() + next->value.size() + OP_DISK_OVERHEAD;
uint64_t opBytes = tempKey.size() + next.getValue().size() + OP_DISK_OVERHEAD;
snapshotBytes += opBytes;
snapshotTotalWrittenBytes += opBytes;
}
@ -710,21 +705,40 @@ private:
}
};
KeyValueStoreMemory::KeyValueStoreMemory( IDiskQueue* log, UID id, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery )
: log(log), id(id), previousSnapshotEnd(-1), currentSnapshotEnd(-1), resetSnapshot(false), memoryLimit(memoryLimit), committedWriteBytes(0), overheadWriteBytes(0),
committedDataSize(0), transactionSize(0), transactionIsLarge(false), disableSnapshot(disableSnapshot), replaceContent(replaceContent), snapshotCount(0), firstCommitWithSnapshot(true)
{
recovering = recover( this, exactRecovery );
snapshotting = snapshot( this );
commitActors = actorCollection( addActor.getFuture() );
// Constructs a memory-backed key-value store on top of `log`.
//   log             - disk queue that receives every logged operation
//   id              - identifier attached to this store's trace events
//   memoryLimit     - upper bound used when computing available space
//   storeType       - value reported by getType(); also decides whether a key scratch buffer is allocated
//   disableSnapshot, replaceContent, exactRecovery - stored / forwarded to recover() unchanged
template <typename Container>
KeyValueStoreMemory<Container>::KeyValueStoreMemory(IDiskQueue* log, UID id, int64_t memoryLimit,
                                                    KeyValueStoreType storeType, bool disableSnapshot,
                                                    bool replaceContent, bool exactRecovery)
  : log(log), id(id), type(storeType), previousSnapshotEnd(-1), currentSnapshotEnd(-1), resetSnapshot(false),
    memoryLimit(memoryLimit), committedWriteBytes(0), overheadWriteBytes(0), committedDataSize(0), transactionSize(0),
    transactionIsLarge(false), disableSnapshot(disableSnapshot), replaceContent(replaceContent), snapshotCount(0),
    firstCommitWithSnapshot(true) {
	// The MEMORY store type gets no scratch buffer; every other type gets a zero-filled
	// buffer of SYSTEM_KEY_SIZE_LIMIT bytes for the container iterators' getKey() calls.
	if (storeType == KeyValueStoreType::MEMORY) {
		this->reserved_buffer = nullptr;
	} else {
		this->reserved_buffer = new uint8_t[CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT];
		memset(this->reserved_buffer, 0, CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
	}

	// Start the background actors only after the buffer is in place.
	recovering = recover(this, exactRecovery);
	snapshotting = snapshot(this);
	commitActors = actorCollection(addActor.getFuture());
}
IKeyValueStore* keyValueStoreMemory( std::string const& basename, UID logID, int64_t memoryLimit, std::string ext ) {
TraceEvent("KVSMemOpening", logID).detail("Basename", basename).detail("MemoryLimit", memoryLimit);
IKeyValueStore* keyValueStoreMemory(std::string const& basename, UID logID, int64_t memoryLimit, std::string ext,
KeyValueStoreType storeType) {
TraceEvent("KVSMemOpening", logID)
.detail("Basename", basename)
.detail("MemoryLimit", memoryLimit)
.detail("StoreType", storeType);
IDiskQueue *log = openDiskQueue( basename, ext, logID, DiskQueueVersion::V1 );
return new KeyValueStoreMemory( log, logID, memoryLimit, false, false, false );
if(storeType == KeyValueStoreType::MEMORY_RADIXTREE){
return new KeyValueStoreMemory<radix_tree>(log, logID, memoryLimit, storeType, false, false, false);
} else {
return new KeyValueStoreMemory<IKeyValueContainer>(log, logID, memoryLimit, storeType, false, false, false);
}
}
IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery ) {
return new KeyValueStoreMemory( queue, logID, memoryLimit, disableSnapshot, replaceContent, exactRecovery );
return new KeyValueStoreMemory<IKeyValueContainer>(queue, logID, memoryLimit, KeyValueStoreType::MEMORY,
disableSnapshot, replaceContent, exactRecovery);
}

@ -804,17 +804,18 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
if (deterministicRandom()->random01() < 0.25) db.desiredTLogCount = deterministicRandom()->randomInt(1,7);
if (deterministicRandom()->random01() < 0.25) db.masterProxyCount = deterministicRandom()->randomInt(1,7);
if (deterministicRandom()->random01() < 0.25) db.resolverCount = deterministicRandom()->randomInt(1,7);
if (deterministicRandom()->random01() < 0.5) {
set_config("ssd");
} else {
set_config("memory");
}
// if (deterministicRandom()->random01() < 0.5) {
// set_config("ssd");
// } else {
// set_config("memory");
// }
// set_config("memory");
set_config("memory-radixtree");
if(simple) {
db.desiredTLogCount = 1;
db.masterProxyCount = 1;
db.resolverCount = 1;
}
int replication_type = simple ? 1 : ( std::max(minimumReplication, datacenters > 4 ? deterministicRandom()->randomInt(1,3) : std::min(deterministicRandom()->randomInt(0,6), 3)) );
switch (replication_type) {
case 0: {

@ -449,6 +449,9 @@ struct RolesInfo {
obj.setKeyRawNumber("kvstore_free_bytes", storageMetrics.getValue("KvstoreBytesFree"));
obj.setKeyRawNumber("kvstore_available_bytes", storageMetrics.getValue("KvstoreBytesAvailable"));
obj.setKeyRawNumber("kvstore_total_bytes", storageMetrics.getValue("KvstoreBytesTotal"));
obj.setKeyRawNumber("kvstore_total_size", storageMetrics.getValue("KvstoreSizeTotal"));
obj.setKeyRawNumber("kvstore_total_nodes", storageMetrics.getValue("KvstoreNodeTotal"));
obj.setKeyRawNumber("kvstore_inline_keys", storageMetrics.getValue("KvstoreInlineKey"));
obj["input_bytes"] = StatusCounter(storageMetrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(storageMetrics.getValue("BytesDurable")).getStatus();
obj.setKeyRawNumber("query_queue_max", storageMetrics.getValue("QueryQueueMax"));

@ -170,6 +170,7 @@ struct StorageServerDisk {
KeyValueStoreType getKeyValueStoreType() { return storage->getType(); }
StorageBytes getStorageBytes() { return storage->getStorageBytes(); }
std::tuple<size_t, size_t, size_t> getSize() { return storage->getSize(); }
private:
struct StorageServer* data;
@ -514,6 +515,9 @@ public:
specialCounter(cc, "KvstoreBytesFree", [self](){ return self->storage.getStorageBytes().free; });
specialCounter(cc, "KvstoreBytesAvailable", [self](){ return self->storage.getStorageBytes().available; });
specialCounter(cc, "KvstoreBytesTotal", [self](){ return self->storage.getStorageBytes().total; });
specialCounter(cc, "KvstoreSizeTotal", [self]() { return std::get<0>(self->storage.getSize()); });
specialCounter(cc, "KvstoreNodeTotal", [self]() { return std::get<1>(self->storage.getSize()); });
specialCounter(cc, "KvstoreInlineKey", [self]() { return std::get<2>(self->storage.getSize()); });
}
} counters;

@ -216,6 +216,7 @@ StringRef tlogQueueExtension = LiteralStringRef("fdq");
std::pair<KeyValueStoreType, std::string> bTreeV1Suffix = std::make_pair( KeyValueStoreType::SSD_BTREE_V1, ".fdb" );
std::pair<KeyValueStoreType, std::string> bTreeV2Suffix = std::make_pair(KeyValueStoreType::SSD_BTREE_V2, ".sqlite");
std::pair<KeyValueStoreType, std::string> memorySuffix = std::make_pair( KeyValueStoreType::MEMORY, "-0.fdq" );
std::pair<KeyValueStoreType, std::string> memoryRTSuffix = std::make_pair( KeyValueStoreType::MEMORY_RADIXTREE, "-0.fdr" );
std::pair<KeyValueStoreType, std::string> redwoodSuffix = std::make_pair( KeyValueStoreType::SSD_REDWOOD_V1, ".redwood" );
std::string validationFilename = "_validate";
@ -225,7 +226,7 @@ std::string filenameFromSample( KeyValueStoreType storeType, std::string folder,
return joinPath( folder, sample_filename );
else if ( storeType == KeyValueStoreType::SSD_BTREE_V2 )
return joinPath(folder, sample_filename);
else if( storeType == KeyValueStoreType::MEMORY )
// Both in-memory engines name their files "<base>-0.<ext>"; strip the trailing "0.<ext>" (5 chars).
// Each enumerator must be compared against storeType explicitly: a bare enumerator as the right-hand
// operand of || is just converted to bool, which makes this branch swallow every later store type.
else if( storeType == KeyValueStoreType::MEMORY || storeType == KeyValueStoreType::MEMORY_RADIXTREE )
	return joinPath( folder, sample_filename.substr(0, sample_filename.size() - 5) );
else if ( storeType == KeyValueStoreType::SSD_REDWOOD_V1 )
@ -238,7 +239,7 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std
return joinPath( folder, prefix + id.toString() + ".fdb" );
else if (storeType == KeyValueStoreType::SSD_BTREE_V2)
return joinPath(folder, prefix + id.toString() + ".sqlite");
else if( storeType == KeyValueStoreType::MEMORY )
// Both in-memory engines use "<prefix><id>-" as their base name.
// Each enumerator must be compared against storeType explicitly: a bare enumerator as the right-hand
// operand of || is just converted to bool, which makes this branch swallow every later store type.
else if( storeType == KeyValueStoreType::MEMORY || storeType == KeyValueStoreType::MEMORY_RADIXTREE )
	return joinPath( folder, prefix + id.toString() + "-" );
else if (storeType == KeyValueStoreType::SSD_REDWOOD_V1)
return joinPath(folder, prefix + id.toString() + ".redwood");
@ -387,6 +388,8 @@ std::vector< DiskStore > getDiskStores( std::string folder ) {
result.insert( result.end(), result2.begin(), result2.end() );
auto result3 = getDiskStores( folder, redwoodSuffix.second, redwoodSuffix.first);
result.insert( result.end(), result3.begin(), result3.end() );
auto result4 = getDiskStores( folder, memoryRTSuffix.second, memoryRTSuffix.first );
result.insert( result.end(), result4.begin(), result4.end() );
return result;
}
@ -1277,10 +1280,11 @@ ACTOR Future<Void> workerServer(
}
else if (d.storeType == KeyValueStoreType::SSD_REDWOOD_V1) {
included = fileExists(d.filename + "0.pagerlog") && fileExists(d.filename + "1.pagerlog");
}
else {
ASSERT(d.storeType == KeyValueStoreType::MEMORY);
} else if (d.storeType == KeyValueStoreType::MEMORY) {
included = fileExists(d.filename + "1.fdq");
} else {
ASSERT(d.storeType == KeyValueStoreType::MEMORY_RADIXTREE);
included = fileExists(d.filename + "1.fdr");
}
if(d.storedComponent == DiskStore::COMPONENT::TLogData && included) {
included = false;

@ -167,7 +167,7 @@ ACTOR Future<Void> testKVRead( KVTest* test, Key key, Histogram<float>* latency,
//ASSERT( s1 <= v || test->get(key, s1)==v ); // Plan A
ASSERT( s2 <= v || test->get(key, s2)==v ); // Causal consistency
ASSERT( v <= test->lastCommit ); // read committed
//ASSERT( v <= test->lastSet ); // read uncommitted
// ASSERT( v <= test->lastSet ); // read uncommitted
return Void();
}
@ -182,7 +182,7 @@ ACTOR Future<Void> testKVReadSaturation( KVTest* test, Histogram<float>* latency
}
ACTOR Future<Void> testKVCommit( KVTest* test, Histogram<float>* latency, PerfIntCounter* count ) {
state Version v = test->lastSet;
state Version v = test->lastSet;
test->lastCommit = v;
state double begin = timer();
wait( test->store->commit() );
@ -265,6 +265,8 @@ ACTOR Future<Void> testKVStoreMain( KVStoreTestWorkload* workload, KVTest* ptest
state char* extraValue = new char[extraBytes];
memset(extraValue, '.', extraBytes);
wait(delay(10));
if (workload->doCount) {
state int64_t count = 0;
state Key k;
@ -283,81 +285,108 @@ ACTOR Future<Void> testKVStoreMain( KVStoreTestWorkload* workload, KVTest* ptest
if (workload->doSetup) {
wr << Version(0);
wr.serializeBytes(extraValue, extraBytes);
printf("Building %d nodes: ", workload->nodeCount);
state double setupBegin = timer();
state Future<Void> lastCommit = Void();
for(i=0; i<workload->nodeCount; i++) {
test.store->set( KeyValueRef( test.makeKey( i ), wr.toValue() ) );
if (!((i+1) % 10000) || i+1==workload->nodeCount) {
wait( lastCommit );
test.store->set( KeyValueRef( test.makeKey( i ), test.makeKey( i )) ) ;
if (!((i+1) % 10000) || i+1==workload->nodeCount) {
wait(lastCommit);
lastCommit = test.store->commit();
printf("ETA: %f seconds\n", (timer()-setupBegin) / i * (workload->nodeCount-i));
}
}
wait( lastCommit );
printf("ETA: %f seconds\n", (timer()-setupBegin) / i * (workload->nodeCount-i));
}
}
wait( lastCommit );
for(i=0; i<workload->nodeCount; i++) {
test.store->set( KeyValueRef( test.makeKey( i ), test.makeKey( i* 10)) ) ;
if (!((i+1) % 10000) || i+1==workload->nodeCount) {
wait(lastCommit);
lastCommit = test.store->commit();
printf("ETA: %f seconds\n", (timer()-setupBegin) / i * (workload->nodeCount-i));
}
}
wait( lastCommit );
for(i=0; i<workload->nodeCount; i++) {
Optional<Value> result = wait(test.store->readValue(test.makeKey( i )));
if(result.present()){
ASSERT(result.get() == test.makeKey( i* 10));
} else {
ASSERT (false);
}
}
workload->setupTook = timer()-setupBegin;
TraceEvent("KVStoreSetup").detail("Count", workload->nodeCount).detail("Took", workload->setupTook);
TraceEvent("KVStoreSetup").detail("Count", workload->nodeCount).detail("Took", workload->setupTook).detail("AvailableMemory", test.store->getStorageBytes().free);
}
state double t = now();
state double stopAt = t + workload->testDuration;
if (workload->saturation) {
if (workload->commitFraction) {
while (now() < stopAt) {
for(int s=0; s<1/workload->commitFraction; s++)
{
++test.lastSet;
BinaryWriter wr(Unversioned()); wr << test.lastSet;
wr.serializeBytes(extraValue, extraBytes);
test.set( KeyValueRef( test.randomKey(), wr.toValue() ) );
++workload->sets;
}
++commitsStarted;
wait( testKVCommit( &test, &workload->commitLatency, &workload->commits ) );
}
} else {
vector<Future<Void>> actors;
for(int a=0; a<100; a++)
actors.push_back( testKVReadSaturation( &test, &workload->readLatency, &workload->reads ) );
wait( timeout( waitForAll(actors), workload->testDuration, Void() ) );
}
} else {
while (t < stopAt) {
double end = now();
loop {
t += 1.0 / workload->operationsPerSecond;
double op = deterministicRandom()->random01();
if (op < workload->commitFraction) {
// Commit
if (workload->commits.getValue() == commitsStarted) {
++commitsStarted;
ac.add( testKVCommit( &test, &workload->commitLatency, &workload->commits ) );
}
} else if (op < workload->commitFraction+workload->setFraction) {
// Set
++test.lastSet;
BinaryWriter wr(Unversioned()); wr << test.lastSet;
wr.serializeBytes(extraValue, extraBytes);
test.set( KeyValueRef( test.randomKey(), wr.toValue() ) );
++workload->sets;
} else {
// Read
ac.add( testKVRead( &test, test.randomKey(), &workload->readLatency, &workload->reads ) );
}
if (t >= end) break;
}
wait( delayUntil(t) );
}
}
// state double stopAt = t + workload->testDuration;
// if (workload->saturation) {
// if (workload->commitFraction) {
// while (now() < stopAt) {
// for(int s=0; s<1/workload->commitFraction; s++)
// {
// ++test.lastSet;
// BinaryWriter wr(Unversioned()); wr << test.lastSet;
// wr.serializeBytes(extraValue, extraBytes);
// test.set( KeyValueRef( test.randomKey(), wr.toStringRef() ) );
// ++workload->sets;
// }
// ++commitsStarted;
// Void _ = wait( testKVCommit( &test, &workload->commitLatency, &workload->commits ) );
// }
// } else {
// vector<Future<Void>> actors;
// for(int a=0; a<100; a++)
// actors.push_back( testKVReadSaturation( &test, &workload->readLatency, &workload->reads ) );
// Void _ = wait( timeout( waitForAll(actors), workload->testDuration, Void() ) );
// }
// } else {
// while (t < stopAt) {
// double end = now();
// loop {
// t += 1.0 / workload->operationsPerSecond;
// double op = g_random->random01();
// if (op < workload->commitFraction) {
// // Commit
// if (workload->commits.getValue() == commitsStarted) {
// ++commitsStarted;
// ac.add( testKVCommit( &test, &workload->commitLatency, &workload->commits ) );
// }
// } else if (op < workload->commitFraction+workload->setFraction) {
// // Set
// ++test.lastSet;
// BinaryWriter wr(Unversioned()); wr << test.lastSet;
// wr.serializeBytes(extraValue, extraBytes);
// test.set( KeyValueRef( test.randomKey(), wr.toStringRef() ) );
// ++workload->sets;
// } else {
// // Read
// ac.add( testKVRead( &test, test.randomKey(), &workload->readLatency, &workload->reads ) );
// }
// if (t >= end) break;
// }
// Void _ = wait( delayUntil(t) );
// }
// }
if (workload->doClear) {
state int chunk = 1000000;
state int chunk = 50;
t = timer();
for(i = 0; i < workload->nodeCount; i+=chunk) {
for (i = 0; i < workload->nodeCount - chunk; i += chunk) {
test.store->clear( KeyRangeRef( test.makeKey( i ), test.makeKey( i + chunk ) ) );
wait( test.store->commit() );
}
for (i = workload->nodeCount - 50; i < workload->nodeCount; i++) {
Optional<Value> result = wait(test.store->readValue(test.makeKey(i)));
if (result.present()) {
ASSERT(result.get() == test.makeKey(i * 10));
} else {
ASSERT(false);
}
}
TraceEvent("KVStoreClear").detail("Took", timer() - t);
}
@ -383,6 +412,8 @@ ACTOR Future<Void> testKVStore(KVStoreTestWorkload* workload) {
test.store = keyValueStoreRedwoodV1( fn, id );
else if (workload->storeType == "memory")
test.store = keyValueStoreMemory( fn, id, 500e6 );
else if (workload->storeType == "memory-radixtree")
test.store = keyValueStoreMemory(fn, id, 500e6, "fdr", KeyValueStoreType::MEMORY_RADIXTREE);
else
ASSERT(false);

98
flow/IKeyValueContainer.h Normal file

@ -0,0 +1,98 @@
/*
* IKeyValueContainer.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef IKEYVALUECONTAINER_H
#define IKEYVALUECONTAINER_H
#pragma once
#include "IndexedSet.h"
// Element type of the IndexedSets that hold the database: one key/value pair plus the arena
// that the pair was built in.
// Each KeyValueMapPair is 32 bytes, excluding arena memory.
// It is stored in an IndexedSet<KeyValueMapPair, uint64_t>::Node, for a total size of 72 bytes.
struct KeyValueMapPair {
	// Declaration order matters: `arena` is initialized first so that `key` and `value`
	// can be constructed against it.
	Arena arena; // 8 Bytes (excluding arena memory)
	KeyRef key; // 12 Bytes
	ValueRef value; // 12 Bytes

	// Builds a pair from `key` and `value`, constructing both members against a new arena
	// whose initial size is the sum of the two expected sizes.
	KeyValueMapPair(KeyRef key, ValueRef value)
	  : arena(key.expectedSize() + value.expectedSize()), key(arena, key), value(arena, value) {}

	// Copy construction copies the arena handle and both refs member by member.
	KeyValueMapPair(KeyValueMapPair const& other) : arena(other.arena), key(other.key), value(other.value) {}

	// Member-wise assignment; intentionally returns void.
	void operator=(KeyValueMapPair const& other) {
		arena = other.arena;
		key = other.key;
		value = other.value;
	}

	// Ordering and equality consider the key only; the value never participates.
	bool operator<(KeyValueMapPair const& other) const { return key < other.key; }
	bool operator==(KeyValueMapPair const& other) const { return key == other.key; }
	bool operator!=(KeyValueMapPair const& other) const { return key != other.key; }
};
// Heterogeneous comparison: orders a stored pair against anything comparable with its key.
template <class CompatibleWithKey>
bool operator<(KeyValueMapPair const& pair, CompatibleWithKey const& other) {
	return pair.key < other;
}
template <class CompatibleWithKey>
bool operator<(CompatibleWithKey const& l, KeyValueMapPair const& r) {
return l < r.key;
}
// Thin wrapper around IndexedSet<KeyValueMapPair, uint64_t>. Every operation forwards to the
// wrapped set; the class is non-copyable.
class IKeyValueContainer {
public:
	typedef typename IndexedSet<KeyValueMapPair, uint64_t>::iterator iterator;

	IKeyValueContainer() = default;
	~IKeyValueContainer() = default;

	bool empty() { return data.empty(); }
	void clear() { return data.clear(); }

	// Placeholder: this container does not track any statistics and always reports (0, 0, 0).
	std::tuple<size_t, size_t, size_t> size() { return std::make_tuple(0, 0, 0); }

	// Lookups and iteration, forwarded to the underlying set.
	iterator find(const StringRef& key) { return data.find(key); }
	iterator begin() { return data.begin(); }
	iterator end() { return data.end(); }
	iterator lower_bound(const StringRef& key) { return data.lower_bound(key); }
	iterator upper_bound(const StringRef& key) { return data.upper_bound(key); }
	iterator previous(iterator i) const { return data.previous(i); }

	// Removes the elements in [begin, end).
	void erase(iterator begin, iterator end) { data.erase(begin, end); }

	// Inserts one key/value pair. The metric recorded for the element is the size of the
	// pair's arena plus the set's per-element byte count.
	iterator insert(const StringRef& key, const StringRef& val, bool replaceExisting = true) {
		KeyValueMapPair pair(key, val);
		return data.insert(pair, pair.arena.getSize() + data.getElementBytes(), replaceExisting);
	}

	// Bulk insert of pre-built (pair, metric) entries; returns whatever IndexedSet::insert returns.
	int insert(const std::vector<std::pair<KeyValueMapPair, uint64_t>>& pairs, bool replaceExisting = true) {
		return data.insert(pairs, replaceExisting);
	}

	// Sum of the metrics of all elements before `to`, as computed by IndexedSet::sumTo.
	uint64_t sumTo(iterator to) { return data.sumTo(to); }

	static int getElementBytes() { return IndexedSet<KeyValueMapPair, uint64_t>::getElementBytes(); }

private:
	// Non-copyable. Deleted explicitly so that misuse is a compile error at the call site
	// instead of a link error for a missing definition.
	IKeyValueContainer(IKeyValueContainer const&) = delete;
	void operator=(IKeyValueContainer const&) = delete;

	IndexedSet<KeyValueMapPair, uint64_t> data;
};
#endif

@ -55,6 +55,8 @@ class Future;
class Void;
class StringRef;
template <class T, class Metric>
struct IndexedSet{
typedef T value_type;
@ -92,6 +94,10 @@ public:
void decrementNonEnd();
bool operator == ( const iterator& r ) const { return i == r.i; }
bool operator != ( const iterator& r ) const { return i != r.i; }
// following two methods are for memory storage engine(KeyValueStoreMemory class) use only
// in order to have same interface as radixtree
StringRef& getKey(uint8_t* dummyContent, int dummyLen) const { return i->data.key; }
StringRef& getValue() const { return i->data.value; }
};
IndexedSet() : root(NULL) {};

913
flow/RadixTree.h Normal file

@ -0,0 +1,913 @@
/*
* RadixTree.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW__RADIXTREE_H
#define FLOW__RADIXTREE_H
#pragma once
#include <cassert>
#include <string>
#include <utility>
#include <vector>
#include <iostream>
#include <functional>
#include <map>
#include <stdexcept>
#include "Arena.h"
// Sentinel returned by node::getFirstByte() for a node whose key is empty.
const int LEAF_BYTE = -1;
// Size in bytes of the inline buffer shared by node keys and leaf values; content no longer
// than this is stored inline instead of in an arena.
const int INLINE_KEY_SIZE = sizeof(StringRef);
// Returns a view of at most `num` bytes of `key` starting at offset `begin`. The result is
// clamped to the end of `key` and refers to the same memory as `key` (nothing is copied).
// Throws std::out_of_range when `begin` is past the end of `key`.
//
// Declared inline because this definition lives in a header; without it every translation
// unit including the header would emit its own external definition.
inline StringRef radix_substr(const StringRef& key, int begin, int num) {
	const int size = key.size();
	if (begin > size) {
		throw std::out_of_range("out of range in radix_substr<StringRef>");
	}
	// Compare against the remaining length instead of computing begin + num, which can
	// overflow when a caller passes a very large `num` to mean "to the end".
	const int remaining = size - begin;
	if (num > remaining) {
		num = remaining;
	}
	return key.substr(begin, num);
}
// Concatenates `key1` and `key2` into a new buffer allocated from `arena` and returns a
// StringRef over that buffer.
//
// Declared inline because this definition lives in a header; without it every translation
// unit including the header would emit its own external definition.
inline StringRef radix_join(const StringRef& key1, const StringRef& key2, Arena& arena) {
	const int rsize = key1.size() + key2.size();
	uint8_t* s = new (arena) uint8_t[rsize];

	memcpy(s, key1.begin(), key1.size());
	memcpy(s + key1.size(), key2.begin(), key2.size());
	return StringRef(s, rsize);
}
// Like radix_substr, but the selected bytes are used to construct a new StringRef against
// `arena` rather than being returned as a view into `key`.
// Throws std::out_of_range when `begin` is past the end of `key`.
//
// Declared inline because this definition lives in a header; without it every translation
// unit including the header would emit its own external definition.
inline StringRef radix_constructStr(const StringRef& key, int begin, int num, Arena& arena) {
	const int size = key.size();
	if (begin > size) {
		// The message now names this function; it previously reported radix_substr.
		throw std::out_of_range("out of range in radix_constructStr<StringRef>");
	}
	// Compare against the remaining length instead of computing begin + num, which can
	// overflow when a caller passes a very large `num` to mean "to the end".
	const int remaining = size - begin;
	if (num > remaining) {
		num = remaining;
	}
	return StringRef(arena, key.substr(begin, num));
}
// Radix tree keyed by byte strings. Every node stores a fragment of the key; values live in
// leaf nodes. Internal nodes come in two layouts: a fixed one holding up to 3 children
// (internalNode4) and a vector-based one (internalNode). Short key fragments and short values
// are stored inline in the node, longer ones in a per-node arena.
//
// The public interface mirrors the container interface used by KeyValueStoreMemory; some
// members exist only to satisfy that interface and assert when called.
class radix_tree {
public:
	typedef std::size_t size_type;

private:
	// Storage shared by node keys and leaf values: either the bytes themselves (inlineData)
	// or a reference to bytes held elsewhere (data). Which member is active is tracked by the
	// owner (node::m_is_inline / leafNode::is_inline).
	union inlineUnion {
		inlineUnion() : data() {} // constructor
		uint8_t inlineData[INLINE_KEY_SIZE];
		StringRef data;
	};

	// Header common to every node kind (root/internal/leaf). It is the first member of
	// leafNode, internalNode and internalNode4, and the tree casts between node* and those types.
	struct node {
		// m_inline_length is a 4-bit field, so an inline key may be at most 15 bytes long.
		static_assert(INLINE_KEY_SIZE <= 15, "inline key length must fit in the 4-bit m_inline_length field");

		// constructor for all kinds of node (root/internal/leaf)
		// m_is_fixed is initialized here as well so that no bitfield is ever read uninitialized
		// (leaf nodes never assign it afterwards).
		node()
		  : m_is_leaf(0), m_is_fixed(0), m_is_inline(0), m_inline_length(0), m_depth(0), key(), arena(),
		    m_parent(NULL) {}
		node(const node&) = delete; // delete

		// Copies everything except m_is_fixed. add_child4 relies on this when it copies the
		// header of an internalNode4 into a freshly created internalNode, which must keep its
		// own (non-fixed) layout flag.
		node& operator=(const node& other) {
			m_is_leaf = other.m_is_leaf;
			m_is_inline = other.m_is_inline;
			m_inline_length = other.m_inline_length;
			m_depth = other.m_depth;
			memcpy(key.inlineData, other.key.inlineData, INLINE_KEY_SIZE);
			arena = other.arena;
			m_parent = other.m_parent;
			return *this;
		}

		// Sets this node's key to `num` bytes of `content` starting at offset `start`.
		// Keys of at most INLINE_KEY_SIZE bytes are stored inline; longer keys are built in a
		// new arena that replaces the node's current one.
		void setKey(const StringRef& content, int start, int num) {
			bool isInline = num <= INLINE_KEY_SIZE;
			if (isInline) {
				// memmove rather than memcpy: `content` may be a view of this node's own inline
				// buffer (as returned by getKey()), in which case source and destination overlap.
				memmove(key.inlineData, content.begin() + start, num);
				m_inline_length = num;
				// Drop the arena that held the previous out-of-line key, if there was one.
				if (!m_is_inline) arena = Arena();
			} else {
				Arena new_arena(num);
				key.data = radix_constructStr(content, start, num, new_arena);
				arena = new_arena;
			}
			m_is_inline = isInline;
		}

		// Returns a view of the key. For inline keys the view points into this node.
		StringRef getKey() {
			if (m_is_inline) {
				return StringRef(&key.inlineData[0], m_inline_length);
			} else {
				return key.data;
			}
		}

		inline int getKeySize() { return m_is_inline ? m_inline_length : key.data.size(); }

		// First byte of the key, or LEAF_BYTE when the key is empty.
		inline int16_t getFirstByte() {
			if (m_is_inline) {
				return m_inline_length == 0 ? LEAF_BYTE : key.inlineData[0];
			} else {
				return key.data.size() == 0 ? LEAF_BYTE : key.data[0];
			}
		}

		// Arena bytes attributed to the key; inline keys count as 0.
		inline size_type getArenaSize() { return m_is_inline ? 0 : arena.getSize(); }

		uint32_t m_is_leaf : 1;
		uint32_t m_is_fixed : 1; // if true, then we have fixed number of children (3)
		uint32_t m_is_inline : 1;
		uint32_t m_inline_length : 4;
		// m_depth can be seen as common prefix length with your ancestors
		uint32_t m_depth : 25;
		// key is the prefix, a substring that shared by your children
		inlineUnion key;
		// arena assign memory for key
		Arena arena;
		node* m_parent;
	};

	// Leaf: the common header plus the stored value (inline or arena-backed, like keys).
	struct leafNode : FastAllocated<leafNode> {
		leafNode(const StringRef& content) : base(), is_inline(0), inline_length(0), arena() {
			base.m_is_leaf = 1;
			setValue(content);
		}
		~leafNode() = default;

		// Replaces the stored value with a copy of `content`.
		void setValue(const StringRef& content) {
			bool isInline = content.size() <= INLINE_KEY_SIZE;
			if (isInline) {
				// memmove rather than memcpy: `content` may be a view of this leaf's own inline
				// buffer (as returned by getValue()).
				memmove(value.inlineData, content.begin(), content.size());
				inline_length = content.size();
				// Drop the arena that held the previous out-of-line value, if there was one.
				if (!is_inline) arena = Arena();
			} else {
				Arena new_arena(content.size());
				value.data = StringRef(new_arena, content);
				arena = new_arena;
			}
			is_inline = isInline;
		}

		// Returns a view of the value. For inline values the view points into this leaf.
		StringRef getValue() {
			if (is_inline) {
				return StringRef(&value.inlineData[0], inline_length);
			} else {
				return value.data;
			}
		}

		// Arena bytes attributed to the value; inline values count as 0.
		inline size_type getLeafArenaSize() { return is_inline ? 0 : arena.getSize(); }

		node base; // 32 bytes
		uint32_t is_inline : 1;
		uint32_t inline_length : 31;
		inlineUnion value; // using the same data structure to store value
		Arena arena;
	};

	// Internal node with an arbitrary number of children, kept sorted by first key byte.
	struct internalNode : FastAllocated<internalNode> {
		internalNode() : base(), m_children(std::vector<std::pair<int16_t, node*>>()) {
			m_children.reserve(4);
			base.m_is_fixed = 0;
		}

		// Destroys every direct child using its concrete type. Children that are internalNode
		// recurse; see the note on ~internalNode4 for fixed-size children.
		~internalNode() {
			for (auto& entry : m_children) {
				node* current = entry.second;
				if (current->m_is_leaf) {
					delete (leafNode*)current;
				} else if (current->m_is_fixed) {
					delete (internalNode4*)current;
				} else {
					delete (internalNode*)current;
				}
			}
			m_children.clear();
		}

		node base;
		// ordered map by char, m_children.begin() return the smallest value
		std::vector<std::pair<int16_t, node*>> m_children;
	};

	// Internal node with room for at most 3 children, stored in parallel arrays sorted by key.
	struct internalNode4 : FastAllocated<internalNode4> {
		internalNode4() : base(), num_children(0) {
			base.m_is_fixed = 1;
			memset(keys, 0, sizeof(keys));
			memset(m_children, 0, sizeof(m_children));
		}

		// Does NOT destroy the children: add_child4 deletes an internalNode4 after handing its
		// children over to a replacement internalNode.
		// NOTE(review): as a consequence, clear() does not free subtrees that hang below a
		// fixed-size node. Fixing that needs a coordinated change in add_child4 — confirm and
		// address together.
		~internalNode4() { num_children = 0; }

		node base;
		int16_t num_children;
		int16_t keys[3];
		node* m_children[3];
	};

public:
	// Forward iterator over leaves. Holds a raw, non-owning pointer to the current node.
	class iterator : public std::iterator<std::forward_iterator_tag, std::pair<StringRef, StringRef>> {
	public:
		node* m_pointee;

		iterator() : m_pointee(NULL) {}
		iterator(const iterator& r) : m_pointee(r.m_pointee) {}
		iterator(node* p) : m_pointee(p) {}
		iterator& operator=(const iterator& r) {
			m_pointee = r.m_pointee;
			return *this;
		}
		~iterator() = default;

		const iterator& operator++();
		const iterator& operator--();
		bool operator!=(const iterator& lhs) const;
		bool operator==(const iterator& lhs) const;

		// `content`/`len` describe a caller-provided scratch buffer (defined out of line).
		StringRef getKey(uint8_t* content, int len) const;

		// Value of the leaf the iterator points at; the iterator must point at a leaf.
		StringRef getValue() const {
			ASSERT(m_pointee->m_is_leaf);
			return ((leafNode*)m_pointee)->getValue();
		}

	private:
		node* increment(node* target) const;
		node* decrement(node* target) const;
	};

	explicit radix_tree() : m_size(0), m_node(0), inline_keys(0), total_bytes(0), m_root(NULL) {}

	// Releases the tree. Previously the destructor was empty, so a tree that was destroyed
	// without an explicit clear() leaked every node.
	~radix_tree() { clear(); }

	radix_tree(const radix_tree& other) = delete; // delete
	radix_tree& operator=(const radix_tree& other) = delete; // delete

	// Returns the counters (m_size, m_node, inline_keys).
	inline std::tuple<size_type, size_type, size_type> size() { return std::make_tuple(m_size, m_node, inline_keys); }

	// Return the amount of memory used by an entry in the RadixTree
	// (the node structure itself, plus the value arena for leaves).
	static int getElementBytes(node* node) {
		int result = 0;
		if (node->m_is_leaf) {
			result = sizeof(leafNode) + ((leafNode*)node)->getLeafArenaSize();
		} else if (node->m_is_fixed) {
			result = sizeof(internalNode4);
		} else {
			ASSERT(!node->m_is_fixed);
			result = sizeof(internalNode);
		}
		return result;
	}

	// dummy method interface(to keep every interface same as IndexedSet )
	static int getElementBytes() {
		ASSERT(false);
		return 0;
	}

	bool empty() const { return m_size == 0; }

	// Deletes the root (treated as an internalNode) and resets all counters.
	void clear() {
		if (m_root != NULL) {
			delete (internalNode*)m_root;
			m_root = NULL;
		}
		m_size = 0;
		m_node = 0;
		inline_keys = 0;
		total_bytes = 0;
	}

	// iterators
	iterator find(const StringRef& key);
	iterator begin();
	iterator end();

	// modifications
	std::pair<iterator, bool> insert(const StringRef& key, const StringRef& val, bool replaceExisting = true);
	int insert(const std::vector<std::pair<KeyValueMapPair, uint64_t>>& pairs, bool replaceExisting = true) {
		// dummy method interface(to keep every interface same as IndexedSet )
		ASSERT(false);
		return 0;
	}
	void erase(iterator it);
	void erase(iterator begin, iterator end);

	// lookups
	iterator lower_bound(const StringRef& key);
	iterator upper_bound(const StringRef& key);

	// access
	uint64_t sumTo(iterator to);
	iterator previous(iterator i);

private:
	size_type m_size;
	// number of nodes that has been created
	size_type m_node;
	// number of nodes with key.size() <= 12
	size_type inline_keys;
	// running byte total maintained by the add_child*/delete_child* helpers
	uint64_t total_bytes;
	node* m_root;

	// modification
	void add_child(node* parent, node* child);
	void add_child_vector(node* parent, node* child);
	void add_child4(node* parent, node* child);
	void delete_child(node* parent, node* child);
	void delete_child_vector(node* parent, node* child);
	void delete_child4(node* parent, node* child);

	// access
	static int find_child(node* parent, int16_t ch); // return index
	static int child_size(node* parent); // how many children does parent node have
	static node* get_child(node* parent, int index); // return node pointer

	// Walks down from `i` to a leaf: reverse == 0 always takes the first child (index 0),
	// reverse != 0 always takes the last child. Every internal node on the way must have
	// at least one child.
	template <int reverse>
	static node* descend(node* i) {
		while (!i->m_is_leaf) {
			ASSERT(child_size(i) != 0);
			if (reverse) {
				i = get_child(i, child_size(i) - 1);
			} else {
				i = get_child(i, 0);
			}
		}
		return i;
	}

	node* find_node(const StringRef& key, node* node, int depth);
	node* append(node* parent, const StringRef& key, const StringRef& val);
	node* prepend(node* node, const StringRef& key, const StringRef& val);
	bool erase(node* child);
	iterator lower_bound(const StringRef& key, node* node);
	iterator upper_bound(const StringRef& key, node* node);
};
/////////////////////// child management //////////////////////////
// Attaches `child` to `parent`, dispatching on the parent's layout: fixed-size parents
// (m_is_fixed) go through add_child4, vector-based parents through add_child_vector.
// `parent` must be an internal node.
//
// Declared inline because this member is defined in a header.
inline void radix_tree::add_child(node* parent, node* child) {
	if (parent->m_is_fixed) {
		add_child4(parent, child);
	} else {
		add_child_vector(parent, child);
	}
}
// Adds `child` to a fixed-size parent (internalNode4, at most 3 children kept sorted by the
// first byte of their key) and updates total_bytes.
//   - A child with the same first byte as an existing one replaces it (the old child is
//     not deleted here).
//   - When the parent is already full, it is replaced by a vector-based internalNode that
//     takes over its header and children; the old parent is deleted and the new node is
//     registered with the grandparent.
//
// Declared inline because this member is defined in a header.
inline void radix_tree::add_child4(node* parent, node* child) {
	int16_t ch = child->getFirstByte();
	internalNode4* parent_ref = (internalNode4*)parent;

	// Index of the first slot whose key is >= ch (num_children when every key is smaller).
	int i = 0;
	for (; i < parent_ref->num_children; ++i) {
		if (parent_ref->keys[i] >= ch) break;
	}

	if (!parent_ref->num_children) {
		// empty
		parent_ref->num_children++;
		parent_ref->keys[0] = ch;
		parent_ref->m_children[0] = child;
		// DEBUG
		total_bytes += getElementBytes(child) + child->getArenaSize();
	} else if (i < parent_ref->num_children && parent_ref->keys[i] == ch) {
		// replace
		node* original = parent_ref->m_children[i];
		total_bytes -= (getElementBytes(original) + original->getArenaSize());
		parent_ref->m_children[i] = child;
		total_bytes += getElementBytes(child) + child->getArenaSize();
	} else if (parent_ref->num_children < 3) {
		// Shift to make room
		memmove(parent_ref->keys + i + 1, parent_ref->keys + i, (parent_ref->num_children - i) * sizeof(int16_t));
		memmove(parent_ref->m_children + i + 1, parent_ref->m_children + i,
		        (parent_ref->num_children - i) * sizeof(void*));
		// Insert element
		parent_ref->keys[i] = ch;
		parent_ref->m_children[i] = child;
		parent_ref->num_children++;
		// DEBUG
		total_bytes += getElementBytes(child) + child->getArenaSize();
	} else {
		// Full: grow into a vector-based node.
		ASSERT(parent_ref->num_children >= 3);
		internalNode* new_node = new radix_tree::internalNode();
		// node::operator= copies the header but leaves m_is_fixed alone, so new_node stays
		// a vector-based node.
		new_node->base = parent_ref->base; // equal operator
		for (int index = 0; index < parent_ref->num_children; index++) {
			new_node->m_children.emplace_back(parent_ref->keys[index], parent_ref->m_children[index]);
			parent_ref->m_children[index]->m_parent = (node*)new_node;
		}
		// Insert new element
		new_node->m_children.insert(new_node->m_children.begin() + i, std::make_pair(ch, child));
		child->m_parent = (node*)new_node;
		// update parent info: new_node has the same first byte as the old parent, so this takes
		// the "replace" path in the grandparent.
		// NOTE(review): assumes the node being grown always has a parent (i.e. is never the
		// root) — confirm against the insert path.
		add_child(new_node->base.m_parent, (node*)new_node);
		// DEBUG
		total_bytes += new_node->m_children.size() * sizeof(std::pair<int16_t, void*>) + getElementBytes(child) +
		               child->getArenaSize();
		// ~internalNode4 does not touch the children, which now belong to new_node.
		delete parent_ref;
	}
}
// Adds `child` to a vector-based parent (internalNode), keeping m_children sorted by the first
// byte of each child's key, and updates total_bytes. A child with the same first byte as an
// existing entry replaces that entry (the old child is not deleted here).
//
// Declared inline because this member is defined in a header.
inline void radix_tree::add_child_vector(node* parent, node* child) {
	int16_t ch = child->getFirstByte();
	internalNode* parent_ref = (internalNode*)parent;

	// Index of the first entry whose key is >= ch (size() when every key is smaller; this
	// also covers the empty vector).
	size_t i = 0;
	for (; i < parent_ref->m_children.size(); ++i) {
		if (parent_ref->m_children[i].first >= ch) break;
	}

	if (i == parent_ref->m_children.size() || parent_ref->m_children[i].first > ch) {
		parent_ref->m_children.insert(parent_ref->m_children.begin() + i, std::make_pair(ch, child));
		// DEBUG
		total_bytes += getElementBytes(child) + child->getArenaSize() + sizeof(std::pair<int16_t, void*>);
	} else {
		ASSERT(parent_ref->m_children[i].first == ch);
		// replace with the new child
		node* original = parent_ref->m_children[i].second;
		total_bytes -= (getElementBytes(original) + original->getArenaSize());
		parent_ref->m_children[i] = std::make_pair(ch, child); // replace with the new child
		total_bytes += getElementBytes(child) + child->getArenaSize();
	}
}
// Detaches `child` from `parent`, dispatching on the parent's layout (fixed-size vs vector).
// The helpers only unlink the child and adjust total_bytes; they do not delete it.
//
// Declared inline because this member is defined in a header.
inline void radix_tree::delete_child(radix_tree::node* parent, radix_tree::node* child) {
	if (parent->m_is_fixed) {
		delete_child4(parent, child);
	} else {
		delete_child_vector(parent, child);
	}
}
// Unlinks `child` from a fixed-size parent (internalNode4) by locating the slot with the
// child's first key byte and shifting the remaining slots down. Asserts that the slot exists.
// Subtracts the child's bytes from total_bytes; the child itself is not deleted.
//
// Declared inline because this member is defined in a header.
inline void radix_tree::delete_child4(radix_tree::node* parent, radix_tree::node* child) {
	int16_t ch = child->getFirstByte();
	internalNode4* parent_ref = (internalNode4*)parent;

	int i = 0;
	for (; i < parent_ref->num_children; i++) {
		if (parent_ref->keys[i] == ch) break;
	}
	ASSERT(i != parent_ref->num_children);

	// Close the gap left by slot i in both parallel arrays.
	const int tail = parent_ref->num_children - 1 - i;
	memmove(parent_ref->keys + i, parent_ref->keys + i + 1, tail * sizeof(int16_t));
	memmove(parent_ref->m_children + i, parent_ref->m_children + i + 1, tail * sizeof(void*));
	parent_ref->num_children--;

	total_bytes -= (getElementBytes(child) + child->getArenaSize());
}
// Unlinks `child` from a vector-based parent (internalNode) by erasing the entry with the
// child's first key byte. Asserts that the entry exists. Subtracts the child's bytes and the
// entry's bytes from total_bytes; the child itself is not deleted.
//
// Declared inline because this member is defined in a header.
inline void radix_tree::delete_child_vector(radix_tree::node* parent, radix_tree::node* child) {
	int16_t ch = child->getFirstByte();
	internalNode* parent_ref = (internalNode*)parent;

	size_t i = 0;
	for (; i < parent_ref->m_children.size(); i++) {
		if (parent_ref->m_children[i].first == ch) break;
	}
	ASSERT(i != parent_ref->m_children.size());

	parent_ref->m_children.erase(parent_ref->m_children.begin() + i);
	total_bytes -= (getElementBytes(child) + child->getArenaSize() + sizeof(std::pair<int16_t, void*>));

	// Give memory back once the vector is non-empty but uses at most a quarter of its capacity.
	if (parent_ref->m_children.size() && parent_ref->m_children.size() <= parent_ref->m_children.capacity() / 4)
		parent_ref->m_children.shrink_to_fit();
}
// Return the index of the child of `parent` whose first byte equals `ch`.
// When no child matches, the returned index equals the parent's child count.
int radix_tree::find_child(radix_tree::node* parent, int16_t ch) {
	int slot = 0;
	if (!parent->m_is_fixed) {
		internalNode* holder = (internalNode*)parent;
		while (slot != holder->m_children.size() && !(holder->m_children[slot].first == ch)) {
			++slot;
		}
		return slot;
	}
	internalNode4* holder = (internalNode4*)parent;
	while (slot < holder->num_children && !(holder->keys[slot] == ch)) {
		++slot;
	}
	return slot;
}
// Number of children currently attached to `parent`, for either internal-node layout.
int radix_tree::child_size(radix_tree::node* parent) {
	if (!parent->m_is_fixed) {
		return ((internalNode*)parent)->m_children.size();
	}
	return ((internalNode4*)parent)->num_children;
}
// Fetch the child stored at position `index` of `parent`.
// Only the fixed-size layout asserts that the index is in range.
radix_tree::node* radix_tree::get_child(node* parent, int index) {
	if (!parent->m_is_fixed) {
		return ((internalNode*)parent)->m_children[index].second;
	}
	internalNode4* holder = (internalNode4*)parent;
	ASSERT(index < holder->num_children);
	return holder->m_children[index];
}
// Find the node that follows `target`: climb until some ancestor has a sibling to the right,
// then return descend<0>() of that sibling. Returns NULL once the climb runs past a node
// without a parent.
radix_tree::node* radix_tree::iterator::increment(node* target) const {
	radix_tree::node* cursor = target;
	while (true) {
		radix_tree::node* up = cursor->m_parent;
		if (up == NULL) return NULL;
		int slot = find_child(up, cursor->getFirstByte());
		ASSERT(slot != child_size(up)); // cursor must be registered under its parent
		++slot;
		if (slot != child_size(up)) {
			return descend<0>(get_child(up, slot));
		}
		// cursor was the last child: continue the search one level up
		cursor = up;
	}
}
// Find the node that precedes `target`: climb until some ancestor has a sibling to the left,
// then return descend<1>() of that sibling. Returns NULL once the climb runs past a node
// without a parent.
radix_tree::node* radix_tree::iterator::decrement(radix_tree::node* target) const {
	radix_tree::node* cursor = target;
	while (true) {
		radix_tree::node* up = cursor->m_parent;
		if (up == NULL) return NULL;
		int slot = find_child(up, cursor->getFirstByte());
		ASSERT(slot != child_size(up)); // cursor must be registered under its parent
		if (slot != 0) {
			return descend<1>(get_child(up, slot - 1));
		}
		// cursor was the first child: continue the search one level up
		cursor = up;
	}
}
// Two iterators differ exactly when they point at different nodes.
bool radix_tree::iterator::operator!=(const radix_tree::iterator& lhs) const {
	return !(m_pointee == lhs.m_pointee);
}
// Two iterators are equal exactly when they point at the same node.
bool radix_tree::iterator::operator==(const radix_tree::iterator& lhs) const {
	const bool samePointee = (m_pointee == lhs.m_pointee);
	return samePointee;
}
// Pre-increment. An iterator without a pointee is left untouched.
const radix_tree::iterator& radix_tree::iterator::operator++() {
	if (m_pointee == NULL) {
		// nothing to advance from
		return *this;
	}
	m_pointee = increment(m_pointee);
	return *this;
}
// Pre-decrement. Only an iterator that points at a leaf is moved; anything else is left untouched.
const radix_tree::iterator& radix_tree::iterator::operator--() {
	const bool movable = (m_pointee != NULL) && m_pointee->m_is_leaf;
	if (movable) {
		m_pointee = decrement(m_pointee);
	}
	return *this;
}
/*
 * Reconstruct the full key of the leaf this iterator points at.
 *
 * @param content : caller-supplied buffer the key bytes are written into
 * @param len     : not consulted by this function; the caller has to make sure `content`
 *                  can hold m_depth + key-size bytes of the pointed-to leaf
 * @return a StringRef over `content`; an empty StringRef when the iterator has no pointee
 */
StringRef radix_tree::iterator::getKey(uint8_t* content, int len) const {
	if (m_pointee == NULL) return StringRef();
	ASSERT(m_pointee->m_is_leaf);
	// walk from the leaf towards the root, dropping every key fragment at its own offset
	uint32_t offset = m_pointee->m_depth;
	for (auto cursor = m_pointee;;) {
		memcpy(content + offset, cursor->getKey().begin(), cursor->getKeySize());
		cursor = cursor->m_parent;
		if (cursor == NULL || offset == 0) break;
		offset -= cursor->getKeySize();
	}
	return StringRef(content, (m_pointee->m_depth + m_pointee->getKeySize()));
}
// The past-the-end iterator is represented by an iterator without a pointee.
radix_tree::iterator radix_tree::end() {
	iterator sentinel(NULL);
	return sentinel;
}
// Iterator produced by descend<0>() from the root; a NULL iterator when there is no root
// or the tree holds no elements.
radix_tree::iterator radix_tree::begin() {
	const bool nothingStored = (m_root == NULL) || (m_size == 0);
	if (nothingStored) {
		return iterator(NULL);
	}
	return descend<0>(m_root);
}
/////////////////////// lookup //////////////////////////
// Exact lookup: returns an iterator to the leaf that stores `key`, or a NULL iterator when
// the key is not present (or the tree has no root).
radix_tree::iterator radix_tree::find(const StringRef& key) {
	if (m_root == NULL) return iterator(NULL);
	auto candidate = find_node(key, m_root, 0);
	// whatever part of the key is left below the candidate's depth must equal its fragment
	StringRef remainder = radix_substr(key, candidate->m_depth, (key.size() - candidate->m_depth));
	const bool exactLeaf = candidate->m_is_leaf && remainder == candidate->getKey();
	if (!exactLeaf) {
		return nullptr;
	}
	return candidate;
}
/*
 * Walk down from @param node following @param key (already matched up to @param depth) and
 * return the deepest node the key leads to. The result is not necessarily an exact match.
 *
 * corner case : insert "apache, append", then search for "appends". find_node() will return leaf node with m_key ==
 * "pend"; if search for "ap", find_node() will return internal node with m_key = ap
 */
radix_tree::node* radix_tree::find_node(const StringRef& key, node* node, int depth) {
	if (node->m_is_leaf) return node;
	const int fanout = child_size(node);
	for (int idx = 0; idx < fanout; ++idx) {
		auto candidate = get_child(node, idx);
		// key fully consumed and the child carries an empty fragment: exact match
		if (depth == key.size() && candidate->getKeySize() == 0) {
			ASSERT(candidate->m_is_leaf);
			return candidate;
		}
		const bool sharesFirstByte = depth < key.size() && key[depth] == candidate->getFirstByte();
		if (!sharesFirstByte) continue;
		int fragmentLen = candidate->getKeySize();
		StringRef window = radix_substr(key, depth, fragmentLen);
		const bool wholeFragment = (window == candidate->getKey());
		if (wholeFragment && !candidate->m_is_leaf) {
			// the fragment is fully matched and there is more tree below: keep descending
			return find_node(key, candidate, depth + fragmentLen);
		}
		// either a leaf, or a partial match; siblings never share a prefix, so stop here
		return candidate;
	}
	// no child starts with the next key byte
	return node;
}
/*
 * Smallest element x with *x >= key, or end() when there is none (including an empty tree).
 */
radix_tree::iterator radix_tree::lower_bound(const StringRef& key) {
	if (m_root != NULL && m_size != 0) {
		return lower_bound(key, m_root);
	}
	return iterator(NULL);
}
// Recursive worker for lower_bound(): scans the children of `node` in order and returns the
// first element that is not smaller than `key`, or a NULL iterator if this subtree has none.
radix_tree::iterator radix_tree::lower_bound(const StringRef& key, node* node) {
	const int fanout = child_size(node);
	for (int idx = 0; idx < fanout; ++idx) {
		auto candidate = get_child(node, idx);
		// same short cut as find_node: key fully consumed and the child has an empty fragment
		if (key.size() == candidate->m_depth && candidate->getKeySize() == 0) {
			return iterator(candidate);
		}
		StringRef window = radix_substr(key, candidate->m_depth, candidate->getKeySize());
		StringRef fragment = candidate->getKey();
		if (window == fragment) {
			if (!candidate->m_is_leaf) {
				// the answer may live below this child; otherwise try the next sibling
				iterator below = lower_bound(key, candidate);
				if (below != end()) return below;
			} else if ((key.size() - candidate->m_depth) == candidate->getKeySize()) {
				return iterator(candidate); // exact match
			}
			continue;
		}
		if (fragment > window) {
			// everything under this child is larger than the key
			return descend<0>(candidate);
		}
	}
	return iterator(NULL);
}
/*
 * Smallest element x with *x > key, or end() when there is none (including an empty tree).
 */
radix_tree::iterator radix_tree::upper_bound(const StringRef& key) {
	if (m_root != NULL && m_size != 0) {
		return upper_bound(key, m_root);
	}
	return iterator(NULL);
}
// Recursive worker for upper_bound(): scans the children of `node` in order and returns the
// first element that is strictly larger than `key`, or a NULL iterator if this subtree has none.
// A NULL or leaf `node` is handed back unchanged, wrapped in an iterator.
radix_tree::iterator radix_tree::upper_bound(const StringRef& key, node* node) {
	if (node == NULL || node->m_is_leaf) {
		return iterator(node);
	}
	const int fanout = child_size(node);
	for (int idx = 0; idx < fanout; ++idx) {
		auto candidate = get_child(node, idx);
		StringRef window = radix_substr(key, candidate->m_depth, candidate->getKeySize());
		StringRef fragment = candidate->getKey();
		const bool descendable = !candidate->m_is_leaf && fragment == window;
		if (descendable) {
			// the answer may live below this child; otherwise try the next sibling
			iterator below = upper_bound(key, candidate);
			if (below != end()) return below;
			continue;
		}
		if (fragment > window) {
			// everything under this child is larger than the key
			return descend<0>(candidate);
		}
	}
	return iterator(NULL);
}
// Return the sum of getT(x) for begin()<=x<to.
// Only to == end() is supported: the result is total_bytes (0 when there is no root).
// Any other iterator raises std::invalid_argument.
uint64_t radix_tree::sumTo(iterator to) {
	if (!(to == end())) {
		throw std::invalid_argument("sumTo method only support end() input");
	}
	return m_root ? total_bytes : 0;
}
// Return the element that precedes `i`.
//  - i == end()   : the largest element, or end() when the tree holds no elements
//  - i == begin() : a NULL iterator (nothing precedes the first element)
//  - otherwise    : the result of decrementing a copy of `i`
radix_tree::iterator radix_tree::previous(radix_tree::iterator i) {
	if (i == end()) {
		// On an empty tree begin() == end(), so this branch is also taken when there is no
		// root or no element at all. Apply the same guard as begin()/lower_bound()/upper_bound()
		// instead of handing a NULL or childless root to descend<1>().
		if (m_root == NULL || m_size == 0) {
			return iterator(NULL);
		}
		// for iterator == end(), find the largest element
		return descend<1>(m_root);
	}
	if (i == begin()) {
		return iterator(NULL);
	}
	--i;
	return i;
}
/////////////////////// modification //////////////////////////
/*
 * Create a leaf holding @param val and hang it directly below @param parent.
 * The leaf stores the part of @param key that lies beyond the parent's depth + key size
 * (possibly nothing, in which case its key stays empty).
 *
 * @param parent : direct parent of this newly inserted node
 * @param val : using val to create a newly inserted node
 * @return the newly created leaf
 */
radix_tree::node* radix_tree::append(node* parent, const StringRef& key, const StringRef& val) {
	const int depth = parent->m_depth + parent->getKeySize();
	const int suffixLen = key.size() - depth;
	radix_tree::node* leaf = (node*)new radix_tree::leafNode(val);
	leaf->m_depth = depth;
	leaf->m_parent = parent;
	if (suffixLen != 0) {
		leaf->setKey(key, depth, suffixLen);
	}
	// DEBUG : an empty key always counts as inline
	if (suffixLen == 0 || suffixLen <= INLINE_KEY_SIZE) inline_keys++;
	add_child(parent, leaf);
	m_node++;
	return leaf;
}
/*
 * Split an existing node so that a new key sharing only part of its fragment can be stored.
 *
 * step one : measure the common prefix of split's fragment and the remaining part of key
 *            (find_node() has already guaranteed that they have something in common)
 * step two : create a new internal node carrying the common prefix, put it in split's place,
 *            and shorten split to the rest of its fragment below the new node
 * step three : append the newly inserted leaf to the new internal node
 *
 * @param split : node to be split
 * @param val : using val to create a newly inserted node
 * @return the newly created leaf
 */
radix_tree::node* radix_tree::prepend(node* split, const StringRef& key, const StringRef& val) {
	const int splitLen = split->getKeySize();
	const int keyLen = key.size() - split->m_depth;
	// deep copy of the fragment into a scratch arena that only lives for this call
	Arena scratch(split->getKeySize());
	StringRef fragment(scratch, split->getKey());
	int shared = 0;
	while (shared < splitLen && shared < keyLen && fragment[shared] == key[shared + split->m_depth]) {
		shared++;
	}
	ASSERT(shared != 0);
	// new internal node that takes over split's position in the tree
	node* branch = (node*)new radix_tree::internalNode4();
	m_node++;
	branch->m_parent = split->m_parent;
	branch->setKey(fragment, 0, shared);
	branch->m_depth = split->m_depth;
	add_child(branch->m_parent, branch); // same first byte as split, so this replaces split's entry
	// DEBUG
	if (shared <= INLINE_KEY_SIZE) inline_keys++;
	if (splitLen > INLINE_KEY_SIZE && (splitLen - shared) <= INLINE_KEY_SIZE) inline_keys++;
	// push the original node one level down, keeping only the non-shared tail of its fragment
	split->m_depth += shared;
	split->m_parent = branch;
	split->setKey(fragment, shared, splitLen - shared);
	add_child(branch, split);
	return append(branch, key, val);
}
std::pair<radix_tree::iterator, bool> radix_tree::insert(const StringRef& key, const StringRef& val,
bool replaceExisting) {
if (m_root == NULL) {
m_root = (node*)new radix_tree::internalNode();
total_bytes += getElementBytes(m_root);
}
auto node = find_node(key, m_root, 0);
// short cut for root node
if (node == m_root) {
m_size++;
return std::pair<iterator, bool>(append(m_root, key, val), true);
}
StringRef key_sub = radix_substr(key, node->m_depth, node->getKeySize());
if (key_sub == node->getKey()) {
if (node->m_is_leaf) {
if (node->m_depth + node->getKeySize() == key.size()) {
// case one : exact match, replace with new value
bool inserted = false;
if (replaceExisting) {
size_type original_value = ((leafNode*)node)->getLeafArenaSize();
((leafNode*)node)->setValue(val);
total_bytes += ((leafNode*)node)->getLeafArenaSize() - original_value;
inserted = true;
}
return std::pair<iterator, bool>(node, inserted);
} else {
// case two : prepend (e.g leaf is "a", inserted key is "ab");
m_size++;
return std::pair<iterator, bool>(prepend(node, key, val), true);
}
} else {
m_size++;
return std::pair<iterator, bool>(append(node, key, val), true);
}
} else {
m_size++;
return std::pair<iterator, bool>(prepend(node, key, val), true);
}
}
// Erase the element `it` points at by forwarding its node to erase(node*).
void radix_tree::erase(iterator it) {
	radix_tree::node* target = it.m_pointee;
	erase(target);
}
// Remove the leaf `child` from the tree and delete it. If that leaves a non-root parent with a
// single child, the parent is merged into that child (the fragments are joined) and deleted.
// Returns false when there is no root or `child` is not a leaf, true otherwise.
bool radix_tree::erase(radix_tree::node* child) {
	if (m_root == NULL) return false;
	ASSERT(child != NULL);
	if (!child->m_is_leaf) return false;
	radix_tree::node* holder = child->m_parent;
	delete_child(holder, child);
	// DEBUG
	if (child->getKeySize() <= INLINE_KEY_SIZE) inline_keys--;
	delete (leafNode*)child;
	m_size--;
	m_node--;
	// can't do the merge if parent is root node; no merge needed while siblings remain
	if (holder == m_root || child_size(holder) > 1) return true;
	ASSERT(child_size(holder) == 1);
	// parent has only one child left, merge parent with the sibling
	node* survivor = get_child(holder, 0);
	// DEBUG
	if (survivor->getKeySize() <= INLINE_KEY_SIZE) inline_keys--;
	delete_child(holder, survivor);
	Arena scratch;
	StringRef merged = radix_join(holder->getKey(), survivor->getKey(), scratch);
	survivor->setKey(merged, 0, merged.size());
	survivor->m_depth = holder->m_depth;
	survivor->m_parent = holder->m_parent;
	// the survivor takes the parent's place under the grandparent
	add_child(holder->m_parent, survivor);
	// DEBUG
	if (survivor->getKeySize() <= INLINE_KEY_SIZE) inline_keys++;
	if (holder->getKeySize() <= INLINE_KEY_SIZE) inline_keys--;
	// delete through the concrete type that matches the parent's layout
	if (holder->m_is_fixed) {
		delete (internalNode4*)holder;
	} else {
		delete (internalNode*)holder;
	}
	m_node--;
	return true;
}
// Erase the items in the indicated range [begin, end).
void radix_tree::erase(radix_tree::iterator begin, radix_tree::iterator end) {
	// collect the nodes first, then erase them: erase(node*) re-parents and deletes nodes,
	// so the range is not walked while the tree is being modified
	std::vector<radix_tree::node*> doomed;
	auto cursor = begin;
	while (cursor != end) {
		doomed.push_back(cursor.m_pointee);
		++cursor;
	}
	for (radix_tree::node* target : doomed) {
		erase(target);
	}
}
#endif