/*
 * CoordinatedState.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "CoordinatedState.h"
#include "CoordinationInterface.h"
#include "Knobs.h"
#include "flow/ActorCollection.h"
#include "LeaderElection.h"

// Sends `req` on `to` through retryBrokenPromise() and returns the reply.
// When BUGGIFY or the BUGGIFY_ALL_COORDINATION knob is on, a random delay of up to
// BUGGIFIED_EVENTUAL_CONSISTENCY is inserted both before sending and before returning.
ACTOR Future<GenerationRegReadReply> waitAndSendRead( RequestStream<GenerationRegReadRequest> to, GenerationRegReadRequest req ) {
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		Void _ = wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	state GenerationRegReadReply reply = wait( retryBrokenPromise( to, req ) );
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		Void _ = wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	return reply;
}

// Write-side counterpart of waitAndSendRead(): sends `req` on `to` through retryBrokenPromise()
// and returns the generation carried by the reply, with the same optional BUGGIFY delays.
ACTOR Future<UniqueGeneration> waitAndSendWrite(RequestStream<GenerationRegWriteRequest> to, GenerationRegWriteRequest req) {
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		Void _ = wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	state UniqueGeneration reply = wait( retryBrokenPromise( to, req ) );
	if( SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY )
		Void _ = wait( delay( SERVER_KNOBS->BUGGIFIED_EVENTUAL_CONSISTENCY*g_random->random01() ) );
	return reply;
}

// Forwards the reply of `f`, except that a reply whose gen.generation is 0 (an "empty" reply)
// makes the returned future never become ready.
ACTOR Future<GenerationRegReadReply> emptyToNever( Future<GenerationRegReadReply> f ) {
	state GenerationRegReadReply r = wait(f);
	if (r.gen.generation == 0)
		Void _ = wait( Future<Void>(Never()) );
	return r;
}

// Mirror image of emptyToNever(): forwards only replies whose gen.generation is 0; for any
// other reply the returned future never becomes ready.
ACTOR Future<GenerationRegReadReply> nonemptyToNever( Future<GenerationRegReadReply> f ) {
	state GenerationRegReadReply r = wait(f);
	if (r.gen.generation != 0)
		Void _ = wait( Future<Void>(Never()) );
	return r;
}

// Implementation behind CoordinatedState. `stage` records progress and is checked by ASSERTs:
//   0 = constructed, 1..3 = inside read(), 4 = read() finished,
//   5 = setExclusive() started, 6 = replicated write returned.
struct CoordinatedStateImpl {
	ServerCoordinators coordinators;
	int stage;
	UniqueGeneration gen;       // generation chosen by read() and used by setExclusive()
	uint64_t conflictGen;       // highest generation number observed so far (see getConflict())
	bool doomed;                // set by read() when its second reply already satisfies isDoomed()
	ActorCollection ac; //Errors are not reported
	bool initial;               // true when read()'s second reply had gen.generation == 0

	CoordinatedStateImpl( ServerCoordinators const& c ) : coordinators(c), stage(0), conflictGen(0), doomed(false), ac(false), initial(false) {}
	uint64_t getConflict() { return conflictGen; }

	// True when `rep` shows a write generation above the one this object holds.
	bool isDoomed( GenerationRegReadReply const& rep ) {
		return rep.gen > gen   // setExclusive is doomed, because there was a write at least started at a higher generation, which means a read completed at that higher generation
			// || rep.rgen > gen // setExclusive isn't absolutely doomed, but it may/probably will fail
			;
	}

	// Two-phase read. Phase one reads with a default UniqueGeneration to learn the largest
	// generation in use and picks a fresh one above it. Phase two reads again at that new
	// generation and returns the stored value (an empty Value when none is present).
	// May only be called once (requires stage == 0); leaves stage == 4.
	ACTOR static Future<Value> read( CoordinatedStateImpl* self ) {
		ASSERT( self->stage == 0 );

		self->stage = 1;
		GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, UniqueGeneration() ) ) );
		self->conflictGen = std::max( self->conflictGen, std::max(rep.gen.generation, rep.rgen.generation) ) + 1;
		self->gen = UniqueGeneration( self->conflictGen, g_random->randomUniqueID() );

		self->stage = 2;
		GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, self->gen ) ) );
		self->stage = 3;
		self->conflictGen = std::max(self->conflictGen, std::max( rep.gen.generation, rep.rgen.generation ));
		if (self->isDoomed(rep))
			self->doomed = true;
		self->initial = rep.gen.generation == 0;

		self->stage = 4;
		return rep.value.present() ? rep.value.get() : Value();
	}

	// Becomes ready once a poll (every COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL) returns a
	// reply for which isDoomed() holds, or immediately if read() already set `doomed`.
	// If the stage has advanced past 4 when a poll returns, polling stops and the future
	// never becomes ready.
	ACTOR static Future<Void> onConflict( CoordinatedStateImpl* self ) {
		ASSERT( self->stage == 4 );
		if (self->doomed)
			return Void();
		loop {
			Void _ = wait( delay( SERVER_KNOBS->COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL ) );
			GenerationRegReadReply rep = wait( self->replicatedRead( self, GenerationRegReadRequest( self->coordinators.clusterKey, UniqueGeneration() ) ) );
			if (self->stage > 4)
				break;
			self->conflictGen = std::max(self->conflictGen, std::max( rep.gen.generation, rep.rgen.generation ));
			if (self->isDoomed(rep))
				return Void();
		}
		Void _ = wait( Future<Void>(Never()) );
		return Void();
	}

	// Writes `v` at the generation obtained by read(). Succeeds only when the generation
	// reported back by the replicated write equals self->gen; otherwise records the reported
	// generation in conflictGen and throws coordinated_state_conflict.
	ACTOR static Future<Void> setExclusive( CoordinatedStateImpl* self, Value v ) {
		ASSERT( self->stage == 4 );
		self->stage = 5;

		UniqueGeneration wgen = wait( self->replicatedWrite( self, GenerationRegWriteRequest( KeyValueRef(self->coordinators.clusterKey, v), self->gen ) ) );
		self->stage = 6;

		TraceEvent("CoordinatedStateSet").detail("gen", self->gen.generation).detail("wgen", wgen.generation)
			.detail("genu", self->gen.uid).detail("wgenu", wgen.uid)
			.detail("cgen", self->conflictGen);

		if (wgen == self->gen)
			return Void();
		else {
			self->conflictGen = std::max(self->conflictGen, wgen.generation);
			throw coordinated_state_conflict();
		}
	}

	// Sends `req` to every state server and waits until either a majority has returned a
	// non-empty reply or enough empty replies have arrived that a non-empty majority is
	// impossible. Returns the "best" reply from whichever set reached its threshold.
	ACTOR static Future<GenerationRegReadReply> replicatedRead( CoordinatedStateImpl* self, GenerationRegReadRequest req ) {
		// NOTE(review): element type restored as GenerationRegInterface - confirm against ServerCoordinators::stateServers.
		state std::vector<GenerationRegInterface> &replicas = self->coordinators.stateServers;
		state vector< Future<GenerationRegReadReply> > rep_empty_reply;
		state vector< Future<GenerationRegReadReply> > rep_reply;
		for(int i=0; i<replicas.size(); i++) {
			Future<GenerationRegReadReply> reply = waitAndSendRead( replicas[i].read, GenerationRegReadRequest(req.key, req.gen) );
			// Each reply is routed to exactly one of the two vectors; the other wrapper never fires.
			rep_empty_reply.push_back( nonemptyToNever( reply ) );
			rep_reply.push_back( emptyToNever( reply ) );
			self->ac.add( success( reply ) );
		}

		state Future<Void> majorityEmpty = quorum( rep_empty_reply, (replicas.size()+1)/2 ); //enough empty to ensure we cannot achieve a majority non-empty
		Void _ = wait( quorum( rep_reply, replicas.size()/2 + 1 ) || majorityEmpty );

		if( majorityEmpty.isReady() ) {
			// Among the empty replies that completed without error, pick the largest rgen.
			// NOTE(review): loop header and readiness/error filter restored - confirm against upstream.
			int best = -1;
			for(int i=0; i<rep_empty_reply.size(); i++)
				if (rep_empty_reply[i].isReady() && !rep_empty_reply[i].isError()) {
					if (best < 0 || rep_empty_reply[i].get().rgen > rep_empty_reply[best].get().rgen )
						best = i;
				}
			ASSERT( best >= 0 );
			auto result = rep_empty_reply[best].get();
			return result;
		} else {
			// Among the non-empty replies that completed without error, pick the largest gen,
			// breaking ties by the largest rgen.
			// NOTE(review): loop header and readiness/error filter restored - confirm against upstream.
			int best = -1;
			for(int i=0; i<rep_reply.size(); i++)
				if (rep_reply[i].isReady() && !rep_reply[i].isError()) {
					if (best < 0 ||
						rep_reply[i].get().gen > rep_reply[best].get().gen ||
						( rep_reply[i].get().gen == rep_reply[best].get().gen && rep_reply[i].get().rgen > rep_reply[best].get().rgen ) )
						best = i;
				}
			ASSERT( best >= 0 );
			auto result = rep_reply[best].get();
			return result;
		}
	}

	// Sends `req` to every state server and waits for a quorum of replies: all replicas when
	// `initial` is set, a simple majority otherwise. Returns a UniqueGeneration derived from
	// the replies.
	ACTOR static Future<UniqueGeneration> replicatedWrite( CoordinatedStateImpl* self, GenerationRegWriteRequest req ) {
		// NOTE(review): element type restored as GenerationRegInterface - confirm against ServerCoordinators::stateServers.
		state std::vector<GenerationRegInterface> &replicas = self->coordinators.stateServers;
		state vector< Future<UniqueGeneration> > wrep_reply;
		for(int i=0; i<replicas.size(); i++) {
			Future<UniqueGeneration> reply = waitAndSendWrite( replicas[i].write, GenerationRegWriteRequest( req.kv, req.gen ) );
			wrep_reply.push_back( reply );
			self->ac.add( success( reply ) );
		}

		Void _ = wait( quorum( wrep_reply, self->initial ? replicas.size() : replicas.size()/2 + 1 ) );

		// NOTE(review): everything from this loop header through the CoordinatedState
		// destructor below was restored (maximum over the ready replies) - confirm against upstream.
		UniqueGeneration maxGen;
		for(int i=0; i<wrep_reply.size(); i++)
			if (wrep_reply[i].isReady())
				maxGen = std::max(maxGen, wrep_reply[i].get());
		return maxGen;
	}
};

// CoordinatedState is a thin facade over a heap-allocated CoordinatedStateImpl.
CoordinatedState::CoordinatedState( ServerCoordinators const& coord ) : impl( new CoordinatedStateImpl(coord) ) { }
CoordinatedState::~CoordinatedState() { delete impl; }
Future<Value> CoordinatedState::read() { return CoordinatedStateImpl::read(impl); }
Future<Void> CoordinatedState::onConflict() { return CoordinatedStateImpl::onConflict(impl); }
Future<Void> CoordinatedState::setExclusive(Value v) { return CoordinatedStateImpl::setExclusive(impl,v); }
uint64_t CoordinatedState::getConflict() { return impl->getConflict(); }

// Envelope stored in the coordinated state by MovableCoordinatedState: the user value plus a
// move mode and, for the non-Active modes, the connection string of the other coordinator set.
struct MovableValue {
	enum MoveState {
		MaybeTo = 1,
		Active = 2,
		MovingFrom = 3
	};

	Value value;
	int32_t mode;
	Optional<Value> other;  // a cluster connection string

	MovableValue() : mode( Active ) {}
	MovableValue( Value const& v, int mode, Optional<Value> other = Optional<Value>() ) : value( v ), mode( mode ), other( other ) {}

	// Only protocol versions from 0x0FDB00A2000D0001 onward are accepted.
	template <class Ar>
	void serialize(Ar& ar) {
		ASSERT( ar.protocolVersion() >= 0x0FDB00A2000D0001LL );
		ar & value & mode & other;
	}
};

// Implementation behind MovableCoordinatedState: wraps a CoordinatedState, encoding values as
// MovableValue, and adds the ability to move the state to a new set of coordinators.
struct MovableCoordinatedStateImpl {
	ServerCoordinators coordinators;
	CoordinatedState cs;
	Optional<Value> lastValue,   // The value passed to setExclusive()
		lastCSValue;             // The value passed to cs.setExclusive()

	MovableCoordinatedStateImpl( ServerCoordinators const& c ) : coordinators(c), cs(c) {}

	// Reads and decodes the coordinated state, returning the user value. If the stored mode is
	// MaybeTo, resumes the move via moveTo() (which ends by throwing coordinators_changed).
	ACTOR static Future<Value> read( MovableCoordinatedStateImpl* self ) {
		state MovableValue moveState;
		Value rawValue = wait( self->cs.read() );
		if( rawValue.size() ) {
			BinaryReader r( rawValue, IncludeVersion() );
			if (r.protocolVersion() < 0x0FDB00A2000D0001LL) {
				// Old coordinated state, not a MovableValue
				moveState.value = rawValue;
			} else
				r >> moveState;
		}
		// SOMEDAY: If moveState.mode == MovingFrom, read (without locking) old state and assert that it corresponds with our state and is ReallyTo(coordinators)
		if (moveState.mode == MovableValue::MaybeTo) {
			TEST(true);
			ASSERT( moveState.other.present() );
			Void _ = wait( self->moveTo( self, &self->cs, ClusterConnectionString( moveState.other.get().toString() ), moveState.value ) );
		}
		return moveState.value;
	}

	Future<Void> onConflict() {
		return cs.onConflict();
	}

	// Remembers both the raw value and its Active-mode encoding (move() needs them later),
	// then writes the encoded form through the wrapped CoordinatedState.
	Future<Void> setExclusive( Value v ) {
		lastValue=v;
		lastCSValue=BinaryWriter::toValue( MovableValue( v, MovableValue::Active ), IncludeVersion() );
		return cs.setExclusive( lastCSValue.get() );
	}

	ACTOR static Future<Void> move( MovableCoordinatedStateImpl* self, ClusterConnectionString nc ) {
		// Call only after setExclusive returns.  Attempts to move the coordinated state
		// permanently to the new ServerCoordinators, which must be uninitialized.  Returns when the process has
		// reached the point where a leader elected by the new coordinators should be doing the rest of the work
		// (and therefore the caller should die).
		state CoordinatedState cs( self->coordinators );
		state CoordinatedState nccs( ServerCoordinators( Reference<ClusterConnectionFile>( new ClusterConnectionFile(nc) ) ) );
		// A single 30 second budget shared by the read and the write against the new coordinators.
		state Future<Void> creationTimeout = delay(30);
		ASSERT( self->lastValue.present() && self->lastCSValue.present() );
		TraceEvent("StartMove").detail("ConnectionString", nc.toString() );
		choose {
			when (Void _ = wait(creationTimeout)) { throw new_coordinators_timed_out(); }
			when (Value ncInitialValue = wait( nccs.read() )) {
				ASSERT( !ncInitialValue.size() );  // The new coordinators must be uninitialized!
			}
		}
		TraceEvent("FinishedRead").detail("ConnectionString", nc.toString() );

		choose {
			when (Void _ = wait(creationTimeout)) { throw new_coordinators_timed_out(); }
			when ( Void _ = wait( nccs.setExclusive( BinaryWriter::toValue( MovableValue( self->lastValue.get(), MovableValue::MovingFrom, self->coordinators.ccf->getConnectionString().toString() ), IncludeVersion() ) ) ) ) {}
		}

		if (BUGGIFY) Void _ = wait(delay(5));

		// Re-read the old coordinators; abort if anything was written there since our setExclusive().
		Value oldQuorumState = wait( cs.read() );
		if ( oldQuorumState != self->lastCSValue.get() ) {
			TEST(true);  // Quorum change aborted by concurrent write to old coordination state
			TraceEvent("QuorumChangeAbortedByConcurrency");
			throw coordinated_state_conflict();
		}

		Void _ = wait( self->moveTo( self, &cs, nc, self->lastValue.get() ) );

		throw coordinators_changed();
	}

	// Writes a MaybeTo record naming `nc` into `coordinatedState`, then calls
	// changeLeaderCoordinators() with the new connection string. Always ends by throwing
	// coordinators_changed.
	ACTOR static Future<Void> moveTo( MovableCoordinatedStateImpl* self, CoordinatedState* coordinatedState, ClusterConnectionString nc, Value value ) {
		Void _ = wait( coordinatedState->setExclusive( BinaryWriter::toValue( MovableValue( value, MovableValue::MaybeTo, nc.toString() ), IncludeVersion() ) ) );

		if (BUGGIFY) Void _ = wait( delay(5) );

		// SOMEDAY: If we are worried about someone magically getting the new cluster ID and interfering, do a second cs.setExclusive( encode( ReallyTo, ... ) )
		TraceEvent("ChangingQuorum").detail("ConnectionString", nc.toString());
		Void _ = wait( changeLeaderCoordinators( self->coordinators, StringRef(nc.toString()) ) );
		TraceEvent("ChangedQuorum").detail("ConnectionString", nc.toString());
		throw coordinators_changed();
	}
};

// MovableCoordinatedState is a thin facade over a heap-allocated MovableCoordinatedStateImpl.
MovableCoordinatedState::MovableCoordinatedState( class ServerCoordinators const& coord ) : impl( new MovableCoordinatedStateImpl(coord) ) {}
MovableCoordinatedState::~MovableCoordinatedState() { delete impl; }
Future<Value> MovableCoordinatedState::read() { return MovableCoordinatedStateImpl::read(impl); }
Future<Void> MovableCoordinatedState::onConflict() { return impl->onConflict(); }
Future<Void> MovableCoordinatedState::setExclusive(Value v) { return impl->setExclusive(v); }
Future<Void> MovableCoordinatedState::move( ClusterConnectionString const& nc ) { return MovableCoordinatedStateImpl::move(impl, nc); }