1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-31 18:19:35 +08:00

Hacky plumbing of spill type and file renaming.

This commit is contained in:
Alex Miller 2019-02-07 17:02:47 -08:00
parent 0cf3ee9f99
commit 7b1afdc71e
4 changed files with 48 additions and 17 deletions

@ -1777,6 +1777,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
InitializeTLogRequest &req = remoteTLogReqs[i];
req.recruitmentID = self->recruitmentID;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;

@ -81,6 +81,7 @@ struct InitializeTLogRequest {
std::vector<Tag> recoverTags;
std::vector<Tag> allTags;
KeyValueStoreType storeType;
TLogSpillType spillType;
Tag remoteTag;
int8_t locality;
bool isPrimary;
@ -93,7 +94,7 @@ struct InitializeTLogRequest {
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, recruitmentID, recoverFrom, recoverAt, knownCommittedVersion, epoch, recoverTags, allTags, storeType, remoteTag, locality, isPrimary, startVersion, logRouterTags, reply);
serializer(ar, recruitmentID, recoverFrom, recoverAt, knownCommittedVersion, epoch, recoverTags, allTags, storeType, remoteTag, locality, isPrimary, startVersion, logRouterTags, reply, spillType);
}
};

@ -184,6 +184,7 @@ ACTOR Future<Void> loadedPonger( FutureStream<LoadedPingRequest> pings ) {
StringRef fileStoragePrefix = LiteralStringRef("storage-");
StringRef fileLogDataPrefix = LiteralStringRef("log-");
StringRef fileLogRefSpillDataPrefix = LiteralStringRef("log-reference-");
StringRef fileLogQueuePrefix = LiteralStringRef("logqueue-");
StringRef tlogQueueExtension = LiteralStringRef("fdq");
@ -221,12 +222,13 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std
}
struct DiskStore {
enum COMPONENT { TLogData, Storage };
enum COMPONENT { TLogData, Storage, UNSET };
UID storeID;
std::string filename; // For KVStoreMemory just the base filename to be passed to IDiskQueue
COMPONENT storedComponent;
KeyValueStoreType storeType;
UID storeID = UID();
std::string filename = ""; // For KVStoreMemory just the base filename to be passed to IDiskQueue
COMPONENT storedComponent = UNSET;
KeyValueStoreType storeType = KeyValueStoreType::END;
TLogSpillType spillType = TLogSpillType::UNSET;
};
std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix, KeyValueStoreType type) {
@ -242,8 +244,14 @@ std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix,
store.storedComponent = DiskStore::Storage;
prefix = fileStoragePrefix;
}
else if( StringRef( files[idx] ).startsWith( fileLogRefSpillDataPrefix ) ) {
store.storedComponent = DiskStore::TLogData;
store.spillType = TLogSpillType::REFERENCE;
prefix = fileLogRefSpillDataPrefix;
}
else if( StringRef( files[idx] ).startsWith( fileLogDataPrefix ) ) {
store.storedComponent = DiskStore::TLogData;
store.spillType = TLogSpillType::VALUE;
prefix = fileLogDataPrefix;
}
else
@ -540,7 +548,8 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
state WorkerCache<InitializeStorageReply> storageCache;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo()) );
state Future<Void> metricsLogger;
state std::map<KeyValueStoreType::StoreType, std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>>> sharedLogs;
state std::map<std::pair<KeyValueStoreType::StoreType, TLogSpillType::SpillType>,
std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>>> sharedLogs;
state WorkerInterface interf( locality );
@ -625,9 +634,11 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv);
errorForwarders.add( forwardError( errors, Role::STORAGE_SERVER, recruited.id(), f ) );
} else if( s.storedComponent == DiskStore::TLogData ) {
std::string spillPrefix = s.spillType.toString();
if (!spillPrefix.empty()) spillPrefix = spillPrefix + "-";
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
IDiskQueue* queue = openDiskQueue(
joinPath( folder, fileLogQueuePrefix.toString() + s.storeID.toString() + "-"), tlogQueueExtension.toString(), s.storeID, 10*SERVER_KNOBS->TARGET_BYTES_PER_TLOG);
joinPath( folder, fileLogQueuePrefix.toString() + spillPrefix + s.storeID.toString() + "-"), tlogQueueExtension.toString(), s.storeID, 10*SERVER_KNOBS->TARGET_BYTES_PER_TLOG);
filesClosed.add( kv->onClosed() );
filesClosed.add( queue->onClosed() );
@ -637,8 +648,16 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
Promise<Void> oldLog;
Promise<Void> recovery;
auto& logData = sharedLogs[s.storeType];
Future<Void> tl = tLog( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery );
auto& logData = sharedLogs[std::make_pair(s.storeType, s.spillType)];
// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
// be sending a fake InitializeTLogRequest rather than calling tLog() ?
auto tLogFn = tLog;
if (s.spillType == TLogSpillType::VALUE) {
tLogFn = oldTLog_6_0::tLog;
} else if (s.spillType == TLogSpillType::REFERENCE) {
tLogFn = tLog;
}
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery );
recoveries.push_back(recovery.getFuture());
tl = handleIOErrors( tl, kv, s.storeID );
@ -741,7 +760,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
req.reply.send(recruited);
}
when( InitializeTLogRequest req = waitNext(interf.tLog.getFuture()) ) {
auto& logData = sharedLogs[req.storeType];
auto& logData = sharedLogs[std::make_pair(req.storeType, req.spillType)];
logData.second.send(req);
if(!logData.first.isValid() || logData.first.isReady()) {
UID logId = g_random->randomUniqueID();
@ -752,13 +771,21 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
//FIXME: start role for every tlog instance, rather that just for the shared actor, also use a different role type for the shared actor
startRole( Role::SHARED_TRANSACTION_LOG, logId, interf.id(), details );
std::string filename = filenameFromId( req.storeType, folder, fileLogDataPrefix.toString(), logId );
std::string spillPrefix = req.spillType.toString();
if (!spillPrefix.empty()) spillPrefix = spillPrefix + "-";
std::string filename = filenameFromId( req.storeType, folder, fileLogDataPrefix.toString() + spillPrefix, logId );
IKeyValueStore* data = openKVStore( req.storeType, filename, logId, memoryLimit );
IDiskQueue* queue = openDiskQueue( joinPath( folder, fileLogQueuePrefix.toString() + logId.toString() + "-" ), tlogQueueExtension.toString(), logId );
IDiskQueue* queue = openDiskQueue( joinPath( folder, fileLogQueuePrefix.toString() + spillPrefix + logId.toString() + "-" ), tlogQueueExtension.toString(), logId );
filesClosed.add( data->onClosed() );
filesClosed.add( queue->onClosed() );
logData.first = tLog( data, queue, dbInfo, locality, logData.second, logId, false, Promise<Void>(), Promise<Void>() );
auto tLogFn = tLog;
if (req.spillType == TLogSpillType::VALUE) {
tLogFn = oldTLog_6_0::tLog;
} else if (req.spillType == TLogSpillType::REFERENCE) {
tLogFn = tLog;
}
logData.first = tLogFn( data, queue, dbInfo, locality, logData.second, logId, false, Promise<Void>(), Promise<Void>() );
logData.first = handleIOErrors( logData.first, data, logId );
logData.first = handleIOErrors( logData.first, queue, logId );
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, logData.first ) );
@ -900,8 +927,10 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
included = fileExists(d.filename + "1.fdq");
}
if(d.storedComponent == DiskStore::COMPONENT::TLogData && included) {
std::string basename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogDataPrefix.size());
included = fileExists(basename + "0.fdq") && fileExists(basename + "1.fdq");
std::string logDataBasename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogDataPrefix.size());
std::string logRefSpillBasename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogRefSpillDataPrefix.size());
included = (fileExists(logDataBasename + "0.fdq") && fileExists(logDataBasename + "1.fdq"))
|| (fileExists(logRefSpillBasename + "0.fdq") && fileExists(logRefSpillBasename + "1.fdq"));
}
}
if(included) {

@ -27,7 +27,7 @@
// "ssd" is an alias to the preferred type which skews the random distribution toward it but that's okay.
static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory" };
static const char* logTypes[] = { "log_engine:=1", "log_engine:=2" };
static const char* logTypes[] = { "log_engine:=1", "log_engine:=2", "log_spill:=1", "log_spill:=2" };
static const char* redundancies[] = { "single", "double", "triple" };
std::string generateRegions() {