diff --git a/bindings/python/tests/fdbcli_tests.py b/bindings/python/tests/fdbcli_tests.py index d24ddb876f..7be7c75f4d 100755 --- a/bindings/python/tests/fdbcli_tests.py +++ b/bindings/python/tests/fdbcli_tests.py @@ -233,7 +233,8 @@ def suspend(logger): port = address.split(':')[1] logger.debug("Port: {}".format(port)) # use the port number to find the exact fdb process we are connecting to - pinfo = list(filter(lambda x: port in x, pinfos)) + # child process like fdbserver -r flowprocess does not provide `datadir` in the command line + pinfo = list(filter(lambda x: port in x and 'datadir' in x, pinfos)) assert len(pinfo) == 1 pid = pinfo[0].split(' ')[0] logger.debug("Pid: {}".format(pid)) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index bbc2cb0adf..acc1d3253c 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -652,6 +652,7 @@ struct GetRangeLimits { }; struct RangeResultRef : VectorRef { + constexpr static FileIdentifier file_identifier = 3985192; bool more; // True if (but not necessarily only if) values remain in the *key* range requested (possibly beyond the // limits requested) False implies that no such values remain Optional readThrough; // Only present when 'more' is true. When present, this value represent the end (or @@ -958,6 +959,7 @@ struct TLogSpillType { // Contains the amount of free and total space for a storage server, in bytes struct StorageBytes { + constexpr static FileIdentifier file_identifier = 3928581; // Free space on the filesystem int64_t free; // Total space on the filesystem diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index f53efac786..32a8738f61 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -250,6 +250,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( DEBOUNCE_RECRUITING_DELAY, 5.0 ); init( DD_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) DD_FAILURE_TIME = 10.0; init( DD_ZERO_HEALTHY_TEAM_DELAY, 1.0 ); + init( REMOTE_KV_STORE, false ); if( randomize && BUGGIFY ) REMOTE_KV_STORE = true; + init( REMOTE_KV_STORE_INIT_DELAY, 0.1 ); + init( REMOTE_KV_STORE_MAX_INIT_DURATION, 10.0 ); init( REBALANCE_MAX_RETRIES, 100 ); init( DD_OVERLAP_PENALTY, 10000 ); init( DD_EXCLUDE_MIN_REPLICAS, 1 ); @@ -555,6 +558,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( MIN_REBOOT_TIME, 4.0 ); if( longReboots ) MIN_REBOOT_TIME = 10.0; init( MAX_REBOOT_TIME, 5.0 ); if( longReboots ) MAX_REBOOT_TIME = 20.0; init( LOG_DIRECTORY, "."); // Will be set to the command line flag. + init( CONN_FILE, ""); // Will be set to the command line flag. init( SERVER_MEM_LIMIT, 8LL << 30 ); init( SYSTEM_MONITOR_FREQUENCY, 5.0 ); diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index de69ef43dc..830d462883 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -233,6 +233,14 @@ public: double DD_FAILURE_TIME; double DD_ZERO_HEALTHY_TEAM_DELAY; + // Run storage enginee on a child process on the same machine with storage process + bool REMOTE_KV_STORE; + // A delay to avoid race on file resources if the new kv store process started immediately after the previous kv + // store process died + double REMOTE_KV_STORE_INIT_DELAY; + // max waiting time for the remote kv store to initialize + double REMOTE_KV_STORE_MAX_INIT_DURATION; + // KeyValueStore SQLITE int CLEAR_BUFFER_SIZE; double READ_VALUE_TIME_ESTIMATE; @@ -488,6 +496,7 @@ public: double MIN_REBOOT_TIME; double MAX_REBOOT_TIME; std::string LOG_DIRECTORY; + std::string CONN_FILE; int64_t SERVER_MEM_LIMIT; double SYSTEM_MONITOR_FREQUENCY; diff --git a/fdbrpc/CMakeLists.txt b/fdbrpc/CMakeLists.txt index baff60b4f0..59ae21bc9e 100644 --- a/fdbrpc/CMakeLists.txt +++ b/fdbrpc/CMakeLists.txt @@ -10,6 +10,7 @@ set(FDBRPC_SRCS AsyncFileNonDurable.actor.cpp AsyncFileWriteChecker.cpp FailureMonitor.actor.cpp + FlowProcess.actor.h FlowTransport.actor.cpp genericactors.actor.h genericactors.actor.cpp diff --git a/fdbrpc/FlowProcess.actor.h b/fdbrpc/FlowProcess.actor.h new file mode 100644 index 0000000000..bd734198d8 --- /dev/null +++ b/fdbrpc/FlowProcess.actor.h @@ -0,0 +1,94 @@ +/* + * FlowProcess.actor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#if defined(NO_INTELLISENSE) && !defined(FDBRPC_FLOW_PROCESS_ACTOR_G_H) +#define FDBRPC_FLOW_PROCESS_ACTOR_G_H +#include "fdbrpc/FlowProcess.actor.g.h" +#elif !defined(FDBRPC_FLOW_PROCESS_ACTOR_H) +#define FDBRPC_FLOW_PROCESS_ACTOR_H + +#include "fdbrpc/fdbrpc.h" + +#include +#include + +#include // has to be last include + +struct FlowProcessInterface { + constexpr static FileIdentifier file_identifier = 3491839; + RequestStream registerProcess; + + template + void serialize(Ar& ar) { + serializer(ar, registerProcess); + } +}; + +struct FlowProcessRegistrationRequest { + constexpr static FileIdentifier file_identifier = 3411838; + Standalone flowProcessInterface; + + template + void serialize(Ar& ar) { + serializer(ar, flowProcessInterface); + } +}; + +class FlowProcess { + +public: + virtual ~FlowProcess() {} + virtual StringRef name() const = 0; + virtual StringRef serializedInterface() const = 0; + virtual Future run() = 0; + virtual void registerEndpoint(Endpoint p) = 0; +}; + +struct IProcessFactory { + static FlowProcess* create(std::string const& name) { + auto it = factories().find(name); + if (it == factories().end()) + return nullptr; // or throw? + return it->second->create(); + } + static std::map& factories() { + static std::map theFactories; + return theFactories; + } + + virtual FlowProcess* create() = 0; + + virtual const char* getName() = 0; +}; + +template +struct ProcessFactory : IProcessFactory { + ProcessFactory(const char* name) : name(name) { factories()[name] = this; } + FlowProcess* create() override { return new ProcessType(); } + const char* getName() override { return this->name; } + +private: + const char* name; +}; + +#include +#endif diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index bac1a9b145..9e5a224d4c 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -991,7 +991,8 @@ static void scanPackets(TransportData* transport, Arena& arena, NetworkAddress const& peerAddress, ProtocolVersion peerProtocolVersion, - Future disconnect) { + Future disconnect, + bool isStableConnection) { // Find each complete packet in the given byte range and queue a ready task to deliver it. // Remove the complete packets from the range by increasing unprocessed_begin. // There won't be more than 64K of data plus one packet, so this shouldn't take a long time. @@ -1030,7 +1031,7 @@ static void scanPackets(TransportData* transport, if (checksumEnabled) { bool isBuggifyEnabled = false; - if (g_network->isSimulated() && + if (g_network->isSimulated() && !isStableConnection && g_network->now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && BUGGIFY_WITH_PROB(0.0001)) { g_simulator.lastConnectionFailure = g_network->now(); @@ -1057,7 +1058,8 @@ static void scanPackets(TransportData* transport, if (isBuggifyEnabled) { TraceEvent(SevInfo, "ChecksumMismatchExp") .detail("PacketChecksum", packetChecksum) - .detail("CalculatedChecksum", calculatedChecksum); + .detail("CalculatedChecksum", calculatedChecksum) + .detail("PeerAddress", peerAddress.toString()); } else { TraceEvent(SevWarnAlways, "ChecksumMismatchUnexp") .detail("PacketChecksum", packetChecksum) @@ -1305,7 +1307,8 @@ ACTOR static Future connectionReader(TransportData* transport, arena, peerAddress, peerProtocolVersion, - peer->disconnect.getFuture()); + peer->disconnect.getFuture(), + g_network->isSimulated() && conn->isStableConnection()); } else { unprocessed_begin = unprocessed_end; peer->resetPing.trigger(); @@ -1364,6 +1367,11 @@ ACTOR static Future listen(TransportData* self, NetworkAddress listenAddr) state ActorCollectionNoErrors incoming; // Actors monitoring incoming connections that haven't yet been associated with a peer state Reference listener = INetworkConnections::net()->listen(listenAddr); + if (!g_network->isSimulated() && self->localAddresses.address.port == 0) { + TraceEvent(SevInfo, "UpdatingListenAddress") + .detail("AssignedListenAddress", listener->getListenAddress().toString()); + self->localAddresses.address = listener->getListenAddress(); + } state uint64_t connectionCount = 0; try { loop { diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index ba3a247bb6..268005eaff 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -20,6 +20,7 @@ #include #include +#include #include "contrib/fmt-8.1.1/include/fmt/format.h" #include "fdbrpc/simulator.h" @@ -121,20 +122,24 @@ void ISimulator::displayWorkers() const { int openCount = 0; struct SimClogging { - double getSendDelay(NetworkAddress from, NetworkAddress to) const { return halfLatency(); } + double getSendDelay(NetworkAddress from, NetworkAddress to, bool stableConnection = false) const { + // stable connection here means it's a local connection between processes on the same machine + // we expect it to have much lower latency + return (stableConnection ? 0.1 : 1.0) * halfLatency(); + } - double getRecvDelay(NetworkAddress from, NetworkAddress to) { + double getRecvDelay(NetworkAddress from, NetworkAddress to, bool stableConnection = false) { auto pair = std::make_pair(from.ip, to.ip); double tnow = now(); - double t = tnow + halfLatency(); - if (!g_simulator.speedUpSimulation) + double t = tnow + (stableConnection ? 0.1 : 1.0) * halfLatency(); + if (!g_simulator.speedUpSimulation && !stableConnection) t += clogPairLatency[pair]; - if (!g_simulator.speedUpSimulation && clogPairUntil.count(pair)) + if (!g_simulator.speedUpSimulation && !stableConnection && clogPairUntil.count(pair)) t = std::max(t, clogPairUntil[pair]); - if (!g_simulator.speedUpSimulation && clogRecvUntil.count(to.ip)) + if (!g_simulator.speedUpSimulation && !stableConnection && clogRecvUntil.count(to.ip)) t = std::max(t, clogRecvUntil[to.ip]); return t - tnow; @@ -182,8 +187,8 @@ SimClogging g_clogging; struct Sim2Conn final : IConnection, ReferenceCounted { Sim2Conn(ISimulator::ProcessInfo* process) - : opened(false), closedByCaller(false), process(process), dbgid(deterministicRandom()->randomUniqueID()), - stopReceive(Never()) { + : opened(false), closedByCaller(false), stableConnection(false), process(process), + dbgid(deterministicRandom()->randomUniqueID()), stopReceive(Never()) { pipes = sender(this) && receiver(this); } @@ -202,7 +207,18 @@ struct Sim2Conn final : IConnection, ReferenceCounted { process->address.ip, FLOW_KNOBS->MAX_CLOGGING_LATENCY * deterministicRandom()->random01()); sendBufSize = std::max(deterministicRandom()->randomInt(0, 5000000), 25e6 * (latency + .002)); - TraceEvent("Sim2Connection").detail("SendBufSize", sendBufSize).detail("Latency", latency); + // options like clogging or bitsflip are disabled for stable connections + stableConnection = std::any_of(process->childs.begin(), + process->childs.end(), + [&](ISimulator::ProcessInfo* child) { return child && child == peerProcess; }) || + std::any_of(peerProcess->childs.begin(), + peerProcess->childs.end(), + [&](ISimulator::ProcessInfo* child) { return child && child == process; }); + + TraceEvent("Sim2Connection") + .detail("SendBufSize", sendBufSize) + .detail("Latency", latency) + .detail("StableConnection", stableConnection); } ~Sim2Conn() { ASSERT_ABORT(!opened || closedByCaller); } @@ -222,6 +238,8 @@ struct Sim2Conn final : IConnection, ReferenceCounted { bool isPeerGone() const { return !peer || peerProcess->failed; } + bool isStableConnection() const override { return stableConnection; } + void peerClosed() { leakedConnectionTracker = trackLeakedConnection(this); stopReceive = delay(1.0); @@ -249,7 +267,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { ASSERT(limit > 0); int toSend = 0; - if (BUGGIFY) { + if (BUGGIFY && !stableConnection) { toSend = std::min(limit, buffer->bytes_written - buffer->bytes_sent); } else { for (auto p = buffer; p; p = p->next) { @@ -262,7 +280,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { } } ASSERT(toSend); - if (BUGGIFY) + if (BUGGIFY && !stableConnection) toSend = std::min(toSend, deterministicRandom()->randomInt(0, 1000)); if (!peer) @@ -286,7 +304,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { NetworkAddress getPeerAddress() const override { return peerEndpoint; } UID getDebugID() const override { return dbgid; } - bool opened, closedByCaller; + bool opened, closedByCaller, stableConnection; private: ISimulator::ProcessInfo *process, *peerProcess; @@ -336,10 +354,12 @@ private: deterministicRandom()->random01() < .5 ? self->sentBytes.get() : deterministicRandom()->randomInt64(self->receivedBytes.get(), self->sentBytes.get() + 1); - wait(delay(g_clogging.getSendDelay(self->process->address, self->peerProcess->address))); + wait(delay(g_clogging.getSendDelay( + self->process->address, self->peerProcess->address, self->isStableConnection()))); wait(g_simulator.onProcess(self->process)); ASSERT(g_simulator.getCurrentProcess() == self->process); - wait(delay(g_clogging.getRecvDelay(self->process->address, self->peerProcess->address))); + wait(delay(g_clogging.getRecvDelay( + self->process->address, self->peerProcess->address, self->isStableConnection()))); ASSERT(g_simulator.getCurrentProcess() == self->process); if (self->stopReceive.isReady()) { wait(Future(Never())); @@ -389,7 +409,9 @@ private: } void rollRandomClose() { - if (now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && + // make sure connections between parenta and their childs are not closed + if (!stableConnection && + now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && deterministicRandom()->random01() < .00001) { g_simulator.lastConnectionFailure = now(); double a = deterministicRandom()->random01(), b = deterministicRandom()->random01(); @@ -1101,6 +1123,10 @@ public: if (mustBeDurable || deterministicRandom()->random01() < 0.5) { state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess(); state TaskPriority currentTaskID = g_network->getCurrentTask(); + TraceEvent(SevDebug, "Sim2DeleteFileImpl") + .detail("CurrentProcess", currentProcess->toString()) + .detail("Filename", filename) + .detail("Durable", mustBeDurable); wait(g_simulator.onMachine(currentProcess)); try { wait(::delay(0.05 * deterministicRandom()->random01())); @@ -1118,6 +1144,9 @@ public: throw err; } } else { + TraceEvent(SevDebug, "Sim2DeleteFileImplNonDurable") + .detail("Filename", filename) + .detail("Durable", mustBeDurable); TEST(true); // Simulated non-durable delete return Void(); } @@ -1163,6 +1192,9 @@ public: MachineInfo& machine = machines[locality.machineId().get()]; if (!machine.machineId.present()) machine.machineId = locality.machineId(); + if (port == 0 && std::string(name) == "remote flow process") { + port = machine.getRandomPort(); + } for (int i = 0; i < machine.processes.size(); i++) { if (machine.processes[i]->locality.machineId() != locality.machineId()) { // SOMEDAY: compute ip from locality to avoid this check @@ -1220,6 +1252,11 @@ public: .detail("Excluded", m->excluded) .detail("Cleared", m->cleared); + if (std::string(name) == "remote flow process") { + protectedAddresses.insert(m->address); + TraceEvent(SevDebug, "NewFlowProcessProtected").detail("Address", m->address); + } + // FIXME: Sometimes, connections to/from this process will explicitly close return m; @@ -1497,6 +1534,7 @@ public: .detail("MachineId", p->locality.machineId()); currentlyRebootingProcesses.insert(std::pair(p->address, p)); std::vector& processes = machines[p->locality.machineId().get()].processes; + machines[p->locality.machineId().get()].removeRemotePort(p->address.port); if (p != processes.back()) { auto it = std::find(processes.begin(), processes.end(), p); std::swap(*it, processes.back()); @@ -1520,7 +1558,8 @@ public: .detail("Protected", protectedAddresses.count(machine->address)) .backtrace(); // This will remove all the "tracked" messages that came from the machine being killed - latestEventCache.clear(); + if (std::string(machine->name) != "remote flow process") + latestEventCache.clear(); machine->failed = true; } else if (kt == InjectFaults) { TraceEvent(SevWarn, "FaultMachine") @@ -1548,7 +1587,8 @@ public: } else { ASSERT(false); } - ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting); + ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting || + std::string(machine->name) == "remote flow process"); } void rebootProcess(ProcessInfo* process, KillType kt) override { if (kt == RebootProcessAndDelete && protectedAddresses.count(process->address)) { @@ -2390,8 +2430,19 @@ ACTOR void doReboot(ISimulator::ProcessInfo* p, ISimulator::KillType kt) { kt == ISimulator::RebootProcessAndDelete); // Simulated process rebooted with data and coordination state deletion - if (p->rebooting || !p->isReliable()) + if (p->rebooting || !p->isReliable()) { + TraceEvent(SevDebug, "DoRebootFailed") + .detail("Rebooting", p->rebooting) + .detail("Reliable", p->isReliable()); return; + } else if (std::string(p->name) == "remote flow process") { + TraceEvent(SevDebug, "DoRebootFailed").detail("Name", p->name).detail("Address", p->address); + return; + } else if (p->getChilds().size()) { + TraceEvent(SevDebug, "DoRebootFailedOnParentProcess").detail("Address", p->address); + return; + } + TraceEvent("RebootingProcess") .detail("KillType", kt) .detail("Address", p->address) diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 80f17b3971..8f23db0400 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -21,6 +21,7 @@ #ifndef FLOW_SIMULATOR_H #define FLOW_SIMULATOR_H #include "flow/ProtocolVersion.h" +#include #include #pragma once @@ -87,6 +88,8 @@ public: ProtocolVersion protocolVersion; + std::vector childs; + ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, @@ -117,6 +120,7 @@ public: << " fault_injection_p2:" << fault_injection_p2; return ss.str(); } + std::vector const& getChilds() const { return childs; } // Return true if the class type is suitable for stateful roles, such as tLog and StorageServer. bool isAvailableClass() const { @@ -202,7 +206,30 @@ public: std::set closingFiles; Optional> machineId; - MachineInfo() : machineProcess(nullptr) {} + const uint16_t remotePortStart; + std::vector usedRemotePorts; + + MachineInfo() : machineProcess(nullptr), remotePortStart(1000) {} + + short getRandomPort() { + for (uint16_t i = remotePortStart; i < 60000; i++) { + if (std::find(usedRemotePorts.begin(), usedRemotePorts.end(), i) == usedRemotePorts.end()) { + TraceEvent(SevDebug, "RandomPortOpened").detail("PortNum", i); + usedRemotePorts.push_back(i); + return i; + } + } + UNREACHABLE(); + } + + void removeRemotePort(uint16_t port) { + if (port < remotePortStart) + return; + auto pos = std::find(usedRemotePorts.begin(), usedRemotePorts.end(), port); + if (pos != usedRemotePorts.end()) { + usedRemotePorts.erase(pos); + } + } }; ProcessInfo* getProcess(Endpoint const& endpoint) { return getProcessByAddress(endpoint.getPrimaryAddress()); } diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 2726c039fe..7970dd18ef 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -98,6 +98,8 @@ set(FDBSERVER_SRCS Ratekeeper.h RatekeeperInterface.h RecoveryState.h + RemoteIKeyValueStore.actor.h + RemoteIKeyValueStore.actor.cpp ResolutionBalancer.actor.cpp ResolutionBalancer.actor.h Resolver.actor.cpp diff --git a/fdbserver/FDBExecHelper.actor.cpp b/fdbserver/FDBExecHelper.actor.cpp index 9b690d4d35..3e34fc8e25 100644 --- a/fdbserver/FDBExecHelper.actor.cpp +++ b/fdbserver/FDBExecHelper.actor.cpp @@ -18,17 +18,30 @@ * limitations under the License. */ +#include "flow/TLSConfig.actor.h" +#include "flow/Trace.h" +#include "flow/Platform.h" +#include "flow/flow.h" +#include "flow/genericactors.actor.h" +#include "flow/network.h" +#include "fdbrpc/FlowProcess.actor.h" +#include "fdbrpc/Net2FileSystem.h" +#include "fdbrpc/simulator.h" +#include "fdbclient/WellKnownEndpoints.h" +#include "fdbclient/versions.h" +#include "fdbserver/CoroFlow.h" +#include "fdbserver/FDBExecHelper.actor.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/RemoteIKeyValueStore.actor.h" + #if !defined(_WIN32) && !defined(__APPLE__) && !defined(__INTEL_COMPILER) #define BOOST_SYSTEM_NO_LIB #define BOOST_DATE_TIME_NO_LIB #define BOOST_REGEX_NO_LIB #include #endif -#include "fdbserver/FDBExecHelper.actor.h" -#include "flow/Trace.h" -#include "flow/flow.h" -#include "fdbclient/versions.h" -#include "fdbserver/Knobs.h" +#include + #include "flow/actorcompiler.h" // This must be the last #include. ExecCmdValueString::ExecCmdValueString(StringRef pCmdValueString) { @@ -90,12 +103,138 @@ void ExecCmdValueString::dbgPrint() const { return; } +ACTOR void destoryChildProcess(Future parentSSClosed, ISimulator::ProcessInfo* childInfo, std::string message) { + // This code path should be bug free + wait(parentSSClosed); + TraceEvent(SevDebug, message.c_str()).log(); + // This one is root cause for most failures, make sure it's okay to destory + g_pSimulator->destroyProcess(childInfo); + // Explicitly reset the connection with the child process in case re-spawn very quickly + FlowTransport::transport().resetConnection(childInfo->address); +} + +ACTOR Future spawnSimulated(std::vector paramList, + double maxWaitTime, + bool isSync, + double maxSimDelayTime, + IClosable* parent) { + state ISimulator::ProcessInfo* self = g_pSimulator->getCurrentProcess(); + state ISimulator::ProcessInfo* child; + + state std::string role; + state std::string addr; + state std::string flowProcessName; + state Endpoint parentProcessEndpoint; + state int i = 0; + // fdbserver -r flowprocess --process-name ikvs --process-endpoint ip:port,token,id + for (; i < paramList.size(); i++) { + if (paramList.size() > i + 1) { + // temporary args parser that only supports the flowprocess role + if (paramList[i] == "-r") { + role = paramList[i + 1]; + } else if (paramList[i] == "-p" || paramList[i] == "--public_address") { + addr = paramList[i + 1]; + } else if (paramList[i] == "--process-name") { + flowProcessName = paramList[i + 1]; + } else if (paramList[i] == "--process-endpoint") { + state std::vector addressArray; + boost::split(addressArray, paramList[i + 1], [](char c) { return c == ','; }); + if (addressArray.size() != 3) { + std::cerr << "Invalid argument, expected 3 elements in --process-endpoint got " + << addressArray.size() << std::endl; + flushAndExit(FDB_EXIT_ERROR); + } + try { + auto addr = NetworkAddress::parse(addressArray[0]); + uint64_t fst = std::stoul(addressArray[1]); + uint64_t snd = std::stoul(addressArray[2]); + UID token(fst, snd); + NetworkAddressList l; + l.address = addr; + parentProcessEndpoint = Endpoint(l, token); + } catch (Error& e) { + std::cerr << "Could not parse network address " << addressArray[0] << std::endl; + flushAndExit(FDB_EXIT_ERROR); + } + } + } + } + state int result = 0; + child = g_pSimulator->newProcess("remote flow process", + self->address.ip, + 0, + self->address.isTLS(), + self->addresses.secondaryAddress.present() ? 2 : 1, + self->locality, + ProcessClass(ProcessClass::UnsetClass, ProcessClass::AutoSource), + self->dataFolder, + self->coordinationFolder, // do we need to customize this coordination folder path? + self->protocolVersion); + wait(g_pSimulator->onProcess(child)); + state Future onShutdown = child->onShutdown(); + state Future parentShutdown = self->onShutdown(); + state Future flowProcessF; + + try { + TraceEvent(SevDebug, "SpawnedChildProcess") + .detail("Child", child->toString()) + .detail("Parent", self->toString()); + std::string role = ""; + std::string addr = ""; + for (int i = 0; i < paramList.size(); i++) { + if (paramList.size() > i + 1 && paramList[i] == "-r") { + role = paramList[i + 1]; + } + } + if (role == "flowprocess" && !parentShutdown.isReady()) { + self->childs.push_back(child); + state Future parentSSClosed = parent->onClosed(); + FlowTransport::createInstance(false, 1, WLTOKEN_RESERVED_COUNT); + FlowTransport::transport().bind(child->address, child->address); + Sim2FileSystem::newFileSystem(); + ProcessFactory(flowProcessName.c_str()); + flowProcessF = runFlowProcess(flowProcessName, parentProcessEndpoint); + + choose { + when(wait(flowProcessF)) { + TraceEvent(SevDebug, "ChildProcessKilled").log(); + wait(g_pSimulator->onProcess(self)); + TraceEvent(SevDebug, "BackOnParentProcess").detail("Result", std::to_string(result)); + destoryChildProcess(parentSSClosed, child, "StorageServerReceivedClosedMessage"); + } + when(wait(success(onShutdown))) { + ASSERT(false); + // In prod, we use prctl to bind parent and child processes to die together + // In simulation, we simply disable killing parent or child processes as we cannot use the same + // mechanism here + } + when(wait(success(parentShutdown))) { + ASSERT(false); + // Parent process is not killed, see above + } + } + } else { + ASSERT(false); + } + } catch (Error& e) { + TraceEvent(SevError, "RemoteIKVSDied").errorUnsuppressed(e); + result = -1; + } + + return result; +} + #if defined(_WIN32) || defined(__APPLE__) || defined(__INTEL_COMPILER) ACTOR Future spawnProcess(std::string binPath, std::vector paramList, double maxWaitTime, bool isSync, - double maxSimDelayTime) { + double maxSimDelayTime, + IClosable* parent) { + if (g_network->isSimulated() && getExecPath() == binPath) { + int res = wait(spawnSimulated(paramList, maxWaitTime, isSync, maxSimDelayTime, parent)); + return res; + } wait(delay(0.0)); return 0; } @@ -125,6 +264,9 @@ static auto fork_child(const std::string& path, std::vector& paramList) { } static void setupTraceWithOutput(TraceEvent& event, size_t bytesRead, char* outputBuffer) { + // get some errors printed for spawned process + std::cout << "Output bytesRead: " << bytesRead << std::endl; + std::cout << "output buffer: " << std::string(outputBuffer) << std::endl; if (bytesRead == 0) return; ASSERT(bytesRead <= SERVER_KNOBS->MAX_FORKED_PROCESS_OUTPUT); @@ -139,7 +281,12 @@ ACTOR Future spawnProcess(std::string path, std::vector args, double maxWaitTime, bool isSync, - double maxSimDelayTime) { + double maxSimDelayTime, + IClosable* parent) { + if (g_network->isSimulated() && getExecPath() == path) { + int res = wait(spawnSimulated(args, maxWaitTime, isSync, maxSimDelayTime, parent)); + return res; + } // for async calls in simulator, always delay by a deterministic amount of time and then // do the call synchronously, otherwise the predictability of the simulator breaks if (!isSync && g_network->isSimulated()) { @@ -182,7 +329,7 @@ ACTOR Future spawnProcess(std::string path, int flags = fcntl(readFD.get(), F_GETFL, 0); fcntl(readFD.get(), F_SETFL, flags | O_NONBLOCK); while (true) { - if (runTime > maxWaitTime) { + if (maxWaitTime >= 0 && runTime > maxWaitTime) { // timing out TraceEvent(SevWarnAlways, "SpawnProcessFailure") @@ -203,7 +350,6 @@ ACTOR Future spawnProcess(std::string path, break; bytesRead += bytes; } - if (err < 0) { TraceEvent event(SevWarnAlways, "SpawnProcessFailure"); setupTraceWithOutput(event, bytesRead, outputBuffer); diff --git a/fdbserver/FDBExecHelper.actor.h b/fdbserver/FDBExecHelper.actor.h index f5f07a000d..4a191663d8 100644 --- a/fdbserver/FDBExecHelper.actor.h +++ b/fdbserver/FDBExecHelper.actor.h @@ -63,16 +63,19 @@ private: // data StringRef binaryPath; }; +class IClosable; // Forward declaration + // FIXME: move this function to a common location // spawns a process pointed by `binPath` and the arguments provided at `paramList`, -// if the process spawned takes more than `maxWaitTime` then it will be killed -// if isSync is set to true then the process will be synchronously executed -// if async and in simulator then delay spawning the process to max of maxSimDelayTime +// if the process spawned takes more than `maxWaitTime` then it will be killed, if `maxWaitTime` < 0, then there won't +// be timeout if isSync is set to true then the process will be synchronously executed if async and in simulator then +// delay spawning the process to max of maxSimDelayTime ACTOR Future spawnProcess(std::string binPath, std::vector paramList, double maxWaitTime, bool isSync, - double maxSimDelayTime); + double maxSimDelayTime, + IClosable* parent = nullptr); // helper to run all the work related to running the exec command ACTOR Future execHelper(ExecCmdValueString* execArg, UID snapUID, std::string folder, std::string role); diff --git a/fdbserver/IKeyValueStore.h b/fdbserver/IKeyValueStore.h index 479a7c544b..2020eebb1a 100644 --- a/fdbserver/IKeyValueStore.h +++ b/fdbserver/IKeyValueStore.h @@ -159,12 +159,23 @@ extern IKeyValueStore* keyValueStoreLogSystem(class IDiskQueue* queue, bool replaceContent, bool exactRecovery); +extern IKeyValueStore* openRemoteKVStore(KeyValueStoreType storeType, + std::string const& filename, + UID logID, + int64_t memoryLimit, + bool checkChecksums = false, + bool checkIntegrity = false); + inline IKeyValueStore* openKVStore(KeyValueStoreType storeType, std::string const& filename, UID logID, int64_t memoryLimit, bool checkChecksums = false, - bool checkIntegrity = false) { + bool checkIntegrity = false, + bool openRemotely = false) { + if (openRemotely) { + return openRemoteKVStore(storeType, filename, logID, memoryLimit, checkChecksums, checkIntegrity); + } switch (storeType) { case KeyValueStoreType::SSD_BTREE_V1: return keyValueStoreSQLite(filename, logID, KeyValueStoreType::SSD_BTREE_V1, false, checkIntegrity); diff --git a/fdbserver/RemoteIKeyValueStore.actor.cpp b/fdbserver/RemoteIKeyValueStore.actor.cpp new file mode 100644 index 0000000000..bead82267a --- /dev/null +++ b/fdbserver/RemoteIKeyValueStore.actor.cpp @@ -0,0 +1,246 @@ +/* + * RemoteIKeyValueStore.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/ActorCollection.h" +#include "flow/Error.h" +#include "flow/Platform.h" +#include "flow/Trace.h" +#include "fdbrpc/FlowProcess.actor.h" +#include "fdbrpc/fdbrpc.h" +#include "fdbclient/FDBTypes.h" +#include "fdbserver/FDBExecHelper.actor.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/RemoteIKeyValueStore.actor.h" + +#include "flow/actorcompiler.h" // This must be the last #include. + +StringRef KeyValueStoreProcess::_name = "KeyValueStoreProcess"_sr; + +// A guard for guaranteed killing of machine after runIKVS returns +struct AfterReturn { + IKeyValueStore* kvStore; + UID id; + AfterReturn() : kvStore(nullptr) {} + AfterReturn(IKeyValueStore* store, UID& uid) : kvStore(store), id(uid) {} + ~AfterReturn() { + TraceEvent(SevDebug, "RemoteKVStoreAfterReturn") + .detail("Valid", kvStore != nullptr ? "True" : "False") + .detail("UID", id) + .log(); + if (kvStore != nullptr) { + kvStore->close(); + } + } + // called when we already explicitly closed the kv store + void invalidate() { kvStore = nullptr; } +}; + +ACTOR void sendCommitReply(IKVSCommitRequest commitReq, IKeyValueStore* kvStore, Future onClosed) { + try { + choose { + when(wait(onClosed)) { commitReq.reply.sendError(remote_kvs_cancelled()); } + when(wait(kvStore->commit(commitReq.sequential))) { + StorageBytes storageBytes = kvStore->getStorageBytes(); + commitReq.reply.send(IKVSCommitReply(storageBytes)); + } + } + } catch (Error& e) { + TraceEvent(SevDebug, "RemoteKVSCommitReplyError").errorUnsuppressed(e); + commitReq.reply.sendError(e.code() == error_code_actor_cancelled ? remote_kvs_cancelled() : e); + } +} + +ACTOR template +Future cancellableForwardPromise(ReplyPromise output, Future input) { + try { + T value = wait(input); + output.send(value); + } catch (Error& e) { + TraceEvent(SevDebug, "CancellableForwardPromiseError").errorUnsuppressed(e).backtrace(); + output.sendError(e.code() == error_code_actor_cancelled ? remote_kvs_cancelled() : e); + } + return Void(); +} + +ACTOR Future runIKVS(OpenKVStoreRequest openReq, IKVSInterface ikvsInterface) { + state IKeyValueStore* kvStore = openKVStore(openReq.storeType, + openReq.filename, + openReq.logID, + openReq.memoryLimit, + openReq.checkChecksums, + openReq.checkIntegrity); + state UID kvsId(ikvsInterface.id()); + state ActorCollection actors(false); + state AfterReturn guard(kvStore, kvsId); + state Promise onClosed; + TraceEvent(SevDebug, "RemoteKVStoreInitializing").detail("UID", kvsId); + wait(kvStore->init()); + openReq.reply.send(ikvsInterface); + TraceEvent(SevInfo, "RemoteKVStoreInitialized").detail("IKVSInterfaceUID", kvsId); + + loop { + try { + choose { + when(IKVSGetValueRequest getReq = waitNext(ikvsInterface.getValue.getFuture())) { + actors.add(cancellableForwardPromise(getReq.reply, + kvStore->readValue(getReq.key, getReq.type, getReq.debugID))); + } + when(IKVSSetRequest req = waitNext(ikvsInterface.set.getFuture())) { kvStore->set(req.keyValue); } + when(IKVSClearRequest req = waitNext(ikvsInterface.clear.getFuture())) { kvStore->clear(req.range); } + when(IKVSCommitRequest commitReq = waitNext(ikvsInterface.commit.getFuture())) { + sendCommitReply(commitReq, kvStore, onClosed.getFuture()); + } + when(IKVSReadValuePrefixRequest readPrefixReq = waitNext(ikvsInterface.readValuePrefix.getFuture())) { + actors.add(cancellableForwardPromise( + readPrefixReq.reply, + kvStore->readValuePrefix( + readPrefixReq.key, readPrefixReq.maxLength, readPrefixReq.type, readPrefixReq.debugID))); + } + when(IKVSReadRangeRequest readRangeReq = waitNext(ikvsInterface.readRange.getFuture())) { + actors.add(cancellableForwardPromise( + readRangeReq.reply, + fmap( + [](const RangeResult& result) { return IKVSReadRangeReply(result); }, + kvStore->readRange( + readRangeReq.keys, readRangeReq.rowLimit, readRangeReq.byteLimit, readRangeReq.type)))); + } + when(IKVSGetStorageByteRequest req = waitNext(ikvsInterface.getStorageBytes.getFuture())) { + StorageBytes storageBytes = kvStore->getStorageBytes(); + req.reply.send(storageBytes); + } + when(IKVSGetErrorRequest getFutureReq = waitNext(ikvsInterface.getError.getFuture())) { + actors.add(cancellableForwardPromise(getFutureReq.reply, kvStore->getError())); + } + when(IKVSOnClosedRequest onClosedReq = waitNext(ikvsInterface.onClosed.getFuture())) { + // onClosed request is not cancelled even this actor is cancelled + forwardPromise(onClosedReq.reply, kvStore->onClosed()); + } + when(IKVSDisposeRequest disposeReq = waitNext(ikvsInterface.dispose.getFuture())) { + TraceEvent(SevDebug, "RemoteIKVSDisposeReceivedRequest").detail("UID", kvsId); + kvStore->dispose(); + guard.invalidate(); + onClosed.send(Void()); + return Void(); + } + when(IKVSCloseRequest closeReq = waitNext(ikvsInterface.close.getFuture())) { + TraceEvent(SevDebug, "RemoteIKVSCloseReceivedRequest").detail("UID", kvsId); + kvStore->close(); + guard.invalidate(); + onClosed.send(Void()); + return Void(); + } + } + } catch (Error& e) { + if (e.code() == error_code_actor_cancelled) { + TraceEvent(SevInfo, "RemoteKVStoreCancelled").detail("UID", kvsId).backtrace(); + onClosed.send(Void()); + return Void(); + } else { + TraceEvent(SevError, "RemoteKVStoreError").error(e).detail("UID", kvsId).backtrace(); + throw; + } + } + } +} + +ACTOR static Future flowProcessRunner(RemoteIKeyValueStore* self, Promise ready) { + state FlowProcessInterface processInterface; + state Future process; + + auto path = abspath(getExecPath()); + auto endpoint = processInterface.registerProcess.getEndpoint(); + auto address = endpoint.addresses.address.toString(); + auto token = endpoint.token; + + // port 0 means we will find a random available port number for it + std::string flowProcessAddr = g_network->getLocalAddress().ip.toString().append(":0"); + std::vector args = { "bin/fdbserver", + "-r", + "flowprocess", + "-C", + SERVER_KNOBS->CONN_FILE, + "--logdir", + SERVER_KNOBS->LOG_DIRECTORY, + "-p", + flowProcessAddr, + "--process-name", + KeyValueStoreProcess::_name.toString(), + "--process-endpoint", + format("%s,%lu,%lu", address.c_str(), token.first(), token.second()) }; + // For remote IKV store, we need to make sure the shutdown signal is sent back until we can destroy it in the + // simulation + process = spawnProcess(path, args, -1.0, false, 0.01 /*not used*/, self); + choose { + when(FlowProcessRegistrationRequest req = waitNext(processInterface.registerProcess.getFuture())) { + self->consumeInterface(req.flowProcessInterface); + ready.send(Void()); + } + when(int res = wait(process)) { + // 0 means process normally shut down; non-zero means errors + // process should not shut down normally before not ready + ASSERT(res); + return res; + } + } + int res = wait(process); + return res; +} + +ACTOR static Future initializeRemoteKVStore(RemoteIKeyValueStore* self, OpenKVStoreRequest openKVSReq) { + TraceEvent(SevInfo, "WaitingOnFlowProcess").detail("StoreType", openKVSReq.storeType).log(); + Promise ready; + self->returnCode = flowProcessRunner(self, ready); + wait(ready.getFuture()); + IKVSInterface ikvsInterface = wait(self->kvsProcess.openKVStore.getReply(openKVSReq)); + TraceEvent(SevInfo, "IKVSInterfaceReceived").detail("UID", ikvsInterface.id()); + self->interf = ikvsInterface; + self->interf.storeType = openKVSReq.storeType; + return Void(); +} + +IKeyValueStore* openRemoteKVStore(KeyValueStoreType storeType, + std::string const& filename, + UID logID, + int64_t memoryLimit, + bool checkChecksums, + bool checkIntegrity) { + RemoteIKeyValueStore* self = new RemoteIKeyValueStore(); + self->initialized = initializeRemoteKVStore( + self, OpenKVStoreRequest(storeType, filename, logID, memoryLimit, checkChecksums, checkIntegrity)); + return self; +} + +ACTOR static Future delayFlowProcessRunAction(FlowProcess* self, double time) { + wait(delay(time)); + wait(self->run()); + return Void(); +} + +Future runFlowProcess(std::string const& name, Endpoint endpoint) { + TraceEvent(SevInfo, "RunFlowProcessStart").log(); + FlowProcess* self = IProcessFactory::create(name.c_str()); + self->registerEndpoint(endpoint); + RequestStream registerProcess(endpoint); + FlowProcessRegistrationRequest req; + req.flowProcessInterface = self->serializedInterface(); + registerProcess.send(req); + TraceEvent(SevDebug, "FlowProcessInitFinished").log(); + return delayFlowProcessRunAction(self, g_network->isSimulated() ? 0 : SERVER_KNOBS->REMOTE_KV_STORE_INIT_DELAY); +} diff --git a/fdbserver/RemoteIKeyValueStore.actor.h b/fdbserver/RemoteIKeyValueStore.actor.h new file mode 100644 index 0000000000..7df95aa2e8 --- /dev/null +++ b/fdbserver/RemoteIKeyValueStore.actor.h @@ -0,0 +1,504 @@ +/* + * RemoteIKeyValueStore.actor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_REMOTE_IKEYVALUESTORE_ACTOR_G_H) +#define FDBSERVER_REMOTE_IKEYVALUESTORE_ACTOR_G_H +#include "fdbserver/RemoteIKeyValueStore.actor.g.h" +#elif !defined(FDBSERVER_REMOTE_IKEYVALUESTORE_ACTOR_H) +#define FDBSERVER_REMOTE_IKEYVALUESTORE_ACTOR_H + +#include "flow/ActorCollection.h" +#include "flow/IRandom.h" +#include "flow/Knobs.h" +#include "flow/Trace.h" +#include "flow/flow.h" +#include "flow/network.h" +#include "fdbrpc/FlowProcess.actor.h" +#include "fdbrpc/FlowTransport.h" +#include "fdbrpc/fdbrpc.h" +#include "fdbclient/FDBTypes.h" +#include "fdbserver/FDBExecHelper.actor.h" +#include "fdbserver/IKeyValueStore.h" +#include "fdbserver/Knobs.h" + +#include "flow/actorcompiler.h" // This must be the last #include. + +struct IKVSCommitReply { + constexpr static FileIdentifier file_identifier = 3958189; + StorageBytes storeBytes; + + IKVSCommitReply() : storeBytes(0, 0, 0, 0) {} + IKVSCommitReply(const StorageBytes& sb) : storeBytes(sb) {} + + template + void serialize(Ar& ar) { + serializer(ar, storeBytes); + } +}; + +struct RemoteKVSProcessInterface { + + constexpr static FileIdentifier file_identifier = 3491838; + RequestStream getProcessInterface; + RequestStream openKVStore; + + UID uniqueID = deterministicRandom()->randomUniqueID(); + + UID id() const { return uniqueID; } + + template + void serialize(Ar& ar) { + serializer(ar, getProcessInterface, openKVStore); + } +}; + +struct IKVSInterface { + constexpr static FileIdentifier file_identifier = 4929113; + RequestStream getValue; + RequestStream set; + RequestStream clear; + RequestStream commit; + RequestStream readValuePrefix; + RequestStream readRange; + RequestStream getStorageBytes; + RequestStream getError; + RequestStream onClosed; + RequestStream dispose; + RequestStream close; + + UID uniqueID; + + UID id() const { return uniqueID; } + + KeyValueStoreType storeType; + + KeyValueStoreType type() const { return storeType; } + + IKVSInterface() {} + + IKVSInterface(KeyValueStoreType type) : uniqueID(deterministicRandom()->randomUniqueID()), storeType(type) {} + + template + void serialize(Ar& ar) { + serializer(ar, + getValue, + set, + clear, + commit, + readValuePrefix, + readRange, + getStorageBytes, + getError, + onClosed, + dispose, + close, + uniqueID); + } +}; + +struct GetRemoteKVSProcessInterfaceRequest { + constexpr static FileIdentifier file_identifier = 8382983; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct OpenKVStoreRequest { + constexpr static FileIdentifier file_identifier = 5918682; + KeyValueStoreType storeType; + std::string filename; + UID logID; + int64_t memoryLimit; + bool checkChecksums; + bool checkIntegrity; + ReplyPromise reply; + + OpenKVStoreRequest(){}; + + OpenKVStoreRequest(KeyValueStoreType storeType, + std::string filename, + UID logID, + int64_t memoryLimit, + bool checkChecksums = false, + bool checkIntegrity = false) + : storeType(storeType), filename(filename), logID(logID), memoryLimit(memoryLimit), + checkChecksums(checkChecksums), checkIntegrity(checkIntegrity) {} + + template + void serialize(Ar& ar) { + serializer(ar, storeType, filename, logID, memoryLimit, checkChecksums, checkIntegrity, reply); + } +}; + +struct IKVSGetValueRequest { + constexpr static FileIdentifier file_identifier = 1029439; + KeyRef key; + IKeyValueStore::ReadType type; + Optional debugID = Optional(); + ReplyPromise> reply; + + template + void serialize(Ar& ar) { + serializer(ar, key, type, debugID, reply); + } +}; + +struct IKVSSetRequest { + constexpr static FileIdentifier file_identifier = 7283948; + KeyValueRef keyValue; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, keyValue, reply); + } +}; + +struct IKVSClearRequest { + constexpr static FileIdentifier file_identifier = 2838575; + KeyRangeRef range; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, range, reply); + } +}; + +struct IKVSCommitRequest { + constexpr static FileIdentifier file_identifier = 2985129; + bool sequential; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, sequential, reply); + } +}; + +struct IKVSReadValuePrefixRequest { + constexpr static FileIdentifier file_identifier = 1928374; + KeyRef key; + int maxLength; + IKeyValueStore::ReadType type; + Optional debugID = Optional(); + ReplyPromise> reply; + + template + void serialize(Ar& ar) { + serializer(ar, key, maxLength, type, debugID, reply); + } +}; + +// Use this instead of RangeResult as reply for better serialization performance +struct IKVSReadRangeReply { + constexpr static FileIdentifier file_identifier = 6682449; + Arena arena; + VectorRef data; + bool more; + Optional readThrough; + bool readToBegin; + bool readThroughEnd; + + IKVSReadRangeReply() = default; + + explicit IKVSReadRangeReply(const RangeResult& res) + : arena(res.arena()), data(static_cast&>(res)), more(res.more), + readThrough(res.readThrough), readToBegin(res.readToBegin), readThroughEnd(res.readThroughEnd) {} + + template + void serialize(Ar& ar) { + serializer(ar, data, more, readThrough, readToBegin, readThroughEnd, arena); + } + + RangeResult toRangeResult() const { + RangeResult r(RangeResultRef(data, more, readThrough), arena); + r.readToBegin = readToBegin; + r.readThroughEnd = readThroughEnd; + return r; + } +}; + +struct IKVSReadRangeRequest { + constexpr static FileIdentifier file_identifier = 5918394; + KeyRangeRef keys; + int rowLimit; + int byteLimit; + IKeyValueStore::ReadType type; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, keys, rowLimit, byteLimit, type, reply); + } +}; + +struct IKVSGetStorageByteRequest { + constexpr static FileIdentifier file_identifier = 3512344; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct IKVSGetErrorRequest { + constexpr static FileIdentifier file_identifier = 3942891; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct IKVSOnClosedRequest { + constexpr static FileIdentifier file_identifier = 1923894; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct IKVSDisposeRequest { + constexpr static FileIdentifier file_identifier = 1235952; + + template + void serialize(Ar& ar) { + serializer(ar); + } +}; + +struct IKVSCloseRequest { + constexpr static FileIdentifier file_identifier = 13859172; + + template + void serialize(Ar& ar) { + serializer(ar); + } +}; + +ACTOR Future runIKVS(OpenKVStoreRequest openReq, IKVSInterface ikvsInterface); + +struct KeyValueStoreProcess : FlowProcess { + RemoteKVSProcessInterface kvsIf; + Standalone serializedIf; + + Endpoint ssProcess; // endpoint for the storage process + RequestStream ssRequestStream; + + KeyValueStoreProcess() { + TraceEvent(SevDebug, "InitKeyValueStoreProcess").log(); + ObjectWriter writer(IncludeVersion()); + writer.serialize(kvsIf); + serializedIf = writer.toString(); + } + + void registerEndpoint(Endpoint p) override { + ssProcess = p; + ssRequestStream = RequestStream(p); + } + + StringRef name() const override { return _name; } + StringRef serializedInterface() const override { return serializedIf; } + + ACTOR static Future _run(KeyValueStoreProcess* self) { + state ActorCollection actors(true); + TraceEvent("WaitingForOpenKVStoreRequest").log(); + loop { + choose { + when(OpenKVStoreRequest req = waitNext(self->kvsIf.openKVStore.getFuture())) { + TraceEvent("OpenKVStoreRequestReceived").log(); + IKVSInterface interf; + actors.add(runIKVS(req, interf)); + } + when(ErrorOr e = wait(errorOr(actors.getResult()))) { + if (e.isError()) { + TraceEvent("KeyValueStoreProcessRunActorError").errorUnsuppressed(e.getError()); + throw e.getError(); + } else { + TraceEvent("KeyValueStoreProcessFinished").log(); + return e.get(); + } + } + } + } + } + + Future run() override { return _run(this); } + + static StringRef _name; +}; + +struct RemoteIKeyValueStore : public IKeyValueStore { + RemoteKVSProcessInterface kvsProcess; + IKVSInterface interf; + Future initialized; + Future returnCode; + StorageBytes storageBytes; + + RemoteIKeyValueStore() : storageBytes(0, 0, 0, 0) {} + + Future init() override { + TraceEvent(SevInfo, "RemoteIKeyValueStoreInit").log(); + return initialized; + } + + Future getError() const override { return getErrorImpl(this, returnCode); } + Future onClosed() const override { return onCloseImpl(this); } + + void dispose() override { + TraceEvent(SevDebug, "RemoteIKVSDisposeRequest").backtrace(); + interf.dispose.send(IKVSDisposeRequest{}); + // hold the future to not cancel the spawned process + uncancellable(returnCode); + delete this; + } + void close() override { + TraceEvent(SevDebug, "RemoteIKVSCloseRequest").backtrace(); + interf.close.send(IKVSCloseRequest{}); + // hold the future to not cancel the spawned process + uncancellable(returnCode); + delete this; + } + + KeyValueStoreType getType() const override { return interf.type(); } + + void set(KeyValueRef keyValue, const Arena* arena = nullptr) override { + interf.set.send(IKVSSetRequest{ keyValue, ReplyPromise() }); + } + void clear(KeyRangeRef range, const Arena* arena = nullptr) override { + interf.clear.send(IKVSClearRequest{ range, ReplyPromise() }); + } + + Future commit(bool sequential = false) override { + Future commitReply = + interf.commit.getReply(IKVSCommitRequest{ sequential, ReplyPromise() }); + return commitAndGetStorageBytes(this, commitReply); + } + + Future> readValue(KeyRef key, + ReadType type = ReadType::NORMAL, + Optional debugID = Optional()) override { + return readValueImpl(this, IKVSGetValueRequest{ key, type, debugID, ReplyPromise>() }); + } + + Future> readValuePrefix(KeyRef key, + int maxLength, + ReadType type = ReadType::NORMAL, + Optional debugID = Optional()) override { + return interf.readValuePrefix.getReply( + IKVSReadValuePrefixRequest{ key, maxLength, type, debugID, ReplyPromise>() }); + } + + Future readRange(KeyRangeRef keys, + int rowLimit = 1 << 30, + int byteLimit = 1 << 30, + ReadType type = ReadType::NORMAL) override { + IKVSReadRangeRequest req{ keys, rowLimit, byteLimit, type, ReplyPromise() }; + return fmap([](const IKVSReadRangeReply& reply) { return reply.toRangeResult(); }, + interf.readRange.getReply(req)); + } + + StorageBytes getStorageBytes() const override { return storageBytes; } + + void consumeInterface(StringRef intf) { + kvsProcess = ObjectReader::fromStringRef(intf, IncludeVersion()); + } + + ACTOR static Future commitAndGetStorageBytes(RemoteIKeyValueStore* self, + Future commitReplyFuture) { + IKVSCommitReply commitReply = wait(commitReplyFuture); + self->storageBytes = commitReply.storeBytes; + return Void(); + } + + ACTOR static Future> readValueImpl(RemoteIKeyValueStore* self, IKVSGetValueRequest req) { + Optional val = wait(self->interf.getValue.getReply(req)); + return val; + } + + ACTOR static Future getErrorImpl(const RemoteIKeyValueStore* self, Future returnCode) { + choose { + when(wait(self->initialized)) {} + when(wait(delay(SERVER_KNOBS->REMOTE_KV_STORE_MAX_INIT_DURATION))) { + TraceEvent(SevError, "RemoteIKVSInitTooLong") + .detail("TimeLimit", SERVER_KNOBS->REMOTE_KV_STORE_MAX_INIT_DURATION); + throw please_reboot_remote_kv_store(); + } + } + state Future connectionCheckingDelay = delay(FLOW_KNOBS->FAILURE_DETECTION_DELAY); + state Future> storeError = errorOr(self->interf.getError.getReply(IKVSGetErrorRequest{})); + loop choose { + when(ErrorOr e = wait(storeError)) { + TraceEvent(SevDebug, "RemoteIKVSGetError") + .errorUnsuppressed(e.isError() ? e.getError() : success()) + .backtrace(); + if (e.isError()) + throw e.getError(); + else + return e.get(); + } + when(int res = wait(returnCode)) { + TraceEvent(res != 0 ? SevError : SevInfo, "SpawnedProcessDied").detail("Res", res); + if (res) + throw please_reboot_remote_kv_store(); // this will reboot the worker + else + return Void(); + } + when(wait(connectionCheckingDelay)) { + // for the corner case where the child process stuck and waitpid also does not give update on it + // In this scenario, we need to manually reboot the storage engine process + if (IFailureMonitor::failureMonitor() + .getState(self->interf.getError.getEndpoint().getPrimaryAddress()) + .isFailed()) { + TraceEvent(SevError, "RemoteKVStoreConnectionStuck").log(); + throw please_reboot_remote_kv_store(); // this will reboot the worker + } + connectionCheckingDelay = delay(FLOW_KNOBS->FAILURE_DETECTION_DELAY); + } + } + } + + ACTOR static Future onCloseImpl(const RemoteIKeyValueStore* self) { + try { + wait(self->initialized); + wait(self->interf.onClosed.getReply(IKVSOnClosedRequest{})); + TraceEvent(SevDebug, "RemoteIKVSOnCloseImplOnClosedFinished"); + } catch (Error& e) { + TraceEvent(SevInfo, "RemoteIKVSOnCloseImplError").errorUnsuppressed(e).backtrace(); + throw; + } + return Void(); + } +}; + +Future runFlowProcess(std::string const& name, Endpoint endpoint); + +#include "flow/unactorcompiler.h" +#endif \ No newline at end of file diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index b369f5d3df..3883dd50d3 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -263,6 +263,9 @@ class TestConfig { if (attrib == "disableHostname") { disableHostname = strcmp(value.c_str(), "true") == 0; } + if (attrib == "disableRemoteKVS") { + disableRemoteKVS = strcmp(value.c_str(), "true") == 0; + } if (attrib == "restartInfoLocation") { isFirstTestInRestart = true; } @@ -298,6 +301,8 @@ public: bool disableTss = false; // 7.1 cannot be downgraded to 7.0 and below after enabling hostname, so disable hostname for 7.0 downgrade tests bool disableHostname = false; + // remote key value store is a child process spawned by the SS process to run the storage engine + bool disableRemoteKVS = false; // Storage Engine Types: Verify match with SimulationConfig::generateNormalConfig // 0 = "ssd" // 1 = "memory" @@ -357,6 +362,7 @@ public: .add("maxTLogVersion", &maxTLogVersion) .add("disableTss", &disableTss) .add("disableHostname", &disableHostname) + .add("disableRemoteKVS", &disableRemoteKVS) .add("simpleConfig", &simpleConfig) .add("generateFearless", &generateFearless) .add("datacenters", &datacenters) @@ -1084,6 +1090,11 @@ ACTOR Future restartSimulatedSystem(std::vector>* systemActor INetworkConnections::net()->parseMockDNSFromString(mockDNSStr); } } + if (testConfig.disableRemoteKVS) { + IKnobCollection::getMutableGlobalKnobCollection().setKnob("remote_kv_store", + KnobValueRef::create(bool{ false })); + TraceEvent(SevDebug, "DisaableRemoteKVS").log(); + } *pConnString = conn; *pTesterCount = testerCount; bool usingSSL = conn.toString().find(":tls") != std::string::npos || listenersPerProcess > 1; @@ -1836,6 +1847,11 @@ void setupSimulatedSystem(std::vector>* systemActors, if (testConfig.configureLocked) { startingConfigString += " locked"; } + if (testConfig.disableRemoteKVS) { + IKnobCollection::getMutableGlobalKnobCollection().setKnob("remote_kv_store", + KnobValueRef::create(bool{ false })); + TraceEvent(SevDebug, "DisaableRemoteKVS").log(); + } auto configDBType = testConfig.getConfigDBType(); for (auto kv : startingConfigJSON) { if ("tss_storage_engine" == kv.first) { diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 17f1bcf9d2..75c47cf6a8 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -45,16 +45,20 @@ #include "fdbclient/WellKnownEndpoints.h" #include "fdbclient/SimpleIni.h" #include "fdbrpc/AsyncFileCached.actor.h" +#include "fdbrpc/FlowProcess.actor.h" #include "fdbrpc/Net2FileSystem.h" #include "fdbrpc/PerfMetric.h" +#include "fdbrpc/fdbrpc.h" #include "fdbrpc/simulator.h" #include "fdbserver/ConflictSet.h" #include "fdbserver/CoordinationInterface.h" #include "fdbserver/CoroFlow.h" #include "fdbserver/DataDistribution.actor.h" +#include "fdbserver/FDBExecHelper.actor.h" #include "fdbserver/IKeyValueStore.h" #include "fdbserver/MoveKeys.actor.h" #include "fdbserver/NetworkTest.h" +#include "fdbserver/RemoteIKeyValueStore.actor.h" #include "fdbserver/RestoreWorkerInterface.actor.h" #include "fdbserver/ServerDBInfo.h" #include "fdbserver/SimulatedCluster.h" @@ -74,10 +78,13 @@ #include "flow/WriteOnlySet.h" #include "flow/UnitTest.h" #include "flow/FaultInjection.h" +#include "flow/flow.h" +#include "flow/network.h" #if defined(__linux__) || defined(__FreeBSD__) #include #include +#include #ifdef ALLOC_INSTRUMENTATION #include #endif @@ -100,7 +107,7 @@ enum { OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_BUILD_FLAGS, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_UNITTESTPARAM, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE, OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE, - OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB, OPT_FAULT_INJECTION, OPT_PROFILER, OPT_PRINT_SIMTIME, + OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB, OPT_FAULT_INJECTION, OPT_PROFILER, OPT_PRINT_SIMTIME, OPT_FLOW_PROCESS_NAME, OPT_FLOW_PROCESS_ENDPOINT }; CSimpleOpt::SOption g_rgOptions[] = { @@ -187,8 +194,10 @@ CSimpleOpt::SOption g_rgOptions[] = { { OPT_USE_TEST_CONFIG_DB, "--use-test-config-db", SO_NONE }, { OPT_FAULT_INJECTION, "-fi", SO_REQ_SEP }, { OPT_FAULT_INJECTION, "--fault-injection", SO_REQ_SEP }, - { OPT_PROFILER, "--profiler-", SO_REQ_SEP}, + { OPT_PROFILER, "--profiler-", SO_REQ_SEP }, { OPT_PRINT_SIMTIME, "--print-sim-time", SO_NONE }, + { OPT_FLOW_PROCESS_NAME, "--process-name", SO_REQ_SEP }, + { OPT_FLOW_PROCESS_ENDPOINT, "--process-endpoint", SO_REQ_SEP }, #ifndef TLS_DISABLED TLS_OPTION_FLAGS @@ -959,7 +968,8 @@ enum class ServerRole { SkipListTest, Test, VersionedMapTest, - UnitTests + UnitTests, + FlowProcess }; struct CLIOptions { std::string commandLine; @@ -1015,6 +1025,8 @@ struct CLIOptions { UnitTestParameters testParams; std::map profilerConfig; + std::string flowProcessName; + Endpoint flowProcessEndpoint; bool printSimTime = false; static CLIOptions parseArgs(int argc, char* argv[]) { @@ -1193,6 +1205,8 @@ private: role = ServerRole::ConsistencyCheck; else if (!strcmp(sRole, "unittests")) role = ServerRole::UnitTests; + else if (!strcmp(sRole, "flowprocess")) + role = ServerRole::FlowProcess; else { fprintf(stderr, "ERROR: Unknown role `%s'\n", sRole); printHelpTeaser(argv[0]); @@ -1517,6 +1531,42 @@ private: case OPT_USE_TEST_CONFIG_DB: configDBType = ConfigDBType::SIMPLE; break; + case OPT_FLOW_PROCESS_NAME: + flowProcessName = args.OptionArg(); + std::cout << flowProcessName << std::endl; + break; + case OPT_FLOW_PROCESS_ENDPOINT: { + std::vector strings; + std::cout << args.OptionArg() << std::endl; + boost::split(strings, args.OptionArg(), [](char c) { return c == ','; }); + for (auto& str : strings) { + std::cout << str << " "; + } + std::cout << "\n"; + if (strings.size() != 3) { + std::cerr << "Invalid argument, expected 3 elements in --process-endpoint got " << strings.size() + << std::endl; + flushAndExit(FDB_EXIT_ERROR); + } + try { + auto addr = NetworkAddress::parse(strings[0]); + uint64_t fst = std::stoul(strings[1]); + uint64_t snd = std::stoul(strings[2]); + UID token(fst, snd); + NetworkAddressList l; + l.address = addr; + flowProcessEndpoint = Endpoint(l, token); + std::cout << "flowProcessEndpoint: " << flowProcessEndpoint.getPrimaryAddress().toString() + << ", token: " << flowProcessEndpoint.token.toString() << "\n"; + } catch (Error& e) { + std::cerr << "Could not parse network address " << strings[0] << std::endl; + flushAndExit(FDB_EXIT_ERROR); + } catch (std::exception& e) { + std::cerr << "Could not parse token " << strings[1] << "," << strings[2] << std::endl; + flushAndExit(FDB_EXIT_ERROR); + } + break; + } case OPT_PRINT_SIMTIME: printSimTime = true; break; @@ -1723,6 +1773,7 @@ int main(int argc, char* argv[]) { role == ServerRole::Simulation ? IsSimulated::True : IsSimulated::False); IKnobCollection::getMutableGlobalKnobCollection().setKnob("log_directory", KnobValue::create(opts.logFolder)); + IKnobCollection::getMutableGlobalKnobCollection().setKnob("conn_file", KnobValue::create(opts.connFile)); if (role != ServerRole::Simulation) { IKnobCollection::getMutableGlobalKnobCollection().setKnob("commit_batches_mem_bytes_hard_limit", KnobValue::create(int64_t{ opts.memLimit })); @@ -1802,8 +1853,8 @@ int main(int argc, char* argv[]) { FlowTransport::createInstance(false, 1, WLTOKEN_RESERVED_COUNT); opts.buildNetwork(argv[0]); - const bool expectsPublicAddress = - (role == ServerRole::FDBD || role == ServerRole::NetworkTestServer || role == ServerRole::Restore); + const bool expectsPublicAddress = (role == ServerRole::FDBD || role == ServerRole::NetworkTestServer || + role == ServerRole::Restore || role == ServerRole::FlowProcess); if (opts.publicAddressStrs.empty()) { if (expectsPublicAddress) { fprintf(stderr, "ERROR: The -p or --public-address option is required\n"); @@ -2139,6 +2190,19 @@ int main(int argc, char* argv[]) { } f = result; + } else if (role == ServerRole::FlowProcess) { + TraceEvent(SevDebug, "StartingFlowProcess").detail("From", "fdbserver"); +#if defined(__linux__) || defined(__FreeBSD__) + prctl(PR_SET_PDEATHSIG, SIGTERM); + if (getppid() == 1) /* parent already died before prctl */ + flushAndExit(FDB_EXIT_SUCCESS); +#endif + + if (opts.flowProcessName == "KeyValueStoreProcess") { + ProcessFactory(opts.flowProcessName.c_str()); + } + f = stopAfter(runFlowProcess(opts.flowProcessName, opts.flowProcessEndpoint)); + g_network->run(); } else if (role == ServerRole::KVFileDump) { f = stopAfter(KVFileDump(opts.kvFile)); g_network->run(); diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 4b8e483b05..f627fb82c1 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -8421,7 +8421,8 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData } if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed || - e.code() == error_code_file_not_found || e.code() == error_code_actor_cancelled) { + e.code() == error_code_file_not_found || e.code() == error_code_actor_cancelled || + e.code() == error_code_remote_kvs_cancelled) { TraceEvent("StorageServerTerminated", self.thisServerID).errorUnsuppressed(e); return true; } else diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 1596da1362..710ad420d7 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -1092,7 +1092,8 @@ std::map> testSpecGlobalKey [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedMaxTLogVersion", ""); } }, { "disableTss", [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableTSS", ""); } }, { "disableHostname", - [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableHostname", ""); } } + [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedDisableHostname", ""); } }, + { "disableRemoteKVS", [](const std::string& value) { TraceEvent("TestParserTest").detail("ParsedRemoteKVS", ""); } } }; std::map> testSpecTestKeys = { diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 0cc0faa57d..bd721b437b 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ +#include #include #include @@ -49,6 +50,7 @@ #include "fdbserver/CoordinationInterface.h" #include "fdbserver/ConfigNode.h" #include "fdbserver/LocalConfiguration.h" +#include "fdbserver/RemoteIKeyValueStore.actor.h" #include "fdbclient/MonitorLeader.h" #include "fdbclient/ClientWorkerInterface.h" #include "flow/Profiler.h" @@ -208,31 +210,44 @@ ACTOR Future handleIOErrors(Future actor, IClosable* store, UID id, state Future> storeError = actor.isReady() ? Never() : errorOr(store->getError()); choose { when(state ErrorOr e = wait(errorOr(actor))) { + TraceEvent(SevDebug, "HandleIOErrorsActorIsReady") + .detail("Error", e.isError() ? e.getError().code() : -1) + .detail("UID", id); if (e.isError() && e.getError().code() == error_code_please_reboot) { // no need to wait. } else { + TraceEvent(SevDebug, "HandleIOErrorsActorBeforeOnClosed").detail("IsClosed", onClosed.isReady()); wait(onClosed); + TraceEvent(SevDebug, "HandleIOErrorsActorOnClosedFinished") + .detail("StoreError", + storeError.isReady() ? (storeError.get().isError() ? storeError.get().getError().code() : 0) + : -1); } if (e.isError() && e.getError().code() == error_code_broken_promise && !storeError.isReady()) { wait(delay(0.00001 + FLOW_KNOBS->MAX_BUGGIFIED_DELAY)); } - if (storeError.isReady()) - throw storeError.get().getError(); - if (e.isError()) + if (storeError.isReady() && + !((storeError.get().isError() && storeError.get().getError().code() == error_code_file_not_found))) { + throw storeError.get().isError() ? storeError.get().getError() : actor_cancelled(); + } + if (e.isError()) { throw e.getError(); - else + } else return e.get(); } when(ErrorOr e = wait(storeError)) { - TraceEvent("WorkerTerminatingByIOError", id).errorUnsuppressed(e.getError()); + // for remote kv store, worker can terminate without an error, so throws actor_cancelled + // (there's probably a better way tho) + TraceEvent("WorkerTerminatingByIOError", id) + .errorUnsuppressed(e.isError() ? e.getError() : actor_cancelled()); actor.cancel(); // file_not_found can occur due to attempting to open a partially deleted DiskQueue, which should not be // reported SevError. - if (e.getError().code() == error_code_file_not_found) { + if (e.isError() && e.getError().code() == error_code_file_not_found) { TEST(true); // Worker terminated with file_not_found error return Void(); } - throw e.getError(); + throw e.isError() ? e.getError() : actor_cancelled(); } } } @@ -243,6 +258,7 @@ ACTOR Future workerHandleErrors(FutureStream errors) { ErrorInfo err = _err; bool ok = err.error.code() == error_code_success || err.error.code() == error_code_please_reboot || err.error.code() == error_code_actor_cancelled || + err.error.code() == error_code_remote_kvs_cancelled || err.error.code() == error_code_coordinators_changed || // The worker server was cancelled err.error.code() == error_code_shutdown_in_progress; @@ -253,6 +269,7 @@ ACTOR Future workerHandleErrors(FutureStream errors) { endRole(err.role, err.id, "Error", ok, err.error); if (err.error.code() == error_code_please_reboot || + err.error.code() == error_code_please_reboot_remote_kv_store || (err.role == Role::SHARED_TRANSACTION_LOG && (err.error.code() == error_code_io_error || err.error.code() == error_code_io_timeout))) throw err.error; @@ -1090,9 +1107,13 @@ struct TrackRunningStorage { KeyValueStoreType storeType, std::set>* runningStorages) : self(self), storeType(storeType), runningStorages(runningStorages) { + TraceEvent(SevDebug, "TrackingRunningStorageConstruction").detail("StorageID", self); runningStorages->emplace(self, storeType); } - ~TrackRunningStorage() { runningStorages->erase(std::make_pair(self, storeType)); }; + ~TrackRunningStorage() { + runningStorages->erase(std::make_pair(self, storeType)); + TraceEvent(SevDebug, "TrackingRunningStorageDesctruction").detail("StorageID", self); + }; }; ACTOR Future storageServerRollbackRebooter(std::set>* runningStorages, @@ -1523,8 +1544,15 @@ ACTOR Future workerServer(Reference connRecord, if (s.storedComponent == DiskStore::Storage) { LocalLineage _; getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage; - IKeyValueStore* kv = - openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, false, validateDataFiles); + IKeyValueStore* kv = openKVStore( + s.storeType, + s.filename, + s.storeID, + memoryLimit, + false, + validateDataFiles, + SERVER_KNOBS->REMOTE_KV_STORE && /* testing mixed mode in simulation if remote kvs enabled*/ + (g_network->isSimulated() ? deterministicRandom()->coinflip() : true)); Future kvClosed = kv->onClosed(); filesClosed.add(kvClosed); @@ -1598,6 +1626,7 @@ ACTOR Future workerServer(Reference connRecord, logQueueBasename = fileLogQueuePrefix.toString() + optionsString.toString() + "-"; } ASSERT_WE_THINK(abspath(parentDirectory(s.filename)) == folder); + // TraceEvent(SevDebug, "openRemoteKVStore").detail("storeType", "TlogData"); IKeyValueStore* kv = openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles); const DiskQueueVersion dqv = s.tLogOptions.getDiskQueueVersion(); const int64_t diskQueueWarnSize = @@ -2002,6 +2031,7 @@ ACTOR Future workerServer(Reference connRecord, req.logVersion > TLogVersion::V2 ? fileVersionedLogDataPrefix : fileLogDataPrefix; std::string filename = filenameFromId(req.storeType, folder, prefix.toString() + tLogOptions.toPrefix(), logId); + // TraceEvent(SevDebug, "openRemoteKVStore").detail("storeType", "3"); IKeyValueStore* data = openKVStore(req.storeType, filename, logId, memoryLimit); const DiskQueueVersion dqv = tLogOptions.getDiskQueueVersion(); IDiskQueue* queue = openDiskQueue( @@ -2086,7 +2116,17 @@ ACTOR Future workerServer(Reference connRecord, folder, isTss ? testingStoragePrefix.toString() : fileStoragePrefix.toString(), recruited.id()); - IKeyValueStore* data = openKVStore(req.storeType, filename, recruited.id(), memoryLimit); + + IKeyValueStore* data = openKVStore( + req.storeType, + filename, + recruited.id(), + memoryLimit, + false, + false, + SERVER_KNOBS->REMOTE_KV_STORE && /* testing mixed mode in simulation if remote kvs enabled*/ + (g_network->isSimulated() ? deterministicRandom()->coinflip() : true)); + Future kvClosed = data->onClosed(); filesClosed.add(kvClosed); ReplyPromise storageReady = req.reply; @@ -2333,20 +2373,26 @@ ACTOR Future workerServer(Reference connRecord, when(wait(handleErrors)) {} } } catch (Error& err) { + TraceEvent(SevDebug, "WorkerServer").detail("Error", err.code()).backtrace(); // Make sure actors are cancelled before "recovery" promises are destructed. for (auto f : recoveries) f.cancel(); state Error e = err; bool ok = e.code() == error_code_please_reboot || e.code() == error_code_actor_cancelled || - e.code() == error_code_please_reboot_delete; + e.code() == error_code_please_reboot_delete || e.code() == error_code_please_reboot_remote_kv_store; endRole(Role::WORKER, interf.id(), "WorkerError", ok, e); errorForwarders.clear(false); sharedLogs.clear(); - if (e.code() != - error_code_actor_cancelled) { // We get cancelled e.g. when an entire simulation times out, but in that case - // we won't be restarted and don't need to wait for shutdown + if (e.code() != error_code_actor_cancelled && e.code() != error_code_please_reboot_remote_kv_store) { + // actor_cancelled: + // We get cancelled e.g. when an entire simulation times out, but in that case + // we won't be restarted and don't need to wait for shutdown + // reboot_remote_kv_store: + // The child process running the storage engine died abnormally, + // the current solution is to reboot the worker. + // Some refactoring work in the future can make it only reboot the storage server stopping.send(Void()); wait(filesClosed.getResult()); // Wait for complete shutdown of KV stores wait(delay(0.0)); // Unwind the callstack to make sure that IAsyncFile references are all gone diff --git a/fdbserver/workloads/SaveAndKill.actor.cpp b/fdbserver/workloads/SaveAndKill.actor.cpp index 316d9b13c9..3f3aa66f4c 100644 --- a/fdbserver/workloads/SaveAndKill.actor.cpp +++ b/fdbserver/workloads/SaveAndKill.actor.cpp @@ -22,6 +22,7 @@ #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbrpc/simulator.h" +#include "boost/algorithm/string/predicate.hpp" #undef state #include "fdbclient/SimpleIni.h" @@ -70,12 +71,14 @@ struct SaveAndKillWorkload : TestWorkload { std::map rebootingProcesses = g_simulator.currentlyRebootingProcesses; std::map allProcessesMap; for (const auto& [_, process] : rebootingProcesses) { - if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end()) { + if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() && + std::string(process->name) != "remote flow process") { allProcessesMap[process->dataFolder] = process; } } for (const auto& process : processes) { - if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end()) { + if (allProcessesMap.find(process->dataFolder) == allProcessesMap.end() && + std::string(process->name) != "remote flow process") { allProcessesMap[process->dataFolder] = process; } } diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 1e65e828c3..47a09eb1a0 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -743,6 +743,13 @@ class Listener final : public IListener, ReferenceCounted { public: Listener(boost::asio::io_context& io_service, NetworkAddress listenAddress) : io_service(io_service), listenAddress(listenAddress), acceptor(io_service, tcpEndpoint(listenAddress)) { + // when port 0 is passed in, a random port will be opened + // set listenAddress as the address with the actual port opened instead of port 0 + if (listenAddress.port == 0) { + this->listenAddress = + NetworkAddress::parse(acceptor.local_endpoint().address().to_string().append(":").append( + std::to_string(acceptor.local_endpoint().port()))); + } platform::setCloseOnExec(acceptor.native_handle()); } diff --git a/flow/Platform.actor.cpp b/flow/Platform.actor.cpp index 20a13ac8c7..466549419f 100644 --- a/flow/Platform.actor.cpp +++ b/flow/Platform.actor.cpp @@ -3755,6 +3755,40 @@ void fdb_probe_actor_exit(const char* name, unsigned long id, int index) { } #endif +void throwExecPathError(Error e, char path[]) { + Severity sev = e.code() == error_code_io_error ? SevError : SevWarnAlways; + TraceEvent(sev, "GetPathError").error(e).detail("Path", path); + throw e; +} + +std::string getExecPath() { + char path[1024]; + uint32_t size = sizeof(path); +#if defined(__APPLE__) + if (_NSGetExecutablePath(path, &size) == 0) { + return std::string(path); + } else { + throwExecPathError(platform_error(), path); + } +#elif defined(__linux__) + ssize_t len = ::readlink("/proc/self/exe", path, size); + if (len != -1) { + path[len] = '\0'; + return std::string(path); + } else { + throwExecPathError(platform_error(), path); + } +#elif defined(_WIN32) + auto len = GetModuleFileName(nullptr, path, size); + if (len != 0) { + return std::string(path); + } else { + throwExecPathError(platform_error(), path); + } +#endif + return "unsupported OS"; +} + void setupRunLoopProfiler() { #ifdef __linux__ if (!profileThread && FLOW_KNOBS->RUN_LOOP_PROFILING_INTERVAL > 0) { diff --git a/flow/Platform.h b/flow/Platform.h index dae2a63a08..5ce6cd6640 100644 --- a/flow/Platform.h +++ b/flow/Platform.h @@ -703,6 +703,9 @@ void* loadFunction(void* lib, const char* func_name); std::string exePath(); +// get the absolute path +std::string getExecPath(); + #ifdef _WIN32 inline static int ctzll(uint64_t value) { unsigned long count = 0; diff --git a/flow/error_definitions.h b/flow/error_definitions.h index ecd7ab1d28..318aa1d7d2 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -87,6 +87,7 @@ ERROR( blob_granule_file_load_error, 1063, "Error loading a blob file during gra ERROR( blob_granule_transaction_too_old, 1064, "Read version is older than blob granule history supports" ) ERROR( blob_manager_replaced, 1065, "This blob manager has been replaced." ) ERROR( change_feed_popped, 1066, "Tried to read a version older than what has been popped from the change feed" ) +ERROR( remote_kvs_cancelled, 1067, "The remote key-value store is cancelled" ) ERROR( broken_promise, 1100, "Broken promise" ) ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" ) @@ -113,6 +114,7 @@ ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cance ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress" ) ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different cluster ID" ) ERROR( restart_cluster_controller, 1218, "Restart cluster controller process" ) +ERROR( please_reboot_remote_kv_store, 1219, "Need to reboot the storage engine process as it died abnormally") // 15xx Platform errors ERROR( platform_error, 1500, "Platform error" ) diff --git a/flow/genericactors.actor.h b/flow/genericactors.actor.h index f30ef772e5..f5f2aedba1 100644 --- a/flow/genericactors.actor.h +++ b/flow/genericactors.actor.h @@ -80,7 +80,7 @@ Future> stopAfter(Future what) { ret = Optional(_); } catch (Error& e) { bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || - e.code() == error_code_actor_cancelled; + e.code() == error_code_actor_cancelled || e.code() == error_code_please_reboot_remote_kv_store; TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e); if (!ok) { fprintf(stderr, "Fatal Error: %s\n", e.what()); diff --git a/flow/network.h b/flow/network.h index 967a145b7e..5617f96501 100644 --- a/flow/network.h +++ b/flow/network.h @@ -507,6 +507,10 @@ public: virtual NetworkAddress getPeerAddress() const = 0; virtual UID getDebugID() const = 0; + + // At present, implemented by Sim2Conn where we want to disable bits flip for connections between parent process and + // child process, also reduce latency for this kind of connection + virtual bool isStableConnection() const { throw unsupported_operation(); } }; class IListener { diff --git a/tests/fast/PhysicalShardMove.toml b/tests/fast/PhysicalShardMove.toml index 72d1f0331c..6377f8d6a2 100644 --- a/tests/fast/PhysicalShardMove.toml +++ b/tests/fast/PhysicalShardMove.toml @@ -4,6 +4,7 @@ storageEngineType = 4 processesPerMachine = 1 coordinators = 3 machineCount = 15 +disableRemoteKVS = true [[test]] testTitle = 'PhysicalShardMove' diff --git a/tests/slow/DiskFailureCycle.toml b/tests/slow/DiskFailureCycle.toml index b61bdebc61..3f09821bf4 100644 --- a/tests/slow/DiskFailureCycle.toml +++ b/tests/slow/DiskFailureCycle.toml @@ -4,6 +4,7 @@ minimumReplication = 3 minimumRegions = 3 logAntiQuorum = 0 storageEngineExcludeTypes = [4] +disableRemoteKVS = true [[test]] testTitle = 'DiskFailureCycle'