diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index cb4eb93eba..bace284cb6 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -1777,6 +1777,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedrecruitmentID; req.storeType = configuration.tLogDataStoreType; + req.spillType = configuration.tLogSpillType; req.recoverFrom = oldLogSystem->getLogSystemConfig(); req.recoverAt = oldLogSystem->recoverAt.get(); req.knownCommittedVersion = oldLogSystem->knownCommittedVersion; diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 711e147cb2..b507dab110 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -81,6 +81,7 @@ struct InitializeTLogRequest { std::vector recoverTags; std::vector allTags; KeyValueStoreType storeType; + TLogSpillType spillType; Tag remoteTag; int8_t locality; bool isPrimary; @@ -93,7 +94,7 @@ struct InitializeTLogRequest { template void serialize( Ar& ar ) { - serializer(ar, recruitmentID, recoverFrom, recoverAt, knownCommittedVersion, epoch, recoverTags, allTags, storeType, remoteTag, locality, isPrimary, startVersion, logRouterTags, reply); + serializer(ar, recruitmentID, recoverFrom, recoverAt, knownCommittedVersion, epoch, recoverTags, allTags, storeType, remoteTag, locality, isPrimary, startVersion, logRouterTags, reply, spillType); } }; diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 5f39e14d1a..e033ed8fa2 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -184,6 +184,7 @@ ACTOR Future loadedPonger( FutureStream pings ) { StringRef fileStoragePrefix = LiteralStringRef("storage-"); StringRef fileLogDataPrefix = LiteralStringRef("log-"); +StringRef fileLogRefSpillDataPrefix = LiteralStringRef("log-reference-"); StringRef fileLogQueuePrefix = LiteralStringRef("logqueue-"); StringRef tlogQueueExtension = LiteralStringRef("fdq"); @@ -221,12 +222,13 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std } struct DiskStore { - enum COMPONENT { TLogData, Storage }; + enum COMPONENT { TLogData, Storage, UNSET }; - UID storeID; - std::string filename; // For KVStoreMemory just the base filename to be passed to IDiskQueue - COMPONENT storedComponent; - KeyValueStoreType storeType; + UID storeID = UID(); + std::string filename = ""; // For KVStoreMemory just the base filename to be passed to IDiskQueue + COMPONENT storedComponent = UNSET; + KeyValueStoreType storeType = KeyValueStoreType::END; + TLogSpillType spillType = TLogSpillType::UNSET; }; std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix, KeyValueStoreType type) { @@ -242,8 +244,14 @@ std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix, store.storedComponent = DiskStore::Storage; prefix = fileStoragePrefix; } + else if( StringRef( files[idx] ).startsWith( fileLogRefSpillDataPrefix ) ) { + store.storedComponent = DiskStore::TLogData; + store.spillType = TLogSpillType::REFERENCE; + prefix = fileLogRefSpillDataPrefix; + } else if( StringRef( files[idx] ).startsWith( fileLogDataPrefix ) ) { store.storedComponent = DiskStore::TLogData; + store.spillType = TLogSpillType::VALUE; prefix = fileLogDataPrefix; } else @@ -540,7 +548,8 @@ ACTOR Future workerServer( Reference connFile, Refe state WorkerCache storageCache; state Reference> dbInfo( new AsyncVar(ServerDBInfo()) ); state Future metricsLogger; - state std::map, PromiseStream>> sharedLogs; + state std::map, + std::pair, PromiseStream>> sharedLogs; state WorkerInterface interf( locality ); @@ -625,9 +634,11 @@ ACTOR Future workerServer( Reference connFile, Refe f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv); errorForwarders.add( forwardError( errors, Role::STORAGE_SERVER, recruited.id(), f ) ); } else if( s.storedComponent == DiskStore::TLogData ) { + std::string spillPrefix = s.spillType.toString(); + if (!spillPrefix.empty()) spillPrefix = spillPrefix + "-"; IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles ); IDiskQueue* queue = openDiskQueue( - joinPath( folder, fileLogQueuePrefix.toString() + s.storeID.toString() + "-"), tlogQueueExtension.toString(), s.storeID, 10*SERVER_KNOBS->TARGET_BYTES_PER_TLOG); + joinPath( folder, fileLogQueuePrefix.toString() + spillPrefix + s.storeID.toString() + "-"), tlogQueueExtension.toString(), s.storeID, 10*SERVER_KNOBS->TARGET_BYTES_PER_TLOG); filesClosed.add( kv->onClosed() ); filesClosed.add( queue->onClosed() ); @@ -637,8 +648,16 @@ ACTOR Future workerServer( Reference connFile, Refe Promise oldLog; Promise recovery; - auto& logData = sharedLogs[s.storeType]; - Future tl = tLog( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream(), s.storeID, true, oldLog, recovery ); + auto& logData = sharedLogs[std::make_pair(s.storeType, s.spillType)]; + // FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we + // be sending a fake InitializeTLogRequest rather than calling tLog() ? + auto tLogFn = tLog; + if (s.spillType == TLogSpillType::VALUE) { + tLogFn = oldTLog_6_0::tLog; + } else if (s.spillType == TLogSpillType::REFERENCE) { + tLogFn = tLog; + } + Future tl = tLogFn( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream(), s.storeID, true, oldLog, recovery ); recoveries.push_back(recovery.getFuture()); tl = handleIOErrors( tl, kv, s.storeID ); @@ -741,7 +760,7 @@ ACTOR Future workerServer( Reference connFile, Refe req.reply.send(recruited); } when( InitializeTLogRequest req = waitNext(interf.tLog.getFuture()) ) { - auto& logData = sharedLogs[req.storeType]; + auto& logData = sharedLogs[std::make_pair(req.storeType, req.spillType)]; logData.second.send(req); if(!logData.first.isValid() || logData.first.isReady()) { UID logId = g_random->randomUniqueID(); @@ -752,13 +771,21 @@ ACTOR Future workerServer( Reference connFile, Refe //FIXME: start role for every tlog instance, rather that just for the shared actor, also use a different role type for the shared actor startRole( Role::SHARED_TRANSACTION_LOG, logId, interf.id(), details ); - std::string filename = filenameFromId( req.storeType, folder, fileLogDataPrefix.toString(), logId ); + std::string spillPrefix = req.spillType.toString(); + if (!spillPrefix.empty()) spillPrefix = spillPrefix + "-"; + std::string filename = filenameFromId( req.storeType, folder, fileLogDataPrefix.toString() + spillPrefix, logId ); IKeyValueStore* data = openKVStore( req.storeType, filename, logId, memoryLimit ); - IDiskQueue* queue = openDiskQueue( joinPath( folder, fileLogQueuePrefix.toString() + logId.toString() + "-" ), tlogQueueExtension.toString(), logId ); + IDiskQueue* queue = openDiskQueue( joinPath( folder, fileLogQueuePrefix.toString() + spillPrefix + logId.toString() + "-" ), tlogQueueExtension.toString(), logId ); filesClosed.add( data->onClosed() ); filesClosed.add( queue->onClosed() ); - logData.first = tLog( data, queue, dbInfo, locality, logData.second, logId, false, Promise(), Promise() ); + auto tLogFn = tLog; + if (req.spillType == TLogSpillType::VALUE) { + tLogFn = oldTLog_6_0::tLog; + } else if (req.spillType == TLogSpillType::REFERENCE) { + tLogFn = tLog; + } + logData.first = tLogFn( data, queue, dbInfo, locality, logData.second, logId, false, Promise(), Promise() ); logData.first = handleIOErrors( logData.first, data, logId ); logData.first = handleIOErrors( logData.first, queue, logId ); errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, logData.first ) ); @@ -900,8 +927,10 @@ ACTOR Future workerServer( Reference connFile, Refe included = fileExists(d.filename + "1.fdq"); } if(d.storedComponent == DiskStore::COMPONENT::TLogData && included) { - std::string basename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogDataPrefix.size()); - included = fileExists(basename + "0.fdq") && fileExists(basename + "1.fdq"); + std::string logDataBasename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogDataPrefix.size()); + std::string logRefSpillBasename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogRefSpillDataPrefix.size()); + included = (fileExists(logDataBasename + "0.fdq") && fileExists(logDataBasename + "1.fdq")) + || (fileExists(logRefSpillBasename + "0.fdq") && fileExists(logRefSpillBasename + "1.fdq")); } } if(included) { diff --git a/fdbserver/workloads/ConfigureDatabase.actor.cpp b/fdbserver/workloads/ConfigureDatabase.actor.cpp index 431d56384e..1d3eb08a64 100644 --- a/fdbserver/workloads/ConfigureDatabase.actor.cpp +++ b/fdbserver/workloads/ConfigureDatabase.actor.cpp @@ -27,7 +27,7 @@ // "ssd" is an alias to the preferred type which skews the random distribution toward it but that's okay. static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory" }; -static const char* logTypes[] = { "log_engine:=1", "log_engine:=2" }; +static const char* logTypes[] = { "log_engine:=1", "log_engine:=2", "log_spill:=1", "log_spill:=2" }; static const char* redundancies[] = { "single", "double", "triple" }; std::string generateRegions() {