/* * sim2.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "simulator.h" #include "flow/IThreadPool.h" #include "IAsyncFile.h" #include "AsyncFileCached.actor.h" #include "AsyncFileNonDurable.actor.h" #include "flow/Hash3.h" #include "TraceFileIO.h" #include "flow/FaultInjection.h" #include "flow/network.h" #include "Net2FileSystem.h" #include "fdbclient/FDBTypes.h" #include "fdbrpc/Replication.h" #include "fdbrpc/ReplicationUtils.h" #include "AsyncFileWriteChecker.h" using std::min; using std::max; using std::pair; using std::make_pair; bool simulator_should_inject_fault( const char* context, const char* file, int line, int error_code ) { if (!g_network->isSimulated()) return false; auto p = g_simulator.getCurrentProcess(); if (p->fault_injection_p2 && g_random->random01() < p->fault_injection_p2 && !g_simulator.speedUpSimulation) { uint32_t h1 = line + (p->fault_injection_r>>32), h2 = p->fault_injection_r; if (h1 < p->fault_injection_p1*std::numeric_limits::max()) { TEST(true); // A fault was injected TEST(error_code == error_code_io_timeout); // An io timeout was injected TEST(error_code == error_code_io_error); // An io error was injected TEST(error_code == error_code_platform_error); // A platform error was injected. TraceEvent(SevWarn, "FaultInjected").detail("Context", context).detail("File", file).detail("Line", line).detail("ErrorCode", error_code); if(error_code == error_code_io_timeout) { g_network->setGlobal(INetwork::enASIOTimedOut, (flowGlobalType)true); } return true; } } return false; } void ISimulator::displayWorkers() const { std::map> zoneMap; // Create a map of zone Id for (auto processInfo : getAllProcesses()) { std::string dataHall = processInfo->locality.dataHallId().present() ? processInfo->locality.dataHallId().get().printable() : "[unset]"; std::string zoneId = processInfo->locality.zoneId().present() ? processInfo->locality.zoneId().get().printable() : "[unset]"; zoneMap[format("%-8s %s", dataHall.c_str(), zoneId.c_str())].push_back(processInfo); } printf("DataHall ZoneId\n"); printf(" Address Name Class Excluded Failed Rebooting Cleared Role DataFolder\n"); for (auto& zoneRecord : zoneMap) { printf("\n%s\n", zoneRecord.first.c_str()); for (auto& processInfo : zoneRecord.second) { printf(" %9s %-10s%-13s%-8s %-6s %-9s %-8s %-48s %-40s\n", processInfo->address.toString().c_str(), processInfo->name, processInfo->startingClass.toString().c_str(), (processInfo->isExcluded() ? "True" : "False"), (processInfo->failed ? "True" : "False"), (processInfo->rebooting ? "True" : "False"), (processInfo->isCleared() ? "True" : "False"), getRoles(processInfo->address).c_str(), processInfo->dataFolder); } } return; } namespace std { template<> class hash { public: size_t operator()(const Endpoint &s) const { return hashlittle(&s, sizeof(s), 0); } }; } bool onlyBeforeSimulatorInit() { return g_network->isSimulated() && g_simulator.getAllProcesses().empty(); } const UID TOKEN_ENDPOINT_NOT_FOUND(-1, -1); const uint64_t TOKEN_STREAM_FLAG = 1; ISimulator* g_pSimulator = 0; thread_local ISimulator::ProcessInfo* ISimulator::currentProcess = 0; int openCount = 0; struct SimClogging { double getSendDelay( NetworkAddress from, NetworkAddress to ) { return halfLatency(); double tnow = now(); double t = tnow + halfLatency(); if (!g_simulator.speedUpSimulation && clogSendUntil.count( to.ip )) t = std::max( t, clogSendUntil[ to.ip ] ); return t - tnow; } double getRecvDelay( NetworkAddress from, NetworkAddress to ) { auto pair = make_pair( from.ip, to.ip ); double tnow = now(); double t = tnow + halfLatency(); if(!g_simulator.speedUpSimulation) t += clogPairLatency[ pair ]; if (!g_simulator.speedUpSimulation && clogPairUntil.count( pair )) t = std::max( t, clogPairUntil[ pair ] ); if (!g_simulator.speedUpSimulation && clogRecvUntil.count( to.ip )) t = std::max( t, clogRecvUntil[ to.ip ] ); return t - tnow; } void clogPairFor( uint32_t from, uint32_t to, double t ) { auto& u = clogPairUntil[ make_pair( from, to ) ]; u = std::max(u, now() + t); } void clogSendFor( uint32_t from, double t ) { auto& u = clogSendUntil[from]; u = std::max(u, now() + t); } void clogRecvFor( uint32_t from, double t ) { auto& u = clogRecvUntil[from]; u = std::max(u, now() + t); } double setPairLatencyIfNotSet( uint32_t from, uint32_t to, double t ) { auto i = clogPairLatency.find( make_pair(from,to) ); if (i == clogPairLatency.end()) i = clogPairLatency.insert( make_pair( make_pair(from,to), t ) ).first; return i->second; } private: std::map< uint32_t, double > clogSendUntil, clogRecvUntil; std::map< std::pair, double > clogPairUntil; std::map< std::pair, double > clogPairLatency; double halfLatency() { double a = g_random->random01(); const double pFast = 0.999; if (a <= pFast) { a = a / pFast; return 0.5 * (FLOW_KNOBS->MIN_NETWORK_LATENCY * (1-a) + FLOW_KNOBS->FAST_NETWORK_LATENCY/pFast * a); // 0.5ms average } else { a = (a-pFast) / (1-pFast); // uniform 0-1 again return 0.5 * (FLOW_KNOBS->MIN_NETWORK_LATENCY * (1-a) + FLOW_KNOBS->SLOW_NETWORK_LATENCY*a); // long tail up to X ms } } }; SimClogging g_clogging; struct Sim2Conn : IConnection, ReferenceCounted { Sim2Conn( ISimulator::ProcessInfo* process ) : process(process), dbgid( g_random->randomUniqueID() ), opened(false), closedByCaller(false) { pipes = sender(this) && receiver(this); } // connect() is called on a pair of connections immediately after creation; logically it is part of the constructor and no other method may be called previously! void connect( Reference peer, NetworkAddress peerEndpoint ) { this->peer = peer; this->peerProcess = peer->process; this->peerId = peer->dbgid; this->peerEndpoint = peerEndpoint; // Every one-way connection gets a random permanent latency and a random send buffer for the duration of the connection auto latency = g_clogging.setPairLatencyIfNotSet( peerProcess->address.ip, process->address.ip, FLOW_KNOBS->MAX_CLOGGING_LATENCY*g_random->random01() ); sendBufSize = std::max( g_random->randomInt(0, 5000000), 25e6 * (latency + .002) ); TraceEvent("Sim2Connection").detail("SendBufSize", sendBufSize).detail("Latency", latency); } ~Sim2Conn() { ASSERT_ABORT( !opened || closedByCaller ); } virtual void addref() { ReferenceCounted::addref(); } virtual void delref() { ReferenceCounted::delref(); } virtual void close() { closedByCaller = true; closeInternal(); } virtual Future onWritable() { return whenWritable(this); } virtual Future onReadable() { return whenReadable(this); } bool isPeerGone() { return !peer || peerProcess->failed; } void peerClosed() { leakedConnectionTracker = trackLeakedConnection(this); } // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0) // (or may throw an error if the connection dies) virtual int read( uint8_t* begin, uint8_t* end ) { rollRandomClose(); int64_t avail = receivedBytes.get() - readBytes.get(); // SOMEDAY: random? int toRead = std::min( end-begin, avail ); ASSERT( toRead >= 0 && toRead <= recvBuf.size() && toRead <= end-begin ); for(int i=0; i 0); int toSend = 0; if (BUGGIFY) { toSend = std::min(limit, buffer->bytes_written - buffer->bytes_sent); } else { for(auto p = buffer; p; p=p->next) { toSend += p->bytes_written - p->bytes_sent; if(toSend >= limit) { if(toSend > limit) toSend = limit; break; } } } ASSERT(toSend); if (BUGGIFY) toSend = std::min(toSend, g_random->randomInt(0, 1000)); if (!peer) return toSend; toSend = std::min( toSend, peer->availableSendBufferForPeer() ); ASSERT( toSend >= 0 ); int leftToSend = toSend; for(auto p = buffer; p && leftToSend>0; p=p->next) { int ts = std::min(leftToSend, p->bytes_written - p->bytes_sent); peer->recvBuf.insert( peer->recvBuf.end(), p->data + p->bytes_sent, p->data + p->bytes_sent + ts ); leftToSend -= ts; } ASSERT( leftToSend == 0 ); peer->writtenBytes.set( peer->writtenBytes.get() + toSend ); return toSend; } // Returns the network address and port of the other end of the connection. In the case of an incoming connection, this may not // be an address we can connect to! virtual NetworkAddress getPeerAddress() { return peerEndpoint; } virtual UID getDebugID() { return dbgid; } bool opened, closedByCaller; private: ISimulator::ProcessInfo* process, *peerProcess; UID dbgid, peerId; NetworkAddress peerEndpoint; std::deque< uint8_t > recvBuf; // Includes bytes written but not yet received! AsyncVar readBytes, // bytes already pulled from recvBuf (location of the beginning of recvBuf) receivedBytes, sentBytes, writtenBytes; // location of the end of recvBuf ( == recvBuf.size() + readBytes.get() ) Reference peer; int sendBufSize; Future leakedConnectionTracker; Future pipes; int availableSendBufferForPeer() const { return sendBufSize - (writtenBytes.get() - receivedBytes.get()); } // SOMEDAY: acknowledgedBytes instead of receivedBytes void closeInternal() { if(peer) { peer->peerClosed(); } leakedConnectionTracker.cancel(); peer.clear(); } ACTOR static Future sender( Sim2Conn* self ) { loop { Void _ = wait( self->writtenBytes.onChange() ); // takes place on peer! ASSERT( g_simulator.getCurrentProcess() == self->peerProcess ); Void _ = wait( delay( .002 * g_random->random01() ) ); self->sentBytes.set( self->writtenBytes.get() ); // or possibly just some sometimes... } } ACTOR static Future receiver( Sim2Conn* self ) { loop { if (self->sentBytes.get() != self->receivedBytes.get()) Void _ = wait( g_simulator.onProcess( self->peerProcess ) ); while ( self->sentBytes.get() == self->receivedBytes.get() ) Void _ = wait( self->sentBytes.onChange() ); ASSERT( g_simulator.getCurrentProcess() == self->peerProcess ); state int64_t pos = g_random->random01() < .5 ? self->sentBytes.get() : g_random->randomInt64( self->receivedBytes.get(), self->sentBytes.get()+1 ); Void _ = wait( delay( g_clogging.getSendDelay( self->process->address, self->peerProcess->address ) ) ); Void _ = wait( g_simulator.onProcess( self->process ) ); ASSERT( g_simulator.getCurrentProcess() == self->process ); Void _ = wait( delay( g_clogging.getRecvDelay( self->process->address, self->peerProcess->address ) ) ); ASSERT( g_simulator.getCurrentProcess() == self->process ); self->receivedBytes.set( pos ); Void _ = wait( Future(Void()) ); // Prior notification can delete self and cancel this actor ASSERT( g_simulator.getCurrentProcess() == self->process ); } } ACTOR static Future whenReadable( Sim2Conn* self ) { try { loop { if (self->readBytes.get() != self->receivedBytes.get()) { ASSERT( g_simulator.getCurrentProcess() == self->process ); return Void(); } Void _ = wait( self->receivedBytes.onChange() ); self->rollRandomClose(); } } catch (Error& e) { ASSERT( g_simulator.getCurrentProcess() == self->process ); throw; } } ACTOR static Future whenWritable( Sim2Conn* self ) { try { loop { if (!self->peer) return Void(); if (self->peer->availableSendBufferForPeer() > 0) { ASSERT( g_simulator.getCurrentProcess() == self->process ); return Void(); } try { Void _ = wait( self->peer->receivedBytes.onChange() ); ASSERT( g_simulator.getCurrentProcess() == self->peerProcess ); } catch (Error& e) { if (e.code() != error_code_broken_promise) throw; } Void _ = wait( g_simulator.onProcess( self->process ) ); } } catch (Error& e) { ASSERT( g_simulator.getCurrentProcess() == self->process ); throw; } } void rollRandomClose() { if (now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && g_random->random01() < .00001) { g_simulator.lastConnectionFailure = now(); double a = g_random->random01(), b = g_random->random01(); TEST(true); // Simulated connection failure TraceEvent("ConnectionFailure", dbgid).detail("MyAddr", process->address).detail("PeerAddr", peerProcess->address).detail("SendClosed", a > .33).detail("RecvClosed", a < .66).detail("Explicit", b < .3); if (a < .66 && peer) peer->closeInternal(); if (a > .33) closeInternal(); // At the moment, we occasionally notice the connection failed immediately. In principle, this could happen but only after a delay. if (b < .3) throw connection_failed(); } } ACTOR static Future trackLeakedConnection( Sim2Conn* self ) { Void _ = wait( g_simulator.onProcess( self->process ) ); // SOMEDAY: Make this value variable? Dependent on buggification status? Void _ = wait( delay( 20.0 ) ); TraceEvent(SevError, "LeakedConnection", self->dbgid).error(connection_leaked()).detail("MyAddr", self->process->address).detail("PeerAddr", self->peerEndpoint).detail("PeerId", self->peerId).detail("Opened", self->opened); return Void(); } }; #include #include int sf_open( const char* filename, int flags, int convFlags, int mode ); #if defined(_WIN32) #include #elif defined(__unixish__) #define _open ::open #define _read ::read #define _write ::write #define _close ::close #define _lseeki64 ::lseek #define _commit ::fsync #define _chsize ::ftruncate #define O_BINARY 0 int sf_open( const char* filename, int flags, int convFlags, int mode ) { return _open( filename, convFlags, mode ); } #else #error How do i open a file on a new platform? #endif class SimpleFile : public IAsyncFile, public ReferenceCounted { public: static void init() {} static bool should_poll() { return false; } ACTOR static Future> open( std::string filename, int flags, int mode, Reference diskParameters = Reference(new DiskParameters(25000, 150000000)), bool delayOnWrite = true ) { state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess(); state int currentTaskID = g_network->getCurrentTask(); if(++openCount >= 3000) { TraceEvent(SevError, "TooManyFiles"); ASSERT(false); } if(openCount == 2000) { TraceEvent(SevWarnAlways, "DisableConnectionFailures_TooManyFiles"); g_simulator.speedUpSimulation = true; g_simulator.connectionFailuresDisableDuration = 1e6; } Void _ = wait( g_simulator.onMachine( currentProcess ) ); try { Void _ = wait( delay(FLOW_KNOBS->MIN_OPEN_TIME + g_random->random01() * (FLOW_KNOBS->MAX_OPEN_TIME - FLOW_KNOBS->MIN_OPEN_TIME) ) ); std::string open_filename = filename; if (flags & OPEN_ATOMIC_WRITE_AND_CREATE) { ASSERT( (flags & OPEN_CREATE) && (flags & OPEN_READWRITE) && !(flags & OPEN_EXCLUSIVE) ); open_filename = filename + ".part"; } int h = sf_open( open_filename.c_str(), flags, flagConversion(flags), mode ); if( h == -1 ) { bool notFound = errno == ENOENT; Error e = notFound ? file_not_found() : io_error(); TraceEvent(notFound ? SevWarn : SevWarnAlways, "FileOpenError").error(e).GetLastError().detail("File", filename).detail("Flags", flags); throw e; } platform::makeTemporary(open_filename.c_str()); SimpleFile *simpleFile = new SimpleFile( h, diskParameters, delayOnWrite, filename, open_filename, flags ); state Reference file = Reference( simpleFile ); Void _ = wait( g_simulator.onProcess( currentProcess, currentTaskID ) ); return file; } catch( Error &e ) { state Error err = e; Void _ = wait( g_simulator.onProcess( currentProcess, currentTaskID ) ); throw err; } } virtual void addref() { ReferenceCounted::addref(); } virtual void delref() { ReferenceCounted::delref(); } virtual int64_t debugFD() { return (int64_t)h; } virtual Future read( void* data, int length, int64_t offset ) { return read_impl( this, data, length, offset ); } virtual Future write( void const* data, int length, int64_t offset ) { return write_impl( this, StringRef((const uint8_t*)data, length), offset ); } virtual Future truncate( int64_t size ) { return truncate_impl( this, size ); } virtual Future sync() { return sync_impl( this ); } virtual Future size() { return size_impl( this ); } virtual std::string getFilename() { return actualFilename; } ~SimpleFile() { _close( h ); } private: int h; //Performance parameters of simulated disk Reference diskParameters; std::string filename, actualFilename; int flags; UID dbgId; //If true, then writes/truncates will be preceded by a delay (like other operations). If false, then they will not //This is to support AsyncFileNonDurable, which issues its own delays for writes and truncates bool delayOnWrite; SimpleFile(int h, Reference diskParameters, bool delayOnWrite, const std::string& filename, const std::string& actualFilename, int flags) : h(h), diskParameters(diskParameters), delayOnWrite(delayOnWrite), filename(filename), actualFilename(actualFilename), dbgId(g_random->randomUniqueID()), flags(flags) {} static int flagConversion( int flags ) { int outFlags = O_BINARY; if( flags&OPEN_READWRITE ) outFlags |= O_RDWR; if( flags&OPEN_CREATE ) outFlags |= O_CREAT; if( flags&OPEN_READONLY ) outFlags |= O_RDONLY; if( flags&OPEN_EXCLUSIVE ) outFlags |= O_EXCL; if( flags&OPEN_ATOMIC_WRITE_AND_CREATE ) outFlags |= O_TRUNC; return outFlags; } ACTOR static Future read_impl( SimpleFile* self, void* data, int length, int64_t offset ) { state UID opId = g_random->randomUniqueID(); if (randLog) fprintf( randLog, "SFR1 %s %s %s %d %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), length, offset ); Void _ = wait( waitUntilDiskReady( self->diskParameters, length ) ); if( _lseeki64( self->h, offset, SEEK_SET ) == -1 ) { TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 1); throw io_error(); } unsigned int read_bytes = 0; if( ( read_bytes = _read( self->h, data, (unsigned int) length ) ) == -1 ) { TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 2); throw io_error(); } if (randLog) { uint32_t a=0, b=0; hashlittle2( data, read_bytes, &a, &b ); fprintf( randLog, "SFR2 %s %s %s %d %d\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), read_bytes, a ); } debugFileCheck("SimpleFileRead", self->filename, data, offset, length); INJECT_FAULT(io_timeout, "SimpleFile::read"); INJECT_FAULT(io_error, "SimpleFile::read"); return read_bytes; } ACTOR static Future write_impl( SimpleFile* self, StringRef data, int64_t offset ) { state UID opId = g_random->randomUniqueID(); if (randLog) { uint32_t a=0, b=0; hashlittle2( data.begin(), data.size(), &a, &b ); fprintf( randLog, "SFW1 %s %s %s %d %d %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), a, data.size(), offset ); } if(self->delayOnWrite) Void _ = wait( waitUntilDiskReady( self->diskParameters, data.size() ) ); if( _lseeki64( self->h, offset, SEEK_SET ) == -1 ) { TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 3); throw io_error(); } unsigned int write_bytes = 0; if ( ( write_bytes = _write( self->h, (void*)data.begin(), data.size() ) ) == -1 ) { TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 4); throw io_error(); } if ( write_bytes != data.size() ) { TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 5); throw io_error(); } if (randLog) { fprintf( randLog, "SFW2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str()); } debugFileCheck("SimpleFileWrite", self->filename, (void*)data.begin(), offset, data.size()); INJECT_FAULT(io_timeout, "SimpleFile::write"); INJECT_FAULT(io_error, "SimpleFile::write"); return Void(); } ACTOR static Future truncate_impl( SimpleFile* self, int64_t size ) { state UID opId = g_random->randomUniqueID(); if (randLog) fprintf( randLog, "SFT1 %s %s %s %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), size ); if(self->delayOnWrite) Void _ = wait( waitUntilDiskReady( self->diskParameters, 0 ) ); if( _chsize( self->h, (long) size ) == -1 ) { TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 6); throw io_error(); } if (randLog) fprintf( randLog, "SFT2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str()); INJECT_FAULT( io_timeout, "SimpleFile::truncate" ); INJECT_FAULT( io_error, "SimpleFile::truncate" ); return Void(); } ACTOR static Future sync_impl( SimpleFile* self ) { state UID opId = g_random->randomUniqueID(); if (randLog) fprintf( randLog, "SFC1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str()); if(self->delayOnWrite) Void _ = wait( waitUntilDiskReady( self->diskParameters, 0, true ) ); if (self->flags & OPEN_ATOMIC_WRITE_AND_CREATE) { self->flags &= ~OPEN_ATOMIC_WRITE_AND_CREATE; auto& machineCache = g_simulator.getCurrentProcess()->machine->openFiles; std::string sourceFilename = self->filename + ".part"; if(machineCache.count(sourceFilename)) { TraceEvent("SimpleFileRename").detail("From", sourceFilename).detail("To", self->filename).detail("sourceCount", machineCache.count(sourceFilename)).detail("fileCount", machineCache.count(self->filename)); renameFile( sourceFilename.c_str(), self->filename.c_str() ); ASSERT(!machineCache.count(self->filename)); machineCache[self->filename] = machineCache[sourceFilename]; machineCache.erase(sourceFilename); self->actualFilename = self->filename; } } if (randLog) fprintf( randLog, "SFC2 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str()); INJECT_FAULT( io_timeout, "SimpleFile::sync" ); INJECT_FAULT( io_error, "SimpleFile::sync" ); return Void(); } ACTOR static Future size_impl( SimpleFile* self ) { state UID opId = g_random->randomUniqueID(); if (randLog) fprintf(randLog, "SFS1 %s %s %s\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str()); Void _ = wait( waitUntilDiskReady( self->diskParameters, 0 ) ); int64_t pos = _lseeki64( self->h, 0L, SEEK_END ); if( pos == -1 ) { TraceEvent(SevWarn, "SimpleFileIOError").detail("Location", 8); throw io_error(); } if (randLog) fprintf(randLog, "SFS2 %s %s %s %lld\n", self->dbgId.shortString().c_str(), self->filename.c_str(), opId.shortString().c_str(), pos); INJECT_FAULT( io_error, "SimpleFile::size" ); return pos; } }; struct SimDiskSpace { int64_t totalSpace; int64_t baseFreeSpace; //The original free space of the disk + deltas from simulated external modifications double lastUpdate; }; void doReboot( ISimulator::ProcessInfo* const& p, ISimulator::KillType const& kt ); struct Sim2Listener : IListener, ReferenceCounted { explicit Sim2Listener( ISimulator::ProcessInfo* process ) : process(process) {} void incomingConnection( double seconds, Reference conn ) { // Called by another process! incoming( Reference::addRef( this ), seconds, conn ); } virtual void addref() { ReferenceCounted::addref(); } virtual void delref() { ReferenceCounted::delref(); } virtual Future> accept() { return popOne( nextConnection.getFuture() ); } virtual NetworkAddress getListenAddress() { return process->address; } private: ISimulator::ProcessInfo* process; PromiseStream< Reference > nextConnection; ACTOR static void incoming( Reference self, double seconds, Reference conn ) { Void _ = wait( g_simulator.onProcess(self->process) ); Void _ = wait( delay( seconds ) ); if (((Sim2Conn*)conn.getPtr())->isPeerGone() && g_random->random01()<0.5) return; TraceEvent("Sim2IncomingConn", conn->getDebugID()); self->nextConnection.send( conn ); } ACTOR static Future> popOne( FutureStream< Reference > conns ) { Reference c = waitNext( conns ); ((Sim2Conn*)c.getPtr())->opened = true; return c; } }; #define g_sim2 ((Sim2&)g_simulator) class Sim2 : public ISimulator, public INetworkConnections { public: // Implement INetwork interface // Everything actually network related is delegated to the Sim2Net class; Sim2 is only concerned with simulating machines and time virtual double now() { return time; } virtual Future delay( double seconds, int taskID ) { ASSERT(taskID >= TaskMinPriority && taskID <= TaskMaxPriority); return delay( seconds, taskID, currentProcess ); } Future delay( double seconds, int taskID, ProcessInfo* machine ) { ASSERT( seconds >= -0.0001 ); seconds = std::max(0.0, seconds); Future f; if(!currentProcess->rebooting && machine == currentProcess && !currentProcess->shutdownSignal.isSet() && FLOW_KNOBS->MAX_BUGGIFIED_DELAY > 0 && g_random->random01() < 0.25) { //FIXME: why doesnt this work when we are changing machines? seconds += FLOW_KNOBS->MAX_BUGGIFIED_DELAY*pow(g_random->random01(),1000.0); } mutex.enter(); tasks.push( Task( time + seconds, taskID, taskCount++, machine, f ) ); mutex.leave(); return f; } ACTOR static Future checkShutdown(Sim2 *self, int taskID) { ISimulator::KillType kt = wait( self->getCurrentProcess()->shutdownSignal.getFuture() ); self->setCurrentTask(taskID); return Void(); } virtual Future yield( int taskID ) { if (taskID == TaskDefaultYield) taskID = currentTaskID; if (check_yield(taskID)) { // We want to check that yielders can handle actual time elapsing (it sometimes will outside simulation), but // don't want to prevent instantaneous shutdown of "rebooted" machines. return delay(getCurrentProcess()->rebooting ? 0 : .001,taskID) || checkShutdown(this, taskID); } setCurrentTask(taskID); return Void(); } virtual bool check_yield( int taskID ) { if (yielded) return true; if (--yield_limit <= 0) { yield_limit = g_random->randomInt(1, 150); // If yield returns false *too* many times in a row, there could be a stack overflow, since we can't deterministically check stack size as the real network does return yielded = true; } return yielded = BUGGIFY_WITH_PROB(0.01); } virtual int getCurrentTask() { return currentTaskID; } virtual void setCurrentTask(int taskID ) { currentTaskID = taskID; } // Sets the taskID/priority of the current task, without yielding virtual Future> connect( NetworkAddress toAddr ) { ASSERT( !toAddr.isTLS() ); if (!addressMap.count( toAddr )) { return waitForProcessAndConnect( toAddr, this ); } auto peerp = getProcessByAddress(toAddr); Reference myc( new Sim2Conn( getCurrentProcess() ) ); Reference peerc( new Sim2Conn( peerp ) ); myc->connect(peerc, toAddr); peerc->connect(myc, NetworkAddress( getCurrentProcess()->address.ip + g_random->randomInt(0,256), g_random->randomInt(40000, 60000) )); ((Sim2Listener*)peerp->listener.getPtr())->incomingConnection( 0.5*g_random->random01(), Reference(peerc) ); return onConnect( ::delay(0.5*g_random->random01()), myc ); } virtual Future> resolveTCPEndpoint( std::string host, std::string service) { throw lookup_failed(); } ACTOR static Future> onConnect( Future ready, Reference conn ) { Void _ = wait(ready); if (conn->isPeerGone() && g_random->random01()<0.5) { conn.clear(); Void _ = wait(Never()); } conn->opened = true; return conn; } virtual Reference listen( NetworkAddress localAddr ) { ASSERT( !localAddr.isTLS() ); ASSERT( localAddr == getCurrentProcess()->address ); return Reference( getCurrentProcess()->listener ); } ACTOR static Future> waitForProcessAndConnect( NetworkAddress toAddr, INetworkConnections *self ) { // We have to be able to connect to processes that don't yet exist, so we do some silly polling loop { Void _ = wait( ::delay( 0.1 * g_random->random01() ) ); if (g_sim2.addressMap.count(toAddr)) { Reference c = wait( self->connect( toAddr ) ); return c; } } } virtual void stop() { isStopped = true; } virtual bool isSimulated() const { return true; } struct SimThreadArgs { THREAD_FUNC_RETURN (*func) (void*); void *arg; ISimulator::ProcessInfo *currentProcess; SimThreadArgs(THREAD_FUNC_RETURN (*func) (void*), void *arg) : func(func), arg(arg) { ASSERT(g_network->isSimulated()); currentProcess = g_simulator.getCurrentProcess(); } }; //Starts a new thread, making sure to set any thread local state THREAD_FUNC simStartThread(void *arg) { SimThreadArgs *simArgs = (SimThreadArgs*)arg; ISimulator::currentProcess = simArgs->currentProcess; simArgs->func(simArgs->arg); delete simArgs; THREAD_RETURN; } virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void*), void *arg ) { SimThreadArgs *simArgs = new SimThreadArgs(func, arg); return ::startThread(simStartThread, simArgs); } virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total) { ProcessInfo *proc = getCurrentProcess(); SimDiskSpace &diskSpace = diskSpaceMap[proc->address.ip]; int64_t totalFileSize = 0; int numFiles = 0; //Get the size of all files we've created on the server and subtract them from the free space for(auto file = proc->machine->openFiles.begin(); file != proc->machine->openFiles.end(); ++file) { if( file->second.isReady() ) { totalFileSize += ((AsyncFileNonDurable*)file->second.get().getPtr())->approximateSize; } numFiles++; } bool ok = false; if(diskSpace.totalSpace == 0) { diskSpace.totalSpace = 5e9 + g_random->random01() * 100e9; //Total space between 5GB and 105GB diskSpace.baseFreeSpace = std::min(diskSpace.totalSpace, std::max(5e9, (g_random->random01() * (1 - .075) + .075) * diskSpace.totalSpace) + totalFileSize); //Minimum 5GB or 7.5% total disk space, whichever is higher TraceEvent("Sim2DiskSpaceInitialization").detail("TotalSpace", diskSpace.totalSpace).detail("BaseFreeSpace", diskSpace.baseFreeSpace).detail("totalFileSize", totalFileSize).detail("NumFiles", numFiles); } else { int64_t maxDelta = std::min(5.0, (now() - diskSpace.lastUpdate)) * (BUGGIFY ? 10e6 : 1e6); //External processes modifying the disk int64_t delta = -maxDelta + g_random->random01() * maxDelta * 2; diskSpace.baseFreeSpace = std::min(diskSpace.totalSpace, std::max(diskSpace.baseFreeSpace + delta, totalFileSize)); } diskSpace.lastUpdate = now(); total = diskSpace.totalSpace; free = std::max(0, diskSpace.baseFreeSpace - totalFileSize); if(free == 0) TraceEvent(SevWarnAlways, "Sim2NoFreeSpace").detail("TotalSpace", diskSpace.totalSpace).detail("BaseFreeSpace", diskSpace.baseFreeSpace).detail("totalFileSize", totalFileSize).detail("NumFiles", numFiles); } virtual bool isAddressOnThisHost( NetworkAddress const& addr ) { return addr.ip == getCurrentProcess()->address.ip; } ACTOR static Future deleteFileImpl( Sim2* self, std::string filename, bool mustBeDurable ) { // This is a _rudimentary_ simulation of the untrustworthiness of non-durable deletes and the possibility of // rebooting during a durable one. It isn't perfect: for example, on real filesystems testing // for the existence of a non-durably deleted file BEFORE a reboot will show that it apparently doesn't exist. if(g_simulator.getCurrentProcess()->machine->openFiles.count(filename)) { g_simulator.getCurrentProcess()->machine->openFiles.erase(filename); g_simulator.getCurrentProcess()->machine->deletingFiles.insert(filename); } if ( mustBeDurable || g_random->random01() < 0.5 ) { state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess(); state int currentTaskID = g_network->getCurrentTask(); Void _ = wait( g_simulator.onMachine( currentProcess ) ); try { Void _ = wait( ::delay(0.05 * g_random->random01()) ); if (!currentProcess->rebooting) { auto f = IAsyncFileSystem::filesystem(self->net2)->deleteFile(filename, false); ASSERT( f.isReady() ); Void _ = wait( ::delay(0.05 * g_random->random01()) ); TEST( true ); // Simulated durable delete } Void _ = wait( g_simulator.onProcess( currentProcess, currentTaskID ) ); return Void(); } catch( Error &e ) { state Error err = e; Void _ = wait( g_simulator.onProcess( currentProcess, currentTaskID ) ); throw err; } } else { TEST( true ); // Simulated non-durable delete return Void(); } } ACTOR static Future runLoop(Sim2 *self) { state ISimulator::ProcessInfo *callingMachine = self->currentProcess; while ( !self->isStopped ) { Void _ = wait( self->net2->yield(TaskDefaultYield) ); self->mutex.enter(); if( self->tasks.size() == 0 ) { self->mutex.leave(); ASSERT(false); } //if (!randLog/* && now() >= 32.0*/) // randLog = fopen("randLog.txt", "wt"); Task t = std::move( self->tasks.top() ); // Unfortunately still a copy under gcc where .top() returns const& self->currentTaskID = t.taskID; self->tasks.pop(); self->mutex.leave(); self->execTask(t); self->yielded = false; } self->currentProcess = callingMachine; self->net2->stop(); return Void(); } ACTOR Future _run(Sim2 *self) { Future loopFuture = self->runLoop(self); self->net2->run(); Void _ = wait( loopFuture ); return Void(); } // Implement ISimulator interface virtual void run() { _run(this); } virtual ProcessInfo* newProcess(const char* name, uint32_t ip, uint16_t port, LocalityData locality, ProcessClass startingClass, const char* dataFolder, const char* coordinationFolder) { ASSERT( locality.zoneId().present() ); MachineInfo& machine = machines[ locality.zoneId().get() ]; if (!machine.zoneId.present()) machine.zoneId = locality.zoneId(); for( int i = 0; i < machine.processes.size(); i++ ) { if( machine.processes[i]->locality.zoneId() != locality.zoneId() ) { // SOMEDAY: compute ip from locality to avoid this check TraceEvent("Sim2Mismatch").detail("IP", format("%x", ip)) .detailext("zoneId", locality.zoneId()).detail("NewName", name) .detailext("ExistingmachineId", machine.processes[i]->locality.zoneId()).detail("ExistingName", machine.processes[i]->name); ASSERT( false ); } ASSERT( machine.processes[i]->address.port != port ); } // This is for async operations on non-durable files. // These files must live on after process kills for sim purposes. if( machine.machineProcess == 0 ) { NetworkAddress machineAddress(ip, 0, false, false); machine.machineProcess = new ProcessInfo("Machine", locality, startingClass, machineAddress, this, "", ""); machine.machineProcess->machine = &machine; } NetworkAddress address(ip, port, true, false); // SOMEDAY see above about becoming SSL! ProcessInfo* m = new ProcessInfo(name, locality, startingClass, address, this, dataFolder, coordinationFolder); m->listener = Reference( new Sim2Listener(m) ); m->machine = &machine; machine.processes.push_back(m); currentlyRebootingProcesses.erase(address); addressMap[ m->address ] = m; m->excluded = g_simulator.isExcluded(address); m->cleared = g_simulator.isCleared(address); m->setGlobal(enTDMetrics, (flowGlobalType) &m->tdmetrics); m->setGlobal(enNetworkConnections, (flowGlobalType) m->network); m->setGlobal(enASIOTimedOut, (flowGlobalType) false); TraceEvent("NewMachine").detail("Name", name).detail("Address", m->address).detailext("zoneId", m->locality.zoneId()).detail("Excluded", m->excluded).detail("Cleared", m->cleared); // FIXME: Sometimes, connections to/from this process will explicitly close return m; } virtual bool isAvailable() const { std::vector processesLeft, processesDead; for (auto processInfo : getAllProcesses()) { // Add non-test processes (ie. datahall is not be set for test processes) if (processInfo->isAvailableClass()) { // Ignore excluded machines if (processInfo->isExcluded()) processesDead.push_back(processInfo); else if (processInfo->isCleared()) processesDead.push_back(processInfo); // Mark all of the unavailable as dead else if (!processInfo->isAvailable()) processesDead.push_back(processInfo); else if (protectedAddresses.count(processInfo->address)) processesLeft.push_back(processInfo); else processesLeft.push_back(processInfo); } } return canKillProcesses(processesLeft, processesDead, KillInstantly, NULL); } // The following function will determine if the specified configuration of available and dead processes can allow the cluster to survive virtual bool canKillProcesses(std::vector const& availableProcesses, std::vector const& deadProcesses, KillType kt, KillType* newKillType) const { bool canSurvive = true; int nQuorum = ((desiredCoordinators+1)/2)*2-1; KillType newKt = kt; if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) { LocalityGroup processesLeft, processesDead; std::vector localitiesDead, localitiesLeft, badCombo; std::set>> uniqueMachines; ASSERT(storagePolicy); ASSERT(tLogPolicy); for (auto processInfo : availableProcesses) { processesLeft.add(processInfo->locality); localitiesLeft.push_back(processInfo->locality); uniqueMachines.insert(processInfo->locality.machineId()); } for (auto processInfo : deadProcesses) { processesDead.add(processInfo->locality); localitiesDead.push_back(processInfo->locality); } // Reboot if dead machines do fulfill policies if (processesDead.validate(tLogPolicy)) { newKt = Reboot; canSurvive = false; TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("tLogPolicy", tLogPolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("Reason", "tLogPolicy validates against dead processes."); } else if (processesDead.validate(storagePolicy)) { newKt = Reboot; canSurvive = false; TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("Reason", "storagePolicy validates against dead processes."); } // Check all combinations of the AntiQuorum within the failed else if ((tLogWriteAntiQuorum) && (!validateAllCombinations(badCombo, processesDead, tLogPolicy, localitiesLeft, tLogWriteAntiQuorum, false))) { newKt = Reboot; canSurvive = false; TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("BadZones", ::describeZones(badCombo)).detail("BadDataHalls", ::describeDataHalls(badCombo)).detail("Reason", "tLog AntiQuorum does not validates against dead processes."); } // Reboot and Delete if remaining machines do NOT fulfill policies else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (!processesLeft.validate(tLogPolicy))) { newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot; canSurvive = false; TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("tLogPolicy", tLogPolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Reason", "tLogPolicy does not validates against remaining processes."); } else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (!processesLeft.validate(storagePolicy))) { newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot; canSurvive = false; TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Reason", "storagePolicy does not validates against remaining processes."); } else if ((kt != RebootAndDelete) && (kt != RebootProcessAndDelete) && (nQuorum > uniqueMachines.size())) { newKt = (g_random->random01() < 0.33) ? RebootAndDelete : Reboot; canSurvive = false; TraceEvent("KillChanged").detail("KillType", kt).detail("NewKillType", newKt).detail("storagePolicy", storagePolicy->info()).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("RemainingZones", ::describeZones(localitiesLeft)).detail("RemainingDataHalls", ::describeDataHalls(localitiesLeft)).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("Reason", "Not enough unique machines to perform auto configuration of coordinators."); } else { TraceEvent("CanSurviveKills").detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("DeadZones", ::describeZones(localitiesDead)).detail("DeadDataHalls", ::describeDataHalls(localitiesDead)).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()).detail("Quorum", nQuorum).detail("Machines", uniqueMachines.size()).detail("ZonesLeft", ::describeZones(localitiesLeft)).detail("DataHallsLeft", ::describeDataHalls(localitiesLeft)).detail("ValidateRemaining", processesLeft.validate(tLogPolicy)); } } if (newKillType) *newKillType = newKt; return canSurvive; } virtual void destroyProcess( ISimulator::ProcessInfo *p ) { TraceEvent("ProcessDestroyed").detail("Name", p->name).detail("Address", p->address).detailext("zoneId", p->locality.zoneId()); currentlyRebootingProcesses.insert(std::pair(p->address, p)); std::vector& processes = machines[ p->locality.zoneId().get() ].processes; if( p != processes.back() ) { auto it = std::find( processes.begin(), processes.end(), p ); std::swap( *it, processes.back() ); } processes.pop_back(); killProcess_internal( p, KillInstantly ); } void killProcess_internal( ProcessInfo* machine, KillType kt ) { TEST( true ); // Simulated machine was killed with any kill type TEST( kt == KillInstantly ); // Simulated machine was killed instantly TEST( kt == InjectFaults ); // Simulated machine was killed with faults if (kt == KillInstantly) { TraceEvent(SevWarn, "FailMachine", machine->locality.zoneId()).detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace(); // This will remove all the "tracked" messages that came from the machine being killed latestEventCache.clear(); machine->failed = true; } else if (kt == InjectFaults) { TraceEvent(SevWarn, "FaultMachine", machine->locality.zoneId()).detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).detail("Protected", protectedAddresses.count(machine->address)).backtrace(); should_inject_fault = simulator_should_inject_fault; machine->fault_injection_r = g_random->randomUniqueID().first(); machine->fault_injection_p1 = 0.1; machine->fault_injection_p2 = g_random->random01(); } else { ASSERT( false ); } ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting); } virtual void rebootProcess( ProcessInfo* process, KillType kt ) { if( kt == RebootProcessAndDelete && protectedAddresses.count(process->address) ) { TraceEvent("RebootChanged").detail("ZoneId", process->locality.describeZone()).detail("KillType", RebootProcess).detail("OrigKillType", kt).detail("Reason", "Protected process"); kt = RebootProcess; } doReboot( process, kt ); } virtual void rebootProcess(Optional> zoneId, bool allProcesses ) { if( allProcesses ) { auto processes = getAllProcesses(); for( int i = 0; i < processes.size(); i++ ) if( processes[i]->locality.zoneId() == zoneId && !processes[i]->rebooting ) doReboot( processes[i], RebootProcess ); } else { auto processes = getAllProcesses(); for( int i = 0; i < processes.size(); i++ ) { if( processes[i]->locality.zoneId() != zoneId || processes[i]->rebooting ) { std::swap(processes[i--], processes.back()); processes.pop_back(); } } if( processes.size() ) doReboot( g_random->randomChoice( processes ), RebootProcess ); } } virtual void killProcess( ProcessInfo* machine, KillType kt ) { TraceEvent("attemptingKillProcess").detail("killedMachines", killedMachines).detail("killableMachines", killableMachines); if (kt < RebootAndDelete ) { killProcess_internal( machine, kt ); killedMachines++; } } virtual void killInterface( NetworkAddress address, KillType kt ) { if (kt < RebootAndDelete ) { std::vector& processes = machines[ addressMap[address]->locality.zoneId() ].processes; for( int i = 0; i < processes.size(); i++ ) killProcess_internal( processes[i], kt ); killedMachines++; } } virtual bool killMachine(Optional> zoneId, KillType kt, bool killIsSafe, bool forceKill, KillType* ktFinal) { auto ktOrig = kt; if (killIsSafe) ASSERT( kt == ISimulator::RebootAndDelete ); // Only types of "safe" kill supported so far TEST(true); // Trying to killing a machine TEST(kt == KillInstantly); // Trying to kill instantly TEST(kt == InjectFaults); // Trying to kill by injecting faults if(speedUpSimulation && !forceKill) { TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "Unforced kill within speedy simulation.").backtrace(); if (ktFinal) *ktFinal = None; return false; } int processesOnMachine = 0; KillType originalKt = kt; // Reboot if any of the processes are protected and count the number of processes not rebooting for (auto& process : machines[zoneId].processes) { if (protectedAddresses.count(process->address)) kt = Reboot; if (!process->rebooting) processesOnMachine++; } // Do nothing, if no processes to kill if (processesOnMachine == 0) { TraceEvent(SevWarn, "AbortedKill", zoneId).detailext("ZoneId", zoneId).detail("Reason", "The target had no processes running.").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace(); if (ktFinal) *ktFinal = None; return false; } // Check if machine can be removed, if requested if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) { std::vector processesLeft, processesDead; int protectedWorker = 0, unavailable = 0, excluded = 0, cleared = 0; for (auto machineRec : machines) { for (auto processInfo : machineRec.second.processes) { // Add non-test processes (ie. datahall is not be set for test processes) if (processInfo->isAvailableClass()) { // Do not include any excluded machines if (processInfo->isExcluded()) { processesDead.push_back(processInfo); excluded ++; } else if (!processInfo->isCleared()) { processesDead.push_back(processInfo); cleared ++; } else if (!processInfo->isAvailable()) { processesDead.push_back(processInfo); unavailable ++; } else if (protectedAddresses.count(processInfo->address)) { processesLeft.push_back(processInfo); protectedWorker ++; } else if (machineRec.second.zoneId != zoneId) processesLeft.push_back(processInfo); // Add processes from dead machines and datacenter machines to dead group else processesDead.push_back(processInfo); } } } if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) { if ((kt != Reboot) && (!killIsSafe)) { kt = Reboot; } TraceEvent("ChangedKillMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("OrigKillType", ktOrig).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("Protected", protectedWorker).detail("Unavailable", unavailable).detail("Excluded", excluded).detail("Cleared", cleared).detail("ProtectedTotal", protectedAddresses.size()).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()); } else if ((kt == KillInstantly) || (kt == InjectFaults)) { TraceEvent("DeadMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()); for (auto process : processesLeft) { TraceEvent("DeadMachineSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process)); } for (auto process : processesDead) { TraceEvent("DeadMachineVictims", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", describe(*process)); } } else { TraceEvent("ClearMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()); for (auto process : processesLeft) { TraceEvent("ClearMachineSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process)); } for (auto process : processesDead) { TraceEvent("ClearMachineVictims", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", describe(*process)); } } } TEST(originalKt != kt); // Kill type was changed from requested to reboot. // Check if any processes on machine are rebooting if( processesOnMachine != processesPerMachine && kt >= RebootAndDelete ) { TEST(true); //Attempted reboot, but the target did not have all of its processes running TraceEvent(SevWarn, "AbortedKill", zoneId).detail("KillType", kt).detailext("ZoneId", zoneId).detail("Reason", "Machine processes does not match number of processes per machine").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace(); if (ktFinal) *ktFinal = None; return false; } // Check if any processes on machine are rebooting if ( processesOnMachine != processesPerMachine) { TEST(true); //Attempted reboot, but the target did not have all of its processes running TraceEvent(SevWarn, "AbortedKill", zoneId).detail("KillType", kt).detailext("ZoneId", zoneId).detail("Reason", "Machine processes does not match number of processes per machine").detail("processes", processesOnMachine).detail("processesPerMachine", processesPerMachine).backtrace(); if (ktFinal) *ktFinal = None; return false; } TraceEvent("KillMachine", zoneId).detailext("ZoneId", zoneId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KilledMachines", killedMachines).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt!=ktOrig).detail("killIsSafe", killIsSafe); if (kt < RebootAndDelete ) { if(kt == InjectFaults && machines[zoneId].machineProcess != nullptr) killProcess_internal( machines[zoneId].machineProcess, kt ); for (auto& process : machines[zoneId].processes) { TraceEvent("KillMachineProcess", zoneId).detail("KillType", kt).detail("Process", process->toString()).detail("startingClass", process->startingClass.toString()).detail("failed", process->failed).detail("excluded", process->excluded).detail("cleared", process->cleared).detail("rebooting", process->rebooting); if (process->startingClass != ProcessClass::TesterClass) killProcess_internal( process, kt ); } } else if ( kt == Reboot || killIsSafe) { for (auto& process : machines[zoneId].processes) { TraceEvent("KillMachineProcess", zoneId).detail("KillType", kt).detail("Process", process->toString()).detail("startingClass", process->startingClass.toString()).detail("failed", process->failed).detail("excluded", process->excluded).detail("cleared", process->cleared).detail("rebooting", process->rebooting); if (process->startingClass != ProcessClass::TesterClass) doReboot(process, kt ); } } TEST(kt == RebootAndDelete); // Resulted in a reboot and delete TEST(kt == Reboot); // Resulted in a reboot TEST(kt == KillInstantly); // Resulted in an instant kill TEST(kt == InjectFaults); // Resulted in a kill by injecting faults if (ktFinal) *ktFinal = kt; return true; } virtual bool killDataCenter(Optional> dcId, KillType kt, KillType* ktFinal) { auto ktOrig = kt; auto processes = getAllProcesses(); std::map>, int> datacenterZones; int dcProcesses = 0; // Switch to a reboot, if anything protected on machine for (auto& procRecord : processes) { auto processDcId = procRecord->locality.dcId(); auto processZoneId = procRecord->locality.zoneId(); ASSERT(processZoneId.present()); if (processDcId.present() && (processDcId == dcId)) { if ((kt != Reboot) && (protectedAddresses.count(procRecord->address))) { kt = Reboot; TraceEvent(SevWarn, "DcKillChanged").detailext("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig) .detail("Reason", "Datacenter has protected process").detail("ProcessAddress", procRecord->address).detail("failed", procRecord->failed).detail("rebooting", procRecord->rebooting).detail("excluded", procRecord->excluded).detail("cleared", procRecord->cleared).detail("Process", describe(*procRecord)); } datacenterZones[processZoneId.get()] ++; dcProcesses ++; } } // Check if machine can be removed, if requested if ((kt == KillInstantly) || (kt == InjectFaults) || (kt == RebootAndDelete) || (kt == RebootProcessAndDelete)) { std::vector processesLeft, processesDead; for (auto machineRec : machines) { for (auto processInfo : machineRec.second.processes) { // Add non-test processes (ie. datahall is not be set for test processes) if (processInfo->isAvailableClass()) { // Mark all of the unavailable as dead if (processInfo->isExcluded()) processesDead.push_back(processInfo); else if (processInfo->isCleared()) processesDead.push_back(processInfo); else if (!processInfo->isAvailable()) processesDead.push_back(processInfo); else if (protectedAddresses.count(processInfo->address)) processesLeft.push_back(processInfo); // Keep all not in the datacenter zones else if (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end()) processesLeft.push_back(processInfo); else processesDead.push_back(processInfo); } } } if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) { TraceEvent(SevWarn, "DcKillChanged").detailext("DataCenter", dcId).detail("KillType", kt).detail("OrigKillType", ktOrig); } else { TraceEvent("DeadDataCenter").detailext("DataCenter", dcId).detail("KillType", kt).detail("DcZones", datacenterZones.size()).detail("DcProcesses", dcProcesses).detail("ProcessesDead", processesDead.size()).detail("ProcessesLeft", processesLeft.size()).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info()); for (auto process : processesLeft) { auto zoneId = process->locality.zoneId(); TraceEvent("DeadDcSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process)); } for (auto process : processesDead) { auto zoneId = process->locality.zoneId(); TraceEvent("DeadDcVictims", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("VictimProcess", describe(*process)); } } } KillType ktResult, ktMin = kt; for (auto& datacenterZone : datacenterZones) { killMachine( datacenterZone.first, kt, (kt == RebootAndDelete), true, &ktResult); if (ktResult != kt) { TraceEvent(SevWarn, "killDCFail") .detailext("Zone", datacenterZone.first) .detail("KillType", kt) .detail("KillTypeResult", ktResult) .detail("KillTypeOrig", ktOrig); ASSERT(ktResult == None); } ktMin = std::min( ktResult, ktMin ); } TraceEvent("killDataCenter") .detail("killedMachines", killedMachines) .detail("killableMachines", killableMachines) .detail("killableDatacenters", killableDatacenters) .detail("maxCoordinatorsInDatacenter", maxCoordinatorsInDatacenter) .detail("DcZones", datacenterZones.size()) .detail("DcProcesses", dcProcesses) .detailext("DCID", dcId) .detail("KillType", kt) .detail("KillTypeOrig", ktOrig) .detail("KillTypeMin", ktMin) .detail("KilledDC", kt==ktMin); TEST(kt != ktMin); // DataCenter kill was rejected by killMachine TEST((kt==ktMin) && (kt == RebootAndDelete)); // Resulted in a reboot and delete TEST((kt==ktMin) && (kt == Reboot)); // Resulted in a reboot TEST((kt==ktMin) && (kt == KillInstantly)); // Resulted in an instant kill TEST((kt==ktMin) && (kt == InjectFaults)); // Resulted in a kill by injecting faults TEST((kt==ktMin) && (kt != ktOrig)); // Kill request was downgraded TEST((kt==ktMin) && (kt == ktOrig)); // Requested kill was done if (ktFinal) *ktFinal = ktMin; return (kt == ktMin); } virtual void clogInterface( uint32_t ip, double seconds, ClogMode mode = ClogDefault ) { if (mode == ClogDefault) { double a = g_random->random01(); if ( a < 0.3 ) mode = ClogSend; else if (a < 0.6 ) mode = ClogReceive; else mode = ClogAll; } TraceEvent("ClogInterface").detail("IP", toIPString(ip)).detail("Delay", seconds) .detail("Queue", mode==ClogSend?"Send":mode==ClogReceive?"Receive":"All"); if (mode == ClogSend || mode==ClogAll) g_clogging.clogSendFor( ip, seconds ); if (mode == ClogReceive || mode==ClogAll) g_clogging.clogRecvFor( ip, seconds ); } virtual void clogPair( uint32_t from, uint32_t to, double seconds ) { g_clogging.clogPairFor( from, to, seconds ); } virtual std::vector getAllProcesses() const { std::vector processes; for( auto c = machines.begin(); c != machines.end(); ++c ) processes.insert( processes.end(), c->second.processes.begin(), c->second.processes.end() ); return processes; } virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) { NetworkAddress normalizedAddress(address.ip, address.port, true, false); ASSERT( addressMap.count( normalizedAddress ) ); return addressMap[ normalizedAddress ]; } virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) { return &machines[addressMap[address]->locality.zoneId()]; } virtual MachineInfo* getMachineById(Optional> const& zoneId) { return &machines[zoneId]; } virtual void destroyMachine(Optional> const& zoneId ) { auto& machine = machines[zoneId]; for( auto process : machine.processes ) { ASSERT( process->failed ); } if( machine.machineProcess ) { killProcess_internal( machine.machineProcess, KillInstantly ); } machines.erase(zoneId); } Sim2() : time(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(-1) { // Not letting currentProcess be NULL eliminates some annoying special cases currentProcess = new ProcessInfo( "NoMachine", LocalityData(Optional>(), StringRef(), StringRef(), StringRef()), ProcessClass(), NetworkAddress(), this, "", "" ); g_network = net2 = newNet2(NetworkAddress(), false, true); Net2FileSystem::newFileSystem(); check_yield(0); } // Implementation struct Task { int taskID; double time; uint64_t stable; ProcessInfo* machine; Promise action; Task( double time, int taskID, uint64_t stable, ProcessInfo* machine, Promise&& action ) : time(time), taskID(taskID), stable(stable), machine(machine), action(std::move(action)) {} Task( double time, int taskID, uint64_t stable, ProcessInfo* machine, Future& future ) : time(time), taskID(taskID), stable(stable), machine(machine) { future = action.getFuture(); } Task(Task&& rhs) noexcept(true) : time(rhs.time), taskID(rhs.taskID), stable(rhs.stable), machine(rhs.machine), action(std::move(rhs.action)) {} void operator= ( Task const& rhs ) { taskID = rhs.taskID; time = rhs.time; stable = rhs.stable; machine = rhs.machine; action = rhs.action; } Task( Task const& rhs ) : taskID(rhs.taskID), time(rhs.time), stable(rhs.stable), machine(rhs.machine), action(rhs.action) {} void operator= (Task&& rhs) noexcept(true) { time = rhs.time; taskID = rhs.taskID; stable = rhs.stable; machine = rhs.machine; action = std::move(rhs.action); } bool operator < (Task const& rhs) const { // Ordering is reversed for priority_queue if (time != rhs.time) return time > rhs.time; return stable > rhs.stable; } }; void execTask(struct Task& t) { if (t.machine->failed) { t.action.send(Never()); } else { mutex.enter(); this->time = t.time; mutex.leave(); this->currentProcess = t.machine; try { //auto before = getCPUTicks(); t.action.send(Void()); ASSERT( this->currentProcess == t.machine ); /*auto elapsed = getCPUTicks() - before; currentProcess->cpuTicks += elapsed; if (g_random->random01() < 0.01){ TraceEvent("st").detail("cpu", currentProcess->cpuTicks); currentProcess->cpuTicks = 0; }*/ } catch (Error& e) { TraceEvent(SevError, "UnhandledSimulationEventError").error(e, true); killProcess(t.machine, KillInstantly); } //if( this->time > 45.522817 ) { // printf("foo\n"); //} if (randLog) fprintf( randLog, "T %f %d %s %lld\n", this->time, int(g_random->peek() % 10000), t.machine ? t.machine->name : "none", t.stable); } } virtual void onMainThread( Promise&& signal, int taskID ) { // This is presumably coming from either a "fake" thread pool thread, i.e. it is actually on this thread // or a thread created with g_network->startThread ASSERT(getCurrentProcess()); mutex.enter(); ASSERT(taskID >= TaskMinPriority && taskID <= TaskMaxPriority); tasks.push( Task( time, taskID, taskCount++, getCurrentProcess(), std::move(signal) ) ); mutex.leave(); } virtual Future onProcess( ISimulator::ProcessInfo *process, int taskID ) { return delay( 0, taskID, process ); } virtual Future onMachine( ISimulator::ProcessInfo *process, int taskID ) { if( process->machine == 0 ) return Void(); return delay( 0, taskID, process->machine->machineProcess ); } //time is guarded by ISimulator::mutex. It is not necessary to guard reads on the main thread because //time should only be modified from the main thread. double time; int currentTaskID; //taskCount is guarded by ISimulator::mutex uint64_t taskCount; std::map>, MachineInfo > machines; std::map addressMap; std::map> filesDeadMap; std::set exclusionSet; //tasks is guarded by ISimulator::mutex std::priority_queue> tasks; //Sim2Net network; INetwork *net2; //Map from machine IP -> machine disk space info std::map diskSpaceMap; //Whether or not yield has returned true during the current iteration of the run loop bool yielded; int yield_limit; // how many more times yield may return false before next returning true }; void startNewSimulator() { ASSERT( !g_network ); g_network = g_pSimulator = new Sim2(); g_simulator.connectionFailuresDisableDuration = g_random->random01() < 0.5 ? 0 : 1e6; } static double networkLatency() { double a = g_random->random01(); const double pFast = 0.999; if (a <= pFast) return FLOW_KNOBS->MIN_NETWORK_LATENCY + FLOW_KNOBS->FAST_NETWORK_LATENCY/pFast * a; // 0.5ms average else{ a = (a-pFast) / (1-pFast); // uniform 0-1 again return FLOW_KNOBS->MIN_NETWORK_LATENCY + FLOW_KNOBS->SLOW_NETWORK_LATENCY*a; // long tail up to X ms } } ACTOR void doReboot( ISimulator::ProcessInfo *p, ISimulator::KillType kt ) { TraceEvent("RebootingProcessAttempt").detailext("ZoneId", p->locality.zoneId()).detail("KillType", kt).detail("Process", p->toString()).detail("startingClass", p->startingClass.toString()).detail("failed", p->failed).detail("excluded", p->excluded).detail("cleared", p->cleared).detail("rebooting", p->rebooting).detail("TaskDefaultDelay", TaskDefaultDelay); Void _ = wait( g_sim2.delay( 0, TaskDefaultDelay, p ) ); // Switch to the machine in question try { ASSERT( kt == ISimulator::RebootProcess || kt == ISimulator::Reboot || kt == ISimulator::RebootAndDelete || kt == ISimulator::RebootProcessAndDelete ); TEST( kt == ISimulator::RebootProcess ); // Simulated process rebooted TEST( kt == ISimulator::Reboot ); // Simulated machine rebooted TEST( kt == ISimulator::RebootAndDelete ); // Simulated machine rebooted with data and coordination state deletion TEST( kt == ISimulator::RebootProcessAndDelete ); // Simulated process rebooted with data and coordination state deletion if( p->rebooting ) return; TraceEvent("RebootingProcess").detail("KillType", kt).detail("Address", p->address).detailext("ZoneId", p->locality.zoneId()).detailext("DataHall", p->locality.dataHallId()).detail("Locality", p->locality.toString()).detail("failed", p->failed).detail("excluded", p->excluded).detail("cleared", p->cleared).backtrace(); p->rebooting = true; if ((kt == ISimulator::RebootAndDelete) || (kt == ISimulator::RebootProcessAndDelete)) { p->cleared = true; g_simulator.clearAddress(p->address); } p->shutdownSignal.send( kt ); } catch (Error& e) { TraceEvent(SevError, "RebootError").error(e); p->shutdownSignal.sendError(e); // ? throw; // goes nowhere! } } //Simulates delays for performing operations on disk Future waitUntilDiskReady( Reference diskParameters, int64_t size, bool sync ) { if(g_simulator.connectionFailuresDisableDuration > 1e4) return delay(0.0001); if( diskParameters->nextOperation < now() ) diskParameters->nextOperation = now(); diskParameters->nextOperation += ( 1.0 / diskParameters->iops ) + ( size / diskParameters->bandwidth ); double randomLatency; if(sync) { randomLatency = .005 + g_random->random01() * (BUGGIFY ? 1.0 : .010); } else randomLatency = 10 * g_random->random01() / diskParameters->iops; return delayUntil( diskParameters->nextOperation + randomLatency ); } #if defined(_WIN32) /* Opening with FILE_SHARE_DELETE lets simulation actually work on windows - previously renames were always failing. FIXME: Use an actual platform abstraction for this stuff! Is there any reason we can't use underlying net2 for example? */ #include int sf_open( const char* filename, int flags, int convFlags, int mode ) { HANDLE wh = CreateFile( filename, GENERIC_READ | ((flags&IAsyncFile::OPEN_READWRITE) ? GENERIC_WRITE : 0), FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, NULL, (flags&IAsyncFile::OPEN_EXCLUSIVE) ? CREATE_NEW : (flags&IAsyncFile::OPEN_CREATE) ? OPEN_ALWAYS : OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL ); int h = -1; if (wh != INVALID_HANDLE_VALUE) h = _open_osfhandle( (intptr_t)wh, convFlags ); else errno = GetLastError() == ERROR_FILE_NOT_FOUND ? ENOENT : EFAULT; return h; } #endif // Opens a file for asynchronous I/O Future< Reference > Sim2FileSystem::open( std::string filename, int64_t flags, int64_t mode ) { ASSERT( (flags & IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE) || !(flags & IAsyncFile::OPEN_CREATE) || StringRef(filename).endsWith(LiteralStringRef(".fdb-lock")) ); // We don't use "ordinary" non-atomic file creation right now except for folder locking, and we don't have code to simulate its unsafeness. if ( (flags & IAsyncFile::OPEN_EXCLUSIVE) ) ASSERT( flags & IAsyncFile::OPEN_CREATE ); if (flags & IAsyncFile::OPEN_UNCACHED) { auto& machineCache = g_simulator.getCurrentProcess()->machine->openFiles; std::string actualFilename = filename; if ( machineCache.find(filename) == machineCache.end() ) { if(flags & IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE) { actualFilename = filename + ".part"; auto partFile = machineCache.find(actualFilename); if(partFile != machineCache.end()) { Future> f = AsyncFileDetachable::open(partFile->second); if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0) f = map(f, [=](Reference r) { return Reference(new AsyncFileWriteChecker(r)); }); return f; } } //Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile. This way, they can both keep up with the time to start the next operation Reference diskParameters(new DiskParameters(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH)); machineCache[actualFilename] = AsyncFileNonDurable::open(filename, actualFilename, SimpleFile::open(filename, flags, mode, diskParameters, false), diskParameters); } Future> f = AsyncFileDetachable::open( machineCache[actualFilename] ); if(FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0) f = map(f, [=](Reference r) { return Reference(new AsyncFileWriteChecker(r)); }); return f; } else return AsyncFileCached::open(filename, flags, mode); } // Deletes the given file. If mustBeDurable, returns only when the file is guaranteed to be deleted even after a power failure. Future< Void > Sim2FileSystem::deleteFile( std::string filename, bool mustBeDurable ) { return Sim2::deleteFileImpl(&g_sim2, filename, mustBeDurable); } void Sim2FileSystem::newFileSystem() { g_network->setGlobal(INetwork::enFileSystem, (flowGlobalType) new Sim2FileSystem()); }