/*
 * FileConverter.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbbackup/FileConverter.h"

#include <algorithm>
#include <iostream>
#include <cinttypes>
#include <cstdio>
#include <vector>

#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/MutationList.h"
#include "flow/flow.h"
#include "flow/serialize.h"
#include "flow/actorcompiler.h" // has to be last include

namespace file_converter {

// Prints the command line help text to stdout.
void printConvertUsage() {
    std::cout << "\n"
              << " -r, --container Container URL.\n"
              << " -b, --begin BEGIN\n"
              << " Begin version.\n"
              << " -e, --end END End version.\n"
              << " --log Enables trace file logging for the CLI session.\n"
              << " --logdir PATH Specifes the output directory for trace files. If\n"
              << " unspecified, defaults to the current directory. Has\n"
              << " no effect unless --log is specified.\n"
              << " --loggroup LOG_GROUP\n"
              << " Sets the LogGroup field with the specified value for all\n"
              << " events in the trace output (defaults to `default').\n"
              << " --trace_format FORMAT\n"
              << " Select the format of the trace files. xml (the default) and json are supported.\n"
              << " Has no effect unless --log is specified.\n"
              << " -h, --help Display this help and exit.\n"
              << "\n";
}

// Prints `msg`, the number of files, and then one line per file to stdout.
void printLogFiles(std::string msg, const std::vector<LogFile>& files) {
    std::cout << msg << " " << files.size() << " log files\n";
    for (const auto& file : files) {
        std::cout << file.toString() << "\n";
    }
    std::cout << std::endl;
}

// Returns the non-empty, tagged (tagId >= 0) log files whose version range
// overlaps [begin, end], sorted, with duplicated files dropped.
std::vector<LogFile> getRelevantLogFiles(const std::vector<LogFile>& files, Version begin, Version end) {
    std::vector<LogFile> filtered;
    for (const auto& file : files) {
        if (file.beginVersion <= end && file.endVersion >= begin && file.tagId >= 0 && file.fileSize > 0) {
            filtered.push_back(file);
        }
    }
    std::sort(filtered.begin(), filtered.end());

    // Remove duplicates. This is because backup workers may store the log for
    // old epochs successfully, but do not update the progress before another
    // recovery happened. As a result, next epoch will retry and creates
    // duplicated log files.
    // A file is dropped when it is a subset of its immediate successor in the
    // sorted order; the last file is always kept.
    std::vector<LogFile> sorted;
    for (size_t i = 0; i < filtered.size(); i++) {
        const bool isLast = (i + 1 == filtered.size());
        if (isLast || !filtered[i].isSubset(filtered[i + 1])) {
            sorted.push_back(filtered[i]);
        }
    }
    return sorted;
}

// Command line parameters of the converter.
struct ConvertParams {
    std::string container_url;
    Version begin = invalidVersion;
    Version end = invalidVersion;
    bool log_enabled = false;
    std::string log_dir, trace_format, trace_log_group;

    // True when the container URL and both versions have been provided.
    bool isValid() const { return begin != invalidVersion && end != invalidVersion && !container_url.empty(); }

    // Human readable summary; trace settings are only included when logging is enabled.
    std::string toString() const {
        std::string s;
        s.append("ContainerURL:");
        s.append(container_url);
        s.append(" Begin:");
        s.append(format("%" PRId64, begin));
        s.append(" End:");
        s.append(format("%" PRId64, end));
        if (log_enabled) {
            if (!log_dir.empty()) {
                s.append(" LogDir:").append(log_dir);
            }
            if (!trace_format.empty()) {
                s.append(" Format:").append(trace_format);
            }
            if (!trace_log_group.empty()) {
                s.append(" LogGroup:").append(trace_log_group);
            }
        }
        return s;
    }
};

// A serialized mutation together with its version and the arena that owns its bytes.
struct VersionedData {
    LogMessageVersion version;
    StringRef message; // Serialized mutation.
    Arena arena; // The arena that contains mutation.

    VersionedData() : version(invalidVersion, -1) {}
    VersionedData(LogMessageVersion v, StringRef m, Arena a) : version(v), message(m), arena(a) {}
};

// Tracks the read position of a set of log files and hands out their mutations
// one at a time, ordered by the first buffered mutation of each file.
struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProgress> {
    MutationFilesReadProgress(std::vector<LogFile>& logs, Version begin, Version end)
      : files(logs), beginVersion(begin), endVersion(end) {}

    // Read state of a single log file.
    struct FileProgress : public ReferenceCounted<FileProgress> {
        FileProgress(Reference<IAsyncFile> f, int index) : fd(f), idx(index), offset(0), eof(false) {}

        // Orders by the version of the first buffered mutation; a file without
        // buffered mutations sorts after every file that has some.
        // The left-hand side is tested first so that two files without
        // mutations compare as equivalent: this operator is the std::sort
        // comparator in sortAndRemoveEmpty() and therefore must never report
        // both a < b and b < a (nor a < a).
        bool operator<(const FileProgress& rhs) const {
            if (mutations.empty())
                return false;
            if (rhs.mutations.empty())
                return true;
            return mutations[0].version < rhs.mutations[0].version;
        }

        // Same ordering as operator<, but equivalent files compare true.
        bool operator<=(const FileProgress& rhs) const {
            if (rhs.mutations.empty())
                return true;
            if (mutations.empty())
                return false;
            return mutations[0].version <= rhs.mutations[0].version;
        }

        // True when nothing is buffered and no further data can be read.
        bool empty() const { return eof && mutations.empty(); }

        // Decodes the block into mutations and save them if >= minVersion and < maxVersion.
        // Returns true if new mutations has been saved.
        // Sets `eof` once a mutation >= maxVersion is seen. On success `offset`
        // is advanced by `len`. Errors are traced and rethrown.
        bool decodeBlock(const Standalone<StringRef>& buf, int len, Version minVersion, Version maxVersion) {
            StringRef block(buf.begin(), len);
            StringRefReader reader(block, restore_corrupted_data());

            int count = 0, inserted = 0;
            Version msgVersion = invalidVersion;
            try {
                // Read block header
                if (reader.consume<int32_t>() != PARTITIONED_MLOG_VERSION)
                    throw restore_unsupported_file_version();

                while (1) {
                    // If eof reached or first key len bytes is 0xFF then end of block was reached.
                    if (reader.eof() || *reader.rptr == 0xFF)
                        break;

                    // Deserialize messages written in saveMutationsToFile().
                    msgVersion = bigEndian64(reader.consume<Version>());
                    uint32_t sub = bigEndian32(reader.consume<uint32_t>());
                    int msgSize = bigEndian32(reader.consume<int>());
                    const uint8_t* message = reader.consume(msgSize);

                    ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));
                    MutationRef m;
                    rd >> m;
                    count++;
                    if (msgVersion >= maxVersion) {
                        TraceEvent("FileDecodeEnd")
                            .detail("MaxV", maxVersion)
                            .detail("Version", msgVersion)
                            .detail("File", fd->getFilename());
                        eof = true;
                        break; // skip
                    }
                    if (msgVersion >= minVersion) {
                        mutations.emplace_back(
                            LogMessageVersion(msgVersion, sub), StringRef(message, msgSize), buf.arena());
                        inserted++;
                    }
                }
                offset += len;
                TraceEvent("Decoded")
                    .detail("Name", fd->getFilename())
                    .detail("Count", count)
                    .detail("Insert", inserted)
                    .detail("BlockOffset", reader.rptr - buf.begin())
                    .detail("Total", mutations.size())
                    .detail("EOF", eof)
                    .detail("Version", msgVersion)
                    .detail("NewOffset", offset);
                return inserted > 0;
            } catch (Error& e) {
                TraceEvent(SevWarn, "CorruptLogFileBlock")
                    .error(e)
                    .detail("Filename", fd->getFilename())
                    .detail("BlockOffset", offset)
                    .detail("BlockLen", len)
                    .detail("ErrorRelativeOffset", reader.rptr - buf.begin())
                    .detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset);
                throw;
            }
        }

        Reference<IAsyncFile> fd;
        int idx; // index in the MutationFilesReadProgress::files vector
        int64_t offset; // offset of the file to be read
        bool eof; // If EOF is seen so far or endVersion is encountered. If true, the file can't be read further.
        std::vector<VersionedData> mutations; // Buffered mutations read so far
    };

    // True if any tracked file still has buffered mutations or unread data.
    bool hasMutations() const {
        for (const auto& fp : fileProgress) {
            if (!fp->empty())
                return true;
        }
        return false;
    }

    // Prints, for every tracked file, its name, buffered mutation count and buffered version range.
    void dumpProgress(std::string msg) {
        std::cout << msg << "\n ";
        for (const auto& fp : fileProgress) {
            std::cout << fp->fd->getFilename() << " " << fp->mutations.size() << " mutations";
            if (fp->mutations.size() > 0) {
                std::cout << ", range " << fp->mutations[0].version.toString() << " "
                          << fp->mutations.back().version.toString() << "\n";
            } else {
                std::cout << "\n\n";
            }
        }
    }

    // Sorts files according to their first mutation version and removes files without mutations.
    void sortAndRemoveEmpty() {
        std::sort(fileProgress.begin(),
                  fileProgress.end(),
                  [](const Reference<FileProgress>& a, const Reference<FileProgress>& b) { return (*a) < (*b); });
        while (!fileProgress.empty() && fileProgress.back()->empty()) {
            fileProgress.pop_back();
        }
    }

    // Requires hasMutations() return true before calling this function.
    // The caller must hold on to the arena associated with the mutation.
    Future<VersionedData> getNextMutation() { return getMutationImpl(this); }

    // Pops the first buffered mutation of the first file, refills that file's
    // buffer if it became empty, and restores the ordering of fileProgress.
    ACTOR static Future<VersionedData> getMutationImpl(MutationFilesReadProgress* self) {
        ASSERT(!self->fileProgress.empty() && !self->fileProgress[0]->mutations.empty());

        state Reference<FileProgress> fp = self->fileProgress[0];
        state VersionedData data = fp->mutations[0];
        fp->mutations.erase(fp->mutations.begin());
        if (fp->mutations.empty()) {
            // decode one more block
            wait(decodeToVersion(fp, /*version=*/0, self->endVersion, self->getLogFile(fp->idx)));
        }

        if (fp->empty()) {
            self->fileProgress.erase(self->fileProgress.begin());
        } else {
            // Keep fileProgress sorted
            for (int i = 1; i < self->fileProgress.size(); i++) {
                if (*self->fileProgress[i - 1] <= *self->fileProgress[i]) {
                    break;
                }
                std::swap(self->fileProgress[i - 1], self->fileProgress[i]);
            }
        }
        return data;
    }

    LogFile& getLogFile(int index) { return files[index]; }

    Future<Void> openLogFiles(Reference<IBackupContainer> container) { return openLogFilesImpl(this, container); }

    // Opens log files in the progress and starts decoding until the beginVersion is seen.
    ACTOR static Future<Void> openLogFilesImpl(MutationFilesReadProgress* progress,
                                               Reference<IBackupContainer> container) {
        state std::vector<Future<Reference<IAsyncFile>>> asyncFiles;
        for (const auto& file : progress->files) {
            asyncFiles.push_back(container->readFile(file.fileName));
        }
        wait(waitForAll(asyncFiles)); // open all files

        // Attempt decode the first few blocks of log files until beginVersion is consumed
        std::vector<Future<Void>> fileDecodes;
        for (int i = 0; i < asyncFiles.size(); i++) {
            auto fp = makeReference<FileProgress>(asyncFiles[i].get(), i);
            progress->fileProgress.push_back(fp);
            fileDecodes.push_back(
                decodeToVersion(fp, progress->beginVersion, progress->endVersion, progress->getLogFile(i)));
        }

        wait(waitForAll(fileDecodes));

        progress->sortAndRemoveEmpty();

        return Void();
    }

    // Decodes the file until EOF or an mutation >= minVersion and saves these mutations.
    // Skip mutations >= maxVersion.
    ACTOR static Future<Void> decodeToVersion(Reference<FileProgress> fp,
                                              Version minVersion,
                                              Version maxVersion,
                                              LogFile file) {
        if (fp->empty())
            return Void();

        if (!fp->mutations.empty() && fp->mutations.back().version.version >= minVersion)
            return Void();

        state int64_t len;
        try {
            // Read block by block until we see the minVersion
            loop {
                len = std::min<int64_t>(file.blockSize, file.fileSize - fp->offset);
                if (len == 0) {
                    fp->eof = true;
                    return Void();
                }

                state Standalone<StringRef> buf = makeString(len);
                int rLen = wait(fp->fd->read(mutateString(buf), len, fp->offset));
                if (len != rLen)
                    throw restore_bad_read();

                TraceEvent("ReadFile")
                    .detail("Name", fp->fd->getFilename())
                    .detail("Length", rLen)
                    .detail("Offset", fp->offset);
                if (fp->decodeBlock(buf, rLen, minVersion, maxVersion))
                    break;
            }
            return Void();
        } catch (Error& e) {
            TraceEvent(SevWarn, "CorruptedLogFileBlock")
                .error(e)
                .detail("Filename", fp->fd->getFilename())
                .detail("BlockOffset", fp->offset)
                .detail("BlockLen", len);
            throw;
        }
    }

    std::vector<LogFile> files;
    const Version beginVersion, endVersion;
    std::vector<Reference<FileProgress>> fileProgress;
};

// Writes a log file in the old backup format, described in backup-dataFormat.md.
// This is similar to the LogFileWriter in FileBackupAgent.actor.cpp.
struct LogFileWriter {
    LogFileWriter() : blockSize(-1) {}
    LogFileWriter(Reference<IBackupFile> f, int bsize) : file(f), blockSize(bsize) {}

    // Returns the block key, i.e., `Param1`, in the back file. The format is
    // `hash_value|commitVersion|part`.
    static Standalone<StringRef> getBlockKey(Version commitVersion, int part) {
        const int32_t version = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;

        BinaryWriter wr(Unversioned());
        wr << (uint8_t)hashlittle(&version, sizeof(version), 0);
        wr << bigEndian64(commitVersion);
        wr << bigEndian32(part);
        return wr.toValue();
    }

    // Start a new block if needed, then write the key and value
    ACTOR static Future<Void> writeKV_impl(LogFileWriter* self, Key k, Value v) {
        // If key and value do not fit in this block, end it and start a new one
        int toWrite = sizeof(int32_t) + k.size() + sizeof(int32_t) + v.size();
        if (self->file->size() + toWrite > self->blockEnd) {
            // Write padding if needed
            int bytesLeft = self->blockEnd - self->file->size();
            if (bytesLeft > 0) {
                state Value paddingFFs = fileBackup::makePadding(bytesLeft);
                wait(self->file->append(paddingFFs.begin(), bytesLeft));
            }

            // Set new blockEnd
            self->blockEnd += self->blockSize;

            // write Header
            wait(self->file->append((uint8_t*)&BACKUP_AGENT_MLOG_VERSION, sizeof(BACKUP_AGENT_MLOG_VERSION)));
        }

        wait(self->file->appendStringRefWithLen(k));
        wait(self->file->appendStringRefWithLen(v));

        // At this point we should be in whatever the current block is or the block size is too small
        if (self->file->size() > self->blockEnd)
            throw backup_bad_block_size();

        return Void();
    }

    Future<Void> writeKV(Key k, Value v) { return writeKV_impl(this, k, v); }

    // Serializes the mutations of one commit version and writes the result as
    // one key/value pair per chunk of at most MUTATION_BLOCK_SIZE bytes; the
    // key of each pair encodes the commit version and the chunk index.
    ACTOR static Future<Void> addMutation(LogFileWriter* self, Version commitVersion, MutationListRef mutations) {
        state Standalone<StringRef> value = BinaryWriter::toValue(mutations, IncludeVersion());

        state int part = 0;
        for (; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < value.size(); part++) {
            StringRef partBuf = value.substr(
                part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE,
                std::min(value.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, CLIENT_KNOBS->MUTATION_BLOCK_SIZE));
            Standalone<StringRef> key = getBlockKey(commitVersion, part);
            wait(writeKV_impl(self, key, partBuf));
        }
        return Void();
    }

private:
    Reference<IBackupFile> file;
    int blockSize;
    int64_t blockEnd = 0;
};

// Reads the relevant log files of the container in version order and rewrites
// their mutations into a single log file, emitting one batch per commit version.
ACTOR Future<Void> convert(ConvertParams params) {
    state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);

    state BackupFileList listing = wait(container->dumpFileList());
    std::sort(listing.logs.begin(), listing.logs.end());
    TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size());
    state BackupDescription desc = wait(container->describeBackup());
    std::cout << "\n" << desc.toString() << "\n";

    // std::cout << "Using Protocol Version: 0x" << std::hex << currentProtocolVersion.version() << std::dec << "\n";

    std::vector<LogFile> logs = getRelevantLogFiles(listing.logs, params.begin, params.end);
    printLogFiles("Range has", logs);

    state Reference<MutationFilesReadProgress> progress(
        new MutationFilesReadProgress(logs, params.begin, params.end));

    wait(progress->openLogFiles(container));

    state int blockSize = CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE;
    state Reference<IBackupFile> outFile = wait(container->writeLogFile(params.begin, params.end, blockSize));
    state LogFileWriter logFile(outFile, blockSize);
    std::cout << "Output file: " << outFile->getFileName() << "\n";

    state MutationList list;
    state Arena arena;
    state Version version = invalidVersion;
    while (progress->hasMutations()) {
        state VersionedData data = wait(progress->getNextMutation());

        // emit a mutation batch to file when encounter a new version
        if (list.totalSize() > 0 && version != data.version.version) {
            wait(LogFileWriter::addMutation(&logFile, version, list));
            list = MutationList();
            arena = Arena();
        }

        ArenaReader rd(data.arena, data.message, AssumeVersion(currentProtocolVersion));
        MutationRef m;
        rd >> m;
        std::cout << data.version.toString() << " m = " << m.toString() << "\n";
        list.push_back_deep(arena, m);
        version = data.version.version;
    }
    // Flush the mutations of the last version.
    if (list.totalSize() > 0) {
        wait(LogFileWriter::addMutation(&logFile, version, list));
    }

    wait(outFile->finish());

    return Void();
}

// Fills `param` from the command line options in `args`.
// Returns FDB_EXIT_SUCCESS when all options were consumed, FDB_EXIT_ERROR on
// an option error, an unparsable value, or when help was requested.
int parseCommandLine(ConvertParams* param, CSimpleOpt* args) {
    while (args->Next()) {
        auto lastError = args->LastError();
        switch (lastError) {
        case SO_SUCCESS:
            break;

        default:
            std::cerr << "ERROR: argument given for option: " << args->OptionText() << "\n";
            return FDB_EXIT_ERROR;
        }

        int optId = args->OptionId();
        const char* arg = args->OptionArg();
        switch (optId) {
        case OPT_HELP:
            printConvertUsage();
            return FDB_EXIT_ERROR;

        case OPT_BEGIN_VERSION:
            // sscanf returns the number of converted fields, or EOF (non-zero)
            // on empty input, so success is exactly one conversion.
            if (sscanf(arg, "%" SCNd64, &param->begin) != 1) {
                std::cerr << "ERROR: could not parse begin version " << arg << "\n";
                printConvertUsage();
                return FDB_EXIT_ERROR;
            }
            break;

        case OPT_END_VERSION:
            if (sscanf(arg, "%" SCNd64, &param->end) != 1) {
                std::cerr << "ERROR: could not parse end version " << arg << "\n";
                printConvertUsage();
                return FDB_EXIT_ERROR;
            }
            break;

        case OPT_CONTAINER:
            param->container_url = args->OptionArg();
            break;

        case OPT_TRACE:
            param->log_enabled = true;
            break;

        case OPT_TRACE_DIR:
            param->log_dir = args->OptionArg();
            break;

        case OPT_TRACE_FORMAT:
            if (!validateTraceFormat(args->OptionArg())) {
                std::cerr << "ERROR: Unrecognized trace format " << args->OptionArg() << "\n";
                return FDB_EXIT_ERROR;
            }
            param->trace_format = args->OptionArg();
            break;

        case OPT_TRACE_LOG_GROUP:
            param->trace_log_group = args->OptionArg();
            break;
        }
    }
    return FDB_EXIT_SUCCESS;
}

} // namespace file_converter

// Entry point: parses the command line, applies the trace options, sets up the
// network and runs convert() until it finishes.
int main(int argc, char** argv) {
    try {
        // Stack allocated so the option parser is released on every return path.
        CSimpleOpt args(argc, argv, file_converter::gConverterOptions, SO_O_EXACT);
        file_converter::ConvertParams param;
        int status = file_converter::parseCommandLine(&param, &args);
        std::cout << "Params: " << param.toString() << "\n";
        if (status != FDB_EXIT_SUCCESS || !param.isValid()) {
            file_converter::printConvertUsage();
            // Missing required parameters are an error even if parsing itself succeeded.
            return status != FDB_EXIT_SUCCESS ? status : FDB_EXIT_ERROR;
        }

        if (param.log_enabled) {
            if (param.log_dir.empty()) {
                setNetworkOption(FDBNetworkOptions::TRACE_ENABLE);
            } else {
                setNetworkOption(FDBNetworkOptions::TRACE_ENABLE, StringRef(param.log_dir));
            }
            if (!param.trace_format.empty()) {
                setNetworkOption(FDBNetworkOptions::TRACE_FORMAT, StringRef(param.trace_format));
            }
            if (!param.trace_log_group.empty()) {
                setNetworkOption(FDBNetworkOptions::TRACE_LOG_GROUP, StringRef(param.trace_log_group));
            }
        }

        platformInit();
        Error::init();

        setupNetwork(0, true);

        TraceEvent::setNetworkThread();
        openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, param.log_dir, "convert", param.trace_log_group);

        // `f` keeps the returned future alive while the network loop runs.
        auto f = stopAfter(convert(param));

        runNetwork();
        return status;
    } catch (Error& e) {
        fprintf(stderr, "ERROR: %s\n", e.what());
        return FDB_EXIT_ERROR;
    } catch (std::exception& e) {
        TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what());
        return FDB_EXIT_MAIN_EXCEPTION;
    }
}