diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 6305e2234b..f7f71865f8 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -47,7 +47,6 @@ #include "flow/SystemMonitor.h" #include "flow/TLSConfig.actor.h" #include "flow/UnitTest.h" -#include "flow/FBTrace.h" #if defined(CMAKE_BUILD) || !defined(WIN32) #include "versions.h" @@ -1198,12 +1197,8 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( ASSERT( key < allKeys.end ); } - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETKEYLOCATION_BEFORE))); - } loop { ++cx->transactionKeyServerLocationRequests; @@ -1211,12 +1206,8 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal( when ( wait( cx->onMasterProxiesChanged() ) ) {} when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) { ++cx->transactionKeyServerLocationRequestsCompleted; - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETKEYLOCATION_AFTER))); - } ASSERT( rep.results.size() == 1 ); auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second); @@ -1245,12 +1236,8 @@ Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation( Database const& } ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) { - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETKEYLOCATIONS_BEFORE))); - } loop { ++cx->transactionKeyServerLocationRequests; @@ -1259,12 +1246,8 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) { ++cx->transactionKeyServerLocationRequestsCompleted; state GetKeyServerLocationsReply rep = _rep; - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETKEYLOCATIONS_AFTER))); - } ASSERT( rep.results.size() ); state vector< pair<KeyRange,Reference<LocationInfo>> > results; @@ -1364,9 +1347,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa g_traceBatch.addAttach("GetValueAttachID", info.debugID.get().first(), getValueID.get().first()); g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<GetValueDebugTrace>(new
GetValueDebugTrace(getValueID.get().first(), now(), - GetValueDebugTrace::NATIVEAPI_GETVALUE_BEFORE))); /*TraceEvent("TransactionDebugGetValueInfo", getValueID.get()) .detail("Key", key) .detail("ReqVersion", ver) @@ -1411,9 +1391,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa if( info.debugID.present() ) { g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.After"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(), - GetValueDebugTrace::NATIVEAPI_GETVALUE_AFTER))); /*TraceEvent("TransactionDebugGetValueDone", getValueID.get()) .detail("Key", key) .detail("ReqVersion", ver) @@ -1428,9 +1405,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa cx->getValueCompleted->log(); if( info.debugID.present() ) { g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Error"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(), - GetValueDebugTrace::NATIVEAPI_GETVALUE_ERROR))); /*TraceEvent("TransactionDebugGetValueDone", getValueID.get()) .detail("Key", key) .detail("ReqVersion", ver) @@ -1452,12 +1426,8 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, TransactionInfo info ) { wait(success(version)); - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETKEY_AFTERVERSION))); - } loop { if (k.getKey() == allKeys.end) { @@ -1472,12 +1442,8 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()) ); try { - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETKEY_BEFORE))); - } ++cx->transactionPhysicalReads; state GetKeyReply reply; try { @@ -1495,12 +1461,8 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T ++cx->transactionPhysicalReadsCompleted; throw; } - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETKEY_AFTER))); - } k = reply.sel; if (!k.offset && k.orEqual) { return k.getKey(); } @@ -1573,9 +1535,6 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value> g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first()); g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(watchValueID.get().first(), now(), -
WatchValueDebugTrace::NATIVEAPI_WATCHVALUE_BEFORE))); } state WatchValueReply resp; choose { @@ -1588,9 +1547,6 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value> } if( info.debugID.present() ) { g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(watchValueID.get().first(), now(), - WatchValueDebugTrace::NATIVEAPI_WATCHVALUE_AFTER))); } //FIXME: wait for known committed version on the storage server before replying, @@ -1671,9 +1627,6 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver try { if( info.debugID.present() ) { g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.Before"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_BEFORE))); /*TraceEvent("TransactionDebugGetExactRangeInfo", info.debugID.get()) .detail("ReqBeginKey", req.begin.getKey()) .detail("ReqEndKey", req.end.getKey()) @@ -1700,12 +1653,8 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver ++cx->transactionPhysicalReadsCompleted; throw; } - if( info.debugID.present() ) { + if( info.debugID.present() ) g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_AFTER))); - } output.arena().dependsOn( rep.arena ); output.append( output.arena(), rep.data.begin(), rep.data.size() ); @@ -1964,9 +1913,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETRANGE_BEFORE))); /*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get()) .detail("ReqBeginKey", req.begin.getKey()) .detail("ReqEndKey", req.end.getKey()) @@ -2002,9 +1948,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_AFTER))); /*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get()) .detail("ReqBeginKey", req.begin.getKey()) .detail("ReqEndKey", req.end.getKey()) @@ -2092,9 +2035,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETRANGE_ERROR))); TraceEvent("TransactionDebugError", info.debugID.get()).error(e); } if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || @@ -2783,9 +2723,6 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference commitID = nondeterministicRandom()->randomUniqueID(); g_traceBatch.addAttach("CommitAttachID", info.debugID.get().first(), commitID.get().first()); g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.Before"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(), - CommitDebugTrace::NATIVEAPI_COMMIT_BEFORE))); } req.debugID = commitID; @@ -2829,12 +2766,8 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference cx->transactionCommittedMutations += req.transaction.mutations.size(); cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize(); - if(info.debugID.present()) { + if(info.debugID.present()) g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After"); - //FIXME -
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(), - CommitDebugTrace::NATIVEAPI_COMMIT_AFTER))); - } + double latency = now() - startTime; cx->commitLatencies.addSample(latency); cx->latencies.addSample(now() - tr->startTime); @@ -2884,12 +2818,9 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference if (info.debugID.present()) TraceEvent(interval.end()).detail("Conflict", 1); - if(info.debugID.present()) { + if(info.debugID.present()) g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(), - CommitDebugTrace::NATIVEAPI_COMMIT_AFTER))); - } + throw not_committed(); } } @@ -3219,23 +3150,15 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) { try { ++cx->transactionReadVersionBatches; - if( debugID.present() ) { + if( debugID.present() ) g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETCONSISTENTREADVERSION_BEFORE))); - } loop { state GetReadVersionRequest req( transactionCount, flags, debugID ); choose { when ( wait( cx->onMasterProxiesChanged() ) ) {} when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) { - if( debugID.present() ) { + if( debugID.present() ) g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(), - TransactionDebugTrace::NATIVEAPI_GETCONSISTENTREADVERSION_AFTER))); - } ASSERT( v.version > 0 ); cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version); return v; diff --git a/fdbserver/KeyValueStoreSQLite.actor.cpp b/fdbserver/KeyValueStoreSQLite.actor.cpp index 09f0d66774..598bcc0f57 100644 --- a/fdbserver/KeyValueStoreSQLite.actor.cpp +++ b/fdbserver/KeyValueStoreSQLite.actor.cpp @@ -25,7 +25,6 @@ #include "fdbserver/CoroFlow.h" #include "fdbserver/Knobs.h" #include "flow/Hash3.h" -#include "flow/FBTrace.h" extern "C" { #include "fdbserver/sqlite/sqliteInt.h" @@ -1531,22 +1530,12 @@ private: }; void action( ReadValueAction& rv ) { //double t = timer(); - if (rv.debugID.present()) { - g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(), - GetValueDebugTrace::READER_GETVALUE_BEFORE))); - } + if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask()); rv.result.send( getCursor()->get().get(rv.key) ); ++counter; - if (rv.debugID.present()) { - g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID",
g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(), - GetValueDebugTrace::READER_GETVALUE_AFTER))); - } + if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask()); //t = timer()-t; //if (t >= 1.0) TraceEvent("ReadValueActionSlow",dbgid).detail("Elapsed", t); } @@ -1561,22 +1550,12 @@ private: }; void action( ReadValuePrefixAction& rv ) { //double t = timer(); - if (rv.debugID.present()) { - g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(), - GetValueDebugTrace::READER_GETVALUEPREFIX_BEFORE))); - } + if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask()); rv.result.send( getCursor()->get().getPrefix(rv.key, rv.maxLength) ); ++counter; - if (rv.debugID.present()) { - g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask()); - //FIXME - fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(), - GetValueDebugTrace::READER_GETVALUEPREFIX_AFTER))); - } + if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask()); //t = timer()-t; //if (t >= 1.0) TraceEvent("ReadValuePrefixActionSlow",dbgid).detail("Elapsed", t); } diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index ac12d8ff2d..af9823a1fe 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -45,7 +45,6 @@ #include "flow/ActorCollection.h" #include "flow/Knobs.h" #include "flow/Stats.h" -#include "flow/FBTrace.h" #include "flow/TDMetric.actor.h" #include "flow/actorcompiler.h" // This must be the last #include.
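[Reviewer note: every hunk in this patch has the same shape — the deleted //FIXME fbTrace(...) call duplicated the adjacent g_traceBatch event, so once it is gone the remaining single-statement if can also drop its braces. A minimal sketch of the convention that survives, using only calls visible in this diff; the helper name is hypothetical and not part of the patch:

    // Both the "X.Before" and "X.After" events carry the same 64-bit debug ID
    // (debugID.get().first()), so a trace consumer can pair them into one
    // latency sample for stage X.
    static void traceTxnStage(const Optional<UID>& debugID, const char* event) {
        if (debugID.present())
            g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), event);
    }
]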
@@ -247,12 +246,8 @@ ACTOR Future<Void> queueTransactionStartRequests( req.reply.send(rep); TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60); } else { - if (req.debugID.present()) { + if (req.debugID.present()) g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(), - TransactionDebugTrace::MASTERPROXYSERVER_QUEUETRANSACTIONSTARTREQUESTS_BEFORE))); - } if (systemQueue->empty() && defaultQueue->empty() && batchQueue->empty()) { forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer)); @@ -556,9 +551,6 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_BATCHER))); } if(!batch.size()) { @@ -747,12 +739,8 @@ ACTOR Future<Void> commitBatch( TraceEvent("SecondCommitBatch", self->dbgid).detail("DebugID", debugID.get()); } - if (debugID.present()) { + if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_BEFORE))); - } /////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield) @@ -761,12 +749,8 @@ ACTOR Future<Void> commitBatch( wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1)); state Future<Void> releaseDelay = delay(std::min(SERVER_KNOBS->MAX_PROXY_COMPUTE, batchOperations*self->commitComputePerOperation[latencyBucket]), TaskPriority::ProxyMasterVersionReply); - if (debugID.present()) { + if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_GETTINGCOMMITVERSION))); - } GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid); GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) ); @@ -786,12 +770,8 @@ ACTOR Future<Void> commitBatch( //TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion); - if (debugID.present()) { + if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_GOTCOMMITVERSION))); - } ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version ); int conflictRangeCount = 0; @@ -823,12 +803,8 @@ ACTOR Future<Void> commitBatch( /////// Phase 2: Resolution (waiting on the network; pipelined) state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) ); - if (debugID.present()) { + if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterResolution"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERRESOLUTION))); - }
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be) TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber - 1); // Queuing post-resolution commit processing @@ -839,12 +815,8 @@ ACTOR Future<Void> commitBatch( state double computeDuration = 0; self->stats.txnCommitResolved += trs.size(); - if (debugID.present()) { + if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.ProcessingMutations"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_PROCESSINGMUTATIONS))); - } state Arena arena; state bool isMyFirstBatch = !self->version; @@ -1141,12 +1113,8 @@ ACTOR Future<Void> commitBatch( state LogSystemDiskQueueAdapter::CommitMessage msg = storeCommits.back().first.get(); - if (debugID.present()) { + if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterStoreCommits"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERSTORECOMMITS))); - } // txnState (transaction subsystem state) tag: message extracted from log adapter bool firstMessage = true; @@ -1220,12 +1188,8 @@ ACTOR Future<Void> commitBatch( debug_advanceMinCommittedVersion(UID(), commitVersion); //TraceEvent("ProxyPushed", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion); - if (debugID.present()) { + if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterLogPush"); - //FIXME - fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(), - CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERLOGPUSH))); - } for (auto &p : storeCommits) { ASSERT(!p.second.isReady()); @@ -1352,9 +1316,6 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi if (debugID.present()) { g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(), - TransactionDebugTrace::MASTERPROXYSERVER_GETLIVECOMMITTEDVERSION_CONFIRMEPOCHLIVE))); } vector<GetReadVersionReply> versions = wait(getAll(proxyVersions)); @@ -1371,9 +1332,6 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi if (debugID.present()) { g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(), - TransactionDebugTrace::MASTERPROXYSERVER_GETLIVECOMMITTEDVERSION_AFTER))); } commitData->stats.txnStartOut += transactionCount; @@ -1535,9 +1493,6 @@ ACTOR static Future<Void> transactionStarter( if (debugID.present()) { g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast"); - //FIXME - fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(), - TransactionDebugTrace::MASTERPROXYSERVER_MASTERPROXYSERVERCORE_BROADCAST))); } for (int i = 0; i < start.size(); i++) { @@ -1984,12 +1939,8 @@ ACTOR Future<Void> masterProxyServerCore( } when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) { //TraceEvent("ProxyGetRCV", proxy.id()); - if (req.debugID.present()) { + if (req.debugID.present()) g_traceBatch.addEvent("TransactionDebug",
req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion"); - //FIXME - fbTrace(Reference(new TransactionDebugTrace(req.debugID.get().first(), now(), - TransactionDebugTrace::MASTERPROXYSERVER_MASTERPROXYSERVERCORE_GETRAWCOMMITTEDVERSION))); - } GetReadVersionReply rep; rep.locked = commitData.locked; rep.metadataVersion = commitData.metadataVersion; diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index 34bd28f5e3..64b2959c63 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -30,7 +30,6 @@ #include "fdbserver/ConflictSet.h" #include "fdbserver/StorageMetrics.h" #include "fdbclient/SystemData.h" -#include "flow/FBTrace.h" #include "flow/actorcompiler.h" // This must be the last #include. namespace { @@ -116,9 +115,6 @@ ACTOR Future resolveBatch( debugID = nondeterministicRandom()->randomUniqueID(); g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), debugID.get().first()); g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.Before"); - //FIXME - fbTrace(Reference(new CommitDebugTrace(req.debugID.get().first(), now(), - CommitDebugTrace::RESOLVER_RESOLVEBATCH_BEFORE))); } /*TraceEvent("ResolveBatchStart", self->dbgid).detail("From", proxyAddress).detail("Version", req.version).detail("PrevVersion", req.prevVersion).detail("StateTransactions", req.txnStateTransactions.size()) @@ -136,9 +132,6 @@ ACTOR Future resolveBatch( if(debugID.present()) { g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.AfterQueueSizeCheck"); - //FIXME - fbTrace(Reference(new CommitDebugTrace(req.debugID.get().first(), now(), - CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTERQUEUESIZECHECK))); } loop { @@ -171,12 +164,8 @@ ACTOR Future resolveBatch( Version firstUnseenVersion = proxyInfo.lastVersion + 1; proxyInfo.lastVersion = req.version; - if(req.debugID.present()) { + if(req.debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer"); - //FIXME - fbTrace(Reference(new CommitDebugTrace(req.debugID.get().first(), now(), - CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTERORDERER))); - } ResolveTransactionBatchReply& reply = proxyInfo.outstandingBatches[req.version]; @@ -287,12 +276,8 @@ ACTOR Future resolveBatch( self->checkNeededVersion.trigger(); } - if(req.debugID.present()) { + if(req.debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.After"); - //FIXME - fbTrace(Reference(new CommitDebugTrace(req.debugID.get().first(), now(), - CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTER))); - } } else { TEST(true); // Duplicate resolve batch request diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index 37c118f548..5e2471cb9a 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -1 +1,1011 @@ +/* + * StorageCache.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/Knobs.h" +#include "fdbserver/ServerDBInfo.h" +#include "fdbclient/StorageServerInterface.h" +#include "fdbclient/VersionedMap.h" +#include "fdbclient/KeyRangeMap.h" +#include "fdbclient/Atomic.h" +#include "fdbclient/Notified.h" +#include "fdbserver/LogSystem.h" +#include "fdbserver/WaitFailure.h" +#include "fdbserver/WorkerInterface.actor.h" +#include "flow/FBTrace.h" +#include "flow/actorcompiler.h" // This must be the last #include. + + +//TODO storageCache server shares quite a bit of storageServer functionality, although simplified +// Need to look into refactoring common code out for better code readability and to avoid duplication + +//TODO rename wrong_shard_server error to wrong_cache_server +inline bool canReplyWith(Error e) { + switch(e.code()) { + case error_code_transaction_too_old: + case error_code_future_version: + case error_code_wrong_shard_server: + case error_code_process_behind: + //case error_code_all_alternatives_failed: + return true; + default: + return false; + }; +} + +const int VERSION_OVERHEAD = 64 + sizeof(Version) + sizeof(Standalone<VerUpdateRef>) + //mutationLog, 64b overhead for map + 2 * (64 + sizeof(Version) + sizeof(Reference<VersionedMap<KeyRef,ValueOrClearToRef>::PTreeT>)); //versioned map [ x2 for createNewVersion(version+1) ], 64b overhead for map +static int mvccStorageBytes( MutationRef const& m ) { return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2; } + +struct StorageCacheData { + typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData; +private: + // in-memory versioned struct (PTree as of now. Subject to change) + VersionedData versionedData; + // in-memory mutationLog that the versionedData contains references to + // TODO change it to a deque, already contains mutations in version order + std::map<Version, Standalone<VerUpdateRef>> mutationLog; // versions (durableVersion, version] + +public: + UID thisServerID; // unique id + uint16_t index; // server index + Reference<AsyncVar<Reference<ILogSystem>>> logSystem; + Key ck; //cacheKey + KeyRangeMap<bool> cachedRangeMap; // map of cached key-ranges + + // The following are in rough order from newest to oldest + // TODO double check which ones we need for storageCache servers + Version lastTLogVersion, lastVersionWithData; + NotifiedVersion version; // current version i.e. the max version that can be read from the cache + NotifiedVersion desiredOldestVersion; // oldestVersion can be increased to this after compaction + NotifiedVersion oldestVersion; // Min version that might be read from the cache + + // TODO not really in use as of now. may need in some failure cases. Revisit and remove if no plausible use + Future<Void> compactionInProgress; + + // TODO do we need otherError here?
 + Promise<Void> otherError; + + int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this cache server + bool behind; + + // TODO double check which ones we need for storageCache servers + struct Counters { + CounterCollection cc; + Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries; + Counter bytesInput, mutationBytes; // Like bytesInput but without MVCC accounting + Counter mutations, setMutations, clearRangeMutations, atomicMutations; + Counter updateBatches, updateVersions; + Counter loops; + Counter readsRejected; + + //LatencyBands readLatencyBands; + + Counters(StorageCacheData* self) + : cc("StorageCacheServer", self->thisServerID.toString()), + getKeyQueries("GetKeyQueries", cc), + getValueQueries("GetValueQueries",cc), + getRangeQueries("GetRangeQueries", cc), + allQueries("QueryQueue", cc), + finishedQueries("FinishedQueries", cc), + rowsQueried("RowsQueried", cc), + bytesQueried("BytesQueried", cc), + watchQueries("WatchQueries", cc), + bytesInput("BytesInput", cc), + mutationBytes("MutationBytes", cc), + mutations("Mutations", cc), + setMutations("SetMutations", cc), + clearRangeMutations("ClearRangeMutations", cc), + atomicMutations("AtomicMutations", cc), + updateBatches("UpdateBatches", cc), + updateVersions("UpdateVersions", cc), + loops("Loops", cc), + readsRejected("ReadsRejected", cc) + { + specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; }); + specialCounter(cc, "Version", [self](){ return self->version.get(); }); + specialCounter(cc, "VersionLag", [self](){ return self->versionLag; }); + } + } counters; + + explicit StorageCacheData(UID thisServerID, uint16_t index) + : thisServerID(thisServerID), index(index), + logSystem(new AsyncVar<Reference<ILogSystem>>()), + lastTLogVersion(0), lastVersionWithData(0), + compactionInProgress(Void()), + versionLag(0), behind(false), counters(this) + { + version.initMetric(LiteralStringRef("StorageCacheData.Version"), counters.cc.id); + desiredOldestVersion.initMetric(LiteralStringRef("StorageCacheData.DesiredOldestVersion"), counters.cc.id); + oldestVersion.initMetric(LiteralStringRef("StorageCacheData.OldestVersion"), counters.cc.id); + } + + void addMutation(KeyRangeRef const& cachedKeyRange, Version version, MutationRef const& mutation); + + bool isReadable( KeyRangeRef const& keys ) { + auto cr = cachedRangeMap.intersectingRanges(keys); + for(auto i = cr.begin(); i != cr.end(); ++i) + if (!i->value()) + return false; + return true; + } + + Arena lastArena; + std::map<Version, Standalone<VerUpdateRef>> const & getMutationLog() { return mutationLog; } + std::map<Version, Standalone<VerUpdateRef>>& getMutableMutationLog() { return mutationLog; } + VersionedData const& data() const { return versionedData; } + VersionedData& mutableData() { return versionedData; } + + Standalone<VerUpdateRef>& addVersionToMutationLog(Version v) { + // return existing version...
 + auto m = mutationLog.find(v); + if (m != mutationLog.end()) + return m->second; + + // ...or create a new one + auto& u = mutationLog[v]; + u.version = v; + if (lastArena.getSize() >= 65536) lastArena = Arena(4096); + u.arena() = lastArena; + counters.bytesInput += VERSION_OVERHEAD; + return u; + } + + MutationRef addMutationToMutationLog(Standalone<VerUpdateRef> &mLV, MutationRef const& m){ + //TODO find out more + //byteSampleApplyMutation(m, mLV.version); + counters.bytesInput += mvccStorageBytes(m); + return mLV.mutations.push_back_deep( mLV.arena(), m ); + } + +}; + +///////////////////////////////////// Queries ///////////////////////////////// +#pragma region Queries +ACTOR Future<Version> waitForVersion( StorageCacheData* data, Version version ) { + // This could become an Actor transparently, but for now it just does the lookup + if (version == latestVersion) + version = std::max(Version(1), data->version.get()); + if (version < data->oldestVersion.get() || version <= 0) throw transaction_too_old(); + else if (version <= data->version.get()) + return version; + + if(data->behind && version > data->version.get()) { + throw process_behind(); + } + + if(deterministicRandom()->random01() < 0.001) + TraceEvent("WaitForVersion1000x"); + choose { + when ( wait( data->version.whenAtLeast(version) ) ) { + //FIXME: A bunch of these can block with or without the following delay 0. + //wait( delay(0) ); // don't do a whole bunch of these at once + if (version < data->oldestVersion.get()) throw transaction_too_old(); + return version; + } + when ( wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) { + if(deterministicRandom()->random01() < 0.001) + TraceEvent(SevWarn, "CacheServerFutureVersion1000x", data->thisServerID) + .detail("Version", version) + .detail("MyVersion", data->version.get()) + .detail("ServerID", data->thisServerID); + throw future_version(); + } + } +} + +ACTOR Future<Version> waitForVersionNoTooOld( StorageCacheData* data, Version version ) { + // This could become an Actor transparently, but for now it just does the lookup + if (version == latestVersion) + version = std::max(Version(1), data->version.get()); + if (version <= data->version.get()) + return version; + choose { + when ( wait( data->version.whenAtLeast(version) ) ) { + return version; + } + when ( wait( delay( SERVER_KNOBS->FUTURE_VERSION_DELAY ) ) ) { + if(deterministicRandom()->random01() < 0.001) + TraceEvent(SevWarn, "CacheServerFutureVersion1000x", data->thisServerID) + .detail("Version", version) + .detail("MyVersion", data->version.get()) + .detail("ServerID", data->thisServerID); + throw future_version(); + } + } +} + +ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) { + state int64_t resultSize = 0; + try { + ++data->counters.getValueQueries; + ++data->counters.allQueries; + //++data->readQueueSizeMetric; + //TODO later + //data->maxQueryQueue = std::max( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue()); + + // Active load balancing runs at a very high priority (to obtain accurate queue lengths) // so we need to downgrade here + + //TODO what's this?
wait( delay(0, TaskPriority::DefaultEndpoint) ); + + if( req.debugID.present() ) { + g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask()); + //FIXME + fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(), GetValueDebugTrace::STORAGECACHE_GETVALUE_DO_READ))); + } + + state Optional<Value> v; + state Version version = wait( waitForVersion( data, req.version ) ); + if( req.debugID.present() ) + g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask()); + + if (!data->cachedRangeMap[req.key]) { + //TraceEvent("WrongCacheServer", data->thisServerID).detail("Key", req.key).detail("Version", version).detail("In", "getValueQ"); + throw wrong_shard_server(); + } + + state int path = 0; + auto i = data->data().at(version).lastLessOrEqual(req.key); + if (i && i->isValue() && i.key() == req.key) { + v = (Value)i->getValue(); + path = 1; + } + + //debugMutation("CacheGetValue", version, MutationRef(MutationRef::DebugKey, req.key, v.present()?v.get():LiteralStringRef(""))); + //debugMutation("CacheGetPath", version, MutationRef(MutationRef::DebugKey, req.key, path==0?LiteralStringRef("0"):path==1?LiteralStringRef("1"):LiteralStringRef("2"))); + + if (v.present()) { + ++data->counters.rowsQueried; + resultSize = v.get().size(); + data->counters.bytesQueried += resultSize; + } + + if( req.debugID.present() ) + g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask()); + + GetValueReply reply(v); + req.reply.send(reply); + } catch (Error& e) { + if(!canReplyWith(e)) + throw; + req.reply.sendError(e); + } + + ++data->counters.finishedQueries; + //--data->readQueueSizeMetric; + //if(data->latencyBandConfig.present()) { + // int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max()); + // data->counters.readLatencyBands.addMeasurement(timer() - req.requestTime(), resultSize > maxReadBytes); + //} + + return Void(); +}; + +//TODO Implement the reverse readRange +GetKeyValuesReply readRange(StorageCacheData* data, Version version, KeyRangeRef range, int limit, int* pLimitBytes) { + GetKeyValuesReply result; + StorageCacheData::VersionedData::ViewAtVersion view = data->data().at(version); + StorageCacheData::VersionedData::iterator vCurrent = view.end(); + KeyRef readBegin; + KeyRef rangeBegin = range.begin; + KeyRef rangeEnd = range.end; + + //We might care about a clear beginning before start that runs into range + vCurrent = view.lastLessOrEqual(rangeBegin); + if (vCurrent && vCurrent->isClearTo() && vCurrent->getEndKey() > rangeBegin) + readBegin = vCurrent->getEndKey(); + else + readBegin = rangeBegin; + + vCurrent = view.lower_bound(readBegin); + ASSERT(!vCurrent || vCurrent.key() >= readBegin); + if (vCurrent) { + auto b = vCurrent; + --b; + ASSERT(!b || b.key() < readBegin); + } + int accumulatedBytes = 0; + while (vCurrent && vCurrent.key() < rangeEnd && limit > 0 && accumulatedBytes < *pLimitBytes) { + if (!vCurrent->isClearTo()) { + result.data.push_back_deep(result.arena, KeyValueRef(vCurrent.key(), vCurrent->getValue())); + accumulatedBytes += sizeof(KeyValueRef) + result.data.end()[-1].expectedSize(); + --limit; + } + ++vCurrent; + } + + *pLimitBytes -= accumulatedBytes; + ASSERT(result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0); +
result.more = limit == 0 || *pLimitBytes <= 0; // FIXME: Does this have to be exact? + result.version = version; + return result; +} + +Key findKey( StorageCacheData* data, KeySelectorRef sel, Version version, KeyRange range, int* pOffset) +// Attempts to find the key indicated by sel in the data at version, within range. +// Precondition: selectorInRange(sel, range) +// If it is found, offset is set to 0 and a key is returned which falls inside range. +// If the search would depend on any key outside range OR if the key selector offset is too large (range read returns too many bytes), it returns either +// a negative offset and a key in [range.begin, sel.getKey()], indicating the key is (the first key <= returned key) + offset, or +// a positive offset and a key in (sel.getKey(), range.end], indicating the key is (the first key >= returned key) + offset-1 +// The range passed in to this function should specify a shard. If range.begin is repeatedly not the beginning of a shard, then it is possible to get stuck looping here +{ + ASSERT( version != latestVersion ); + ASSERT( selectorInRange(sel, range) && version >= data->oldestVersion.get()); + + // Count forward or backward distance items, skipping the first one if it == key and skipEqualKey + bool forward = sel.offset > 0; // If forward, result >= sel.getKey(); else result <= sel.getKey() + int sign = forward ? +1 : -1; + bool skipEqualKey = sel.orEqual == forward; + int distance = forward ? sel.offset : 1-sel.offset; + + //Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from the read range in this case) + int maxBytes; + if (sel.offset <= 1 && sel.offset >= 0) + maxBytes = std::numeric_limits<int>::max(); + else + maxBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_LIMIT_BYTES : SERVER_KNOBS->STORAGE_LIMIT_BYTES; + + GetKeyValuesReply rep = readRange( data, version, + forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())), + (distance + skipEqualKey)*sign, &maxBytes ); + bool more = rep.more && rep.data.size() != distance + skipEqualKey; + + //If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in a loop + if(more && !forward && rep.data.size() == 1) { + TEST(true); //Reverse key selector returned only one result in range read + maxBytes = std::numeric_limits<int>::max(); + GetKeyValuesReply rep2 = readRange( data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes ); + rep = rep2; + more = rep.more && rep.data.size() != distance + skipEqualKey; + ASSERT(rep.data.size() == 2 || !more); + } + + int index = distance-1; + if (skipEqualKey && rep.data.size() && rep.data[0].key == sel.getKey() ) + ++index; + + if (index < rep.data.size()) { + *pOffset = 0; + return rep.data[ index ].key; + } else { + // FIXME: If range.begin=="" && !forward, return success? + *pOffset = index - rep.data.size() + 1; + if (!forward) *pOffset = -*pOffset; + + if (more) { + TEST(true); // Key selector read range had more results + + ASSERT(rep.data.size()); + Key returnKey = forward ? keyAfter(rep.data.back().key) : rep.data.back().key; + + //This is possible if key/value pairs are very large and only one result is returned on a last less than query + //SOMEDAY: graceful handling of exceptionally sized values + ASSERT(returnKey != sel.getKey()); + + return returnKey; + } else + return forward ?
range.end : range.begin; + } +} + +KeyRange getCachedKeyRange( StorageCacheData* data, const KeySelectorRef& sel ) +// Returns largest range that is cached on this server and selectorInRange(sel, range) or wrong_shard_server if no such range exists +{ + auto i = sel.isBackward() ? data->cachedRangeMap.rangeContainingKeyBefore( sel.getKey() ) : + data->cachedRangeMap.rangeContaining( sel.getKey() ); + if (!i->value()) throw wrong_shard_server(); + ASSERT( selectorInRange(sel, i->range()) ); + return i->range(); +} + +ACTOR Future<Void> getKeyValues( StorageCacheData* data, GetKeyValuesRequest req ) +// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large selector offset prevents +// all data from being read in one range read +{ + state int64_t resultSize = 0; + + ++data->counters.getRangeQueries; + ++data->counters.allQueries; + //++data->readQueueSizeMetric; + //data->maxQueryQueue = std::max( data->maxQueryQueue, data->counters.allQueries.getValue() - data->counters.finishedQueries.getValue()); + + // Active load balancing runs at a very high priority (to obtain accurate queue lengths) + // so we need to downgrade here + TaskPriority taskType = TaskPriority::DefaultEndpoint; + if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.isFetchKeys) { + taskType = TaskPriority::FetchKeys; + // } else if (false) { + // // Placeholder for up-prioritizing fetches for important requests + // taskType = TaskPriority::DefaultDelay; + } + wait( delay(0, taskType) ); + + try { + if( req.debugID.present() ) + g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storagecache.getKeyValues.Before"); + state Version version = wait( waitForVersion( data, req.version ) ); + + state KeyRange cachedKeyRange; + try { + cachedKeyRange = getCachedKeyRange(data, req.begin); + + if (req.debugID.present()) + g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), + "storagecache.getKeyValues.AfterVersion"); + //.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end); + } catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; } + + if ( !selectorInRange(req.end, cachedKeyRange) && !(req.end.isFirstGreaterOrEqual() && req.end.getKey() == cachedKeyRange.end) ) { +// TraceEvent("WrongShardServer1", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkShardExtents"); + throw wrong_shard_server(); + } + + state int offset1; + state int offset2; + state Key begin = req.begin.isFirstGreaterOrEqual() ? req.begin.getKey() : findKey( data, req.begin, version, cachedKeyRange, &offset1 ); + state Key end = req.end.isFirstGreaterOrEqual() ?
req.end.getKey() : findKey( data, req.end, version, cachedKeyRange, &offset2 ); + if( req.debugID.present() ) + g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storagecache.getKeyValues.AfterKeys"); + //.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey()); + + // Offsets of zero indicate begin/end keys in this cachedKeyRange, which obviously means we can answer the query + // An end offset of 1 is also OK because the end key is exclusive, so if the first key of the next cachedKeyRange is the end the last actual key returned must be from this cachedKeyRange. + // A begin offset of 1 is also OK because then either begin is past end or equal to end (so the result is definitely empty) + if ((offset1 && offset1!=1) || (offset2 && offset2!=1)) { + TEST(true); // wrong_cache_server due to offset + // We could detect when offset1 takes us off the beginning of the database or offset2 takes us off the end, and return a clipped range rather + // than an error (since that is what the NativeAPI.getRange will do anyway via its "slow path"), but we would have to add some flags to the response + // to encode whether we went off the beginning and the end, since it needs that information. + //TraceEvent("WrongShardServer2", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end).detail("In", "getKeyValues>checkOffsets").detail("BeginKey", begin).detail("EndKey", end).detail("BeginOffset", offset1).detail("EndOffset", offset2); + throw wrong_shard_server(); + } + + if (begin >= end) { + if( req.debugID.present() ) + g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storagecache.getKeyValues.Send"); + //.detail("Begin",begin).detail("End",end); + + GetKeyValuesReply none; + none.version = version; + none.more = false; + req.reply.send( none ); + } else { + state int remainingLimitBytes = req.limitBytes; + + GetKeyValuesReply _r = readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes); + GetKeyValuesReply r = _r; + + if( req.debugID.present() ) + g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storagecache.getKeyValues.AfterReadRange"); + //.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size()); + if (EXPENSIVE_VALIDATION) { + for (int i = 0; i < r.data.size(); i++) + ASSERT(r.data[i].key >= begin && r.data[i].key < end); + ASSERT(r.data.size() <= std::abs(req.limit)); + } + + req.reply.send( r ); + + resultSize = req.limitBytes - remainingLimitBytes; + data->counters.bytesQueried += resultSize; + data->counters.rowsQueried += r.data.size(); + } + } catch (Error& e) { + if(!canReplyWith(e)) + throw; + req.reply.sendError(e); + } + + ++data->counters.finishedQueries; + + return Void(); +} + +ACTOR Future<Void> getKey( StorageCacheData* data, GetKeyRequest req ) { + state int64_t resultSize = 0; + + ++data->counters.getKeyQueries; + ++data->counters.allQueries; + + // Active load balancing runs at a very high priority (to obtain accurate queue lengths) + // so we need to downgrade here + wait( delay(0, TaskPriority::DefaultEndpoint) ); + + try { + state Version version = wait( waitForVersion( data, req.version ) ); + state KeyRange cachedKeyRange = getCachedKeyRange( data, req.sel ); + + state int offset; + Key k = findKey( data, req.sel, version, cachedKeyRange, &offset ); + + KeySelector updated; + if (offset < 0) +
updated = firstGreaterOrEqual(k)+offset; // first thing on this shard OR (large offset case) smallest key retrieved in range read + else if (offset > 0) + updated = firstGreaterOrEqual(k)+offset-1; // first thing on next shard OR (large offset case) keyAfter largest key retrieved in range read + else + updated = KeySelectorRef(k,true,0); //found + + resultSize = k.size(); + data->counters.bytesQueried += resultSize; + ++data->counters.rowsQueried; + + GetKeyReply reply(updated); + req.reply.send(reply); + } + catch (Error& e) { + if (e.code() == error_code_wrong_shard_server) TraceEvent("WrongShardServer").detail("In","getKey"); + if(!canReplyWith(e)) + throw; + req.reply.sendError(e); + } + + ++data->counters.finishedQueries; + + return Void(); +} + +#pragma endregion + +bool expandMutation( MutationRef& m, StorageCacheData::VersionedData const& data, KeyRef eagerTrustedEnd, Arena& ar ) { + // After this function call, m should be copied into an arena immediately (before modifying data, shards, or eager) + if (m.type == MutationRef::ClearRange) { + // Expand the clear + const auto& d = data.atLatest(); + + // If another clear overlaps the beginning of this one, engulf it + auto i = d.lastLess(m.param1); + if (i && i->isClearTo() && i->getEndKey() >= m.param1) + m.param1 = i.key(); + + // If another clear overlaps the end of this one, engulf it; otherwise expand + i = d.lastLessOrEqual(m.param2); + if (i && i->isClearTo() && i->getEndKey() >= m.param2) { + m.param2 = i->getEndKey(); + } else { + // Expand to the next set or clear (from storage or latestVersion), and if it + // is a clear, engulf it as well + i = d.lower_bound(m.param2); + //KeyRef endKeyAtStorageVersion = m.param2 == eagerTrustedEnd ? eagerTrustedEnd : std::min( eager->getKeyEnd( m.param2 ), eagerTrustedEnd ); + // TODO check if the following is correct + KeyRef endKeyAtStorageVersion = eagerTrustedEnd; + if (!i || endKeyAtStorageVersion < i.key()) + m.param2 = endKeyAtStorageVersion; + else if (i->isClearTo()) + m.param2 = i->getEndKey(); + else + m.param2 = i.key(); + } + } + else if (m.type != MutationRef::SetValue && isAtomicOp((MutationRef::Type) m.type)) { + + Optional<StringRef> oldVal; + auto it = data.atLatest().lastLessOrEqual(m.param1); + if (it != data.atLatest().end() && it->isValue() && it.key() == m.param1) + oldVal = it->getValue(); + else if (it != data.atLatest().end() && it->isClearTo() && it->getEndKey() > m.param1) { + TEST(true); // Atomic op right after a clear.
 + } + + switch(m.type) { + case MutationRef::AddValue: + m.param2 = doLittleEndianAdd(oldVal, m.param2, ar); + break; + case MutationRef::And: + m.param2 = doAnd(oldVal, m.param2, ar); + break; + case MutationRef::Or: + m.param2 = doOr(oldVal, m.param2, ar); + break; + case MutationRef::Xor: + m.param2 = doXor(oldVal, m.param2, ar); + break; + case MutationRef::AppendIfFits: + m.param2 = doAppendIfFits(oldVal, m.param2, ar); + break; + case MutationRef::Max: + m.param2 = doMax(oldVal, m.param2, ar); + break; + case MutationRef::Min: + m.param2 = doMin(oldVal, m.param2, ar); + break; + case MutationRef::ByteMin: + m.param2 = doByteMin(oldVal, m.param2, ar); + break; + case MutationRef::ByteMax: + m.param2 = doByteMax(oldVal, m.param2, ar); + break; + case MutationRef::MinV2: + m.param2 = doMinV2(oldVal, m.param2, ar); + break; + case MutationRef::AndV2: + m.param2 = doAndV2(oldVal, m.param2, ar); + break; + case MutationRef::CompareAndClear: + if (oldVal.present() && m.param2 == oldVal.get()) { + m.type = MutationRef::ClearRange; + m.param2 = keyAfter(m.param1, ar); + return expandMutation(m, data, eagerTrustedEnd, ar); + } + return false; + } + m.type = MutationRef::SetValue; + } + + return true; +} + +// Applies a write mutation (SetValue or ClearRange) to the in-memory versioned data structure +void applyMutation( StorageCacheData *self, MutationRef const& m, Arena& arena, StorageCacheData::VersionedData &data ) { + // m is expected to be in arena already + // Clear split keys are added to arena + + if (m.type == MutationRef::SetValue) { + auto prev = data.atLatest().lastLessOrEqual(m.param1); + if (prev && prev->isClearTo() && prev->getEndKey() > m.param1) { + ASSERT( prev.key() <= m.param1 ); + KeyRef end = prev->getEndKey(); + // TODO double check if the insert version of the previous clear needs to be preserved for the "left half", + // insert() invalidates prev, so prev.key() is not safe to pass to it by reference + data.insert( KeyRef(prev.key()), ValueOrClearToRef::clearTo( m.param1 ), prev.insertVersion() ); // overwritten by below insert if empty + KeyRef nextKey = keyAfter(m.param1, arena); + if ( end != nextKey ) { + ASSERT( end > nextKey ); + // TODO double check if it's okay to let go of the the insert version of the "right half" + // FIXME: This copy is technically an asymptotic problem, definitely a waste of memory (copy of keyAfter is a waste, but not asymptotic) + data.insert( nextKey, ValueOrClearToRef::clearTo( KeyRef(arena, end) ) ); + } + } + data.insert( m.param1, ValueOrClearToRef::value(m.param2) ); + } else if (m.type == MutationRef::ClearRange) { + data.erase( m.param1, m.param2 ); + ASSERT( m.param2 > m.param1 ); + ASSERT( !data.isClearContaining( data.atLatest(), m.param1 ) ); + data.insert( m.param1, ValueOrClearToRef::clearTo(m.param2) ); + } +} + +template <class T> +void splitMutation(StorageCacheData* data, KeyRangeMap<T>& map, MutationRef const& m, Version ver) { + if(isSingleKeyMutation((MutationRef::Type) m.type)) { + auto i = map.rangeContaining(m.param1); + if (i->value()) // If this key lies in the cached key-range on this server + data->addMutation( i->range(), ver, m ); + } + else if (m.type == MutationRef::ClearRange) { + KeyRangeRef mKeys( m.param1, m.param2 ); + auto r = map.intersectingRanges( mKeys ); + for(auto i = r.begin(); i != r.end(); ++i) { + if (i->value()) { // if this sub-range exists on this cache server + KeyRangeRef k = mKeys & i->range(); + data->addMutation( i->range(), ver, MutationRef((MutationRef::Type)m.type, k.begin, k.end) ); + } + }
 + } else + ASSERT(false); // Unknown mutation type in splitMutations +} + +void StorageCacheData::addMutation(KeyRangeRef const& cachedKeyRange, Version version, MutationRef const& mutation) { + MutationRef expanded = mutation; + auto& mLog = addVersionToMutationLog(version); + + if ( !expandMutation( expanded, data(), cachedKeyRange.end, mLog.arena()) ) { + return; + } + expanded = addMutationToMutationLog(mLog, expanded); + if (debugMutation("expandedMutation", version, expanded)) { + const char* type = + mutation.type == MutationRef::SetValue ? "SetValue" : + mutation.type == MutationRef::ClearRange ? "ClearRange" : + mutation.type == MutationRef::DebugKeyRange ? "DebugKeyRange" : + mutation.type == MutationRef::DebugKey ? "DebugKey" : + "UnknownMutation"; + printf("DEBUGMUTATION:\t%.6f\t%s\t%s\t%s\t%s\t%s\n", + now(), g_network->getLocalAddress().toString().c_str(), "originalMutation", + type, printable(mutation.param1).c_str(), printable(mutation.param2).c_str()); + printf(" Cached Key-range: %s - %s\n", printable(cachedKeyRange.begin).c_str(), printable(cachedKeyRange.end).c_str()); + } + applyMutation( this, expanded, mLog.arena(), mutableData() ); + printf("\nSCUpdate: Printing versioned tree after applying mutation\n"); + mutableData().printTree(version); + +} + +// Helper class for updating the storage cache (i.e. applying mutations) +class StorageCacheUpdater { +public: + StorageCacheUpdater() : currentVersion(invalidVersion), processedCacheStartKey(false) {} + StorageCacheUpdater(Version currentVersion) : currentVersion(currentVersion), processedCacheStartKey(false) {} + + void applyMutation(StorageCacheData* data, MutationRef const& m, Version ver) { + //TraceEvent("SCNewVersion", data->thisServerID).detail("VerWas", data->mutableData().latestVersion).detail("ChVer", ver); + + if(currentVersion != ver) { + currentVersion = ver; + data->mutableData().createNewVersion(ver); + } + + if (m.param1.startsWith( systemKeys.end )) { + //TraceEvent("PrivateData", data->thisServerID).detail("Mutation", m.toString()).detail("Version", ver); + applyPrivateCacheData( data, m ); + } else { + // FIXME: enable when debugMutation is active + //for(auto m = changes[c].mutations.begin(); m; ++m) { + // debugMutation("SCUpdateMutation", changes[c].version, *m); + //} + + splitMutation(data, data->cachedRangeMap, m, ver); + } + + //TODO + if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get(); + } + + Version currentVersion; +private: + KeyRef cacheStartKey; + bool nowAssigned; + bool processedCacheStartKey; + + // Applies private mutations. As the name suggests, it basically establishes the key-ranges + // that this cache server is responsible for + // TODO Revisit during failure handling. Might we lose some private mutations? + void applyPrivateCacheData( StorageCacheData* data, MutationRef const& m ) { + TraceEvent(SevDebug, "SCPrivateCacheMutation", data->thisServerID).detail("Mutation", m.toString()); + + if (processedCacheStartKey) { + // we expect changes in pairs, [begin,end).
+			ASSERT (m.type == MutationRef::SetValue && m.param1.startsWith(data->ck));
+			KeyRangeRef keys( cacheStartKey.removePrefix(data->ck), m.param1.removePrefix(data->ck));
+			data->cachedRangeMap.insert(keys, true);
+			fprintf(stderr, "SCPrivateCacheMutation: begin: %s, end: %s\n", printable(keys.begin).c_str(), printable(keys.end).c_str());
+
+			processedCacheStartKey = false;
+		} else if (m.type == MutationRef::SetValue && m.param1.startsWith( data->ck )) {
+			// We expect changes in pairs, [begin,end); this mutation is for the start key of the range.
+			cacheStartKey = m.param1;
+			processedCacheStartKey = true;
+		} else {
+			fprintf(stderr, "SCPrivateCacheMutation: Unknown private mutation\n");
+			ASSERT(false); // Unknown private mutation
+		}
+	}
+};
+
+// Compacts the in-memory VersionedMap, i.e. removes versions below the desiredOldestVersion
+// TODO revisit if we change the data structure
+ACTOR Future<Void> compactCache(StorageCacheData* data) {
+	loop {
+		//TODO understand this; should we add delay here?
+		//if (g_network->isSimulated()) {
+		//	double endTime = g_simulator.checkDisabled(format("%s/compactCache", data->thisServerID.toString().c_str()));
+		//	if(endTime > now()) {
+		//		wait(delay(endTime - now(), TaskPriority::CompactCache));
+		//	}
+		//}
+
+		// Wait until the desiredOldestVersion is greater than the current oldestVersion
+		wait( data->desiredOldestVersion.whenAtLeast( data->oldestVersion.get()+1 ) );
+		wait( delay(0, TaskPriority::CompactCache) );
+
+		//TODO not really in use as of now; may be needed in some failure cases. Revisit and remove if no plausible use.
+		state Promise<Void> compactionInProgress;
+		data->compactionInProgress = compactionInProgress.getFuture();
+		state Version desiredVersion = data->desiredOldestVersion.get();
+		// Call the compaction routine that does the actual work.
+		// TODO It is a synchronous function call as of now; should it be async?
+		data->mutableData().compact(desiredVersion);
+		Future<Void> finishedForgetting = data->mutableData().forgetVersionsBeforeAsync( desiredVersion,
+		                                                                                 TaskPriority::CompactCache );
+		data->oldestVersion.set( desiredVersion );
+		wait( finishedForgetting );
+		// TODO how do we yield here? This may not be enough, because compact() does the heavy lifting
+		// of compacting the VersionedMap. We should probably look into per-version compaction; then
+		// we could yield after compacting each version.
+		wait( yield(TaskPriority::CompactCache) );
+
+		// TODO what flowlock to acquire during compaction?
+		compactionInProgress.send(Void());
+		wait( delay(0, TaskPriority::CompactCache) ); // Setting compactionInProgress could cause the cache server to shut down, so delay to check for cancellation
+	}
+}
+
+ACTOR Future<Void> pullAsyncData( StorageCacheData *data ) {
+	state Future<Void> dbInfoChange = Void();
+	state Reference<ILogSystem::IPeekCursor> r;
+	state Version tagAt = 0;
+
+	state StorageCacheUpdater updater(data->lastVersionWithData);
+	state Version ver = invalidVersion;
+	//data->lastTLogVersion = r->getMaxKnownVersion();
+	//data->versionLag = std::max<int64_t>(0, data->lastTLogVersion - data->version.get());
+	++data->counters.updateBatches;
+
+	loop {
+		loop {
+			choose {
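+				// Editorial note: while no log system is known yet, r is null and the
+				// first branch below waits on Never(), so only dbInfoChange can fire;
+				// once a peek cursor exists, getMore() drives this inner loop until
+				// another batch of messages is available.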
+				when(wait( r ? r->getMore(TaskPriority::TLogCommit) : Never() ) ) {
+					break;
+				}
+				when( wait( dbInfoChange ) ) {
+					if( data->logSystem->get() )
+						r = data->logSystem->get()->peek( data->thisServerID, tagAt, Optional<Version>(), cacheTag, true );
+					else
+						r = Reference<ILogSystem::IPeekCursor>();
+					dbInfoChange = data->logSystem->onChange();
+				}
+			}
+		}
+		//FIXME: if the popped version is greater than our last version, we need to clear the cache
+
+		//FIXME: ensure this can only read data from the current version
+		r->setProtocolVersion(currentProtocolVersion);
+
+		// Now process the mutations
+		for (; r->hasMessage(); r->nextMessage()) {
+			ArenaReader& reader = *r->reader();
+
+			MutationRef msg;
+			reader >> msg;
+			fprintf(stderr, "%lld : %s\n", r->version().version, msg.toString().c_str());
+
+			if (r->version().version > ver && r->version().version > data->version.get()) {
+				++data->counters.updateVersions;
+				ver = r->version().version;
+			}
+			if (ver != invalidVersion)
+			{
+				updater.applyMutation(data, msg, ver);
+				// TODO
+				//mutationBytes += msg.totalSize();
+				data->counters.mutationBytes += msg.totalSize();
+				++data->counters.mutations;
+				switch (msg.type) {
+				case MutationRef::SetValue:
+					++data->counters.setMutations;
+					break;
+				case MutationRef::ClearRange:
+					++data->counters.clearRangeMutations;
+					break;
+				case MutationRef::AddValue:
+				case MutationRef::And:
+				case MutationRef::AndV2:
+				case MutationRef::AppendIfFits:
+				case MutationRef::ByteMax:
+				case MutationRef::ByteMin:
+				case MutationRef::Max:
+				case MutationRef::Min:
+				case MutationRef::MinV2:
+				case MutationRef::Or:
+				case MutationRef::Xor:
+				case MutationRef::CompareAndClear:
+					++data->counters.atomicMutations;
+					break;
+				}
+			}
+			else
+				// This change belongs to a version < minVersion and is discarded
+				TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", r->version().toString());
+
+			tagAt = r->version().version + 1;
+		}
+
+		if (ver != invalidVersion) {
+			data->lastVersionWithData = ver;
+		} else {
+			// TODO double check
+			ver = r->version().version - 1;
+		}
+
+		if (ver != invalidVersion && ver > data->version.get()) {
+			debugKeyRange("SCUpdate", ver, allKeys);
+
+			data->mutableData().createNewVersion(ver);
+
+			// TODO what about otherError
+			if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
+
+			// TODO may enable these later
+			//data->noRecentUpdates.set(false);
+			//data->lastUpdate = now();
+			data->version.set( ver ); // Triggers replies to waiting gets for new version(s)
+			// TODO double check
+			//setDataVersion(data->thisServerID, data->version.get());
+
+			// TODO what about otherError
+			if (data->otherError.getFuture().isReady()) data->otherError.getFuture().get();
+
+			// We can get rid of versions beyond maxVersionsInMemory at any point. Update the
+			// desiredOldestVersion; that may in turn invoke the compaction actor.
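+			// Worked example (editorial note, not part of the patch): assuming default
+			// knobs, MAX_READ_TRANSACTION_LIFE_VERSIONS is 5,000,000 (about 5 seconds
+			// of versions). With data->version at 12,000,000 and oldestVersion at
+			// 6,500,000, the proposal below is 12,000,000 - 5,000,000 = 7,000,000;
+			// the std::max() keeps the oldest version from moving backwards, and
+			// setting desiredOldestVersion to 7,000,000 wakes compactCache().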
+			Version maxVersionsInMemory = SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS;
+			Version proposedOldestVersion = data->version.get() - maxVersionsInMemory;
+			proposedOldestVersion = std::max(proposedOldestVersion, data->oldestVersion.get());
+			data->desiredOldestVersion.set(proposedOldestVersion);
+		}
+
+		// TODO implement a validate function for the cache
+		//validate(data);
+
+		if (r->version().version >= data->lastTLogVersion) {
+			if (data->behind) {
+				TraceEvent("StorageCacheNoLongerBehind", data->thisServerID).detail("CursorVersion", r->version().version).detail("TLogVersion", data->lastTLogVersion);
+			}
+			data->behind = false;
+		}
+
+		tagAt = std::max( tagAt, r->version().version);
+	}
+}
+
+ACTOR Future<Void> storageCache(StorageServerInterface ssi, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db) {
+	state StorageCacheData self(ssi.id(), id);
+	state ActorCollection actors(false);
+	state Future<Void> dbInfoChange = Void();
+
+	// This helps identify the private mutations meant for this cache server
+	self.ck = cacheKeysPrefixFor( id ).withPrefix(systemKeys.begin); // FFFF/02cacheKeys/[this server]/
+
+	actors.add(waitFailureServer(ssi.waitFailure.getFuture()));
+
+	// The compactCache actor periodically compacts the cache when a certain version condition is met
+	actors.add(compactCache(&self));
+
+	// The pullAsyncData actor pulls mutations from the TLog and also applies them
+	actors.add(pullAsyncData(&self));
+
+	loop {
+		++self.counters.loops;
+		choose {
+			when( wait( dbInfoChange ) ) {
+				dbInfoChange = db->onChange();
+				self.logSystem->set(ILogSystem::fromServerDBInfo( ssi.id(), db->get(), true ));
+			}
+			when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
+				// TODO do we need to add throttling for cache servers? Probably not.
+				//actors.add(self->readGuard(req, getValueQ));
+				actors.add(getValueQ(&self, req));
+			}
+			when( WatchValueRequest req = waitNext(ssi.watchValue.getFuture()) ) {
+				ASSERT(false);
+			}
+			when (GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
+				actors.add(getKey(&self, req));
+			}
+			when (GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture()) ) {
+				actors.add(getKeyValues(&self, req));
+			}
+			when (GetShardStateRequest req = waitNext(ssi.getShardState.getFuture()) ) {
+				ASSERT(false);
+			}
+			when (StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
+				ASSERT(false);
+			}
+			//when( ReplyPromise<Version> reply = waitNext(ssi.getVersion.getFuture()) ) {
+			//	ASSERT(false);
+			//}
+			when( ReplyPromise<KeyValueStoreType> reply = waitNext(ssi.getKeyValueStoreType.getFuture()) ) {
+				ASSERT(false);
+			}
+			when(wait(actors.getResult())) {}
+		}
+	}
+}
diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp
index e1a26a3042..7ce0bb5d79 100644
--- a/fdbserver/TLogServer.actor.cpp
+++ b/fdbserver/TLogServer.actor.cpp
@@ -21,7 +21,6 @@
 #include "flow/Hash3.h"
 #include "flow/Stats.h"
 #include "flow/UnitTest.h"
-#include "flow/FBTrace.h"
 #include "fdbclient/NativeAPI.actor.h"
 #include "fdbclient/Notified.h"
 #include "fdbclient/KeyRangeMap.h"
@@ -1767,9 +1766,6 @@ ACTOR Future<Void> tLogCommit(
 		tlogDebugID = nondeterministicRandom()->randomUniqueID();
 		g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), tlogDebugID.get().first());
 		g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.BeforeWaitForVersion");
-		//FIXME
-		fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
-		                                    CommitDebugTrace::TLOG_TLOGCOMMIT_BEFOREWAITFORVERSION)));
 	}
 
 	logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, req.minKnownCommittedVersion);
@@ -1799,12 +1795,8 @@
 	}
 
 	if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here and self->version.set() below!)
-		if(req.debugID.present()) {
+		if(req.debugID.present())
 			g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
-			//FIXME
-			fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
-			                                    CommitDebugTrace::TLOG_TLOGCOMMIT_BEFORE)));
-		}
 
 		//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
 		commitMessages(self, logData, req.version, req.arena, req.messages);
@@ -1827,12 +1819,8 @@
 		// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
 		logData->version.set( req.version );
 
-		if(req.debugID.present()) {
+		if(req.debugID.present())
 			g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
-			//FIXME
-			fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
-			                                    CommitDebugTrace::TLOG_TLOGCOMMIT_AFTERTLOGCOMMIT)));
-		}
 	}
 	// Send replies only once all prior messages have been received and committed.
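 	// Editorial note: e.g. if commits for versions 7 and 9 are in flight
 	// concurrently, the reply for version 9 is held back until everything up to
 	// and including version 9 has been received and committed, so
 	// acknowledgements are issued in version order.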
 	state Future<Void> stopped = logData->stopCommit.onTrigger();
@@ -1844,12 +1832,8 @@
 		return Void();
 	}
 
-	if(req.debugID.present()) {
+	if(req.debugID.present())
 		g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
-		//FIXME
-		fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
-		                                    CommitDebugTrace::TLOG_TLOGCOMMIT_AFTER)));
-	}
 
 	req.reply.send( logData->durableKnownCommittedVersion );
 
 	return Void();
@@ -2115,9 +2099,6 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
 			UID tlogDebugID = nondeterministicRandom()->randomUniqueID();
 			g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first());
 			g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest");
-			//FIXME
-			fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
-			                                         TransactionDebugTrace::TLOGSERVER_TLOGCONFIRMRUNNINGREQUEST)));
 		}
 		if (!logData->stopped)
 			req.reply.send(Void());
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 59d337c449..e8a9bbca76 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -51,7 +51,6 @@
 #include "fdbrpc/sim_validation.h"
 #include "fdbrpc/Smoother.h"
 #include "flow/Stats.h"
-#include "flow/FBTrace.h"
 #include "flow/TDMetric.actor.h"
 #include
 #include "flow/actorcompiler.h" // This must be the last #include.
@@ -839,21 +838,13 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
 	// so we need to downgrade here
 	wait( delay(0, TaskPriority::DefaultEndpoint) );
 
-	if( req.debugID.present() ) {
+	if( req.debugID.present() )
 		g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
-		//FIXME
-		fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
-		                                      GetValueDebugTrace::STORAGESERVER_GETVALUE_DOREAD)));
-	}
 
 	state Optional<Value> v;
 	state Version version = wait( waitForVersion( data, req.version ) );
-	if( req.debugID.present() ) {
+	if( req.debugID.present() )
 		g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
-		//FIXME
-		fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
-		                                      GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTERVERSION)));
-	}
 
 	state uint64_t changeCounter = data->shardChangeCounter;
@@ -906,12 +897,8 @@
 		data->metrics.notifyBytesReadPerKSecond(req.key, bytesReadPerKSecond);
 	}
 
-	if( req.debugID.present() ) {
+	if( req.debugID.present() )
 		g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
-		//FIXME
-		fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
-		                                      GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTERREAD)));
-	}
 
 	GetValueReply reply(v);
 	reply.penalty = data->getPenalty();
@@ -936,20 +923,12 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
 	try {
 		++data->counters.watchQueries;
 
-		if( req.debugID.present() ) {
+		if( req.debugID.present() )
 			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());
-			//FIXME
-			fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
-			                                        WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_BEFORE)));
-		}
 		wait(success(waitForVersionNoTooOld(data, req.version)));
-		if( req.debugID.present() ) {
+		if( req.debugID.present() )
 			g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
-			//FIXME
-			fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
-			                                        WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_AFTERVERSION)));
-		}
 
 		loop {
 			try {
@@ -966,12 +945,8 @@
 				debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key,
 				                                                     reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
 
-				if( req.debugID.present() ) {
+				if( req.debugID.present() )
 					g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
-					//FIXME
-					fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
-					                                        WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_AFTERREAD)));
-				}
 
 				if( reply.value != req.value ) {
 					req.reply.send(WatchValueReply{ latest });
@@ -1376,24 +1351,16 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
 	wait( delay(0, taskType) );
 
 	try {
-		if( req.debugID.present() ) {
+		if( req.debugID.present() )
 			g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
-			//FIXME
-			fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
-			                                         TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_BEFORE)));
-		}
 
 		state Version version = wait( waitForVersion( data, req.version ) );
 
 		state uint64_t changeCounter = data->shardChangeCounter;
//		try {
 		state KeyRange shard = getShardKeyRange( data, req.begin );
-		if( req.debugID.present() ) {
+		if( req.debugID.present() )
 			g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
-			//FIXME
-			fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
-			                                         TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERVERSION)));
-		}
 		//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
//		} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
@@ -1408,12 +1375,8 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
 		state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
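 		// Editorial note: a firstGreaterOrEqual selector resolves directly to its
 		// own key, while any other selector must be resolved against this shard by
 		// findKey(); per the comment below, a nonzero offset remaining after
 		// resolution means the requested key lies outside this shard.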
 		state Key begin = wait(fBegin);
 		state Key end = wait(fEnd);
-		if( req.debugID.present() ) {
+		if( req.debugID.present() )
 			g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
-			//FIXME
-			fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
-			                                         TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERKEYS)));
-		}
 		//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
 
 		// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
@@ -1429,12 +1392,8 @@
 		}
 
 		if (begin >= end) {
-			if( req.debugID.present() ) {
+			if( req.debugID.present() )
 				g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Send");
-				//FIXME
-				fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
-				                                         TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_SEND)));
-			}
 			//.detail("Begin",begin).detail("End",end);
 
 			GetKeyValuesReply none;
@@ -1450,12 +1409,8 @@
 			GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
 			GetKeyValuesReply r = _r;
 
-			if( req.debugID.present() ) {
+			if( req.debugID.present() )
 				g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
-				//FIXME
-				fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
-				                                         TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERREADRANGE)));
-			}
 			//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
 			data->checkChangeCounter( changeCounter, KeyRangeRef( std::min(begin, std::min(req.begin.getKey(), req.end.getKey())),
 			                                                      std::max(end, std::max(req.begin.getKey(), req.end.getKey())) ) );
 			if (EXPENSIVE_VALIDATION) {
@@ -3647,12 +3602,8 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
 			}
 			when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
 				// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
-				if( req.debugID.present() ) {
+				if( req.debugID.present() )
 					g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
-					//FIXME
-					fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
-					                                      GetValueDebugTrace::STORAGESERVER_GETVALUE_RECEIVED)));
-				}
 				if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
 					req.reply.send(GetValueReply());
diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp
index ce9623a079..cd7cc918c3 100644
--- a/fdbserver/workloads/ReadWrite.actor.cpp
+++ b/fdbserver/workloads/ReadWrite.actor.cpp
@@ -31,7 +31,6 @@
 #include "fdbserver/ClusterRecruitmentInterface.h"
 #include "fdbclient/ReadYourWrites.h"
 #include "flow/TDMetric.actor.h"
-#include "flow/FBTrace.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
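 // Editorial sketch (not part of the patch): the debug-tracing pattern this
 // workload uses in randomReadWriteClient below. A randomly sampled transaction
 // is tagged with a debugID so every request it issues is traced end to end;
 // "MyWorkload.op" is a hypothetical event name standing in for the real ones.
 //
 //	UID debugID = deterministicRandom()->randomUniqueID();
 //	tr.debugTransaction(debugID); // tags all requests issued by this transaction
 //	g_traceBatch.addEvent("TransactionDebug", debugID.first(), "MyWorkload.op.Before");
 //	/* ... perform reads and writes ... */
 //	g_traceBatch.addEvent("TransactionDebug", debugID.first(), "MyWorkload.op.After");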
 const int sampleSize = 10000;
@@ -619,9 +618,6 @@ struct ReadWriteWorkload : KVWorkload {
 				debugID = deterministicRandom()->randomUniqueID();
 				tr.debugTransaction(debugID);
 				g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before");
-				//FIXME
-				fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.first(), now(),
-				                                         TransactionDebugTrace::READWRITE_RANDOMREADWRITECLIENT_BEFORE)));
 			} else {
 				debugID = UID();
@@ -694,12 +690,8 @@ struct ReadWriteWorkload : KVWorkload {
 				}
 			}
 
-			if(debugID != UID()) {
+			if(debugID != UID())
 				g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After");
-				//FIXME
-				fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.first(), now(),
-				                                         TransactionDebugTrace::READWRITE_RANDOMREADWRITECLIENT_AFTER)));
-			}
 
 			tr = Trans();
diff --git a/flow/FBTrace.h b/flow/FBTrace.h
index 13404e2d47..3c76a108e0 100644
--- a/flow/FBTrace.h
+++ b/flow/FBTrace.h
@@ -44,20 +44,17 @@ public:
 	constexpr static FileIdentifier file_identifier = 617894;
 	enum codeLocation {
 		STORAGESERVER_GETVALUE_RECEIVED = 0,
-		STORAGESERVER_GETVALUE_DOREAD = 1,
-		STORAGESERVER_GETVALUE_AFTERVERSION = 2,
-		STORAGESERVER_GETVALUE_AFTERREAD = 3,
+		STORAGESERVER_GETVALUE_DO_READ = 1,
+		STORAGESERVER_GETVALUE_AFTER_VERSION = 2,
+		STORAGESERVER_GETVALUE_AFTER_READ = 3,
 		STORAGECACHE_GETVALUE_RECEIVED = 4,
-		STORAGECACHE_GETVALUE_DOREAD = 5,
-		STORAGECACHE_GETVALUE_AFTERVERSION = 6,
-		STORAGECACHE_GETVALUE_AFTERREAD = 7,
+		STORAGECACHE_GETVALUE_DO_READ = 5,
+		STORAGECACHE_GETVALUE_AFTER_VERSION = 6,
+		STORAGECACHE_GETVALUE_AFTER_READ = 7,
 		READER_GETVALUE_BEFORE = 8,
 		READER_GETVALUE_AFTER = 9,
 		READER_GETVALUEPREFIX_BEFORE = 10,
-		READER_GETVALUEPREFIX_AFTER = 11,
-		NATIVEAPI_GETVALUE_BEFORE = 12,
-		NATIVEAPI_GETVALUE_AFTER = 13,
-		NATIVEAPI_GETVALUE_ERROR = 14
+		READER_GETVALUEPREFIX_AFTER = 11
 	};
 
 	uint64_t id;
@@ -77,10 +74,10 @@ public:
 	constexpr static FileIdentifier file_identifier = 14486715;
 	enum codeLocation {
 		STORAGESERVER_WATCHVALUE_BEFORE = 1,
-		STORAGESERVER_WATCHVALUE_AFTERVERSION = 2,
-		STORAGESERVER_WATCHVALUE_AFTERREAD = 3,
+		STORAGESERVER_WATCHVALUE_AFTER_VERSION = 2,
+		STORAGESERVER_WATCHVALUE_AFTER_READ = 3,
 		NATIVEAPI_WATCHVALUE_BEFORE = 4,
-		NATIVEAPI_WATCHVALUE_AFTER = 5
+		NATIVEAPI_WATCHVALUE_AFTER_READ = 5
 	};
 
 	uint64_t id;
@@ -102,9 +99,9 @@ public:
 		STORAGESERVER_COMMIT_BEORE = 0,
 		STORAGESERVER_COMMIT_AFTER_VERSION = 1,
 		STORAGESERVER_COMMIT_AFTER_READ = 2,
-		NATIVEAPI_COMMIT_BEORE = 3,
+		NATIVEAPI_COMMIT_BEFORE = 3,
 		NATIVEAPI_COMMIT_AFTER = 4,
-		MASTERROXYSERVER_BATCHER = 5,
+		MASTERPROXYSERVER_BATCHER = 5,
 		MASTERPROXYSERVER_COMMITBATCH_BEFORE = 6,
 		MASTERPROXYSERVER_COMMITBATCH_GETTINGCOMMITVERSION = 7,
 		MASTERPROXYSERVER_COMMITBATCH_GOTCOMMITVERSION = 8,