From 0a332ee1c17975552784a5689cbf12367fa05302 Mon Sep 17 00:00:00 2001 From: Renxuan Wang Date: Mon, 28 Mar 2022 17:10:49 -0700 Subject: [PATCH] Add proxy option to backup and restore params. --- fdbbackup/FileConverter.actor.cpp | 8 ++- fdbbackup/FileDecoder.actor.cpp | 8 ++- fdbbackup/backup.actor.cpp | 72 ++++++++++++++----- fdbclient/BackupAgent.actor.h | 28 ++++++-- fdbclient/BackupContainer.actor.cpp | 16 +++-- fdbclient/BackupContainer.h | 9 ++- fdbclient/BackupContainerFileSystem.actor.cpp | 19 +++-- fdbclient/BackupContainerFileSystem.h | 6 +- fdbclient/FileBackupAgent.actor.cpp | 24 +++++-- fdbclient/RestoreInterface.h | 15 ++-- fdbclient/S3BlobStore.actor.cpp | 23 ++++-- fdbclient/S3BlobStore.h | 23 ++++-- fdbserver/BlobManager.actor.cpp | 2 +- fdbserver/BlobWorker.actor.cpp | 2 +- fdbserver/RestoreController.actor.cpp | 12 ++-- fdbserver/RestoreController.actor.h | 8 ++- fdbserver/RestoreLoader.actor.cpp | 2 +- fdbserver/RestoreLoader.actor.h | 4 +- fdbserver/RestoreWorkerInterface.actor.h | 4 +- fdbserver/workloads/AtomicRestore.actor.cpp | 1 + ...kupAndParallelRestoreCorrectness.actor.cpp | 7 +- .../workloads/BackupCorrectness.actor.cpp | 11 ++- fdbserver/workloads/BackupToBlob.actor.cpp | 1 + .../BlobGranuleCorrectnessWorkload.actor.cpp | 4 +- .../workloads/BlobGranuleVerifier.actor.cpp | 4 +- .../workloads/IncrementalBackup.actor.cpp | 8 ++- fdbserver/workloads/RestoreBackup.actor.cpp | 1 + fdbserver/workloads/RestoreFromBlob.actor.cpp | 4 +- fdbserver/workloads/SubmitBackup.actor.cpp | 1 + 29 files changed, 232 insertions(+), 95 deletions(-) diff --git a/fdbbackup/FileConverter.actor.cpp b/fdbbackup/FileConverter.actor.cpp index 1e48bd523d..8aeea5017f 100644 --- a/fdbbackup/FileConverter.actor.cpp +++ b/fdbbackup/FileConverter.actor.cpp @@ -101,6 +101,7 @@ std::vector getRelevantLogFiles(const std::vector& files, Vers struct ConvertParams { std::string container_url; + Optional proxy; Version begin = invalidVersion; Version end = invalidVersion; bool log_enabled = false; @@ -112,6 +113,10 @@ struct ConvertParams { std::string s; s.append("ContainerURL:"); s.append(container_url); + if (proxy.present()) { + s.append(" Proxy:"); + s.append(proxy.get()); + } s.append(" Begin:"); s.append(format("%" PRId64, begin)); s.append(" End:"); @@ -448,7 +453,8 @@ private: }; ACTOR Future convert(ConvertParams params) { - state Reference container = IBackupContainer::openContainer(params.container_url); + state Reference container = + IBackupContainer::openContainer(params.container_url, params.proxy, {}); state BackupFileList listing = wait(container->dumpFileList()); std::sort(listing.logs.begin(), listing.logs.end()); TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size()); diff --git a/fdbbackup/FileDecoder.actor.cpp b/fdbbackup/FileDecoder.actor.cpp index 7e851bf6e0..7d9e27dcb1 100644 --- a/fdbbackup/FileDecoder.actor.cpp +++ b/fdbbackup/FileDecoder.actor.cpp @@ -94,6 +94,7 @@ void printBuildInformation() { struct DecodeParams { std::string container_url; + Optional proxy; std::string fileFilter; // only files match the filter will be decoded bool log_enabled = true; std::string log_dir, trace_format, trace_log_group; @@ -115,6 +116,10 @@ struct DecodeParams { std::string s; s.append("ContainerURL: "); s.append(container_url); + if (proxy.present()) { + s.append(", Proxy: "); + s.append(proxy.get()); + } s.append(", FileFilter: "); s.append(fileFilter); if (log_enabled) { @@ -526,7 +531,8 @@ ACTOR Future process_file(Reference container, LogFile f } ACTOR Future decode_logs(DecodeParams params) { - state Reference container = IBackupContainer::openContainer(params.container_url); + state Reference container = + IBackupContainer::openContainer(params.container_url, params.proxy, {}); state UID uid = deterministicRandom()->randomUniqueID(); state BackupFileList listing = wait(container->dumpFileList()); // remove partitioned logs diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 219d9ab820..431bc7798d 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -130,6 +130,7 @@ enum { OPT_USE_PARTITIONED_LOG, // Backup and Restore constants + OPT_PROXY, OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE, @@ -234,6 +235,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = { { OPT_NOSTOPWHENDONE, "--no-stop-when-done", SO_NONE }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, // Enable "-p" option after GA // { OPT_USE_PARTITIONED_LOG, "-p", SO_NONE }, { OPT_USE_PARTITIONED_LOG, "--partitioned-log-experimental", SO_NONE }, @@ -294,6 +296,7 @@ CSimpleOpt::SOption g_rgBackupModifyOptions[] = { { OPT_MOD_VERIFY_UID, "--verify-uid", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_SNAPSHOTINTERVAL, "-s", SO_REQ_SEP }, { OPT_SNAPSHOTINTERVAL, "--snapshot-interval", SO_REQ_SEP }, { OPT_MOD_ACTIVE_INTERVAL, "--active-snapshot-interval", SO_REQ_SEP }, @@ -482,6 +485,7 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = { { OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP }, @@ -517,6 +521,7 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = { #endif { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP }, @@ -546,6 +551,7 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = { { OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP }, @@ -578,6 +584,7 @@ CSimpleOpt::SOption g_rgBackupDumpOptions[] = { { OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_TRACE, "--log", SO_NONE }, { OPT_TRACE_DIR, "--logdir", SO_REQ_SEP }, { OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP }, @@ -652,6 +659,7 @@ CSimpleOpt::SOption g_rgBackupQueryOptions[] = { { OPT_RESTORE_TIMESTAMP, "--query-restore-timestamp", SO_REQ_SEP }, { OPT_DESTCONTAINER, "-d", SO_REQ_SEP }, { OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_RESTORE_VERSION, "-qrv", SO_REQ_SEP }, { OPT_RESTORE_VERSION, "--query-restore-version", SO_REQ_SEP }, { OPT_BACKUPKEYS_FILTER, "-k", SO_REQ_SEP }, @@ -689,6 +697,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = { { OPT_RESTORE_TIMESTAMP, "--timestamp", SO_REQ_SEP }, { OPT_KNOB, "--knob-", SO_REQ_SEP }, { OPT_RESTORECONTAINER, "-r", SO_REQ_SEP }, + { OPT_PROXY, "--proxy", SO_REQ_SEP }, { OPT_PREFIX_ADD, "--add-prefix", SO_REQ_SEP }, { OPT_PREFIX_REMOVE, "--remove-prefix", SO_REQ_SEP }, { OPT_TAGNAME, "-t", SO_REQ_SEP }, @@ -1920,6 +1929,7 @@ ACTOR Future submitDBBackup(Database src, ACTOR Future submitBackup(Database db, std::string url, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, Standalone> backupRanges, @@ -1977,6 +1987,7 @@ ACTOR Future submitBackup(Database db, else { wait(backupAgent.submitBackup(db, KeyRef(url), + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, tagName, @@ -2260,8 +2271,9 @@ ACTOR Future changeDBBackupResumed(Database src, Database dest, bool pause } Reference openBackupContainer(const char* name, - std::string destinationContainer, - Optional const& encryptionKeyFile = {}) { + const std::string& destinationContainer, + const Optional& proxy, + const Optional& encryptionKeyFile) { // Error, if no dest container was specified if (destinationContainer.empty()) { fprintf(stderr, "ERROR: No backup destination was specified.\n"); @@ -2271,7 +2283,7 @@ Reference openBackupContainer(const char* name, Reference c; try { - c = IBackupContainer::openContainer(destinationContainer, encryptionKeyFile); + c = IBackupContainer::openContainer(destinationContainer, proxy, encryptionKeyFile); } catch (Error& e) { std::string msg = format("ERROR: '%s' on URL '%s'", e.what(), destinationContainer.c_str()); if (e.code() == error_code_backup_invalid_url && !IBackupContainer::lastOpenError.empty()) { @@ -2291,6 +2303,7 @@ ACTOR Future runRestore(Database db, std::string originalClusterFile, std::string tagName, std::string container, + Optional proxy, Standalone> ranges, Version beginVersion, Version targetVersion, @@ -2339,7 +2352,7 @@ ACTOR Future runRestore(Database db, state FileBackupAgent backupAgent; state Reference bc = - openBackupContainer(exeRestore.toString().c_str(), container, encryptionKeyFile); + openBackupContainer(exeRestore.toString().c_str(), container, proxy, encryptionKeyFile); // If targetVersion is unset then use the maximum restorable version from the backup description if (targetVersion == invalidVersion) { @@ -2368,6 +2381,7 @@ ACTOR Future runRestore(Database db, origDb, KeyRef(tagName), KeyRef(container), + proxy, ranges, waitForDone, targetVersion, @@ -2411,6 +2425,7 @@ ACTOR Future runRestore(Database db, ACTOR Future runFastRestoreTool(Database db, std::string tagName, std::string container, + Optional proxy, Standalone> ranges, Version dbVersion, bool performRestore, @@ -2440,7 +2455,7 @@ ACTOR Future runFastRestoreTool(Database db, if (performRestore) { if (dbVersion == invalidVersion) { TraceEvent("FastRestoreTool").detail("TargetRestoreVersion", "Largest restorable version"); - BackupDescription desc = wait(IBackupContainer::openContainer(container)->describeBackup()); + BackupDescription desc = wait(IBackupContainer::openContainer(container, proxy, {})->describeBackup()); if (!desc.maxRestorableVersion.present()) { fprintf(stderr, "The specified backup is not restorable to any version.\n"); throw restore_error(); @@ -2457,6 +2472,7 @@ ACTOR Future runFastRestoreTool(Database db, KeyRef(tagName), ranges, KeyRef(container), + proxy, dbVersion, LockDB::True, randomUID, @@ -2478,7 +2494,7 @@ ACTOR Future runFastRestoreTool(Database db, restoreVersion = dbVersion; } else { - state Reference bc = IBackupContainer::openContainer(container); + state Reference bc = IBackupContainer::openContainer(container, proxy, {}); state BackupDescription description = wait(bc->describeBackup()); if (dbVersion <= 0) { @@ -2522,9 +2538,10 @@ ACTOR Future runFastRestoreTool(Database db, ACTOR Future dumpBackupData(const char* name, std::string destinationContainer, + Optional proxy, Version beginVersion, Version endVersion) { - state Reference c = openBackupContainer(name, destinationContainer); + state Reference c = openBackupContainer(name, destinationContainer, proxy, {}); if (beginVersion < 0 || endVersion < 0) { BackupDescription desc = wait(c->describeBackup()); @@ -2552,6 +2569,7 @@ ACTOR Future dumpBackupData(const char* name, ACTOR Future expireBackupData(const char* name, std::string destinationContainer, + Optional proxy, Version endVersion, std::string endDatetime, Database db, @@ -2577,7 +2595,7 @@ ACTOR Future expireBackupData(const char* name, } try { - Reference c = openBackupContainer(name, destinationContainer, encryptionKeyFile); + Reference c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile); state IBackupContainer::ExpireProgress progress; state std::string lastProgress; @@ -2623,9 +2641,11 @@ ACTOR Future expireBackupData(const char* name, return Void(); } -ACTOR Future deleteBackupContainer(const char* name, std::string destinationContainer) { +ACTOR Future deleteBackupContainer(const char* name, + std::string destinationContainer, + Optional proxy) { try { - state Reference c = openBackupContainer(name, destinationContainer); + state Reference c = openBackupContainer(name, destinationContainer, proxy, {}); state int numDeleted = 0; state Future done = c->deleteContainer(&numDeleted); @@ -2657,12 +2677,13 @@ ACTOR Future deleteBackupContainer(const char* name, std::string destinati ACTOR Future describeBackup(const char* name, std::string destinationContainer, + Optional proxy, bool deep, Optional cx, bool json, Optional encryptionKeyFile) { try { - Reference c = openBackupContainer(name, destinationContainer, encryptionKeyFile); + Reference c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile); state BackupDescription desc = wait(c->describeBackup(deep)); if (cx.present()) wait(desc.resolveVersionTimes(cx.get())); @@ -2688,6 +2709,7 @@ static void reportBackupQueryError(UID operationId, JsonBuilderObject& result, s // resolved to that timestamp. ACTOR Future queryBackup(const char* name, std::string destinationContainer, + Optional proxy, Standalone> keyRangesFilter, Version restoreVersion, std::string originalClusterFile, @@ -2734,7 +2756,7 @@ ACTOR Future queryBackup(const char* name, } try { - state Reference bc = openBackupContainer(name, destinationContainer); + state Reference bc = openBackupContainer(name, destinationContainer, proxy, {}); if (restoreVersion == invalidVersion) { BackupDescription desc = wait(bc->describeBackup()); if (desc.maxRestorableVersion.present()) { @@ -2814,9 +2836,9 @@ ACTOR Future queryBackup(const char* name, return Void(); } -ACTOR Future listBackup(std::string baseUrl) { +ACTOR Future listBackup(std::string baseUrl, Optional proxy) { try { - std::vector containers = wait(IBackupContainer::listContainers(baseUrl)); + std::vector containers = wait(IBackupContainer::listContainers(baseUrl, proxy)); for (std::string container : containers) { printf("%s\n", container.c_str()); } @@ -2852,6 +2874,7 @@ ACTOR Future listBackupTags(Database cx) { struct BackupModifyOptions { Optional verifyUID; Optional destURL; + Optional proxy; Optional snapshotIntervalSeconds; Optional activeSnapshotIntervalSeconds; bool hasChanges() const { @@ -2869,7 +2892,7 @@ ACTOR Future modifyBackup(Database db, std::string tagName, BackupModifyOp state Reference bc; if (options.destURL.present()) { - bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get()); + bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get(), options.proxy, {}); try { wait(timeoutError(bc->create(), 30)); } catch (Error& e) { @@ -3342,6 +3365,7 @@ int main(int argc, char* argv[]) { break; } + Optional proxy; std::string destinationContainer; bool describeDeep = false; bool describeTimestamps = false; @@ -3595,6 +3619,10 @@ int main(int argc, char* argv[]) { return FDB_EXIT_ERROR; } break; + case OPT_PROXY: + proxy = args->OptionArg(); + modifyOptions.proxy = proxy; + break; case OPT_DESTCONTAINER: destinationContainer = args->OptionArg(); // If the url starts with '/' then prepend "file://" for backwards compatibility @@ -3962,9 +3990,10 @@ int main(int argc, char* argv[]) { if (!initCluster()) return FDB_EXIT_ERROR; // Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable. - openBackupContainer(argv[0], destinationContainer, encryptionKeyFile); + openBackupContainer(argv[0], destinationContainer, proxy, encryptionKeyFile); f = stopAfter(submitBackup(db, destinationContainer, + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, backupKeys, @@ -4036,6 +4065,7 @@ int main(int argc, char* argv[]) { } f = stopAfter(expireBackupData(argv[0], destinationContainer, + proxy, expireVersion, expireDatetime, db, @@ -4047,7 +4077,7 @@ int main(int argc, char* argv[]) { case BackupType::DELETE_BACKUP: initTraceFile(); - f = stopAfter(deleteBackupContainer(argv[0], destinationContainer)); + f = stopAfter(deleteBackupContainer(argv[0], destinationContainer, proxy)); break; case BackupType::DESCRIBE: @@ -4060,6 +4090,7 @@ int main(int argc, char* argv[]) { // given, but quietly skip them if not. f = stopAfter(describeBackup(argv[0], destinationContainer, + proxy, describeDeep, describeTimestamps ? Optional(db) : Optional(), jsonOutput, @@ -4068,7 +4099,7 @@ int main(int argc, char* argv[]) { case BackupType::LIST: initTraceFile(); - f = stopAfter(listBackup(baseUrl)); + f = stopAfter(listBackup(baseUrl, proxy)); break; case BackupType::TAGS: @@ -4081,6 +4112,7 @@ int main(int argc, char* argv[]) { initTraceFile(); f = stopAfter(queryBackup(argv[0], destinationContainer, + proxy, backupKeysFilter, restoreVersion, restoreClusterFileOrig, @@ -4090,7 +4122,7 @@ int main(int argc, char* argv[]) { case BackupType::DUMP: initTraceFile(); - f = stopAfter(dumpBackupData(argv[0], destinationContainer, dumpBegin, dumpEnd)); + f = stopAfter(dumpBackupData(argv[0], destinationContainer, proxy, dumpBegin, dumpEnd)); break; case BackupType::UNDEFINED: @@ -4141,6 +4173,7 @@ int main(int argc, char* argv[]) { restoreClusterFileOrig, tagName, restoreContainer, + proxy, backupKeys, beginVersion, restoreVersion, @@ -4218,6 +4251,7 @@ int main(int argc, char* argv[]) { f = stopAfter(runFastRestoreTool(db, tagName, restoreContainer, + proxy, backupKeys, restoreVersion, !dryRun, diff --git a/fdbclient/BackupAgent.actor.h b/fdbclient/BackupAgent.actor.h index 94cb10d290..a938dcd51f 100644 --- a/fdbclient/BackupAgent.actor.h +++ b/fdbclient/BackupAgent.actor.h @@ -165,6 +165,7 @@ public: Key backupTag, Standalone> backupRanges, Key bcUrl, + Optional proxy, Version targetVersion, LockDB lockDB, UID randomUID, @@ -187,6 +188,7 @@ public: Optional cxOrig, Key tagName, Key url, + Optional proxy, Standalone> ranges, WaitForComplete = WaitForComplete::True, Version targetVersion = ::invalidVersion, @@ -202,6 +204,7 @@ public: Optional cxOrig, Key tagName, Key url, + Optional proxy, WaitForComplete waitForComplete = WaitForComplete::True, Version targetVersion = ::invalidVersion, Verbose verbose = Verbose::True, @@ -219,6 +222,7 @@ public: cxOrig, tagName, url, + proxy, rangeRef, waitForComplete, targetVersion, @@ -263,6 +267,7 @@ public: Future submitBackup(Reference tr, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string const& tagName, @@ -273,6 +278,7 @@ public: Optional const& encryptionKeyFileName = {}); Future submitBackup(Database cx, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string const& tagName, @@ -284,6 +290,7 @@ public: return runRYWTransactionFailIfLocked(cx, [=](Reference tr) { return submitBackup(tr, outContainer, + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, tagName, @@ -720,20 +727,31 @@ template <> inline Tuple Codec>::pack(Reference const& bc) { Tuple tuple; tuple.append(StringRef(bc->getURL())); + if (bc->getProxy().present()) { + tuple.append(StringRef(bc->getProxy().get())); + } else { + tuple.append(StringRef()); + } if (bc->getEncryptionKeyFileName().present()) { tuple.append(bc->getEncryptionKeyFileName().get()); + } else { + tuple.append(StringRef()); } return tuple; } template <> inline Reference Codec>::unpack(Tuple const& val) { - ASSERT(val.size() == 1 || val.size() == 2); + ASSERT(val.size() == 3); auto url = val.getString(0).toString(); - Optional encryptionKeyFileName; - if (val.size() == 2) { - encryptionKeyFileName = val.getString(1).toString(); + Optional proxy; + if (!val.getString(1).empty()) { + proxy = val.getString(1).toString(); } - return IBackupContainer::openContainer(url, encryptionKeyFileName); + Optional encryptionKeyFileName; + if (!val.getString(2).empty()) { + encryptionKeyFileName = val.getString(2).toString(); + } + return IBackupContainer::openContainer(url, proxy, encryptionKeyFileName); } class BackupConfig : public KeyBackedConfig { diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index 37b2eae015..416d15c548 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -256,7 +256,8 @@ std::vector IBackupContainer::getURLFormats() { // Get an IBackupContainer based on a container URL string Reference IBackupContainer::openContainer(const std::string& url, - Optional const& encryptionKeyFileName) { + const Optional& proxy, + const Optional& encryptionKeyFileName) { static std::map> m_cache; Reference& r = m_cache[url]; @@ -273,7 +274,7 @@ Reference IBackupContainer::openContainer(const std::string& u // The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options. S3BlobStoreEndpoint::ParametersT backupParams; Reference bstore = - S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams); + S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams); if (resource.empty()) throw backup_invalid_url(); @@ -317,7 +318,7 @@ Reference IBackupContainer::openContainer(const std::string& u // Get a list of URLS to backup containers based on some a shorter URL. This function knows about some set of supported // URL types which support this sort of backup discovery. -ACTOR Future> listContainers_impl(std::string baseURL) { +ACTOR Future> listContainers_impl(std::string baseURL, Optional proxy) { try { StringRef u(baseURL); if (u.startsWith("file://"_sr)) { @@ -327,8 +328,8 @@ ACTOR Future> listContainers_impl(std::string baseURL) std::string resource; S3BlobStoreEndpoint::ParametersT backupParams; - Reference bstore = - S3BlobStoreEndpoint::fromString(baseURL, &resource, &IBackupContainer::lastOpenError, &backupParams); + Reference bstore = S3BlobStoreEndpoint::fromString( + baseURL, proxy, &resource, &IBackupContainer::lastOpenError, &backupParams); if (!resource.empty()) { TraceEvent(SevWarn, "BackupContainer") @@ -370,8 +371,9 @@ ACTOR Future> listContainers_impl(std::string baseURL) } } -Future> IBackupContainer::listContainers(const std::string& baseURL) { - return listContainers_impl(baseURL); +Future> IBackupContainer::listContainers(const std::string& baseURL, + const Optional& proxy) { + return listContainers_impl(baseURL, proxy); } ACTOR Future timeKeeperVersionFromDatetime(std::string datetime, Database db) { diff --git a/fdbclient/BackupContainer.h b/fdbclient/BackupContainer.h index 312e3b8ac7..36c9ff7cfa 100644 --- a/fdbclient/BackupContainer.h +++ b/fdbclient/BackupContainer.h @@ -156,6 +156,7 @@ struct BackupFileList { struct BackupDescription { BackupDescription() : snapshotBytes(0) {} std::string url; + Optional proxy; std::vector snapshots; int64_t snapshotBytes; // The version before which everything has been deleted by an expire @@ -294,11 +295,14 @@ public: // Get an IBackupContainer based on a container spec string static Reference openContainer(const std::string& url, - const Optional& encryptionKeyFileName = {}); + const Optional& proxy, + const Optional& encryptionKeyFileName); static std::vector getURLFormats(); - static Future> listContainers(const std::string& baseURL); + static Future> listContainers(const std::string& baseURL, + const Optional& proxy); std::string const& getURL() const { return URL; } + Optional const& getProxy() const { return proxy; } Optional const& getEncryptionKeyFileName() const { return encryptionKeyFileName; } static std::string lastOpenError; @@ -306,6 +310,7 @@ public: // TODO: change the following back to `private` once blob obj access is refactored protected: std::string URL; + Optional proxy; Optional encryptionKeyFileName; }; diff --git a/fdbclient/BackupContainerFileSystem.actor.cpp b/fdbclient/BackupContainerFileSystem.actor.cpp index 7acbd227f2..a4778ecc10 100644 --- a/fdbclient/BackupContainerFileSystem.actor.cpp +++ b/fdbclient/BackupContainerFileSystem.actor.cpp @@ -409,6 +409,7 @@ public: Version logStartVersionOverride) { state BackupDescription desc; desc.url = bc->getURL(); + desc.proxy = bc->getProxy(); TraceEvent("BackupContainerDescribe1") .detail("URL", bc->getURL()) @@ -1500,7 +1501,8 @@ Future BackupContainerFileSystem::createTestEncryptionKeyFile(std::string // code but returning a different template type because you can't cast between them Reference BackupContainerFileSystem::openContainerFS( const std::string& url, - Optional const& encryptionKeyFileName) { + const Optional& proxy, + const Optional& encryptionKeyFileName) { static std::map> m_cache; Reference& r = m_cache[url]; @@ -1517,7 +1519,7 @@ Reference BackupContainerFileSystem::openContainerFS( // The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options. S3BlobStoreEndpoint::ParametersT backupParams; Reference bstore = - S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams); + S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams); if (resource.empty()) throw backup_invalid_url(); @@ -1635,7 +1637,9 @@ ACTOR static Future testWriteSnapshotFile(Reference file, Key return Void(); } -ACTOR Future testBackupContainer(std::string url, Optional encryptionKeyFileName) { +ACTOR Future testBackupContainer(std::string url, + Optional proxy, + Optional encryptionKeyFileName) { state FlowLock lock(100e6); if (encryptionKeyFileName.present()) { @@ -1644,7 +1648,7 @@ ACTOR Future testBackupContainer(std::string url, Optional en printf("BackupContainerTest URL %s\n", url.c_str()); - state Reference c = IBackupContainer::openContainer(url, encryptionKeyFileName); + state Reference c = IBackupContainer::openContainer(url, proxy, encryptionKeyFileName); // Make sure container doesn't exist, then create it. try { @@ -1789,12 +1793,13 @@ ACTOR Future testBackupContainer(std::string url, Optional en } TEST_CASE("/backup/containers/localdir/unencrypted") { - wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {})); + wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {}, {})); return Void(); } TEST_CASE("/backup/containers/localdir/encrypted") { wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), + {}, format("%s/test_encryption_key", params.getDataDir().c_str()))); return Void(); } @@ -1803,7 +1808,7 @@ TEST_CASE("/backup/containers/url") { if (!g_network->isSimulated()) { const char* url = getenv("FDB_TEST_BACKUP_URL"); ASSERT(url != nullptr); - wait(testBackupContainer(url, {})); + wait(testBackupContainer(url, {}, {})); } return Void(); } @@ -1813,7 +1818,7 @@ TEST_CASE("/backup/containers_list") { state const char* url = getenv("FDB_TEST_BACKUP_URL"); ASSERT(url != nullptr); printf("Listing %s\n", url); - std::vector urls = wait(IBackupContainer::listContainers(url)); + std::vector urls = wait(IBackupContainer::listContainers(url, {})); for (auto& u : urls) { printf("%s\n", u.c_str()); } diff --git a/fdbclient/BackupContainerFileSystem.h b/fdbclient/BackupContainerFileSystem.h index 52c5d3fc54..784b113395 100644 --- a/fdbclient/BackupContainerFileSystem.h +++ b/fdbclient/BackupContainerFileSystem.h @@ -81,9 +81,9 @@ public: Future exists() override = 0; // TODO: refactor this to separate out the "deal with blob store" stuff from the backup business logic - static Reference openContainerFS( - const std::string& url, - const Optional& encryptionKeyFileName = {}); + static Reference openContainerFS(const std::string& url, + const Optional& proxy, + const Optional& encryptionKeyFileName); // Get a list of fileNames and their sizes in the container under the given path // Although not required, an implementation can avoid traversing unwanted subfolders diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index fc1dc558c7..b451747f08 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -4363,13 +4363,14 @@ public: Key backupTag, Standalone> backupRanges, Key bcUrl, + Optional proxy, Version targetVersion, LockDB lockDB, UID randomUID, Key addPrefix, Key removePrefix) { // Sanity check backup is valid - state Reference bc = IBackupContainer::openContainer(bcUrl.toString()); + state Reference bc = IBackupContainer::openContainer(bcUrl.toString(), proxy, {}); state BackupDescription desc = wait(bc->describeBackup()); wait(desc.resolveVersionTimes(cx)); @@ -4430,6 +4431,7 @@ public: struct RestoreRequest restoreRequest(restoreIndex, restoreTag, bcUrl, + proxy, targetVersion, range, deterministicRandom()->randomUniqueID(), @@ -4510,6 +4512,7 @@ public: ACTOR static Future submitBackup(FileBackupAgent* backupAgent, Reference tr, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string tagName, @@ -4555,7 +4558,8 @@ public: backupContainer = joinPath(backupContainer, std::string("backup-") + nowStr.toString()); } - state Reference bc = IBackupContainer::openContainer(backupContainer, encryptionKeyFileName); + state Reference bc = + IBackupContainer::openContainer(backupContainer, proxy, encryptionKeyFileName); try { wait(timeoutError(bc->create(), 30)); } catch (Error& e) { @@ -4642,6 +4646,7 @@ public: Reference tr, Key tagName, Key backupURL, + Optional proxy, Standalone> ranges, Version restoreVersion, Key addPrefix, @@ -4710,7 +4715,7 @@ public: // Point the tag to the new uid tag.set(tr, { uid, false }); - Reference bc = IBackupContainer::openContainer(backupURL.toString()); + Reference bc = IBackupContainer::openContainer(backupURL.toString(), proxy, {}); // Configure the new restore restore.tag().set(tr, tagName.toString()); @@ -5303,6 +5308,7 @@ public: Optional cxOrig, Key tagName, Key url, + Optional proxy, Standalone> ranges, WaitForComplete waitForComplete, Version targetVersion, @@ -5320,7 +5326,7 @@ public: throw restore_error(); } - state Reference bc = IBackupContainer::openContainer(url.toString()); + state Reference bc = IBackupContainer::openContainer(url.toString(), proxy, {}); state BackupDescription desc = wait(bc->describeBackup(true)); if (cxOrig.present()) { @@ -5360,6 +5366,7 @@ public: tr, tagName, url, + proxy, ranges, targetVersion, addPrefix, @@ -5499,6 +5506,7 @@ public: tagName, ranges, KeyRef(bc->getURL()), + bc->getProxy(), targetVersion, LockDB::True, randomUid, @@ -5520,6 +5528,7 @@ public: cx, tagName, KeyRef(bc->getURL()), + bc->getProxy(), ranges, WaitForComplete::True, ::invalidVersion, @@ -5561,13 +5570,14 @@ Future FileBackupAgent::submitParallelRestore(Database cx, Key backupTag, Standalone> backupRanges, Key bcUrl, + Optional proxy, Version targetVersion, LockDB lockDB, UID randomUID, Key addPrefix, Key removePrefix) { return FileBackupAgentImpl::submitParallelRestore( - cx, backupTag, backupRanges, bcUrl, targetVersion, lockDB, randomUID, addPrefix, removePrefix); + cx, backupTag, backupRanges, bcUrl, proxy, targetVersion, lockDB, randomUID, addPrefix, removePrefix); } Future FileBackupAgent::atomicParallelRestore(Database cx, @@ -5582,6 +5592,7 @@ Future FileBackupAgent::restore(Database cx, Optional cxOrig, Key tagName, Key url, + Optional proxy, Standalone> ranges, WaitForComplete waitForComplete, Version targetVersion, @@ -5598,6 +5609,7 @@ Future FileBackupAgent::restore(Database cx, cxOrig, tagName, url, + proxy, ranges, waitForComplete, targetVersion, @@ -5639,6 +5651,7 @@ Future FileBackupAgent::waitRestore(Database cx, Key tagName, Ver Future FileBackupAgent::submitBackup(Reference tr, Key outContainer, + Optional proxy, int initialSnapshotIntervalSeconds, int snapshotIntervalSeconds, std::string const& tagName, @@ -5650,6 +5663,7 @@ Future FileBackupAgent::submitBackup(Reference return FileBackupAgentImpl::submitBackup(this, tr, outContainer, + proxy, initialSnapshotIntervalSeconds, snapshotIntervalSeconds, tagName, diff --git a/fdbclient/RestoreInterface.h b/fdbclient/RestoreInterface.h index bdb2499298..b7f3b04bcc 100644 --- a/fdbclient/RestoreInterface.h +++ b/fdbclient/RestoreInterface.h @@ -49,6 +49,7 @@ struct RestoreRequest { int index; Key tagName; Key url; + Optional proxy; Version targetVersion; KeyRange range; UID randomUid; @@ -64,27 +65,29 @@ struct RestoreRequest { explicit RestoreRequest(const int index, const Key& tagName, const Key& url, + const Optional& proxy, Version targetVersion, const KeyRange& range, const UID& randomUid, Key& addPrefix, Key removePrefix) - : index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid), - addPrefix(addPrefix), removePrefix(removePrefix) {} + : index(index), tagName(tagName), url(url), proxy(proxy), targetVersion(targetVersion), range(range), + randomUid(randomUid), addPrefix(addPrefix), removePrefix(removePrefix) {} // To change this serialization, ProtocolVersion::RestoreRequestValue must be updated, and downgrades need to be // considered template void serialize(Ar& ar) { - serializer(ar, index, tagName, url, targetVersion, range, randomUid, addPrefix, removePrefix, reply); + serializer(ar, index, tagName, url, proxy, targetVersion, range, randomUid, addPrefix, removePrefix, reply); } std::string toString() const { std::stringstream ss; ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString() - << " url:" << url.contents().toString() << " targetVersion:" << std::to_string(targetVersion) - << " range:" << range.toString() << " randomUid:" << randomUid.toString() - << " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString(); + << " url:" << url.contents().toString() << " proxy:" << (proxy.present() ? proxy.get() : "") + << " targetVersion:" << std::to_string(targetVersion) << " range:" << range.toString() + << " randomUid:" << randomUid.toString() << " addPrefix:" << addPrefix.toString() + << " removePrefix:" << removePrefix.toString(); return ss.str(); } }; diff --git a/fdbclient/S3BlobStore.actor.cpp b/fdbclient/S3BlobStore.actor.cpp index a4fa95616a..799f631c6e 100644 --- a/fdbclient/S3BlobStore.actor.cpp +++ b/fdbclient/S3BlobStore.actor.cpp @@ -162,7 +162,8 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const { return r; } -Reference S3BlobStoreEndpoint::fromString(std::string const& url, +Reference S3BlobStoreEndpoint::fromString(const std::string& url, + const Optional& proxy, std::string* resourceFromURL, std::string* error, ParametersT* ignored_parameters) { @@ -175,6 +176,13 @@ Reference S3BlobStoreEndpoint::fromString(std::string const if (prefix != LiteralStringRef("blobstore")) throw format("Invalid blobstore URL prefix '%s'", prefix.toString().c_str()); + Optional proxyHost, proxyPort; + if (proxy.present()) { + StringRef p(proxy.get()); + proxyHost = p.eat(":").toString(); + proxyPort = p.eat().toString(); + } + Optional cred; if (url.find("@") != std::string::npos) { cred = t.eat("@"); @@ -261,7 +269,8 @@ Reference S3BlobStoreEndpoint::fromString(std::string const creds = S3BlobStoreEndpoint::Credentials{ key.toString(), secret.toString(), securityToken.toString() }; } - return makeReference(host.toString(), service.toString(), creds, knobs, extraHeaders); + return makeReference( + host.toString(), service.toString(), proxyHost, proxyPort, creds, knobs, extraHeaders); } catch (std::string& err) { if (error != nullptr) @@ -624,11 +633,11 @@ ACTOR Future connect_impl(Referenceservice; + std::string host = b->host, service = b->service; if (service.empty()) service = b->knobs.secure_connection ? "https" : "http"; state Reference conn = - wait(INetworkConnections::net()->connect(b->host, service, b->knobs.secure_connection ? true : false)); + wait(INetworkConnections::net()->connect(host, service, b->knobs.secure_connection ? true : false)); wait(conn->connectHandshake()); TraceEvent("S3BlobStoreEndpointNewConnection") @@ -1609,7 +1618,7 @@ TEST_CASE("/backup/s3/v4headers") { S3BlobStoreEndpoint::Credentials creds{ "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "" } // GET without query parameters { - S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds); + S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds); std::string verb("GET"); std::string resource("/test.txt"); HTTP::Headers headers; @@ -1624,7 +1633,7 @@ TEST_CASE("/backup/s3/v4headers") { // GET with query parameters { - S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds); + S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds); std::string verb("GET"); std::string resource("/test/examplebucket?Action=DescribeRegions&Version=2013-10-15"); HTTP::Headers headers; @@ -1639,7 +1648,7 @@ TEST_CASE("/backup/s3/v4headers") { // POST { - S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", creds); + S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", "proxy", "port", creds); std::string verb("POST"); std::string resource("/simple.json"); HTTP::Headers headers; diff --git a/fdbclient/S3BlobStore.h b/fdbclient/S3BlobStore.h index 21f39e1d0e..bd29675bae 100644 --- a/fdbclient/S3BlobStore.h +++ b/fdbclient/S3BlobStore.h @@ -99,11 +99,15 @@ public: }; S3BlobStoreEndpoint(std::string const& host, - std::string service, + std::string const& service, + Optional const& proxyHost, + Optional const& proxyPort, Optional const& creds, BlobKnobs const& knobs = BlobKnobs(), HTTP::Headers extraHeaders = HTTP::Headers()) - : host(host), service(service), credentials(creds), lookupKey(creds.present() && creds.get().key.empty()), + : host(host), service(service), proxyHost(proxyHost), proxyPort(proxyPort), + useProxy(proxyHost.present() && proxyPort.present()), credentials(creds), + lookupKey(creds.present() && creds.get().key.empty()), lookupSecret(creds.present() && creds.get().secret.empty()), knobs(knobs), extraHeaders(extraHeaders), requestRate(new SpeedLimit(knobs.requests_per_second, 1)), requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)), @@ -114,7 +118,7 @@ public: recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)), concurrentRequests(knobs.concurrent_requests), concurrentUploads(knobs.concurrent_uploads), concurrentLists(knobs.concurrent_lists) { - if (host.empty()) + if (host.empty() || (proxyHost.present() != proxyPort.present())) throw connection_string_invalid(); } @@ -132,10 +136,11 @@ public: // Parse url and return a S3BlobStoreEndpoint // If the url has parameters that S3BlobStoreEndpoint can't consume then an error will be thrown unless // ignored_parameters is given in which case the unconsumed parameters will be added to it. - static Reference fromString(std::string const& url, - std::string* resourceFromURL = nullptr, - std::string* error = nullptr, - ParametersT* ignored_parameters = nullptr); + static Reference fromString(const std::string& url, + const Optional& proxy, + std::string* resourceFromURL, + std::string* error, + ParametersT* ignored_parameters); // Get a normalized version of this URL with the given resource and any non-default BlobKnob values as URL // parameters in addition to the passed params string @@ -151,6 +156,10 @@ public: std::string host; std::string service; + Optional proxyHost; + Optional proxyPort; + bool useProxy; + Optional credentials; bool lookupKey; bool lookupSecret; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index b81fee7d70..736d5b7a59 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -2506,7 +2506,7 @@ ACTOR Future monitorPruneKeys(Reference self) { if (BM_DEBUG) { fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str()); } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BM_DEBUG) { printf("BM constructed backup container\n"); } diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 791ee7a05a..906994922b 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -3021,7 +3021,7 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, if (BW_DEBUG) { fmt::print("BW constructing backup container from {0}\n", SERVER_KNOBS->BG_URL); } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BW_DEBUG) { printf("BW constructed backup container\n"); } diff --git a/fdbserver/RestoreController.actor.cpp b/fdbserver/RestoreController.actor.cpp index 64d7d3d785..8092ebe39a 100644 --- a/fdbserver/RestoreController.actor.cpp +++ b/fdbserver/RestoreController.actor.cpp @@ -47,7 +47,8 @@ ACTOR static Future collectBackupFiles(Reference bc, RestoreRequest request); ACTOR static Future buildRangeVersions(KeyRangeMap* pRangeVersions, std::vector* pRangeFiles, - Key url); + Key url, + Optional proxy); ACTOR static Future processRestoreRequest(Reference self, Database cx, @@ -317,7 +318,7 @@ ACTOR static Future processRestoreRequest(Reference allFiles; state Version minRangeVersion = MAX_VERSION; - self->initBackupContainer(request.url); + self->initBackupContainer(request.url, request.proxy); // Get all backup files' description and save them to files state Version targetVersion = @@ -334,7 +335,7 @@ ACTOR static Future processRestoreRequest(Reference rangeVersions(minRangeVersion, allKeys.end); if (SERVER_KNOBS->FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE) { - wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url)); + wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url, request.proxy)); } else { // Debug purpose, dump range versions auto ranges = rangeVersions.ranges(); @@ -881,13 +882,14 @@ ACTOR static Future insertRangeVersion(KeyRangeMap* pRangeVersion // Expensive and slow operation that should not run in real prod. ACTOR static Future buildRangeVersions(KeyRangeMap* pRangeVersions, std::vector* pRangeFiles, - Key url) { + Key url, + Optional proxy) { if (!g_network->isSimulated()) { TraceEvent(SevError, "ExpensiveBuildRangeVersions") .detail("Reason", "Parsing all range files is slow and memory intensive"); return Void(); } - Reference bc = IBackupContainer::openContainer(url.toString()); + Reference bc = IBackupContainer::openContainer(url.toString(), proxy, {}); // Key ranges not in range files are empty; // Assign highest version to avoid applying any mutation in these ranges diff --git a/fdbserver/RestoreController.actor.h b/fdbserver/RestoreController.actor.h index 5c9a271f7a..77aa5e6494 100644 --- a/fdbserver/RestoreController.actor.h +++ b/fdbserver/RestoreController.actor.h @@ -446,13 +446,15 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted proxy) { if (bcUrl == url && bc.isValid()) { return; } - TraceEvent("FastRestoreControllerInitBackupContainer").detail("URL", url); + TraceEvent("FastRestoreControllerInitBackupContainer") + .detail("URL", url) + .detail("Proxy", proxy.present() ? proxy.get() : ""); bcUrl = url; - bc = IBackupContainer::openContainer(url.toString()); + bc = IBackupContainer::openContainer(url.toString(), proxy, {}); } }; diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 9aa1aadee3..1afabdcb95 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -262,7 +262,7 @@ ACTOR Future restoreLoaderCore(RestoreLoaderInterface loaderInterf, when(RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture())) { requestTypeStr = "loadFile"; hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty(); - self->initBackupContainer(req.param.url); + self->initBackupContainer(req.param.url, req.param.proxy); self->loadingQueue.push(req); if (!hasQueuedRequests) { self->hasPendingRequests->set(true); diff --git a/fdbserver/RestoreLoader.actor.h b/fdbserver/RestoreLoader.actor.h index b16e4c11fa..92b11a5a1c 100644 --- a/fdbserver/RestoreLoader.actor.h +++ b/fdbserver/RestoreLoader.actor.h @@ -226,12 +226,12 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted proxy) { if (bcUrl == url && bc.isValid()) { return; } bcUrl = url; - bc = IBackupContainer::openContainer(url.toString()); + bc = IBackupContainer::openContainer(url.toString(), proxy, {}); } }; diff --git a/fdbserver/RestoreWorkerInterface.actor.h b/fdbserver/RestoreWorkerInterface.actor.h index 065b22c468..3c2830514a 100644 --- a/fdbserver/RestoreWorkerInterface.actor.h +++ b/fdbserver/RestoreWorkerInterface.actor.h @@ -368,6 +368,7 @@ struct LoadingParam { bool isRangeFile; Key url; + Optional proxy; Optional rangeVersion; // range file's version int64_t blockSize; @@ -386,12 +387,13 @@ struct LoadingParam { template void serialize(Ar& ar) { - serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset); + serializer(ar, isRangeFile, url, proxy, rangeVersion, blockSize, asset); } std::string toString() const { std::stringstream str; str << "isRangeFile:" << isRangeFile << " url:" << url.toString() + << " proxy:" << (proxy.present() ? proxy.get() : "") << " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize << " RestoreAsset:" << asset.toString(); return str.str(); diff --git a/fdbserver/workloads/AtomicRestore.actor.cpp b/fdbserver/workloads/AtomicRestore.actor.cpp index 4c3c2703f9..86d90e1093 100644 --- a/fdbserver/workloads/AtomicRestore.actor.cpp +++ b/fdbserver/workloads/AtomicRestore.actor.cpp @@ -93,6 +93,7 @@ struct AtomicRestoreWorkload : TestWorkload { try { wait(backupAgent.submitBackup(cx, StringRef(backupContainer), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), BackupAgentBase::getDefaultTagName(), diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index 890bdf6a3a..650ca6f2c6 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -222,6 +222,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { try { wait(backupAgent->submitBackup(cx, StringRef(backupContainer), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), tag.toString(), @@ -377,6 +378,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { cx, self->backupTag, KeyRef(lastBackupContainer), + {}, WaitForComplete::True, ::invalidVersion, Verbose::True, @@ -478,6 +480,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { // the configuration to disable backup workers before restore. extraBackup = backupAgent.submitBackup(cx, LiteralStringRef("file://simfdb/backups/"), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), self->backupTag.toString(), @@ -523,7 +526,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { .detail("BackupTag", printable(self->backupTag)); // start restoring - auto container = IBackupContainer::openContainer(lastBackupContainer->getURL()); + auto container = + IBackupContainer::openContainer(lastBackupContainer->getURL(), lastBackupContainer->getProxy(), {}); BackupDescription desc = wait(container->describeBackup()); ASSERT(self->usePartitionedLogs == desc.partitioned); ASSERT(desc.minRestorableVersion.present()); // We must have a valid backup now. @@ -566,6 +570,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { self->backupTag, self->backupRanges, KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), targetVersion, self->locked, randomID, diff --git a/fdbserver/workloads/BackupCorrectness.actor.cpp b/fdbserver/workloads/BackupCorrectness.actor.cpp index 92550a23bf..4c82762764 100644 --- a/fdbserver/workloads/BackupCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupCorrectness.actor.cpp @@ -266,6 +266,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { try { wait(backupAgent->submitBackup(cx, StringRef(backupContainer), + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), tag.toString(), @@ -423,6 +424,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, self->backupTag, KeyRef(lastBackupContainer), + {}, WaitForComplete::True, ::invalidVersion, Verbose::True, @@ -523,6 +525,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { try { extraBackup = backupAgent.submitBackup(cx, "file://simfdb/backups/"_sr, + {}, deterministicRandom()->randomInt(0, 60), deterministicRandom()->randomInt(0, 100), self->backupTag.toString(), @@ -557,7 +560,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { .detail("RestoreAfter", self->restoreAfter) .detail("BackupTag", printable(self->backupTag)); - auto container = IBackupContainer::openContainer(lastBackupContainer->getURL()); + auto container = IBackupContainer::openContainer(lastBackupContainer->getURL(), + lastBackupContainer->getProxy(), + lastBackupContainer->getEncryptionKeyFileName()); BackupDescription desc = wait(container->describeBackup()); Version targetVersion = -1; @@ -593,6 +598,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTag, KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), WaitForComplete::True, targetVersion, Verbose::True, @@ -616,6 +622,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTag, KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), self->restoreRanges, WaitForComplete::True, targetVersion, @@ -646,6 +653,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTags[restoreIndex], KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), self->restoreRanges, WaitForComplete::True, ::invalidVersion, @@ -675,6 +683,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { cx, restoreTags[restoreIndex], KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), WaitForComplete::True, ::invalidVersion, Verbose::True, diff --git a/fdbserver/workloads/BackupToBlob.actor.cpp b/fdbserver/workloads/BackupToBlob.actor.cpp index ee27e1a480..480ae62466 100644 --- a/fdbserver/workloads/BackupToBlob.actor.cpp +++ b/fdbserver/workloads/BackupToBlob.actor.cpp @@ -62,6 +62,7 @@ struct BackupToBlobWorkload : TestWorkload { wait(delay(self->backupAfter)); wait(backupAgent.submitBackup(cx, self->backupURL, + {}, self->initSnapshotInterval, self->snapshotInterval, self->backupTag.toString(), diff --git a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp index e0d309954e..fc6d3035ae 100644 --- a/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp +++ b/fdbserver/workloads/BlobGranuleCorrectnessWorkload.actor.cpp @@ -250,13 +250,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload { if (BGW_DEBUG) { printf("Blob Granule Correctness constructing simulated backup container\n"); } - self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/"); + self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {}); } else { if (BGW_DEBUG) { printf("Blob Granule Correctness constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str()); } - self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BGW_DEBUG) { printf("Blob Granule Correctness constructed backup container\n"); } diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index cd97b1960b..ba49923bf1 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -90,13 +90,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload { if (BGV_DEBUG) { printf("Blob Granule Verifier constructing simulated backup container\n"); } - bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/"); + bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {}); } else { if (BGV_DEBUG) { printf("Blob Granule Verifier constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str()); } - bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL); + bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {}); if (BGV_DEBUG) { printf("Blob Granule Verifier constructed backup container\n"); } diff --git a/fdbserver/workloads/IncrementalBackup.actor.cpp b/fdbserver/workloads/IncrementalBackup.actor.cpp index 688387023e..e40133ffd0 100644 --- a/fdbserver/workloads/IncrementalBackup.actor.cpp +++ b/fdbserver/workloads/IncrementalBackup.actor.cpp @@ -98,12 +98,12 @@ struct IncrementalBackupWorkload : TestWorkload { if (!backupContainer.isValid()) { TraceEvent("IBackupCheckListContainersAttempt").log(); state std::vector containers = - wait(IBackupContainer::listContainers(self->backupDir.toString())); + wait(IBackupContainer::listContainers(self->backupDir.toString(), {})); TraceEvent("IBackupCheckListContainersSuccess") .detail("Size", containers.size()) .detail("First", containers.front()); if (containers.size()) { - backupContainer = IBackupContainer::openContainer(containers.front()); + backupContainer = IBackupContainer::openContainer(containers.front(), {}, {}); } } state bool e = wait(backupContainer->exists()); @@ -152,6 +152,7 @@ struct IncrementalBackupWorkload : TestWorkload { try { wait(self->backupAgent.submitBackup(cx, self->backupDir, + {}, 0, 1e8, self->tag.toString(), @@ -219,7 +220,7 @@ struct IncrementalBackupWorkload : TestWorkload { } TraceEvent("IBackupStartListContainersAttempt").log(); state std::vector containers = - wait(IBackupContainer::listContainers(self->backupDir.toString())); + wait(IBackupContainer::listContainers(self->backupDir.toString(), {})); TraceEvent("IBackupStartListContainersSuccess") .detail("Size", containers.size()) .detail("First", containers.front()); @@ -229,6 +230,7 @@ struct IncrementalBackupWorkload : TestWorkload { cx, Key(self->tag.toString()), backupURL, + {}, WaitForComplete::True, invalidVersion, Verbose::True, diff --git a/fdbserver/workloads/RestoreBackup.actor.cpp b/fdbserver/workloads/RestoreBackup.actor.cpp index c7122bc107..c08fc7de70 100644 --- a/fdbserver/workloads/RestoreBackup.actor.cpp +++ b/fdbserver/workloads/RestoreBackup.actor.cpp @@ -114,6 +114,7 @@ struct RestoreBackupWorkload final : TestWorkload { cx, self->tag, Key(self->backupContainer->getURL()), + self->backupContainer->getProxy(), WaitForComplete::True, ::invalidVersion, Verbose::True))); diff --git a/fdbserver/workloads/RestoreFromBlob.actor.cpp b/fdbserver/workloads/RestoreFromBlob.actor.cpp index 482f22ded4..9d072bb731 100644 --- a/fdbserver/workloads/RestoreFromBlob.actor.cpp +++ b/fdbserver/workloads/RestoreFromBlob.actor.cpp @@ -61,8 +61,8 @@ struct RestoreFromBlobWorkload : TestWorkload { restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys); wait(delay(self->restoreAfter)); - Version v = - wait(backupAgent.restore(cx, {}, self->backupTag, self->backupURL, restoreRanges, self->waitForComplete)); + Version v = wait( + backupAgent.restore(cx, {}, self->backupTag, self->backupURL, {}, restoreRanges, self->waitForComplete)); return Void(); } diff --git a/fdbserver/workloads/SubmitBackup.actor.cpp b/fdbserver/workloads/SubmitBackup.actor.cpp index 50759bf014..aa4dd13d9b 100644 --- a/fdbserver/workloads/SubmitBackup.actor.cpp +++ b/fdbserver/workloads/SubmitBackup.actor.cpp @@ -57,6 +57,7 @@ struct SubmitBackupWorkload final : TestWorkload { try { wait(self->backupAgent.submitBackup(cx, self->backupDir, + {}, self->initSnapshotInterval, self->snapshotInterval, self->tag.toString(),