1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-06-01 18:56:00 +08:00

Custom bookkeeping allocator for PTree. Simple Cache workload.

This commit is contained in:
negoyal 2020-02-14 11:42:47 -08:00
parent 85cc35e81e
commit 28768715f2
7 changed files with 641 additions and 455 deletions

@ -1793,6 +1793,7 @@ ACTOR Future<Void> changeCachedRange(Database cx, KeyRangeRef range, bool add) {
state Value trueValue = storageCacheValue(std::vector<uint16_t>{ 0 });
state Value falseValue = storageCacheValue(std::vector<uint16_t>{});
loop {
TraceEvent(SevDebug, "ChangeCachedRangeInLoop").detail("BeginKey", range.begin.toString()).detail("EndKey", range.end.toString());
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
@ -1811,10 +1812,12 @@ ACTOR Future<Void> changeCachedRange(Database cx, KeyRangeRef range, bool add) {
// we need to uncache from here
tr.set(sysRange.begin, falseValue);
tr.set(privateRange.begin, serverKeysFalse);
TraceEvent(SevDebug, "ChangeCachedRangeSetBegin1").detail("BeginKey", sysRange.begin.toString());
} else if (!prevIsCached && add) {
// we need to cache, starting from here
tr.set(sysRange.begin, trueValue);
tr.set(privateRange.begin, serverKeysTrue);
TraceEvent(SevDebug, "ChangeCachedRangeSetBegin2").detail("BeginKey", sysRange.begin.toString());
}
Standalone<RangeResultRef> after =
wait(tr.getRange(KeyRangeRef(sysRange.end, storageCacheKeys.end), 1, false));
@ -1827,14 +1830,19 @@ ACTOR Future<Void> changeCachedRange(Database cx, KeyRangeRef range, bool add) {
if (afterIsCached && !add) {
tr.set(sysRange.end, trueValue);
tr.set(privateRange.end, serverKeysTrue);
TraceEvent(SevDebug, "ChangeCachedRangeSetEnd1").detail("EndKey", sysRange.end.toString());
} else if (!afterIsCached && add) {
tr.set(sysRange.end, falseValue);
tr.set(privateRange.end, serverKeysFalse);
TraceEvent(SevDebug, "ChangeCachedRangeSetEnd2").detail("EndKey", sysRange.end.toString());
}
wait(tr.commit());
TraceEvent(SevDebug, "ChangeCachedRangeReturn");
return Void();
} catch (Error& e) {
wait(tr.onError(e));
state Error err = e;
wait(tr.onError(err));
TraceEvent(SevDebug, "ChangeCachedRangeError").error(err);
}
}
}

@ -61,13 +61,13 @@ void decodeKeyServersValue( const ValueRef& value, vector<UID>& src, vector<UID>
}
}
// "\xff/storageCacheServer/[[UID]] := StorageServerInterface"
// "\xff/cacheServer/[[UID]] := StorageServerInterface"
// This will be added by the cache server on initialization and removed by DD
// TODO[mpilman]: We will need a way to map uint16_t ids to UIDs in a future
// versions. For now caches simply cache everything so the ids
// are not yet meaningful.
const KeyRangeRef storageCacheServerKeys(LiteralStringRef("\xff/storageCacheServer/"),
LiteralStringRef("\xff/storageCacheServer0"));
const KeyRangeRef storageCacheServerKeys(LiteralStringRef("\xff/cacheServer/"),
LiteralStringRef("\xff/cacheServer0"));
const KeyRef storageCacheServersPrefix = storageCacheServerKeys.begin;
const KeyRef storageCacheServersEnd = storageCacheServerKeys.end;

File diff suppressed because it is too large Load Diff

@ -491,14 +491,24 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
keyBegin = itr->first;
mutationBegin = itr->second;
++itr;
keyEnd = itr->first;
mutationEnd = itr->second;
if (itr != cachedRangeInfo.end()) {
keyEnd = itr->first;
mutationEnd = itr->second;
} else {
TraceEvent(SevDebug, "EndKeyNotFound", dbgid).detail("KeyBegin", keyBegin.toString());
break;
}
} else {
keyEnd = itr->first;
mutationEnd = itr->second;
++itr;
keyBegin = itr->first;
mutationBegin = itr->second;
if (itr != cachedRangeInfo.end()) {
keyBegin = itr->first;
mutationBegin = itr->second;
} else {
TraceEvent(SevDebug, "BeginKeyNotFound", dbgid).detail("KeyEnd", keyEnd.toString());
break;
}
}
// Now get all the storage server tags for the cached key-ranges

@ -121,6 +121,7 @@ set(FDBSERVER_SRCS
workloads/BackupToDBUpgrade.actor.cpp
workloads/BulkLoad.actor.cpp
workloads/BulkSetup.actor.h
workloads/Cache.actor.cpp
workloads/ChangeConfig.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/TriggerRecovery.actor.cpp

@ -133,7 +133,7 @@ struct FetchInjectionInfo {
};
struct StorageCacheData {
typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;
typedef VersionedMap<KeyRef, ValueOrClearToRef, FastAllocPTree<KeyRef>> VersionedData;
private:
// in-memory versioned struct (PTree as of now. Subject to change)
VersionedData versionedData;
@ -230,12 +230,14 @@ public:
} counters;
explicit StorageCacheData(UID thisServerID, uint16_t index, Reference<AsyncVar<ServerDBInfo>> const& db)
: thisServerID(thisServerID), index(index), db(db),
: versionedData(FastAllocPTree<KeyRef>{std::make_shared<int>(0)}),
thisServerID(thisServerID), index(index), db(db),
cacheRangeChangeCounter(0),
lastTLogVersion(0), lastVersionWithData(0),
compactionInProgress(Void()),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
debug_inApplyUpdate(false), debug_lastValidateTime(0),
versionLag(0), cacheRangeChangeCounter(0), behind(false), counters(this)
versionLag(0), behind(false), counters(this)
{
version.initMetric(LiteralStringRef("StorageCacheData.Version"), counters.cc.id);
desiredOldestVersion.initMetric(LiteralStringRef("StorageCacheData.DesriedOldestVersion"), counters.cc.id);
@ -244,7 +246,6 @@ public:
newestAvailableVersion.insert(allKeys, invalidVersion);
newestDirtyVersion.insert(allKeys, invalidVersion);
addCacheRange( CacheRangeInfo::newNotAssigned( allKeys ) );
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
}
@ -374,7 +375,7 @@ void validate(StorageCacheData* data, bool force = false) {
CacheRangeInfo* cacheRange = range->value().getPtr();
if (!cacheRange->isInVersionedData()) {
if (latest.lower_bound(range->begin()) != latest.lower_bound(range->end())) {
TraceEvent(SevError, "SCValiedate", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime).detail("KeyBegin", range->begin()).detail("KeyEnd", range->end())
TraceEvent(SevError, "SCValidate", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime).detail("KeyBegin", range->begin()).detail("KeyEnd", range->end())
.detail("FirstKey", latest.lower_bound(range->begin()).key()).detail("FirstInsertV", latest.lower_bound(range->begin()).insertVersion());
}
ASSERT( latest.lower_bound(range->begin()) == latest.lower_bound(range->end()) );
@ -385,6 +386,7 @@ void validate(StorageCacheData* data, bool force = false) {
validateCacheRange(latest, allKeys, data->version.get(), data->thisServerID, data->oldestVersion.get());
data->debug_lastValidateTime = now();
TraceEvent(SevError, "SCValidationDone", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime);
}
} catch (...) {
TraceEvent(SevError, "SCValidationFailure", data->thisServerID).detail("LastValidTime", data->debug_lastValidateTime);
@ -450,7 +452,7 @@ ACTOR Future<Version> waitForVersionNoTooOld( StorageCacheData* data, Version ve
ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
state int64_t resultSize = 0;
printf("\nSCGetValueQ\n");
//printf("\nSCGetValueQ\n");
try {
++data->counters.getValueQueries;
@ -508,6 +510,7 @@ ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
GetValueReply reply(v, true);
req.reply.send(reply);
} catch (Error& e) {
TraceEvent("SCGetValueQError", data->thisServerID).detail("Code",e.code()).detail("Version", version).detail("ReqKey",req.key);
if(!canReplyWith(e))
throw;
req.reply.sendError(e);
@ -532,7 +535,7 @@ GetKeyValuesReply readRange(StorageCacheData* data, Version version, KeyRangeRef
KeyRef rangeBegin = range.begin;
KeyRef rangeEnd = range.end;
printf("\nSCReadRange\n");
//printf("\nSCReadRange\n");
//We might care about a clear beginning before start that runs into range
vCurrent = view.lastLessOrEqual(rangeBegin);
if (vCurrent && vCurrent->isClearTo() && vCurrent->getEndKey() > rangeBegin)
@ -638,6 +641,8 @@ KeyRange getCachedKeyRange( StorageCacheData* data, const KeySelectorRef& sel )
{
auto i = sel.isBackward() ? data->cachedRangeMap.rangeContainingKeyBefore( sel.getKey() ) :
data->cachedRangeMap.rangeContaining( sel.getKey() );
TraceEvent("SCGetCachedKeyRange", data->thisServerID).detail("SelKey", sel.getKey()).detail("Begin", i->range().begin).detail("End", i->range().end).detail("Value", i->value()->debugDescribeState());
if (i->value()->notAssigned())
throw wrong_shard_server();
else if (!i->value()->isReadable())
@ -655,7 +660,7 @@ ACTOR Future<Void> getKeyValues( StorageCacheData* data, GetKeyValuesRequest req
++data->counters.getRangeQueries;
++data->counters.allQueries;
printf("\nSCGetKeyValues\n");
//printf("\nSCGetKeyValues\n");
//++data->readQueueSizeMetric;
//data->maxQueryQueue = std::max<int>( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue());
@ -679,6 +684,9 @@ ACTOR Future<Void> getKeyValues( StorageCacheData* data, GetKeyValuesRequest req
state KeyRange cachedKeyRange = getCachedKeyRange( data, req.begin );
TraceEvent("SCGetKeyValues1", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).
detail("CacheRangeBegin", cachedKeyRange.begin).detail("CacheRangeEnd", cachedKeyRange.end);
if( req.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storagecache.getKeyValues.AfterVersion");
//.detail("CacheRangeBegin", cachedKeyRange.begin).detail("CacheRangeEnd", cachedKeyRange.end);
@ -710,6 +718,9 @@ ACTOR Future<Void> getKeyValues( StorageCacheData* data, GetKeyValuesRequest req
detail("BeginKey", begin).detail("EndKey", end).detail("BeginOffset", offset1).detail("EndOffset", offset2);
throw wrong_shard_server();
}
TraceEvent("SCGetKeyValues", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).
detail("CacheRangeBegin", cachedKeyRange.begin).detail("CacheRangeEnd", cachedKeyRange.end).detail("In", "getKeyValues>checkOffsets").
detail("BeginKey", begin).detail("EndKey", end).detail("BeginOffset", offset1).detail("EndOffset", offset2);
if (begin >= end) {
if( req.debugID.present() )
@ -744,7 +755,8 @@ ACTOR Future<Void> getKeyValues( StorageCacheData* data, GetKeyValuesRequest req
data->counters.rowsQueried += r.data.size();
}
} catch (Error& e) {
TraceEvent("SCGetKeyValuesError", data->thisServerID).detail("Code",e.code());
TraceEvent("SCGetKeyValuesError", data->thisServerID).detail("Code",e.code()).detail("Version", version).
detail("CacheRangeBegin", cachedKeyRange.begin).detail("CacheRangeEnd", cachedKeyRange.end).detail("ReqBegin", req.begin.getKey()).detail("ReqEnd", req.end.getKey());
if(!canReplyWith(e))
throw;
req.reply.sendError(e);
@ -761,7 +773,7 @@ ACTOR Future<Void> getKey( StorageCacheData* data, GetKeyRequest req ) {
++data->counters.getKeyQueries;
++data->counters.allQueries;
printf("\nSCGetKey\n");
//printf("\nSCGetKey\n");
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
wait( delay(0, TaskPriority::DefaultEndpoint) );
@ -792,8 +804,8 @@ ACTOR Future<Void> getKey( StorageCacheData* data, GetKeyRequest req ) {
req.reply.send(reply);
}
catch (Error& e) {
if (e.code() == error_code_wrong_shard_server) TraceEvent("WrongCacheRangeServer").detail("In","getKey");
if (e.code() == error_code_future_version) TraceEvent("ColdCacheRangeServer").detail("In","getKey");
if (e.code() == error_code_wrong_shard_server) TraceEvent("SCWrongCacheRangeServer").detail("In","getKey");
if (e.code() == error_code_future_version) TraceEvent("SCColdCacheRangeServer").detail("In","getKey");
if(!canReplyWith(e))
throw;
req.reply.sendError(e);

@ -100,6 +100,7 @@ add_fdb_test(TEST_FILES fast/BackupCorrectness.txt)
add_fdb_test(TEST_FILES fast/BackupCorrectnessClean.txt)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectness.txt)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectnessClean.txt)
add_fdb_test(TEST_FILES fast/CacheTest.txt)
add_fdb_test(TEST_FILES fast/CloggedSideband.txt)
add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.txt)
add_fdb_test(TEST_FILES fast/CycleAndLock.txt)