/* * FileDecoder.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include "fdbclient/BackupAgent.actor.h" #include "fdbclient/BackupContainer.h" #include "fdbbackup/FileConverter.h" #include "fdbclient/MutationList.h" #include "flow/flow.h" #include "flow/serialize.h" #include "fdbclient/BuildFlags.h" #include "flow/actorcompiler.h" // has to be last include #define SevDecodeInfo SevVerbose extern bool g_crashOnError; namespace file_converter { void printDecodeUsage() { std::cout << "\n" " -r, --container Container URL.\n" " -i, --input FILE Log file filter, only matched files are decoded.\n" " --log Enables trace file logging for the CLI session.\n" " --logdir PATH Specifes the output directory for trace files. If\n" " unspecified, defaults to the current directory. Has\n" " no effect unless --log is specified.\n" " --loggroup LOG_GROUP\n" " Sets the LogGroup field with the specified value for all\n" " events in the trace output (defaults to `default').\n" " --trace_format FORMAT\n" " Select the format of the trace files. xml (the default) and json are supported.\n" " Has no effect unless --log is specified.\n" " --crash Crash on serious error.\n" " --build_flags Print build information and exit.\n" "\n"; return; } void printBuildInformation() { printf("%s\n", jsonBuildInformation().c_str()); } struct DecodeParams { std::string container_url; std::string fileFilter; // only files match the filter will be decoded bool log_enabled = false; std::string log_dir, trace_format, trace_log_group; std::string toString() { std::string s; s.append("ContainerURL: "); s.append(container_url); s.append(", FileFilter: "); s.append(fileFilter); if (log_enabled) { if (!log_dir.empty()) { s.append(" LogDir:").append(log_dir); } if (!trace_format.empty()) { s.append(" Format:").append(trace_format); } if (!trace_log_group.empty()) { s.append(" LogGroup:").append(trace_log_group); } } return s; } }; int parseDecodeCommandLine(DecodeParams* param, CSimpleOpt* args) { while (args->Next()) { auto lastError = args->LastError(); switch (lastError) { case SO_SUCCESS: break; default: std::cerr << "ERROR: argument given for option: " << args->OptionText() << "\n"; return FDB_EXIT_ERROR; break; } int optId = args->OptionId(); switch (optId) { case OPT_HELP: printDecodeUsage(); return FDB_EXIT_ERROR; case OPT_CONTAINER: param->container_url = args->OptionArg(); break; case OPT_CRASHONERROR: g_crashOnError = true; break; case OPT_INPUT_FILE: param->fileFilter = args->OptionArg(); break; case OPT_TRACE: param->log_enabled = true; break; case OPT_TRACE_DIR: param->log_dir = args->OptionArg(); break; case OPT_TRACE_FORMAT: if (!validateTraceFormat(args->OptionArg())) { std::cerr << "ERROR: Unrecognized trace format " << args->OptionArg() << "\n"; return FDB_EXIT_ERROR; } param->trace_format = args->OptionArg(); break; case OPT_TRACE_LOG_GROUP: param->trace_log_group = args->OptionArg(); break; case OPT_BUILD_FLAGS: printBuildInformation(); return FDB_EXIT_ERROR; break; } } return FDB_EXIT_SUCCESS; } void printLogFiles(std::string msg, const std::vector& files) { std::cout << msg << " " << files.size() << " log files\n"; for (const auto& file : files) { std::cout << file.toString() << "\n"; } std::cout << std::endl; } std::vector getRelevantLogFiles(const std::vector& files, const DecodeParams& params) { std::vector filtered; for (const auto& file : files) { if (file.fileName.find(params.fileFilter) != std::string::npos) { filtered.push_back(file); } } return filtered; } std::pair decode_key(const StringRef& key) { ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t)); uint8_t hash; Version version; int32_t part; BinaryReader rd(key, Unversioned()); rd >> hash >> version >> part; version = bigEndian64(version); part = bigEndian32(part); int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; ASSERT(((uint8_t)hashlittle(&v, sizeof(v), 0)) == hash); return std::make_pair(version, part); } // Decodes an encoded list of mutations in the format of: // [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k], // where a mutation is encoded as: // [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][key][value] std::vector decode_value(const StringRef& value) { StringRefReader reader(value, restore_corrupted_data()); reader.consume(); // Consume the includeVersion uint32_t val_length = reader.consume(); if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) { TraceEvent(SevError, "ValueError") .detail("ValueLen", val_length) .detail("ValueSize", value.size()) .detail("Value", printable(value)); } std::vector mutations; while (1) { if (reader.eof()) break; // Deserialization of a MutationRef, which was packed by MutationListRef::push_back_deep() uint32_t type, p1len, p2len; type = reader.consume(); p1len = reader.consume(); p2len = reader.consume(); const uint8_t* key = reader.consume(p1len); const uint8_t* val = reader.consume(p2len); mutations.emplace_back((MutationRef::Type)type, StringRef(key, p1len), StringRef(val, p2len)); } return mutations; } struct VersionedMutations { Version version; std::vector mutations; Arena arena; // The arena that contains the mutations. }; struct VersionedKVPart { Arena arena; Version version; int32_t part; StringRef kv; VersionedKVPart(Arena arena, Version version, int32_t part, StringRef kv) : arena(arena), version(version), part(part), kv(kv) {} }; /* * Model a decoding progress for a mutation file. Usage is: * * DecodeProgress progress(logfile); * wait(progress->openFile(container)); * while (!progress->finished()) { * VersionedMutations m = wait(progress->getNextBatch()); * ... * } * * Internally, the decoding process is done block by block -- each block is * decoded into a list of key/value pairs, which are then decoded into batches * of mutations. Because a version's mutations can be split into many key/value * pairs, the decoding of mutation batch needs to look ahead one more pair. So * at any time this object might have two blocks of data in memory. */ class DecodeProgress { std::vector keyValues; public: DecodeProgress() = default; template DecodeProgress(const LogFile& file, U&& values) : file(file), keyValues(std::forward(values)) {} // If there are no more mutations to pull from the file. // However, we could have unfinished version in the buffer when EOF is true, // which means we should look for data in the next file. The caller // should call getUnfinishedBuffer() to get these left data. bool finished() const { return (eof && keyValues.empty()) || (leftover && !keyValues.empty()); } std::vector&& getUnfinishedBuffer() && { return std::move(keyValues); } // Returns all mutations of the next version in a batch. Future getNextBatch() { return getNextBatchImpl(this); } Future openFile(Reference container) { return openFileImpl(this, container); } // The following are private APIs: // Returns true if value contains complete data. static bool isValueComplete(StringRef value) { StringRefReader reader(value, restore_corrupted_data()); reader.consume(); // Consume the includeVersion uint32_t val_length = reader.consume(); return val_length == value.size() - sizeof(uint64_t) - sizeof(uint32_t); } // PRECONDITION: finished() must return false before calling this function. // Returns the next batch of mutations along with the arena backing it. // Note the returned batch can be empty when the file has unfinished // version batch data that are in the next file. ACTOR static Future getNextBatchImpl(DecodeProgress* self) { ASSERT(!self->finished()); loop { if (self->keyValues.size() <= 1) { // Try to decode another block when less than one left wait(readAndDecodeFile(self)); } const auto& kv = self->keyValues[0]; ASSERT(kv.part == 0); // decode next versions, check if they are continuous parts int idx = 1; // next kv pair in "keyValues" int bufSize = kv.kv.size(); for (int lastPart = 0; idx < self->keyValues.size(); idx++, lastPart++) { if (idx == self->keyValues.size()) break; const auto& nextKV = self->keyValues[idx]; if (kv.version != nextKV.version) { break; } if (lastPart + 1 != nextKV.part) { TraceEvent("DecodeError").detail("Part1", lastPart).detail("Part2", nextKV.part); throw restore_corrupted_data(); } bufSize += nextKV.kv.size(); } VersionedMutations m; m.version = kv.version; TraceEvent("Decode").detail("Version", m.version).detail("Idx", idx).detail("Q", self->keyValues.size()); StringRef value = kv.kv; if (idx > 1) { // Stitch parts into one and then decode one by one Standalone buf = self->combineValues(idx, bufSize); value = buf; m.arena = buf.arena(); } if (isValueComplete(value)) { m.mutations = decode_value(value); if (m.arena.getSize() == 0) { m.arena = kv.arena; } self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx); return m; } else if (!self->eof) { // Read one more block, hopefully the missing part of the value can be found. wait(readAndDecodeFile(self)); } else { TraceEvent(SevWarn, "MissingValue").detail("Version", m.version); self->leftover = true; return m; // Empty mutations } } } // Returns a buffer which stitches first "idx" values into one. // "len" MUST equal the summation of these values. Standalone combineValues(const int idx, const int len) { ASSERT(idx <= keyValues.size() && idx > 1); Standalone buf = makeString(len); int n = 0; for (int i = 0; i < idx; i++) { const auto& value = keyValues[i].kv; memcpy(mutateString(buf) + n, value.begin(), value.size()); n += value.size(); } ASSERT(n == len); return buf; } // Decodes a block into KeyValueRef stored in "keyValues". void decode_block(const Standalone& buf, int len) { StringRef block(buf.begin(), len); StringRefReader reader(block, restore_corrupted_data()); try { // Read header, currently only decoding version BACKUP_AGENT_MLOG_VERSION if (reader.consume() != BACKUP_AGENT_MLOG_VERSION) throw restore_unsupported_file_version(); // Read k/v pairs. Block ends either at end of last value exactly or with 0xFF as first key len byte. while (1) { // If eof reached or first key len bytes is 0xFF then end of block was reached. if (reader.eof() || *reader.rptr == 0xFF) break; // Read key and value. If anything throws then there is a problem. uint32_t kLen = reader.consumeNetworkUInt32(); const uint8_t* k = reader.consume(kLen); std::pair version_part = decode_key(StringRef(k, kLen)); uint32_t vLen = reader.consumeNetworkUInt32(); const uint8_t* v = reader.consume(vLen); TraceEvent(SevDecodeInfo, "Block") .detail("KeySize", kLen) .detail("valueSize", vLen) .detail("Offset", reader.rptr - buf.begin()) .detail("Version", version_part.first) .detail("Part", version_part.second); keyValues.emplace_back(buf.arena(), version_part.first, version_part.second, StringRef(v, vLen)); } // Make sure any remaining bytes in the block are 0xFF for (auto b : reader.remainder()) { if (b != 0xFF) throw restore_corrupted_data_padding(); } // The (version, part) in a block can be out of order, i.e., (3, 0) // can be followed by (4, 0), and then (3, 1). So we need to sort them // first by version, and then by part number. std::sort(keyValues.begin(), keyValues.end(), [](const VersionedKVPart& a, const VersionedKVPart& b) { return a.version == b.version ? a.part < b.part : a.version < b.version; }); return; } catch (Error& e) { TraceEvent(SevWarn, "CorruptBlock").error(e).detail("Offset", reader.rptr - buf.begin()); throw; } } ACTOR static Future openFileImpl(DecodeProgress* self, Reference container) { Reference fd = wait(container->readFile(self->file.fileName)); self->fd = fd; wait(readAndDecodeFile(self)); return Void(); } // Reads a file block, decodes it into key/value pairs, and stores these pairs. ACTOR static Future readAndDecodeFile(DecodeProgress* self) { try { state int64_t len = std::min(self->file.blockSize, self->file.fileSize - self->offset); if (len == 0) { self->eof = true; return Void(); } state Standalone buf = makeString(len); state int rLen = wait(self->fd->read(mutateString(buf), len, self->offset)); TraceEvent("ReadFile") .detail("Name", self->file.fileName) .detail("Len", rLen) .detail("Offset", self->offset); if (rLen != len) { throw restore_corrupted_data(); } self->decode_block(buf, rLen); self->offset += rLen; return Void(); } catch (Error& e) { TraceEvent(SevWarn, "CorruptLogFileBlock") .error(e) .detail("Filename", self->file.fileName) .detail("BlockOffset", self->offset) .detail("BlockLen", self->file.blockSize); throw; } } LogFile file; Reference fd; int64_t offset = 0; bool eof = false; bool leftover = false; // Done but has unfinished version batch data left }; ACTOR Future decode_logs(DecodeParams params) { state Reference container = IBackupContainer::openContainer(params.container_url); state BackupFileList listing = wait(container->dumpFileList()); // remove partitioned logs listing.logs.erase(std::remove_if(listing.logs.begin(), listing.logs.end(), [](const LogFile& file) { std::string prefix("plogs/"); return file.fileName.substr(0, prefix.size()) == prefix; }), listing.logs.end()); std::sort(listing.logs.begin(), listing.logs.end()); TraceEvent("Container").detail("URL", params.container_url).detail("Logs", listing.logs.size()); BackupDescription desc = wait(container->describeBackup()); std::cout << "\n" << desc.toString() << "\n"; state std::vector logs = getRelevantLogFiles(listing.logs, params); printLogFiles("Relevant files are: ", logs); state int i = 0; // Previous file's unfinished version data state std::vector left; for (; i < logs.size(); i++) { if (logs[i].fileSize == 0) continue; state DecodeProgress progress(logs[i], std::move(left)); wait(progress.openFile(container)); while (!progress.finished()) { VersionedMutations vms = wait(progress.getNextBatch()); for (const auto& m : vms.mutations) { std::cout << vms.version << " " << m.toString() << "\n"; } } left = std::move(progress).getUnfinishedBuffer(); if (!left.empty()) { TraceEvent("UnfinishedFile").detail("File", logs[i].fileName).detail("Q", left.size()); } } return Void(); } } // namespace file_converter int main(int argc, char** argv) { try { CSimpleOpt* args = new CSimpleOpt(argc, argv, file_converter::gConverterOptions, SO_O_EXACT); file_converter::DecodeParams param; int status = file_converter::parseDecodeCommandLine(¶m, args); std::cout << "Params: " << param.toString() << "\n"; if (status != FDB_EXIT_SUCCESS) { file_converter::printDecodeUsage(); return status; } if (param.log_enabled) { if (param.log_dir.empty()) { setNetworkOption(FDBNetworkOptions::TRACE_ENABLE); } else { setNetworkOption(FDBNetworkOptions::TRACE_ENABLE, StringRef(param.log_dir)); } if (!param.trace_format.empty()) { setNetworkOption(FDBNetworkOptions::TRACE_FORMAT, StringRef(param.trace_format)); } if (!param.trace_log_group.empty()) { setNetworkOption(FDBNetworkOptions::TRACE_LOG_GROUP, StringRef(param.trace_log_group)); } } platformInit(); Error::init(); StringRef url(param.container_url); setupNetwork(0, true); TraceEvent::setNetworkThread(); openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, param.log_dir, "decode", param.trace_log_group); auto f = stopAfter(decode_logs(param)); runNetwork(); return status; } catch (Error& e) { fprintf(stderr, "ERROR: %s\n", e.what()); return FDB_EXIT_ERROR; } catch (std::exception& e) { TraceEvent(SevError, "MainError").error(unknown_error()).detail("RootException", e.what()); return FDB_EXIT_MAIN_EXCEPTION; } }