mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 01:42:37 +08:00
Format fdbcli_tests.py file
This commit is contained in:
parent
cc18cc742c
commit
775ac3e27c
@ -9,6 +9,7 @@ import json
|
||||
import time
|
||||
import random
|
||||
|
||||
|
||||
def enable_logging(level=logging.ERROR):
|
||||
"""Enable logging in the function with the specified logging level
|
||||
|
||||
@ -17,7 +18,7 @@ def enable_logging(level=logging.ERROR):
|
||||
"""
|
||||
def func_decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args,**kwargs):
|
||||
def wrapper(*args, **kwargs):
|
||||
# initialize logger
|
||||
logger = logging.getLogger(func.__name__)
|
||||
logger.setLevel(level)
|
||||
@ -33,6 +34,7 @@ def enable_logging(level=logging.ERROR):
|
||||
return wrapper
|
||||
return func_decorator
|
||||
|
||||
|
||||
def run_fdbcli_command(*args):
|
||||
"""run the fdbcli statement: fdbcli --exec '<arg1> <arg2> ... <argN>'.
|
||||
|
||||
@ -42,6 +44,7 @@ def run_fdbcli_command(*args):
|
||||
commands = command_template + ["{}".format(' '.join(args))]
|
||||
return subprocess.run(commands, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
|
||||
|
||||
|
||||
def run_fdbcli_command_and_get_error(*args):
|
||||
"""run the fdbcli statement: fdbcli --exec '<arg1> <arg2> ... <argN>'.
|
||||
|
||||
@ -51,6 +54,7 @@ def run_fdbcli_command_and_get_error(*args):
|
||||
commands = command_template + ["{}".format(' '.join(args))]
|
||||
return subprocess.run(commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stderr.decode('utf-8').strip()
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def advanceversion(logger):
|
||||
# get current read version
|
||||
@ -73,6 +77,7 @@ def advanceversion(logger):
|
||||
logger.debug("Read version: {}".format(version4))
|
||||
assert version4 >= version3
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def maintenance(logger):
|
||||
# expected fdbcli output when running 'maintenance' while there's no ongoing maintenance
|
||||
@ -95,6 +100,7 @@ def maintenance(logger):
|
||||
output3 = run_fdbcli_command('maintenance')
|
||||
assert output3 == no_maintenance_output
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def setclass(logger):
|
||||
output1 = run_fdbcli_command('setclass')
|
||||
@ -109,11 +115,11 @@ def setclass(logger):
|
||||
# check class source
|
||||
assert 'command_line' in class_type_line_1
|
||||
# set class to a random valid type
|
||||
class_types = ['storage', 'storage', 'transaction', 'resolution',
|
||||
'commit_proxy', 'grv_proxy', 'master', 'stateless', 'log',
|
||||
'router', 'cluster_controller', 'fast_restore', 'data_distributor',
|
||||
'coordinator', 'ratekeeper', 'storage_cache', 'backup'
|
||||
]
|
||||
class_types = ['storage', 'storage', 'transaction', 'resolution',
|
||||
'commit_proxy', 'grv_proxy', 'master', 'stateless', 'log',
|
||||
'router', 'cluster_controller', 'fast_restore', 'data_distributor',
|
||||
'coordinator', 'ratekeeper', 'storage_cache', 'backup'
|
||||
]
|
||||
random_class_type = random.choice(class_types)
|
||||
logger.debug("Change to type: {}".format(random_class_type))
|
||||
run_fdbcli_command('setclass', network_address, random_class_type)
|
||||
@ -135,6 +141,7 @@ def setclass(logger):
|
||||
logger.debug(class_type_line_3)
|
||||
assert class_type_line_3 == class_type_line_1
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def lockAndUnlock(logger):
|
||||
# lock an unlocked database, should be successful
|
||||
@ -149,7 +156,7 @@ def lockAndUnlock(logger):
|
||||
output2 = run_fdbcli_command_and_get_error("lock")
|
||||
assert output2 == 'ERROR: Database is locked (1038)'
|
||||
# unlock the database
|
||||
process = subprocess.Popen(command_template + ['unlock ' + lock_uid], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
process = subprocess.Popen(command_template + ['unlock ' + lock_uid], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
line1 = process.stdout.readline()
|
||||
# The randome passphrease we need to confirm to proceed the unlocking
|
||||
line2 = process.stdout.readline()
|
||||
@ -160,6 +167,7 @@ def lockAndUnlock(logger):
|
||||
assert output3.decode('utf-8').strip() == 'Database unlocked.'
|
||||
assert not get_value_from_status_json(True, 'cluster', 'database_lock_state', 'locked')
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def kill(logger):
|
||||
output1 = run_fdbcli_command('kill')
|
||||
@ -169,11 +177,11 @@ def kill(logger):
|
||||
address = lines[1]
|
||||
logger.debug("Address: {}".format(address))
|
||||
old_generation = get_value_from_status_json(False, 'cluster', 'generation')
|
||||
# This is currently an issue with fdbcli,
|
||||
# where you need to first run 'kill' to initialize processes' list
|
||||
# This is currently an issue with fdbcli,
|
||||
# where you need to first run 'kill' to initialize processes' list
|
||||
# and then specify the certain process to kill
|
||||
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
#
|
||||
process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
#
|
||||
output2, err = process.communicate(input='kill; kill {}\n'.format(address).encode())
|
||||
logger.debug(output2)
|
||||
# wait for a second for the cluster recovery
|
||||
@ -182,6 +190,7 @@ def kill(logger):
|
||||
logger.debug("Old: {}, New: {}".format(old_generation, new_generation))
|
||||
assert new_generation > old_generation
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def suspend(logger):
|
||||
output1 = run_fdbcli_command('suspend')
|
||||
@ -201,7 +210,7 @@ def suspend(logger):
|
||||
assert len(pinfo) == 1
|
||||
pid = pinfo[0].split(' ')[0]
|
||||
logger.debug("Pid: {}".format(pid))
|
||||
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
# suspend the process for enough long time
|
||||
output2, err = process.communicate(input='suspend; suspend 3600 {}\n'.format(address).encode())
|
||||
# the cluster should be unavailable after the only process being suspended
|
||||
@ -214,7 +223,7 @@ def suspend(logger):
|
||||
kill_output = subprocess.check_output(['kill', pid]).decode().strip()
|
||||
logger.debug("Kill result: {}".format(kill_output))
|
||||
# The process should come back after a few time
|
||||
duration = 0 # seconds we already wait
|
||||
duration = 0 # seconds we already wait
|
||||
while not get_value_from_status_json(False, 'client', 'database_status', 'available') and duration < 60:
|
||||
logger.debug("Sleep for 1 second to wait cluster recovery")
|
||||
time.sleep(1)
|
||||
@ -222,6 +231,7 @@ def suspend(logger):
|
||||
# at most after 60 seconds, the cluster should be available
|
||||
assert get_value_from_status_json(False, 'client', 'database_status', 'available')
|
||||
|
||||
|
||||
def get_value_from_status_json(retry, *args):
|
||||
while True:
|
||||
result = json.loads(run_fdbcli_command('status', 'json'))
|
||||
@ -230,9 +240,10 @@ def get_value_from_status_json(retry, *args):
|
||||
for arg in args:
|
||||
assert arg in result
|
||||
result = result[arg]
|
||||
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def consistencycheck(logger):
|
||||
consistency_check_on_output = 'ConsistencyCheck is on'
|
||||
@ -246,6 +257,7 @@ def consistencycheck(logger):
|
||||
output3 = run_fdbcli_command('consistencycheck')
|
||||
assert output3 == consistency_check_on_output
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def cache_range(logger):
|
||||
# this command is currently experimental
|
||||
@ -253,6 +265,7 @@ def cache_range(logger):
|
||||
run_fdbcli_command('cache_range', 'set', 'a', 'b')
|
||||
run_fdbcli_command('cache_range', 'clear', 'a', 'b')
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def datadistribution(logger):
|
||||
output1 = run_fdbcli_command('datadistribution', 'off')
|
||||
@ -272,6 +285,7 @@ def datadistribution(logger):
|
||||
assert output6 == 'Data distribution is enabled for rebalance.'
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def transaction(logger):
|
||||
"""This test will cover the transaction related fdbcli commands.
|
||||
@ -281,7 +295,7 @@ def transaction(logger):
|
||||
"""
|
||||
err1 = run_fdbcli_command_and_get_error('set', 'key', 'value')
|
||||
assert err1 == 'ERROR: writemode must be enabled to set or clear keys in the database.'
|
||||
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
transaction_flow = ['writemode on', 'begin', 'getversion', 'set key value', 'get key', 'commit']
|
||||
output1, _ = process.communicate(input='\n'.join(transaction_flow).encode())
|
||||
# split the output into lines
|
||||
@ -300,13 +314,13 @@ def transaction(logger):
|
||||
output2 = run_fdbcli_command('get', 'key')
|
||||
assert output2 == "`key' is `value'"
|
||||
# test rollback and read-your-write behavior
|
||||
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
transaction_flow = [
|
||||
'writemode on', 'begin', 'getrange a z',
|
||||
'writemode on', 'begin', 'getrange a z',
|
||||
'clear key', 'get key',
|
||||
# 'option on READ_YOUR_WRITES_DISABLE', 'get key',
|
||||
'rollback'
|
||||
]
|
||||
]
|
||||
output3, _ = process.communicate(input='\n'.join(transaction_flow).encode())
|
||||
lines = list(filter(len, output3.decode().split('\n')))[-5:]
|
||||
# lines[0] == "Transaction started" and lines[1] == 'Range limited to 25 keys'
|
||||
@ -317,13 +331,13 @@ def transaction(logger):
|
||||
output4 = run_fdbcli_command('get', 'key')
|
||||
assert output4 == "`key' is `value'"
|
||||
# test read_your_write_disable option and clear the inserted key
|
||||
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
transaction_flow = [
|
||||
'writemode on', 'begin',
|
||||
'option on READ_YOUR_WRITES_DISABLE',
|
||||
'clear key', 'get key',
|
||||
'commit'
|
||||
]
|
||||
]
|
||||
output6, _ = process.communicate(input='\n'.join(transaction_flow).encode())
|
||||
lines = list(filter(len, output6.decode().split('\n')))[-4:]
|
||||
assert lines[1] == 'Option enabled for current transaction'
|
||||
@ -333,6 +347,7 @@ def transaction(logger):
|
||||
output7 = run_fdbcli_command('get', 'key')
|
||||
assert output7 == "`key': not found"
|
||||
|
||||
|
||||
def get_fdb_process_addresses(logger):
|
||||
# get all processes' network addresses
|
||||
output = run_fdbcli_command('kill')
|
||||
@ -342,6 +357,7 @@ def get_fdb_process_addresses(logger):
|
||||
assert len(addresses) == process_number
|
||||
return addresses
|
||||
|
||||
|
||||
@enable_logging(logging.DEBUG)
|
||||
def coordinators(logger):
|
||||
# we should only have one coordinator for now
|
||||
@ -369,6 +385,7 @@ def coordinators(logger):
|
||||
assert len(get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')) == 1
|
||||
wait_for_database_available(logger)
|
||||
|
||||
|
||||
@enable_logging(logging.DEBUG)
|
||||
def exclude(logger):
|
||||
# get all processes' network addresses
|
||||
@ -381,7 +398,7 @@ def exclude(logger):
|
||||
# randomly pick one and exclude the process
|
||||
excluded_address = random.choice(addresses)
|
||||
# If we see "not enough space" error, use FORCE option to proceed
|
||||
# this should be a safe operation as we do not need any storage space for the test
|
||||
# this should be a safe operation as we do not need any storage space for the test
|
||||
force = False
|
||||
# sometimes we need to retry the exclude
|
||||
while True:
|
||||
@ -418,6 +435,8 @@ def exclude(logger):
|
||||
wait_for_database_available(logger)
|
||||
|
||||
# read the system key 'k', need to enable the option first
|
||||
|
||||
|
||||
def read_system_key(k):
|
||||
output = run_fdbcli_command('option', 'on', 'READ_SYSTEM_KEYS;', 'get', k)
|
||||
if 'is' not in output:
|
||||
@ -426,6 +445,7 @@ def read_system_key(k):
|
||||
_, value = output.split(' is ')
|
||||
return value
|
||||
|
||||
|
||||
@enable_logging()
|
||||
def throttle(logger):
|
||||
# no throttled tags at the beginning
|
||||
@ -443,6 +463,7 @@ def throttle(logger):
|
||||
assert enable_flag == "`0'"
|
||||
# TODO : test manual throttling, not easy to do now
|
||||
|
||||
|
||||
def wait_for_database_available(logger):
|
||||
# sometimes the change takes some time to have effect and the database can be unavailable at that time
|
||||
# this is to wait until the database is available again
|
||||
@ -450,9 +471,11 @@ def wait_for_database_available(logger):
|
||||
logger.debug("Database unavailable for now, wait for one second")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# fdbcli_tests.py <path_to_fdbcli_binary> <path_to_fdb_cluster_file> <process_number> [external_client_library_path]
|
||||
assert len(sys.argv) == 4 or len(sys.argv) == 5, "Please pass arguments: <path_to_fdbcli_binary> <path_to_fdb_cluster_file> <process_number> [external_client_library_path]"
|
||||
assert len(sys.argv) == 4 or len(
|
||||
sys.argv) == 5, "Please pass arguments: <path_to_fdbcli_binary> <path_to_fdb_cluster_file> <process_number> [external_client_library_path]"
|
||||
# set external client library
|
||||
if len(sys.argv) == 5:
|
||||
external_client_library_path = sys.argv[4]
|
||||
@ -466,7 +489,7 @@ if __name__ == '__main__':
|
||||
process_number = int(sys.argv[3])
|
||||
if process_number == 1:
|
||||
# TODO: disable for now, the change can cause the database unavailable
|
||||
#advanceversion()
|
||||
# advanceversion()
|
||||
cache_range()
|
||||
consistencycheck()
|
||||
datadistribution()
|
||||
@ -481,5 +504,3 @@ if __name__ == '__main__':
|
||||
assert process_number > 1, "Process number should be positive"
|
||||
coordinators()
|
||||
exclude()
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user