foundationdb/fdbclient/BackupAgentBase.actor.cpp
Evan Tschannen ef01ad2ed8 optimized log range clearing to clear everything for each possible hash (256 clears) if that would be more efficient than one clear per second that has elapsed
aborting a DR without the --cleanup flag will still attempt to clean up for 30 seconds before giving up
added a cleanup command to fdbbackup which can remove mutations from orphaned DRs which were stopped without the --cleanup flag
2019-09-27 18:32:27 -07:00

895 lines
38 KiB
C++

/*
* BackupAgentBase.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iomanip>
#include <time.h>
#include "fdbclient/BackupAgent.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // has to be last include
// Renders an epoch-seconds value as text using the pattern "%Y/%m/%d.%H:%M:%S%z"
// (e.g. "2019/09/27.18:32:27-0700"). The broken-down time is obtained through
// getLocalTime().
std::string BackupAgentBase::formatTime(int64_t epochs) {
    time_t when = (time_t)epochs;
    struct tm localParts;
    getLocalTime(&when, &localParts);

    char text[30];
    strftime(text, sizeof(text), "%Y/%m/%d.%H:%M:%S%z", &localParts);
    return std::string(text);
}
// Parses a timestamp of the form "YYYY/MM/DD.HH:MM:SS+HHMM" (the layout written by
// formatTime) and returns the matching number of seconds since the epoch.
//
// Returns -1 when the string is too short, when the date/time portion cannot be
// parsed, or when the timezone offset is not a signed HHMM value.
int64_t BackupAgentBase::parseTime(std::string timestamp) {
    // The date/time portion occupies the first 19 characters and the timezone offset
    // starts at index 19. Reject shorter input here: substr(19, ...) below would
    // otherwise throw std::out_of_range instead of reporting a parse failure.
    if (timestamp.size() < 19) {
        return -1;
    }

    struct tm out;
    out.tm_isdst = -1; // This field is not set by strptime. -1 tells mktime to determine whether DST is in effect
    std::string timeOnly = timestamp.substr(0, 19);

    // TODO: Use std::get_time implementation for all platforms once supported
    // It would be nice to read the timezone using %z, but it seems not all get_time()
    // or strptime() implementations handle it correctly in all environments so we
    // will read the date and time independent of timezone at first and then adjust it.
#ifdef _WIN32
    std::istringstream s(timeOnly);
    s.imbue(std::locale(setlocale(LC_TIME, nullptr)));
    s >> std::get_time(&out, "%Y/%m/%d.%H:%M:%S");
    if (s.fail()) {
        return -1;
    }
#else
    if (strptime(timeOnly.c_str(), "%Y/%m/%d.%H:%M:%S", &out) == nullptr) {
        return -1;
    }
#endif

    // Read timezone offset in +/-HHMM format then convert to seconds
    const std::string tzText = timestamp.substr(19, 5);
    int tzHH;
    int tzMM;
    if (sscanf(tzText.c_str(), "%3d%2d", &tzHH, &tzMM) != 2) {
        return -1;
    }
    // The sign applies to the whole offset. It cannot be inferred from tzHH alone:
    // "-0030" parses the hour field as 0, so the sign character is checked as well.
    if (tzHH < 0 || tzText[0] == '-') {
        tzMM = -tzMM;
    }
    // tzOffset is the number of seconds EAST of GMT
    int tzOffset = tzHH * 60 * 60 + tzMM * 60;

    // The goal is to convert the timestamp string to epoch seconds assuming the date/time was expressed in the
    // timezone at the end of the string. However, mktime() will ONLY return epoch seconds assuming the date/time
    // is expressed in local time (based on locale / environment).
    // mktime() will set out.tm_gmtoff when available
    int64_t ts = mktime(&out);

    // localTZOffset is the number of seconds EAST of GMT
    long localTZOffset;
#ifdef _WIN32
    // _get_timezone() returns the number of seconds WEST of GMT
    if (_get_timezone(&localTZOffset) != 0) {
        return -1;
    }
    // Negate offset to match the orientation of tzOffset
    localTZOffset = -localTZOffset;
#else
    // tm.tm_gmtoff is the number of seconds EAST of GMT
    localTZOffset = out.tm_gmtoff;
#endif

    // Add back the difference between the local timezone assumed by mktime() and the intended timezone from the input string
    ts += (localTZOffset - tzOffset);
    return ts;
}
// Definitions of BackupAgentBase's static Key members. Each one is a fixed string literal.
// NOTE(review): presumably these name keys/subspaces in the backup and DR configuration
// keyspace (keySourceStates and keyFolderId are used that way in _eraseLogData) -- confirm
// against the other users before renaming any literal, since the text is persisted data.
const Key BackupAgentBase::keyFolderId = LiteralStringRef("config_folderid");
const Key BackupAgentBase::keyBeginVersion = LiteralStringRef("beginVersion");
const Key BackupAgentBase::keyEndVersion = LiteralStringRef("endVersion");
const Key BackupAgentBase::keyPrevBeginVersion = LiteralStringRef("prevBeginVersion");
const Key BackupAgentBase::keyConfigBackupTag = LiteralStringRef("config_backup_tag");
const Key BackupAgentBase::keyConfigLogUid = LiteralStringRef("config_log_uid");
const Key BackupAgentBase::keyConfigBackupRanges = LiteralStringRef("config_backup_ranges");
const Key BackupAgentBase::keyConfigStopWhenDoneKey = LiteralStringRef("config_stop_when_done");
const Key BackupAgentBase::keyStateStop = LiteralStringRef("state_stop");
const Key BackupAgentBase::keyStateStatus = LiteralStringRef("state_status");
const Key BackupAgentBase::keyLastUid = LiteralStringRef("last_uid");
const Key BackupAgentBase::keyBeginKey = LiteralStringRef("beginKey");
const Key BackupAgentBase::keyEndKey = LiteralStringRef("endKey");
const Key BackupAgentBase::keyDrVersion = LiteralStringRef("drVersion");
const Key BackupAgentBase::destUid = LiteralStringRef("destUid");
const Key BackupAgentBase::backupStartVersion = LiteralStringRef("backupStartVersion");
const Key BackupAgentBase::keyTagName = LiteralStringRef("tagname");
const Key BackupAgentBase::keyStates = LiteralStringRef("state");
const Key BackupAgentBase::keyConfig = LiteralStringRef("config");
const Key BackupAgentBase::keyErrors = LiteralStringRef("errors");
const Key BackupAgentBase::keyRanges = LiteralStringRef("ranges");
const Key BackupAgentBase::keyTasks = LiteralStringRef("tasks");
const Key BackupAgentBase::keyFutures = LiteralStringRef("futures");
const Key BackupAgentBase::keySourceStates = LiteralStringRef("source_states");
const Key BackupAgentBase::keySourceTagName = LiteralStringRef("source_tagname");
// Copies params[key] from `source` into `dest`.
// Returns false without touching `dest` when `source` is unset, true otherwise.
bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key) {
    if (!source) {
        return false;
    }
    dest->params[key] = source->params[key];
    return true;
}
// Converts a decimal string to a Version.
// The whole string must be consumed by the conversion; otherwise a
// GetVersionFromString warning is traced and restore_invalid_version is thrown.
Version getVersionFromString(std::string const& value) {
    Version parsed(-1);
    int charsRead = 0;
    const int fieldsRead = sscanf(value.c_str(), "%lld%n", (long long*)&parsed, &charsRead);

    if (fieldsRead == 1 && charsRead == value.size()) {
        return parsed;
    }

    TraceEvent(SevWarnAlways, "GetVersionFromString").detail("InvalidVersion", value);
    throw restore_invalid_version();
}
// Transaction log data is stored by the FoundationDB core in the
// \xff / bklog / keyspace in a funny order for performance reasons.
// This returns the key ranges holding the log data for versions in
// [beginVersion, endVersion), one range per blockSize-sized block of versions.
// Each range key is: backupLogKeys.begin + destUidValue + 1-byte hash + big-endian version,
// where the hash is taken over the block's index in units of LOG_RANGE_BLOCK_SIZE.
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize) {
    Standalone<VectorRef<KeyRangeRef>> ranges;
    Key logPrefix = destUidValue.withPrefix(backupLogKeys.begin);
    //TraceEvent("GetLogRanges").detail("DestUidValue", destUidValue).detail("Prefix", baLogRangePrefix);

    const int64_t firstBlock = beginVersion / blockSize;
    const int64_t endBlock = (endVersion + blockSize - 1) / blockSize;

    for (int64_t block = firstBlock; block < endBlock; ++block) {
        const int64_t blockStart = block * blockSize;
        const int64_t blockStop = (block + 1) * blockSize;

        // Clamp the block to the requested version window and encode big-endian so keys sort by version.
        uint64_t encodedBegin = bigEndian64(std::max(beginVersion, blockStart));
        uint64_t encodedEnd = bigEndian64(std::min(endVersion, blockStop));

        // One hash byte per LOG_RANGE_BLOCK_SIZE bucket spreads the blocks over the keyspace.
        const int64_t bucket = blockStart / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
        uint32_t bucketLow32 = bucket & 0xffffffff;
        uint8_t hashByte = (uint8_t)hashlittle(&bucketLow32, sizeof(uint32_t), 0);

        Key blockPrefix = StringRef(&hashByte, sizeof(uint8_t)).withPrefix(logPrefix);
        Key rangeBegin = StringRef((uint8_t*)&encodedBegin, sizeof(uint64_t)).withPrefix(blockPrefix);
        Key rangeEnd = StringRef((uint8_t*)&encodedEnd, sizeof(uint64_t)).withPrefix(blockPrefix);
        ranges.push_back_deep(ranges.arena(), KeyRangeRef(rangeBegin, rangeEnd));
    }
    return ranges;
}
// Returns the key ranges under applyLogKeys that hold the mutations for versions in
// [beginVersion, endVersion), one range per APPLY_BLOCK_SIZE-sized block of versions.
// Each range key is: applyLogKeys.begin + backupUid + 1-byte hash + big-endian version.
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
    Standalone<VectorRef<KeyRangeRef>> ranges;
    Key applyPrefix = backupUid.withPrefix(applyLogKeys.begin);
    //TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix);

    const auto applyBlockSize = CLIENT_KNOBS->APPLY_BLOCK_SIZE;
    const int64_t firstBlock = beginVersion / applyBlockSize;
    const int64_t endBlock = (endVersion + applyBlockSize - 1) / applyBlockSize;

    for (int64_t block = firstBlock; block < endBlock; ++block) {
        const int64_t blockStart = block * applyBlockSize;
        const int64_t blockStop = (block + 1) * applyBlockSize;

        // Clamp the block to the requested version window; big-endian keeps keys ordered by version.
        uint64_t encodedBegin = bigEndian64(std::max(beginVersion, blockStart));
        uint64_t encodedEnd = bigEndian64(std::min(endVersion, blockStop));

        // The hash byte is derived from the block's LOG_RANGE_BLOCK_SIZE bucket.
        const int64_t bucket = blockStart / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
        uint32_t bucketLow32 = bucket & 0xffffffff;
        uint8_t hashByte = (uint8_t)hashlittle(&bucketLow32, sizeof(uint32_t), 0);

        Key blockPrefix = StringRef(&hashByte, sizeof(uint8_t)).withPrefix(applyPrefix);
        Key rangeBegin = StringRef((uint8_t*)&encodedBegin, sizeof(uint64_t)).withPrefix(blockPrefix);
        Key rangeEnd = StringRef((uint8_t*)&encodedEnd, sizeof(uint64_t)).withPrefix(blockPrefix);
        ranges.push_back_deep(ranges.arena(), KeyRangeRef(rangeBegin, rangeEnd));
    }
    return ranges;
}
// Builds the apply-log key for `version`:
//   applyLogKeys.begin + backupUid + 1-byte hash + big-endian version.
// The hash byte is computed from the LOG_RANGE_BLOCK_SIZE bucket of (version - 1).
Key getApplyKey( Version version, Key backupUid ) {
    const int64_t bucket = (version - 1) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
    uint32_t bucketLow32 = bucket & 0xffffffff;
    uint8_t hashByte = (uint8_t)hashlittle(&bucketLow32, sizeof(uint32_t), 0);

    uint64_t encodedVersion = bigEndian64(version);

    // Assemble from the innermost suffix outwards: hash+version, then uid, then the keyspace prefix.
    Key hashAndVersion = StringRef((uint8_t*)&encodedVersion, sizeof(uint64_t)).withPrefix(StringRef(&hashByte, sizeof(uint8_t)));
    Key withUid = hashAndVersion.withPrefix(backupUid);
    return withUid.withPrefix(applyLogKeys.begin);
}
// Given a key from one of the ranges returned by getLogRanges, returns (version, part),
// where version is the database version of the transaction log data in the value,
// and part is 0 for the first block of data for that version, 1 for the second, etc.
//
// Both fields are stored big-endian directly after the prefix, the UID and the hash byte.
// They are copied out with memcpy because that position has no alignment guarantee;
// dereferencing it through an int64_t*/int32_t* cast is an unaligned, aliasing-violating read.
// NOTE(review): the key length is not validated here -- callers must pass a full log key.
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key) {
    const uint8_t* versionPos = key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t);
    const uint8_t* partPos = versionPos + sizeof(int64_t);

    uint64_t rawVersion = 0;
    memcpy(&rawVersion, versionPos, sizeof(rawVersion));

    uint32_t rawPart = 0;
    memcpy(&rawPart, partPos, sizeof(rawPart));

    return std::pair<uint64_t, uint32_t>(bigEndian64(rawVersion), bigEndian32(rawPart));
}
// Decodes `value`, the concatenated transaction log data for one version, into the list of
// mutations it contains. Every mutation is returned as (type, param1, param2).
//
// Layout read by this function:
//   uint64 protocolVersion | uint32 totalBytes |
//   totalBytes bytes of records, each { uint32 type, uint32 len1, uint32 len2, param1[len1], param2[len2] }
//
// Throws:
//   incompatible_protocol_version  if protocolVersion <= 0x0FDB00A200090001
//   restore_missing_data           if value is shorter than its header or than totalBytes requires
//   restore_corrupted_data         if a record runs past the end of value, or bytes remain after the records
// Any error is traced as BA_DecodeBackupLogValue before being rethrown.
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value) {
    try {
        // The fixed header must be present before any of it is copied out.
        const uint64_t headerBytes = sizeof(uint64_t) + sizeof(uint32_t);
        if (value.size() < headerBytes)
            throw restore_missing_data();

        uint64_t offset(0);
        uint64_t protocolVersion = 0;
        memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
        offset += sizeof(uint64_t);
        if (protocolVersion <= 0x0FDB00A200090001){
            TraceEvent(SevError, "DecodeBackupLogValue").detail("IncompatibleProtocolVersion", protocolVersion)
                .detail("ValueSize", value.size()).detail("Value", value);
            throw incompatible_protocol_version();
        }

        Standalone<VectorRef<MutationRef>> result;
        uint32_t totalBytes = 0;
        memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
        offset += sizeof(uint32_t);
        uint32_t consumed = 0;

        if(totalBytes + offset > value.size())
            throw restore_missing_data();

        int originalOffset = offset;

        while (consumed < totalBytes){
            // A record header is three uint32 fields; make sure it lies inside value.
            if (offset + 3 * sizeof(uint32_t) > value.size())
                throw restore_corrupted_data();

            uint32_t type = 0;
            memcpy(&type, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len1 = 0;
            memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len2 = 0;
            memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);

            // Both parameters must lie inside value (offset is 64-bit, so the sum cannot wrap).
            if (offset + len1 + len2 > value.size())
                throw restore_corrupted_data();

            MutationRef logValue;
            logValue.type = type;
            logValue.param1 = value.substr(offset, len1);
            offset += len1;
            logValue.param2 = value.substr(offset, len2);
            offset += len2;
            result.push_back_deep(result.arena(), logValue);

            consumed += BackupAgentBase::logHeaderSize + len1 + len2;
        }

        ASSERT(consumed == totalBytes);
        if (value.size() != offset) {
            TraceEvent(SevError, "BA_DecodeBackupLogValue").detail("UnexpectedExtraDataSize", value.size()).detail("Offset", offset).detail("TotalBytes", totalBytes).detail("Consumed", consumed).detail("OriginalOffset", originalOffset);
            throw restore_corrupted_data();
        }

        return result;
    }
    catch (Error& e) {
        TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_DecodeBackupLogValue").error(e).GetLastError().detail("ValueSize", value.size()).detail("Value", value);
        throw;
    }
}
// Decodes the backup log `value` for `version` and appends the mutations that should be
// applied to `result` (allocated in `arena`), adding their expectedSize() to `mutationSize`.
//
// A mutation is kept only where key_version holds a version that is not invalidVersion and
// is lower than `version`. Kept keys have removePrefix stripped and addPrefix prepended.
// ClearRange mutations are split along the intersecting key_version ranges, so one input
// clear may produce several output clears.
//
// Throws incompatible_protocol_version, restore_missing_data or restore_corrupted_data on
// malformed input; every error is traced as BA_DecodeBackupLogValue and rethrown.
void decodeBackupLogValue(Arena& arena, VectorRef<MutationRef>& result, int& mutationSize, StringRef value, StringRef addPrefix, StringRef removePrefix, Version version, Reference<KeyRangeMap<Version>> key_version) {
try {
// Header: uint64 protocol version followed by uint32 total byte count of the records.
uint64_t offset(0);
uint64_t protocolVersion = 0;
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
if (protocolVersion <= 0x0FDB00A200090001){
TraceEvent(SevError, "DecodeBackupLogValue").detail("IncompatibleProtocolVersion", protocolVersion)
.detail("ValueSize", value.size()).detail("Value", value);
throw incompatible_protocol_version();
}
uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t consumed = 0;
// The value is shorter than the header claims.
if(totalBytes + offset > value.size())
throw restore_missing_data();
int originalOffset = offset;
while (consumed < totalBytes){
// Record header: type, length of param1, length of param2.
uint32_t type = 0;
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
ASSERT(offset+len1+len2<=value.size() && isValidMutationType(type));
MutationRef logValue;
// Scratch arena for the prefixed keys; push_back_deep copies them into `arena`.
Arena tempArena;
logValue.type = type;
logValue.param1 = value.substr(offset, len1);
offset += len1;
logValue.param2 = value.substr(offset, len2);
offset += len2;
if (logValue.type == MutationRef::ClearRange) {
KeyRangeRef range(logValue.param1, logValue.param2);
auto ranges = key_version->intersectingRanges(range);
for (auto r : ranges) {
// Only emit the part of the clear that overlaps a range whose recorded version is older than this mutation.
if (version > r.value() && r.value() != invalidVersion) {
KeyRef minKey = std::min(r.range().end, range.end);
// The piece ends exactly at the end of the removePrefix keyspace (or of normalKeys when there is
// no removePrefix): map its end to the end of the addPrefix keyspace instead of rewriting the key.
if (minKey == (removePrefix == StringRef() ? normalKeys.end : strinc(removePrefix))) {
logValue.param1 = std::max(r.range().begin, range.begin);
if(removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
}
if(addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
logValue.param2 = addPrefix == StringRef() ? normalKeys.end : strinc(addPrefix, tempArena);
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
}
else {
// Ordinary piece: rewrite the prefix on both endpoints.
logValue.param1 = std::max(r.range().begin, range.begin);
logValue.param2 = minKey;
if(removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
logValue.param2 = logValue.param2.removePrefix(removePrefix);
}
if(addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
}
}
}
}
else {
// Single-key mutation: only param1 is a key, so only param1 has its prefix rewritten.
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("LogValue", logValue.toString()).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
if (version > ver && ver != invalidVersion) {
if(removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
}
if(addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
}
}
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
}
ASSERT(consumed == totalBytes);
// Bytes left over after the declared records mean the value is corrupt.
if (value.size() != offset) {
TraceEvent(SevError, "BA_DecodeBackupLogValue").detail("UnexpectedExtraDataSize", value.size()).detail("Offset", offset).detail("TotalBytes", totalBytes).detail("Consumed", consumed).detail("OriginalOffset", originalOffset);
throw restore_corrupted_data();
}
}
catch (Error& e) {
// Missing data is only a warning; every other failure is logged as an error.
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_DecodeBackupLogValue").error(e).GetLastError().detail("ValueSize", value.size()).detail("Value", value);
throw;
}
}
// Time (as reported by now()) at which BA_LogError was last traced; used to rate-limit the trace.
static double lastErrorTime = 0;

// Writes `message` to `keyErrors` in the given transaction, with system-key access and
// lock awareness enabled. A BA_LogError trace event is emitted as well, but only when more
// than BACKUP_ERROR_DELAY has passed since the previous one.
void logErrorWorker(Reference<ReadYourWritesTransaction> tr, Key keyErrors, std::string message) {
    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    tr->setOption(FDBTransactionOptions::LOCK_AWARE);

    const bool traceAllowed = (now() - lastErrorTime > CLIENT_KNOBS->BACKUP_ERROR_DELAY);
    if (traceAllowed) {
        TraceEvent("BA_LogError").detail("Key", keyErrors).detail("Message", message);
        lastErrorTime = now();
    }

    tr->set(keyErrors, message);
}
// Records `message` under `keyErrors` by running logErrorWorker inside runRYWTransaction on `cx`.
Future<Void> logError(Database cx, Key keyErrors, const std::string& message) {
    // Capture by value: the lambda is handed to runRYWTransaction, so it must own its copies.
    auto writeError = [=](Reference<ReadYourWritesTransaction> tr) {
        logErrorWorker(tr, keyErrors, message);
        return Future<Void>(Void());
    };
    return runRYWTransaction(cx, writeError);
}
// Convenience overload: logs the error through a separate transaction on the database that
// `tr` belongs to, rather than through `tr` itself.
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message) {
    Database cx = tr->getDatabase();
    return logError(cx, keyErrors, message);
}
// Reads every key-value pair in `range` and sends it to `results` in batches, each batch
// paired with the read version of the transaction that produced it.
//
//   lock         - before each read, limits.bytes + VALUE_SIZE_LIMIT + SYSTEM_KEY_SIZE_LIMIT
//                  is taken from this lock; the bytes of the batch actually sent are removed
//                  from the releaser, so the receiver must release them.
//   terminator   - when true, end_of_stream is sent on `results` after the last batch.
//   systemAccess - sets ACCESS_SYSTEM_KEYS on the transaction.
//   lockAware    - sets LOCK_AWARE on the transaction.
//
// transaction_too_old resets the transaction and continues from the current position;
// any other error goes through tr.onError().
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersion> results, Reference<FlowLock> lock,
KeyRangeRef range, bool terminator, bool systemAccess, bool lockAware) {
state KeySelector begin = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state Transaction tr(cx);
state FlowLock::Releaser releaser;
loop{
try {
// No row limit; the byte limit is a smaller knob in simulation unless simulation is being sped up.
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
if (systemAccess)
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
//add lock
// Give back whatever is left from the previous iteration, then reserve room for the next read.
releaser.release();
wait(lock->take(TaskPriority::DefaultYield, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT));
releaser = FlowLock::Releaser(*lock, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
state Standalone<RangeResultRef> values = wait(tr.getRange(begin, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(values.size() > 1 && BUGGIFY) {
values.resize(values.arena(), values.size() / 2);
values.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if(deterministicRandom()->random01() < 0.5)
wait(delay(6.0));
}
releaser.remaining -= values.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(RangeResultWithVersion(values, tr.getReadVersion().get()));
// Continue after the last key that was delivered.
if (values.size() > 0)
begin = firstGreaterThan(values.end()[-1].key);
// Finished when the read was neither truncated nor stopped by the limits.
if (!values.more && !limits.isReached()) {
if(terminator)
results.sendError(end_of_stream());
return Void();
}
}
catch (Error &e) {
if (e.code() == error_code_transaction_too_old) {
// We are using this transaction until it's too old and then resetting to a fresh one,
// so we don't need to delay.
tr.fullReset();
}
else {
wait(tr.onError(e));
}
}
}
}
// Reads every key-value pair in `range` and sends it to `results` grouped by
// groupBy(key).first: consecutive items with the same group key form one RCGroup,
// stamped with the transaction's read version.
//
//   active       - waited on after each read, before the lock is taken.
//   lock         - the size of the batch plus the pending group is taken before processing;
//                  the bytes of each group that is sent are removed from the releaser, so the
//                  receiver must release them.
//   terminator   - when true, end_of_stream is sent on `results` after the last group.
//   systemAccess - sets ACCESS_SYSTEM_KEYS on the transaction.
//   lockAware    - sets LOCK_AWARE on the transaction.
//
// transaction_too_old resets the transaction and continues from nextKey;
// any other error goes through tr.onError().
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Future<Void> active, Reference<FlowLock> lock,
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy,
bool terminator, bool systemAccess, bool lockAware)
{
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
// Group currently being accumulated; version == -1 is used below to mean "no group started yet".
state RCGroup rcGroup = RCGroup();
// Group key of the most recently sent group; items carrying it are ignored.
state uint64_t skipGroup(ULLONG_MAX);
state Transaction tr(cx);
state FlowLock::Releaser releaser;
loop{
try {
// No row limit; the byte limit is a smaller knob in simulation unless simulation is being sped up.
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
if (systemAccess)
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state Standalone<RangeResultRef> rangevalue = wait(tr.getRange(nextKey, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(rangevalue.size() > 1 && BUGGIFY) {
rangevalue.resize(rangevalue.arena(), rangevalue.size() / 2);
rangevalue.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if(deterministicRandom()->random01() < 0.5)
wait(delay(6.0));
}
//add lock
wait(active);
// Give back whatever is left from the previous iteration, then reserve the new batch plus the pending group.
releaser.release();
wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());
for (auto & s : rangevalue){
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
if (groupKey != skipGroup){
if (rcGroup.version == -1){
// First item of the first group.
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
else if (rcGroup.groupKey != groupKey) {
// The group key changed: the accumulated group is complete, so send it and start a new one.
//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
//state uint32_t len(0);
//for (size_t j = 0; j < rcGroup.items.size(); ++j) {
// len += rcGroup.items[j].value.size();
//}
//TraceEvent("SendGroup").detail("GroupKey", rcGroup.groupKey).detail("Version", rcGroup.version).detail("Length", len).detail("Releaser.remaining", releaser.remaining);
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(rcGroup);
// Resume point if the read has to be retried: just past the group that was sent.
nextKey = firstGreaterThan(rcGroup.items.end()[-1].key);
skipGroup = rcGroup.groupKey;
rcGroup = RCGroup();
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
if (!rangevalue.more) {
// End of range: flush the last pending group, if any.
if (rcGroup.version != -1){
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
//TraceEvent("Log_ReadCommitted").detail("SendGroup1", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength", rcGroup.items[0].value.size());
results.send(rcGroup);
}
if(terminator)
results.sendError(end_of_stream());
return Void();
}
// More data remains: continue after the last key of this batch.
nextKey = firstGreaterThan(rangevalue.end()[-1].key);
}
catch (Error &e) {
if (e.code() == error_code_transaction_too_old) {
// We are using this transaction until it's too old and then resetting to a fresh one,
// so we don't need to delay.
tr.fullReset();
}
else {
wait(tr.onError(e));
}
}
}
}
// Convenience overload of the grouped readCommitted: never gated (an already-ready `active`
// future), always terminates the stream with end_of_stream, and reads with system-key
// access and lock awareness enabled.
Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy) {
    const bool terminator = true;
    const bool systemAccess = true;
    const bool lockAware = true;
    return readCommitted(cx, results, Void(), lock, range, groupBy, terminator, systemAccess, lockAware);
}
// Consumes RCGroups from `results`, decodes them into mutations and submits them through
// `commit` in batches. Returns the total mutation bytes submitted.
//
// Per batch:
//   1. Groups are read until mutationSize reaches BACKUP_LOG_WRITE_BATCH_MAX_SIZE or the
//      stream ends; each group's bytes are released back to `lock` as soon as it is received.
//   2. The batch is extended with a SetValue of the apply-begin key to newBeginVersion and a
//      ClearRange of [rangeBegin, getApplyKey(newBeginVersion, uid)).
//   3. The request is sent once `commitLock` grants mutationSize bytes; those bytes are
//      released when the commit reply arrives (tracked through `addActor`).
//
// `endVersion`, when present, lets the begin version advance to it at end of stream even
// if no later group was seen.
ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, Key uid, Key addPrefix, Key removePrefix, RequestStream<CommitTransactionRequest> commit,
NotifiedVersion* committedVersion, Optional<Version> endVersion, Key rangeBegin, PromiseStream<Future<Void>> addActor, FlowLock* commitLock, Reference<KeyRangeMap<Version>> keyVersion ) {
// NOTE(review): lastVersion is never assigned after this point in the visible code.
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
state int mutationSize = 0;
loop {
try {
RCGroup group = waitNext(results.getFuture());
lock->release(group.items.expectedSize());
// A version's log value may be split over several items; concatenate them before decoding.
BinaryWriter bw(Unversioned());
for(int i = 0; i < group.items.size(); ++i) {
bw.serializeBytes(group.items[i].value);
}
// group.groupKey is passed as the version of the decoded mutations.
decodeBackupLogValue(req.arena, req.transaction.mutations, mutationSize, bw.toValue(), addPrefix, removePrefix, group.groupKey, keyVersion);
newBeginVersion = group.groupKey + 1;
if(mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
break;
}
}
catch (Error &e) {
if (e.code() == error_code_end_of_stream) {
if(endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) {
newBeginVersion = endVersion.get();
}
// Nothing was read and there is no end version to advance to: nothing to commit.
if(newBeginVersion == invalidVersion)
return totalBytes;
endOfStream = true;
break;
}
throw;
}
}
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
Key rangeEnd = getApplyKey(newBeginVersion, uid);
// Record the new begin version and clear the log entries this batch covers, in the same commit.
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));
// The commit request contains no read conflict ranges, so regardless of what read version we
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
// for our transaction to be aborted due to conflicts.
req.transaction.read_snapshot = committedVersion->get();
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
totalBytes += mutationSize;
wait( commitLock->take(TaskPriority::DefaultYield, mutationSize) );
addActor.send( commitLock->releaseWhen( success(commit.getReply(req)), mutationSize ) );
if(endOfStream) {
return totalBytes;
}
}
}
// Builds and submits one commit that removes redundant boundaries from the persisted
// key-version map for `uid`.
//
// Walking keyVersion->ranges() in order, a range's begin key is cleared (under
// applyMutationsKeyVersionMapRange) when both its version and the last retained version are
// below `endVersion` and neither is invalidVersion. The number of removed entries is then
// subtracted from the counter under applyMutationsKeyVersionCountRange via AddValue.
// Nothing is committed when no entry qualifies.
ACTOR Future<Void> coalesceKeyVersionCache(Key uid, Version endVersion, Reference<KeyRangeMap<Version>> keyVersion, RequestStream<CommitTransactionRequest> commit, NotifiedVersion* committedVersion, PromiseStream<Future<Void>> addActor, FlowLock* commitLock) {
// -1000 is a sentinel meaning "no range visited yet".
Version lastVersion = -1000;
// Counts removed entries as a negative number so it can be applied directly with AddValue.
int64_t removed = 0;
state CommitTransactionRequest req;
state int64_t mutationSize = 0;
Key mapPrefix = uid.withPrefix(applyMutationsKeyVersionMapRange.begin);
for(auto it : keyVersion->ranges()) {
if( lastVersion == -1000 ) {
lastVersion = it.value();
} else {
Version ver = it.value();
if(ver < endVersion && lastVersion < endVersion && ver != invalidVersion && lastVersion != invalidVersion) {
// Clear exactly the one key that marks this range's begin boundary.
Key removeKey = it.range().begin.withPrefix(mapPrefix);
Key removeEnd = keyAfter(removeKey);
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, removeKey, removeEnd));
mutationSize += removeKey.size() + removeEnd.size();
removed--;
} else {
lastVersion = ver;
}
}
}
if(removed != 0) {
Key countKey = uid.withPrefix(applyMutationsKeyVersionCountRange.begin);
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(countKey));
// The 8 raw bytes of `removed` are deep-copied into req.arena here, before any wait.
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::AddValue, countKey, StringRef((uint8_t*)&removed, 8)));
req.transaction.read_snapshot = committedVersion->get();
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
wait( commitLock->take(TaskPriority::DefaultYield, mutationSize) );
addActor.send( commitLock->releaseWhen( success(commit.getReply(req)), mutationSize ) );
}
return Void();
}
// Applies the logged mutations for `uid` from `beginVersion` up to `*endVersion`, submitting
// them through `commit`.
//
// The version span is processed in chunks of whole APPLY_BLOCK_SIZE blocks. For each chunk:
//   - one grouped readCommitted is started per range returned by getApplyRanges, each with
//     its own FlowLock;
//   - dumpData drains the ranges one after another, in order;
//   - coalesceKeyVersionCache compacts the key-version map up to the chunk's end.
// The number of blocks per chunk adapts to how many bytes recent chunks produced (maxBytes).
//
// `*endVersion` is re-read on every iteration, so the caller may move it while this runs.
// Returns once beginVersion has reached *endVersion and the full commitLock could be taken.
// Errors are traced as ApplyMutationsError and rethrown.
ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key removePrefix, Version beginVersion, Version* endVersion, RequestStream<CommitTransactionRequest> commit, NotifiedVersion* committedVersion, Reference<KeyRangeMap<Version>> keyVersion ) {
state FlowLock commitLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
state PromiseStream<Future<Void>> addActor;
// Collects the commit futures sent by dumpData/coalesceKeyVersionCache; checked for errors below.
state Future<Void> error = actorCollection( addActor.getFuture() );
state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES;
keyVersion->insert(metadataVersionKey, 0);
try {
loop {
if(beginVersion >= *endVersion) {
// Take the lock's entire capacity and give it straight back, then re-check the end version.
// NOTE(review): presumably this waits for all outstanding commits to release their bytes -- confirm FlowLock semantics.
wait( commitLock.take(TaskPriority::DefaultYield, CLIENT_KNOBS->BACKUP_LOCK_BYTES) );
commitLock.release(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
if(beginVersion >= *endVersion) {
return Void();
}
}
// The more bytes recent chunks produced, the fewer blocks are read in parallel.
int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
state Version newEndVersion = std::min(*endVersion, ((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) * CLIENT_KNOBS->APPLY_BLOCK_SIZE);
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;
// Start one reader per range; each gets an equal share of APPLY_MAX_LOCK_BYTES, but at least APPLY_MIN_LOCK_BYTES.
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(Reference<FlowLock>( new FlowLock(std::max(CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES/ranges.size(), CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES))));
rc.push_back(readCommitted(cx, results[i], locks[i], ranges[i], decodeBKMutationLogKey));
}
// Decay the estimate first; the dumpData results below can only raise it again.
maxBytes = std::max<int>(maxBytes*CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);
for (idx = 0; idx < ranges.size(); ++idx) {
// Only the last range is given the chunk's end version.
int bytes = wait(dumpData(cx, results[idx], locks[idx], uid, addPrefix, removePrefix, commit, committedVersion, idx==ranges.size()-1 ? newEndVersion : Optional<Version>(), ranges[idx].begin, addActor, &commitLock, keyVersion));
maxBytes = std::max<int>(CLIENT_KNOBS->APPLY_MAX_INCREASE_FACTOR*bytes, maxBytes);
// Surface any failure from the commits collected so far.
if(error.isError()) throw error.getError();
}
wait(coalesceKeyVersionCache(uid, newEndVersion, keyVersion, commit, committedVersion, addActor, &commitLock));
beginVersion = newEndVersion;
}
} catch( Error &e ) {
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarnAlways : SevError, "ApplyMutationsError").error(e);
throw;
}
}
// Within transaction `tr`, removes mutation-log data that the backup/DR identified by
// `logUidValue` no longer needs from the log stream identified by `destUidValue`.
//
//   endVersion     - when present, the backup's recorded latest version is moved up to it and
//                    log data before it may be cleared; when absent, the backup's version
//                    history entry is removed altogether.
//   checkBackupUid - when true, nothing is done if the folder id stored for logUidValue is
//                    greater than `backupUid`.
//
// Log ranges are cleared only up to the smallest version recorded by any other backup/DR
// sharing the same destUid, and not at all if another one is at or behind this backup's
// begin version. Nothing is committed here; the caller owns `tr`.
ACTOR static Future<Void> _eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Key backupLatestVersionsKey = logUidValue.withPrefix(backupLatestVersionsPath);
// Without a destUid there is no log stream to clean.
if (!destUidValue.size()) {
return Void();
}
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (checkBackupUid) {
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
// A larger stored folder id than backupUid means this request is stale; leave everything alone.
// NOTE(review): presumably a newer DR has since been started under the same logUid -- confirm.
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
// Make sure version history key does exist and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
break;
}
}
// Do not clear anything if version history key cannot be found
if (currBeginVersion == invalidVersion) {
return Void();
}
state Version currEndVersion = std::numeric_limits<Version>::max();
if(endVersion.present()) {
currEndVersion = std::min(currEndVersion, endVersion.get());
}
// Upper bound of what may be cleared; lowered below to the earliest version another backup still needs.
state Version nextSmallestVersion = currEndVersion;
bool clearLogRangesRequired = true;
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > currBeginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
}
}
}
// Partial cleanup: taken when trimming to endVersion or when other backups share this destUid.
if (endVersion.present() || backupVersions.size() != 1 || BUGGIFY) {
if (!endVersion.present()) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
if(backupVersions.size() == 1) {
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(currEndVersion, Unversioned()));
}
// Clear log ranges if needed
if (clearLogRangesRequired) {
// When the span covers at least 255 LOG_RANGE_BLOCK_SIZE blocks, issue one clear per possible
// hash byte (256 clears, each from version 0 to nextSmallestVersion) instead of one per block.
if((nextSmallestVersion - currBeginVersion) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE >= std::numeric_limits<uint8_t>::max() || BUGGIFY) {
Key baLogRangePrefix = destUidValue.withPrefix(backupLogKeys.begin);
for(int h = 0; h <= std::numeric_limits<uint8_t>::max(); h++) {
uint64_t bv = bigEndian64(Version(0));
uint64_t ev = bigEndian64(nextSmallestVersion);
uint8_t h1 = h;
Key vblockPrefix = StringRef(&h1, sizeof(uint8_t)).withPrefix(baLogRangePrefix);
tr->clear(KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
}
} else {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
} else {
// Full cleanup: this is the only backup on the destUid and no end version was given.
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
return Void();
}
// Non-actor entry point for erasing mutation-log data: every argument is forwarded unchanged to the
// _eraseLogData actor and the future it produces is handed back to the caller.
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
	Future<Void> eraseResult = _eraseLogData(tr, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
	return eraseResult;
}
// Reports every backup/DR tag that has a version-history entry under destUidValue and, when deleteData
// is set, erases the log data of the tag that is furthest behind.
//
// cx           - database to inspect.
// destUidValue - destination uid; its version-history keys are read from under backupLatestVersionsPrefix.
// deleteData   - false: only print what would happen. true: erase the oldest tag, provided it is more than
//                MIN_CLEANUP_SECONDS behind the transaction's read version.
//
// All findings and warnings are printed to stdout. Errors are retried through tr->onError().
ACTOR Future<Void> cleanupLogMutations(Database cx, Value destUidValue, bool deleteData) {
	state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
	// Log uid of the tag this call has already tried to erase. It is declared outside the retry loop so
	// that a retry (e.g. after a commit with an unknown result) never goes on to erase a different tag.
	state Optional<Key> removingLogUid;

	loop {
		try {
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

			state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
			// The range read above has completed, so the read version future is already set.
			state Version readVer = tr->getReadVersion().get();

			if (backupVersions.size() == 0) {
				// No tags at all: there is nothing to report or erase. Without this guard minVersion would
				// stay at its sentinel and the summary below would print a meaningless negative lag.
				if (removingLogUid.present()) {
					// We are retrying an erase and the tag is gone, so the earlier commit most likely succeeded.
					printf("\nWARNING: The oldest tag was possibly removed, run again without `--delete_data' to check.\n");
				}
				return Void();
			}

			state Version minVersion = std::numeric_limits<Version>::max();
			state Key minVersionLogUid;

			state int backupIdx = 0;
			for (; backupIdx < backupVersions.size(); backupIdx++) {
				state Version currVersion = BinaryReader::fromStringRef<Version>(backupVersions[backupIdx].value, Unversioned());
				state Key currLogUid = backupVersions[backupIdx].key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
				// Track the tag with the smallest version, i.e. the one furthest behind.
				if( currVersion < minVersion ) {
					minVersionLogUid = currLogUid;
					minVersion = currVersion;
				}

				// Classify the tag by checking for a DR state key and a file backup config key.
				state Future<Optional<Value>> foundDRKey = tr->get(Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(currLogUid).pack(DatabaseBackupAgent::keyStateStatus));
				state Future<Optional<Value>> foundBackupKey = tr->get(Subspace(currLogUid.withPrefix(LiteralStringRef("uid->config/")).withPrefix(fileBackupPrefixRange.begin)).pack(LiteralStringRef("stateEnum")));
				wait(success(foundDRKey) && success(foundBackupKey));

				if(foundDRKey.get().present() && foundBackupKey.get().present()) {
					printf("WARNING: Found a tag which looks like both a backup and a DR. This tag was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
				} else if(foundDRKey.get().present() && !foundBackupKey.get().present()) {
					printf("Found a DR which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
				} else if(!foundDRKey.get().present() && foundBackupKey.get().present()) {
					printf("Found a Backup which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
				} else {
					printf("WARNING: Found a unknown tag which was %.4f hours behind.\n", (readVer - currVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
				}
			}

			// True when the oldest tag lags the read version by more than the cleanup threshold.
			state bool tagIsStale = readVer - minVersion > CLIENT_KNOBS->MIN_CLEANUP_SECONDS*CLIENT_KNOBS->CORE_VERSIONSPERSECOND;

			if( deleteData ) {
				if( removingLogUid.present() && minVersionLogUid != removingLogUid.get() ) {
					// A previous attempt targeted another tag which is no longer the oldest; do not erase a second tag.
					printf("\nWARNING: The oldest tag was possibly removed, run again without `--delete_data' to check.\n");
				} else if( tagIsStale ) {
					removingLogUid = minVersionLogUid;
					wait(eraseLogData(tr, minVersionLogUid, destUidValue));
					wait(tr->commit());
					printf("\nSuccessfully removed the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
				} else {
					printf("\nWARNING: Did not delete data because the tag was not at least %.4f hours behind. Change MIN_CLEANUP_SECONDS to adjust this threshold.\n", CLIENT_KNOBS->MIN_CLEANUP_SECONDS/3600.0);
				}
			} else if( tagIsStale ) {
				printf("\nPassing `--delete_data' would delete the tag which was %.4f hours behind.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
			} else {
				// Dry run, but the oldest tag is within the threshold: deletion would be refused, so say so.
				printf("\nPassing `--delete_data' would not delete the tag which was %.4f hours behind. Change MIN_CLEANUP_SECONDS to adjust this threshold.\n", (readVer - minVersion)/(3600.0*CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
			}

			return Void();
		} catch( Error& e) {
			wait(tr->onError(e));
		}
	}
}
// Reads every value stored under destUidLookupPrefix and runs cleanupLogMutations on each of them,
// one after another. deleteData is passed through unchanged. Errors are retried through tr->onError().
ACTOR Future<Void> cleanupBackup(Database cx, bool deleteData) {
	state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));

	loop {
		try {
			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

			state Standalone<RangeResultRef> destUids = wait(tr->getRange(
				KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)),
				CLIENT_KNOBS->TOO_MANY));

			// Handle the destination uids sequentially; each call finishes before the next begins.
			state int destUidIdx = 0;
			for (; destUidIdx < destUids.size(); destUidIdx++) {
				wait(cleanupLogMutations(cx, destUids[destUidIdx].value, deleteData));
			}

			return Void();
		} catch( Error& e) {
			wait(tr->onError(e));
		}
	}
}