mirror of
https://github.com/apple/foundationdb.git
Blob single part upload (#8703)
* Exposing writeEntireFile up through BackupContainerFileSystem, and using it in blob worker
* Adding blob worker latency metrics
* Avoid writeEntireFile if object is too large
* Gracefully falling back to multi-part upload if the file is too big
parent ca1f3140c4
commit baa35fbc8f
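
The heart of the change is a try-cheap-path-then-fall-back control flow: attempt one single-part PUT of the whole object, and only if the store rejects it as too large, redo the write through the existing streaming/multi-part path. A minimal stand-alone C++ sketch of that pattern follows; BlobStore, FileWriter, and FileTooLarge are hypothetical stand-ins for BackupContainerFileSystem, IBackupFile, and the file_too_large error, not FoundationDB's actual Flow types.

#include <cstddef>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for the backup container interfaces.
struct FileTooLarge : std::runtime_error {
    FileTooLarge() : std::runtime_error("file_too_large") {}
};

struct FileWriter {
    virtual void append(const char* data, std::size_t len) = 0;
    virtual void finish() = 0;
    virtual ~FileWriter() = default;
};

struct BlobStore {
    // May throw FileTooLarge if contents exceed the single-part limit.
    virtual void writeEntireFile(const std::string& name, const std::string& contents) = 0;
    virtual std::unique_ptr<FileWriter> writeFile(const std::string& name) = 0;
    virtual ~BlobStore() = default;
};

// Try the cheap single-part upload first; fall back to the streaming
// (multi-part) path only when the object is too large for one request.
void writeFileWithFallback(BlobStore& store, const std::string& name, const std::string& contents) {
    try {
        store.writeEntireFile(name, contents);
        return;
    } catch (const FileTooLarge&) {
        // Too big for a single PUT; fall through to multi-part.
    }
    auto file = store.writeFile(name);
    file->append(contents.data(), contents.size());
    file->finish();
}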
@@ -1131,6 +1131,16 @@ public:
 		return false;
 	}
 
+	// fallback for using existing write api if the underlying blob store doesn't support efficient writeEntireFile
+	ACTOR static Future<Void> writeEntireFileFallback(Reference<BackupContainerFileSystem> bc,
+	                                                  std::string fileName,
+	                                                  std::string fileContents) {
+		state Reference<IBackupFile> objectFile = wait(bc->writeFile(fileName));
+		wait(objectFile->append(&fileContents[0], fileContents.size()));
+		wait(objectFile->finish());
+		return Void();
+	}
+
 	ACTOR static Future<Void> createTestEncryptionKeyFile(std::string filename) {
 		state Reference<IAsyncFile> keyFile = wait(IAsyncFileSystem::filesystem()->open(
 		    filename,
@@ -1484,6 +1494,12 @@ Future<Void> BackupContainerFileSystem::encryptionSetupComplete() const {
 	return encryptionSetupFuture;
 }
 
+Future<Void> BackupContainerFileSystem::writeEntireFileFallback(const std::string& fileName,
+                                                                const std::string& fileContents) {
+	return BackupContainerFileSystemImpl::writeEntireFileFallback(
+	    Reference<BackupContainerFileSystem>::addRef(this), fileName, fileContents);
+}
+
 void BackupContainerFileSystem::setEncryptionKey(Optional<std::string> const& encryptionKeyFileName) {
 	if (encryptionKeyFileName.present()) {
 		encryptionSetupFuture = BackupContainerFileSystemImpl::readEncryptionKey(encryptionKeyFileName.get());
@@ -279,6 +279,10 @@ Future<Reference<IBackupFile>> BackupContainerLocalDirectory::writeFile(const st
 	return map(f, [=](Reference<IAsyncFile> f) { return Reference<IBackupFile>(new BackupFile(path, f, fullPath)); });
 }
 
+Future<Void> BackupContainerLocalDirectory::writeEntireFile(const std::string& path, const std::string& contents) {
+	return writeEntireFileFallback(path, contents);
+}
+
 Future<Void> BackupContainerLocalDirectory::deleteFile(const std::string& path) {
 	::deleteFile(joinPath(m_path, path));
 	return Void();
@@ -198,6 +198,10 @@ Future<Reference<IBackupFile>> BackupContainerS3BlobStore::writeFile(const std::
 	return Future<Reference<IBackupFile>>(makeReference<BackupContainerS3BlobStoreImpl::BackupFile>(path, f));
 }
 
+Future<Void> BackupContainerS3BlobStore::writeEntireFile(const std::string& path, const std::string& fileContents) {
+	return m_bstore->writeEntireFile(m_bucket, dataPath(path), fileContents);
+}
+
 Future<Void> BackupContainerS3BlobStore::deleteFile(const std::string& path) {
 	return m_bstore->deleteObject(m_bucket, dataPath(path));
 }
@@ -1568,11 +1568,12 @@ ACTOR Future<Void> writeEntireFile_impl(Reference<S3BlobStoreEndpoint> bstore,
                                         std::string object,
                                         std::string content) {
 	state UnsentPacketQueue packets;
-	PacketWriter pw(packets.getWriteBuffer(content.size()), nullptr, Unversioned());
-	pw.serializeBytes(content);
+	if (content.size() > bstore->knobs.multipart_max_part_size)
+		throw file_too_large();
+
+	PacketWriter pw(packets.getWriteBuffer(content.size()), nullptr, Unversioned());
+	pw.serializeBytes(content);
 
 	// Yield because we may have just had to copy several MB's into packet buffer chain and next we have to calculate an
 	// MD5 sum of it.
 	// TODO: If this actor is used to send large files then combine the summing and packetization into a loop with a
@@ -942,6 +942,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 
 	// Server request latency measurement
 	init( LATENCY_SAMPLE_SIZE, 100000 );
+	init( FILE_LATENCY_SAMPLE_SIZE, 10000 );
 	init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );
 
 	// Cluster recovery
@@ -983,6 +984,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( BG_ENABLE_READ_DRIVEN_COMPACTION, true ); if (randomize && BUGGIFY) BG_ENABLE_READ_DRIVEN_COMPACTION = false;
 	init( BG_RDC_BYTES_FACTOR, 2 ); if (randomize && BUGGIFY) BG_RDC_BYTES_FACTOR = deterministicRandom()->randomInt(1, 10);
 	init( BG_RDC_READ_FACTOR, 3 ); if (randomize && BUGGIFY) BG_RDC_READ_FACTOR = deterministicRandom()->randomInt(1, 10);
+	init( BG_WRITE_MULTIPART, false ); if (randomize && BUGGIFY) BG_WRITE_MULTIPART = true;
 
 	init( BG_ENABLE_MERGING, true ); if (randomize && BUGGIFY) BG_ENABLE_MERGING = false;
 	init( BG_MERGE_CANDIDATE_THRESHOLD_SECONDS, isSimulated ? 20.0 : 30 * 60 ); if (randomize && BUGGIFY) BG_MERGE_CANDIDATE_THRESHOLD_SECONDS = 5.0;
@@ -369,6 +369,11 @@ Future<Reference<IBackupFile>> BackupContainerAzureBlobStore::writeFile(const st
 	return BackupContainerAzureBlobStoreImpl::writeFile(this, fileName);
 }
 
+Future<Void> BackupContainerAzureBlobStore::writeEntireFile(const std::string& fileName,
+                                                            const std::string& fileContents) {
+	return writeEntireFileFallback(fileName, fileContents);
+}
+
 Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerAzureBlobStore::listFiles(
     const std::string& path,
     std::function<bool(std::string const&)> folderPathFilter) {
@@ -56,6 +56,8 @@ public:
 
 	Future<Reference<IBackupFile>> writeFile(const std::string& fileName) override;
 
+	Future<Void> writeEntireFile(const std::string& fileName, const std::string& fileContents) override;
+
 	Future<FilesAndSizesT> listFiles(const std::string& path = "",
 	                                 std::function<bool(std::string const&)> folderPathFilter = nullptr) override;
@@ -98,6 +98,9 @@ public:
 	// Open a file for write by fileName
 	virtual Future<Reference<IBackupFile>> writeFile(const std::string& fileName) = 0;
 
+	// write entire file
+	virtual Future<Void> writeEntireFile(const std::string& fileName, const std::string& contents) = 0;
+
 	// Delete a file
 	virtual Future<Void> deleteFile(const std::string& fileName) = 0;
@@ -166,6 +169,8 @@ protected:
 	void setEncryptionKey(Optional<std::string> const& encryptionKeyFileName);
 	Future<Void> encryptionSetupComplete() const;
 
+	Future<Void> writeEntireFileFallback(const std::string& fileName, const std::string& fileContents);
+
 private:
 	struct VersionProperty {
 		VersionProperty(Reference<BackupContainerFileSystem> bc, const std::string& name)
@@ -46,6 +46,8 @@ public:
 
 	Future<Reference<IBackupFile>> writeFile(const std::string& path) final;
 
+	Future<Void> writeEntireFile(const std::string& path, const std::string& contents) final;
+
 	Future<Void> deleteFile(const std::string& path) final;
 
 	Future<FilesAndSizesT> listFiles(const std::string& path, std::function<bool(std::string const&)>) final;
@@ -57,6 +57,8 @@ public:
 
 	Future<Reference<IBackupFile>> writeFile(const std::string& path) final;
 
+	Future<Void> writeEntireFile(const std::string& path, const std::string& contents) final;
+
 	Future<Void> deleteFile(const std::string& path) final;
 
 	Future<FilesAndSizesT> listFiles(const std::string& path, std::function<bool(std::string const&)> pathFilter) final;
@@ -57,6 +57,11 @@ struct BlobWorkerStats {
 	int64_t lastResidentMemory;
 	int64_t estimatedMaxResidentMemory;
 
+	LatencySample snapshotBlobWriteLatencySample;
+	LatencySample deltaBlobWriteLatencySample;
+	LatencySample reSnapshotLatencySample;
+	LatencySample readLatencySample;
+
 	Reference<FlowLock> initialSnapshotLock;
 	Reference<FlowLock> resnapshotLock;
 	Reference<FlowLock> deltaWritesLock;
@@ -68,7 +73,10 @@ struct BlobWorkerStats {
 	                double interval,
 	                Reference<FlowLock> initialSnapshotLock,
 	                Reference<FlowLock> resnapshotLock,
-	                Reference<FlowLock> deltaWritesLock)
+	                Reference<FlowLock> deltaWritesLock,
+	                double sampleLoggingInterval,
+	                int fileOpLatencySampleSize,
+	                int requestLatencySampleSize)
 	  : cc("BlobWorkerStats", id.toString()),
 	    s3PutReqs("S3PutReqs", cc), s3GetReqs("S3GetReqs", cc), s3DeleteReqs("S3DeleteReqs", cc),
@@ -87,6 +95,10 @@ struct BlobWorkerStats {
 	    forceFlushCleanups("ForceFlushCleanups", cc), readDrivenCompactions("ReadDrivenCompactions", cc),
 	    numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0), granulesPendingSplitCheck(0),
 	    minimumCFVersion(0), cfVersionLag(0), notAtLatestChangeFeeds(0), lastResidentMemory(0),
+	    snapshotBlobWriteLatencySample("SnapshotBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
+	    deltaBlobWriteLatencySample("DeltaBlobWriteMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
+	    reSnapshotLatencySample("GranuleResnapshotMetrics", id, sampleLoggingInterval, fileOpLatencySampleSize),
+	    readLatencySample("GranuleReadLatencyMetrics", id, sampleLoggingInterval, requestLatencySampleSize),
 	    estimatedMaxResidentMemory(0), initialSnapshotLock(initialSnapshotLock), resnapshotLock(resnapshotLock),
 	    deltaWritesLock(deltaWritesLock) {
 		specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
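
LatencySample is FoundationDB's existing latency-sampling utility; its implementation is not part of this diff. As a rough, assumed illustration of what a (name, loggingInterval, sampleSize) sampler does, here is a simplified stand-alone reservoir sampler that records measurements and prints percentiles once per interval. SimpleLatencySample is hypothetical, not the real LatencySample.

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <random>
#include <string>
#include <vector>

// Simplified stand-in for FDB's LatencySample: keep at most sampleSize
// measurements via reservoir sampling, and log percentiles periodically.
class SimpleLatencySample {
    std::string name;
    std::size_t sampleSize;
    double loggingInterval; // seconds
    std::vector<double> reservoir;
    std::size_t seen = 0;
    std::chrono::steady_clock::time_point lastLog = std::chrono::steady_clock::now();
    std::mt19937_64 rng{ std::random_device{}() };

public:
    SimpleLatencySample(std::string name, double loggingInterval, std::size_t sampleSize)
      : name(std::move(name)), sampleSize(sampleSize), loggingInterval(loggingInterval) {}

    void addMeasurement(double seconds) {
        ++seen;
        if (reservoir.size() < sampleSize) {
            reservoir.push_back(seconds);
        } else {
            // Replace a random slot so every measurement is equally likely to survive.
            std::size_t i = std::uniform_int_distribution<std::size_t>(0, seen - 1)(rng);
            if (i < sampleSize) reservoir[i] = seconds;
        }
        maybeLog();
    }

private:
    void maybeLog() {
        auto now = std::chrono::steady_clock::now();
        if (std::chrono::duration<double>(now - lastLog).count() < loggingInterval || reservoir.empty())
            return;
        std::sort(reservoir.begin(), reservoir.end());
        auto pct = [&](double p) { return reservoir[(std::size_t)(p * (reservoir.size() - 1))]; };
        std::printf("%s: n=%zu p50=%.6f p99=%.6f\n", name.c_str(), seen, pct(0.5), pct(0.99));
        lastLog = now;
        reservoir.clear();
        seen = 0;
    }
};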
@@ -26,6 +26,7 @@
 #include "fdbclient/FDBTypes.h"
 #include "fdbrpc/fdbrpc.h"
 #include "fdbrpc/Locality.h"
+#include "fdbrpc/TimedRequest.h"
 #include "fdbclient/StorageServerInterface.h" // for TenantInfo - should we refactor that elsewhere?
 
 struct BlobWorkerInterface {
@@ -105,7 +106,7 @@ struct BlobGranuleFileReply {
 
 // TODO could do a reply promise stream of file mutations to bound memory requirements?
 // Have to load whole snapshot file into memory though so it doesn't actually matter too much
-struct BlobGranuleFileRequest {
+struct BlobGranuleFileRequest : TimedRequest {
 	constexpr static FileIdentifier file_identifier = 4150141;
 	Arena arena;
 	KeyRangeRef keyRange;
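
Deriving BlobGranuleFileRequest from TimedRequest is what lets the read path later in this diff compute latency as g_network->timer() minus req.requestTime(). A minimal sketch of the idea, assuming a timestamp captured when the request object is created (the real fdbrpc TimedRequest may stamp the request differently, e.g. at deserialization):

#include <chrono>

// Sketch of the TimedRequest idea: remember when the request came into being,
// so a handler can later compute elapsed = now - requestTime().
class TimedRequestSketch {
    double t = std::chrono::duration<double>(
                   std::chrono::steady_clock::now().time_since_epoch())
                   .count();

public:
    double requestTime() const { return t; }
};

struct BlobGranuleFileRequestSketch : TimedRequestSketch {
    // ... request payload fields ...
};

// Usage in a handler (nowSeconds() is a hypothetical clock source):
// double duration = nowSeconds() - req.requestTime();
// stats.readLatencySample.addMeasurement(duration);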
@@ -919,6 +919,7 @@ public:
 
 	// Server request latency measurement
 	int LATENCY_SAMPLE_SIZE;
+	int FILE_LATENCY_SAMPLE_SIZE;
 	double LATENCY_METRICS_LOGGING_INTERVAL;
 
 	// Cluster recovery
@@ -964,6 +965,7 @@ public:
 	bool BG_ENABLE_READ_DRIVEN_COMPACTION;
 	int BG_RDC_BYTES_FACTOR;
 	int BG_RDC_READ_FACTOR;
+	bool BG_WRITE_MULTIPART;
 
 	int BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM;
 	int BLOB_WORKER_RESNAPSHOT_PARALLELISM;
@@ -86,9 +86,14 @@ ACTOR Future<Void> createObject(ConnectionProviderTestSettings* settings, Provid
 	state std::string fullPath;
 	std::tie(bstore, fullPath) = provider->provider->createForWrite(objName);
 
-	state Reference<IBackupFile> file = wait(bstore->writeFile(fullPath));
-	wait(file->append(data.begin(), data.size()));
-	wait(file->finish());
+	if (deterministicRandom()->coinflip()) {
+		state Reference<IBackupFile> file = wait(bstore->writeFile(fullPath));
+		wait(file->append(data.begin(), data.size()));
+		wait(file->finish());
+	} else {
+		std::string contents = data.toString();
+		wait(bstore->writeEntireFile(fullPath, contents));
+	}
 
 	// after write, put in the readable list
 	provider->data.push_back({ fullPath, data });
@@ -297,7 +297,14 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
 	  initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
 	  resnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_RESNAPSHOT_PARALLELISM)),
 	  deltaWritesLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_DELTA_FILE_WRITE_PARALLELISM)),
-	  stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, initialSnapshotLock, resnapshotLock, deltaWritesLock),
+	  stats(id,
+	        SERVER_KNOBS->WORKER_LOGGING_INTERVAL,
+	        initialSnapshotLock,
+	        resnapshotLock,
+	        deltaWritesLock,
+	        SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
+	        SERVER_KNOBS->FILE_LATENCY_SAMPLE_SIZE,
+	        SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
 	  isEncryptionEnabled(isEncryptionOpSupported(EncryptOperationType::BLOB_GRANULE_ENCRYPTION)) {}
 
 	bool managerEpochOk(int64_t epoch) {
@@ -764,6 +771,28 @@ ACTOR Future<std::pair<BlobGranuleSplitState, Version>> getGranuleSplitState(Tra
 	}
 }
 
+// tries to use writeEntireFile if possible, but if too big falls back to multi-part upload
+ACTOR Future<Void> writeFile(Reference<BackupContainerFileSystem> writeBStore, std::string fname, Value serialized) {
+	if (!SERVER_KNOBS->BG_WRITE_MULTIPART) {
+		try {
+			state std::string fileContents = serialized.toString();
+			wait(writeBStore->writeEntireFile(fname, fileContents));
+			return Void();
+		} catch (Error& e) {
+			// error_code_file_too_large means it was too big to do with single write
+			if (e.code() != error_code_file_too_large) {
+				throw e;
+			}
+		}
+	}
+
+	state Reference<IBackupFile> objectFile = wait(writeBStore->writeFile(fname));
+	wait(objectFile->append(serialized.begin(), serialized.size()));
+	wait(objectFile->finish());
+
+	return Void();
+}
+
 // writeDelta file writes speculatively in the common case to optimize throughput. It creates the s3 object even though
 // the data in it may not yet be committed, and even though previous delta files with lower versioned data may still be
 // in flight. The synchronization happens after the s3 file is written, but before we update the FDB index of what files
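
The comment above is the key invariant: the S3 object may exist before its data is committed, and readers are protected because the FDB file index is updated only after both the upload and the commit complete. A compressed stand-alone sketch of that ordering, with hypothetical names (uploadObject, dataCommitted, fileIndex) standing in for the blob write, the commit wait, and the granule file mapping:

#include <functional>
#include <future>
#include <map>
#include <string>

// Stand-in for the FDB index of granule files that readers consult.
std::map<std::string, long> fileIndex;

void writeDeltaSpeculatively(std::function<void()> uploadObject,
                             std::future<void> dataCommitted,
                             const std::string& fname,
                             long version) {
    uploadObject();             // speculative: the object exists, its data may be uncommitted
    dataCommitted.wait();       // synchronize: wait for the data to commit upstream
    fileIndex[fname] = version; // only now do readers ever learn about the file
}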
@@ -814,14 +843,16 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
 	state Reference<BackupContainerFileSystem> writeBStore;
 	state std::string fname;
 	std::tie(writeBStore, fname) = bstore->createForWrite(fileName);
-	state Reference<IBackupFile> objectFile = wait(writeBStore->writeFile(fname));
+	state double writeStartTimer = g_network->timer();
+
+	wait(writeFile(writeBStore, fname, serialized));
 
 	++bwData->stats.s3PutReqs;
 	++bwData->stats.deltaFilesWritten;
 	bwData->stats.deltaBytesWritten += serializedSize;
 
-	wait(objectFile->append(serialized.begin(), serializedSize));
-	wait(objectFile->finish());
+	double duration = g_network->timer() - writeStartTimer;
+	bwData->stats.deltaBlobWriteLatencySample.addMeasurement(duration);
 
 	// free serialized since it is persisted in blob
 	serialized = Value();
@@ -1025,14 +1056,16 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
 	state Reference<BackupContainerFileSystem> writeBStore;
 	state std::string fname;
 	std::tie(writeBStore, fname) = bstore->createForWrite(fileName);
-	state Reference<IBackupFile> objectFile = wait(writeBStore->writeFile(fname));
+	state double writeStartTimer = g_network->timer();
+
+	wait(writeFile(writeBStore, fname, serialized));
 
 	++bwData->stats.s3PutReqs;
 	++bwData->stats.snapshotFilesWritten;
 	bwData->stats.snapshotBytesWritten += serializedSize;
 
-	wait(objectFile->append(serialized.begin(), serializedSize));
-	wait(objectFile->finish());
+	double duration = g_network->timer() - writeStartTimer;
+	bwData->stats.snapshotBlobWriteLatencySample.addMeasurement(duration);
 
 	// free serialized since it is persisted in blob
 	serialized = Value();
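
Both writeDeltaFile and writeSnapshot time the upload the same way: save g_network->timer() before the write and subtract after it. Outside Flow, this measure-a-scope pattern is often wrapped in an RAII helper; a sketch follows (plain C++ with a hypothetical ScopedLatencyTimer; Flow actor code uses explicit timestamps instead, since actors suspend and resume across wait points):

#include <chrono>
#include <functional>

// RAII helper: measure a scope's wall-clock duration and hand it to a callback,
// e.g. a latency sample's addMeasurement.
class ScopedLatencyTimer {
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
    std::function<void(double)> record;

public:
    explicit ScopedLatencyTimer(std::function<void(double)> record) : record(std::move(record)) {}
    ~ScopedLatencyTimer() {
        double seconds =
            std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count();
        record(seconds);
    }
};

// Usage sketch:
// {
//     ScopedLatencyTimer t([&](double d) { stats.deltaBlobWriteLatencySample.addMeasurement(d); });
//     uploadBlob(...); // synchronous upload
// }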
@@ -1225,6 +1258,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
 	state Arena filenameArena;
 	state std::vector<Future<RangeResult>> chunksToRead;
 	state int64_t compactBytesRead = 0;
+	state double resnapshotStartTimer = g_network->timer();
+
 	for (auto& f : fileSet) {
 		ASSERT(!f.snapshotFiles.empty());
 		ASSERT(!f.deltaFiles.empty());
@@ -1324,6 +1359,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
 
 		BlobFileIndex f = wait(snapshotWriter);
 		DEBUG_KEY_RANGE("BlobWorkerBlobSnapshot", version, metadata->keyRange, bwData->id);
+		double duration = g_network->timer() - resnapshotStartTimer;
+		bwData->stats.reSnapshotLatencySample.addMeasurement(duration);
 		return f;
 	} catch (Error& e) {
 		if (BW_DEBUG) {
@@ -3846,6 +3883,10 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
 		if (didCollapse) {
 			++bwData->stats.readRequestsCollapsed;
 		}
+
+		double duration = g_network->timer() - req.requestTime();
+		bwData->stats.readLatencySample.addMeasurement(duration);
+
 		ASSERT(!req.reply.isSet());
 		req.reply.send(rep);
 		--bwData->stats.activeReadRequests;