Add destinations that are already read-write to the source list, so that a cancelled data movement can contribute to copying the data for the next movement.

This commit is contained in:
Evan Tschannen 2017-10-03 17:39:08 -07:00
parent fd5fe3a000
commit 3a2ddcc84a

View File

@ -94,11 +94,11 @@ Future<Void> checkMoveKeysLockReadOnly( Transaction* tr, MoveKeysLock lock ) {
return checkMoveKeysLock(tr, lock, false);
}
ACTOR Future<bool> checkReadWrite( Future< ErrorOr<Version> > fReply ) {
ACTOR Future<Optional<UID>> checkReadWrite( Future< ErrorOr<Version> > fReply, UID uid ) {
ErrorOr<Version> reply = wait( fReply );
if (!reply.present())
return false;
return true;
if (!reply.present())
return Optional<UID>();
return Optional<UID>(uid);
}
Future<Void> removeOldDestinations(Transaction *tr, UID oldDest, VectorRef<KeyRangeRef> shards, KeyRangeRef currentKeys) {
@ -118,11 +118,58 @@ Future<Void> removeOldDestinations(Transaction *tr, UID oldDest, VectorRef<KeyRa
return waitForAll(actors);
}
ACTOR Future<vector<vector<Optional<UID>>>> findReadWriteDestinations(Standalone<RangeResultRef> shards, UID relocationIntervalId, Transaction* tr) {
vector<Future<Optional<Value>>> serverListEntries;
for(int i = 0; i < shards.size() - 1; ++i) {
vector<UID> src;
vector<UID> dest;
decodeKeyServersValue( shards[i].value, src, dest );
for(int s=0; s<dest.size(); s++) {
serverListEntries.push_back( tr->get( serverListKeyFor(dest[s]) ) );
}
}
vector<Optional<Value>> serverListValues = wait( getAll(serverListEntries) );
std::map<UID, StorageServerInterface> ssiMap;
for(int s=0; s<serverListValues.size(); s++) {
auto si = decodeServerListValue(serverListValues[s].get());
StorageServerInterface ssi = decodeServerListValue(serverListValues[s].get());
ssiMap[ssi.id()] = ssi;
}
vector<Future<vector<Optional<UID>>>> allChecks;
for(int i = 0; i < shards.size() - 1; ++i) {
KeyRangeRef rangeIntersectKeys( shards[i].key, shards[i+1].key );
vector<UID> src;
vector<UID> dest;
vector<StorageServerInterface> storageServerInterfaces;
decodeKeyServersValue( shards[i].value, src, dest );
for(int s=0; s<dest.size(); s++)
storageServerInterfaces.push_back( ssiMap[dest[s]] );
vector< Future<Optional<UID>> > checks;
for(int s=0; s<storageServerInterfaces.size(); s++) {
checks.push_back( checkReadWrite( storageServerInterfaces[s].getShardState.getReplyUnlessFailedFor(
GetShardStateRequest( rangeIntersectKeys, GetShardStateRequest::NO_WAIT), SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, 0, TaskMoveKeys ), dest[s] ) );
}
allChecks.push_back(getAll(checks));
}
vector<vector<Optional<UID>>> readWriteDestinations = wait(getAll(allChecks));
return readWriteDestinations;
}
// Set keyServers[keys].dest = servers
// Set serverKeys[servers][keys] = active for each subrange of keys that the server did not already have, complete for each subrange that it already has
// Set serverKeys[dest][keys] = "" for the dest servers of each existing shard in keys (unless that destination is a member of servers OR if the source list is sufficiently degraded)
ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> servers,
MoveKeysLock lock, int durableStorageQuorum,
ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> servers,
MoveKeysLock lock, int durableStorageQuorum,
FlowLock *startMoveKeysLock, UID relocationIntervalId ) {
state TraceInterval interval("RelocateShard_StartMoveKeys");
//state TraceInterval waitInterval("");
@ -138,7 +185,7 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
state int shards = 0;
state int maxRetries = 0;
//This process can be split up into multiple transactions if there are too many existing overlapping shards
//This process can be split up into multiple transactions if there are too many existing overlapping shards
//In that case, each iteration of this loop will have begin set to the end of the last processed shard
while(begin < keys.end) {
TEST(begin > keys.begin); //Multi-transactional startMoveKeys
@ -192,6 +239,9 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
//for(int i=0; i<old.size(); i++)
// printf("'%s': '%s'\n", old[i].key.toString().c_str(), old[i].value.toString().c_str());
//Check that enough servers for each shard are in the correct state
vector<vector<Optional<UID>>> readWriteDestinations = wait(findReadWriteDestinations(old, relocationIntervalId, &tr));
// For each intersecting range, update keyServers[range] dest to be servers and clear existing dest servers from serverKeys
for(int i = 0; i < old.size() - 1; ++i) {
KeyRangeRef rangeIntersectKeys( old[i].key, old[i+1].key );
@ -206,6 +256,15 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
.detail("OldDest", describe(dest))
.detail("ReadVersion", tr.getReadVersion().get());*/
for(auto& uid : readWriteDestinations[i]) {
if(uid.present()) {
src.push_back(uid.get());
}
}
std::sort( src.begin(), src.end() );
src.resize( std::unique( src.begin(), src.end() ) - src.begin() );
//Update dest servers for this range to be equal to servers
krmSetPreviouslyEmptyRange( &tr, keyServersPrefix, rangeIntersectKeys, keyServersValue(src, servers), old[i+1].value );
@ -229,7 +288,7 @@ ACTOR Future<Void> startMoveKeys( Database occ, KeyRange keys, vector<UID> serve
state std::set<UID>::iterator oldDest;
//Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the same prefix for a single transaction, we must
//Remove old dests from serverKeys. In order for krmSetRangeCoalescing to work correctly in the same prefix for a single transaction, we must
//do most of the coalescing ourselves. Only the shards on the boundary of currentRange are actually coalesced with the ranges outside of currentRange.
//For all shards internal to currentRange, we overwrite all consecutive keys whose value is or should be serverKeysFalse in a single write
vector<Future<Void>> actors;
@ -305,7 +364,7 @@ ACTOR Future<Void> waitForShardReady( StorageServerInterface server, KeyRange ke
}
}
ACTOR Future<Void> checkFetchingState( Database cx, vector<UID> dest, KeyRange keys,
ACTOR Future<Void> checkFetchingState( Database cx, vector<UID> dest, KeyRange keys,
Promise<Void> dataMovementComplete, UID relocationIntervalId ) {
state Transaction tr(cx);
@ -315,7 +374,7 @@ ACTOR Future<Void> checkFetchingState( Database cx, vector<UID> dest, KeyRange k
tr.info.taskID = TaskMoveKeys;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
vector< Future< Optional<Value> > > serverListEntries;
for(int s=0; s<dest.size(); s++)
serverListEntries.push_back( tr.get( serverListKeyFor(dest[s]) ) );
@ -331,8 +390,8 @@ ACTOR Future<Void> checkFetchingState( Database cx, vector<UID> dest, KeyRange k
ASSERT( si.id() == dest[s] );
requests.push_back( waitForShardReady( si, keys, tr.getReadVersion().get(), GetShardStateRequest::FETCHING ) );
}
Void _ = wait( timeoutError( waitForAll( requests ),
Void _ = wait( timeoutError( waitForAll( requests ),
SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, TaskMoveKeys ) );
dataMovementComplete.send(Void());
@ -350,8 +409,8 @@ ACTOR Future<Void> checkFetchingState( Database cx, vector<UID> dest, KeyRange k
// keyServers[k].dest must be the same for all k in keys
// Set serverKeys[dest][keys] = true; serverKeys[src][keys] = false for all src not in dest
// Should be cancelled and restarted if keyServers[keys].dest changes (?so this is no longer true?)
ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> destinationTeam,
MoveKeysLock lock, int durableStorageQuorum, FlowLock *finishMoveKeysParallelismLock, UID relocationIntervalId )
ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> destinationTeam,
MoveKeysLock lock, int durableStorageQuorum, FlowLock *finishMoveKeysParallelismLock, UID relocationIntervalId )
{
state TraceInterval interval("RelocateShard_FinishMoveKeys");
state TraceInterval waitInterval("");
@ -364,19 +423,19 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
try {
TraceEvent(SevDebug, interval.begin(), relocationIntervalId).detail("KeyBegin", printable(keys.begin)).detail("KeyEnd", printable(keys.end));
//This process can be split up into multiple transactions if there are too many existing overlapping shards
//This process can be split up into multiple transactions if there are too many existing overlapping shards
//In that case, each iteration of this loop will have begin set to the end of the last processed shard
while(begin < keys.end) {
TEST(begin > keys.begin); //Multi-transactional finishMoveKeys
state Transaction tr( occ );
//printf("finishMoveKeys( '%s'-'%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str());
loop {
try {
tr.info.taskID = TaskMoveKeys;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
releaser.release();
Void _ = wait( finishMoveKeysParallelismLock->take( TaskDataDistributionLaunch ) );
releaser = FlowLock::Releaser( *finishMoveKeysParallelismLock );
@ -389,7 +448,7 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
//Determine the last processed key (which will be the beginning for the next iteration)
state Key endKey = keyServers.end()[-1].key;
currentKeys = KeyRangeRef(currentKeys.begin, endKey);
//printf(" finishMoveKeys( '%s'-'%s' ): read keyServers at %lld\n", keys.begin.toString().c_str(), keys.end.toString().c_str(), tr.getReadVersion().get());
// Decode and sanity check the result (dest must be the same for all ranges)
@ -506,8 +565,8 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
for(int s=0; s<storageServerInterfaces.size(); s++)
serverReady.push_back( waitForShardReady( storageServerInterfaces[s], keys, tr.getReadVersion().get(), GetShardStateRequest::READABLE) );
Void _ = wait( timeout(
smartQuorum( serverReady, durableStorageQuorum, SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, TaskMoveKeys ),
Void _ = wait( timeout(
smartQuorum( serverReady, durableStorageQuorum, SERVER_KNOBS->SERVER_READY_QUORUM_INTERVAL, TaskMoveKeys ),
SERVER_KNOBS->SERVER_READY_QUORUM_TIMEOUT, Void(), TaskMoveKeys ) );
int count = 0;
for(int s=0; s<serverReady.size(); s++)
@ -515,10 +574,10 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
//printf(" fMK: moved data to %d/%d servers\n", count, serverReady.size());
TraceEvent(SevDebug, waitInterval.end(), relocationIntervalId).detail("ReadyServers", count);
if( count >= durableStorageQuorum ) {
// update keyServers, serverKeys
// SOMEDAY: Doing these in parallel is safe because none of them overlap or touch (one per server)
// SOMEDAY: Doing these in parallel is safe because none of them overlap or touch (one per server)
Void _ = wait( krmSetRangeCoalescing( &tr, keyServersPrefix, currentKeys, keys, keyServersValue( dest ) ) );
std::set<UID>::iterator asi = allServers.begin();
@ -526,7 +585,7 @@ ACTOR Future<Void> finishMoveKeys( Database occ, KeyRange keys, vector<UID> dest
while (asi != allServers.end()) {
bool destHasServer = std::find(dest.begin(), dest.end(), *asi) != dest.end();
actors.push_back( krmSetRangeCoalescing( &tr, serverKeysPrefixFor(*asi), currentKeys, allKeys, destHasServer ? serverKeysTrue : serverKeysFalse ) );
++asi;
++asi;
}
Void _ = wait(waitForAll(actors));
@ -570,9 +629,9 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer( Database cx, StorageServ
loop {
try {
state Future<Optional<Value>> fv = tr.get( serverListKeyFor(server.id()) );
state Future<Optional<Value>> fExclProc = tr.get(
state Future<Optional<Value>> fExclProc = tr.get(
StringRef(encodeExcludedServersKey( AddressExclusion( server.address().ip, server.address().port ))) );
state Future<Optional<Value>> fExclIP = tr.get(
state Future<Optional<Value>> fExclIP = tr.get(
StringRef(encodeExcludedServersKey( AddressExclusion( server.address().ip ))) );
state Future<Standalone<RangeResultRef>> fTags( tr.getRange( serverTagKeys, CLIENT_KNOBS->TOO_MANY, true) );
@ -682,14 +741,14 @@ ACTOR Future<Void> removeStorageServer( Database cx, UID serverID, MoveKeysLock
}
ACTOR Future<Void> moveKeys(
Database cx,
Database cx,
KeyRange keys,
vector<UID> destinationTeam,
MoveKeysLock lock,
int durableStorageQuorum,
Promise<Void> dataMovementComplete,
FlowLock *startMoveKeysParallelismLock,
FlowLock *finishMoveKeysParallelismLock,
FlowLock *finishMoveKeysParallelismLock,
UID relocationIntervalId)
{
ASSERT( destinationTeam.size() );
@ -708,9 +767,9 @@ ACTOR Future<Void> moveKeys(
return Void();
}
void seedShardServers(
void seedShardServers(
Arena& arena,
CommitTransactionRef &tr,
CommitTransactionRef &tr,
vector<StorageServerInterface> servers )
{
std::map<UID, Tag> server_tag;