Fetch byte sample file (#9657)

This commit is contained in:
He Liu 2023-03-14 16:24:08 -07:00 committed by GitHub
parent 4bad004f47
commit a0a3f4bff3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 430 additions and 224 deletions

View File

@ -25,6 +25,7 @@
#include "fdbclient/FDBTypes.h"
// File name of the bytes sample SST file; it is joined onto a checkpoint directory to form the
// full path of that checkpoint's bytes sample file.
const std::string checkpointBytesSampleFileName = "metadata_bytes.sst";
// Placeholder string standing in for the path of an empty SST file.
// NOTE(review): presumably a sentinel value rather than a real on-disk path — confirm against its users.
const std::string emptySstFilePath = "Dummy Empty SST File Path";
// FDB storage checkpoint format.
enum CheckpointFormat {

File diff suppressed because it is too large Load Diff

View File

@ -93,4 +93,12 @@ ACTOR Future<CheckpointMetaData> fetchCheckpointRanges(Database cx,
.detail("CheckpointMetaData", result.toString())
.detail("Ranges", describe(ranges));
return result;
}
}
// Builds the directory path for the checkpoint identified by `checkpointId`:
// `baseDir` joined with the string form of the checkpoint ID.
std::string serverCheckpointDir(const std::string& baseDir, const UID& checkpointId) {
	const std::string checkpointName = checkpointId.toString();
	return joinPath(baseDir, checkpointName);
}
// Builds the directory path into which the checkpoint identified by `checkpointId` is fetched:
// `baseDir` joined with the string form of the checkpoint ID.
std::string fetchedCheckpointDir(const std::string& baseDir, const UID& checkpointId) {
	const std::string checkpointName = checkpointId.toString();
	return joinPath(baseDir, checkpointName);
}

View File

@ -31,6 +31,15 @@
#include "flow/actorcompiler.h" // has to be last include
// Abstract iterator-style interface over KeyValue entries.
// NOTE(review): judging by the name, the entries are a checkpoint's byte sample — confirm with implementers.
class ICheckpointByteSampleReader {
public:
	// Virtual so that implementations can be destroyed through a pointer to this interface.
	virtual ~ICheckpointByteSampleReader() = default;

	// Returns the next KeyValue entry.
	// NOTE(review): presumably only valid to call while hasNext() is true — confirm with implementers.
	virtual KeyValue next() = 0;

	// Reports whether another entry is available from next().
	virtual bool hasNext() const = 0;
};
class IRocksDBSstFileWriter {
public:
virtual void open(const std::string localFile) = 0;
@ -296,6 +305,8 @@ ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoi
const CheckpointAsKeyValues checkpointAsKeyValues,
UID logID);
// Creates a byte sample reader for the given checkpoint metadata; the caller owns the returned reader.
std::unique_ptr<ICheckpointByteSampleReader> newCheckpointByteSampleReader(const CheckpointMetaData& checkpoint);
// Creates an SST file writer; the caller owns the returned writer.
std::unique_ptr<IRocksDBSstFileWriter> newRocksDBSstFileWriter();
// Returns the RocksDBColumnFamilyCheckpoint for the given checkpoint metadata.
// NOTE(review): presumably decoded from data carried inside `checkpoint` — confirm against the definition.
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint);

View File

@ -87,6 +87,8 @@ ACTOR Future<CheckpointMetaData> fetchCheckpointRanges(
std::vector<KeyRange> ranges,
std::function<Future<Void>(const CheckpointMetaData&)> cFun = nullptr);
// Returns the directory path for checkpoint `checkpointId`: `baseDir` joined with the checkpoint ID string.
std::string serverCheckpointDir(const std::string& baseDir, const UID& checkpointId);
// Returns the directory path for the fetched copy of checkpoint `checkpointId`: `baseDir` joined with
// the checkpoint ID string.
std::string fetchedCheckpointDir(const std::string& baseDir, const UID& checkpointId);
#include "flow/unactorcompiler.h"
#endif

View File

@ -201,8 +201,9 @@ static const KeyRangeRef persistCheckpointKeys =
KeyRangeRef(PERSIST_PREFIX "Checkpoint/"_sr, PERSIST_PREFIX "Checkpoint0"_sr);
static const KeyRangeRef persistPendingCheckpointKeys =
KeyRangeRef(PERSIST_PREFIX "PendingCheckpoint/"_sr, PERSIST_PREFIX "PendingCheckpoint0"_sr);
// Prefix appended to the storage server folder, followed by the checkpoint ID, to name a checkpoint directory.
static const std::string rocksdbCheckpointDirPrefix = "/rockscheckpoints_";
// Sub-folder of the storage server folder used as the server's checkpoint folder.
static const std::string serverCheckpointFolder = "serverCheckpoints";
// Suffix appended to the storage server folder to form the temporary directory for bytes sample files.
static const std::string checkpointBytesSampleTempFolder = "/metadata_temp";
// Sub-folder of the storage server folder used as the fetched-checkpoint folder.
static const std::string fetchedCheckpointFolder = "fetchedCheckpoints";
struct AddingShard : NonCopyable {
KeyRange keys;
@ -1133,6 +1134,8 @@ public:
double lastUpdate;
std::string folder;
std::string checkpointFolder;
std::string fetchedCheckpointFolder;
// defined only during splitMutations()/addMutation()
UpdateEagerReadInfo* updateEagerReads;
@ -9732,8 +9735,8 @@ ACTOR Future<bool> createSstFileForCheckpointShardBytesSample(StorageServer* dat
ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
ASSERT(std::find(metaData.src.begin(), metaData.src.end(), data->thisServerID) != metaData.src.end() &&
!metaData.ranges.empty());
state std::string checkpointDir = data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString();
state std::string bytesSampleFile = checkpointDir + "/" + checkpointBytesSampleFileName;
state std::string checkpointDir = serverCheckpointDir(data->checkpointFolder, metaData.checkpointID);
state std::string bytesSampleFile = joinPath(checkpointDir, checkpointBytesSampleFileName);
state std::string bytesSampleTempDir = data->folder + checkpointBytesSampleTempFolder;
state std::string bytesSampleTempFile =
bytesSampleTempDir + "/" + metaData.checkpointID.toString() + "_" + checkpointBytesSampleFileName;
@ -11885,12 +11888,17 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
self.sk = serverKeysPrefixFor(self.tssPairID.present() ? self.tssPairID.get() : self.thisServerID)
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
self.folder = folder;
self.checkpointFolder = joinPath(self.folder, serverCheckpointFolder);
self.fetchedCheckpointFolder = joinPath(self.folder, fetchedCheckpointFolder);
try {
wait(self.storage.init());
wait(self.storage.commit());
++self.counters.kvCommits;
platform::createDirectory(self.checkpointFolder);
platform::createDirectory(self.fetchedCheckpointFolder);
EncryptionAtRestMode encryptionMode = wait(self.storage.encryptionMode());
self.encryptionMode = encryptionMode;
@ -11970,6 +11978,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
state StorageServer self(persistentData, db, ssi);
state Future<Void> ssCore;
self.folder = folder;
self.checkpointFolder = joinPath(self.folder, serverCheckpointFolder);
self.fetchedCheckpointFolder = joinPath(self.folder, fetchedCheckpointFolder);
try {
state double start = now();

View File

@ -264,7 +264,7 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
}
// Fetch checkpoint.
state std::string checkpointDir = abspath("checkpoints");
state std::string checkpointDir = abspath("fetchedCheckpoints");
platform::eraseDirectoryRecursive(checkpointDir);
ASSERT(platform::createDirectory(checkpointDir));
state std::vector<CheckpointMetaData> fetchedCheckpoints;
@ -272,6 +272,9 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
for (; i < records.size(); ++i) {
loop {
TraceEvent(SevDebug, "TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
state std::string currentDir = fetchedCheckpointDir(checkpointDir, records[i].checkpointID);
platform::eraseDirectoryRecursive(currentDir);
ASSERT(platform::createDirectory(currentDir));
try {
state CheckpointMetaData record;
if (asKeyValues) {
@ -285,12 +288,19 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
}
}
ASSERT(!fetchRanges.empty());
wait(store(record, fetchCheckpointRanges(cx, records[i], checkpointDir, fetchRanges)));
wait(store(record, fetchCheckpointRanges(cx, records[i], currentDir, fetchRanges)));
ASSERT(record.getFormat() == RocksDBKeyValues);
} else {
wait(store(record, fetchCheckpoint(cx, records[i], checkpointDir)));
wait(store(record, fetchCheckpoint(cx, records[i], currentDir)));
ASSERT(record.getFormat() == format);
}
if (records[i].bytesSampleFile.present()) {
ASSERT(record.bytesSampleFile.present());
ASSERT(fileExists(record.bytesSampleFile.get()));
TraceEvent(SevDebug, "TestCheckpointByteSampleFile")
.detail("RemoteFile", records[i].bytesSampleFile.get())
.detail("LocalFile", record.bytesSampleFile.get());
}
fetchedCheckpoints.push_back(record);
TraceEvent(SevDebug, "TestCheckpointFetched").detail("Checkpoint", record.toString());
break;