First draft of Azure blob storage backup container

This commit is contained in:
sfc-gh-tclinkenbeard 2020-10-10 20:10:12 -07:00
parent ef50452bfb
commit 82b6daa16b
15 changed files with 712 additions and 22 deletions

5
build/Dockerfile.azure Normal file
View File

@ -0,0 +1,5 @@
FROM foundationdb/foundationdb-dev:0.11.4
USER root
RUN yum install -y libcurl-devel libuuid-devel openssl-devel

View File

@ -0,0 +1,93 @@
/*
* AsyncTaskThread.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __AZURE_CONNECTION_H__
#define __AZURE_CONNECTION_H__
#include <future>
#include <thread>
#include <memory>
#include "flow/network.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/ThreadSafeQueue.h"
class IAsyncTask {
public:
virtual void operator()() = 0;
virtual ~IAsyncTask() = default;
};
template <class F>
class AsyncTask : public IAsyncTask {
F func;
public:
AsyncTask(const F& func) : func(func) {}
void operator()() override { func(); }
};
class AsyncTaskThread {
ThreadSafeQueue<std::shared_ptr<IAsyncTask>> queue;
std::promise<void> wakeUp;
std::thread thread;
static void run(AsyncTaskThread* conn) {
while (true) {
std::shared_ptr<IAsyncTask> task;
{
if (conn->queue.canSleep()) {
conn->wakeUp.get_future().get();
conn->wakeUp = {};
}
task = conn->queue.pop().get();
}
}
}
template <class F>
void addTask(const F& func) {
if (queue.push(std::make_shared<AsyncTask<F>>(func))) {
wakeUp.set_value();
}
}
public:
AsyncTaskThread() : thread([this] { run(this); }) {}
template <class F>
auto execAsync(const F& func, TaskPriority priority = TaskPriority::DefaultOnMainThread)
-> Future<decltype(func())> {
if (g_network->isSimulated()) {
// TODO: Add some random delay
return func();
}
Promise<decltype(func())> promise;
addTask([&promise, &func, priority] {
auto funcResult = func();
onMainThreadVoid([&promise, &funcResult] { promise.send(funcResult); }, nullptr,
priority); // TODO: Add error handling
});
return promise.getFuture();
}
};
#endif

View File

@ -18,7 +18,15 @@
* limitations under the License.
*/
#include <cstdlib>
#include <ostream>
#include "storage_credential.h"
#include "storage_account.h"
#include "blob/blob_client.h"
#include "flow/Platform.actor.h"
#include "fdbclient/AsyncTaskThread.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/FDBTypes.h"
@ -289,7 +297,7 @@ public:
Future<Reference<IAsyncFile>> readFile(std::string fileName) override = 0;
// Open a file for write by fileName
virtual Future<Reference<IBackupFile>> writeFile(std::string fileName) = 0;
virtual Future<Reference<IBackupFile>> writeFile(const std::string& fileName) = 0;
// Delete a file
virtual Future<Void> deleteFile(std::string fileName) = 0;
@ -1777,7 +1785,7 @@ public:
std::string m_finalFullPath;
};
Future<Reference<IBackupFile>> writeFile(std::string path) final {
Future<Reference<IBackupFile>> writeFile(const std::string& path) final {
int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE;
std::string fullPath = joinPath(m_path, path);
platform::createDirectory(parentDirectory(fullPath));
@ -1928,7 +1936,7 @@ public:
Reference<IAsyncFile> m_file;
};
Future<Reference<IBackupFile>> writeFile(std::string path) final {
Future<Reference<IBackupFile>> writeFile(const std::string& path) final {
return Reference<IBackupFile>(new BackupFile(path, Reference<IAsyncFile>(new AsyncFileBlobStoreWrite(m_bstore, m_bucket, dataPath(path)))));
}
@ -2004,6 +2012,250 @@ public:
}
};
class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
ReferenceCounted<BackupContainerAzureBlobStore> {
using AzureClient = azure::storage_lite::blob_client;
std::unique_ptr<AzureClient> client;
std::string containerName;
AsyncTaskThread asyncTaskThread;
class ReadFile final : public IAsyncFile, ReferenceCounted<ReadFile> {
AsyncTaskThread& asyncTaskThread;
std::string containerName;
std::string blobName;
AzureClient* client;
public:
ReadFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<ReadFile>::addref(); }
void delref() override { ReferenceCounted<ReadFile>::delref(); }
Future<int> read(void* data, int length, int64_t offset) {
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
blobName = this->blobName, data, length, offset] {
std::ostringstream oss(std::ios::out | std::ios::binary);
client->download_blob_to_stream(containerName, blobName, offset, length, oss);
auto str = oss.str();
memcpy(data, str.c_str(), str.size());
return static_cast<int>(str.size());
});
}
Future<Void> zeroRange(int64_t offset, int64_t length) override { throw file_not_writable(); }
Future<Void> write(void const* data, int length, int64_t offset) override { throw file_not_writable(); }
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
Future<Void> sync() override { throw file_not_writable(); }
Future<int64_t> size() const override {
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
blobName = this->blobName] {
return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
});
}
std::string getFilename() const override { return blobName; }
int64_t debugFD() const override { return 0; }
};
class WriteFile final : public IAsyncFile, ReferenceCounted<WriteFile> {
AsyncTaskThread& asyncTaskThread;
AzureClient* client;
std::string containerName;
std::string blobName;
int64_t m_cursor{ 0 };
std::string buffer;
static constexpr size_t bufferLimit = 1 << 20;
// From https://tuttlem.github.io/2014/08/18/getting-istream-to-work-off-a-byte-array.html:
class MemStream : public std::istream {
class MemBuf : public std::basic_streambuf<char> {
public:
MemBuf(const uint8_t* p, size_t l) { setg((char*)p, (char*)p, (char*)p + l); }
} buffer;
public:
MemStream(const uint8_t* p, size_t l) : std::istream(&buffer), buffer(p, l) { rdbuf(&buffer); }
};
public:
WriteFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<WriteFile>::addref(); }
void delref() override { ReferenceCounted<WriteFile>::delref(); }
Future<int> read(void* data, int length, int64_t offset) override { throw file_not_readable(); }
Future<Void> write(void const* data, int length, int64_t offset) override {
if (offset != m_cursor) {
throw non_sequential_op();
}
m_cursor += length;
auto p = static_cast<char const*>(data);
buffer.insert(buffer.cend(), p, p + length);
if (buffer.size() > bufferLimit) {
return sync();
} else {
return Void();
}
}
Future<Void> truncate(int64_t size) override {
if (size != m_cursor) {
throw non_sequential_op();
}
return Void();
}
Future<Void> sync() override {
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName,
blobName = this->blobName, buffer = std::move(this->buffer)] {
// MemStream memStream(buffer.data(), buffer.size());
std::istringstream iss(buffer);
auto resp = client->append_block_from_stream(containerName, blobName, iss).get();
return Void();
});
}
Future<int64_t> size() const override {
return asyncTaskThread.execAsync(
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
auto resp = client->get_blob_properties(containerName, blobName).get().response();
ASSERT(resp.valid()); // TODO: Should instead throw here
return static_cast<int64_t>(resp.size);
});
}
std::string getFilename() const override { return blobName; }
int64_t debugFD() const override { return -1; }
};
class BackupFile final : public IBackupFile, ReferenceCounted<BackupFile> {
Reference<IAsyncFile> m_file;
public:
BackupFile(const std::string& fileName, Reference<IAsyncFile> file) : IBackupFile(fileName), m_file(file) {}
Future<Void> append(const void* data, int len) override {
Future<Void> r = m_file->write(data, len, m_offset);
m_offset += len;
return r;
}
Future<Void> finish() override {
Reference<BackupFile> self = Reference<BackupFile>::addRef(this);
return map(m_file->sync(), [=](Void _) {
self->m_file.clear();
return Void();
});
}
void addref() override { ReferenceCounted<BackupFile>::addref(); }
void delref() override { ReferenceCounted<BackupFile>::delref(); }
};
Future<bool> blobExists(const std::string& fileName) {
return asyncTaskThread.execAsync(
[client = this->client.get(), containerName = this->containerName, fileName = fileName] {
auto resp = client->get_blob_properties(containerName, fileName).get().response();
return resp.valid();
});
}
public:
BackupContainerAzureBlobStore() : containerName("test_container") {
// std::string account_name = std::getenv("AZURE_TESTACCOUNT");
// std::string account_key = std::getenv("AZURE_TESTKEY");
// bool use_https = true;
// auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(account_name, account_key);
// auto storage_account =
// std::make_shared<azure::storage_lite::storage_account>(account_name, credential, use_https);
auto storage_account = azure::storage_lite::storage_account::development_storage_account();
client = std::make_unique<AzureClient>(storage_account, 1);
}
void addref() override { return ReferenceCounted<BackupContainerAzureBlobStore>::addref(); }
void delref() override { return ReferenceCounted<BackupContainerAzureBlobStore>::delref(); }
Future<Void> create() override {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
client->create_container(containerName).wait();
return Void();
});
}
Future<bool> exists() override {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
auto resp = client->get_container_properties(containerName).get().response();
return resp.valid();
});
}
ACTOR static Future<Reference<IAsyncFile>> readFile_impl(BackupContainerAzureBlobStore* self,
std::string fileName) {
bool exists = wait(self->blobExists(fileName));
if (!exists) {
throw file_not_found();
}
return Reference<IAsyncFile>(
new ReadFile(self->asyncTaskThread, self->containerName, fileName, self->client.get()));
}
Future<Reference<IAsyncFile>> readFile(std::string fileName) override { return readFile_impl(this, fileName); }
ACTOR static Future<Reference<IBackupFile>> writeFile_impl(BackupContainerAzureBlobStore* self,
std::string fileName) {
wait(self->asyncTaskThread.execAsync(
[client = self->client.get(), containerName = self->containerName, fileName = fileName] {
auto outcome = client->create_append_blob(containerName, fileName).get();
return Void();
}));
return Reference<IBackupFile>(
new BackupFile(fileName, Reference<IAsyncFile>(new WriteFile(self->asyncTaskThread, self->containerName,
fileName, self->client.get()))));
}
Future<Reference<IBackupFile>> writeFile(const std::string& fileName) override {
return writeFile_impl(this, fileName);
}
static bool isDirectory(const std::string& blobName) { return blobName.size() && blobName.back() == '/'; }
static void listFilesImpl(AzureClient* client, const std::string& containerName, const std::string& path,
std::function<bool(std::string const&)> folderPathFilter, FilesAndSizesT& result) {
auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response();
for (const auto& blob : resp.blobs) {
if (isDirectory(blob.name) && folderPathFilter(blob.name)) {
listFilesImpl(client, containerName, blob.name, folderPathFilter, result);
} else {
result.emplace_back(blob.name, blob.content_length);
}
}
}
Future<FilesAndSizesT> listFiles(std::string path = "",
std::function<bool(std::string const&)> folderPathFilter = nullptr) {
return asyncTaskThread.execAsync([client = this->client.get(), containerName = this->containerName, path = path,
folderPathFilter = folderPathFilter] {
FilesAndSizesT result;
listFilesImpl(client, containerName, path, folderPathFilter, result);
return result;
});
}
Future<Void> deleteFile(std::string fileName) override {
return asyncTaskThread.execAsync(
[containerName = this->containerName, fileName = fileName, client = client.get()]() {
client->delete_blob(containerName, fileName).wait();
return Void();
});
}
Future<Void> deleteContainer(int* pNumDeleted) override {
// TODO: Update pNumDeleted?
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
client->delete_container(containerName).wait();
return Void();
});
}
};
const std::string BackupContainerBlobStore::DATAFOLDER = "data";
const std::string BackupContainerBlobStore::INDEXFOLDER = "backups";
@ -2027,9 +2279,9 @@ Reference<IBackupContainer> IBackupContainer::openContainer(std::string url)
try {
StringRef u(url);
if(u.startsWith(LiteralStringRef("file://")))
if (u.startsWith(LiteralStringRef("file://"))) {
r = Reference<IBackupContainer>(new BackupContainerLocalDirectory(url));
else if(u.startsWith(LiteralStringRef("blobstore://"))) {
} else if (u.startsWith(LiteralStringRef("blobstore://"))) {
std::string resource;
// The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options.
@ -2042,8 +2294,9 @@ Reference<IBackupContainer> IBackupContainer::openContainer(std::string url)
if(!isalnum(c) && c != '_' && c != '-' && c != '.' && c != '/')
throw backup_invalid_url();
r = Reference<IBackupContainer>(new BackupContainerBlobStore(bstore, resource, backupParams));
}
else {
} else if (u.startsWith(LiteralStringRef("http"))) {
r = Reference<IBackupContainer>(new BackupContainerAzureBlobStore());
} else {
lastOpenError = "invalid URL prefix";
throw backup_invalid_url();
}

View File

@ -0,0 +1,124 @@
/*
* BackupContainerAzure.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdlib>
#include "storage_credential.h"
#include "storage_account.h"
#include "blob/blob_client.h"
#include "fdbclient/BackupContainer.h"
#include "flow/actorcompiler.h" // has to be last include
class BackupContainerAzureBlobStore final : public IBackupContainer, ReferenceCounted<BackupContainerAzureBlobStore> {
azure::storage_lite::blob_client client;
using Self = BackupContainerAzureBlobStore;
ACTOR static Future<Reference<IBackupFile>> writeLogFile_impl(Self* self, Version beginVersion, Version endVersion,
int blockSize) {
wait(delay(0.0));
return Reference<IBackupFile>(new BackupFile("test"));
}
ACTOR static Future<Reference<IBackupFile>> writeRangeFile_impl(Self* self, Version snapshotBeginVersion,
int snapshotFileCount, Version fileVersion,
int blockSize) {
wait(delay(0.0));
return Reference<IBackupFile>(new BackupFile("test"));
}
ACTOR static Future<Reference<IBackupFile>> writeTaggedLogFile_impl(Self* self, Version beginVersion,
Version endVersion, int blockSize,
uint16_t tagId, int totalTags) {
wait(delay(0.0));
return Reference<IBackupFile>(new BackupFile("test"));
}
public:
void addref() override { return ReferenceCounted<BackupContainerAzureBlobStore>::addref(); }
void delref() override { return ReferenceCounted<BackupContainerAzureBlobStore>::delref(); }
Future<Void> create() override {
std::string account_name = std::getenv("AZURE_TESTACCOUNT"); //"devstoreaccount1";
std::string account_key = std::getenv("AZURE_TESTKEY");
bool use_https = true;
// std::string blob_endpoint = "https://127.0.0.1:10000/devstoreaccount1/";
int connection_count = 2;
auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(account_name, account_key);
auto storage_account =
std::make_shared<azure::storage_lite::storage_account>(account_name, credential, use_https);
client = azure::storage_lite::blob_client(storage_account, connection_count);
return Void();
}
Future<bool> exists() override { return true; }
class BackupFile : public IBackupFile, ReferenceCounted<BackupFile> {
public:
BackupFile(std::string fileName) : IBackupFile(fileName) {}
Future<Void> append(const void* data, int len) override { return Void(); }
Future<Void> finish() override { return Void(); }
void addref() override { ReferenceCounted<BackupFile>::addref(); }
void delref() override { ReferenceCounted<BackupFile>::delref(); }
};
Future<Reference<IBackupFile>> writeLogFile(Version beginVersion, Version endVersion, int blockSize) override {
return writeLogFile_impl(this, beginVersion, endVersion, blockSize);
}
Future<Reference<IBackupFile>> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount,
Version fileVersion, int blockSize) override {
return writeRangeFile_impl(this, snapshotBeginVersion, snapshotFileCount, fileVersion, blockSize);
}
Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
uint16_t tagId, int totalTags) override {
return writeTaggedLogFile_impl(this, beginVersion, endVersion, blockSize, tagId, totalTags);
}
Future<Void> writeKeyspaceSnapshotFile(const std::vector<std::string>& fileNames,
const std::vector<std::pair<Key, Key>>& beginEndKeys,
int64_t totalBytes) override {
return delay(0.0);
}
Future<Reference<IAsyncFile>> readFile(std::string name) override { return Reference<IAsyncFile>(); }
Future<KeyRange> getSnapshotFileKeyRange(const RangeFile& file) override { return KeyRange(normalKeys); }
Future<Void> expireData(Version expireEndVersion, bool force = false, ExpireProgress* progress = nullptr,
Version restorableBeginVersion = std::numeric_limits<Version>::max()) override {
return Void();
}
Future<Void> deleteContainer(int* pNumDeleted = nullptr) override { return Void(); }
Future<BackupDescription> describeBackup(bool deepScan = false,
Version logStartVersionOverride = invalidVersion) override {
return {};
}
Future<BackupFileList> dumpFileList(Version begin = 0, Version end = std::numeric_limits<Version>::max()) override {
return {};
}
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion) { return {}; }
};

View File

@ -3,6 +3,7 @@ set(FDBCLIENT_SRCS
AsyncFileBlobStore.actor.h
Atomic.h
AutoPublicAddress.cpp
AsyncTaskThread.h
BackupAgent.actor.h
BackupAgentBase.actor.cpp
BackupContainer.actor.cpp
@ -93,6 +94,35 @@ set(options_srcs ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp)
vexillographer_compile(TARGET fdboptions LANG cpp OUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.h ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp)
configure_file(azurestorage.cmake azurestorage-download/CMakeLists.txt)
execute_process(
COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" .
RESULT_VARIABLE results
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-download
)
if(results)
message(FATAL_ERROR "Configuration step for AzureStorage has Failed. ${results}")
endif()
execute_process(
COMMAND ${CMAKE_COMMAND} --build . --config Release
RESULT_VARIABLE results
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-download
)
if(results)
message(FATAL_ERROR "Build step for AzureStorage has Failed. ${results}")
endif()
add_subdirectory(
${CMAKE_CURRENT_BINARY_DIR}/azurestorage-src
${CMAKE_CURRENT_BINARY_DIR}/azurestorage-build
#EXCLUDE_FROM_ALL
)
add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs})
add_dependencies(fdbclient fdboptions)
target_link_libraries(fdbclient PUBLIC fdbrpc)
#target_include_directories(fdbclient PRIVATE /home/tclinkenbeard/Azure/azure-storage-cpplite/include)
target_link_libraries(fdbclient PUBLIC fdbrpc curl uuid azure-storage-lite)

View File

@ -0,0 +1,15 @@
project(azurestorage-download)
include(ExternalProject)
ExternalProject_Add(azurestorage
GIT_REPOSITORY https://github.com/Azure/azure-storage-cpplite.git
GIT_TAG 11e1f98b021446ef340f4886796899a6eb1ad9a5 # v0.3.0
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/azurestorage-src"
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/azurestorage-build"
CMAKE_ARGS "-DCMAKE_BUILD_TYPE=Release"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
BUILD_BYPRODUCTS "${CMAKE_CURRENT_BINARY_DIR}/libazure-storage-lite.a"
)

View File

@ -292,7 +292,7 @@ TransportData::TransportData(uint64_t transportId)
#pragma pack( push, 1 )
struct ConnectPacket {
// The value does not inclueds the size of `connectPacketLength` itself,
// The value does not include the size of `connectPacketLength` itself,
// but only the other fields of this structure.
uint32_t connectPacketLength;
ProtocolVersion protocolVersion; // Expect currentProtocolVersion

View File

@ -123,6 +123,7 @@ set(FDBSERVER_SRCS
workloads/BackupCorrectness.actor.cpp
workloads/BackupAndParallelRestoreCorrectness.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/BackupToBlob.actor.cpp
workloads/BackupToDBAbort.actor.cpp
workloads/BackupToDBCorrectness.actor.cpp
workloads/BackupToDBUpgrade.actor.cpp
@ -183,6 +184,7 @@ set(FDBSERVER_SRCS
workloads/RemoveServersSafely.actor.cpp
workloads/ReportConflictingKeys.actor.cpp
workloads/RestoreBackup.actor.cpp
workloads/RestoreFromBlob.actor.cpp
workloads/Rollback.actor.cpp
workloads/RyowCorrectness.actor.cpp
workloads/RYWDisable.actor.cpp

View File

@ -387,7 +387,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
state Future<Void> cp = changePaused(cx, &backupAgent);
}
// Increment the backup agent requets
// Increment the backup agent requests
if (self->agentRequest) {
BackupAndRestoreCorrectnessWorkload::backupAgentRequests ++;
}

View File

@ -0,0 +1,65 @@
/*
* BackupToBlob.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct BackupToBlobWorkload : TestWorkload {
double backupAfter;
Key backupTag;
Standalone<StringRef> backupURL;
int snapshotInterval = 100000;
static constexpr const char* DESCRIPTION = "BackupToBlob";
BackupToBlobWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
backupAfter = getOption(options, LiteralStringRef("backupAfter"), 10.0);
backupTag = getOption(options, LiteralStringRef("backupTag"), BackupAgentBase::getDefaultTag());
backupURL = getOption(options, LiteralStringRef("backupURL"), LiteralStringRef("http://0.0.0.0:10000"));
}
std::string description() override { return DESCRIPTION; }
Future<Void> setup(Database const& cx) override { return Void(); }
ACTOR static Future<Void> _start(Database cx, BackupToBlobWorkload* self) {
state FileBackupAgent backupAgent;
state Standalone<VectorRef<KeyRangeRef>> backupRanges;
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
wait(delay(self->backupAfter));
wait(backupAgent.submitBackup(cx, self->backupURL, self->snapshotInterval, self->backupTag.toString(),
backupRanges));
EBackupState backupStatus = wait(backupAgent.waitBackup(cx, self->backupTag.toString(), true));
TraceEvent("BackupToBlob_BackupStatus").detail("Status", BackupAgentBase::getStateText(backupStatus));
return Void();
}
Future<Void> start(Database const& cx) override { return clientId ? Void() : _start(cx, this); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<BackupToBlobWorkload> BackupToBlobWorkloadFactory(BackupToBlobWorkload::DESCRIPTION);

View File

@ -0,0 +1,65 @@
/*
* RestoreFromBlob.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct RestoreFromBlobWorkload : TestWorkload {
double restoreAfter;
Key backupTag;
Standalone<StringRef> backupURL;
bool waitForComplete;
static constexpr const char* DESCRIPTION = "RestoreFromBlob";
RestoreFromBlobWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
restoreAfter = getOption(options, LiteralStringRef("restoreAfter"), 10.0);
backupTag = getOption(options, LiteralStringRef("backupTag"), BackupAgentBase::getDefaultTag());
backupURL = getOption(options, LiteralStringRef("backupURL"), LiteralStringRef("http://0.0.0.0:10000"));
waitForComplete = getOption(options, LiteralStringRef("waitForComplete"), true);
}
std::string description() override { return DESCRIPTION; }
Future<Void> setup(Database const& cx) override { return Void(); }
ACTOR static Future<Void> _start(Database cx, RestoreFromBlobWorkload* self) {
state FileBackupAgent backupAgent;
state Standalone<VectorRef<KeyRangeRef>> restoreRanges;
restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys);
wait(delay(self->restoreAfter));
Version v =
wait(backupAgent.restore(cx, {}, self->backupTag, self->backupURL, restoreRanges, self->waitForComplete));
return Void();
}
Future<Void> start(Database const& cx) override { return clientId ? Void() : _start(cx, this); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
};
WorkloadFactory<RestoreFromBlobWorkload> RestoreFromBlobWorkloadFactory(RestoreFromBlobWorkload::DESCRIPTION);

View File

@ -309,8 +309,7 @@ Future<U> mapAsync(Future<T> what, F actorFunc) {
//maps a vector of futures with an asynchronous function
template <class T, class F>
std::vector<Future<std::invoke_result_t<F, T>>> mapAsync(std::vector<Future<T>> const& what, F const& actorFunc)
{
auto mapAsync(std::vector<Future<T>> const& what, F const& actorFunc) {
std::vector<std::invoke_result_t<F, T>> ret;
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(mapAsync(f, actorFunc));
@ -368,8 +367,7 @@ Future<std::invoke_result_t<F, T>> map(Future<T> what, F func)
//maps a vector of futures
template <class T, class F>
std::vector<Future<std::invoke_result_t<F, T>>> map(std::vector<Future<T>> const& what, F const& func)
{
auto map(std::vector<Future<T>> const& what, F const& func) {
std::vector<Future<std::invoke_result_t<F, T>>> ret;
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(map(f, func));
@ -443,9 +441,7 @@ Future<Void> asyncFilter( FutureStream<T> input, F actorPred, PromiseStream<T> o
loop {
try {
choose {
when ( T nextInput = waitNext(input) ) {
futures.push_back( std::pair<T, Future<bool>>(nextInput, actorPred(nextInput)) );
}
when(T nextInput = waitNext(input)) { futures.emplace_back(nextInput, actorPred(nextInput)); }
when ( bool pass = wait( futures.size() == 0 ? Never() : futures.front().second ) ) {
if(pass) output.send(futures.front().first);
futures.pop_front();
@ -1309,7 +1305,8 @@ private:
Promise<Void> broken_on_destruct;
ACTOR static Future<Void> takeActor(FlowLock* lock, TaskPriority taskID, int64_t amount) {
state std::list<std::pair<Promise<Void>, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));
state std::list<std::pair<Promise<Void>, int64_t>>::iterator it =
lock->takers.emplace(lock->takers.end(), Promise<Void>(), amount);
try {
wait( it->first.getFuture() );
@ -1366,7 +1363,7 @@ struct NotifiedInt {
Future<Void> whenAtLeast( int64_t limit ) {
if (val >= limit) return Void();
Promise<Void> p;
waiting.push( std::make_pair(limit,p) );
waiting.emplace(limit, p);
return p.getFuture();
}

View File

@ -110,6 +110,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/AtomicOpsApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/BackupCorrectness.toml)
add_fdb_test(TEST_FILES fast/BackupCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/BackupToBlob.toml)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectness.toml)
add_fdb_test(TEST_FILES fast/BackupToDBCorrectnessClean.toml)
add_fdb_test(TEST_FILES fast/CacheTest.toml)

View File

@ -0,0 +1,40 @@
[[test]]
testTitle = 'Cycle'
clearAfterTest = 'false'
simBackupAgents = 'BackupToFile'
[[test.workload]]
testName = 'Cycle'
nodeCount = 3000
testDuration = 10.0
expectedRate = 0
[[test]]
testTitle = 'Backup'
[[test.workload]]
testName = 'BackupToBlob'
backupAfter = 0.0
backupTag = 'default'
backupURL = 'http://0.0.0.0:10000'
[[test]]
testTitle = 'Restore'
clearAfterTest = 'false'
[[test.workload]]
testName = 'RestoreFromBlob'
restoreAfter = 0.0
backupTag = 'default'
backupURL = 'http://0.0.0.0:10000'
[[test]]
testTitle = 'CycleCheck'
checkOnly = 'true'
[[test.workload]]
testName = 'Cycle'
nodeCount = 3000
testDuration = 10.0
expectedRate = 0