diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h
index 6468e19d7d..70e199289b 100644
--- a/fdbrpc/fdbrpc.h
+++ b/fdbrpc/fdbrpc.h
@@ -124,6 +124,8 @@ public:
 		sav->sendError(exc);
 	}
 
+	void send(Never) { sendError(never_reply()); }
+
 	Future<T> getFuture() const {
 		sav->addFutureRef();
 		return Future<T>(sav);
diff --git a/fdbrpc/networksender.actor.h b/fdbrpc/networksender.actor.h
index 986e893665..3c438bfa85 100644
--- a/fdbrpc/networksender.actor.h
+++ b/fdbrpc/networksender.actor.h
@@ -30,6 +30,7 @@
 #include "flow/flow.h"
 #include "flow/actorcompiler.h" // This must be the last #include.
 
+// This actor is used by FlowTransport to serialize the response to a ReplyPromise across the network
 ACTOR template <class T>
 void networkSender(Future<T> input, Endpoint endpoint) {
 	try {
@@ -37,6 +38,9 @@ void networkSender(Future<T> input, Endpoint endpoint) {
 		FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint, false);
 	} catch (Error& err) {
 		// if (err.code() == error_code_broken_promise) return;
+		if (err.code() == error_code_never_reply) {
+			return;
+		}
 		ASSERT(err.code() != error_code_actor_cancelled);
 		FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint, false);
 	}
diff --git a/flow/error_definitions.h b/flow/error_definitions.h
index 1aabe0bf2b..5406911205 100755
--- a/flow/error_definitions.h
+++ b/flow/error_definitions.h
@@ -89,6 +89,7 @@
 ERROR( broken_promise, 1100, "Broken promise" )
 ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
 ERROR( future_released, 1102, "Future has been released" )
 ERROR( connection_leaked, 1103, "Connection object leaked" )
+ERROR( never_reply, 1104, "Never reply to the request" )
 ERROR( recruitment_failed, 1200, "Recruitment of a server failed" ) // Be careful, catching this will delete the data of a storage server or tlog permanently
 ERROR( move_to_removed_server, 1201, "Attempt to move keys to a storage server that was removed" )
diff --git a/tests/TestRunner/tmp_cluster.py b/tests/TestRunner/tmp_cluster.py
index 5c84f6086f..6f8f94f532 100755
--- a/tests/TestRunner/tmp_cluster.py
+++ b/tests/TestRunner/tmp_cluster.py
@@ -1,30 +1,34 @@
 #!/usr/bin/env python3
 
+import glob
 import os
 import shutil
 import subprocess
 import sys
-import socket
 from local_cluster import LocalCluster
 from argparse import ArgumentParser, RawDescriptionHelpFormatter
 from random import choice
 from pathlib import Path
 
+
 class TempCluster:
     def __init__(self, build_dir: str, process_number: int = 1, port: str = None):
         self.build_dir = Path(build_dir).resolve()
         assert self.build_dir.exists(), "{} does not exist".format(build_dir)
         assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
         tmp_dir = self.build_dir.joinpath(
-            'tmp',
-            ''.join(choice(LocalCluster.valid_letters_for_secret) for i in range(16)))
+            "tmp",
+            "".join(choice(LocalCluster.valid_letters_for_secret) for i in range(16)),
+        )
         tmp_dir.mkdir(parents=True)
-        self.cluster = LocalCluster(tmp_dir,
-                                    self.build_dir.joinpath('bin', 'fdbserver'),
-                                    self.build_dir.joinpath('bin', 'fdbmonitor'),
-                                    self.build_dir.joinpath('bin', 'fdbcli'),
-                                    process_number,
-                                    port = port)
+        self.cluster = LocalCluster(
+            tmp_dir,
+            self.build_dir.joinpath("bin", "fdbserver"),
+            self.build_dir.joinpath("bin", "fdbmonitor"),
+            self.build_dir.joinpath("bin", "fdbcli"),
+            process_number,
+            port=port,
+        )
         self.log = self.cluster.log
         self.etc = self.cluster.etc
         self.data = self.cluster.data
@@ -40,13 +44,14 @@ class TempCluster:
         shutil.rmtree(self.tmp_dir)
 
     def close(self):
-        self.cluster.__exit__(None,None,None)
+        self.cluster.__exit__(None, None, None)
         shutil.rmtree(self.tmp_dir)
 
 
-if __name__ == '__main__':
-    parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
-                            description="""
+if __name__ == "__main__":
+    parser = ArgumentParser(
+        formatter_class=RawDescriptionHelpFormatter,
+        description="""
 This script automatically configures a temporary local cluster on the machine
 and then calls a command while this cluster is running. As soon as the command
 returns, the configured cluster is killed and all generated data is deleted.
@@ -61,30 +66,72 @@ if __name__ == '__main__':
 - All occurrences of @ETC_DIR@ will be replaced with the path to the configuration directory.
 
 The environment variable FDB_CLUSTER_FILE is set to the generated cluster for the command if it is not set already.
-    """)
-    parser.add_argument('--build-dir', '-b', metavar='BUILD_DIRECTORY', help='FDB build directory', required=True)
-    parser.add_argument('cmd', metavar="COMMAND", nargs="+", help="The command to run")
-    parser.add_argument('--process-number', '-p', help="Number of fdb processes running", type=int, default=1)
+    """,
+    )
+    parser.add_argument(
+        "--build-dir",
+        "-b",
+        metavar="BUILD_DIRECTORY",
+        help="FDB build directory",
+        required=True,
+    )
+    parser.add_argument("cmd", metavar="COMMAND", nargs="+", help="The command to run")
+    parser.add_argument(
+        "--process-number",
+        "-p",
+        help="Number of fdb processes running",
+        type=int,
+        default=1,
+    )
     args = parser.parse_args()
     errcode = 1
     with TempCluster(args.build_dir, args.process_number) as cluster:
         print("log-dir: {}".format(cluster.log))
         print("etc-dir: {}".format(cluster.etc))
         print("data-dir: {}".format(cluster.data))
-        print("cluster-file: {}".format(cluster.etc.joinpath('fdb.cluster')))
+        print("cluster-file: {}".format(cluster.etc.joinpath("fdb.cluster")))
         cmd_args = []
         for cmd in args.cmd:
-            if cmd == '@CLUSTER_FILE@':
-                cmd_args.append(str(cluster.etc.joinpath('fdb.cluster')))
-            elif cmd == '@DATA_DIR@':
+            if cmd == "@CLUSTER_FILE@":
+                cmd_args.append(str(cluster.etc.joinpath("fdb.cluster")))
+            elif cmd == "@DATA_DIR@":
                 cmd_args.append(str(cluster.data))
-            elif cmd == '@LOG_DIR@':
+            elif cmd == "@LOG_DIR@":
                 cmd_args.append(str(cluster.log))
-            elif cmd == '@ETC_DIR@':
+            elif cmd == "@ETC_DIR@":
                 cmd_args.append(str(cluster.etc))
             else:
                 cmd_args.append(cmd)
         env = dict(**os.environ)
-        env['FDB_CLUSTER_FILE'] = env.get('FDB_CLUSTER_FILE', cluster.etc.joinpath('fdb.cluster'))
-        errcode = subprocess.run(cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode
+        env["FDB_CLUSTER_FILE"] = env.get(
+            "FDB_CLUSTER_FILE", cluster.etc.joinpath("fdb.cluster")
+        )
+        errcode = subprocess.run(
+            cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env
+        ).returncode
+
+        sev40s = (
+            subprocess.getoutput(
+                "grep -r 'Severity=\"40\"' {}".format(cluster.log.as_posix())
+            )
+            .rstrip()
+            .splitlines()
+        )
+
+        for line in sev40s:
+            # When running ASAN we expect to see this message. Boost coroutine should be using the correct asan annotations so that it shouldn't produce any false positives.
+            if line.endswith(
+                "WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false positives in some cases!"
+            ):
+                continue
+            print(">>>>>>>>>>>>>>>>>>>> Found severity 40 events - the test fails")
+            errcode = 1
+            break
+
+        if errcode:
+            for log_file in glob.glob(os.path.join(cluster.log, "*")):
+                print(">>>>>>>>>>>>>>>>>>>> Contents of {}:".format(log_file))
+                with open(log_file, "r") as f:
+                    print(f.read())
+
     sys.exit(errcode)
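Note on the never_reply mechanism: ReplyPromise<T>::send(Never) resolves the
promise with a never_reply() error, and networkSender now swallows that error
instead of serializing it back, so no response packet is ever sent for the
request. A minimal server-side sketch, assuming Flow's actor framework
(EchoRequest, EchoReply, and the shouldDrop field are hypothetical and not
part of this patch):

	ACTOR Future<Void> serveEcho(FutureStream<EchoRequest> requests) {
		loop {
			EchoRequest req = waitNext(requests);
			if (req.shouldDrop) {
				// Hypothetical policy: resolve the ReplyPromise with
				// never_reply(); networkSender catches the error and simply
				// returns, so the caller receives no response at all.
				req.reply.send(Never());
			} else {
				req.reply.send(EchoReply(req.message));
			}
		}
	}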
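Because a request answered with send(Never()) leaves the caller's reply future
unset forever, call sites need their own timeout or cancellation. A hedged
caller-side sketch under the same assumptions (EchoServerInterface and the
5-second deadline are likewise hypothetical):

	ACTOR Future<Optional<EchoReply>> echoWithTimeout(EchoServerInterface server, EchoRequest req) {
		choose {
			when(EchoReply r = wait(server.echo.getReply(req))) {
				return Optional<EchoReply>(r);
			}
			when(wait(delay(5.0))) {
				// The server may have chosen never to reply; give up.
				return Optional<EchoReply>();
			}
		}
	}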