/*
 * BackupContainerFileSystem.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainerAzureBlobStore.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbclient/JsonBuilder.h"
#include "flow/UnitTest.h"

#include <algorithm>
#include <cinttypes>

#include "flow/actorcompiler.h" // This must be the last #include.

class BackupContainerFileSystemImpl {
public:
    // TODO: Do this more efficiently, as the range file list for a snapshot could potentially be hundreds of
    // megabytes.
    ACTOR static Future<std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>>> readKeyspaceSnapshot(
        Reference<BackupContainerFileSystem> bc, KeyspaceSnapshotFile snapshot) {
        // Read the range file list for the specified version range, and then index them by fileName.
        // This is so we can verify that each of the files listed in the manifest file is also in the container at
        // this time.
        std::vector<RangeFile> files = wait(bc->listRangeFiles(snapshot.beginVersion, snapshot.endVersion));
        state std::map<std::string, RangeFile> rangeIndex;
        for (auto& f : files)
            rangeIndex[f.fileName] = std::move(f);

        // Read the snapshot file, verify the version range, then find each of the range files by name in the index
        // and return them.
        state Reference<IAsyncFile> f = wait(bc->readFile(snapshot.fileName));
        int64_t size = wait(f->size());
        state Standalone<StringRef> buf = makeString(size);
        wait(success(f->read(mutateString(buf), buf.size(), 0)));
        json_spirit::mValue json;
        json_spirit::read_string(buf.toString(), json);
        JSONDoc doc(json);

        Version v;
        if (!doc.tryGet("beginVersion", v) || v != snapshot.beginVersion)
            throw restore_corrupted_data();
        if (!doc.tryGet("endVersion", v) || v != snapshot.endVersion)
            throw restore_corrupted_data();

        json_spirit::mValue& filesArray = doc.create("files");
        if (filesArray.type() != json_spirit::array_type)
            throw restore_corrupted_data();

        std::vector<RangeFile> results;
        int missing = 0;

        for (auto const& fileValue : filesArray.get_array()) {
            if (fileValue.type() != json_spirit::str_type)
                throw restore_corrupted_data();

            // If the file is not in the index then log the error but don't throw yet, keep checking the whole list.
            auto i = rangeIndex.find(fileValue.get_str());
            if (i == rangeIndex.end()) {
                TraceEvent(SevError, "FileRestoreMissingRangeFile")
                    .detail("URL", bc->getURL())
                    .detail("File", fileValue.get_str());

                ++missing;
            }

            // No point in using more memory once data is missing since an error will be thrown instead.
if (missing == 0) { results.push_back(i->second); } } if (missing > 0) { TraceEvent(SevError, "FileRestoreMissingRangeFileSummary") .detail("URL", bc->getURL()) .detail("Count", missing); throw restore_missing_data(); } // Check key ranges for files std::map fileKeyRanges; JSONDoc ranges = doc.subDoc("keyRanges"); // Create an empty doc if not existed for (auto i : ranges.obj()) { const std::string& filename = i.first; JSONDoc fields(i.second); std::string begin, end; if (fields.tryGet("beginKey", begin) && fields.tryGet("endKey", end)) { TraceEvent("ManifestFields") .detail("File", filename) .detail("Begin", printable(StringRef(begin))) .detail("End", printable(StringRef(end))); fileKeyRanges.emplace(filename, KeyRange(KeyRangeRef(StringRef(begin), StringRef(end)))); } else { TraceEvent("MalFormattedManifest").detail("Key", filename); throw restore_corrupted_data(); } } return std::make_pair(results, fileKeyRanges); } // Backup log types static constexpr Version NON_PARTITIONED_MUTATION_LOG = 0; static constexpr Version PARTITIONED_MUTATION_LOG = 1; // Find what should be the filename of a path by finding whatever is after the last forward or backward slash, or // failing to find those, the whole string. static std::string fileNameOnly(const std::string& path) { // Find the last forward slash position, defaulting to 0 if not found int pos = path.find_last_of('/'); if (pos == std::string::npos) { pos = 0; } // Find the last backward slash position after pos, and update pos if found int b = path.find_last_of('\\', pos); if (b != std::string::npos) { pos = b; } return path.substr(pos + 1); } static bool pathToRangeFile(RangeFile& out, const std::string& path, int64_t size) { std::string name = fileNameOnly(path); RangeFile f; f.fileName = path; f.fileSize = size; int len; if (sscanf(name.c_str(), "range,%" SCNd64 ",%*[^,],%u%n", &f.version, &f.blockSize, &len) == 2 && len == name.size()) { out = f; return true; } return false; } ACTOR static Future writeKeyspaceSnapshotFile(Reference bc, std::vector fileNames, std::vector> beginEndKeys, int64_t totalBytes) { ASSERT(!fileNames.empty() && fileNames.size() == beginEndKeys.size()); state Version minVer = std::numeric_limits::max(); state Version maxVer = 0; state RangeFile rf; state json_spirit::mArray fileArray; state int i; // Validate each filename, update version range for (i = 0; i < fileNames.size(); ++i) { auto const& f = fileNames[i]; if (pathToRangeFile(rf, f, 0)) { fileArray.push_back(f); if (rf.version < minVer) minVer = rf.version; if (rf.version > maxVer) maxVer = rf.version; } else throw restore_unknown_file_type(); wait(yield()); } state json_spirit::mValue json; state JSONDoc doc(json); doc.create("files") = std::move(fileArray); doc.create("totalBytes") = totalBytes; doc.create("beginVersion") = minVer; doc.create("endVersion") = maxVer; auto ranges = doc.subDoc("keyRanges"); for (int i = 0; i < beginEndKeys.size(); i++) { auto fileDoc = ranges.subDoc(fileNames[i], /*split=*/false); fileDoc.create("beginKey") = beginEndKeys[i].first.toString(); fileDoc.create("endKey") = beginEndKeys[i].second.toString(); } wait(yield()); state std::string docString = json_spirit::write_string(json); state Reference f = wait(bc->writeFile(format("snapshots/snapshot,%lld,%lld,%lld", minVer, maxVer, totalBytes))); wait(f->append(docString.data(), docString.size())); wait(f->finish()); return Void(); } ACTOR static Future dumpFileList(Reference bc, Version begin, Version end) { state Future> fRanges = bc->listRangeFiles(begin, end); state Future> 
        fSnapshots = bc->listKeyspaceSnapshots(begin, end);
        state std::vector<LogFile> logs;
        state std::vector<LogFile> pLogs;

        wait(success(fRanges) && success(fSnapshots) && store(logs, bc->listLogFiles(begin, end, false)) &&
             store(pLogs, bc->listLogFiles(begin, end, true)));
        logs.insert(logs.end(), std::make_move_iterator(pLogs.begin()), std::make_move_iterator(pLogs.end()));

        return BackupFileList({ fRanges.get(), std::move(logs), fSnapshots.get() });
    }

    static Version resolveRelativeVersion(Optional<Version> max, Version v, const char* name, Error e) {
        if (v == invalidVersion) {
            TraceEvent(SevError, "BackupExpireInvalidVersion").detail(name, v);
            throw e;
        }
        if (v < 0) {
            if (!max.present()) {
                TraceEvent(SevError, "BackupExpireCannotResolveRelativeVersion").detail(name, v);
                throw e;
            }
            v += max.get();
        }
        return v;
    }

    // For a list of log files specified by their indices (of the same tag),
    // returns whether they are continuous in the range [begin, end]. If "tags" is not
    // nullptr, then it will be populated with [begin, end] -> tags, where the next
    // pair's begin <= the previous pair's end + 1. On return, the last pair's end
    // version (inclusive) gives the continuous range from begin.
    static bool isContinuous(const std::vector<LogFile>& files,
                             const std::vector<int>& indices,
                             Version begin,
                             Version end,
                             std::map<std::pair<Version, Version>, int>* tags) {
        Version lastBegin = invalidVersion;
        Version lastEnd = invalidVersion;
        int lastTags = -1;

        ASSERT(tags == nullptr || tags->empty());
        for (int idx : indices) {
            const LogFile& file = files[idx];
            if (lastEnd == invalidVersion) {
                if (file.beginVersion > begin)
                    return false;
                if (file.endVersion > begin) {
                    lastBegin = begin;
                    lastTags = file.totalTags;
                } else {
                    continue;
                }
            } else if (lastEnd < file.beginVersion) {
                if (tags != nullptr) {
                    tags->emplace(std::make_pair(lastBegin, lastEnd - 1), lastTags);
                }
                return false;
            }

            if (lastTags != file.totalTags) {
                if (tags != nullptr) {
                    tags->emplace(std::make_pair(lastBegin, file.beginVersion - 1), lastTags);
                }
                lastBegin = file.beginVersion;
                lastTags = file.totalTags;
            }
            lastEnd = file.endVersion;
            if (lastEnd > end)
                break;
        }
        if (tags != nullptr && lastBegin != invalidVersion) {
            tags->emplace(std::make_pair(lastBegin, std::min(end, lastEnd - 1)), lastTags);
        }
        return lastBegin != invalidVersion && lastEnd > end;
    }
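    // Illustrative walk-through of isContinuous(), with made-up version ranges: if the indexed
    // files for one tag cover [0, 100), [100, 250), and [300, 400), then a call with
    // begin = 0, end = 399 chains the first two files together, hits the gap at version 250,
    // records { [0, 249] -> totalTags } in *tags, and returns false since version 250 is missing.
    // The last recorded end version (249) is the continuous limit reachable from begin.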
    // Returns the end version such that [begin, end] is continuous.
    // "logs" should already be sorted.
    static Version getPartitionedLogsContinuousEndVersion(const std::vector<LogFile>& logs, Version begin) {
        Version end = 0;

        std::map<int, std::vector<int>> tagIndices; // tagId -> indices in files
        for (int i = 0; i < logs.size(); i++) {
            ASSERT(logs[i].tagId >= 0);
            ASSERT(logs[i].tagId < logs[i].totalTags);
            auto& indices = tagIndices[logs[i].tagId];
            // filter out if indices.back() is a subset of files[i] or vice versa
            if (!indices.empty()) {
                if (logs[indices.back()].isSubset(logs[i])) {
                    ASSERT(logs[indices.back()].fileSize <= logs[i].fileSize);
                    indices.back() = i;
                } else if (!logs[i].isSubset(logs[indices.back()])) {
                    indices.push_back(i);
                }
            } else {
                indices.push_back(i);
            }
            end = std::max(end, logs[i].endVersion - 1);
        }
        TraceEvent("ContinuousLogEnd").detail("Begin", begin).detail("InitVersion", end);

        // check partition 0 is continuous in [begin, end] and create a map of ranges to partitions
        std::map<std::pair<Version, Version>, int> tags; // range [start, end] -> partitions
        isContinuous(logs, tagIndices[0], begin, end, &tags);
        if (tags.empty() || end <= begin)
            return 0;
        end = std::min(end, tags.rbegin()->first.second);
        TraceEvent("ContinuousLogEnd").detail("Partition", 0).detail("EndVersion", end).detail("Begin", begin);

        // for each range in tags, check all partitions from 1 are continuous
        Version lastEnd = begin;
        for (const auto& [beginEnd, count] : tags) {
            Version tagEnd = beginEnd.second; // This range's minimum continuous partition version
            for (int i = 1; i < count; i++) {
                std::map<std::pair<Version, Version>, int> rangeTags;
                isContinuous(logs, tagIndices[i], beginEnd.first, beginEnd.second, &rangeTags);
                tagEnd = rangeTags.empty() ? 0 : std::min(tagEnd, rangeTags.rbegin()->first.second);
                TraceEvent("ContinuousLogEnd")
                    .detail("Partition", i)
                    .detail("EndVersion", tagEnd)
                    .detail("RangeBegin", beginEnd.first)
                    .detail("RangeEnd", beginEnd.second);
                if (tagEnd == 0)
                    return lastEnd == begin ? 0 : lastEnd;
            }
            if (tagEnd < beginEnd.second) {
                return tagEnd;
            }
            lastEnd = beginEnd.second;
        }

        return end;
    }

    // Analyze partitioned logs and set contiguousLogEnd for "desc" if larger
    // than the "scanBegin" version.
    static void updatePartitionedLogsContinuousEnd(BackupDescription* desc,
                                                   const std::vector<LogFile>& logs,
                                                   const Version scanBegin,
                                                   const Version scanEnd) {
        if (logs.empty())
            return;

        Version snapshotBeginVersion = desc->snapshots.size() > 0 ? desc->snapshots[0].beginVersion : invalidVersion;
        Version begin = std::max(scanBegin, desc->minLogBegin.get());
        TraceEvent("ContinuousLogEnd")
            .detail("ScanBegin", scanBegin)
            .detail("ScanEnd", scanEnd)
            .detail("Begin", begin)
            .detail("ContiguousLogEnd", desc->contiguousLogEnd.get());
        for (const auto& file : logs) {
            if (file.beginVersion > begin) {
                if (scanBegin > 0)
                    return;

                // scanBegin is 0
                desc->minLogBegin = file.beginVersion;
                begin = file.beginVersion;
            }

            Version ver = getPartitionedLogsContinuousEndVersion(logs, begin);
            if (ver >= desc->contiguousLogEnd.get()) {
                // contiguousLogEnd is not inclusive, so +1 here.
                desc->contiguousLogEnd.get() = ver + 1;
                TraceEvent("UpdateContinuousLogEnd").detail("Version", ver + 1);
                if (ver > snapshotBeginVersion)
                    return;
            }
        }
    }

    // Computes the continuous end version for non-partitioned mutation logs up to
    // the "targetVersion". If "outLogs" is not nullptr, it will be updated with
    // continuous log files. "*end" is updated with the continuous end version.
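    // Illustrative example, with made-up versions and assuming the caller seeds *end with the
    // first file's end version (as getRestoreSetFromLogs() below does): for sorted logs covering
    // [100, 200), [200, 300), [400, 500) and *end == 200, the scan links [200, 300) because its
    // begin version equals *end, advances *end to 300, then stops at [400, 500) since 400 > 300,
    // leaving *end at the continuous limit of 300.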
static void computeRestoreEndVersion(const std::vector& logs, std::vector* outLogs, Version* end, Version targetVersion) { auto i = logs.begin(); if (outLogs != nullptr) outLogs->push_back(*i); // Add logs to restorable logs set until continuity is broken OR we reach targetVersion while (++i != logs.end()) { if (i->beginVersion > *end || i->beginVersion > targetVersion) break; // If the next link in the log chain is found, update the end if (i->beginVersion == *end) { if (outLogs != nullptr) outLogs->push_back(*i); *end = i->endVersion; } } } ACTOR static Future describeBackup(Reference bc, bool deepScan, Version logStartVersionOverride) { state BackupDescription desc; desc.url = bc->getURL(); TraceEvent("BackupContainerDescribe1") .detail("URL", bc->getURL()) .detail("LogStartVersionOverride", logStartVersionOverride); bool e = wait(bc->exists()); if (!e) { TraceEvent(SevWarnAlways, "BackupContainerDoesNotExist").detail("URL", bc->getURL()); throw backup_does_not_exist(); } // If logStartVersion is relative, then first do a recursive call without it to find the max log version // from which to resolve the relative version. // This could be handled more efficiently without recursion but it's tricky, this will do for now. if (logStartVersionOverride != invalidVersion && logStartVersionOverride < 0) { BackupDescription tmp = wait(bc->describeBackup(false, invalidVersion)); logStartVersionOverride = resolveRelativeVersion(tmp.maxLogEnd, logStartVersionOverride, "LogStartVersionOverride", invalid_option_value()); } // Get metadata versions state Optional metaLogBegin; state Optional metaLogEnd; state Optional metaExpiredEnd; state Optional metaUnreliableEnd; state Optional metaLogType; std::vector> metaReads; metaReads.push_back(store(metaExpiredEnd, bc->expiredEndVersion().get())); metaReads.push_back(store(metaUnreliableEnd, bc->unreliableEndVersion().get())); metaReads.push_back(store(metaLogType, bc->logType().get())); // Only read log begin/end versions if not doing a deep scan, otherwise scan files and recalculate them. 
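        // (A deep scan deliberately leaves metaLogBegin/metaLogEnd unset so that the file listing
        // below recomputes minLogBegin and contiguousLogEnd from scratch, which is slower but does
        // not trust possibly stale metadata.)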
        if (!deepScan) {
            metaReads.push_back(store(metaLogBegin, bc->logBeginVersion().get()));
            metaReads.push_back(store(metaLogEnd, bc->logEndVersion().get()));
        }
        wait(waitForAll(metaReads));

        TraceEvent("BackupContainerDescribe2")
            .detail("URL", bc->getURL())
            .detail("LogStartVersionOverride", logStartVersionOverride)
            .detail("ExpiredEndVersion", metaExpiredEnd.orDefault(invalidVersion))
            .detail("UnreliableEndVersion", metaUnreliableEnd.orDefault(invalidVersion))
            .detail("LogBeginVersion", metaLogBegin.orDefault(invalidVersion))
            .detail("LogEndVersion", metaLogEnd.orDefault(invalidVersion))
            .detail("LogType", metaLogType.orDefault(-1));

        // If the logStartVersionOverride is positive (not relative) then ensure that unreliableEndVersion is equal
        // or greater
        if (logStartVersionOverride != invalidVersion &&
            metaUnreliableEnd.orDefault(invalidVersion) < logStartVersionOverride) {
            metaUnreliableEnd = logStartVersionOverride;
        }

        // Don't use metaLogBegin or metaLogEnd if any of the following are true; the safest
        // thing to do is rescan to verify log continuity and get exact begin/end versions
        //   - either is missing
        //   - metaLogEnd <= metaLogBegin       (invalid range)
        //   - metaLogEnd < metaExpiredEnd      (log continuity exists in missing data range)
        //   - metaLogEnd < metaUnreliableEnd   (log continuity exists in incomplete data range)
        if (!metaLogBegin.present() || !metaLogEnd.present() || metaLogEnd.get() <= metaLogBegin.get() ||
            metaLogEnd.get() < metaExpiredEnd.orDefault(invalidVersion) ||
            metaLogEnd.get() < metaUnreliableEnd.orDefault(invalidVersion)) {
            TraceEvent(SevWarnAlways, "BackupContainerMetadataInvalid")
                .detail("URL", bc->getURL())
                .detail("ExpiredEndVersion", metaExpiredEnd.orDefault(invalidVersion))
                .detail("UnreliableEndVersion", metaUnreliableEnd.orDefault(invalidVersion))
                .detail("LogBeginVersion", metaLogBegin.orDefault(invalidVersion))
                .detail("LogEndVersion", metaLogEnd.orDefault(invalidVersion));
            metaLogBegin = Optional<Version>();
            metaLogEnd = Optional<Version>();
        }

        // If the unreliable end version is not set or is < expiredEndVersion then increase it to expiredEndVersion.
        // Describe does not update unreliableEnd in the backup metadata for safety reasons as there is no
        // compare-and-set operation to atomically change it and an expire process could be advancing it
        // simultaneously.
        if (!metaUnreliableEnd.present() || metaUnreliableEnd.get() < metaExpiredEnd.orDefault(0))
            metaUnreliableEnd = metaExpiredEnd;

        desc.unreliableEndVersion = metaUnreliableEnd;
        desc.expiredEndVersion = metaExpiredEnd;

        // Start scanning at the end of the unreliable version range, which is the version before which data is
        // likely missing because an expire process has operated on that range.
        state Version scanBegin = desc.unreliableEndVersion.orDefault(0);
        state Version scanEnd = std::numeric_limits<Version>::max();

        // Use the known log range if present
        // Logs are assumed to be contiguous between metaLogBegin and metaLogEnd, so initialize desc accordingly
        if (metaLogBegin.present() && metaLogEnd.present()) {
            // minLogBegin is the greater of the log begin metadata OR the unreliable end version since we can't
            // count on log file presence before that version.
desc.minLogBegin = std::max(metaLogBegin.get(), desc.unreliableEndVersion.orDefault(0)); // Set the maximum known end version of a log file, so far, which is also the assumed contiguous log file // end version desc.maxLogEnd = metaLogEnd.get(); desc.contiguousLogEnd = desc.maxLogEnd; // Advance scanBegin to the contiguous log end version scanBegin = desc.contiguousLogEnd.get(); } state std::vector logs; state std::vector plogs; TraceEvent("BackupContainerListFiles").detail("URL", bc->getURL()); wait(store(logs, bc->listLogFiles(scanBegin, scanEnd, false)) && store(plogs, bc->listLogFiles(scanBegin, scanEnd, true)) && store(desc.snapshots, bc->listKeyspaceSnapshots())); TraceEvent("BackupContainerListFiles") .detail("URL", bc->getURL()) .detail("LogFiles", logs.size()) .detail("PLogsFiles", plogs.size()) .detail("Snapshots", desc.snapshots.size()); if (plogs.size() > 0) { desc.partitioned = true; logs.swap(plogs); } else { desc.partitioned = metaLogType.present() && metaLogType.get() == BackupContainerFileSystemImpl::PARTITIONED_MUTATION_LOG; } // List logs in version order so log continuity can be analyzed std::sort(logs.begin(), logs.end()); // Find out contiguous log end version if (!logs.empty()) { desc.maxLogEnd = logs.rbegin()->endVersion; // If we didn't get log versions above then seed them using the first log file if (!desc.contiguousLogEnd.present()) { desc.minLogBegin = logs.begin()->beginVersion; if (desc.partitioned) { // Cannot use the first file's end version, which may not be contiguous // for other partitions. Set to its beginVersion to be safe. desc.contiguousLogEnd = logs.begin()->beginVersion; } else { desc.contiguousLogEnd = logs.begin()->endVersion; } } if (desc.partitioned) { updatePartitionedLogsContinuousEnd(&desc, logs, scanBegin, scanEnd); } else { Version& end = desc.contiguousLogEnd.get(); computeRestoreEndVersion(logs, nullptr, &end, std::numeric_limits::max()); } } // Only update stored contiguous log begin and end versions if we did NOT use a log start override. // Otherwise, a series of describe operations can result in a version range which is actually missing data. if (logStartVersionOverride == invalidVersion) { // If the log metadata begin/end versions are missing (or treated as missing due to invalidity) or // differ from the newly calculated values for minLogBegin and contiguousLogEnd, respectively, // then attempt to update the metadata in the backup container but ignore errors in case the // container is not writeable. try { state Future updates = Void(); if (desc.minLogBegin.present() && metaLogBegin != desc.minLogBegin) { updates = updates && bc->logBeginVersion().set(desc.minLogBegin.get()); } if (desc.contiguousLogEnd.present() && metaLogEnd != desc.contiguousLogEnd) { updates = updates && bc->logEndVersion().set(desc.contiguousLogEnd.get()); } if (!metaLogType.present()) { updates = updates && bc->logType().set(desc.partitioned ? BackupContainerFileSystemImpl::PARTITIONED_MUTATION_LOG : BackupContainerFileSystemImpl::NON_PARTITIONED_MUTATION_LOG); } wait(updates); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) throw; TraceEvent(SevWarn, "BackupContainerMetadataUpdateFailure").error(e).detail("URL", bc->getURL()); } } for (auto& s : desc.snapshots) { // Calculate restorability of each snapshot. 
Assume true, then try to prove false s.restorable = true; // If this is not a single-version snapshot then see if the available contiguous logs cover its range if (s.beginVersion != s.endVersion) { if (!desc.minLogBegin.present() || desc.minLogBegin.get() > s.beginVersion) s.restorable = false; if (!desc.contiguousLogEnd.present() || desc.contiguousLogEnd.get() <= s.endVersion) s.restorable = false; } desc.snapshotBytes += s.totalSize; // If the snapshot is at a single version then it requires no logs. Update min and max restorable. // TODO: Somehow check / report if the restorable range is not or may not be contiguous. if (s.beginVersion == s.endVersion) { if (!desc.minRestorableVersion.present() || s.endVersion < desc.minRestorableVersion.get()) desc.minRestorableVersion = s.endVersion; if (!desc.maxRestorableVersion.present() || s.endVersion > desc.maxRestorableVersion.get()) desc.maxRestorableVersion = s.endVersion; } // If the snapshot is covered by the contiguous log chain then update min/max restorable. if (desc.minLogBegin.present() && s.beginVersion >= desc.minLogBegin.get() && s.endVersion < desc.contiguousLogEnd.get()) { if (!desc.minRestorableVersion.present() || s.endVersion < desc.minRestorableVersion.get()) desc.minRestorableVersion = s.endVersion; if (!desc.maxRestorableVersion.present() || (desc.contiguousLogEnd.get() - 1) > desc.maxRestorableVersion.get()) desc.maxRestorableVersion = desc.contiguousLogEnd.get() - 1; } } return desc; } ACTOR static Future expireData(Reference bc, Version expireEndVersion, bool force, IBackupContainer::ExpireProgress* progress, Version restorableBeginVersion) { if (progress != nullptr) { progress->step = "Describing backup"; progress->total = 0; } TraceEvent("BackupContainerFileSystemExpire1") .detail("URL", bc->getURL()) .detail("ExpireEndVersion", expireEndVersion) .detail("RestorableBeginVersion", restorableBeginVersion); // Get the backup description. state BackupDescription desc = wait(bc->describeBackup(false, expireEndVersion)); // Resolve relative versions using max log version expireEndVersion = resolveRelativeVersion(desc.maxLogEnd, expireEndVersion, "ExpireEndVersion", invalid_option_value()); restorableBeginVersion = resolveRelativeVersion(desc.maxLogEnd, restorableBeginVersion, "RestorableBeginVersion", invalid_option_value()); // It would be impossible to have restorability to any version < expireEndVersion after expiring to that version if (restorableBeginVersion < expireEndVersion) throw backup_cannot_expire(); // If the expire request is to a version at or before the previous version to which data was already deleted // then do nothing and just return if (expireEndVersion <= desc.expiredEndVersion.orDefault(invalidVersion)) { return Void(); } // Assume force is needed, then try to prove otherwise. // Force is required if there is not a restorable snapshot which both // - begins at or after expireEndVersion // - ends at or before restorableBeginVersion state bool forceNeeded = true; for (KeyspaceSnapshotFile& s : desc.snapshots) { if (s.restorable.orDefault(false) && s.beginVersion >= expireEndVersion && s.endVersion <= restorableBeginVersion) { forceNeeded = false; break; } } // If force is needed but not passed then refuse to expire anything. 
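        // Illustrative example with made-up versions: if snapshots are restorable over [500, 600]
        // and [900, 950], then expiring to expireEndVersion = 700 with restorableBeginVersion = 1000
        // does not need force because [900, 950] begins at or after 700 and ends at or before 1000.
        // Expiring to 960 would need force since no restorable snapshot lies entirely in [960, 1000].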
// Note that it is possible for there to be no actual files in the backup prior to expireEndVersion, // if they were externally deleted or an expire operation deleted them but was terminated before // updating expireEndVersion if (forceNeeded && !force) throw backup_cannot_expire(); // Start scan for files to delete at the last completed expire operation's end or 0. state Version scanBegin = desc.expiredEndVersion.orDefault(0); TraceEvent("BackupContainerFileSystemExpire2") .detail("URL", bc->getURL()) .detail("ExpireEndVersion", expireEndVersion) .detail("RestorableBeginVersion", restorableBeginVersion) .detail("ScanBeginVersion", scanBegin); state std::vector logs; state std::vector pLogs; // partitioned mutation logs state std::vector ranges; if (progress != nullptr) { progress->step = "Listing files"; } // Get log files or range files that contain any data at or before expireEndVersion wait(store(logs, bc->listLogFiles(scanBegin, expireEndVersion - 1, false)) && store(pLogs, bc->listLogFiles(scanBegin, expireEndVersion - 1, true)) && store(ranges, bc->listRangeFiles(scanBegin, expireEndVersion - 1))); logs.insert(logs.end(), std::make_move_iterator(pLogs.begin()), std::make_move_iterator(pLogs.end())); // The new logBeginVersion will be taken from the last log file, if there is one state Optional newLogBeginVersion; if (!logs.empty()) { // Linear scan the unsorted logs to find the latest one in sorted order LogFile& last = *std::max_element(logs.begin(), logs.end()); // If the last log ends at expireEndVersion then that will be the next log begin if (last.endVersion == expireEndVersion) { newLogBeginVersion = expireEndVersion; } else { // If the last log overlaps the expiredEnd then use the log's begin version and move the expiredEnd // back to match it and keep the last log file if (last.endVersion > expireEndVersion) { newLogBeginVersion = last.beginVersion; // Instead of modifying this potentially very large vector, just clear LogFile last = LogFile(); expireEndVersion = newLogBeginVersion.get(); } } } // Make a list of files to delete state std::vector toDelete; // Move filenames out of vector then destroy it to save memory for (auto const& f : logs) { // We may have cleared the last log file earlier so skip any empty filenames if (!f.fileName.empty()) { toDelete.push_back(std::move(f.fileName)); } } logs.clear(); // Move filenames out of vector then destroy it to save memory for (auto const& f : ranges) { // The file version must be checked here again because it is likely that expireEndVersion is in the middle // of a log file, in which case after the log and range file listings are done (using the original // expireEndVersion) the expireEndVersion will be moved back slightly to the begin version of the last log // file found (which is also the first log to not be deleted) if (f.version < expireEndVersion) { toDelete.push_back(std::move(f.fileName)); } } ranges.clear(); for (auto const& f : desc.snapshots) { if (f.endVersion < expireEndVersion) toDelete.push_back(std::move(f.fileName)); } desc = BackupDescription(); // We are about to start deleting files, at which point all data prior to expireEndVersion is considered // 'unreliable' as some or all of it will be missing. 
        // So before deleting anything, read unreliableEndVersion (don't use the cached value in desc) and update
        // its value if it is missing or < expireEndVersion
        if (progress != nullptr) {
            progress->step = "Initial metadata update";
        }
        Optional<Version> metaUnreliableEnd = wait(bc->unreliableEndVersion().get());
        if (metaUnreliableEnd.orDefault(0) < expireEndVersion) {
            wait(bc->unreliableEndVersion().set(expireEndVersion));
        }

        if (progress != nullptr) {
            progress->step = "Deleting files";
            progress->total = toDelete.size();
            progress->done = 0;
        }

        // Delete files, but limit parallelism because the file list could use a lot of memory and the corresponding
        // delete actor states would use even more if they all existed at the same time.
        state std::list<Future<Void>> deleteFutures;
        while (!toDelete.empty() || !deleteFutures.empty()) {

            // While there are files to delete and budget in the deleteFutures list, start a delete
            while (!toDelete.empty() && deleteFutures.size() < CLIENT_KNOBS->BACKUP_CONCURRENT_DELETES) {
                deleteFutures.push_back(bc->deleteFile(toDelete.back()));
                toDelete.pop_back();
            }

            // Wait for deletes to finish until there are only targetFuturesSize remaining.
            // If there are no files left to start then this value is 0, otherwise it is one less
            // than the delete concurrency limit.
            state int targetFuturesSize = toDelete.empty() ? 0 : (CLIENT_KNOBS->BACKUP_CONCURRENT_DELETES - 1);

            while (deleteFutures.size() > targetFuturesSize) {
                wait(deleteFutures.front());
                if (progress != nullptr) {
                    ++progress->done;
                }
                deleteFutures.pop_front();
            }
        }

        if (progress != nullptr) {
            progress->step = "Final metadata update";
            progress->total = 0;
        }

        // Update the expiredEndVersion metadata to indicate that everything prior to that version has been
        // successfully deleted if the current version is lower or missing
        Optional<Version> metaExpiredEnd = wait(bc->expiredEndVersion().get());
        if (metaExpiredEnd.orDefault(0) < expireEndVersion) {
            wait(bc->expiredEndVersion().set(expireEndVersion));
        }

        return Void();
    }

    // Returns true if logs are continuous in the range [begin, end].
    // "files" should be pre-sorted according to version order.
    static bool isPartitionedLogsContinuous(const std::vector<LogFile>& files, Version begin, Version end) {
        std::map<int, std::vector<int>> tagIndices; // tagId -> indices in files
        for (int i = 0; i < files.size(); i++) {
            ASSERT(files[i].tagId >= 0 && files[i].tagId < files[i].totalTags);
            auto& indices = tagIndices[files[i].tagId];
            indices.push_back(i);
        }

        // check partition 0 is continuous and create a map of ranges to tags
        std::map<std::pair<Version, Version>, int> tags; // range [begin, end] -> tags
        if (!isContinuous(files, tagIndices[0], begin, end, &tags)) {
            TraceEvent(SevWarn, "BackupFileNotContinuous")
                .detail("Partition", 0)
                .detail("RangeBegin", begin)
                .detail("RangeEnd", end);
            return false;
        }

        // for each range in tags, check all tags from 1 are continuous
        for (const auto& [beginEnd, count] : tags) {
            for (int i = 1; i < count; i++) {
                if (!isContinuous(files, tagIndices[i], beginEnd.first, std::min(beginEnd.second - 1, end), nullptr)) {
                    TraceEvent(SevWarn, "BackupFileNotContinuous")
                        .detail("Partition", i)
                        .detail("RangeBegin", beginEnd.first)
                        .detail("RangeEnd", beginEnd.second);
                    return false;
                }
            }
        }
        return true;
    }

    // Returns log files that are not duplicated, or a subset of another log.
    // If a log file's progress is not saved, a new log file will be generated
    // with the same begin version. So we can have a file that contains a subset
    // of the contents of another log file.
    // PRE-CONDITION: logs are already sorted by (tagId, beginVersion, endVersion).
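    // Illustrative example with made-up versions: for one tag, files covering [100, 200) and
    // [100, 250) can both exist if progress was lost after the first was written. Since
    // [100, 200) is a subset of [100, 250), only the larger [100, 250) file is kept.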
static std::vector filterDuplicates(const std::vector& logs) { std::vector filtered; int i = 0; for (int j = 1; j < logs.size(); j++) { if (logs[j].isSubset(logs[i])) { ASSERT(logs[j].fileSize <= logs[i].fileSize); continue; } if (!logs[i].isSubset(logs[j])) { filtered.push_back(logs[i]); } i = j; } if (i < logs.size()) filtered.push_back(logs[i]); return filtered; } static Optional getRestoreSetFromLogs(const std::vector& logs, Version targetVersion, RestorableFileSet restorable) { Version end = logs.begin()->endVersion; computeRestoreEndVersion(logs, &restorable.logs, &end, targetVersion); if (end >= targetVersion) { restorable.continuousBeginVersion = logs.begin()->beginVersion; restorable.continuousEndVersion = end; return Optional(restorable); } return Optional(); } ACTOR static Future> getRestoreSet(Reference bc, Version targetVersion, VectorRef keyRangesFilter, bool logsOnly = false, Version beginVersion = invalidVersion) { // Does not support use keyRangesFilter for logsOnly yet if (logsOnly && !keyRangesFilter.empty()) { TraceEvent(SevError, "BackupContainerRestoreSetUnsupportedAPI") .detail("KeyRangesFilter", keyRangesFilter.size()); return Optional(); } if (logsOnly) { state RestorableFileSet restorableSet; state std::vector logFiles; Version begin = beginVersion == invalidVersion ? 0 : beginVersion; wait(store(logFiles, bc->listLogFiles(begin, targetVersion, false))); // List logs in version order so log continuity can be analyzed std::sort(logFiles.begin(), logFiles.end()); if (!logFiles.empty()) { return getRestoreSetFromLogs(logFiles, targetVersion, restorableSet); } } // Find the most recent keyrange snapshot through which we can restore filtered key ranges into targetVersion. state std::vector snapshots = wait(bc->listKeyspaceSnapshots()); state int i = snapshots.size() - 1; for (; i >= 0; i--) { // The smallest version of filtered range files >= snapshot beginVersion > targetVersion if (targetVersion >= 0 && snapshots[i].beginVersion > targetVersion) { continue; } state RestorableFileSet restorable; state Version minKeyRangeVersion = MAX_VERSION; state Version maxKeyRangeVersion = -1; std::pair, std::map> results = wait(bc->readKeyspaceSnapshot(snapshots[i])); // Old backup does not have metadata about key ranges and can not be filtered with key ranges. if (keyRangesFilter.size() && results.second.empty() && !results.first.empty()) { throw backup_not_filterable_with_key_ranges(); } // Filter by keyRangesFilter. if (keyRangesFilter.empty()) { restorable.ranges = std::move(results.first); restorable.keyRanges = std::move(results.second); minKeyRangeVersion = snapshots[i].beginVersion; maxKeyRangeVersion = snapshots[i].endVersion; } else { for (const auto& rangeFile : results.first) { const auto& keyRange = results.second.at(rangeFile.fileName); if (keyRange.intersects(keyRangesFilter)) { restorable.ranges.push_back(rangeFile); restorable.keyRanges[rangeFile.fileName] = keyRange; minKeyRangeVersion = std::min(minKeyRangeVersion, rangeFile.version); maxKeyRangeVersion = std::max(maxKeyRangeVersion, rangeFile.version); } } // No range file matches 'keyRangesFilter'. if (restorable.ranges.empty()) { throw backup_not_overlapped_with_keys_filter(); } } // 'latestVersion' represents using the minimum restorable version in a snapshot. restorable.targetVersion = targetVersion == latestVersion ? maxKeyRangeVersion : targetVersion; // Any version < maxKeyRangeVersion is not restorable. 
            if (restorable.targetVersion < maxKeyRangeVersion)
                continue;

            restorable.snapshot = snapshots[i];

            // TODO: Reenable the sanity check after TooManyFiles error is resolved
            if (false && g_network->isSimulated()) {
                // Sanity check key ranges
                state std::map<std::string, KeyRange>::iterator rit;
                for (rit = restorable.keyRanges.begin(); rit != restorable.keyRanges.end(); rit++) {
                    auto it = std::find_if(restorable.ranges.begin(),
                                           restorable.ranges.end(),
                                           [file = rit->first](const RangeFile f) { return f.fileName == file; });
                    ASSERT(it != restorable.ranges.end());
                    KeyRange result = wait(bc->getSnapshotFileKeyRange(*it));
                    ASSERT(rit->second.begin <= result.begin && rit->second.end >= result.end);
                }
            }

            // No logs needed if there is a complete filtered key space snapshot at the target version.
            if (minKeyRangeVersion == maxKeyRangeVersion && maxKeyRangeVersion == restorable.targetVersion) {
                restorable.continuousBeginVersion = restorable.continuousEndVersion = invalidVersion;
                TraceEvent("BackupContainerGetRestorableFilesWithoutLogs")
                    .detail("KeyRangeVersion", restorable.targetVersion)
                    .detail("NumberOfRangeFiles", restorable.ranges.size())
                    .detail("KeyRangesFilter", printable(keyRangesFilter));
                return Optional<RestorableFileSet>(restorable);
            }

            // FIXME: check if there are tagged logs. for each tag, there is no version gap.
            state std::vector<LogFile> logs;
            state std::vector<LogFile> plogs;
            wait(store(logs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, false)) &&
                 store(plogs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, true)));

            if (plogs.size() > 0) {
                logs.swap(plogs);
                // sort by tag ID so that filterDuplicates works.
                std::sort(logs.begin(), logs.end(), [](const LogFile& a, const LogFile& b) {
                    return std::tie(a.tagId, a.beginVersion, a.endVersion) <
                           std::tie(b.tagId, b.beginVersion, b.endVersion);
                });

                // Remove duplicated log files that can happen for old epochs.
                std::vector<LogFile> filtered = filterDuplicates(logs);
                restorable.logs.swap(filtered);
                // sort by version order again for continuous analysis
                std::sort(restorable.logs.begin(), restorable.logs.end());

                if (isPartitionedLogsContinuous(restorable.logs, minKeyRangeVersion, restorable.targetVersion)) {
                    restorable.continuousBeginVersion = minKeyRangeVersion;
                    restorable.continuousEndVersion = restorable.targetVersion + 1; // not inclusive
                    return Optional<RestorableFileSet>(restorable);
                }
                return Optional<RestorableFileSet>();
            }

            // List logs in version order so log continuity can be analyzed
            std::sort(logs.begin(), logs.end());

            // If there are logs and the first one starts at or before the keyrange's snapshot begin version, then
            // it is a valid restore set and we can proceed
            if (!logs.empty() && logs.front().beginVersion <= minKeyRangeVersion) {
                return getRestoreSetFromLogs(logs, targetVersion, restorable);
            }
        }
        return Optional<RestorableFileSet>();
    }

    static std::string versionFolderString(Version v, int smallestBucket) {
        ASSERT(smallestBucket < 14);
        // Get a 0-padded fixed size representation of v
        std::string vFixedPrecision = format("%019lld", v);
        ASSERT(vFixedPrecision.size() == 19);
        // Truncate smallestBucket from the fixed length representation
        vFixedPrecision.resize(vFixedPrecision.size() - smallestBucket);

        // Split the remaining digits with a '/' 4 places from the right
        vFixedPrecision.insert(vFixedPrecision.size() - 4, 1, '/');

        return vFixedPrecision;
    }
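    // Illustrative example with a made-up version: for v = 86400000000, the zero-padded form is
    // "0000000086400000000". With smallestBucket = 8 (the old range folder layout) truncation
    // leaves "00000000864" and the divider yields "0000000/0864", i.e. "ranges/0000000/0864/".
    // With smallestBucket = 11 (log folders) the result is "0000/0000", i.e. "logs/0000/0000/".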
    // This is useful for comparing version folder strings regardless of where their "/" dividers are, as it is
    // possible that division points would change in the future.
    static std::string cleanFolderString(std::string f) {
        f.erase(std::remove(f.begin(), f.end(), '/'), f.end());
        return f;
    }

    // The innermost folder covers 100 seconds (1e8 versions). During a full speed backup it is possible, though
    // very unlikely, to write about 10,000 snapshot range files during that time.
    static std::string old_rangeVersionFolderString(Version v) {
        return format("ranges/%s/", versionFolderString(v, 8).c_str());
    }

    // Get the root folder for a snapshot's data based on its begin version
    static std::string snapshotFolderString(Version snapshotBeginVersion) {
        return format("kvranges/snapshot.%018" PRId64, snapshotBeginVersion);
    }

    // Extract the snapshot begin version from a path
    static Version extractSnapshotBeginVersion(const std::string& path) {
        Version snapshotBeginVersion;
        if (sscanf(path.c_str(), "kvranges/snapshot.%018" SCNd64, &snapshotBeginVersion) == 1) {
            return snapshotBeginVersion;
        }
        return invalidVersion;
    }

    // The innermost folder covers 100,000 seconds (1e11 versions) which is 5,000 mutation log files at current
    // settings.
    static std::string logVersionFolderString(Version v, bool partitioned) {
        return format("%s/%s/", (partitioned ? "plogs" : "logs"), versionFolderString(v, 11).c_str());
    }

    static bool pathToLogFile(LogFile& out, const std::string& path, int64_t size) {
        std::string name = fileNameOnly(path);
        LogFile f;
        f.fileName = path;
        f.fileSize = size;
        int len;
        if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u%n", &f.beginVersion, &f.endVersion,
                   &f.blockSize, &len) == 3 &&
            len == name.size()) {
            out = f;
            return true;
        } else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%d-of-%d,%u%n", &f.beginVersion,
                          &f.endVersion, &f.tagId, &f.totalTags, &f.blockSize, &len) == 5 &&
                   len == name.size() && f.tagId >= 0) {
            out = f;
            return true;
        }
        return false;
    }

    static bool pathToKeyspaceSnapshotFile(KeyspaceSnapshotFile& out, const std::string& path) {
        std::string name = fileNameOnly(path);
        KeyspaceSnapshotFile f;
        f.fileName = path;
        int len;
        if (sscanf(name.c_str(), "snapshot,%" SCNd64 ",%" SCNd64 ",%" SCNd64 "%n", &f.beginVersion, &f.endVersion,
                   &f.totalSize, &len) == 3 &&
            len == name.size()) {
            out = f;
            return true;
        }
        return false;
    }
}; // class BackupContainerFileSystemImpl

Future<Reference<IBackupFile>> BackupContainerFileSystem::writeLogFile(Version beginVersion,
                                                                       Version endVersion,
                                                                       int blockSize) {
    return writeFile(BackupContainerFileSystemImpl::logVersionFolderString(beginVersion, false) +
                     format("log,%lld,%lld,%s,%d",
                            beginVersion,
                            endVersion,
                            deterministicRandom()->randomUniqueID().toString().c_str(),
                            blockSize));
}

Future<Reference<IBackupFile>> BackupContainerFileSystem::writeTaggedLogFile(Version beginVersion,
                                                                             Version endVersion,
                                                                             int blockSize,
                                                                             uint16_t tagId,
                                                                             int totalTags) {
    return writeFile(BackupContainerFileSystemImpl::logVersionFolderString(beginVersion, true) +
                     format("log,%lld,%lld,%s,%d-of-%d,%d",
                            beginVersion,
                            endVersion,
                            deterministicRandom()->randomUniqueID().toString().c_str(),
                            tagId,
                            totalTags,
                            blockSize));
}

Future<Reference<IBackupFile>> BackupContainerFileSystem::writeRangeFile(Version snapshotBeginVersion,
                                                                         int snapshotFileCount,
                                                                         Version fileVersion,
                                                                         int blockSize) {
    std::string fileName = format(
        "range,%" PRId64 ",%s,%d", fileVersion, deterministicRandom()->randomUniqueID().toString().c_str(), blockSize);

    // In order to test backward compatibility in simulation, sometimes write to the old path format
    if (g_network->isSimulated() && deterministicRandom()->coinflip()) {
        return writeFile(BackupContainerFileSystemImpl::old_rangeVersionFolderString(fileVersion) + fileName);
    }

    return
writeFile(BackupContainerFileSystemImpl::snapshotFolderString(snapshotBeginVersion) + format("/%d/", snapshotFileCount / (BUGGIFY ? 1 : 5000)) + fileName); } Future, std::map>> BackupContainerFileSystem::readKeyspaceSnapshot(KeyspaceSnapshotFile snapshot) { return BackupContainerFileSystemImpl::readKeyspaceSnapshot(Reference::addRef(this), snapshot); } Future BackupContainerFileSystem::writeKeyspaceSnapshotFile(const std::vector& fileNames, const std::vector>& beginEndKeys, int64_t totalBytes) { return BackupContainerFileSystemImpl::writeKeyspaceSnapshotFile(Reference::addRef(this), fileNames, beginEndKeys, totalBytes); }; Future> BackupContainerFileSystem::listLogFiles(Version beginVersion, Version targetVersion, bool partitioned) { // The first relevant log file could have a begin version less than beginVersion based on the knobs which // determine log file range size, so start at an earlier version adjusted by how many versions a file could // contain. // // Get the cleaned (without slashes) first and last folders that could contain relevant results. std::string firstPath = BackupContainerFileSystemImpl::cleanFolderString(BackupContainerFileSystemImpl::logVersionFolderString( std::max(0, beginVersion - CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE), partitioned)); std::string lastPath = BackupContainerFileSystemImpl::cleanFolderString( BackupContainerFileSystemImpl::logVersionFolderString(targetVersion, partitioned)); std::function pathFilter = [=](const std::string& folderPath) { // Remove slashes in the given folder path so that the '/' positions in the version folder string do not // matter std::string cleaned = BackupContainerFileSystemImpl::cleanFolderString(folderPath); return StringRef(firstPath).startsWith(cleaned) || StringRef(lastPath).startsWith(cleaned) || (cleaned > firstPath && cleaned < lastPath); }; return map(listFiles((partitioned ? "plogs/" : "logs/"), pathFilter), [=](const FilesAndSizesT& files) { std::vector results; LogFile lf; for (auto& f : files) { if (BackupContainerFileSystemImpl::pathToLogFile(lf, f.first, f.second) && lf.endVersion > beginVersion && lf.beginVersion <= targetVersion) results.push_back(lf); } return results; }); } Future> BackupContainerFileSystem::old_listRangeFiles(Version beginVersion, Version endVersion) { // Get the cleaned (without slashes) first and last folders that could contain relevant results. 
std::string firstPath = BackupContainerFileSystemImpl::cleanFolderString( BackupContainerFileSystemImpl::old_rangeVersionFolderString(beginVersion)); std::string lastPath = BackupContainerFileSystemImpl::cleanFolderString( BackupContainerFileSystemImpl::old_rangeVersionFolderString(endVersion)); std::function pathFilter = [=](const std::string& folderPath) { // Remove slashes in the given folder path so that the '/' positions in the version folder string do not // matter std::string cleaned = BackupContainerFileSystemImpl::cleanFolderString(folderPath); return StringRef(firstPath).startsWith(cleaned) || StringRef(lastPath).startsWith(cleaned) || (cleaned > firstPath && cleaned < lastPath); }; return map(listFiles("ranges/", pathFilter), [=](const FilesAndSizesT& files) { std::vector results; RangeFile rf; for (auto& f : files) { if (BackupContainerFileSystemImpl::pathToRangeFile(rf, f.first, f.second) && rf.version >= beginVersion && rf.version <= endVersion) results.push_back(rf); } return results; }); } Future> BackupContainerFileSystem::listRangeFiles(Version beginVersion, Version endVersion) { // Until the old folder scheme is no longer supported, read files stored using old folder scheme Future> oldFiles = old_listRangeFiles(beginVersion, endVersion); // Define filter function (for listFiles() implementations that use it) to reject any folder // starting after endVersion std::function pathFilter = [=](std::string const& path) { return BackupContainerFileSystemImpl::extractSnapshotBeginVersion(path) <= endVersion; }; Future> newFiles = map(listFiles("kvranges/", pathFilter), [=](const FilesAndSizesT& files) { std::vector results; RangeFile rf; for (auto& f : files) { if (BackupContainerFileSystemImpl::pathToRangeFile(rf, f.first, f.second) && rf.version >= beginVersion && rf.version <= endVersion) results.push_back(rf); } return results; }); return map(success(oldFiles) && success(newFiles), [=](Void _) { std::vector results = std::move(newFiles.get()); std::vector oldResults = std::move(oldFiles.get()); results.insert(results.end(), std::make_move_iterator(oldResults.begin()), std::make_move_iterator(oldResults.end())); return results; }); } Future> BackupContainerFileSystem::listKeyspaceSnapshots(Version begin, Version end) { return map(listFiles("snapshots/"), [=](const FilesAndSizesT& files) { std::vector results; KeyspaceSnapshotFile sf; for (auto& f : files) { if (BackupContainerFileSystemImpl::pathToKeyspaceSnapshotFile(sf, f.first) && sf.beginVersion < end && sf.endVersion >= begin) results.push_back(sf); } std::sort(results.begin(), results.end()); return results; }); } Future BackupContainerFileSystem::dumpFileList(Version begin, Version end) { return BackupContainerFileSystemImpl::dumpFileList(Reference::addRef(this), begin, end); } Future BackupContainerFileSystem::describeBackup(bool deepScan, Version logStartVersionOverride) { return BackupContainerFileSystemImpl::describeBackup(Reference::addRef(this), deepScan, logStartVersionOverride); } Future BackupContainerFileSystem::expireData(Version expireEndVersion, bool force, ExpireProgress* progress, Version restorableBeginVersion) { return BackupContainerFileSystemImpl::expireData(Reference::addRef(this), expireEndVersion, force, progress, restorableBeginVersion); } ACTOR static Future getSnapshotFileKeyRange_impl(Reference bc, RangeFile file) { state int readFileRetries = 0; state bool beginKeySet = false; state Key beginKey; state Key endKey; loop { try { state Reference inFile = wait(bc->readFile(file.fileName)); 
beginKeySet = false; state int64_t j = 0; for (; j < file.fileSize; j += file.blockSize) { int64_t len = std::min(file.blockSize, file.fileSize - j); Standalone> blockData = wait(fileBackup::decodeRangeFileBlock(inFile, j, len)); if (!beginKeySet) { beginKey = blockData.front().key; beginKeySet = true; } endKey = blockData.back().key; } break; } catch (Error& e) { if (e.code() == error_code_restore_bad_read || e.code() == error_code_restore_unsupported_file_version || e.code() == error_code_restore_corrupted_data_padding) { // no retriable error TraceEvent(SevError, "BackupContainerGetSnapshotFileKeyRange").error(e); throw; } else if (e.code() == error_code_http_request_failed || e.code() == error_code_connection_failed || e.code() == error_code_timed_out || e.code() == error_code_lookup_failed) { // blob http request failure, retry TraceEvent(SevWarnAlways, "BackupContainerGetSnapshotFileKeyRangeConnectionFailure") .detail("Retries", ++readFileRetries) .error(e); wait(delayJittered(0.1)); } else { TraceEvent(SevError, "BackupContainerGetSnapshotFileKeyRangeUnexpectedError").error(e); throw; } } } return KeyRange(KeyRangeRef(beginKey, endKey)); } ACTOR static Future writeVersionProperty(Reference bc, std::string path, Version v) { try { state Reference f = wait(bc->writeFile(path)); std::string s = format("%lld", v); wait(f->append(s.data(), s.size())); wait(f->finish()); return Void(); } catch (Error& e) { TraceEvent(SevWarn, "BackupContainerWritePropertyFailed") .error(e) .detail("URL", bc->getURL()) .detail("Path", path); throw; } } ACTOR static Future> readVersionProperty(Reference bc, std::string path) { try { state Reference f = wait(bc->readFile(path)); state int64_t size = wait(f->size()); state std::string s; s.resize(size); int rs = wait(f->read((uint8_t*)s.data(), size, 0)); Version v; int len; if (rs == size && sscanf(s.c_str(), "%" SCNd64 "%n", &v, &len) == 1 && len == size) return v; TraceEvent(SevWarn, "BackupContainerInvalidProperty").detail("URL", bc->getURL()).detail("Path", path); throw backup_invalid_info(); } catch (Error& e) { if (e.code() == error_code_file_not_found) return Optional(); TraceEvent(SevWarn, "BackupContainerReadPropertyFailed") .error(e) .detail("URL", bc->getURL()) .detail("Path", path); throw; } } Future BackupContainerFileSystem::getSnapshotFileKeyRange(const RangeFile& file) { ASSERT(g_network->isSimulated()); return getSnapshotFileKeyRange_impl(Reference::addRef(this), file); } Future> BackupContainerFileSystem::getRestoreSet(Version targetVersion, VectorRef keyRangesFilter, bool logsOnly, Version beginVersion) { return BackupContainerFileSystemImpl::getRestoreSet(Reference::addRef(this), targetVersion, keyRangesFilter, logsOnly, beginVersion); } Future> BackupContainerFileSystem::VersionProperty::get() { return readVersionProperty(bc, path); } Future BackupContainerFileSystem::VersionProperty::set(Version v) { return writeVersionProperty(bc, path, v); } Future BackupContainerFileSystem::VersionProperty::clear() { return bc->deleteFile(path); } BackupContainerFileSystem::VersionProperty BackupContainerFileSystem::logBeginVersion() { return { Reference::addRef(this), "log_begin_version" }; } BackupContainerFileSystem::VersionProperty BackupContainerFileSystem::logEndVersion() { return { Reference::addRef(this), "log_end_version" }; } BackupContainerFileSystem::VersionProperty BackupContainerFileSystem::expiredEndVersion() { return { Reference::addRef(this), "expired_end_version" }; } BackupContainerFileSystem::VersionProperty 
BackupContainerFileSystem::unreliableEndVersion() { return { Reference::addRef(this), "unreliable_end_version" }; } BackupContainerFileSystem::VersionProperty BackupContainerFileSystem::logType() { return { Reference::addRef(this), "mutation_log_type" }; } namespace backup_test { int chooseFileSize(std::vector& sizes) { int size = 1000; if (!sizes.empty()) { size = sizes.back(); sizes.pop_back(); } return size; } ACTOR Future writeAndVerifyFile(Reference c, Reference f, int size) { state Standalone content; if (size > 0) { content = makeString(size); for (int i = 0; i < content.size(); ++i) mutateString(content)[i] = (uint8_t)deterministicRandom()->randomInt(0, 256); wait(f->append(content.begin(), content.size())); } wait(f->finish()); state Reference inputFile = wait(c->readFile(f->getFileName())); int64_t fileSize = wait(inputFile->size()); ASSERT(size == fileSize); if (size > 0) { state Standalone buf = makeString(size); int b = wait(inputFile->read(mutateString(buf), buf.size(), 0)); ASSERT(b == buf.size()); ASSERT(buf == content); } return Void(); } // Randomly advance version by up to 1 second of versions Version nextVersion(Version v) { int64_t increment = deterministicRandom()->randomInt64(1, CLIENT_KNOBS->CORE_VERSIONSPERSECOND); return v + increment; } // Write a snapshot file with only begin & end key ACTOR static Future testWriteSnapshotFile(Reference file, Key begin, Key end, uint32_t blockSize) { ASSERT(blockSize > 3 * sizeof(uint32_t) + begin.size() + end.size()); uint32_t fileVersion = BACKUP_AGENT_SNAPSHOT_FILE_VERSION; // write Header wait(file->append((uint8_t*)&fileVersion, sizeof(fileVersion))); // write begin key length and key wait(file->appendStringRefWithLen(begin)); // write end key length and key wait(file->appendStringRefWithLen(end)); int bytesLeft = blockSize - file->size(); if (bytesLeft > 0) { Value paddings = fileBackup::makePadding(bytesLeft); wait(file->append(paddings.begin(), bytesLeft)); } wait(file->finish()); return Void(); } ACTOR static Future testBackupContainer(std::string url) { printf("BackupContainerTest URL %s\n", url.c_str()); state Reference c = IBackupContainer::openContainer(url); // Make sure container doesn't exist, then create it. 
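    // (Deleting a container that was never created can fail with backup_invalid_url or
    // backup_does_not_exist; the try/catch below treats those two errors as a clean initial state.)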
try { wait(c->deleteContainer()); } catch (Error& e) { if (e.code() != error_code_backup_invalid_url && e.code() != error_code_backup_does_not_exist) throw; } wait(c->create()); state std::vector> writes; state std::map> snapshots; state std::map snapshotSizes; state std::map>> snapshotBeginEndKeys; state int nRangeFiles = 0; state std::map logs; state Version v = deterministicRandom()->randomInt64(0, std::numeric_limits::max() / 2); // List of sizes to use to test edge cases on underlying file implementations state std::vector fileSizes = { 0, 10000000, 5000005 }; loop { state Version logStart = v; state int kvfiles = deterministicRandom()->randomInt(0, 3); state Key begin = LiteralStringRef(""); state Key end = LiteralStringRef(""); state int blockSize = 3 * sizeof(uint32_t) + begin.size() + end.size() + 8; while (kvfiles > 0) { if (snapshots.empty()) { snapshots[v] = {}; snapshotBeginEndKeys[v] = {}; snapshotSizes[v] = 0; if (deterministicRandom()->coinflip()) { v = nextVersion(v); } } Reference range = wait(c->writeRangeFile(snapshots.rbegin()->first, 0, v, blockSize)); ++nRangeFiles; v = nextVersion(v); snapshots.rbegin()->second.push_back(range->getFileName()); snapshotBeginEndKeys.rbegin()->second.emplace_back(begin, end); int size = chooseFileSize(fileSizes); snapshotSizes.rbegin()->second += size; // Write in actual range file format, instead of random data. // writes.push_back(writeAndVerifyFile(c, range, size)); wait(testWriteSnapshotFile(range, begin, end, blockSize)); if (deterministicRandom()->random01() < .2) { writes.push_back(c->writeKeyspaceSnapshotFile( snapshots.rbegin()->second, snapshotBeginEndKeys.rbegin()->second, snapshotSizes.rbegin()->second)); snapshots[v] = {}; snapshotBeginEndKeys[v] = {}; snapshotSizes[v] = 0; break; } --kvfiles; } if (logStart == v || deterministicRandom()->coinflip()) { v = nextVersion(v); } state Reference log = wait(c->writeLogFile(logStart, v, 10)); logs[logStart] = log->getFileName(); int size = chooseFileSize(fileSizes); writes.push_back(writeAndVerifyFile(c, log, size)); // Randomly stop after a snapshot has finished and all manually seeded file sizes have been used. 
if (fileSizes.empty() && !snapshots.empty() && snapshots.rbegin()->second.empty() && deterministicRandom()->random01() < .2) { snapshots.erase(snapshots.rbegin()->first); break; } } wait(waitForAll(writes)); state BackupFileList listing = wait(c->dumpFileList()); ASSERT(listing.ranges.size() == nRangeFiles); ASSERT(listing.logs.size() == logs.size()); ASSERT(listing.snapshots.size() == snapshots.size()); state BackupDescription desc = wait(c->describeBackup()); printf("\n%s\n", desc.toString().c_str()); // Do a series of expirations and verify resulting state state int i = 0; for (; i < listing.snapshots.size(); ++i) { { // Ensure we can still restore to the latest version Optional rest = wait(c->getRestoreSet(desc.maxRestorableVersion.get())); ASSERT(rest.present()); } { // Ensure we can restore to the end version of snapshot i Optional rest = wait(c->getRestoreSet(listing.snapshots[i].endVersion)); ASSERT(rest.present()); } // Test expiring to the end of this snapshot state Version expireVersion = listing.snapshots[i].endVersion; // Expire everything up to but not including the snapshot end version printf("EXPIRE TO %" PRId64 "\n", expireVersion); state Future f = c->expireData(expireVersion); wait(ready(f)); // If there is an error, it must be backup_cannot_expire and we have to be on the last snapshot if (f.isError()) { ASSERT(f.getError().code() == error_code_backup_cannot_expire); ASSERT(i == listing.snapshots.size() - 1); wait(c->expireData(expireVersion, true)); } BackupDescription d = wait(c->describeBackup()); printf("\n%s\n", d.toString().c_str()); } printf("DELETING\n"); wait(c->deleteContainer()); state Future d = c->describeBackup(); wait(ready(d)); ASSERT(d.isError() && d.getError().code() == error_code_backup_does_not_exist); BackupFileList empty = wait(c->dumpFileList()); ASSERT(empty.ranges.size() == 0); ASSERT(empty.logs.size() == 0); ASSERT(empty.snapshots.size() == 0); printf("BackupContainerTest URL=%s PASSED.\n", url.c_str()); return Void(); } TEST_CASE("/backup/containers/localdir") { if (g_network->isSimulated()) wait(testBackupContainer(format("file://simfdb/backups/%llx", timer_int()))); else wait(testBackupContainer(format("file:///private/tmp/fdb_backups/%llx", timer_int()))); return Void(); }; TEST_CASE("/backup/containers/url") { if (!g_network->isSimulated()) { const char* url = getenv("FDB_TEST_BACKUP_URL"); ASSERT(url != nullptr); wait(testBackupContainer(url)); } return Void(); }; TEST_CASE("/backup/containers_list") { if (!g_network->isSimulated()) { state const char* url = getenv("FDB_TEST_BACKUP_URL"); ASSERT(url != nullptr); printf("Listing %s\n", url); std::vector urls = wait(IBackupContainer::listContainers(url)); for (auto& u : urls) { printf("%s\n", u.c_str()); } } return Void(); }; TEST_CASE("/backup/time") { // test formatTime() for (int i = 0; i < 1000; ++i) { int64_t ts = deterministicRandom()->randomInt64(0, std::numeric_limits::max()); ASSERT(BackupAgentBase::parseTime(BackupAgentBase::formatTime(ts)) == ts); } ASSERT(BackupAgentBase::parseTime("2019/03/18.17:51:11-0600") == BackupAgentBase::parseTime("2019/03/18.16:51:11-0700")); ASSERT(BackupAgentBase::parseTime("2019/03/31.22:45:07-0700") == BackupAgentBase::parseTime("2019/04/01.03:45:07-0200")); ASSERT(BackupAgentBase::parseTime("2019/03/31.22:45:07+0000") == BackupAgentBase::parseTime("2019/04/01.03:45:07+0500")); ASSERT(BackupAgentBase::parseTime("2019/03/31.22:45:07+0030") == BackupAgentBase::parseTime("2019/04/01.03:45:07+0530")); 
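    // The assertions above and below pair timestamps denoting the same instant: for example,
    // 2019/03/31.22:45:07+0030 is 22:15:07 UTC on March 31, and 2019/04/01.04:00:07+0545 is
    // also 22:15:07 UTC on March 31, so the two must parse to equal values.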
ASSERT(BackupAgentBase::parseTime("2019/03/31.22:45:07+0030") == BackupAgentBase::parseTime("2019/04/01.04:00:07+0545")); return Void(); } TEST_CASE("/backup/continuous") { std::vector files; // [0, 100) 2 tags files.push_back({ 0, 100, 10, "file1", 100, 0, 2 }); // Tag 0: 0-100 ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 0, 99)); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 0) == 0); files.push_back({ 0, 100, 10, "file2", 200, 1, 2 }); // Tag 1: 0-100 std::sort(files.begin(), files.end()); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 0, 99)); ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 0, 100)); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 0) == 99); // [100, 300) 3 tags files.push_back({ 100, 200, 10, "file3", 200, 0, 3 }); // Tag 0: 100-200 files.push_back({ 100, 250, 10, "file4", 200, 1, 3 }); // Tag 1: 100-250 std::sort(files.begin(), files.end()); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 0, 99)); ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 0, 100)); ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 50, 150)); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 0) == 99); files.push_back({ 100, 300, 10, "file5", 200, 2, 3 }); // Tag 2: 100-300 std::sort(files.begin(), files.end()); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 50, 150)); ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 50, 200)); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 10, 199)); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 0) == 199); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 100) == 199); files.push_back({ 250, 300, 10, "file6", 200, 0, 3 }); // Tag 0: 250-300, missing 200-250 std::sort(files.begin(), files.end()); ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 50, 240)); ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 100, 280)); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 99) == 199); files.push_back({ 250, 300, 10, "file7", 200, 1, 3 }); // Tag 1: 250-300 std::sort(files.begin(), files.end()); ASSERT(!BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 100, 280)); files.push_back({ 200, 250, 10, "file8", 200, 0, 3 }); // Tag 0: 200-250 std::sort(files.begin(), files.end()); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 0, 299)); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 100, 280)); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 150) == 299); // [300, 400) 1 tag // files.push_back({200, 250, 10, "file9", 200, 0, 3}); // Tag 0: 200-250, duplicate file files.push_back({ 300, 400, 10, "file10", 200, 0, 1 }); // Tag 1: 300-400 std::sort(files.begin(), files.end()); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 0, 399)); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 100, 399)); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 150, 399)); ASSERT(BackupContainerFileSystemImpl::isPartitionedLogsContinuous(files, 250, 399)); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 
0) == 399); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 99) == 399); ASSERT(BackupContainerFileSystemImpl::getPartitionedLogsContinuousEndVersion(files, 250) == 399); return Void(); } } // namespace backup_test