Merge pull request #47 from apple/release-5.1

Merge Release 5.1 into Release 5.2
This commit is contained in:
A.J. Beamon 2018-03-08 13:18:45 -08:00 committed by GitHub
commit 2c92ef8ff8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 113 deletions

View File

@ -421,7 +421,7 @@ Future<Void> logError(Database cx, Key keyErrors, const std::string& message);
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message);
Future<Void> checkVersion(Reference<ReadYourWritesTransaction> const& tr);
Future<Void> readCommitted(Database const& cx, PromiseStream<RangeResultWithVersion> const& results, Reference<FlowLock> const& lock, KeyRangeRef const& range, bool const& terminator = true, bool const& systemAccess = false, bool const& lockAware = false);
Future<Void> readCommitted(Database const& cx, PromiseStream<RCGroup> const& results, Future<Void> const& active, Reference<FlowLock> const& lock, KeyRangeRef const& range, std::function< std::pair<uint64_t, uint32_t>(Key key) > const& groupBy, bool const& terminator = true, bool const& systemAccess = false, bool const& lockAware = false, std::function< Future<Void>(Reference<ReadYourWritesTransaction> tr) > const& withEachFunction = nullptr);
Future<Void> readCommitted(Database const& cx, PromiseStream<RCGroup> const& results, Future<Void> const& active, Reference<FlowLock> const& lock, KeyRangeRef const& range, std::function< std::pair<uint64_t, uint32_t>(Key key) > const& groupBy, bool const& terminator = true, bool const& systemAccess = false, bool const& lockAware = false);
Future<Void> applyMutations(Database const& cx, Key const& uid, Key const& addPrefix, Key const& removePrefix, Version const& beginVersion, Version* const& endVersion, RequestStream<CommitTransactionRequest> const& commit, NotifiedVersion* const& committedVersion, Reference<KeyRangeMap<Version>> const& keyVersion);
typedef BackupAgentBase::enumState EBackupState;

View File

@ -328,7 +328,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
KeyRangeRef range, bool terminator, bool systemAccess, bool lockAware) {
state KeySelector begin = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Transaction tr(cx);
state FlowLock::Releaser releaser;
loop{
@ -336,16 +336,16 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
if (systemAccess)
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
//add lock
releaser.release();
Void _ = wait(lock->take(TaskDefaultYield, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT));
releaser = FlowLock::Releaser(*lock, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
state Standalone<RangeResultRef> values = wait(tr->getRange(begin, end, limits));
state Standalone<RangeResultRef> values = wait(tr.getRange(begin, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(values.size() > 1 && BUGGIFY) {
@ -359,7 +359,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
releaser.remaining -= values.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(RangeResultWithVersion(values, tr->getReadVersion().get()));
results.send(RangeResultWithVersion(values, tr.getReadVersion().get()));
if (values.size() > 0)
begin = firstGreaterThan(values.end()[-1].key);
@ -373,21 +373,21 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersi
catch (Error &e) {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
throw;
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
tr = Transaction(cx);
}
}
}
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Future<Void> active, Reference<FlowLock> lock,
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy,
bool terminator, bool systemAccess, bool lockAware, std::function< Future<Void>(Reference<ReadYourWritesTransaction> tr) > withEachFunction)
bool terminator, bool systemAccess, bool lockAware)
{
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state RCGroup rcGroup = RCGroup();
state uint64_t skipGroup(ULLONG_MAX);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Transaction tr(cx);
state FlowLock::Releaser releaser;
loop{
@ -395,14 +395,11 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
if (systemAccess)
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Void> withEach = Void();
if (withEachFunction) withEach = withEachFunction(tr);
state Standalone<RangeResultRef> rangevalue = wait(tr->getRange(nextKey, end, limits));
state Standalone<RangeResultRef> rangevalue = wait(tr.getRange(nextKey, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(rangevalue.size() > 1 && BUGGIFY) {
@ -413,8 +410,6 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
Void _ = wait(delay(6.0));
}
Void _ = wait(withEach);
//add lock
Void _ = wait(active);
releaser.release();
@ -427,7 +422,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
//TraceEvent("log_readCommitted").detail("groupKey", groupKey).detail("skipGroup", skipGroup).detail("nextKey", printable(nextKey.key)).detail("end", printable(end.key)).detail("valuesize", value.size()).detail("index",index++).detail("size",s.value.size());
if (groupKey != skipGroup){
if (rcGroup.version == -1){
rcGroup.version = tr->getReadVersion().get();
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
else if (rcGroup.groupKey != groupKey) {
@ -444,7 +439,7 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
skipGroup = rcGroup.groupKey;
rcGroup = RCGroup();
rcGroup.version = tr->getReadVersion().get();
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
@ -469,94 +464,13 @@ ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Fu
catch (Error &e) {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
throw;
Void _ = wait(tr->onError(e));
Void _ = wait(tr.onError(e));
}
}
}
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock,
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy)
{
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state RCGroup rcGroup = RCGroup();
state uint64_t skipGroup(ULLONG_MAX);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state FlowLock::Releaser releaser;
loop{
try {
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Standalone<RangeResultRef> rangevalue = wait(tr->getRange(nextKey, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(rangevalue.size() > 1 && BUGGIFY) {
rangevalue.resize(rangevalue.arena(), rangevalue.size() / 2);
rangevalue.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if(g_random->random01() < 0.5)
Void _ = wait(delay(6.0));
}
releaser.release();
Void _ = wait(lock->take(TaskDefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());
int index(0);
for (auto & s : rangevalue){
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("log_readCommitted").detail("groupKey", groupKey).detail("skipGroup", skipGroup).detail("nextKey", printable(nextKey.key)).detail("end", printable(end.key)).detail("valuesize", value.size()).detail("index",index++).detail("size",s.value.size());
if (groupKey != skipGroup){
if (rcGroup.version == -1){
rcGroup.version = tr->getReadVersion().get();
rcGroup.groupKey = groupKey;
}
else if (rcGroup.groupKey != groupKey) {
//TraceEvent("log_readCommitted").detail("sendGroup0", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length",rcGroup.items[0].value.size());
state uint32_t len(0);
for (size_t j = 0; j < rcGroup.items.size(); ++j) {
len += rcGroup.items[j].value.size();
}
//TraceEvent("SendGroup").detail("groupKey", rcGroup.groupKey).detail("version", rcGroup.version).detail("length", len).detail("releaser.remaining", releaser.remaining);
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(rcGroup);
nextKey = firstGreaterThan(rcGroup.items.end()[-1].key);
skipGroup = rcGroup.groupKey;
rcGroup = RCGroup();
rcGroup.version = tr->getReadVersion().get();
rcGroup.groupKey = groupKey;
}
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
if (!rangevalue.more) {
if (rcGroup.version != -1){
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
//TraceEvent("log_readCommitted").detail("sendGroup1", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length", rcGroup.items[0].value.size());
results.send(rcGroup);
}
results.sendError(end_of_stream());
return Void();
}
nextKey = firstGreaterThan(rangevalue.end()[-1].key);
}
catch (Error &e) {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
throw;
Void _ = wait(tr->onError(e));
}
}
Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy) {
return readCommitted(cx, results, Void(), lock, range, groupBy, true, true, true);
}
ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, Key uid, Key addPrefix, Key removePrefix, RequestStream<CommitTransactionRequest> commit,

View File

@ -651,7 +651,7 @@ namespace dbBackup {
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
rc.push_back(readCommitted(taskBucket->src, results[i], Future<Void>(Void()), lock, ranges[i], decodeBKMutationLogKey, true, true, true, nullptr));
rc.push_back(readCommitted(taskBucket->src, results[i], Future<Void>(Void()), lock, ranges[i], decodeBKMutationLogKey, true, true, true));
dump.push_back(dumpData(cx, task, results[i], lock.getPtr(), taskBucket));
}

View File

@ -343,27 +343,26 @@ ACTOR Future<Void> runProfiler(ProfilerRequest req) {
}
}
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, StorageServerInterface ssi, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit ) {
ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer, KeyValueStoreType storeType, std::string filename, UID id, LocalityData locality, Reference<AsyncVar<ServerDBInfo>> db, std::string folder, ActorCollection* filesClosed, int64_t memoryLimit ) {
loop {
ErrorOr<Void> e = wait( errorOr( prevStorageServer) );
if (!e.isError()) return Void();
else if (e.getError().code() != error_code_please_reboot) throw e.getError();
TraceEvent("StorageServerRequestedReboot", ssi.id());
TraceEvent("StorageServerRequestedReboot", id);
//if (BUGGIFY) Void _ = wait(delay(1.0)); // This does the same thing as zombie()
// We need a new interface, since the new storageServer will do replaceInterface(). And we need to destroy
// the old one so the failure detector will know it is gone.
StorageServerInterface ssi_new;
ssi_new.uniqueID = ssi.uniqueID;
ssi_new.locality = ssi.locality;
ssi = ssi_new;
StorageServerInterface ssi;
ssi.uniqueID = id;
ssi.locality = locality;
ssi.initEndpoints();
auto* kv = openKVStore( storeType, filename, ssi.uniqueID, memoryLimit );
Future<Void> kvClosed = kv->onClosed();
filesClosed->add( kvClosed );
prevStorageServer = storageServer( kv, ssi, db, folder, Promise<Void>() );
prevStorageServer = handleIOErrors( prevStorageServer, kv, ssi.id(), kvClosed );
prevStorageServer = handleIOErrors( prevStorageServer, kv, id, kvClosed );
}
}
@ -610,7 +609,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
Future<Void> f = storageServer( kv, recruited, dbInfo, folder, recovery );
recoveries.push_back(recovery.getFuture());
f = handleIOErrors( f, kv, s.storeID, kvClosed );
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited, dbInfo, folder, &filesClosed, memoryLimit );
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), f ) );
} else if( s.storedComponent == DiskStore::TLogData ) {
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
@ -763,7 +762,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
s = handleIOErrors(s, data, recruited.id(), kvClosed);
s = storageCache.removeOnReady( req.reqId, s );
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited, dbInfo, folder, &filesClosed, memoryLimit );
s = storageServerRollbackRebooter( s, req.storeType, filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit );
errorForwarders.add( forwardError( errors, "StorageServer", recruited.id(), s ) );
} else
forwardPromise( req.reply, storageCache.get( req.reqId ) );