diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 89754cab18..e90c3eda29 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -779,7 +779,6 @@ const KeyRef exeRestore = LiteralStringRef("fdbrestore"); const KeyRef exeDatabaseAgent = LiteralStringRef("dr_agent"); const KeyRef exeDatabaseBackup = LiteralStringRef("fdbdr"); -extern void flushTraceFileVoid(); extern const char* getHGVersion(); #ifdef _WIN32 diff --git a/fdbclient/ClientWorkerInterface.h b/fdbclient/ClientWorkerInterface.h index 4e3efde268..d5f83b86ec 100644 --- a/fdbclient/ClientWorkerInterface.h +++ b/fdbclient/ClientWorkerInterface.h @@ -49,12 +49,14 @@ struct RebootRequest { constexpr static FileIdentifier file_identifier = 11913957; bool deleteData; bool checkData; + uint32_t waitForDuration; - explicit RebootRequest(bool deleteData = false, bool checkData = false) : deleteData(deleteData), checkData(checkData) {} + explicit RebootRequest(bool deleteData = false, bool checkData = false, uint32_t waitForDuration = 0) + : deleteData(deleteData), checkData(checkData), waitForDuration(waitForDuration) {} template void serialize(Ar& ar) { - serializer(ar, deleteData, checkData); + serializer(ar, deleteData, checkData, waitForDuration); } }; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index bf264c030c..bf61207a16 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -107,6 +107,7 @@ set(FDBSERVER_SRCS workloads/ChangeConfig.actor.cpp workloads/ClientTransactionProfileCorrectness.actor.cpp workloads/TriggerRecovery.actor.cpp + workloads/SuspendProcesses.actor.cpp workloads/CommitBugCheck.actor.cpp workloads/ConfigureDatabase.actor.cpp workloads/ConflictRange.actor.cpp diff --git a/fdbserver/fdbserver.vcxproj b/fdbserver/fdbserver.vcxproj index da4a62d62f..44e752cdb3 100644 --- a/fdbserver/fdbserver.vcxproj +++ b/fdbserver/fdbserver.vcxproj @@ -107,6 +107,7 @@ + diff --git a/fdbserver/fdbserver.vcxproj.filters b/fdbserver/fdbserver.vcxproj.filters index 78e9d35e16..23b68d2c38 100644 --- a/fdbserver/fdbserver.vcxproj.filters +++ b/fdbserver/fdbserver.vcxproj.filters @@ -234,6 +234,9 @@ workloads + + workloads + workloads diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index aa2b37b404..97428b9802 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -866,6 +866,13 @@ ACTOR Future workerServer( when( RebootRequest req = waitNext( interf.clientInterface.reboot.getFuture() ) ) { state RebootRequest rebootReq = req; + if(req.waitForDuration) { + TraceEvent("RebootRequestSuspendingProcess").detail("Duration", req.waitForDuration); + flushTraceFileVoid(); + setProfilingEnabled(0); + g_network->stop(); + threadSleep(req.waitForDuration); + } if(rebootReq.checkData) { Reference checkFile = wait( IAsyncFileSystem::filesystem()->open( joinPath(folder, validationFilename), IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, 0600 ) ); wait( checkFile->sync() ); diff --git a/fdbserver/workloads/SuspendProcesses.actor.cpp b/fdbserver/workloads/SuspendProcesses.actor.cpp new file mode 100644 index 0000000000..4581f02fa1 --- /dev/null +++ b/fdbserver/workloads/SuspendProcesses.actor.cpp @@ -0,0 +1,68 @@ +#include + +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/ServerDBInfo.h" +#include "fdbclient/Status.h" +#include "fdbclient/StatusClient.h" +#include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/RunTransaction.actor.h" +#include "flow/actorcompiler.h" // has to be last include + +struct SuspendProcessesWorkload : TestWorkload { + std::vector prefixSuspendProcesses; + double suspendTimeDuration; + double waitTimeDuration; + + SuspendProcessesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + prefixSuspendProcesses = getOption(options, LiteralStringRef("prefixesSuspendProcesses"), std::vector()); + waitTimeDuration = getOption(options, LiteralStringRef("waitTimeDuration"), 0); + suspendTimeDuration = getOption(options, LiteralStringRef("suspendTimeDuration"), 0); + } + + virtual std::string description() { return "SuspendProcesses"; } + + virtual Future setup(Database const& cx) { return Void(); } + + ACTOR Future _start(Database cx, SuspendProcessesWorkload* self) { + wait(delay(self->waitTimeDuration)); + state ReadYourWritesTransaction tr(cx); + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + Standalone kvs = wait(tr.getRange( + KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1)); + std::vector> suspendProcessInterfaces; + for (auto it : kvs) { + auto ip_port = it.key.endsWith(LiteralStringRef(":tls")) + ? it.key.removeSuffix(LiteralStringRef(":tls")) + : it.key; + for (auto& killProcess : self->prefixSuspendProcesses) { + if (boost::starts_with(ip_port.toString().c_str(), killProcess.c_str())) { + suspendProcessInterfaces.push_back(it.value); + TraceEvent("SuspendProcessSelectedProcess").detail("IpPort", printable(ip_port)); + } + } + } + for (auto& interf : suspendProcessInterfaces) { + BinaryReader::fromStringRef(interf, IncludeVersion()) + .reboot.send(RebootRequest(false, false, self->suspendTimeDuration)); + } + return Void(); + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } + + virtual Future start(Database const& cx) { + if (clientId != 0) return Void(); + return _start(cx, this); + } + + virtual Future check(Database const& cx) { return true; } + + virtual void getMetrics(vector& m) {} +}; + +WorkloadFactory SuspendProcessesWorkloadFactory("SuspendProcesses"); diff --git a/flow/Error.cpp b/flow/Error.cpp index e2d096b8dd..3edb81adf9 100644 --- a/flow/Error.cpp +++ b/flow/Error.cpp @@ -35,8 +35,6 @@ std::map& Error::errorCounts() { #include -extern void flushTraceFileVoid(); - Error Error::fromUnvalidatedCode(int code) { if (code < 0 || code > 30000) { Error e = Error::fromCode(error_code_unknown_error); diff --git a/flow/Trace.h b/flow/Trace.h index 6ab4a857c6..6ba7e0e7db 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -537,6 +537,7 @@ void openTraceFile(const NetworkAddress& na, uint64_t rollsize, uint64_t maxLogs void initTraceEventMetrics(); void closeTraceFile(); bool traceFileIsOpen(); +void flushTraceFileVoid(); // Changes the format of trace files. Returns false if the format is unrecognized. No longer safe to call after a call // to openTraceFile.