Mirror of https://github.com/apple/foundationdb.git

Merge branch 'master' into merge

Trevor Clinkenbeard 2020-10-29 22:27:40 -07:00 committed by GitHub
commit 0faaecc780
23 changed files with 4980 additions and 3754 deletions

File diff suppressed because it is too large.

@ -0,0 +1,88 @@
/*
* AsyncTaskThread.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/AsyncTaskThread.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
class TerminateTask final : public IAsyncTask {
public:
void operator()() override { ASSERT(false); }
bool isTerminate() const override { return true; }
};
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, int* sum, int count) {
state int i = 0;
for (; i < count; ++i) {
wait(asyncTaskThread->execAsync([sum = sum] {
++(*sum);
return Void();
}));
}
return Void();
}
} // namespace
const double AsyncTaskThread::meanDelay = 0.01;
AsyncTaskThread::AsyncTaskThread() : thread([this] { run(this); }) {}
AsyncTaskThread::~AsyncTaskThread() {
bool wakeUp = false;
{
std::lock_guard<std::mutex> g(m);
wakeUp = queue.push(std::make_shared<TerminateTask>());
}
if (wakeUp) {
cv.notify_one();
}
thread.join();
}
void AsyncTaskThread::run(AsyncTaskThread* self) {
while (true) {
std::shared_ptr<IAsyncTask> task;
{
std::unique_lock<std::mutex> lk(self->m);
self->cv.wait(lk, [self] { return !self->queue.canSleep(); });
task = self->queue.pop().get();
if (task->isTerminate()) {
return;
}
}
(*task)();
}
}
TEST_CASE("/asynctaskthread/add") {
state int sum = 0;
state AsyncTaskThread asyncTaskThread;
std::vector<Future<Void>> clients;
clients.reserve(10);
for (int i = 0; i < 10; ++i) {
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, 100));
}
wait(waitForAll(clients));
ASSERT(sum == 1000);
return Void();
}

@ -0,0 +1,98 @@
/*
* AsyncTaskThread.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __ASYNC_TASK_THREAD_H__
#define __ASYNC_TASK_THREAD_H__
#include <thread>
#include <memory>
#include <mutex>
#include "flow/network.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/ThreadSafeQueue.h"
class IAsyncTask {
public:
virtual void operator()() = 0;
virtual ~IAsyncTask() = default;
virtual bool isTerminate() const = 0;
};
template <class F>
class AsyncTask final : public IAsyncTask {
F func;
public:
AsyncTask(const F& func) : func(func) {}
void operator()() override { func(); }
bool isTerminate() const override { return false; }
};
class AsyncTaskThread {
ThreadSafeQueue<std::shared_ptr<IAsyncTask>> queue;
std::condition_variable cv;
std::mutex m;
std::thread thread;
static void run(AsyncTaskThread* self);
template <class F>
void addTask(const F& func) {
bool wakeUp = false;
{
std::lock_guard<std::mutex> g(m);
wakeUp = queue.push(std::make_shared<AsyncTask<F>>(func));
}
if (wakeUp) {
cv.notify_one();
}
}
static const double meanDelay;
public:
AsyncTaskThread();
// Warning: This destructor can hang if a task hangs, so it is
// up to the caller to prevent tasks from hanging indefinitely
~AsyncTaskThread();
template <class F>
auto execAsync(const F& func, TaskPriority priority = TaskPriority::DefaultOnMainThread)
-> Future<decltype(func())> {
if (g_network->isSimulated()) {
return map(delayJittered(meanDelay), [func](Void _) { return func(); });
}
Promise<decltype(func())> promise;
addTask([promise, func, priority] {
try {
auto funcResult = func();
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, nullptr, priority);
} catch (Error& e) {
onMainThreadVoid([promise, e] { promise.sendError(e); }, nullptr, priority);
}
});
return promise.getFuture();
}
};
#endif
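A brief usage sketch for the interface above (a hypothetical helper, not part of this commit): execAsync() runs the supplied function on the AsyncTaskThread's background thread and delivers its result back to the main thread as a Future. The Azure call here stands in for any blocking SDK operation.

Future<int64_t> exampleBlobSize(AsyncTaskThread& asyncTaskThread, azure::storage_lite::blob_client* client,
                                const std::string& containerName, const std::string& blobName) {
    return asyncTaskThread.execAsync([client, containerName, blobName] {
        // Runs on the background thread; it must not touch flow/actor state directly.
        return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
    });
}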

File diff suppressed because it is too large.

@ -18,8 +18,8 @@
* limitations under the License.
*/
#ifndef FDBCLIENT_BackupContainer_H
#define FDBCLIENT_BackupContainer_H
#ifndef FDBCLIENT_BACKUP_CONTAINER_H
#define FDBCLIENT_BACKUP_CONTAINER_H
#pragma once
#include "flow/flow.h"
@ -40,7 +40,7 @@ Future<Version> timeKeeperVersionFromDatetime(std::string const &datetime, Datab
// TODO: Move the log file and range file format encoding/decoding stuff to this file and behind interfaces.
class IBackupFile {
public:
IBackupFile(std::string fileName) : m_fileName(fileName), m_offset(0) {}
IBackupFile(const std::string& fileName) : m_fileName(fileName), m_offset(0) {}
virtual ~IBackupFile() {}
// Backup files are append-only and cannot have more than 1 append outstanding at once.
virtual Future<Void> append(const void *data, int len) = 0;
@ -247,7 +247,7 @@ public:
int64_t totalBytes) = 0;
// Open a file for read by name
virtual Future<Reference<IAsyncFile>> readFile(std::string name) = 0;
virtual Future<Reference<IAsyncFile>> readFile(const std::string& name) = 0;
// Returns the key ranges in the snapshot file. This is an expensive function
// and should only be used in simulation for sanity check.
@ -289,9 +289,9 @@ public:
bool logsOnly = false, Version beginVersion = -1) = 0;
// Get an IBackupContainer based on a container spec string
static Reference<IBackupContainer> openContainer(std::string url);
static Reference<IBackupContainer> openContainer(const std::string& url);
static std::vector<std::string> getURLFormats();
static Future<std::vector<std::string>> listContainers(std::string baseURL);
static Future<std::vector<std::string>> listContainers(const std::string& baseURL);
std::string getURL() const {
return URL;
@ -303,4 +303,4 @@ private:
std::string URL;
};
#endif
#endif

@ -0,0 +1,279 @@
/*
* BackupContainerAzureBlobStore.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/BackupContainerAzureBlobStore.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class BackupContainerAzureBlobStoreImpl {
public:
using AzureClient = azure::storage_lite::blob_client;
class ReadFile final : public IAsyncFile, ReferenceCounted<ReadFile> {
AsyncTaskThread& asyncTaskThread;
std::string containerName;
std::string blobName;
AzureClient* client;
public:
ReadFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<ReadFile>::addref(); }
void delref() override { ReferenceCounted<ReadFile>::delref(); }
Future<int> read(void* data, int length, int64_t offset) {
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
blobName = this->blobName, data, length, offset] {
std::ostringstream oss(std::ios::out | std::ios::binary);
client->download_blob_to_stream(containerName, blobName, offset, length, oss);
auto str = oss.str();
memcpy(data, str.c_str(), str.size());
return static_cast<int>(str.size());
});
}
Future<Void> zeroRange(int64_t offset, int64_t length) override { throw file_not_writable(); }
Future<Void> write(void const* data, int length, int64_t offset) override { throw file_not_writable(); }
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
Future<Void> sync() override { throw file_not_writable(); }
Future<int64_t> size() const override {
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
blobName = this->blobName] {
return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
});
}
std::string getFilename() const override { return blobName; }
int64_t debugFD() const override { return 0; }
};
class WriteFile final : public IAsyncFile, ReferenceCounted<WriteFile> {
AsyncTaskThread& asyncTaskThread;
AzureClient* client;
std::string containerName;
std::string blobName;
int64_t m_cursor{ 0 };
// Ideally this buffer should not be a string, but
// the Azure SDK only supports/tests uploading to append
// blobs from a stringstream.
std::string buffer;
static constexpr size_t bufferLimit = 1 << 20;
public:
WriteFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<WriteFile>::addref(); }
void delref() override { ReferenceCounted<WriteFile>::delref(); }
Future<int> read(void* data, int length, int64_t offset) override { throw file_not_readable(); }
Future<Void> write(void const* data, int length, int64_t offset) override {
if (offset != m_cursor) {
throw non_sequential_op();
}
m_cursor += length;
auto p = static_cast<char const*>(data);
buffer.insert(buffer.cend(), p, p + length);
if (buffer.size() > bufferLimit) {
return sync();
} else {
return Void();
}
}
Future<Void> truncate(int64_t size) override {
if (size != m_cursor) {
throw non_sequential_op();
}
return Void();
}
Future<Void> sync() override {
auto movedBuffer = std::move(buffer);
buffer.clear();
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
blobName = this->blobName, buffer = std::move(movedBuffer)] {
std::istringstream iss(std::move(buffer));
auto resp = client->append_block_from_stream(containerName, blobName, iss).get();
return Void();
});
}
Future<int64_t> size() const override {
return asyncTaskThread.execAsync(
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
auto resp = client->get_blob_properties(containerName, blobName).get().response();
ASSERT(resp.valid()); // TODO: Should instead throw here
return static_cast<int64_t>(resp.size);
});
}
std::string getFilename() const override { return blobName; }
int64_t debugFD() const override { return -1; }
};
class BackupFile final : public IBackupFile, ReferenceCounted<BackupFile> {
Reference<IAsyncFile> m_file;
public:
BackupFile(const std::string& fileName, Reference<IAsyncFile> file) : IBackupFile(fileName), m_file(file) {}
Future<Void> append(const void* data, int len) override {
Future<Void> r = m_file->write(data, len, m_offset);
m_offset += len;
return r;
}
Future<Void> finish() override {
Reference<BackupFile> self = Reference<BackupFile>::addRef(this);
return map(m_file->sync(), [=](Void _) {
self->m_file.clear();
return Void();
});
}
void addref() override { ReferenceCounted<BackupFile>::addref(); }
void delref() override { ReferenceCounted<BackupFile>::delref(); }
};
static bool isDirectory(const std::string& blobName) { return blobName.size() && blobName.back() == '/'; }
ACTOR static Future<Reference<IAsyncFile>> readFile(BackupContainerAzureBlobStore* self, std::string fileName) {
bool exists = wait(self->blobExists(fileName));
if (!exists) {
throw file_not_found();
}
return Reference<IAsyncFile>(
new ReadFile(self->asyncTaskThread, self->containerName, fileName, self->client.get()));
}
ACTOR static Future<Reference<IBackupFile>> writeFile(BackupContainerAzureBlobStore* self, std::string fileName) {
wait(self->asyncTaskThread.execAsync(
[client = self->client.get(), containerName = self->containerName, fileName = fileName] {
auto outcome = client->create_append_blob(containerName, fileName).get();
return Void();
}));
return Reference<IBackupFile>(
new BackupFile(fileName, Reference<IAsyncFile>(new WriteFile(self->asyncTaskThread, self->containerName,
fileName, self->client.get()))));
}
static void listFiles(AzureClient* client, const std::string& containerName, const std::string& path,
std::function<bool(std::string const&)> folderPathFilter,
BackupContainerFileSystem::FilesAndSizesT& result) {
auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response();
for (const auto& blob : resp.blobs) {
if (isDirectory(blob.name) && folderPathFilter(blob.name)) {
listFiles(client, containerName, blob.name, folderPathFilter, result);
} else {
result.emplace_back(blob.name, blob.content_length);
}
}
}
ACTOR static Future<Void> deleteContainer(BackupContainerAzureBlobStore* self, int* pNumDeleted) {
state int filesToDelete = 0;
if (pNumDeleted) {
BackupContainerFileSystem::FilesAndSizesT files = wait(self->listFiles());
filesToDelete = files.size();
}
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
client->delete_container(containerName).wait();
return Void();
}));
if (pNumDeleted) {
*pNumDeleted += filesToDelete;
}
return Void();
}
};
Future<bool> BackupContainerAzureBlobStore::blobExists(const std::string& fileName) {
return asyncTaskThread.execAsync(
[client = this->client.get(), containerName = this->containerName, fileName = fileName] {
auto resp = client->get_blob_properties(containerName, fileName).get().response();
return resp.valid();
});
}
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const NetworkAddress& address,
const std::string& accountName,
const std::string& containerName)
: containerName(containerName) {
std::string accountKey = std::getenv("AZURE_KEY");
auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(accountName, accountKey);
auto storageAccount = std::make_shared<azure::storage_lite::storage_account>(
accountName, credential, false, format("http://%s/%s", address.toString().c_str(), accountName.c_str()));
client = std::make_unique<AzureClient>(storageAccount, 1);
}
void BackupContainerAzureBlobStore::addref() {
return ReferenceCounted<BackupContainerAzureBlobStore>::addref();
}
void BackupContainerAzureBlobStore::delref() {
return ReferenceCounted<BackupContainerAzureBlobStore>::delref();
}
Future<Void> BackupContainerAzureBlobStore::create() {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
client->create_container(containerName).wait();
return Void();
});
}
Future<bool> BackupContainerAzureBlobStore::exists() {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
auto resp = client->get_container_properties(containerName).get().response();
return resp.valid();
});
}
Future<Reference<IAsyncFile>> BackupContainerAzureBlobStore::readFile(const std::string& fileName) {
return BackupContainerAzureBlobStoreImpl::readFile(this, fileName);
}
Future<Reference<IBackupFile>> BackupContainerAzureBlobStore::writeFile(const std::string& fileName) {
return BackupContainerAzureBlobStoreImpl::writeFile(this, fileName);
}
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerAzureBlobStore::listFiles(
const std::string& path, std::function<bool(std::string const&)> folderPathFilter) {
return asyncTaskThread.execAsync([client = this->client.get(), containerName = this->containerName, path = path,
folderPathFilter = folderPathFilter] {
FilesAndSizesT result;
BackupContainerAzureBlobStoreImpl::listFiles(client, containerName, path, folderPathFilter, result);
return result;
});
}
Future<Void> BackupContainerAzureBlobStore::deleteFile(const std::string& fileName) {
return asyncTaskThread.execAsync(
[containerName = this->containerName, fileName = fileName, client = client.get()]() {
client->delete_blob(containerName, fileName).wait();
return Void();
});
}
Future<Void> BackupContainerAzureBlobStore::deleteContainer(int* pNumDeleted) {
return BackupContainerAzureBlobStoreImpl::deleteContainer(this, pNumDeleted);
}
Future<std::vector<std::string>> BackupContainerAzureBlobStore::listURLs(const std::string& baseURL) {
// TODO: Implement this
return std::vector<std::string>{};
}
std::string BackupContainerAzureBlobStore::getURLFormat() {
return "azure://<ip>:<port>/<accountname>/<container>/<path_to_file>";
}
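A hedged sketch of opening a container with this URL format through the generic factory declared in BackupContainer.h. The endpoint, account, and container names below are placeholders; AZURE_KEY must be set in the environment because the constructor above reads it with std::getenv, and the build must have BUILD_AZURE_BACKUP enabled.

ACTOR Future<Void> exampleCreateAzureContainer() {
    state Reference<IBackupContainer> container =
        IBackupContainer::openContainer("azure://127.0.0.1:10000/devstoreaccount1/test_container/");
    wait(container->create()); // Creates the blob container if it does not already exist
    return Void();
}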

@ -0,0 +1,71 @@
/*
* BackupContainerAzureBlobStore.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if (!defined FDBCLIENT_BACKUP_CONTAINER_AZURE_BLOBSTORE_H) && (defined BUILD_AZURE_BACKUP)
#define FDBCLIENT_BACKUP_CONTAINER_AZURE_BLOBSTORE_H
#pragma once
#include "fdbclient/AsyncTaskThread.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "storage_credential.h"
#include "storage_account.h"
#include "blob/blob_client.h"
class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
ReferenceCounted<BackupContainerAzureBlobStore> {
using AzureClient = azure::storage_lite::blob_client;
std::unique_ptr<AzureClient> client;
std::string containerName;
AsyncTaskThread asyncTaskThread;
Future<bool> blobExists(const std::string& fileName);
friend class BackupContainerAzureBlobStoreImpl;
public:
BackupContainerAzureBlobStore(const NetworkAddress& address, const std::string& accountName,
const std::string& containerName);
void addref() override;
void delref() override;
Future<Void> create() override;
Future<bool> exists() override;
Future<Reference<IAsyncFile>> readFile(const std::string& fileName) override;
Future<Reference<IBackupFile>> writeFile(const std::string& fileName) override;
Future<FilesAndSizesT> listFiles(const std::string& path = "",
std::function<bool(std::string const&)> folderPathFilter = nullptr) override;
Future<Void> deleteFile(const std::string& fileName) override;
Future<Void> deleteContainer(int* pNumDeleted) override;
static Future<std::vector<std::string>> listURLs(const std::string& baseURL);
static std::string getURLFormat();
};
#endif

File diff suppressed because it is too large.

@ -0,0 +1,182 @@
/*
* BackupContainerFileSystem.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_BACKUP_CONTAINER_FILESYSTEM_H
#define FDBCLIENT_BACKUP_CONTAINER_FILESYSTEM_H
#pragma once
#include "fdbclient/BackupContainer.h"
#include "fdbclient/FDBTypes.h"
#include "flow/Trace.h"
#include "fdbclient/BackupContainer.h"
/* BackupContainerFileSystem implements a backup container which stores files in a nested folder structure.
* Inheritors need only define methods for writing, reading, deleting, sizing, and listing files.
*
* Snapshot manifests (a complete set of files constituting a database snapshot for the backup's target ranges)
* are stored as JSON files at paths like
* /snapshots/snapshot,minVersion,maxVersion,totalBytes
*
* Key range files for snapshots are stored at paths like
* /kvranges/snapshot,startVersion/N/range,version,uid,blockSize
* where startVersion is the version at which the backup snapshot execution began and N is a number
* that is increased as key range files are generated over time (at varying rates) such that there
* are around 5,000 key range files in each folder.
*
* Note that startVersion will NOT correspond to the minVersion of a snapshot manifest because
* snapshot manifest min/max versions are based on the actual contained data and the first data
* file written will be after the start version of the snapshot's execution.
*
* Log files are at file paths like
* /plogs/.../log,startVersion,endVersion,UID,tagID-of-N,blocksize
* /logs/.../log,startVersion,endVersion,UID,blockSize
* where ... is a multi level path which sorts lexically into version order and results in approximately 1
* unique folder per day containing about 5,000 files. Logs from FDB 6.3 onward are stored in the "plogs"
* directory and are partitioned according to tagIDs (0, 1, 2, ...), where the total number of partitions is N.
* Old backup logs from FDB 6.2 and earlier are stored in the "logs" directory and are not partitioned.
* Starting with FDB 6.3, users can choose between the new partitioned logs and the old logs.
*
*
* BACKWARD COMPATIBILITY
*
* Prior to FDB version 6.0.16, key range files were stored using a different folder scheme. Newer versions
* still support this scheme for all restore and backup management operations but key range files generated
* by backups using version 6.0.16 or later use the scheme described above.
*
* The old format stored key range files at paths like
* /ranges/.../range,version,uid,blockSize
* where ... is a multi level path which sorts lexically into version order and results in up to approximately
* 900 unique folders per day. The number of files per folder depends on the configured snapshot rate and
* database size and will vary from 1 to around 5,000.
*/
class BackupContainerFileSystem : public IBackupContainer {
public:
void addref() override = 0;
void delref() override = 0;
BackupContainerFileSystem() {}
virtual ~BackupContainerFileSystem() {}
// Create the container
Future<Void> create() override = 0;
Future<bool> exists() override = 0;
// Get a list of fileNames and their sizes in the container under the given path
// Although not required, an implementation can avoid traversing unwanted subfolders
// by calling folderPathFilter(absoluteFolderPath) and checking for a false return value.
using FilesAndSizesT = std::vector<std::pair<std::string, int64_t>>;
virtual Future<FilesAndSizesT> listFiles(const std::string& path = "",
std::function<bool(std::string const&)> folderPathFilter = nullptr) = 0;
// Open a file for read by fileName
Future<Reference<IAsyncFile>> readFile(const std::string& fileName) override = 0;
// Open a file for write by fileName
virtual Future<Reference<IBackupFile>> writeFile(const std::string& fileName) = 0;
// Delete a file
virtual Future<Void> deleteFile(const std::string& fileName) = 0;
// Delete entire container. During the process, if pNumDeleted is not null it will be
// updated with the count of deleted files so that progress can be seen.
Future<Void> deleteContainer(int* pNumDeleted) override = 0;
Future<Reference<IBackupFile>> writeLogFile(Version beginVersion, Version endVersion, int blockSize) final;
Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
uint16_t tagId, int totalTags) final;
Future<Reference<IBackupFile>> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount,
Version fileVersion, int blockSize) override;
Future<std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>>> readKeyspaceSnapshot(
KeyspaceSnapshotFile snapshot);
Future<Void> writeKeyspaceSnapshotFile(const std::vector<std::string>& fileNames,
const std::vector<std::pair<Key, Key>>& beginEndKeys,
int64_t totalBytes) final;
// List log files, unsorted, which contain data at any version >= beginVersion and <= targetVersion.
// "partitioned" flag indicates if new partitioned mutation logs or old logs should be listed.
Future<std::vector<LogFile>> listLogFiles(Version beginVersion, Version targetVersion, bool partitioned);
// List range files, unsorted, which contain data at or between beginVersion and endVersion
// Note: The contents of each top level snapshot.N folder do not necessarily constitute a valid snapshot
// and therefore listing files is not how RestoreSets are obtained.
// Note: Snapshots partially written using FDB versions prior to 6.0.16 will have some range files stored
// using the old folder scheme read by old_listRangeFiles
Future<std::vector<RangeFile>> listRangeFiles(Version beginVersion, Version endVersion);
// List snapshots which have been fully written, in sorted beginVersion order, which start before end and finish on
// or after begin
Future<std::vector<KeyspaceSnapshotFile>> listKeyspaceSnapshots(Version begin = 0,
Version end = std::numeric_limits<Version>::max());
Future<BackupFileList> dumpFileList(Version begin, Version end) override;
// Uses the virtual methods to describe the backup contents
Future<BackupDescription> describeBackup(bool deepScan, Version logStartVersionOverride) final;
// Delete all data up to (but not including) endVersion
Future<Void> expireData(Version expireEndVersion, bool force, ExpireProgress* progress,
Version restorableBeginVersion) final;
Future<KeyRange> getSnapshotFileKeyRange(const RangeFile& file) final;
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion, VectorRef<KeyRangeRef> keyRangesFilter,
bool logsOnly, Version beginVersion) final;
private:
struct VersionProperty {
VersionProperty(Reference<BackupContainerFileSystem> bc, const std::string& name)
: bc(bc), path("properties/" + name) {}
Reference<BackupContainerFileSystem> bc;
std::string path;
Future<Optional<Version>> get();
Future<Void> set(Version v);
Future<Void> clear();
};
// To avoid the need to scan the underlying filesystem in many cases, some important version boundaries are stored
// in named files. These versions also indicate what version ranges are known to be deleted or partially deleted.
//
// The values below describe version ranges as follows:
// 0 - expiredEndVersion                       All files in this range have been deleted
// expiredEndVersion - unreliableEndVersion    Some files in this range may have been deleted.
//
// logBeginVersion - logEnd                    Log files are contiguous in this range and have NOT been deleted by fdbbackup
// logEnd - infinity                           Files in this range may or may not exist yet
//
VersionProperty logBeginVersion();
VersionProperty logEndVersion();
VersionProperty expiredEndVersion();
VersionProperty unreliableEndVersion();
VersionProperty logType();
// List range files, unsorted, which contain data at or between beginVersion and endVersion
// NOTE: This reads the range file folder schema from FDB 6.0.15 and earlier and is provided for backward
// compatibility
Future<std::vector<RangeFile>> old_listRangeFiles(Version beginVersion, Version endVersion);
friend class BackupContainerFileSystemImpl;
};
#endif
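A minimal sketch of the new-style key range file paths described in the header comment above. The helper below is hypothetical and not part of BackupContainerFileSystem's interface; the exact formatting in the real implementation may differ.

// Composes /kvranges/snapshot,<startVersion>/<N>/range,<version>,<uid>,<blockSize>
std::string exampleRangeFilePath(Version snapshotBeginVersion, int folderIndex, Version fileVersion,
                                 const std::string& uid, int blockSize) {
    return format("kvranges/snapshot,%lld/%d/range,%lld,%s,%d", (long long)snapshotBeginVersion, folderIndex,
                  (long long)fileVersion, uid.c_str(), blockSize);
}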

@ -0,0 +1,255 @@
/*
* BackupContainerLocalDirectory.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/Platform.actor.h"
#include "flow/Platform.h"
#include "fdbrpc/simulator.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
public:
BackupFile(std::string fileName, Reference<IAsyncFile> file, std::string finalFullPath)
: IBackupFile(fileName), m_file(file), m_finalFullPath(finalFullPath) {}
Future<Void> append(const void* data, int len) {
Future<Void> r = m_file->write(data, len, m_offset);
m_offset += len;
return r;
}
ACTOR static Future<Void> finish_impl(Reference<BackupFile> f) {
wait(f->m_file->truncate(f->size())); // Some IAsyncFile implementations extend in whole block sizes.
wait(f->m_file->sync());
std::string name = f->m_file->getFilename();
f->m_file.clear();
renameFile(name, f->m_finalFullPath);
return Void();
}
Future<Void> finish() { return finish_impl(Reference<BackupFile>::addRef(this)); }
void addref() override { return ReferenceCounted<BackupFile>::addref(); }
void delref() override { return ReferenceCounted<BackupFile>::delref(); }
private:
Reference<IAsyncFile> m_file;
std::string m_finalFullPath;
};
ACTOR static Future<BackupContainerFileSystem::FilesAndSizesT> listFiles_impl(std::string path, std::string m_path) {
state std::vector<std::string> files;
wait(platform::findFilesRecursivelyAsync(joinPath(m_path, path), &files));
BackupContainerFileSystem::FilesAndSizesT results;
// Remove .lnk files from results; they are a side effect of a backup that was *read* during simulation. See
// readFile() below for more info on why they are created.
if (g_network->isSimulated())
files.erase(
std::remove_if(files.begin(), files.end(),
[](std::string const& f) { return StringRef(f).endsWith(LiteralStringRef(".lnk")); }),
files.end());
for (auto& f : files) {
// Hide .part or .temp files.
StringRef s(f);
if (!s.endsWith(LiteralStringRef(".part")) && !s.endsWith(LiteralStringRef(".temp")))
results.push_back({ f.substr(m_path.size() + 1), ::fileSize(f) });
}
return results;
}
} // namespace
void BackupContainerLocalDirectory::addref() {
return ReferenceCounted<BackupContainerLocalDirectory>::addref();
}
void BackupContainerLocalDirectory::delref() {
return ReferenceCounted<BackupContainerLocalDirectory>::delref();
}
std::string BackupContainerLocalDirectory::getURLFormat() {
return "file://</path/to/base/dir/>";
}
BackupContainerLocalDirectory::BackupContainerLocalDirectory(const std::string& url) {
std::string path;
if (url.find("file://") != 0) {
TraceEvent(SevWarn, "BackupContainerLocalDirectory")
.detail("Description", "Invalid URL for BackupContainerLocalDirectory")
.detail("URL", url);
}
path = url.substr(7);
// Remove trailing slashes on path
path.erase(path.find_last_not_of("\\/") + 1);
std::string absolutePath = abspath(path);
if (!g_network->isSimulated() && path != absolutePath) {
TraceEvent(SevWarn, "BackupContainerLocalDirectory")
.detail("Description", "Backup path must be absolute (e.g. file:///some/path)")
.detail("URL", url)
.detail("Path", path)
.detail("AbsolutePath", absolutePath);
// throw io_error();
IBackupContainer::lastOpenError =
format("Backup path '%s' must be the absolute path '%s'", path.c_str(), absolutePath.c_str());
throw backup_invalid_url();
}
// Finalized path written to will be <path>/backup-<uid>
m_path = path;
}
Future<std::vector<std::string>> BackupContainerLocalDirectory::listURLs(const std::string& url) {
std::string path;
if (url.find("file://") != 0) {
TraceEvent(SevWarn, "BackupContainerLocalDirectory")
.detail("Description", "Invalid URL for BackupContainerLocalDirectory")
.detail("URL", url);
}
path = url.substr(7);
// Remove trailing slashes on path
path.erase(path.find_last_not_of("\\/") + 1);
if (!g_network->isSimulated() && path != abspath(path)) {
TraceEvent(SevWarn, "BackupContainerLocalDirectory")
.detail("Description", "Backup path must be absolute (e.g. file:///some/path)")
.detail("URL", url)
.detail("Path", path);
throw io_error();
}
std::vector<std::string> dirs = platform::listDirectories(path);
std::vector<std::string> results;
for (auto& r : dirs) {
if (r == "." || r == "..") continue;
results.push_back(std::string("file://") + joinPath(path, r));
}
return results;
}
Future<Void> BackupContainerLocalDirectory::create() {
// Nothing should be done here because create() can be called by any process working with the container URL,
// such as fdbbackup. Since "local directory" containers are by definition local to the machine they are
// accessed from, the container's creation (in this case the creation of a directory) must be ensured prior to
// every file creation, which is done in openFile(). Creating the directory here will result in unnecessary
// directories being created on machines that run fdbbackup but not agents.
return Void();
}
Future<bool> BackupContainerLocalDirectory::exists() {
return directoryExists(m_path);
}
Future<Reference<IAsyncFile>> BackupContainerLocalDirectory::readFile(const std::string& path) {
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED;
// Simulation does not properly handle opening the same file from multiple machines using a shared filesystem,
// so create a symbolic link to make each file opening appear to be unique. This could also work in production
// but only if the source directory is writable, which shouldn't be required for a restore.
std::string fullPath = joinPath(m_path, path);
#ifndef _WIN32
if (g_network->isSimulated()) {
if (!fileExists(fullPath)) {
throw file_not_found();
}
if (g_simulator.getCurrentProcess()->uid == UID()) {
TraceEvent(SevError, "BackupContainerReadFileOnUnsetProcessID");
}
std::string uniquePath = fullPath + "." + g_simulator.getCurrentProcess()->uid.toString() + ".lnk";
unlink(uniquePath.c_str());
ASSERT(symlink(basename(path).c_str(), uniquePath.c_str()) == 0);
fullPath = uniquePath;
}
// Opening cached mode forces read/write mode at a lower level, overriding the readonly request. So cached mode
// can't be used because backup files are read-only. Cached mode can only help during restore task retries handled
// by the same process that failed the first task execution anyway, which is a very rare case.
#endif
Future<Reference<IAsyncFile>> f = IAsyncFileSystem::filesystem()->open(fullPath, flags, 0644);
if (g_network->isSimulated()) {
int blockSize = 0;
// Extract block size from the filename, if present
size_t lastComma = path.find_last_of(',');
if (lastComma != path.npos) {
blockSize = atoi(path.substr(lastComma + 1).c_str());
}
if (blockSize <= 0) {
blockSize = deterministicRandom()->randomInt(1e4, 1e6);
}
if (deterministicRandom()->random01() < .01) {
blockSize /= deterministicRandom()->randomInt(1, 3);
}
ASSERT(blockSize > 0);
return map(f, [=](Reference<IAsyncFile> fr) {
int readAhead = deterministicRandom()->randomInt(0, 3);
int reads = deterministicRandom()->randomInt(1, 3);
int cacheSize = deterministicRandom()->randomInt(0, 3);
return Reference<IAsyncFile>(new AsyncFileReadAheadCache(fr, blockSize, readAhead, reads, cacheSize));
});
}
return f;
}
Future<Reference<IBackupFile>> BackupContainerLocalDirectory::writeFile(const std::string& path) {
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE |
IAsyncFile::OPEN_READWRITE;
std::string fullPath = joinPath(m_path, path);
platform::createDirectory(parentDirectory(fullPath));
std::string temp = fullPath + "." + deterministicRandom()->randomUniqueID().toString() + ".temp";
Future<Reference<IAsyncFile>> f = IAsyncFileSystem::filesystem()->open(temp, flags, 0644);
return map(f, [=](Reference<IAsyncFile> f) { return Reference<IBackupFile>(new BackupFile(path, f, fullPath)); });
}
Future<Void> BackupContainerLocalDirectory::deleteFile(const std::string& path) {
::deleteFile(joinPath(m_path, path));
return Void();
}
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerLocalDirectory::listFiles(
const std::string& path, std::function<bool(std::string const&)>) {
return listFiles_impl(path, m_path);
}
Future<Void> BackupContainerLocalDirectory::deleteContainer(int* pNumDeleted) {
// In order to avoid deleting some random directory due to user error, first describe the backup
// and make sure it has something in it.
return map(describeBackup(false, invalidVersion), [=](BackupDescription const& desc) {
// If the backup has no snapshots and no logs then it's probably not a valid backup
if (desc.snapshots.size() == 0 && !desc.minLogBegin.present()) throw backup_invalid_url();
int count = platform::eraseDirectoryRecursive(m_path);
if (pNumDeleted != nullptr) *pNumDeleted = count;
return Void();
});
}

@ -0,0 +1,59 @@
/*
* BackupContainerLocalDirectory.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_BACKUP_CONTAINER_LOCAL_DIRECTORY_H
#define FDBCLIENT_BACKUP_CONTAINER_LOCAL_DIRECTORY_H
#pragma once
#include "fdbclient/BackupContainerFileSystem.h"
#include "flow/flow.h"
class BackupContainerLocalDirectory : public BackupContainerFileSystem,
ReferenceCounted<BackupContainerLocalDirectory> {
public:
void addref() final;
void delref() final;
static std::string getURLFormat();
BackupContainerLocalDirectory(const std::string& url);
static Future<std::vector<std::string>> listURLs(const std::string& url);
Future<Void> create() final;
// The container exists if the folder it resides in exists
Future<bool> exists() final;
Future<Reference<IAsyncFile>> readFile(const std::string& path) final;
Future<Reference<IBackupFile>> writeFile(const std::string& path) final;
Future<Void> deleteFile(const std::string& path) final;
Future<FilesAndSizesT> listFiles(const std::string& path, std::function<bool(std::string const&)>) final;
Future<Void> deleteContainer(int* pNumDeleted) final;
private:
std::string m_path;
};
#endif

@ -0,0 +1,204 @@
/*
* BackupContainerS3BlobStore.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/AsyncFileBlobStore.actor.h"
#include "fdbclient/BackupContainerS3BlobStore.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class BackupContainerS3BlobStoreImpl {
public:
// Backup files are stored under a single folder prefix with subfolders for each named backup
static const std::string DATAFOLDER;
// Indexfolder contains keys for which user-named backups exist. Backup names can contain an arbitrary
// number of slashes so the backup names are kept in a separate folder tree from their actual data.
static const std::string INDEXFOLDER;
ACTOR static Future<std::vector<std::string>> listURLs(Reference<BlobStoreEndpoint> bstore, std::string bucket) {
state std::string basePath = INDEXFOLDER + '/';
BlobStoreEndpoint::ListResult contents = wait(bstore->listObjects(bucket, basePath));
std::vector<std::string> results;
for (auto& f : contents.objects) {
results.push_back(
bstore->getResourceURL(f.name.substr(basePath.size()), format("bucket=%s", bucket.c_str())));
}
return results;
}
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
public:
BackupFile(std::string fileName, Reference<IAsyncFile> file) : IBackupFile(fileName), m_file(file) {}
Future<Void> append(const void* data, int len) {
Future<Void> r = m_file->write(data, len, m_offset);
m_offset += len;
return r;
}
Future<Void> finish() {
Reference<BackupFile> self = Reference<BackupFile>::addRef(this);
return map(m_file->sync(), [=](Void _) {
self->m_file.clear();
return Void();
});
}
void addref() final { return ReferenceCounted<BackupFile>::addref(); }
void delref() final { return ReferenceCounted<BackupFile>::delref(); }
private:
Reference<IAsyncFile> m_file;
};
ACTOR static Future<BackupContainerFileSystem::FilesAndSizesT> listFiles(
Reference<BackupContainerS3BlobStore> bc, std::string path,
std::function<bool(std::string const&)> pathFilter) {
// pathFilter expects container based paths, so create a wrapper which converts a raw path
// to a container path by removing the known backup name prefix.
state int prefixTrim = bc->dataPath("").size();
std::function<bool(std::string const&)> rawPathFilter = [=](const std::string& folderPath) {
ASSERT(folderPath.size() >= prefixTrim);
return pathFilter(folderPath.substr(prefixTrim));
};
state BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listObjects(
bc->m_bucket, bc->dataPath(path), '/', std::numeric_limits<int>::max(), rawPathFilter));
BackupContainerFileSystem::FilesAndSizesT files;
for (auto& o : result.objects) {
ASSERT(o.name.size() >= prefixTrim);
files.push_back({ o.name.substr(prefixTrim), o.size });
}
return files;
}
ACTOR static Future<Void> create(Reference<BackupContainerS3BlobStore> bc) {
wait(bc->m_bstore->createBucket(bc->m_bucket));
// Check/create the index entry
bool exists = wait(bc->m_bstore->objectExists(bc->m_bucket, bc->indexEntry()));
if (!exists) {
wait(bc->m_bstore->writeEntireFile(bc->m_bucket, bc->indexEntry(), ""));
}
return Void();
}
ACTOR static Future<Void> deleteContainer(Reference<BackupContainerS3BlobStore> bc, int* pNumDeleted) {
bool e = wait(bc->exists());
if (!e) {
TraceEvent(SevWarnAlways, "BackupContainerDoesNotExist").detail("URL", bc->getURL());
throw backup_does_not_exist();
}
// First delete everything under the data prefix in the bucket
wait(bc->m_bstore->deleteRecursively(bc->m_bucket, bc->dataPath(""), pNumDeleted));
// Now that all files are deleted, delete the index entry
wait(bc->m_bstore->deleteObject(bc->m_bucket, bc->indexEntry()));
return Void();
}
};
const std::string BackupContainerS3BlobStoreImpl::DATAFOLDER = "data";
const std::string BackupContainerS3BlobStoreImpl::INDEXFOLDER = "backups";
std::string BackupContainerS3BlobStore::dataPath(const std::string& path) {
return BackupContainerS3BlobStoreImpl::DATAFOLDER + "/" + m_name + "/" + path;
}
// Get the path of the backup's index entry
std::string BackupContainerS3BlobStore::indexEntry() {
return BackupContainerS3BlobStoreImpl::INDEXFOLDER + "/" + m_name;
}
BackupContainerS3BlobStore::BackupContainerS3BlobStore(Reference<BlobStoreEndpoint> bstore, const std::string& name,
const BlobStoreEndpoint::ParametersT& params)
: m_bstore(bstore), m_name(name), m_bucket("FDB_BACKUPS_V2") {
// Currently only one parameter is supported, "bucket"
for (auto& kv : params) {
if (kv.first == "bucket") {
m_bucket = kv.second;
continue;
}
TraceEvent(SevWarn, "BackupContainerS3BlobStoreInvalidParameter")
.detail("Name", kv.first)
.detail("Value", kv.second);
IBackupContainer::lastOpenError = format("Unknown URL parameter: '%s'", kv.first.c_str());
throw backup_invalid_url();
}
}
void BackupContainerS3BlobStore::addref() {
return ReferenceCounted<BackupContainerS3BlobStore>::addref();
}
void BackupContainerS3BlobStore::delref() {
return ReferenceCounted<BackupContainerS3BlobStore>::delref();
}
std::string BackupContainerS3BlobStore::getURLFormat() {
return BlobStoreEndpoint::getURLFormat(true) + " (Note: The 'bucket' parameter is required.)";
}
Future<Reference<IAsyncFile>> BackupContainerS3BlobStore::readFile(const std::string& path) {
return Reference<IAsyncFile>(new AsyncFileReadAheadCache(
Reference<IAsyncFile>(new AsyncFileBlobStoreRead(m_bstore, m_bucket, dataPath(path))),
m_bstore->knobs.read_block_size, m_bstore->knobs.read_ahead_blocks, m_bstore->knobs.concurrent_reads_per_file,
m_bstore->knobs.read_cache_blocks_per_file));
}
Future<std::vector<std::string>> BackupContainerS3BlobStore::listURLs(Reference<BlobStoreEndpoint> bstore,
const std::string& bucket) {
return BackupContainerS3BlobStoreImpl::listURLs(bstore, bucket);
}
Future<Reference<IBackupFile>> BackupContainerS3BlobStore::writeFile(const std::string& path) {
return Reference<IBackupFile>(new BackupContainerS3BlobStoreImpl::BackupFile(
path, Reference<IAsyncFile>(new AsyncFileBlobStoreWrite(m_bstore, m_bucket, dataPath(path)))));
}
Future<Void> BackupContainerS3BlobStore::deleteFile(const std::string& path) {
return m_bstore->deleteObject(m_bucket, dataPath(path));
}
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerS3BlobStore::listFiles(
const std::string& path, std::function<bool(std::string const&)> pathFilter) {
return BackupContainerS3BlobStoreImpl::listFiles(Reference<BackupContainerS3BlobStore>::addRef(this), path,
pathFilter);
}
Future<Void> BackupContainerS3BlobStore::create() {
return BackupContainerS3BlobStoreImpl::create(Reference<BackupContainerS3BlobStore>::addRef(this));
}
Future<bool> BackupContainerS3BlobStore::exists() {
return m_bstore->objectExists(m_bucket, indexEntry());
}
Future<Void> BackupContainerS3BlobStore::deleteContainer(int* pNumDeleted) {
return BackupContainerS3BlobStoreImpl::deleteContainer(Reference<BackupContainerS3BlobStore>::addRef(this),
pNumDeleted);
}
std::string BackupContainerS3BlobStore::getBucket() const {
return m_bucket;
}
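An illustrative, standalone sketch (hypothetical helpers; dataPath() and indexEntry() above are private) of where a backup named "mybackup" lands in the bucket: its data files live under "data/mybackup/" while its existence is recorded by the index entry "backups/mybackup".

#include <cassert>
#include <string>

std::string exampleDataPath(const std::string& backupName, const std::string& path) {
    return "data/" + backupName + "/" + path; // mirrors DATAFOLDER usage above
}
std::string exampleIndexEntry(const std::string& backupName) {
    return "backups/" + backupName; // mirrors INDEXFOLDER usage above
}
int main() {
    assert(exampleDataPath("mybackup", "snapshots/snapshot,100,200,4096") ==
           "data/mybackup/snapshots/snapshot,100,200,4096");
    assert(exampleIndexEntry("mybackup") == "backups/mybackup");
}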

@ -0,0 +1,72 @@
/*
* BackupContainerS3BlobStore.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_BACKUP_CONTAINER_S3_BLOBSTORE_H
#define FDBCLIENT_BACKUP_CONTAINER_S3_BLOBSTORE_H
#pragma once
#include "fdbclient/AsyncFileBlobStore.actor.h"
#include "fdbclient/BackupContainerFileSystem.h"
class BackupContainerS3BlobStore final : public BackupContainerFileSystem,
ReferenceCounted<BackupContainerS3BlobStore> {
Reference<BlobStoreEndpoint> m_bstore;
std::string m_name;
// All backup data goes into a single bucket
std::string m_bucket;
std::string dataPath(const std::string& path);
// Get the path of the backup's index entry
std::string indexEntry();
friend class BackupContainerS3BlobStoreImpl;
public:
BackupContainerS3BlobStore(Reference<BlobStoreEndpoint> bstore, const std::string& name,
const BlobStoreEndpoint::ParametersT& params);
void addref() override;
void delref() override;
static std::string getURLFormat();
Future<Reference<IAsyncFile>> readFile(const std::string& path) final;
static Future<std::vector<std::string>> listURLs(Reference<BlobStoreEndpoint> bstore, const std::string& bucket);
Future<Reference<IBackupFile>> writeFile(const std::string& path) final;
Future<Void> deleteFile(const std::string& path) final;
Future<FilesAndSizesT> listFiles(const std::string& path, std::function<bool(std::string const&)> pathFilter) final;
Future<Void> create() final;
// The container exists if the index entry in the blob bucket exists
Future<bool> exists() final;
Future<Void> deleteContainer(int* pNumDeleted) final;
std::string getBucket() const;
};
#endif

@ -1,12 +1,20 @@
set(FDBCLIENT_SRCS
AsyncFileBlobStore.actor.cpp
AsyncFileBlobStore.actor.h
AsyncTaskThread.actor.cpp
AsyncTaskThread.h
Atomic.h
AutoPublicAddress.cpp
BackupAgent.actor.h
BackupAgentBase.actor.cpp
BackupContainer.actor.cpp
BackupContainer.h
BackupContainerFileSystem.actor.cpp
BackupContainerFileSystem.h
BackupContainerLocalDirectory.actor.cpp
BackupContainerLocalDirectory.h
BackupContainerS3BlobStore.actor.cpp
BackupContainerS3BlobStore.h
BlobStore.actor.cpp
ClientLogEvents.h
ClientWorkerInterface.h
@ -93,6 +101,46 @@ set(options_srcs ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp)
vexillographer_compile(TARGET fdboptions LANG cpp OUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.h ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp)
set(BUILD_AZURE_BACKUP OFF CACHE BOOL "Build Azure backup client")
if(BUILD_AZURE_BACKUP)
add_compile_definitions(BUILD_AZURE_BACKUP)
set(FDBCLIENT_SRCS
${FDBCLIENT_SRCS}
BackupContainerAzureBlobStore.actor.cpp
BackupContainerAzureBlobStore.h)
configure_file(azurestorage.cmake azurestorage-download/CMakeLists.txt)
execute_process(
COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" .
RESULT_VARIABLE results
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-download
)
if(results)
message(FATAL_ERROR "Configuration step for AzureStorage has Failed. ${results}")
endif()
execute_process(
COMMAND ${CMAKE_COMMAND} --build . --config Release
RESULT_VARIABLE results
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-download
)
if(results)
message(FATAL_ERROR "Build step for AzureStorage has Failed. ${results}")
endif()
add_subdirectory(
${CMAKE_CURRENT_BINARY_DIR}/azurestorage-src
${CMAKE_CURRENT_BINARY_DIR}/azurestorage-build
)
endif()
add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs})
add_dependencies(fdbclient fdboptions)
target_link_libraries(fdbclient PUBLIC fdbrpc)
if(BUILD_AZURE_BACKUP)
target_link_libraries(fdbclient PUBLIC fdbrpc PRIVATE curl uuid azure-storage-lite)
else()
target_link_libraries(fdbclient PUBLIC fdbrpc)
endif()

@ -0,0 +1,15 @@
project(azurestorage-download)
include(ExternalProject)
ExternalProject_Add(azurestorage
GIT_REPOSITORY https://github.com/Azure/azure-storage-cpplite.git
GIT_TAG 11e1f98b021446ef340f4886796899a6eb1ad9a5 # v0.3.0
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/azurestorage-src"
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/azurestorage-build"
CMAKE_ARGS "-DCMAKE_BUILD_TYPE=Release"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
BUILD_BYPRODUCTS "${CMAKE_CURRENT_BINARY_DIR}/libazure-storage-lite.a"
)

@ -292,7 +292,7 @@ TransportData::TransportData(uint64_t transportId)
#pragma pack( push, 1 )
struct ConnectPacket {
// The value does not inclueds the size of `connectPacketLength` itself,
// The value does not include the size of `connectPacketLength` itself,
// but only the other fields of this structure.
uint32_t connectPacketLength;
ProtocolVersion protocolVersion; // Expect currentProtocolVersion

@ -123,6 +123,7 @@ set(FDBSERVER_SRCS
workloads/BackupCorrectness.actor.cpp
workloads/BackupAndParallelRestoreCorrectness.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/BackupToBlob.actor.cpp
workloads/BackupToDBAbort.actor.cpp
workloads/BackupToDBCorrectness.actor.cpp
workloads/BackupToDBUpgrade.actor.cpp
@ -183,6 +184,7 @@ set(FDBSERVER_SRCS
workloads/RemoveServersSafely.actor.cpp
workloads/ReportConflictingKeys.actor.cpp
workloads/RestoreBackup.actor.cpp
workloads/RestoreFromBlob.actor.cpp
workloads/Rollback.actor.cpp
workloads/RyowCorrectness.actor.cpp
workloads/RYWDisable.actor.cpp

@ -387,7 +387,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
state Future<Void> cp = changePaused(cx, &backupAgent);
}
// Increment the backup agent requets
// Increment the backup agent requests
if (self->agentRequest) {
BackupAndRestoreCorrectnessWorkload::backupAgentRequests ++;
}

@ -0,0 +1,65 @@
/*
* BackupToBlob.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct BackupToBlobWorkload : TestWorkload {
double backupAfter;
Key backupTag;
Standalone<StringRef> backupURL;
int snapshotInterval = 100000;
static constexpr const char* DESCRIPTION = "BackupToBlob";
BackupToBlobWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
backupAfter = getOption(options, LiteralStringRef("backupAfter"), 10.0);
backupTag = getOption(options, LiteralStringRef("backupTag"), BackupAgentBase::getDefaultTag());
backupURL = getOption(options, LiteralStringRef("backupURL"), LiteralStringRef("http://0.0.0.0:10000"));
}
std::string description() const override { return DESCRIPTION; }
Future<Void> setup(Database const& cx) override { return Void(); }
ACTOR static Future<Void> _start(Database cx, BackupToBlobWorkload* self) {
state FileBackupAgent backupAgent;
state Standalone<VectorRef<KeyRangeRef>> backupRanges;
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
wait(delay(self->backupAfter));
wait(backupAgent.submitBackup(cx, self->backupURL, self->snapshotInterval, self->backupTag.toString(),
backupRanges));
EBackupState backupStatus = wait(backupAgent.waitBackup(cx, self->backupTag.toString(), true));
TraceEvent("BackupToBlob_BackupStatus").detail("Status", BackupAgentBase::getStateText(backupStatus));
return Void();
}
Future<Void> start(Database const& cx) override { return clientId ? Void() : _start(cx, this); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<BackupToBlobWorkload> BackupToBlobWorkloadFactory(BackupToBlobWorkload::DESCRIPTION);

@ -0,0 +1,65 @@
/*
* RestoreFromBlob.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct RestoreFromBlobWorkload : TestWorkload {
double restoreAfter;
Key backupTag;
Standalone<StringRef> backupURL;
bool waitForComplete;
static constexpr const char* DESCRIPTION = "RestoreFromBlob";
RestoreFromBlobWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
restoreAfter = getOption(options, LiteralStringRef("restoreAfter"), 10.0);
backupTag = getOption(options, LiteralStringRef("backupTag"), BackupAgentBase::getDefaultTag());
backupURL = getOption(options, LiteralStringRef("backupURL"), LiteralStringRef("http://0.0.0.0:10000"));
waitForComplete = getOption(options, LiteralStringRef("waitForComplete"), true);
}
std::string description() const override { return DESCRIPTION; }
Future<Void> setup(Database const& cx) override { return Void(); }
ACTOR static Future<Void> _start(Database cx, RestoreFromBlobWorkload* self) {
state FileBackupAgent backupAgent;
state Standalone<VectorRef<KeyRangeRef>> restoreRanges;
restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys);
wait(delay(self->restoreAfter));
Version v =
wait(backupAgent.restore(cx, {}, self->backupTag, self->backupURL, restoreRanges, self->waitForComplete));
return Void();
}
Future<Void> start(Database const& cx) override { return clientId ? Void() : _start(cx, this); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<RestoreFromBlobWorkload> RestoreFromBlobWorkloadFactory(RestoreFromBlobWorkload::DESCRIPTION);

@ -308,9 +308,8 @@ Future<U> mapAsync(Future<T> what, F actorFunc) {
}
//maps a vector of futures with an asynchronous function
template<class T, class F>
std::vector<Future<std::invoke_result_t<F, T>>> mapAsync(std::vector<Future<T>> const& what, F const& actorFunc)
{
template <class T, class F>
auto mapAsync(std::vector<Future<T>> const& what, F const& actorFunc) {
std::vector<std::invoke_result_t<F, T>> ret;
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(mapAsync(f, actorFunc));
@ -367,9 +366,8 @@ Future<std::invoke_result_t<F, T>> map(Future<T> what, F func)
}
//maps a vector of futures
template<class T, class F>
std::vector<Future<std::invoke_result_t<F, T>>> map(std::vector<Future<T>> const& what, F const& func)
{
template <class T, class F>
auto map(std::vector<Future<T>> const& what, F const& func) {
std::vector<Future<std::invoke_result_t<F, T>>> ret;
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(map(f, func));
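A brief usage sketch for the vector overloads whose return types were simplified to auto above (a hypothetical snippet, not part of the diff): map() applies the function to each future's eventual value and yields one future per input.

std::vector<Future<int>> inputs = { Future<int>(1), Future<int>(2), Future<int>(3) };
// Each output future becomes ready with the square of the corresponding input value.
std::vector<Future<int>> squares = map(inputs, [](int x) { return x * x; });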
@ -443,9 +441,7 @@ Future<Void> asyncFilter( FutureStream<T> input, F actorPred, PromiseStream<T> o
loop {
try {
choose {
when ( T nextInput = waitNext(input) ) {
futures.push_back( std::pair<T, Future<bool>>(nextInput, actorPred(nextInput)) );
}
when(T nextInput = waitNext(input)) { futures.emplace_back(nextInput, actorPred(nextInput)); }
when ( bool pass = wait( futures.size() == 0 ? Never() : futures.front().second ) ) {
if(pass) output.send(futures.front().first);
futures.pop_front();
@ -1309,7 +1305,8 @@ private:
Promise<Void> broken_on_destruct;
ACTOR static Future<Void> takeActor(FlowLock* lock, TaskPriority taskID, int64_t amount) {
state std::list<std::pair<Promise<Void>, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));
state std::list<std::pair<Promise<Void>, int64_t>>::iterator it =
lock->takers.emplace(lock->takers.end(), Promise<Void>(), amount);
try {
wait( it->first.getFuture() );
@ -1366,7 +1363,7 @@ struct NotifiedInt {
Future<Void> whenAtLeast( int64_t limit ) {
if (val >= limit) return Void();
Promise<Void> p;
waiting.push( std::make_pair(limit,p) );
waiting.emplace(limit, p);
return p.getFuture();
}

@ -108,6 +108,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/AtomicBackupToDBCorrectness.toml)
add_fdb_test(TEST_FILES fast/AtomicOps.toml)
add_fdb_test(TEST_FILES fast/AtomicOpsApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/BackupBlobCorrectness.toml IGNORE)
add_fdb_test(TEST_FILES fast/BackupCorrectness.toml)
add_fdb_test(TEST_FILES fast/BackupCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectness.toml)

@ -0,0 +1,107 @@
[[test]]
testTitle = 'Cycle'
clearAfterTest = 'false'
simBackupAgents = 'BackupToFile'
[[test.workload]]
testName = 'Cycle'
nodeCount = 3000
testDuration = 10.0
expectedRate = 0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 10.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 5.0
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test]]
testTitle = 'Backup'
[[test.workload]]
testName = 'BackupToBlob'
backupAfter = 0.0
backupTag = 'default'
backupURL = 'azure://0.0.0.0:10000/devstoreaccount1/test_container/'
[[test.workload]]
testName = 'RandomClogging'
testDuration = 10.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 5.0
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test]]
testTitle = 'Restore'
clearAfterTest = 'false'
[[test.workload]]
testName = 'RestoreFromBlob'
restoreAfter = 0.0
backupTag = 'default'
backupURL = 'azure://0.0.0.0:10000/devstoreaccount1/test_container/'
[[test.workload]]
testName = 'RandomClogging'
testDuration = 60.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 5.0
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test]]
testTitle = 'CycleCheck'
checkOnly = 'true'
[[test.workload]]
testName = 'Cycle'
nodeCount = 3000
expectedRate = 0