diff --git a/build/Dockerfile.azure b/build/Dockerfile.azure new file mode 100644 index 0000000000..19bd41863c --- /dev/null +++ b/build/Dockerfile.azure @@ -0,0 +1,5 @@ +FROM foundationdb/foundationdb-dev:0.11.4 + +USER root + +RUN yum install -y libcurl-devel libuuid-devel openssl-devel diff --git a/fdbclient/AsyncTaskThread.h b/fdbclient/AsyncTaskThread.h new file mode 100644 index 0000000000..ba1571e4ed --- /dev/null +++ b/fdbclient/AsyncTaskThread.h @@ -0,0 +1,93 @@ +/* + * AsyncTaskThread.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __AZURE_CONNECTION_H__ +#define __AZURE_CONNECTION_H__ + +#include +#include +#include + +#include "flow/network.h" +#include "flow/ThreadHelper.actor.h" +#include "flow/ThreadSafeQueue.h" + +class IAsyncTask { +public: + virtual void operator()() = 0; + virtual ~IAsyncTask() = default; +}; + +template +class AsyncTask : public IAsyncTask { + F func; + +public: + AsyncTask(const F& func) : func(func) {} + + void operator()() override { func(); } +}; + +class AsyncTaskThread { + ThreadSafeQueue> queue; + std::promise wakeUp; + std::thread thread; + + static void run(AsyncTaskThread* conn) { + while (true) { + std::shared_ptr task; + { + if (conn->queue.canSleep()) { + conn->wakeUp.get_future().get(); + conn->wakeUp = {}; + } + task = conn->queue.pop().get(); + } + } + } + + template + void addTask(const F& func) { + if (queue.push(std::make_shared>(func))) { + wakeUp.set_value(); + } + } + +public: + AsyncTaskThread() : thread([this] { run(this); }) {} + + template + auto execAsync(const F& func, TaskPriority priority = TaskPriority::DefaultOnMainThread) + -> Future { + if (g_network->isSimulated()) { + // TODO: Add some random delay + return func(); + } + Promise promise; + addTask([&promise, &func, priority] { + auto funcResult = func(); + onMainThreadVoid([&promise, &funcResult] { promise.send(funcResult); }, nullptr, + priority); // TODO: Add error handling + }); + return promise.getFuture(); + } +}; + +#endif diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index 223ef1949b..2a34d2cad5 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -18,7 +18,15 @@ * limitations under the License. */ +#include +#include + +#include "storage_credential.h" +#include "storage_account.h" +#include "blob/blob_client.h" + #include "flow/Platform.actor.h" +#include "fdbclient/AsyncTaskThread.h" #include "fdbclient/BackupContainer.h" #include "fdbclient/BackupAgent.actor.h" #include "fdbclient/FDBTypes.h" @@ -289,7 +297,7 @@ public: Future> readFile(std::string fileName) override = 0; // Open a file for write by fileName - virtual Future> writeFile(std::string fileName) = 0; + virtual Future> writeFile(const std::string& fileName) = 0; // Delete a file virtual Future deleteFile(std::string fileName) = 0; @@ -1777,7 +1785,7 @@ public: std::string m_finalFullPath; }; - Future> writeFile(std::string path) final { + Future> writeFile(const std::string& path) final { int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE; std::string fullPath = joinPath(m_path, path); platform::createDirectory(parentDirectory(fullPath)); @@ -1928,7 +1936,7 @@ public: Reference m_file; }; - Future> writeFile(std::string path) final { + Future> writeFile(const std::string& path) final { return Reference(new BackupFile(path, Reference(new AsyncFileBlobStoreWrite(m_bstore, m_bucket, dataPath(path))))); } @@ -2004,6 +2012,250 @@ public: } }; +class BackupContainerAzureBlobStore final : public BackupContainerFileSystem, + ReferenceCounted { + + using AzureClient = azure::storage_lite::blob_client; + + std::unique_ptr client; + std::string containerName; + AsyncTaskThread asyncTaskThread; + + class ReadFile final : public IAsyncFile, ReferenceCounted { + AsyncTaskThread& asyncTaskThread; + std::string containerName; + std::string blobName; + AzureClient* client; + + public: + ReadFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName, + AzureClient* client) + : asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {} + + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } + Future read(void* data, int length, int64_t offset) { + return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName, + blobName = this->blobName, data, length, offset] { + std::ostringstream oss(std::ios::out | std::ios::binary); + client->download_blob_to_stream(containerName, blobName, offset, length, oss); + auto str = oss.str(); + memcpy(data, str.c_str(), str.size()); + return static_cast(str.size()); + }); + } + Future zeroRange(int64_t offset, int64_t length) override { throw file_not_writable(); } + Future write(void const* data, int length, int64_t offset) override { throw file_not_writable(); } + Future truncate(int64_t size) override { throw file_not_writable(); } + Future sync() override { throw file_not_writable(); } + Future size() const override { + return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName, + blobName = this->blobName] { + return static_cast(client->get_blob_properties(containerName, blobName).get().response().size); + }); + } + std::string getFilename() const override { return blobName; } + int64_t debugFD() const override { return 0; } + }; + + class WriteFile final : public IAsyncFile, ReferenceCounted { + AsyncTaskThread& asyncTaskThread; + AzureClient* client; + std::string containerName; + std::string blobName; + int64_t m_cursor{ 0 }; + std::string buffer; + + static constexpr size_t bufferLimit = 1 << 20; + + // From https://tuttlem.github.io/2014/08/18/getting-istream-to-work-off-a-byte-array.html: + class MemStream : public std::istream { + class MemBuf : public std::basic_streambuf { + public: + MemBuf(const uint8_t* p, size_t l) { setg((char*)p, (char*)p, (char*)p + l); } + } buffer; + + public: + MemStream(const uint8_t* p, size_t l) : std::istream(&buffer), buffer(p, l) { rdbuf(&buffer); } + }; + + public: + WriteFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName, + AzureClient* client) + : asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {} + + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } + Future read(void* data, int length, int64_t offset) override { throw file_not_readable(); } + Future write(void const* data, int length, int64_t offset) override { + if (offset != m_cursor) { + throw non_sequential_op(); + } + m_cursor += length; + auto p = static_cast(data); + buffer.insert(buffer.cend(), p, p + length); + if (buffer.size() > bufferLimit) { + return sync(); + } else { + return Void(); + } + } + Future truncate(int64_t size) override { + if (size != m_cursor) { + throw non_sequential_op(); + } + return Void(); + } + Future sync() override { + return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName, + blobName = this->blobName, buffer = std::move(this->buffer)] { + // MemStream memStream(buffer.data(), buffer.size()); + std::istringstream iss(buffer); + auto resp = client->append_block_from_stream(containerName, blobName, iss).get(); + return Void(); + }); + } + Future size() const override { + return asyncTaskThread.execAsync( + [client = this->client, containerName = this->containerName, blobName = this->blobName] { + auto resp = client->get_blob_properties(containerName, blobName).get().response(); + ASSERT(resp.valid()); // TODO: Should instead throw here + return static_cast(resp.size); + }); + } + std::string getFilename() const override { return blobName; } + int64_t debugFD() const override { return -1; } + }; + + class BackupFile final : public IBackupFile, ReferenceCounted { + Reference m_file; + + public: + BackupFile(const std::string& fileName, Reference file) : IBackupFile(fileName), m_file(file) {} + Future append(const void* data, int len) override { + Future r = m_file->write(data, len, m_offset); + m_offset += len; + return r; + } + Future finish() override { + Reference self = Reference::addRef(this); + return map(m_file->sync(), [=](Void _) { + self->m_file.clear(); + return Void(); + }); + } + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } + }; + + Future blobExists(const std::string& fileName) { + return asyncTaskThread.execAsync( + [client = this->client.get(), containerName = this->containerName, fileName = fileName] { + auto resp = client->get_blob_properties(containerName, fileName).get().response(); + return resp.valid(); + }); + } + +public: + BackupContainerAzureBlobStore() : containerName("test_container") { + // std::string account_name = std::getenv("AZURE_TESTACCOUNT"); + // std::string account_key = std::getenv("AZURE_TESTKEY"); + // bool use_https = true; + + // auto credential = std::make_shared(account_name, account_key); + // auto storage_account = + // std::make_shared(account_name, credential, use_https); + + auto storage_account = azure::storage_lite::storage_account::development_storage_account(); + + client = std::make_unique(storage_account, 1); + } + + void addref() override { return ReferenceCounted::addref(); } + void delref() override { return ReferenceCounted::delref(); } + + Future create() override { + return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] { + client->create_container(containerName).wait(); + return Void(); + }); + } + Future exists() override { + return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] { + auto resp = client->get_container_properties(containerName).get().response(); + return resp.valid(); + }); + } + + ACTOR static Future> readFile_impl(BackupContainerAzureBlobStore* self, + std::string fileName) { + bool exists = wait(self->blobExists(fileName)); + if (!exists) { + throw file_not_found(); + } + return Reference( + new ReadFile(self->asyncTaskThread, self->containerName, fileName, self->client.get())); + } + + Future> readFile(std::string fileName) override { return readFile_impl(this, fileName); } + + ACTOR static Future> writeFile_impl(BackupContainerAzureBlobStore* self, + std::string fileName) { + wait(self->asyncTaskThread.execAsync( + [client = self->client.get(), containerName = self->containerName, fileName = fileName] { + auto outcome = client->create_append_blob(containerName, fileName).get(); + return Void(); + })); + return Reference( + new BackupFile(fileName, Reference(new WriteFile(self->asyncTaskThread, self->containerName, + fileName, self->client.get())))); + } + + Future> writeFile(const std::string& fileName) override { + return writeFile_impl(this, fileName); + } + + static bool isDirectory(const std::string& blobName) { return blobName.size() && blobName.back() == '/'; } + + static void listFilesImpl(AzureClient* client, const std::string& containerName, const std::string& path, + std::function folderPathFilter, FilesAndSizesT& result) { + auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response(); + for (const auto& blob : resp.blobs) { + if (isDirectory(blob.name) && folderPathFilter(blob.name)) { + listFilesImpl(client, containerName, blob.name, folderPathFilter, result); + } else { + result.emplace_back(blob.name, blob.content_length); + } + } + } + + Future listFiles(std::string path = "", + std::function folderPathFilter = nullptr) { + return asyncTaskThread.execAsync([client = this->client.get(), containerName = this->containerName, path = path, + folderPathFilter = folderPathFilter] { + FilesAndSizesT result; + listFilesImpl(client, containerName, path, folderPathFilter, result); + return result; + }); + } + + Future deleteFile(std::string fileName) override { + return asyncTaskThread.execAsync( + [containerName = this->containerName, fileName = fileName, client = client.get()]() { + client->delete_blob(containerName, fileName).wait(); + return Void(); + }); + } + + Future deleteContainer(int* pNumDeleted) override { + // TODO: Update pNumDeleted? + return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] { + client->delete_container(containerName).wait(); + return Void(); + }); + } +}; + const std::string BackupContainerBlobStore::DATAFOLDER = "data"; const std::string BackupContainerBlobStore::INDEXFOLDER = "backups"; @@ -2027,9 +2279,9 @@ Reference IBackupContainer::openContainer(std::string url) try { StringRef u(url); - if(u.startsWith(LiteralStringRef("file://"))) + if (u.startsWith(LiteralStringRef("file://"))) { r = Reference(new BackupContainerLocalDirectory(url)); - else if(u.startsWith(LiteralStringRef("blobstore://"))) { + } else if (u.startsWith(LiteralStringRef("blobstore://"))) { std::string resource; // The URL parameters contain blobstore endpoint tunables as well as possible backup-specific options. @@ -2042,8 +2294,9 @@ Reference IBackupContainer::openContainer(std::string url) if(!isalnum(c) && c != '_' && c != '-' && c != '.' && c != '/') throw backup_invalid_url(); r = Reference(new BackupContainerBlobStore(bstore, resource, backupParams)); - } - else { + } else if (u.startsWith(LiteralStringRef("http"))) { + r = Reference(new BackupContainerAzureBlobStore()); + } else { lastOpenError = "invalid URL prefix"; throw backup_invalid_url(); } diff --git a/fdbclient/BackupContainer.h b/fdbclient/BackupContainer.h index 1b7fa7a7eb..71736c6902 100644 --- a/fdbclient/BackupContainer.h +++ b/fdbclient/BackupContainer.h @@ -303,4 +303,4 @@ private: std::string URL; }; -#endif \ No newline at end of file +#endif diff --git a/fdbclient/BackupContainerAzure.actor.cpp b/fdbclient/BackupContainerAzure.actor.cpp new file mode 100644 index 0000000000..da40122f2b --- /dev/null +++ b/fdbclient/BackupContainerAzure.actor.cpp @@ -0,0 +1,124 @@ +/* + * BackupContainerAzure.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2020 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "storage_credential.h" +#include "storage_account.h" +#include "blob/blob_client.h" + +#include "fdbclient/BackupContainer.h" +#include "flow/actorcompiler.h" // has to be last include + +class BackupContainerAzureBlobStore final : public IBackupContainer, ReferenceCounted { + + azure::storage_lite::blob_client client; + + using Self = BackupContainerAzureBlobStore; + + ACTOR static Future> writeLogFile_impl(Self* self, Version beginVersion, Version endVersion, + int blockSize) { + wait(delay(0.0)); + return Reference(new BackupFile("test")); + } + + ACTOR static Future> writeRangeFile_impl(Self* self, Version snapshotBeginVersion, + int snapshotFileCount, Version fileVersion, + int blockSize) { + wait(delay(0.0)); + return Reference(new BackupFile("test")); + } + + ACTOR static Future> writeTaggedLogFile_impl(Self* self, Version beginVersion, + Version endVersion, int blockSize, + uint16_t tagId, int totalTags) { + wait(delay(0.0)); + return Reference(new BackupFile("test")); + } + +public: + void addref() override { return ReferenceCounted::addref(); } + void delref() override { return ReferenceCounted::delref(); } + + Future create() override { + std::string account_name = std::getenv("AZURE_TESTACCOUNT"); //"devstoreaccount1"; + std::string account_key = std::getenv("AZURE_TESTKEY"); + bool use_https = true; + // std::string blob_endpoint = "https://127.0.0.1:10000/devstoreaccount1/"; + int connection_count = 2; + + auto credential = std::make_shared(account_name, account_key); + auto storage_account = + std::make_shared(account_name, credential, use_https); + + client = azure::storage_lite::blob_client(storage_account, connection_count); + return Void(); + } + + Future exists() override { return true; } + + class BackupFile : public IBackupFile, ReferenceCounted { + public: + BackupFile(std::string fileName) : IBackupFile(fileName) {} + Future append(const void* data, int len) override { return Void(); } + Future finish() override { return Void(); } + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } + }; + + Future> writeLogFile(Version beginVersion, Version endVersion, int blockSize) override { + return writeLogFile_impl(this, beginVersion, endVersion, blockSize); + } + Future> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount, + Version fileVersion, int blockSize) override { + return writeRangeFile_impl(this, snapshotBeginVersion, snapshotFileCount, fileVersion, blockSize); + } + + Future> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize, + uint16_t tagId, int totalTags) override { + return writeTaggedLogFile_impl(this, beginVersion, endVersion, blockSize, tagId, totalTags); + } + + Future writeKeyspaceSnapshotFile(const std::vector& fileNames, + const std::vector>& beginEndKeys, + int64_t totalBytes) override { + return delay(0.0); + } + Future> readFile(std::string name) override { return Reference(); } + Future getSnapshotFileKeyRange(const RangeFile& file) override { return KeyRange(normalKeys); } + + Future expireData(Version expireEndVersion, bool force = false, ExpireProgress* progress = nullptr, + Version restorableBeginVersion = std::numeric_limits::max()) override { + return Void(); + } + + Future deleteContainer(int* pNumDeleted = nullptr) override { return Void(); } + + Future describeBackup(bool deepScan = false, + Version logStartVersionOverride = invalidVersion) override { + return {}; + } + + Future dumpFileList(Version begin = 0, Version end = std::numeric_limits::max()) override { + return {}; + } + + Future> getRestoreSet(Version targetVersion) { return {}; } +}; diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index 3f7333b632..194aec7ecb 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -3,6 +3,7 @@ set(FDBCLIENT_SRCS AsyncFileBlobStore.actor.h Atomic.h AutoPublicAddress.cpp + AsyncTaskThread.h BackupAgent.actor.h BackupAgentBase.actor.cpp BackupContainer.actor.cpp @@ -93,6 +94,35 @@ set(options_srcs ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp) vexillographer_compile(TARGET fdboptions LANG cpp OUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.h ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp) +configure_file(azurestorage.cmake azurestorage-download/CMakeLists.txt) + +execute_process( + COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" . + RESULT_VARIABLE results + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-download +) + +if(results) + message(FATAL_ERROR "Configuration step for AzureStorage has Failed. ${results}") +endif() + +execute_process( + COMMAND ${CMAKE_COMMAND} --build . --config Release + RESULT_VARIABLE results + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-download +) + +if(results) + message(FATAL_ERROR "Build step for AzureStorage has Failed. ${results}") +endif() + +add_subdirectory( + ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-src + ${CMAKE_CURRENT_BINARY_DIR}/azurestorage-build + #EXCLUDE_FROM_ALL +) + add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs}) add_dependencies(fdbclient fdboptions) -target_link_libraries(fdbclient PUBLIC fdbrpc) +#target_include_directories(fdbclient PRIVATE /home/tclinkenbeard/Azure/azure-storage-cpplite/include) +target_link_libraries(fdbclient PUBLIC fdbrpc curl uuid azure-storage-lite) diff --git a/fdbclient/azurestorage.cmake b/fdbclient/azurestorage.cmake new file mode 100644 index 0000000000..36f8e24f6e --- /dev/null +++ b/fdbclient/azurestorage.cmake @@ -0,0 +1,15 @@ +project(azurestorage-download) + +include(ExternalProject) +ExternalProject_Add(azurestorage + GIT_REPOSITORY https://github.com/Azure/azure-storage-cpplite.git + GIT_TAG 11e1f98b021446ef340f4886796899a6eb1ad9a5 # v0.3.0 + SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/azurestorage-src" + BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/azurestorage-build" + CMAKE_ARGS "-DCMAKE_BUILD_TYPE=Release" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" + BUILD_BYPRODUCTS "${CMAKE_CURRENT_BINARY_DIR}/libazure-storage-lite.a" +) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index f32c719e54..9180b521c2 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -292,7 +292,7 @@ TransportData::TransportData(uint64_t transportId) #pragma pack( push, 1 ) struct ConnectPacket { - // The value does not inclueds the size of `connectPacketLength` itself, + // The value does not include the size of `connectPacketLength` itself, // but only the other fields of this structure. uint32_t connectPacketLength; ProtocolVersion protocolVersion; // Expect currentProtocolVersion diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 00a8be3238..2b1394e39d 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -123,6 +123,7 @@ set(FDBSERVER_SRCS workloads/BackupCorrectness.actor.cpp workloads/BackupAndParallelRestoreCorrectness.actor.cpp workloads/ParallelRestore.actor.cpp + workloads/BackupToBlob.actor.cpp workloads/BackupToDBAbort.actor.cpp workloads/BackupToDBCorrectness.actor.cpp workloads/BackupToDBUpgrade.actor.cpp @@ -183,6 +184,7 @@ set(FDBSERVER_SRCS workloads/RemoveServersSafely.actor.cpp workloads/ReportConflictingKeys.actor.cpp workloads/RestoreBackup.actor.cpp + workloads/RestoreFromBlob.actor.cpp workloads/Rollback.actor.cpp workloads/RyowCorrectness.actor.cpp workloads/RYWDisable.actor.cpp diff --git a/fdbserver/workloads/BackupCorrectness.actor.cpp b/fdbserver/workloads/BackupCorrectness.actor.cpp index fac28ce5cd..d625dbf1c7 100644 --- a/fdbserver/workloads/BackupCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupCorrectness.actor.cpp @@ -387,7 +387,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { state Future cp = changePaused(cx, &backupAgent); } - // Increment the backup agent requets + // Increment the backup agent requests if (self->agentRequest) { BackupAndRestoreCorrectnessWorkload::backupAgentRequests ++; } diff --git a/fdbserver/workloads/BackupToBlob.actor.cpp b/fdbserver/workloads/BackupToBlob.actor.cpp new file mode 100644 index 0000000000..64e7bfc1a0 --- /dev/null +++ b/fdbserver/workloads/BackupToBlob.actor.cpp @@ -0,0 +1,65 @@ +/* + * BackupToBlob.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbrpc/simulator.h" +#include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/BackupContainer.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +struct BackupToBlobWorkload : TestWorkload { + double backupAfter; + Key backupTag; + Standalone backupURL; + int snapshotInterval = 100000; + + static constexpr const char* DESCRIPTION = "BackupToBlob"; + + BackupToBlobWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + backupAfter = getOption(options, LiteralStringRef("backupAfter"), 10.0); + backupTag = getOption(options, LiteralStringRef("backupTag"), BackupAgentBase::getDefaultTag()); + backupURL = getOption(options, LiteralStringRef("backupURL"), LiteralStringRef("http://0.0.0.0:10000")); + } + + std::string description() override { return DESCRIPTION; } + + Future setup(Database const& cx) override { return Void(); } + + ACTOR static Future _start(Database cx, BackupToBlobWorkload* self) { + state FileBackupAgent backupAgent; + state Standalone> backupRanges; + backupRanges.push_back_deep(backupRanges.arena(), normalKeys); + + wait(delay(self->backupAfter)); + wait(backupAgent.submitBackup(cx, self->backupURL, self->snapshotInterval, self->backupTag.toString(), + backupRanges)); + EBackupState backupStatus = wait(backupAgent.waitBackup(cx, self->backupTag.toString(), true)); + TraceEvent("BackupToBlob_BackupStatus").detail("Status", BackupAgentBase::getStateText(backupStatus)); + return Void(); + } + + Future start(Database const& cx) override { return clientId ? Void() : _start(cx, this); } + + Future check(Database const& cx) override { return true; } + + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory BackupToBlobWorkloadFactory(BackupToBlobWorkload::DESCRIPTION); diff --git a/fdbserver/workloads/RestoreFromBlob.actor.cpp b/fdbserver/workloads/RestoreFromBlob.actor.cpp new file mode 100644 index 0000000000..ed62920269 --- /dev/null +++ b/fdbserver/workloads/RestoreFromBlob.actor.cpp @@ -0,0 +1,65 @@ +/* + * RestoreFromBlob.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbrpc/simulator.h" +#include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/BackupContainer.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/workloads/BulkSetup.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +struct RestoreFromBlobWorkload : TestWorkload { + double restoreAfter; + Key backupTag; + Standalone backupURL; + bool waitForComplete; + + static constexpr const char* DESCRIPTION = "RestoreFromBlob"; + + RestoreFromBlobWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + restoreAfter = getOption(options, LiteralStringRef("restoreAfter"), 10.0); + backupTag = getOption(options, LiteralStringRef("backupTag"), BackupAgentBase::getDefaultTag()); + backupURL = getOption(options, LiteralStringRef("backupURL"), LiteralStringRef("http://0.0.0.0:10000")); + waitForComplete = getOption(options, LiteralStringRef("waitForComplete"), true); + } + + std::string description() override { return DESCRIPTION; } + + Future setup(Database const& cx) override { return Void(); } + + ACTOR static Future _start(Database cx, RestoreFromBlobWorkload* self) { + state FileBackupAgent backupAgent; + state Standalone> restoreRanges; + restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys); + + wait(delay(self->restoreAfter)); + Version v = + wait(backupAgent.restore(cx, {}, self->backupTag, self->backupURL, restoreRanges, self->waitForComplete)); + return Void(); + } + + Future start(Database const& cx) override { return clientId ? Void() : _start(cx, this); } + + Future check(Database const& cx) override { return true; } + + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory RestoreFromBlobWorkloadFactory(RestoreFromBlobWorkload::DESCRIPTION); diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index 9e90539db5..d710764c69 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -308,9 +308,8 @@ Future mapAsync(Future what, F actorFunc) { } //maps a vector of futures with an asynchronous function -template -std::vector>> mapAsync(std::vector> const& what, F const& actorFunc) -{ +template +auto mapAsync(std::vector> const& what, F const& actorFunc) { std::vector> ret; ret.reserve(what.size()); for (const auto& f : what) ret.push_back(mapAsync(f, actorFunc)); @@ -367,9 +366,8 @@ Future> map(Future what, F func) } //maps a vector of futures -template -std::vector>> map(std::vector> const& what, F const& func) -{ +template +auto map(std::vector> const& what, F const& func) { std::vector>> ret; ret.reserve(what.size()); for (const auto& f : what) ret.push_back(map(f, func)); @@ -443,9 +441,7 @@ Future asyncFilter( FutureStream input, F actorPred, PromiseStream o loop { try { choose { - when ( T nextInput = waitNext(input) ) { - futures.push_back( std::pair>(nextInput, actorPred(nextInput)) ); - } + when(T nextInput = waitNext(input)) { futures.emplace_back(nextInput, actorPred(nextInput)); } when ( bool pass = wait( futures.size() == 0 ? Never() : futures.front().second ) ) { if(pass) output.send(futures.front().first); futures.pop_front(); @@ -1309,7 +1305,8 @@ private: Promise broken_on_destruct; ACTOR static Future takeActor(FlowLock* lock, TaskPriority taskID, int64_t amount) { - state std::list, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise(), amount)); + state std::list, int64_t>>::iterator it = + lock->takers.emplace(lock->takers.end(), Promise(), amount); try { wait( it->first.getFuture() ); @@ -1366,7 +1363,7 @@ struct NotifiedInt { Future whenAtLeast( int64_t limit ) { if (val >= limit) return Void(); Promise p; - waiting.push( std::make_pair(limit,p) ); + waiting.emplace(limit, p); return p.getFuture(); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 89bef9dcc3..c02d3c4a4b 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -110,6 +110,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/AtomicOpsApiCorrectness.toml) add_fdb_test(TEST_FILES fast/BackupCorrectness.toml) add_fdb_test(TEST_FILES fast/BackupCorrectnessClean.toml) + add_fdb_test(TEST_FILES fast/BackupToBlob.toml) add_fdb_test(TEST_FILES fast/BackupToDBCorrectness.toml) add_fdb_test(TEST_FILES fast/BackupToDBCorrectnessClean.toml) add_fdb_test(TEST_FILES fast/CacheTest.toml) diff --git a/tests/fast/BackupToBlob.toml b/tests/fast/BackupToBlob.toml new file mode 100644 index 0000000000..8ce8e6667a --- /dev/null +++ b/tests/fast/BackupToBlob.toml @@ -0,0 +1,40 @@ +[[test]] +testTitle = 'Cycle' +clearAfterTest = 'false' +simBackupAgents = 'BackupToFile' + + [[test.workload]] + testName = 'Cycle' + nodeCount = 3000 + testDuration = 10.0 + expectedRate = 0 + +[[test]] +testTitle = 'Backup' + + [[test.workload]] + testName = 'BackupToBlob' + backupAfter = 0.0 + backupTag = 'default' + backupURL = 'http://0.0.0.0:10000' + +[[test]] +testTitle = 'Restore' +clearAfterTest = 'false' + + [[test.workload]] + testName = 'RestoreFromBlob' + restoreAfter = 0.0 + backupTag = 'default' + backupURL = 'http://0.0.0.0:10000' + + +[[test]] +testTitle = 'CycleCheck' +checkOnly = 'true' + + [[test.workload]] + testName = 'Cycle' + nodeCount = 3000 + testDuration = 10.0 + expectedRate = 0