Add proxy option to backup and restore params.

This commit is contained in:
Renxuan Wang 2022-03-28 17:10:49 -07:00
parent ad98d64799
commit 0a332ee1c1
29 changed files with 232 additions and 95 deletions

View File

@ -101,6 +101,7 @@ std::vector<LogFile> getRelevantLogFiles(const std::vector<LogFile>& files, Vers
struct ConvertParams {
std::string container_url;
Optional<std::string> proxy;
Version begin = invalidVersion;
Version end = invalidVersion;
bool log_enabled = false;
@ -112,6 +113,10 @@ struct ConvertParams {
std::string s;
s.append("ContainerURL:");
s.append(container_url);
if (proxy.present()) {
s.append(" Proxy:");
s.append(proxy.get());
}
s.append(" Begin:");
s.append(format("%" PRId64, begin));
s.append(" End:");
@ -448,7 +453,8 @@ private:
};
ACTOR Future<Void> convert(ConvertParams params) {
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
state Reference<IBackupContainer> container =
IBackupContainer::openContainer(params.container_url, params.proxy, {});
state BackupFileList listing = wait(container->dumpFileList());
std::sort(listing.logs.begin(), listing.logs.end());
TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size());

View File

@ -94,6 +94,7 @@ void printBuildInformation() {
struct DecodeParams {
std::string container_url;
Optional<std::string> proxy;
std::string fileFilter; // only files match the filter will be decoded
bool log_enabled = true;
std::string log_dir, trace_format, trace_log_group;
@ -115,6 +116,10 @@ struct DecodeParams {
std::string s;
s.append("ContainerURL: ");
s.append(container_url);
if (proxy.present()) {
s.append(", Proxy: ");
s.append(proxy.get());
}
s.append(", FileFilter: ");
s.append(fileFilter);
if (log_enabled) {
@ -526,7 +531,8 @@ ACTOR Future<Void> process_file(Reference<IBackupContainer> container, LogFile f
}
ACTOR Future<Void> decode_logs(DecodeParams params) {
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
state Reference<IBackupContainer> container =
IBackupContainer::openContainer(params.container_url, params.proxy, {});
state UID uid = deterministicRandom()->randomUniqueID();
state BackupFileList listing = wait(container->dumpFileList());
// remove partitioned logs

View File

@ -130,6 +130,7 @@ enum {
OPT_USE_PARTITIONED_LOG,
// Backup and Restore constants
OPT_PROXY,
OPT_TAGNAME,
OPT_BACKUPKEYS,
OPT_WAITFORDONE,
@ -234,6 +235,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_NOSTOPWHENDONE, "--no-stop-when-done", SO_NONE },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
// Enable "-p" option after GA
// { OPT_USE_PARTITIONED_LOG, "-p", SO_NONE },
{ OPT_USE_PARTITIONED_LOG, "--partitioned-log-experimental", SO_NONE },
@ -294,6 +296,7 @@ CSimpleOpt::SOption g_rgBackupModifyOptions[] = {
{ OPT_MOD_VERIFY_UID, "--verify-uid", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_SNAPSHOTINTERVAL, "-s", SO_REQ_SEP },
{ OPT_SNAPSHOTINTERVAL, "--snapshot-interval", SO_REQ_SEP },
{ OPT_MOD_ACTIVE_INTERVAL, "--active-snapshot-interval", SO_REQ_SEP },
@ -482,6 +485,7 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
{ OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
@ -517,6 +521,7 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = {
#endif
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
@ -546,6 +551,7 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
{ OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
@ -578,6 +584,7 @@ CSimpleOpt::SOption g_rgBackupDumpOptions[] = {
{ OPT_CLUSTERFILE, "--cluster-file", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_TRACE, "--log", SO_NONE },
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
@ -652,6 +659,7 @@ CSimpleOpt::SOption g_rgBackupQueryOptions[] = {
{ OPT_RESTORE_TIMESTAMP, "--query-restore-timestamp", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "-d", SO_REQ_SEP },
{ OPT_DESTCONTAINER, "--destcontainer", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_RESTORE_VERSION, "-qrv", SO_REQ_SEP },
{ OPT_RESTORE_VERSION, "--query-restore-version", SO_REQ_SEP },
{ OPT_BACKUPKEYS_FILTER, "-k", SO_REQ_SEP },
@ -689,6 +697,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
{ OPT_RESTORE_TIMESTAMP, "--timestamp", SO_REQ_SEP },
{ OPT_KNOB, "--knob-", SO_REQ_SEP },
{ OPT_RESTORECONTAINER, "-r", SO_REQ_SEP },
{ OPT_PROXY, "--proxy", SO_REQ_SEP },
{ OPT_PREFIX_ADD, "--add-prefix", SO_REQ_SEP },
{ OPT_PREFIX_REMOVE, "--remove-prefix", SO_REQ_SEP },
{ OPT_TAGNAME, "-t", SO_REQ_SEP },
@ -1920,6 +1929,7 @@ ACTOR Future<Void> submitDBBackup(Database src,
ACTOR Future<Void> submitBackup(Database db,
std::string url,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -1977,6 +1987,7 @@ ACTOR Future<Void> submitBackup(Database db,
else {
wait(backupAgent.submitBackup(db,
KeyRef(url),
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,
@ -2260,8 +2271,9 @@ ACTOR Future<Void> changeDBBackupResumed(Database src, Database dest, bool pause
}
Reference<IBackupContainer> openBackupContainer(const char* name,
std::string destinationContainer,
Optional<std::string> const& encryptionKeyFile = {}) {
const std::string& destinationContainer,
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFile) {
// Error, if no dest container was specified
if (destinationContainer.empty()) {
fprintf(stderr, "ERROR: No backup destination was specified.\n");
@ -2271,7 +2283,7 @@ Reference<IBackupContainer> openBackupContainer(const char* name,
Reference<IBackupContainer> c;
try {
c = IBackupContainer::openContainer(destinationContainer, encryptionKeyFile);
c = IBackupContainer::openContainer(destinationContainer, proxy, encryptionKeyFile);
} catch (Error& e) {
std::string msg = format("ERROR: '%s' on URL '%s'", e.what(), destinationContainer.c_str());
if (e.code() == error_code_backup_invalid_url && !IBackupContainer::lastOpenError.empty()) {
@ -2291,6 +2303,7 @@ ACTOR Future<Void> runRestore(Database db,
std::string originalClusterFile,
std::string tagName,
std::string container,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
Version beginVersion,
Version targetVersion,
@ -2339,7 +2352,7 @@ ACTOR Future<Void> runRestore(Database db,
state FileBackupAgent backupAgent;
state Reference<IBackupContainer> bc =
openBackupContainer(exeRestore.toString().c_str(), container, encryptionKeyFile);
openBackupContainer(exeRestore.toString().c_str(), container, proxy, encryptionKeyFile);
// If targetVersion is unset then use the maximum restorable version from the backup description
if (targetVersion == invalidVersion) {
@ -2368,6 +2381,7 @@ ACTOR Future<Void> runRestore(Database db,
origDb,
KeyRef(tagName),
KeyRef(container),
proxy,
ranges,
waitForDone,
targetVersion,
@ -2411,6 +2425,7 @@ ACTOR Future<Void> runRestore(Database db,
ACTOR Future<Void> runFastRestoreTool(Database db,
std::string tagName,
std::string container,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
Version dbVersion,
bool performRestore,
@ -2440,7 +2455,7 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
if (performRestore) {
if (dbVersion == invalidVersion) {
TraceEvent("FastRestoreTool").detail("TargetRestoreVersion", "Largest restorable version");
BackupDescription desc = wait(IBackupContainer::openContainer(container)->describeBackup());
BackupDescription desc = wait(IBackupContainer::openContainer(container, proxy, {})->describeBackup());
if (!desc.maxRestorableVersion.present()) {
fprintf(stderr, "The specified backup is not restorable to any version.\n");
throw restore_error();
@ -2457,6 +2472,7 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
KeyRef(tagName),
ranges,
KeyRef(container),
proxy,
dbVersion,
LockDB::True,
randomUID,
@ -2478,7 +2494,7 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
restoreVersion = dbVersion;
} else {
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(container);
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(container, proxy, {});
state BackupDescription description = wait(bc->describeBackup());
if (dbVersion <= 0) {
@ -2522,9 +2538,10 @@ ACTOR Future<Void> runFastRestoreTool(Database db,
ACTOR Future<Void> dumpBackupData(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
Version beginVersion,
Version endVersion) {
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, {});
if (beginVersion < 0 || endVersion < 0) {
BackupDescription desc = wait(c->describeBackup());
@ -2552,6 +2569,7 @@ ACTOR Future<Void> dumpBackupData(const char* name,
ACTOR Future<Void> expireBackupData(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
Version endVersion,
std::string endDatetime,
Database db,
@ -2577,7 +2595,7 @@ ACTOR Future<Void> expireBackupData(const char* name,
}
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, encryptionKeyFile);
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile);
state IBackupContainer::ExpireProgress progress;
state std::string lastProgress;
@ -2623,9 +2641,11 @@ ACTOR Future<Void> expireBackupData(const char* name,
return Void();
}
ACTOR Future<Void> deleteBackupContainer(const char* name, std::string destinationContainer) {
ACTOR Future<Void> deleteBackupContainer(const char* name,
std::string destinationContainer,
Optional<std::string> proxy) {
try {
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer);
state Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, {});
state int numDeleted = 0;
state Future<Void> done = c->deleteContainer(&numDeleted);
@ -2657,12 +2677,13 @@ ACTOR Future<Void> deleteBackupContainer(const char* name, std::string destinati
ACTOR Future<Void> describeBackup(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
bool deep,
Optional<Database> cx,
bool json,
Optional<std::string> encryptionKeyFile) {
try {
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, encryptionKeyFile);
Reference<IBackupContainer> c = openBackupContainer(name, destinationContainer, proxy, encryptionKeyFile);
state BackupDescription desc = wait(c->describeBackup(deep));
if (cx.present())
wait(desc.resolveVersionTimes(cx.get()));
@ -2688,6 +2709,7 @@ static void reportBackupQueryError(UID operationId, JsonBuilderObject& result, s
// resolved to that timestamp.
ACTOR Future<Void> queryBackup(const char* name,
std::string destinationContainer,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> keyRangesFilter,
Version restoreVersion,
std::string originalClusterFile,
@ -2734,7 +2756,7 @@ ACTOR Future<Void> queryBackup(const char* name,
}
try {
state Reference<IBackupContainer> bc = openBackupContainer(name, destinationContainer);
state Reference<IBackupContainer> bc = openBackupContainer(name, destinationContainer, proxy, {});
if (restoreVersion == invalidVersion) {
BackupDescription desc = wait(bc->describeBackup());
if (desc.maxRestorableVersion.present()) {
@ -2814,9 +2836,9 @@ ACTOR Future<Void> queryBackup(const char* name,
return Void();
}
ACTOR Future<Void> listBackup(std::string baseUrl) {
ACTOR Future<Void> listBackup(std::string baseUrl, Optional<std::string> proxy) {
try {
std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl));
std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, proxy));
for (std::string container : containers) {
printf("%s\n", container.c_str());
}
@ -2852,6 +2874,7 @@ ACTOR Future<Void> listBackupTags(Database cx) {
struct BackupModifyOptions {
Optional<std::string> verifyUID;
Optional<std::string> destURL;
Optional<std::string> proxy;
Optional<int> snapshotIntervalSeconds;
Optional<int> activeSnapshotIntervalSeconds;
bool hasChanges() const {
@ -2869,7 +2892,7 @@ ACTOR Future<Void> modifyBackup(Database db, std::string tagName, BackupModifyOp
state Reference<IBackupContainer> bc;
if (options.destURL.present()) {
bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get());
bc = openBackupContainer(exeBackup.toString().c_str(), options.destURL.get(), options.proxy, {});
try {
wait(timeoutError(bc->create(), 30));
} catch (Error& e) {
@ -3342,6 +3365,7 @@ int main(int argc, char* argv[]) {
break;
}
Optional<std::string> proxy;
std::string destinationContainer;
bool describeDeep = false;
bool describeTimestamps = false;
@ -3595,6 +3619,10 @@ int main(int argc, char* argv[]) {
return FDB_EXIT_ERROR;
}
break;
case OPT_PROXY:
proxy = args->OptionArg();
modifyOptions.proxy = proxy;
break;
case OPT_DESTCONTAINER:
destinationContainer = args->OptionArg();
// If the url starts with '/' then prepend "file://" for backwards compatibility
@ -3962,9 +3990,10 @@ int main(int argc, char* argv[]) {
if (!initCluster())
return FDB_EXIT_ERROR;
// Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable.
openBackupContainer(argv[0], destinationContainer, encryptionKeyFile);
openBackupContainer(argv[0], destinationContainer, proxy, encryptionKeyFile);
f = stopAfter(submitBackup(db,
destinationContainer,
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
backupKeys,
@ -4036,6 +4065,7 @@ int main(int argc, char* argv[]) {
}
f = stopAfter(expireBackupData(argv[0],
destinationContainer,
proxy,
expireVersion,
expireDatetime,
db,
@ -4047,7 +4077,7 @@ int main(int argc, char* argv[]) {
case BackupType::DELETE_BACKUP:
initTraceFile();
f = stopAfter(deleteBackupContainer(argv[0], destinationContainer));
f = stopAfter(deleteBackupContainer(argv[0], destinationContainer, proxy));
break;
case BackupType::DESCRIBE:
@ -4060,6 +4090,7 @@ int main(int argc, char* argv[]) {
// given, but quietly skip them if not.
f = stopAfter(describeBackup(argv[0],
destinationContainer,
proxy,
describeDeep,
describeTimestamps ? Optional<Database>(db) : Optional<Database>(),
jsonOutput,
@ -4068,7 +4099,7 @@ int main(int argc, char* argv[]) {
case BackupType::LIST:
initTraceFile();
f = stopAfter(listBackup(baseUrl));
f = stopAfter(listBackup(baseUrl, proxy));
break;
case BackupType::TAGS:
@ -4081,6 +4112,7 @@ int main(int argc, char* argv[]) {
initTraceFile();
f = stopAfter(queryBackup(argv[0],
destinationContainer,
proxy,
backupKeysFilter,
restoreVersion,
restoreClusterFileOrig,
@ -4090,7 +4122,7 @@ int main(int argc, char* argv[]) {
case BackupType::DUMP:
initTraceFile();
f = stopAfter(dumpBackupData(argv[0], destinationContainer, dumpBegin, dumpEnd));
f = stopAfter(dumpBackupData(argv[0], destinationContainer, proxy, dumpBegin, dumpEnd));
break;
case BackupType::UNDEFINED:
@ -4141,6 +4173,7 @@ int main(int argc, char* argv[]) {
restoreClusterFileOrig,
tagName,
restoreContainer,
proxy,
backupKeys,
beginVersion,
restoreVersion,
@ -4218,6 +4251,7 @@ int main(int argc, char* argv[]) {
f = stopAfter(runFastRestoreTool(db,
tagName,
restoreContainer,
proxy,
backupKeys,
restoreVersion,
!dryRun,

View File

@ -165,6 +165,7 @@ public:
Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
Key bcUrl,
Optional<std::string> proxy,
Version targetVersion,
LockDB lockDB,
UID randomUID,
@ -187,6 +188,7 @@ public:
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
WaitForComplete = WaitForComplete::True,
Version targetVersion = ::invalidVersion,
@ -202,6 +204,7 @@ public:
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
WaitForComplete waitForComplete = WaitForComplete::True,
Version targetVersion = ::invalidVersion,
Verbose verbose = Verbose::True,
@ -219,6 +222,7 @@ public:
cxOrig,
tagName,
url,
proxy,
rangeRef,
waitForComplete,
targetVersion,
@ -263,6 +267,7 @@ public:
Future<Void> submitBackup(Reference<ReadYourWritesTransaction> tr,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string const& tagName,
@ -273,6 +278,7 @@ public:
Optional<std::string> const& encryptionKeyFileName = {});
Future<Void> submitBackup(Database cx,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string const& tagName,
@ -284,6 +290,7 @@ public:
return runRYWTransactionFailIfLocked(cx, [=](Reference<ReadYourWritesTransaction> tr) {
return submitBackup(tr,
outContainer,
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,
@ -720,20 +727,31 @@ template <>
inline Tuple Codec<Reference<IBackupContainer>>::pack(Reference<IBackupContainer> const& bc) {
Tuple tuple;
tuple.append(StringRef(bc->getURL()));
if (bc->getProxy().present()) {
tuple.append(StringRef(bc->getProxy().get()));
} else {
tuple.append(StringRef());
}
if (bc->getEncryptionKeyFileName().present()) {
tuple.append(bc->getEncryptionKeyFileName().get());
} else {
tuple.append(StringRef());
}
return tuple;
}
template <>
inline Reference<IBackupContainer> Codec<Reference<IBackupContainer>>::unpack(Tuple const& val) {
ASSERT(val.size() == 1 || val.size() == 2);
ASSERT(val.size() == 3);
auto url = val.getString(0).toString();
Optional<std::string> encryptionKeyFileName;
if (val.size() == 2) {
encryptionKeyFileName = val.getString(1).toString();
Optional<std::string> proxy;
if (!val.getString(1).empty()) {
proxy = val.getString(1).toString();
}
return IBackupContainer::openContainer(url, encryptionKeyFileName);
Optional<std::string> encryptionKeyFileName;
if (!val.getString(2).empty()) {
encryptionKeyFileName = val.getString(2).toString();
}
return IBackupContainer::openContainer(url, proxy, encryptionKeyFileName);
}
class BackupConfig : public KeyBackedConfig {

View File

@ -256,7 +256,8 @@ std::vector<std::string> IBackupContainer::getURLFormats() {
// Get an IBackupContainer based on a container URL string
Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& url,
Optional<std::string> const& encryptionKeyFileName) {
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName) {
static std::map<std::string, Reference<IBackupContainer>> m_cache;
Reference<IBackupContainer>& r = m_cache[url];
@ -273,7 +274,7 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
// The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options.
S3BlobStoreEndpoint::ParametersT backupParams;
Reference<S3BlobStoreEndpoint> bstore =
S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams);
S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams);
if (resource.empty())
throw backup_invalid_url();
@ -317,7 +318,7 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
// Get a list of URLS to backup containers based on some a shorter URL. This function knows about some set of supported
// URL types which support this sort of backup discovery.
ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL) {
ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL, Optional<std::string> proxy) {
try {
StringRef u(baseURL);
if (u.startsWith("file://"_sr)) {
@ -327,8 +328,8 @@ ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL)
std::string resource;
S3BlobStoreEndpoint::ParametersT backupParams;
Reference<S3BlobStoreEndpoint> bstore =
S3BlobStoreEndpoint::fromString(baseURL, &resource, &IBackupContainer::lastOpenError, &backupParams);
Reference<S3BlobStoreEndpoint> bstore = S3BlobStoreEndpoint::fromString(
baseURL, proxy, &resource, &IBackupContainer::lastOpenError, &backupParams);
if (!resource.empty()) {
TraceEvent(SevWarn, "BackupContainer")
@ -370,8 +371,9 @@ ACTOR Future<std::vector<std::string>> listContainers_impl(std::string baseURL)
}
}
Future<std::vector<std::string>> IBackupContainer::listContainers(const std::string& baseURL) {
return listContainers_impl(baseURL);
Future<std::vector<std::string>> IBackupContainer::listContainers(const std::string& baseURL,
const Optional<std::string>& proxy) {
return listContainers_impl(baseURL, proxy);
}
ACTOR Future<Version> timeKeeperVersionFromDatetime(std::string datetime, Database db) {

View File

@ -156,6 +156,7 @@ struct BackupFileList {
struct BackupDescription {
BackupDescription() : snapshotBytes(0) {}
std::string url;
Optional<std::string> proxy;
std::vector<KeyspaceSnapshotFile> snapshots;
int64_t snapshotBytes;
// The version before which everything has been deleted by an expire
@ -294,11 +295,14 @@ public:
// Get an IBackupContainer based on a container spec string
static Reference<IBackupContainer> openContainer(const std::string& url,
const Optional<std::string>& encryptionKeyFileName = {});
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName);
static std::vector<std::string> getURLFormats();
static Future<std::vector<std::string>> listContainers(const std::string& baseURL);
static Future<std::vector<std::string>> listContainers(const std::string& baseURL,
const Optional<std::string>& proxy);
std::string const& getURL() const { return URL; }
Optional<std::string> const& getProxy() const { return proxy; }
Optional<std::string> const& getEncryptionKeyFileName() const { return encryptionKeyFileName; }
static std::string lastOpenError;
@ -306,6 +310,7 @@ public:
// TODO: change the following back to `private` once blob obj access is refactored
protected:
std::string URL;
Optional<std::string> proxy;
Optional<std::string> encryptionKeyFileName;
};

View File

@ -409,6 +409,7 @@ public:
Version logStartVersionOverride) {
state BackupDescription desc;
desc.url = bc->getURL();
desc.proxy = bc->getProxy();
TraceEvent("BackupContainerDescribe1")
.detail("URL", bc->getURL())
@ -1500,7 +1501,8 @@ Future<Void> BackupContainerFileSystem::createTestEncryptionKeyFile(std::string
// code but returning a different template type because you can't cast between them
Reference<BackupContainerFileSystem> BackupContainerFileSystem::openContainerFS(
const std::string& url,
Optional<std::string> const& encryptionKeyFileName) {
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName) {
static std::map<std::string, Reference<BackupContainerFileSystem>> m_cache;
Reference<BackupContainerFileSystem>& r = m_cache[url];
@ -1517,7 +1519,7 @@ Reference<BackupContainerFileSystem> BackupContainerFileSystem::openContainerFS(
// The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options.
S3BlobStoreEndpoint::ParametersT backupParams;
Reference<S3BlobStoreEndpoint> bstore =
S3BlobStoreEndpoint::fromString(url, &resource, &lastOpenError, &backupParams);
S3BlobStoreEndpoint::fromString(url, proxy, &resource, &lastOpenError, &backupParams);
if (resource.empty())
throw backup_invalid_url();
@ -1635,7 +1637,9 @@ ACTOR static Future<Void> testWriteSnapshotFile(Reference<IBackupFile> file, Key
return Void();
}
ACTOR Future<Void> testBackupContainer(std::string url, Optional<std::string> encryptionKeyFileName) {
ACTOR Future<Void> testBackupContainer(std::string url,
Optional<std::string> proxy,
Optional<std::string> encryptionKeyFileName) {
state FlowLock lock(100e6);
if (encryptionKeyFileName.present()) {
@ -1644,7 +1648,7 @@ ACTOR Future<Void> testBackupContainer(std::string url, Optional<std::string> en
printf("BackupContainerTest URL %s\n", url.c_str());
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url, encryptionKeyFileName);
state Reference<IBackupContainer> c = IBackupContainer::openContainer(url, proxy, encryptionKeyFileName);
// Make sure container doesn't exist, then create it.
try {
@ -1789,12 +1793,13 @@ ACTOR Future<Void> testBackupContainer(std::string url, Optional<std::string> en
}
TEST_CASE("/backup/containers/localdir/unencrypted") {
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {}));
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()), {}, {}));
return Void();
}
TEST_CASE("/backup/containers/localdir/encrypted") {
wait(testBackupContainer(format("file://%s/fdb_backups/%llx", params.getDataDir().c_str(), timer_int()),
{},
format("%s/test_encryption_key", params.getDataDir().c_str())));
return Void();
}
@ -1803,7 +1808,7 @@ TEST_CASE("/backup/containers/url") {
if (!g_network->isSimulated()) {
const char* url = getenv("FDB_TEST_BACKUP_URL");
ASSERT(url != nullptr);
wait(testBackupContainer(url, {}));
wait(testBackupContainer(url, {}, {}));
}
return Void();
}
@ -1813,7 +1818,7 @@ TEST_CASE("/backup/containers_list") {
state const char* url = getenv("FDB_TEST_BACKUP_URL");
ASSERT(url != nullptr);
printf("Listing %s\n", url);
std::vector<std::string> urls = wait(IBackupContainer::listContainers(url));
std::vector<std::string> urls = wait(IBackupContainer::listContainers(url, {}));
for (auto& u : urls) {
printf("%s\n", u.c_str());
}

View File

@ -81,9 +81,9 @@ public:
Future<bool> exists() override = 0;
// TODO: refactor this to separate out the "deal with blob store" stuff from the backup business logic
static Reference<BackupContainerFileSystem> openContainerFS(
const std::string& url,
const Optional<std::string>& encryptionKeyFileName = {});
static Reference<BackupContainerFileSystem> openContainerFS(const std::string& url,
const Optional<std::string>& proxy,
const Optional<std::string>& encryptionKeyFileName);
// Get a list of fileNames and their sizes in the container under the given path
// Although not required, an implementation can avoid traversing unwanted subfolders

View File

@ -4363,13 +4363,14 @@ public:
Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
Key bcUrl,
Optional<std::string> proxy,
Version targetVersion,
LockDB lockDB,
UID randomUID,
Key addPrefix,
Key removePrefix) {
// Sanity check backup is valid
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString());
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString(), proxy, {});
state BackupDescription desc = wait(bc->describeBackup());
wait(desc.resolveVersionTimes(cx));
@ -4430,6 +4431,7 @@ public:
struct RestoreRequest restoreRequest(restoreIndex,
restoreTag,
bcUrl,
proxy,
targetVersion,
range,
deterministicRandom()->randomUniqueID(),
@ -4510,6 +4512,7 @@ public:
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent,
Reference<ReadYourWritesTransaction> tr,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
@ -4555,7 +4558,8 @@ public:
backupContainer = joinPath(backupContainer, std::string("backup-") + nowStr.toString());
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupContainer, encryptionKeyFileName);
state Reference<IBackupContainer> bc =
IBackupContainer::openContainer(backupContainer, proxy, encryptionKeyFileName);
try {
wait(timeoutError(bc->create(), 30));
} catch (Error& e) {
@ -4642,6 +4646,7 @@ public:
Reference<ReadYourWritesTransaction> tr,
Key tagName,
Key backupURL,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
Version restoreVersion,
Key addPrefix,
@ -4710,7 +4715,7 @@ public:
// Point the tag to the new uid
tag.set(tr, { uid, false });
Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupURL.toString());
Reference<IBackupContainer> bc = IBackupContainer::openContainer(backupURL.toString(), proxy, {});
// Configure the new restore
restore.tag().set(tr, tagName.toString());
@ -5303,6 +5308,7 @@ public:
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
WaitForComplete waitForComplete,
Version targetVersion,
@ -5320,7 +5326,7 @@ public:
throw restore_error();
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString(), proxy, {});
state BackupDescription desc = wait(bc->describeBackup(true));
if (cxOrig.present()) {
@ -5360,6 +5366,7 @@ public:
tr,
tagName,
url,
proxy,
ranges,
targetVersion,
addPrefix,
@ -5499,6 +5506,7 @@ public:
tagName,
ranges,
KeyRef(bc->getURL()),
bc->getProxy(),
targetVersion,
LockDB::True,
randomUid,
@ -5520,6 +5528,7 @@ public:
cx,
tagName,
KeyRef(bc->getURL()),
bc->getProxy(),
ranges,
WaitForComplete::True,
::invalidVersion,
@ -5561,13 +5570,14 @@ Future<Void> FileBackupAgent::submitParallelRestore(Database cx,
Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
Key bcUrl,
Optional<std::string> proxy,
Version targetVersion,
LockDB lockDB,
UID randomUID,
Key addPrefix,
Key removePrefix) {
return FileBackupAgentImpl::submitParallelRestore(
cx, backupTag, backupRanges, bcUrl, targetVersion, lockDB, randomUID, addPrefix, removePrefix);
cx, backupTag, backupRanges, bcUrl, proxy, targetVersion, lockDB, randomUID, addPrefix, removePrefix);
}
Future<Void> FileBackupAgent::atomicParallelRestore(Database cx,
@ -5582,6 +5592,7 @@ Future<Version> FileBackupAgent::restore(Database cx,
Optional<Database> cxOrig,
Key tagName,
Key url,
Optional<std::string> proxy,
Standalone<VectorRef<KeyRangeRef>> ranges,
WaitForComplete waitForComplete,
Version targetVersion,
@ -5598,6 +5609,7 @@ Future<Version> FileBackupAgent::restore(Database cx,
cxOrig,
tagName,
url,
proxy,
ranges,
waitForComplete,
targetVersion,
@ -5639,6 +5651,7 @@ Future<ERestoreState> FileBackupAgent::waitRestore(Database cx, Key tagName, Ver
Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction> tr,
Key outContainer,
Optional<std::string> proxy,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string const& tagName,
@ -5650,6 +5663,7 @@ Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction>
return FileBackupAgentImpl::submitBackup(this,
tr,
outContainer,
proxy,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,

View File

@ -49,6 +49,7 @@ struct RestoreRequest {
int index;
Key tagName;
Key url;
Optional<std::string> proxy;
Version targetVersion;
KeyRange range;
UID randomUid;
@ -64,27 +65,29 @@ struct RestoreRequest {
explicit RestoreRequest(const int index,
const Key& tagName,
const Key& url,
const Optional<std::string>& proxy,
Version targetVersion,
const KeyRange& range,
const UID& randomUid,
Key& addPrefix,
Key removePrefix)
: index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid),
addPrefix(addPrefix), removePrefix(removePrefix) {}
: index(index), tagName(tagName), url(url), proxy(proxy), targetVersion(targetVersion), range(range),
randomUid(randomUid), addPrefix(addPrefix), removePrefix(removePrefix) {}
// To change this serialization, ProtocolVersion::RestoreRequestValue must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, index, tagName, url, targetVersion, range, randomUid, addPrefix, removePrefix, reply);
serializer(ar, index, tagName, url, proxy, targetVersion, range, randomUid, addPrefix, removePrefix, reply);
}
std::string toString() const {
std::stringstream ss;
ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString()
<< " url:" << url.contents().toString() << " targetVersion:" << std::to_string(targetVersion)
<< " range:" << range.toString() << " randomUid:" << randomUid.toString()
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
<< " url:" << url.contents().toString() << " proxy:" << (proxy.present() ? proxy.get() : "")
<< " targetVersion:" << std::to_string(targetVersion) << " range:" << range.toString()
<< " randomUid:" << randomUid.toString() << " addPrefix:" << addPrefix.toString()
<< " removePrefix:" << removePrefix.toString();
return ss.str();
}
};

View File

@ -162,7 +162,8 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
return r;
}
Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const& url,
Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(const std::string& url,
const Optional<std::string>& proxy,
std::string* resourceFromURL,
std::string* error,
ParametersT* ignored_parameters) {
@ -175,6 +176,13 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const
if (prefix != LiteralStringRef("blobstore"))
throw format("Invalid blobstore URL prefix '%s'", prefix.toString().c_str());
Optional<std::string> proxyHost, proxyPort;
if (proxy.present()) {
StringRef p(proxy.get());
proxyHost = p.eat(":").toString();
proxyPort = p.eat().toString();
}
Optional<StringRef> cred;
if (url.find("@") != std::string::npos) {
cred = t.eat("@");
@ -261,7 +269,8 @@ Reference<S3BlobStoreEndpoint> S3BlobStoreEndpoint::fromString(std::string const
creds = S3BlobStoreEndpoint::Credentials{ key.toString(), secret.toString(), securityToken.toString() };
}
return makeReference<S3BlobStoreEndpoint>(host.toString(), service.toString(), creds, knobs, extraHeaders);
return makeReference<S3BlobStoreEndpoint>(
host.toString(), service.toString(), proxyHost, proxyPort, creds, knobs, extraHeaders);
} catch (std::string& err) {
if (error != nullptr)
@ -624,11 +633,11 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
return rconn;
}
}
std::string service = b->service;
std::string host = b->host, service = b->service;
if (service.empty())
service = b->knobs.secure_connection ? "https" : "http";
state Reference<IConnection> conn =
wait(INetworkConnections::net()->connect(b->host, service, b->knobs.secure_connection ? true : false));
wait(INetworkConnections::net()->connect(host, service, b->knobs.secure_connection ? true : false));
wait(conn->connectHandshake());
TraceEvent("S3BlobStoreEndpointNewConnection")
@ -1609,7 +1618,7 @@ TEST_CASE("/backup/s3/v4headers") {
S3BlobStoreEndpoint::Credentials creds{ "AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "" }
// GET without query parameters
{
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds);
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds);
std::string verb("GET");
std::string resource("/test.txt");
HTTP::Headers headers;
@ -1624,7 +1633,7 @@ TEST_CASE("/backup/s3/v4headers") {
// GET with query parameters
{
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", creds);
S3BlobStoreEndpoint s3("s3.amazonaws.com", "s3", "proxy", "port", creds);
std::string verb("GET");
std::string resource("/test/examplebucket?Action=DescribeRegions&Version=2013-10-15");
HTTP::Headers headers;
@ -1639,7 +1648,7 @@ TEST_CASE("/backup/s3/v4headers") {
// POST
{
S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", creds);
S3BlobStoreEndpoint s3("s3.us-west-2.amazonaws.com", "s3", "proxy", "port", creds);
std::string verb("POST");
std::string resource("/simple.json");
HTTP::Headers headers;

View File

@ -99,11 +99,15 @@ public:
};
S3BlobStoreEndpoint(std::string const& host,
std::string service,
std::string const& service,
Optional<std::string> const& proxyHost,
Optional<std::string> const& proxyPort,
Optional<Credentials> const& creds,
BlobKnobs const& knobs = BlobKnobs(),
HTTP::Headers extraHeaders = HTTP::Headers())
: host(host), service(service), credentials(creds), lookupKey(creds.present() && creds.get().key.empty()),
: host(host), service(service), proxyHost(proxyHost), proxyPort(proxyPort),
useProxy(proxyHost.present() && proxyPort.present()), credentials(creds),
lookupKey(creds.present() && creds.get().key.empty()),
lookupSecret(creds.present() && creds.get().secret.empty()), knobs(knobs), extraHeaders(extraHeaders),
requestRate(new SpeedLimit(knobs.requests_per_second, 1)),
requestRateList(new SpeedLimit(knobs.list_requests_per_second, 1)),
@ -114,7 +118,7 @@ public:
recvRate(new SpeedLimit(knobs.max_recv_bytes_per_second, 1)), concurrentRequests(knobs.concurrent_requests),
concurrentUploads(knobs.concurrent_uploads), concurrentLists(knobs.concurrent_lists) {
if (host.empty())
if (host.empty() || (proxyHost.present() != proxyPort.present()))
throw connection_string_invalid();
}
@ -132,10 +136,11 @@ public:
// Parse url and return a S3BlobStoreEndpoint
// If the url has parameters that S3BlobStoreEndpoint can't consume then an error will be thrown unless
// ignored_parameters is given in which case the unconsumed parameters will be added to it.
static Reference<S3BlobStoreEndpoint> fromString(std::string const& url,
std::string* resourceFromURL = nullptr,
std::string* error = nullptr,
ParametersT* ignored_parameters = nullptr);
static Reference<S3BlobStoreEndpoint> fromString(const std::string& url,
const Optional<std::string>& proxy,
std::string* resourceFromURL,
std::string* error,
ParametersT* ignored_parameters);
// Get a normalized version of this URL with the given resource and any non-default BlobKnob values as URL
// parameters in addition to the passed params string
@ -151,6 +156,10 @@ public:
std::string host;
std::string service;
Optional<std::string> proxyHost;
Optional<std::string> proxyPort;
bool useProxy;
Optional<Credentials> credentials;
bool lookupKey;
bool lookupSecret;

View File

@ -2506,7 +2506,7 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
if (BM_DEBUG) {
fmt::print("BM constructing backup container from {}\n", SERVER_KNOBS->BG_URL.c_str());
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BM_DEBUG) {
printf("BM constructed backup container\n");
}

View File

@ -3021,7 +3021,7 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
if (BW_DEBUG) {
fmt::print("BW constructing backup container from {0}\n", SERVER_KNOBS->BG_URL);
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BW_DEBUG) {
printf("BW constructed backup container\n");
}

View File

@ -47,7 +47,8 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
RestoreRequest request);
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
std::vector<RestoreFileFR>* pRangeFiles,
Key url);
Key url,
Optional<std::string> proxy);
ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerData> self,
Database cx,
@ -317,7 +318,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
state std::vector<RestoreFileFR> allFiles;
state Version minRangeVersion = MAX_VERSION;
self->initBackupContainer(request.url);
self->initBackupContainer(request.url, request.proxy);
// Get all backup files' description and save them to files
state Version targetVersion =
@ -334,7 +335,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreControllerDa
// Build range versions: version of key ranges in range file
state KeyRangeMap<Version> rangeVersions(minRangeVersion, allKeys.end);
if (SERVER_KNOBS->FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE) {
wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url));
wait(buildRangeVersions(&rangeVersions, &rangeFiles, request.url, request.proxy));
} else {
// Debug purpose, dump range versions
auto ranges = rangeVersions.ranges();
@ -881,13 +882,14 @@ ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersion
// Expensive and slow operation that should not run in real prod.
ACTOR static Future<Void> buildRangeVersions(KeyRangeMap<Version>* pRangeVersions,
std::vector<RestoreFileFR>* pRangeFiles,
Key url) {
Key url,
Optional<std::string> proxy) {
if (!g_network->isSimulated()) {
TraceEvent(SevError, "ExpensiveBuildRangeVersions")
.detail("Reason", "Parsing all range files is slow and memory intensive");
return Void();
}
Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString(), proxy, {});
// Key ranges not in range files are empty;
// Assign highest version to avoid applying any mutation in these ranges

View File

@ -446,13 +446,15 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
}
}
void initBackupContainer(Key url) {
void initBackupContainer(Key url, Optional<std::string> proxy) {
if (bcUrl == url && bc.isValid()) {
return;
}
TraceEvent("FastRestoreControllerInitBackupContainer").detail("URL", url);
TraceEvent("FastRestoreControllerInitBackupContainer")
.detail("URL", url)
.detail("Proxy", proxy.present() ? proxy.get() : "");
bcUrl = url;
bc = IBackupContainer::openContainer(url.toString());
bc = IBackupContainer::openContainer(url.toString(), proxy, {});
}
};

View File

@ -262,7 +262,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf,
when(RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture())) {
requestTypeStr = "loadFile";
hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty();
self->initBackupContainer(req.param.url);
self->initBackupContainer(req.param.url, req.param.proxy);
self->loadingQueue.push(req);
if (!hasQueuedRequests) {
self->hasPendingRequests->set(true);

View File

@ -226,12 +226,12 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
finishedBatch = NotifiedVersion(0);
}
void initBackupContainer(Key url) {
void initBackupContainer(Key url, Optional<std::string> proxy) {
if (bcUrl == url && bc.isValid()) {
return;
}
bcUrl = url;
bc = IBackupContainer::openContainer(url.toString());
bc = IBackupContainer::openContainer(url.toString(), proxy, {});
}
};

View File

@ -368,6 +368,7 @@ struct LoadingParam {
bool isRangeFile;
Key url;
Optional<std::string> proxy;
Optional<Version> rangeVersion; // range file's version
int64_t blockSize;
@ -386,12 +387,13 @@ struct LoadingParam {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset);
serializer(ar, isRangeFile, url, proxy, rangeVersion, blockSize, asset);
}
std::string toString() const {
std::stringstream str;
str << "isRangeFile:" << isRangeFile << " url:" << url.toString()
<< " proxy:" << (proxy.present() ? proxy.get() : "")
<< " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize
<< " RestoreAsset:" << asset.toString();
return str.str();

View File

@ -93,6 +93,7 @@ struct AtomicRestoreWorkload : TestWorkload {
try {
wait(backupAgent.submitBackup(cx,
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
BackupAgentBase::getDefaultTagName(),

View File

@ -222,6 +222,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
try {
wait(backupAgent->submitBackup(cx,
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
tag.toString(),
@ -377,6 +378,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
cx,
self->backupTag,
KeyRef(lastBackupContainer),
{},
WaitForComplete::True,
::invalidVersion,
Verbose::True,
@ -478,6 +480,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// the configuration to disable backup workers before restore.
extraBackup = backupAgent.submitBackup(cx,
LiteralStringRef("file://simfdb/backups/"),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
@ -523,7 +526,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
.detail("BackupTag", printable(self->backupTag));
// start restoring
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
auto container =
IBackupContainer::openContainer(lastBackupContainer->getURL(), lastBackupContainer->getProxy(), {});
BackupDescription desc = wait(container->describeBackup());
ASSERT(self->usePartitionedLogs == desc.partitioned);
ASSERT(desc.minRestorableVersion.present()); // We must have a valid backup now.
@ -566,6 +570,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
self->backupTag,
self->backupRanges,
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
targetVersion,
self->locked,
randomID,

View File

@ -266,6 +266,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
try {
wait(backupAgent->submitBackup(cx,
StringRef(backupContainer),
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
tag.toString(),
@ -423,6 +424,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
self->backupTag,
KeyRef(lastBackupContainer),
{},
WaitForComplete::True,
::invalidVersion,
Verbose::True,
@ -523,6 +525,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
try {
extraBackup = backupAgent.submitBackup(cx,
"file://simfdb/backups/"_sr,
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
@ -557,7 +560,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
.detail("RestoreAfter", self->restoreAfter)
.detail("BackupTag", printable(self->backupTag));
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL());
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL(),
lastBackupContainer->getProxy(),
lastBackupContainer->getEncryptionKeyFileName());
BackupDescription desc = wait(container->describeBackup());
Version targetVersion = -1;
@ -593,6 +598,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTag,
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
WaitForComplete::True,
targetVersion,
Verbose::True,
@ -616,6 +622,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTag,
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
self->restoreRanges,
WaitForComplete::True,
targetVersion,
@ -646,6 +653,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTags[restoreIndex],
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
self->restoreRanges,
WaitForComplete::True,
::invalidVersion,
@ -675,6 +683,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
cx,
restoreTags[restoreIndex],
KeyRef(lastBackupContainer->getURL()),
lastBackupContainer->getProxy(),
WaitForComplete::True,
::invalidVersion,
Verbose::True,

View File

@ -62,6 +62,7 @@ struct BackupToBlobWorkload : TestWorkload {
wait(delay(self->backupAfter));
wait(backupAgent.submitBackup(cx,
self->backupURL,
{},
self->initSnapshotInterval,
self->snapshotInterval,
self->backupTag.toString(),

View File

@ -250,13 +250,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
if (BGW_DEBUG) {
printf("Blob Granule Correctness constructing simulated backup container\n");
}
self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
self->bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {});
} else {
if (BGW_DEBUG) {
printf("Blob Granule Correctness constructing backup container from %s\n",
SERVER_KNOBS->BG_URL.c_str());
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BGW_DEBUG) {
printf("Blob Granule Correctness constructed backup container\n");
}

View File

@ -90,13 +90,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (BGV_DEBUG) {
printf("Blob Granule Verifier constructing simulated backup container\n");
}
bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/", {}, {});
} else {
if (BGV_DEBUG) {
printf("Blob Granule Verifier constructing backup container from %s\n",
SERVER_KNOBS->BG_URL.c_str());
}
bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL, {}, {});
if (BGV_DEBUG) {
printf("Blob Granule Verifier constructed backup container\n");
}

View File

@ -98,12 +98,12 @@ struct IncrementalBackupWorkload : TestWorkload {
if (!backupContainer.isValid()) {
TraceEvent("IBackupCheckListContainersAttempt").log();
state std::vector<std::string> containers =
wait(IBackupContainer::listContainers(self->backupDir.toString()));
wait(IBackupContainer::listContainers(self->backupDir.toString(), {}));
TraceEvent("IBackupCheckListContainersSuccess")
.detail("Size", containers.size())
.detail("First", containers.front());
if (containers.size()) {
backupContainer = IBackupContainer::openContainer(containers.front());
backupContainer = IBackupContainer::openContainer(containers.front(), {}, {});
}
}
state bool e = wait(backupContainer->exists());
@ -152,6 +152,7 @@ struct IncrementalBackupWorkload : TestWorkload {
try {
wait(self->backupAgent.submitBackup(cx,
self->backupDir,
{},
0,
1e8,
self->tag.toString(),
@ -219,7 +220,7 @@ struct IncrementalBackupWorkload : TestWorkload {
}
TraceEvent("IBackupStartListContainersAttempt").log();
state std::vector<std::string> containers =
wait(IBackupContainer::listContainers(self->backupDir.toString()));
wait(IBackupContainer::listContainers(self->backupDir.toString(), {}));
TraceEvent("IBackupStartListContainersSuccess")
.detail("Size", containers.size())
.detail("First", containers.front());
@ -229,6 +230,7 @@ struct IncrementalBackupWorkload : TestWorkload {
cx,
Key(self->tag.toString()),
backupURL,
{},
WaitForComplete::True,
invalidVersion,
Verbose::True,

View File

@ -114,6 +114,7 @@ struct RestoreBackupWorkload final : TestWorkload {
cx,
self->tag,
Key(self->backupContainer->getURL()),
self->backupContainer->getProxy(),
WaitForComplete::True,
::invalidVersion,
Verbose::True)));

View File

@ -61,8 +61,8 @@ struct RestoreFromBlobWorkload : TestWorkload {
restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys);
wait(delay(self->restoreAfter));
Version v =
wait(backupAgent.restore(cx, {}, self->backupTag, self->backupURL, restoreRanges, self->waitForComplete));
Version v = wait(
backupAgent.restore(cx, {}, self->backupTag, self->backupURL, {}, restoreRanges, self->waitForComplete));
return Void();
}

View File

@ -57,6 +57,7 @@ struct SubmitBackupWorkload final : TestWorkload {
try {
wait(self->backupAgent.submitBackup(cx,
self->backupDir,
{},
self->initSnapshotInterval,
self->snapshotInterval,
self->tag.toString(),