diff --git a/bindings/python/tests/fdbcli_tests.py b/bindings/python/tests/fdbcli_tests.py
index 25511b7b33..11f7c191eb 100755
--- a/bindings/python/tests/fdbcli_tests.py
+++ b/bindings/python/tests/fdbcli_tests.py
@@ -9,6 +9,7 @@ import json
 import time
 import random
 
+
 def enable_logging(level=logging.ERROR):
     """Enable logging in the function with the specified logging level
 
@@ -17,7 +18,7 @@ def enable_logging(level=logging.ERROR):
     """
     def func_decorator(func):
         @functools.wraps(func)
-        def wrapper(*args,**kwargs):
+        def wrapper(*args, **kwargs):
             # initialize logger
             logger = logging.getLogger(func.__name__)
             logger.setLevel(level)
@@ -33,6 +34,7 @@ def enable_logging(level=logging.ERROR):
         return wrapper
     return func_decorator
 
+
 def run_fdbcli_command(*args):
     """run the fdbcli statement: fdbcli --exec ' ... '.
 
@@ -42,6 +44,7 @@ def run_fdbcli_command(*args):
     commands = command_template + ["{}".format(' '.join(args))]
     return subprocess.run(commands, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
 
+
 def run_fdbcli_command_and_get_error(*args):
     """run the fdbcli statement: fdbcli --exec ' ... '.
 
@@ -51,6 +54,7 @@ def run_fdbcli_command_and_get_error(*args):
     commands = command_template + ["{}".format(' '.join(args))]
     return subprocess.run(commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stderr.decode('utf-8').strip()
 
+
 @enable_logging()
 def advanceversion(logger):
     # get current read version
@@ -73,6 +77,7 @@ def advanceversion(logger):
     logger.debug("Read version: {}".format(version4))
     assert version4 >= version3
 
+
 @enable_logging()
 def maintenance(logger):
     # expected fdbcli output when running 'maintenance' while there's no ongoing maintenance
@@ -95,6 +100,7 @@ def maintenance(logger):
     output3 = run_fdbcli_command('maintenance')
     assert output3 == no_maintenance_output
 
+
 @enable_logging()
 def setclass(logger):
     output1 = run_fdbcli_command('setclass')
@@ -109,11 +115,11 @@ def setclass(logger):
     # check class source
     assert 'command_line' in class_type_line_1
     # set class to a random valid type
-    class_types = ['storage', 'storage', 'transaction', 'resolution',
-                    'commit_proxy', 'grv_proxy', 'master', 'stateless', 'log',
-                    'router', 'cluster_controller', 'fast_restore', 'data_distributor',
-                    'coordinator', 'ratekeeper', 'storage_cache', 'backup'
-                ]
+    class_types = ['storage', 'storage', 'transaction', 'resolution',
+                   'commit_proxy', 'grv_proxy', 'master', 'stateless', 'log',
+                   'router', 'cluster_controller', 'fast_restore', 'data_distributor',
+                   'coordinator', 'ratekeeper', 'storage_cache', 'backup'
+                   ]
     random_class_type = random.choice(class_types)
     logger.debug("Change to type: {}".format(random_class_type))
     run_fdbcli_command('setclass', network_address, random_class_type)
@@ -135,6 +141,7 @@ def setclass(logger):
     logger.debug(class_type_line_3)
     assert class_type_line_3 == class_type_line_1
 
+
 @enable_logging()
 def lockAndUnlock(logger):
     # lock an unlocked database, should be successful
@@ -149,7 +156,7 @@ def lockAndUnlock(logger):
     output2 = run_fdbcli_command_and_get_error("lock")
     assert output2 == 'ERROR: Database is locked (1038)'
     # unlock the database
-    process = subprocess.Popen(command_template + ['unlock ' + lock_uid], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
+    process = subprocess.Popen(command_template + ['unlock ' + lock_uid], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
     line1 = process.stdout.readline()
-    # The randome passphrease we need to confirm to proceed the unlocking
+    # The random passphrase we need to confirm to proceed with the unlocking
     line2 = process.stdout.readline()
@@ -160,6 +167,7 @@ def lockAndUnlock(logger):
     assert output3.decode('utf-8').strip() == 'Database unlocked.'
     assert not get_value_from_status_json(True, 'cluster', 'database_lock_state', 'locked')
 
+
 @enable_logging()
 def kill(logger):
     output1 = run_fdbcli_command('kill')
@@ -169,11 +177,11 @@ def kill(logger):
     address = lines[1]
     logger.debug("Address: {}".format(address))
     old_generation = get_value_from_status_json(False, 'cluster', 'generation')
-    # This is currently an issue with fdbcli, 
-    # where you need to first run 'kill' to initialize processes' list 
+    # This is currently an issue with fdbcli,
+    # where you need to first run 'kill' to initialize processes' list
     # and then specify the certain process to kill
-    process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
-    # 
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+    #
     output2, err = process.communicate(input='kill; kill {}\n'.format(address).encode())
     logger.debug(output2)
     # wait for a second for the cluster recovery
@@ -182,6 +190,7 @@ def kill(logger):
     logger.debug("Old: {}, New: {}".format(old_generation, new_generation))
     assert new_generation > old_generation
 
+
 @enable_logging()
 def suspend(logger):
     output1 = run_fdbcli_command('suspend')
@@ -201,7 +210,7 @@ def suspend(logger):
     assert len(pinfo) == 1
     pid = pinfo[0].split(' ')[0]
     logger.debug("Pid: {}".format(pid))
-    process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-    # suspend the process for enough long time
+    # suspend the process for a long enough time
     output2, err = process.communicate(input='suspend; suspend 3600 {}\n'.format(address).encode())
-    # the cluster should be unavailable after the only process being suspended
+    # the cluster should be unavailable after the only process is suspended
@@ -214,7 +223,7 @@ def suspend(logger):
     kill_output = subprocess.check_output(['kill', pid]).decode().strip()
     logger.debug("Kill result: {}".format(kill_output))
-    # The process should come back after a few time
+    # The process should come back after a while
-    duration = 0 # seconds we already wait
+    duration = 0  # seconds we already wait
     while not get_value_from_status_json(False, 'client', 'database_status', 'available') and duration < 60:
         logger.debug("Sleep for 1 second to wait cluster recovery")
         time.sleep(1)
@@ -222,6 +231,7 @@ def suspend(logger):
     # at most after 60 seconds, the cluster should be available
     assert get_value_from_status_json(False, 'client', 'database_status', 'available')
 
+
 def get_value_from_status_json(retry, *args):
     while True:
         result = json.loads(run_fdbcli_command('status', 'json'))
@@ -230,9 +240,10 @@ def get_value_from_status_json(retry, *args):
     for arg in args:
         assert arg in result
         result = result[arg]
-    
+
     return result
 
+
 @enable_logging()
 def consistencycheck(logger):
     consistency_check_on_output = 'ConsistencyCheck is on'
@@ -246,6 +257,7 @@ def consistencycheck(logger):
     output3 = run_fdbcli_command('consistencycheck')
     assert output3 == consistency_check_on_output
 
+
 @enable_logging()
 def cache_range(logger):
     # this command is currently experimental
@@ -253,6 +265,7 @@ def cache_range(logger):
     run_fdbcli_command('cache_range', 'set', 'a', 'b')
     run_fdbcli_command('cache_range', 'clear', 'a', 'b')
 
+
 @enable_logging()
 def datadistribution(logger):
     output1 = run_fdbcli_command('datadistribution', 'off')
@@ -272,6 +285,7 @@ def datadistribution(logger):
     assert output6 == 'Data distribution is enabled for rebalance.'
     time.sleep(1)
 
+
 @enable_logging()
 def transaction(logger):
     """This test will cover the transaction related fdbcli commands.
@@ -281,7 +295,7 @@ def transaction(logger):
     """
     err1 = run_fdbcli_command_and_get_error('set', 'key', 'value')
     assert err1 == 'ERROR: writemode must be enabled to set or clear keys in the database.'
-    process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
     transaction_flow = ['writemode on', 'begin', 'getversion', 'set key value', 'get key', 'commit']
     output1, _ = process.communicate(input='\n'.join(transaction_flow).encode())
     # split the output into lines
@@ -300,13 +314,13 @@ def transaction(logger):
     output2 = run_fdbcli_command('get', 'key')
     assert output2 == "`key' is `value'"
     # test rollback and read-your-write behavior
-    process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
     transaction_flow = [
-        'writemode on', 'begin', 'getrange a z', 
+        'writemode on', 'begin', 'getrange a z',
         'clear key', 'get key',
         # 'option on READ_YOUR_WRITES_DISABLE',
         'get key', 'rollback'
-        ]
+    ]
     output3, _ = process.communicate(input='\n'.join(transaction_flow).encode())
     lines = list(filter(len, output3.decode().split('\n')))[-5:]
     # lines[0] == "Transaction started" and lines[1] == 'Range limited to 25 keys'
@@ -317,13 +331,13 @@ def transaction(logger):
     output4 = run_fdbcli_command('get', 'key')
     assert output4 == "`key' is `value'"
     # test read_your_write_disable option and clear the inserted key
-    process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
+    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
     transaction_flow = [
         'writemode on', 'begin',
         'option on READ_YOUR_WRITES_DISABLE',
         'clear key', 'get key',
         'commit'
-        ]
+    ]
     output6, _ = process.communicate(input='\n'.join(transaction_flow).encode())
     lines = list(filter(len, output6.decode().split('\n')))[-4:]
     assert lines[1] == 'Option enabled for current transaction'
@@ -333,6 +347,7 @@ def transaction(logger):
     output7 = run_fdbcli_command('get', 'key')
     assert output7 == "`key': not found"
 
+
 def get_fdb_process_addresses(logger):
     # get all processes' network addresses
     output = run_fdbcli_command('kill')
@@ -342,6 +357,7 @@ def get_fdb_process_addresses(logger):
     assert len(addresses) == process_number
     return addresses
 
+
 @enable_logging(logging.DEBUG)
 def coordinators(logger):
     # we should only have one coordinator for now
@@ -369,6 +385,7 @@ def coordinators(logger):
     assert len(get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')) == 1
     wait_for_database_available(logger)
 
+
 @enable_logging(logging.DEBUG)
 def exclude(logger):
     # get all processes' network addresses
@@ -381,7 +398,7 @@ def exclude(logger):
     # randomly pick one and exclude the process
     excluded_address = random.choice(addresses)
     # If we see "not enough space" error, use FORCE option to proceed
-    # this should be a safe operation as we do not need any storage space for the test 
+    # this should be a safe operation as we do not need any storage space for the test
     force = False
     # sometimes we need to retry the exclude
     while True:
@@ -418,6 +435,8 @@ def exclude(logger):
     wait_for_database_available(logger)
 
 # read the system key 'k', need to enable the option first
+
+
 def read_system_key(k):
     output = run_fdbcli_command('option', 'on', 'READ_SYSTEM_KEYS;', 'get', k)
     if 'is' not in output:
@@ -426,6 +445,7 @@ def read_system_key(k):
     _, value = output.split(' is ')
     return value
 
+
 @enable_logging()
 def throttle(logger):
     # no throttled tags at the beginning
@@ -443,6 +463,7 @@ def throttle(logger):
     assert enable_flag == "`0'"
     # TODO : test manual throttling, not easy to do now
 
+
 def wait_for_database_available(logger):
     # sometimes the change takes some time to have effect and the database can be unavailable at that time
     # this is to wait until the database is available again
@@ -450,9 +471,11 @@
         logger.debug("Database unavailable for now, wait for one second")
         time.sleep(1)
 
+
 if __name__ == '__main__':
     # fdbcli_tests.py [external_client_library_path]
-    assert len(sys.argv) == 4 or len(sys.argv) == 5, "Please pass arguments: [external_client_library_path]"
+    assert len(sys.argv) == 4 or len(
+        sys.argv) == 5, "Please pass arguments: [external_client_library_path]"
     # set external client library
     if len(sys.argv) == 5:
         external_client_library_path = sys.argv[4]
@@ -466,7 +489,7 @@ if __name__ == '__main__':
     process_number = int(sys.argv[3])
     if process_number == 1:
         # TODO: disable for now, the change can cause the database unavailable
-        #advanceversion()
+        # advanceversion()
         cache_range()
         consistencycheck()
         datadistribution()
@@ -481,5 +504,3 @@ if __name__ == '__main__':
-        assert process_number > 1, "Process number should be positive"
+        assert process_number > 1, "Process number should be greater than 1"
         coordinators()
         exclude()
-
-
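
For context, below is a minimal, self-contained sketch of the @enable_logging decorator pattern these tests rely on: a decorator factory that builds a logger named after the wrapped test function and injects it as the first positional argument (inferred from the test functions declaring a logger parameter while __main__ invokes them with no arguments). The StreamHandler wiring and the call that forwards the logger are assumptions for the sketch; the hunks above only show the wrapper signature, the logging.getLogger(func.__name__) lookup, and logger.setLevel(level).

import functools
import logging


def enable_logging(level=logging.ERROR):
    """Decorator factory: inject a per-function logger as the first argument."""
    def func_decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # initialize logger, exactly as in the diff above
            logger = logging.getLogger(func.__name__)
            logger.setLevel(level)
            # assumption: attach a stderr handler so debug output is visible
            if not logger.handlers:
                logger.addHandler(logging.StreamHandler())
            # assumption: forward the logger as the first positional argument,
            # matching how the tests declare `def kill(logger): ...` but are
            # called as `kill()` in __main__
            return func(logger, *args, **kwargs)
        return wrapper
    return func_decorator


@enable_logging(logging.DEBUG)
def demo(logger):
    logger.debug("decorator injected this logger")


demo()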