/*
 * BackupAgentBase.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <iomanip>
#include <time.h>

#include "fdbclient/BackupAgent.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // has to be last include

std::string BackupAgentBase::formatTime(int64_t epochs) {
    time_t curTime = (time_t)epochs;
    char buffer[30];
    struct tm timeinfo;
    getLocalTime(&curTime, &timeinfo);
    strftime(buffer, 30, "%Y/%m/%d.%H:%M:%S%z", &timeinfo);
    return buffer;
}

int64_t BackupAgentBase::parseTime(std::string timestamp) {
    struct tm out;
    out.tm_isdst = -1; // This field is not set by strptime. -1 tells mktime to determine whether DST is in effect

    std::string timeOnly = timestamp.substr(0, 19);

    // TODO: Use std::get_time implementation for all platforms once supported
    // It would be nice to read the timezone using %z, but it seems not all get_time()
    // or strptime() implementations handle it correctly in all environments so we
    // will read the date and time independent of timezone at first and then adjust it.
#ifdef _WIN32
    std::istringstream s(timeOnly);
    s.imbue(std::locale(setlocale(LC_TIME, nullptr)));
    s >> std::get_time(&out, "%Y/%m/%d.%H:%M:%S");
    if (s.fail()) {
        return -1;
    }
#else
    if (strptime(timeOnly.c_str(), "%Y/%m/%d.%H:%M:%S", &out) == nullptr) {
        return -1;
    }
#endif

    // Read timezone offset in +/-HHMM format then convert to seconds
    int tzHH;
    int tzMM;
    if (sscanf(timestamp.substr(19, 5).c_str(), "%3d%2d", &tzHH, &tzMM) != 2) {
        return -1;
    }
    if (tzHH < 0) {
        tzMM = -tzMM;
    }
    // tzOffset is the number of seconds EAST of GMT
    int tzOffset = tzHH * 60 * 60 + tzMM * 60;

    // The goal is to convert the timestamp string to epoch seconds assuming the date/time was expressed
    // in the timezone at the end of the string. However, mktime() will ONLY return epoch seconds
    // assuming the date/time is expressed in local time (based on locale / environment).
    // mktime() will set out.tm_gmtoff when available.
    int64_t ts = mktime(&out);

    // localTZOffset is the number of seconds EAST of GMT
    long localTZOffset;
#ifdef _WIN32
    // _get_timezone() returns the number of seconds WEST of GMT
    if (_get_timezone(&localTZOffset) != 0) {
        return -1;
    }
    // Negate offset to match the orientation of tzOffset
    localTZOffset = -localTZOffset;
#else
    // tm.tm_gmtoff is the number of seconds EAST of GMT
    localTZOffset = out.tm_gmtoff;
#endif

    // Add back the difference between the local timezone assumed by mktime() and the intended timezone
    // from the input string
    ts += (localTZOffset - tzOffset);
    return ts;
}
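// Worked example of the adjustment above (illustrative values, not from any test): parsing
// "2019/03/01.08:00:00-0500" gives tzOffset = -5*3600 = -18000. If the local environment is UTC+1,
// mktime() treats 08:00:00 as local time (07:00:00 GMT) and localTZOffset = +3600, so ts is shifted by
// (3600 - (-18000)) = +21600 seconds and the returned epoch corresponds to 13:00:00 GMT, which is
// indeed 08:00:00 at UTC-5.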
const Key BackupAgentBase::keyFolderId = LiteralStringRef("config_folderid");
const Key BackupAgentBase::keyBeginVersion = LiteralStringRef("beginVersion");
const Key BackupAgentBase::keyEndVersion = LiteralStringRef("endVersion");
const Key BackupAgentBase::keyPrevBeginVersion = LiteralStringRef("prevBeginVersion");
const Key BackupAgentBase::keyConfigBackupTag = LiteralStringRef("config_backup_tag");
const Key BackupAgentBase::keyConfigLogUid = LiteralStringRef("config_log_uid");
const Key BackupAgentBase::keyConfigBackupRanges = LiteralStringRef("config_backup_ranges");
const Key BackupAgentBase::keyConfigStopWhenDoneKey = LiteralStringRef("config_stop_when_done");
const Key BackupAgentBase::keyStateStop = LiteralStringRef("state_stop");
const Key BackupAgentBase::keyStateStatus = LiteralStringRef("state_status");
const Key BackupAgentBase::keyLastUid = LiteralStringRef("last_uid");
const Key BackupAgentBase::keyBeginKey = LiteralStringRef("beginKey");
const Key BackupAgentBase::keyEndKey = LiteralStringRef("endKey");
const Key BackupAgentBase::keyDrVersion = LiteralStringRef("drVersion");
const Key BackupAgentBase::destUid = LiteralStringRef("destUid");
const Key BackupAgentBase::backupStartVersion = LiteralStringRef("backupStartVersion");

const Key BackupAgentBase::keyTagName = LiteralStringRef("tagname");
const Key BackupAgentBase::keyStates = LiteralStringRef("state");
const Key BackupAgentBase::keyConfig = LiteralStringRef("config");
const Key BackupAgentBase::keyErrors = LiteralStringRef("errors");
const Key BackupAgentBase::keyRanges = LiteralStringRef("ranges");
const Key BackupAgentBase::keyTasks = LiteralStringRef("tasks");
const Key BackupAgentBase::keyFutures = LiteralStringRef("futures");
const Key BackupAgentBase::keySourceStates = LiteralStringRef("source_states");
const Key BackupAgentBase::keySourceTagName = LiteralStringRef("source_tagname");

bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key) {
    if (source) {
        dest->params[key] = source->params[key];
        return true;
    }

    return false;
}

Version getVersionFromString(std::string const& value) {
    Version version(-1);
    int n = 0;
    if (sscanf(value.c_str(), "%lld%n", (long long*)&version, &n) != 1 || n != value.size()) {
        TraceEvent(SevWarnAlways, "GetVersionFromString").detail("InvalidVersion", value);
        throw restore_invalid_version();
    }
    return version;
}

// Transaction log data is stored by the FoundationDB core in the
// \xff/bklog/ keyspace in a funny order for performance reasons.
// Return the ranges of keys that contain the data for the given range
// of versions.
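// A sketch of that order, as implied by getLogRanges() and decodeBKMutationLogKey() below (the exact
// prefix constants live in the system keyspace definitions): each key is
//   backupLogKeys.begin + destUid + hash(versionBlock) + bigEndian64(version) + bigEndian32(part)
// where versionBlock = version / LOG_RANGE_BLOCK_SIZE, so consecutive blocks of versions hash to
// different shards while versions within one block remain contiguous.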
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize) {
    Standalone<VectorRef<KeyRangeRef>> ret;

    Key baLogRangePrefix = destUidValue.withPrefix(backupLogKeys.begin);

    //TraceEvent("GetLogRanges").detail("DestUidValue", destUidValue).detail("Prefix", baLogRangePrefix);

    for (int64_t vblock = beginVersion / blockSize; vblock < (endVersion + blockSize - 1) / blockSize; ++vblock) {
        int64_t tb = vblock * blockSize / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
        uint64_t bv = bigEndian64(std::max(beginVersion, vblock * blockSize));
        uint64_t ev = bigEndian64(std::min(endVersion, (vblock + 1) * blockSize));
        uint32_t data = tb & 0xffffffff;
        uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0);

        Key vblockPrefix = StringRef(&hash, sizeof(uint8_t)).withPrefix(baLogRangePrefix);

        ret.push_back_deep(ret.arena(), KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
            StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
    }

    return ret;
}

Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
    Standalone<VectorRef<KeyRangeRef>> ret;

    Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin);

    //TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix);

    for (int64_t vblock = beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE; vblock < (endVersion + CLIENT_KNOBS->APPLY_BLOCK_SIZE - 1) / CLIENT_KNOBS->APPLY_BLOCK_SIZE; ++vblock) {
        int64_t tb = vblock * CLIENT_KNOBS->APPLY_BLOCK_SIZE / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
        uint64_t bv = bigEndian64(std::max(beginVersion, vblock * CLIENT_KNOBS->APPLY_BLOCK_SIZE));
        uint64_t ev = bigEndian64(std::min(endVersion, (vblock + 1) * CLIENT_KNOBS->APPLY_BLOCK_SIZE));
        uint32_t data = tb & 0xffffffff;
        uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0);

        Key vblockPrefix = StringRef(&hash, sizeof(uint8_t)).withPrefix(baLogRangePrefix);

        ret.push_back_deep(ret.arena(), KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
            StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
    }

    return ret;
}

Key getApplyKey(Version version, Key backupUid) {
    int64_t vblock = (version - 1) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
    uint64_t v = bigEndian64(version);
    uint32_t data = vblock & 0xffffffff;
    uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0);
    Key k1 = StringRef((uint8_t*)&v, sizeof(uint64_t)).withPrefix(StringRef(&hash, sizeof(uint8_t)));
    Key k2 = k1.withPrefix(backupUid);
    return k2.withPrefix(applyLogKeys.begin);
}
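// getApplyKey() mirrors the same layout for the apply-log keyspace:
//   applyLogKeys.begin + backupUid + hash((version - 1) / LOG_RANGE_BLOCK_SIZE) + bigEndian64(version)
// which is what lets dumpData() below clear the contiguous, already-applied portion of the apply log
// with a single ClearRange up to getApplyKey(newBeginVersion, uid).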
// Given a key from one of the ranges returned by getLogRanges(),
// returns (version, part) where version is the database version number of
// the transaction log data in the value, and part is 0 for the first such
// data for a given version, 1 for the second block of data, etc.
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key) {
    return std::make_pair(bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t))),
        bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
}

// value is a concatenation of all of the transaction log data for a given
// version. Returns the mutations it contains. At present, all mutations are
// represented as (type, param1, param2) where type is an integer and param1
// and param2 are byte strings.
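// Wire format decoded below (a sketch; lengths in bytes, integers in native byte order):
//   [protocolVersion: 8][totalBytes: 4] followed by totalBytes bytes of repeated
//   [type: 4][len1: 4][len2: 4][param1: len1][param2: len2]
// BackupAgentBase::logHeaderSize is the 12-byte (type, len1, len2) header of each mutation.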
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value) {
    try {
        uint64_t offset(0);
        uint64_t protocolVersion = 0;
        memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
        offset += sizeof(uint64_t);
        if (protocolVersion <= 0x0FDB00A200090001) {
            TraceEvent(SevError, "DecodeBackupLogValue").detail("IncompatibleProtocolVersion", protocolVersion)
                .detail("ValueSize", value.size()).detail("Value", value);
            throw incompatible_protocol_version();
        }

        Standalone<VectorRef<MutationRef>> result;
        uint32_t totalBytes = 0;
        memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
        offset += sizeof(uint32_t);
        uint32_t consumed = 0;

        if (totalBytes + offset > value.size())
            throw restore_missing_data();

        int originalOffset = offset;

        while (consumed < totalBytes) {
            uint32_t type = 0;
            memcpy(&type, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len1 = 0;
            memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len2 = 0;
            memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);

            MutationRef logValue;
            logValue.type = type;
            logValue.param1 = value.substr(offset, len1);
            offset += len1;
            logValue.param2 = value.substr(offset, len2);
            offset += len2;
            result.push_back_deep(result.arena(), logValue);

            consumed += BackupAgentBase::logHeaderSize + len1 + len2;
        }

        ASSERT(consumed == totalBytes);
        if (value.size() != offset) {
            TraceEvent(SevError, "BA_DecodeBackupLogValue").detail("UnexpectedExtraDataSize", value.size()).detail("Offset", offset).detail("TotalBytes", totalBytes).detail("Consumed", consumed).detail("OriginalOffset", originalOffset);
            throw restore_corrupted_data();
        }

        return result;
    }
    catch (Error& e) {
        TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_DecodeBackupLogValue").error(e).GetLastError().detail("ValueSize", value.size()).detail("Value", value);
        throw;
    }
}
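// The overload below decodes the same format, but additionally filters each mutation through
// key_version, a map from key range to the version up to which that range has already been restored:
// mutations at versions not newer than the stored version, or in ranges marked invalidVersion, are
// dropped. It also rewrites keys via removePrefix/addPrefix, and a ClearRange may be split into one
// mutation per intersecting map range.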
void decodeBackupLogValue(Arena& arena, VectorRef<MutationRef>& result, int& mutationSize, StringRef value, StringRef addPrefix, StringRef removePrefix, Version version, Reference<KeyRangeMap<Version>> key_version) {
    try {
        uint64_t offset(0);
        uint64_t protocolVersion = 0;
        memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
        offset += sizeof(uint64_t);
        if (protocolVersion <= 0x0FDB00A200090001) {
            TraceEvent(SevError, "DecodeBackupLogValue").detail("IncompatibleProtocolVersion", protocolVersion)
                .detail("ValueSize", value.size()).detail("Value", value);
            throw incompatible_protocol_version();
        }

        uint32_t totalBytes = 0;
        memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
        offset += sizeof(uint32_t);
        uint32_t consumed = 0;

        if (totalBytes + offset > value.size())
            throw restore_missing_data();

        int originalOffset = offset;

        while (consumed < totalBytes) {
            uint32_t type = 0;
            memcpy(&type, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len1 = 0;
            memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len2 = 0;
            memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);

            ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));

            MutationRef logValue;
            Arena tempArena;
            logValue.type = type;
            logValue.param1 = value.substr(offset, len1);
            offset += len1;
            logValue.param2 = value.substr(offset, len2);
            offset += len2;

            if (logValue.type == MutationRef::ClearRange) {
                KeyRangeRef range(logValue.param1, logValue.param2);
                auto ranges = key_version->intersectingRanges(range);
                for (auto r : ranges) {
                    if (version > r.value() && r.value() != invalidVersion) {
                        KeyRef minKey = std::min(r.range().end, range.end);
                        if (minKey == (removePrefix == StringRef() ? normalKeys.end : strinc(removePrefix))) {
                            logValue.param1 = std::max(r.range().begin, range.begin);
                            if (removePrefix.size()) {
                                logValue.param1 = logValue.param1.removePrefix(removePrefix);
                            }
                            if (addPrefix.size()) {
                                logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
                            }
                            logValue.param2 = addPrefix == StringRef() ? normalKeys.end : strinc(addPrefix, tempArena);
                            result.push_back_deep(arena, logValue);
                            mutationSize += logValue.expectedSize();
                        }
                        else {
                            logValue.param1 = std::max(r.range().begin, range.begin);
                            logValue.param2 = minKey;
                            if (removePrefix.size()) {
                                logValue.param1 = logValue.param1.removePrefix(removePrefix);
                                logValue.param2 = logValue.param2.removePrefix(removePrefix);
                            }
                            if (addPrefix.size()) {
                                logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
                                logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
                            }
                            result.push_back_deep(arena, logValue);
                            mutationSize += logValue.expectedSize();
                        }
                    }
                }
            }
            else {
                Version ver = key_version->rangeContaining(logValue.param1).value();
                //TraceEvent("ApplyMutation").detail("LogValue", logValue.toString()).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
                if (version > ver && ver != invalidVersion) {
                    if (removePrefix.size()) {
                        logValue.param1 = logValue.param1.removePrefix(removePrefix);
                    }
                    if (addPrefix.size()) {
                        logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
                    }
                    result.push_back_deep(arena, logValue);
                    mutationSize += logValue.expectedSize();
                }
            }

            consumed += BackupAgentBase::logHeaderSize + len1 + len2;
        }

        ASSERT(consumed == totalBytes);
        if (value.size() != offset) {
            TraceEvent(SevError, "BA_DecodeBackupLogValue").detail("UnexpectedExtraDataSize", value.size()).detail("Offset", offset).detail("TotalBytes", totalBytes).detail("Consumed", consumed).detail("OriginalOffset", originalOffset);
            throw restore_corrupted_data();
        }
    }
    catch (Error& e) {
        TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_DecodeBackupLogValue").error(e).GetLastError().detail("ValueSize", value.size()).detail("Value", value);
        throw;
    }
}

static double lastErrorTime = 0;

void logErrorWorker(Reference<ReadYourWritesTransaction> tr, Key keyErrors, std::string message) {
    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    tr->setOption(FDBTransactionOptions::LOCK_AWARE);
    if (now() - lastErrorTime > CLIENT_KNOBS->BACKUP_ERROR_DELAY) {
        TraceEvent("BA_LogError").detail("Key", keyErrors).detail("Message", message);
        lastErrorTime = now();
    }
    tr->set(keyErrors, message);
}

Future<Void> logError(Database cx, Key keyErrors, const std::string& message) {
    return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
        logErrorWorker(tr, keyErrors, message);
        return Future<Void>(Void());
    });
}

Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message) {
    return logError(tr->getDatabase(), keyErrors, message);
}
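// readCommitted streams the contents of [range.begin, range.end) to `results` in batches, using `lock`
// for backpressure: before each read it takes a worst-case byte budget from the FlowLock; after the
// batch arrives, the bytes actually sent downstream are deducted from the releaser (so the consumer
// becomes responsible for releasing them) and the unused remainder is returned when the releaser is
// reassigned or destroyed.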
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersion> results, Reference<FlowLock> lock, KeyRangeRef range, bool terminator, bool systemAccess, bool lockAware) {
    state KeySelector begin = firstGreaterOrEqual(range.begin);
    state KeySelector end = firstGreaterOrEqual(range.end);
    state Transaction tr(cx);
    state FlowLock::Releaser releaser;

    loop {
        try {
            state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED,
                (g_network->isSimulated() && !g_simulator.speedUpSimulation)
                    ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES
                    : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);

            if (systemAccess)
                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            if (lockAware)
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);

            //add lock
            releaser.release();
            wait(lock->take(TaskDefaultYield, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT));
            releaser = FlowLock::Releaser(*lock, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);

            state Standalone<RangeResultRef> values = wait(tr.getRange(begin, end, limits));

            // When this buggify line is enabled, if there is more than one result then use half of the results
            if (values.size() > 1 && BUGGIFY) {
                values.resize(values.arena(), values.size() / 2);
                values.more = true;
                // Half of the time wait for this tr to expire so that the next read is at a different version
                if (deterministicRandom()->random01() < 0.5)
                    wait(delay(6.0));
            }

            releaser.remaining -= values.expectedSize(); // it is the responsibility of the caller to release after this point
            ASSERT(releaser.remaining >= 0);

            results.send(RangeResultWithVersion(values, tr.getReadVersion().get()));

            if (values.size() > 0)
                begin = firstGreaterThan(values.end()[-1].key);

            if (!values.more && !limits.isReached()) {
                if (terminator)
                    results.sendError(end_of_stream());
                return Void();
            }
        }
        catch (Error& e) {
            if (e.code() == error_code_transaction_too_old) {
                // We are using this transaction until it's too old and then resetting to a fresh one,
                // so we don't need to delay.
                tr.fullReset();
            }
            else {
                wait(tr.onError(e));
            }
        }
    }
}
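// This variant groups adjacent rows by groupBy(key).first (for the mutation log, the version) and
// emits one RCGroup per group. A group is sent only once a row from the following group is seen, or
// the range is exhausted, so a group is never emitted partially; skipGroup suppresses rows belonging
// to a group that has already been sent.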
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Future<Void> active, Reference<FlowLock> lock, KeyRangeRef range, std::function<std::pair<uint64_t, uint32_t>(Key key)> groupBy, bool terminator, bool systemAccess, bool lockAware) {
    state KeySelector nextKey = firstGreaterOrEqual(range.begin);
    state KeySelector end = firstGreaterOrEqual(range.end);

    state RCGroup rcGroup = RCGroup();
    state uint64_t skipGroup(ULLONG_MAX);
    state Transaction tr(cx);
    state FlowLock::Releaser releaser;

    loop {
        try {
            state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED,
                (g_network->isSimulated() && !g_simulator.speedUpSimulation)
                    ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES
                    : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);

            if (systemAccess)
                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            if (lockAware)
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);

            state Standalone<RangeResultRef> rangevalue = wait(tr.getRange(nextKey, end, limits));

            // When this buggify line is enabled, if there is more than one result then use half of the results
            if (rangevalue.size() > 1 && BUGGIFY) {
                rangevalue.resize(rangevalue.arena(), rangevalue.size() / 2);
                rangevalue.more = true;
                // Half of the time wait for this tr to expire so that the next read is at a different version
                if (deterministicRandom()->random01() < 0.5)
                    wait(delay(6.0));
            }

            //add lock
            wait(active);
            releaser.release();
            wait(lock->take(TaskDefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
            releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());

            for (auto& s : rangevalue) {
                uint64_t groupKey = groupBy(s.key).first;
                //TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
                if (groupKey != skipGroup) {
                    if (rcGroup.version == -1) {
                        rcGroup.version = tr.getReadVersion().get();
                        rcGroup.groupKey = groupKey;
                    }
                    else if (rcGroup.groupKey != groupKey) {
                        //TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
                        //state uint32_t len(0);
                        //for (size_t j = 0; j < rcGroup.items.size(); ++j) {
                        //    len += rcGroup.items[j].value.size();
                        //}
                        //TraceEvent("SendGroup").detail("GroupKey", rcGroup.groupKey).detail("Version", rcGroup.version).detail("Length", len).detail("Releaser.remaining", releaser.remaining);
                        releaser.remaining -= rcGroup.items.expectedSize(); // it is the responsibility of the caller to release after this point
                        ASSERT(releaser.remaining >= 0);
                        results.send(rcGroup);

                        nextKey = firstGreaterThan(rcGroup.items.end()[-1].key);
                        skipGroup = rcGroup.groupKey;

                        rcGroup = RCGroup();
                        rcGroup.version = tr.getReadVersion().get();
                        rcGroup.groupKey = groupKey;
                    }
                    rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
                }
            }

            if (!rangevalue.more) {
                if (rcGroup.version != -1) {
                    releaser.remaining -= rcGroup.items.expectedSize(); // it is the responsibility of the caller to release after this point
                    ASSERT(releaser.remaining >= 0);
                    //TraceEvent("Log_ReadCommitted").detail("SendGroup1", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength", rcGroup.items[0].value.size());
                    results.send(rcGroup);
                }

                if (terminator)
                    results.sendError(end_of_stream());
                return Void();
            }

            nextKey = firstGreaterThan(rangevalue.end()[-1].key);
        }
        catch (Error& e) {
            if (e.code() == error_code_transaction_too_old) {
                // We are using this transaction until it's too old and then resetting to a fresh one,
                // so we don't need to delay.
                tr.fullReset();
            }
            else {
                wait(tr.onError(e));
            }
        }
    }
}

Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, KeyRangeRef range, std::function<std::pair<uint64_t, uint32_t>(Key key)> groupBy) {
    return readCommitted(cx, results, Void(), lock, range, groupBy, true, true, true);
}
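// dumpData drains RCGroups produced by readCommitted, decodes each group's concatenated log value into
// mutations, and submits them in CommitTransactionRequest batches of roughly
// BACKUP_LOG_WRITE_BATCH_MAX_SIZE bytes. Each batch also advances the applyMutationsBeginRange marker
// to the next unconsumed version and clears the apply-log keys it has consumed.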
ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, Key uid, Key addPrefix, Key removePrefix, RequestStream<CommitTransactionRequest> commit,
    NotifiedVersion* committedVersion, Optional<Version> endVersion, Key rangeBegin, PromiseStream<Future<Void>> addActor, FlowLock* commitLock, Reference<KeyRangeMap<Version>> keyVersion) {
    state Version lastVersion = invalidVersion;
    state bool endOfStream = false;
    state int totalBytes = 0;
    loop {
        state CommitTransactionRequest req;
        state Version newBeginVersion = invalidVersion;
        state int mutationSize = 0;
        loop {
            try {
                RCGroup group = waitNext(results.getFuture());
                lock->release(group.items.expectedSize());

                BinaryWriter bw(Unversioned());
                for (int i = 0; i < group.items.size(); ++i) {
                    bw.serializeBytes(group.items[i].value);
                }
                decodeBackupLogValue(req.arena, req.transaction.mutations, mutationSize, bw.toValue(), addPrefix, removePrefix, group.groupKey, keyVersion);
                newBeginVersion = group.groupKey + 1;
                if (mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
                    break;
                }
            }
            catch (Error& e) {
                if (e.code() == error_code_end_of_stream) {
                    if (endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) {
                        newBeginVersion = endVersion.get();
                    }
                    if (newBeginVersion == invalidVersion)
                        return totalBytes;
                    endOfStream = true;
                    break;
                }
                throw;
            }
        }

        Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
        Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
        Key rangeEnd = getApplyKey(newBeginVersion, uid);

        req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
        req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
        req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
        req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));

        // The commit request contains no read conflict ranges, so regardless of what read version we
        // choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
        // for our transaction to be aborted due to conflicts.
        req.transaction.read_snapshot = committedVersion->get();
        req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;

        totalBytes += mutationSize;
        wait(commitLock->take(TaskDefaultYield, mutationSize));
        addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), mutationSize));

        if (endOfStream) {
            return totalBytes;
        }
    }
}
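// coalesceKeyVersionCache clears applyMutationsKeyVersionMap entries that have become redundant: an
// entry can be dropped when both it and its predecessor are valid and already below endVersion, since
// the two ranges then filter identically. The key-version count is decremented by the number of
// entries cleared via an atomic AddValue of the (negative) delta.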
ACTOR Future<Void> coalesceKeyVersionCache(Key uid, Version endVersion, Reference<KeyRangeMap<Version>> keyVersion, RequestStream<CommitTransactionRequest> commit, NotifiedVersion* committedVersion, PromiseStream<Future<Void>> addActor, FlowLock* commitLock) {
    Version lastVersion = -1000;
    int64_t removed = 0;
    state CommitTransactionRequest req;
    state int64_t mutationSize = 0;
    Key mapPrefix = uid.withPrefix(applyMutationsKeyVersionMapRange.begin);

    for (auto it : keyVersion->ranges()) {
        if (lastVersion == -1000) {
            lastVersion = it.value();
        }
        else {
            Version ver = it.value();
            if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion && lastVersion != invalidVersion) {
                Key removeKey = it.range().begin.withPrefix(mapPrefix);
                Key removeEnd = keyAfter(removeKey);
                req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, removeKey, removeEnd));
                mutationSize += removeKey.size() + removeEnd.size();
                removed--;
            }
            else {
                lastVersion = ver;
            }
        }
    }

    if (removed != 0) {
        Key countKey = uid.withPrefix(applyMutationsKeyVersionCountRange.begin);
        req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(countKey));
        req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::AddValue, countKey, StringRef((uint8_t*)&removed, 8)));
        req.transaction.read_snapshot = committedVersion->get();
        req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;

        wait(commitLock->take(TaskDefaultYield, mutationSize));
        addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), mutationSize));
    }

    return Void();
}

ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key removePrefix, Version beginVersion, Version* endVersion, RequestStream<CommitTransactionRequest> commit, NotifiedVersion* committedVersion, Reference<KeyRangeMap<Version>> keyVersion) {
    state FlowLock commitLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
    state PromiseStream<Future<Void>> addActor;
    state Future<Void> error = actorCollection(addActor.getFuture());
    state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES;

    keyVersion->insert(metadataVersionKey, 0);

    try {
        loop {
            if (beginVersion >= *endVersion) {
                wait(commitLock.take(TaskDefaultYield, CLIENT_KNOBS->BACKUP_LOCK_BYTES));
                commitLock.release(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
                if (beginVersion >= *endVersion) {
                    return Void();
                }
            }

            int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
            state Version newEndVersion = std::min(*endVersion, ((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) * CLIENT_KNOBS->APPLY_BLOCK_SIZE);
            state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
            state size_t idx;
            state std::vector<PromiseStream<RCGroup>> results;
            state std::vector<Future<Void>> rc;
            state std::vector<Reference<FlowLock>> locks;

            for (int i = 0; i < ranges.size(); ++i) {
                results.push_back(PromiseStream<RCGroup>());
                locks.push_back(Reference<FlowLock>(new FlowLock(std::max(CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / ranges.size(), CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES))));
                rc.push_back(readCommitted(cx, results[i], locks[i], ranges[i], decodeBKMutationLogKey));
            }

            maxBytes = std::max(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);
            for (idx = 0; idx < ranges.size(); ++idx) {
                int bytes = wait(dumpData(cx, results[idx], locks[idx], uid, addPrefix, removePrefix, commit, committedVersion,
                    idx == ranges.size() - 1 ? newEndVersion : Optional<Version>(), ranges[idx].begin, addActor, &commitLock, keyVersion));
                maxBytes = std::max(CLIENT_KNOBS->APPLY_MAX_INCREASE_FACTOR * bytes, maxBytes);
                if (error.isError())
                    throw error.getError();
            }

            wait(coalesceKeyVersionCache(uid, newEndVersion, keyVersion, commit, committedVersion, addActor, &commitLock));
            beginVersion = newEndVersion;
        }
    }
    catch (Error& e) {
        TraceEvent(e.code() == error_code_restore_missing_data ? SevWarnAlways : SevError, "ApplyMutationsError").error(e);
        throw;
    }
}
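// _eraseLogData advances (or deletes) this log UID's entry in the backup version history and clears
// the corresponding mutation-log key ranges, but only up to the smallest version still needed by
// another backup/DR sharing the same destUid; when this UID is the last consumer and no endVersion is
// given, it clears the entire destUid keyspace and disables further log mutations.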
ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
    state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
    state Key backupLatestVersionsKey = logUidValue.withPrefix(backupLatestVersionsPath);

    if (!destUidValue.size()) {
        return Void();
    }

    state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
    loop {
        try {
            tr->setOption(FDBTransactionOptions::LOCK_AWARE);
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

            if (checkBackupUid) {
                Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
                Optional<Value> v = wait(tr->get(sourceStates.pack(DatabaseBackupAgent::keyFolderId)));
                if (v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
                    return Void();
            }

            state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));

            // Make sure version history key does exist and lower the beginVersion if needed
            state Version currBeginVersion = invalidVersion;
            for (auto backupVersion : backupVersions) {
                Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);

                if (currLogUidValue == logUidValue) {
                    currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
                    break;
                }
            }

            // Do not clear anything if version history key cannot be found
            if (currBeginVersion == invalidVersion) {
                return Void();
            }

            state Version currEndVersion = currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
            if (endVersion.present()) {
                currEndVersion = std::min(currEndVersion, endVersion.get());
            }

            state Version nextSmallestVersion = currEndVersion;
            bool clearLogRangesRequired = true;

            // More than one backup/DR with the same range
            if (backupVersions.size() > 1) {
                for (auto backupVersion : backupVersions) {
                    Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
                    Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());

                    if (currLogUidValue == logUidValue) {
                        continue;
                    } else if (currVersion > currBeginVersion) {
                        nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
                    } else {
                        // If we can find a version less than or equal to beginVersion, clearing log ranges is not required
                        clearLogRangesRequired = false;
                        break;
                    }
                }
            }

            if (!endVersion.present() && backupVersions.size() == 1) {
                // Clear version history
                tr->clear(prefixRange(backupLatestVersionsPath));

                // Clear everything under blog/[destUid]
                tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));

                // Disable committing mutations into blog
                tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
            } else {
                if (!endVersion.present() && currEndVersion >= nextSmallestVersion) {
                    // Clear current backup version history
                    tr->clear(backupLatestVersionsKey);
                } else {
                    // Update current backup latest version
                    tr->set(backupLatestVersionsKey, BinaryWriter::toValue(currEndVersion, Unversioned()));
                }

                // Clear log ranges if needed
                if (clearLogRangesRequired) {
                    Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
                    for (auto& range : ranges) {
                        tr->clear(range);
                    }
                }
            }

            wait(tr->commit());

            if (!endVersion.present() && (backupVersions.size() == 1 || currEndVersion >= nextSmallestVersion)) {
                return Void();
            }
            if (endVersion.present() && currEndVersion == endVersion.get()) {
                return Void();
            }
            tr->reset();
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
}

Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
    return _eraseLogData(cx, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
}