Add tenant support to Python

This commit is contained in:
A.J. Beamon 2022-02-19 15:40:32 -08:00
parent ca653c77ee
commit ce03f5783d
4 changed files with 133 additions and 66 deletions

View File

@ -58,8 +58,8 @@ _java_cmd = 'java -ea -cp %s:%s com.apple.foundationdb.test.' % (
# We could set min_api_version lower on some of these if the testers were updated to support them # We could set min_api_version lower on some of these if the testers were updated to support them
testers = { testers = {
'python': Tester('python', 'python ' + _absolute_path('python/tests/tester.py'), 2040, 23, MAX_API_VERSION, types=ALL_TYPES), 'python': Tester('python', 'python ' + _absolute_path('python/tests/tester.py'), 2040, 23, MAX_API_VERSION, types=ALL_TYPES, tenants_enabled=True),
'python3': Tester('python3', 'python3 ' + _absolute_path('python/tests/tester.py'), 2040, 23, MAX_API_VERSION, types=ALL_TYPES), 'python3': Tester('python3', 'python3 ' + _absolute_path('python/tests/tester.py'), 2040, 23, MAX_API_VERSION, types=ALL_TYPES, tenants_enabled=True),
'ruby': Tester('ruby', _absolute_path('ruby/tests/tester.rb'), 2040, 23, MAX_API_VERSION), 'ruby': Tester('ruby', _absolute_path('ruby/tests/tester.rb'), 2040, 23, MAX_API_VERSION),
'java': Tester('java', _java_cmd + 'StackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES), 'java': Tester('java', _java_cmd + 'StackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),
'java_async': Tester('java', _java_cmd + 'AsyncStackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES), 'java_async': Tester('java', _java_cmd + 'AsyncStackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),

View File

@ -88,6 +88,7 @@ def api_version(ver):
'predicates', 'predicates',
'Future', 'Future',
'Database', 'Database',
'Tenant',
'Transaction', 'Transaction',
'KeyValue', 'KeyValue',
'KeySelector', 'KeySelector',

View File

@ -198,9 +198,10 @@ def transactional(*tr_args, **tr_kwargs):
one of two actions, depending on the type of the parameter passed one of two actions, depending on the type of the parameter passed
to the function at call time. to the function at call time.
If given a Database, a Transaction will be created and passed into If given a Database or Tenant, a Transaction will be created and
the wrapped code in place of the Database. After the function is passed into the wrapped code in place of the Database or Tenant.
complete, the newly created transaction will be committed. After the function is complete, the newly created transaction
will be committed.
It is important to note that the wrapped method may be called It is important to note that the wrapped method may be called
multiple times in the event of a commit failure, until the commit multiple times in the event of a commit failure, until the commit
@ -943,128 +944,114 @@ class FormerFuture(_FDBBase):
except: except:
pass pass
class _TransactionCreator(_FDBBase):
class Database(_FDBBase):
def __init__(self, dpointer):
self.dpointer = dpointer
self.options = _DatabaseOptions(self)
def __del__(self):
# print('Destroying database 0x%x' % self.dpointer)
self.capi.fdb_database_destroy(self.dpointer)
def get(self, key): def get(self, key):
return Database.__database_getitem(self, key) return _TransactionCreator.__creator_getitem(self, key)
def __getitem__(self, key): def __getitem__(self, key):
if isinstance(key, slice): if isinstance(key, slice):
return self.get_range(key.start, key.stop, reverse=(key.step == -1)) return self.get_range(key.start, key.stop, reverse=(key.step == -1))
return Database.__database_getitem(self, key) return _TransactionCreator.__creator_getitem(self, key)
def get_key(self, key_selector): def get_key(self, key_selector):
return Database.__database_get_key(self, key_selector) return _TransactionCreator.__creator_get_key(self, key_selector)
def get_range(self, begin, end, limit=0, reverse=False, streaming_mode=StreamingMode.want_all): def get_range(self, begin, end, limit=0, reverse=False, streaming_mode=StreamingMode.want_all):
return Database.__database_get_range(self, begin, end, limit, reverse, streaming_mode) return _TransactionCreator.__creator_get_range(self, begin, end, limit, reverse, streaming_mode)
def get_range_startswith(self, prefix, *args, **kwargs): def get_range_startswith(self, prefix, *args, **kwargs):
return Database.__database_get_range_startswith(self, prefix, *args, **kwargs) return _TransactionCreator.__creator_get_range_startswith(self, prefix, *args, **kwargs)
def set(self, key, value): def set(self, key, value):
Database.__database_setitem(self, key, value) _TransactionCreator.__creator_setitem(self, key, value)
def __setitem__(self, key, value): def __setitem__(self, key, value):
Database.__database_setitem(self, key, value) _TransactionCreator.__creator_setitem(self, key, value)
def clear(self, key): def clear(self, key):
Database.__database_delitem(self, key) _TransactionCreator.__creator_delitem(self, key)
def clear_range(self, begin, end): def clear_range(self, begin, end):
Database.__database_delitem(self, slice(begin, end)) _TransactionCreator.__creator_delitem(self, slice(begin, end))
def __delitem__(self, key_or_slice): def __delitem__(self, key_or_slice):
Database.__database_delitem(self, key_or_slice) _TransactionCreator.__creator_delitem(self, key_or_slice)
def clear_range_startswith(self, prefix): def clear_range_startswith(self, prefix):
Database.__database_clear_range_startswith(self, prefix) _TransactionCreator.__creator_clear_range_startswith(self, prefix)
def get_and_watch(self, key): def get_and_watch(self, key):
return Database.__database_get_and_watch(self, key) return _TransactionCreator.__creator_get_and_watch(self, key)
def set_and_watch(self, key, value): def set_and_watch(self, key, value):
return Database.__database_set_and_watch(self, key, value) return _TransactionCreator.__creator_set_and_watch(self, key, value)
def clear_and_watch(self, key): def clear_and_watch(self, key):
return Database.__database_clear_and_watch(self, key) return _TransactionCreator.__creator_clear_and_watch(self, key)
def create_transaction(self): def create_transaction(self):
pointer = ctypes.c_void_p() pass
self.capi.fdb_database_create_transaction(self.dpointer, ctypes.byref(pointer))
return Transaction(pointer.value, self)
def _set_option(self, option, param, length):
self.capi.fdb_database_set_option(self.dpointer, option, param, length)
def _atomic_operation(self, opcode, key, param): def _atomic_operation(self, opcode, key, param):
Database.__database_atomic_operation(self, opcode, key, param) _TransactionCreator.__creator_atomic_operation(self, opcode, key, param)
#### Transaction implementations #### #### Transaction implementations ####
@staticmethod @staticmethod
@transactional @transactional
def __database_getitem(tr, key): def __creator_getitem(tr, key):
return tr[key].value return tr[key].value
@staticmethod @staticmethod
@transactional @transactional
def __database_get_key(tr, key_selector): def __creator_get_key(tr, key_selector):
return tr.get_key(key_selector).value return tr.get_key(key_selector).value
@staticmethod @staticmethod
@transactional @transactional
def __database_get_range(tr, begin, end, limit, reverse, streaming_mode): def __creator_get_range(tr, begin, end, limit, reverse, streaming_mode):
return tr.get_range(begin, end, limit, reverse, streaming_mode).to_list() return tr.get_range(begin, end, limit, reverse, streaming_mode).to_list()
@staticmethod @staticmethod
@transactional @transactional
def __database_get_range_startswith(tr, prefix, *args, **kwargs): def __creator_get_range_startswith(tr, prefix, *args, **kwargs):
return tr.get_range_startswith(prefix, *args, **kwargs).to_list() return tr.get_range_startswith(prefix, *args, **kwargs).to_list()
@staticmethod @staticmethod
@transactional @transactional
def __database_setitem(tr, key, value): def __creator_setitem(tr, key, value):
tr[key] = value tr[key] = value
@staticmethod @staticmethod
@transactional @transactional
def __database_clear_range_startswith(tr, prefix): def __creator_clear_range_startswith(tr, prefix):
tr.clear_range_startswith(prefix) tr.clear_range_startswith(prefix)
@staticmethod @staticmethod
@transactional @transactional
def __database_get_and_watch(tr, key): def __creator_get_and_watch(tr, key):
v = tr.get(key) v = tr.get(key)
return v, tr.watch(key) return v, tr.watch(key)
@staticmethod @staticmethod
@transactional @transactional
def __database_set_and_watch(tr, key, value): def __creator_set_and_watch(tr, key, value):
tr.set(key, value) tr.set(key, value)
return tr.watch(key) return tr.watch(key)
@staticmethod @staticmethod
@transactional @transactional
def __database_clear_and_watch(tr, key): def __creator_clear_and_watch(tr, key):
del tr[key] del tr[key]
return tr.watch(key) return tr.watch(key)
@staticmethod @staticmethod
@transactional @transactional
def __database_delitem(tr, key_or_slice): def __creator_delitem(tr, key_or_slice):
del tr[key_or_slice] del tr[key_or_slice]
@staticmethod @staticmethod
@transactional @transactional
def __database_atomic_operation(tr, opcode, key, param): def __creator_atomic_operation(tr, opcode, key, param):
tr._atomic_operation(opcode, key, param) tr._atomic_operation(opcode, key, param)
# Asynchronous transactions # Asynchronous transactions
@ -1074,11 +1061,11 @@ class Database(_FDBBase):
From = asyncio.From From = asyncio.From
coroutine = asyncio.coroutine coroutine = asyncio.coroutine
class Database: class TransactionCreator:
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_getitem(tr, key): def __creator_getitem(tr, key):
# raise Return(( yield From( tr[key] ) )) # raise Return(( yield From( tr[key] ) ))
raise Return(tr[key]) raise Return(tr[key])
yield None yield None
@ -1086,26 +1073,26 @@ class Database(_FDBBase):
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_get_key(tr, key_selector): def __creator_get_key(tr, key_selector):
raise Return(tr.get_key(key_selector)) raise Return(tr.get_key(key_selector))
yield None yield None
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_get_range(tr, begin, end, limit, reverse, streaming_mode): def __creator_get_range(tr, begin, end, limit, reverse, streaming_mode):
raise Return((yield From(tr.get_range(begin, end, limit, reverse, streaming_mode).to_list()))) raise Return((yield From(tr.get_range(begin, end, limit, reverse, streaming_mode).to_list())))
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_get_range_startswith(tr, prefix, *args, **kwargs): def __creator_get_range_startswith(tr, prefix, *args, **kwargs):
raise Return((yield From(tr.get_range_startswith(prefix, *args, **kwargs).to_list()))) raise Return((yield From(tr.get_range_startswith(prefix, *args, **kwargs).to_list())))
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_setitem(tr, key, value): def __creator_setitem(tr, key, value):
tr[key] = value tr[key] = value
raise Return() raise Return()
yield None yield None
@ -1113,7 +1100,7 @@ class Database(_FDBBase):
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_clear_range_startswith(tr, prefix): def __creator_clear_range_startswith(tr, prefix):
tr.clear_range_startswith(prefix) tr.clear_range_startswith(prefix)
raise Return() raise Return()
yield None yield None
@ -1121,7 +1108,7 @@ class Database(_FDBBase):
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_get_and_watch(tr, key): def __creator_get_and_watch(tr, key):
v = tr.get(key) v = tr.get(key)
raise Return(v, tr.watch(key)) raise Return(v, tr.watch(key))
yield None yield None
@ -1129,7 +1116,7 @@ class Database(_FDBBase):
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_set_and_watch(tr, key, value): def __creator_set_and_watch(tr, key, value):
tr.set(key, value) tr.set(key, value)
raise Return(tr.watch(key)) raise Return(tr.watch(key))
yield None yield None
@ -1137,7 +1124,7 @@ class Database(_FDBBase):
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_clear_and_watch(tr, key): def __creator_clear_and_watch(tr, key):
del tr[key] del tr[key]
raise Return(tr.watch(key)) raise Return(tr.watch(key))
yield None yield None
@ -1145,7 +1132,7 @@ class Database(_FDBBase):
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_delitem(tr, key_or_slice): def __creator_delitem(tr, key_or_slice):
del tr[key_or_slice] del tr[key_or_slice]
raise Return() raise Return()
yield None yield None
@ -1153,11 +1140,55 @@ class Database(_FDBBase):
@staticmethod @staticmethod
@transactional @transactional
@coroutine @coroutine
def __database_atomic_operation(tr, opcode, key, param): def __creator_atomic_operation(tr, opcode, key, param):
tr._atomic_operation(opcode, key, param) tr._atomic_operation(opcode, key, param)
raise Return() raise Return()
yield None yield None
return Database return TransactionCreator
class Database(_TransactionCreator):
def __init__(self, dpointer):
self.dpointer = dpointer
self.options = _DatabaseOptions(self)
def __del__(self):
# print('Destroying database 0x%x' % self.dpointer)
self.capi.fdb_database_destroy(self.dpointer)
def _set_option(self, option, param, length):
self.capi.fdb_database_set_option(self.dpointer, option, param, length)
def open_tenant(self, name):
if not isinstance(name, bytes):
raise TypeError('Tenant name must be of type ' + bytes.__name__)
pointer = ctypes.c_void_p()
self.capi.fdb_database_open_tenant(self.dpointer, name, len(name), ctypes.byref(pointer))
return Tenant(pointer.value)
def create_transaction(self):
pointer = ctypes.c_void_p()
self.capi.fdb_database_create_transaction(self.dpointer, ctypes.byref(pointer))
return Transaction(pointer.value, self)
def allocate_tenant(self, name):
return FutureVoid(self.capi.fdb_database_allocate_tenant(self.dpointer, name, len(name)))
def delete_tenant(self, name):
return FutureVoid(self.capi.fdb_database_remove_tenant(self.dpointer, name, len(name)))
class Tenant(_TransactionCreator):
def __init__(self, tpointer):
self.tpointer = tpointer
def __del__(self):
self.capi.fdb_tenant_destroy(self.tpointer)
def create_transaction(self):
pointer = ctypes.c_void_p()
self.capi.fdb_tenant_create_transaction(self.tpointer, ctypes.byref(pointer))
return Transaction(pointer.value, self)
fill_operations() fill_operations()
@ -1458,6 +1489,10 @@ def init_c_api():
_capi.fdb_database_destroy.argtypes = [ctypes.c_void_p] _capi.fdb_database_destroy.argtypes = [ctypes.c_void_p]
_capi.fdb_database_destroy.restype = None _capi.fdb_database_destroy.restype = None
_capi.fdb_database_open_tenant.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.POINTER(ctypes.c_void_p)]
_capi.fdb_database_open_tenant.restype = ctypes.c_int
_capi.fdb_database_open_tenant.errcheck = check_error_code
_capi.fdb_database_create_transaction.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p)] _capi.fdb_database_create_transaction.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p)]
_capi.fdb_database_create_transaction.restype = ctypes.c_int _capi.fdb_database_create_transaction.restype = ctypes.c_int
_capi.fdb_database_create_transaction.errcheck = check_error_code _capi.fdb_database_create_transaction.errcheck = check_error_code
@ -1466,6 +1501,19 @@ def init_c_api():
_capi.fdb_database_set_option.restype = ctypes.c_int _capi.fdb_database_set_option.restype = ctypes.c_int
_capi.fdb_database_set_option.errcheck = check_error_code _capi.fdb_database_set_option.errcheck = check_error_code
_capi.fdb_database_allocate_tenant.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int]
_capi.fdb_database_allocate_tenant.restype = ctypes.c_void_p
_capi.fdb_database_remove_tenant.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int]
_capi.fdb_database_remove_tenant.restype = ctypes.c_void_p
_capi.fdb_tenant_destroy.argtypes = [ctypes.c_void_p]
_capi.fdb_tenant_destroy.restype = None
_capi.fdb_tenant_create_transaction.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p)]
_capi.fdb_tenant_create_transaction.restype = ctypes.c_int
_capi.fdb_tenant_create_transaction.errcheck = check_error_code
_capi.fdb_transaction_destroy.argtypes = [ctypes.c_void_p] _capi.fdb_transaction_destroy.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_destroy.restype = None _capi.fdb_transaction_destroy.restype = None
@ -1686,10 +1734,10 @@ def init(event_model=None):
raise asyncio.Return(self) raise asyncio.Return(self)
return it() return it()
FDBRange.iterate = iterate FDBRange.iterate = iterate
AT = Database.declare_asynchronous_transactions() AT = _TransactionCreator.declare_asynchronous_transactions()
for name in dir(AT): for name in dir(AT):
if name.startswith("_Database__database_"): if name.startswith("__TransactionCreator__creator_"):
setattr(Database, name, getattr(AT, name)) setattr(_TransactionCreator, name, getattr(AT, name))
def to_list(self): def to_list(self):
if self._mode == StreamingMode.iterator: if self._mode == StreamingMode.iterator:

View File

@ -112,12 +112,13 @@ class Stack:
class Instruction: class Instruction:
def __init__(self, tr, stack, op, index, isDatabase=False, isSnapshot=False): def __init__(self, tr, stack, op, index, isDatabase=False, isTenant=False, isSnapshot=False):
self.tr = tr self.tr = tr
self.stack = stack self.stack = stack
self.op = op self.op = op
self.index = index self.index = index
self.isDatabase = isDatabase self.isDatabase = isDatabase
self.isTenant = isTenant
self.isSnapshot = isSnapshot self.isSnapshot = isSnapshot
def pop(self, count=None, with_idx=False): def pop(self, count=None, with_idx=False):
@ -277,6 +278,7 @@ class Tester:
def __init__(self, db, prefix): def __init__(self, db, prefix):
self.db = db self.db = db
self.tenant = None
self.instructions = self.db[fdb.tuple.range((prefix,))] self.instructions = self.db[fdb.tuple.range((prefix,))]
@ -317,7 +319,8 @@ class Tester:
def new_transaction(self): def new_transaction(self):
with Tester.tr_map_lock: with Tester.tr_map_lock:
Tester.tr_map[self.tr_name] = self.db.create_transaction() tr_source = self.tenant if self.tenant is not None else self.db
Tester.tr_map[self.tr_name] = tr_source.create_transaction()
def switch_transaction(self, name): def switch_transaction(self, name):
self.tr_name = name self.tr_name = name
@ -335,18 +338,22 @@ class Tester:
# print("%d. Instruction is %s" % (idx, op)) # print("%d. Instruction is %s" % (idx, op))
isDatabase = op.endswith(six.u('_DATABASE')) isDatabase = op.endswith(six.u('_DATABASE'))
isTenant = op.endswith(six.u('_TENANT'))
isSnapshot = op.endswith(six.u('_SNAPSHOT')) isSnapshot = op.endswith(six.u('_SNAPSHOT'))
if isDatabase: if isDatabase:
op = op[:-9] op = op[:-9]
obj = self.db obj = self.db
elif isTenant:
op = op[:-7]
obj = self.tenant if self.tenant else self.db
elif isSnapshot: elif isSnapshot:
op = op[:-9] op = op[:-9]
obj = self.current_transaction().snapshot obj = self.current_transaction().snapshot
else: else:
obj = self.current_transaction() obj = self.current_transaction()
inst = Instruction(obj, self.stack, op, idx, isDatabase, isSnapshot) inst = Instruction(obj, self.stack, op, idx, isDatabase, isTenant, isSnapshot)
try: try:
if inst.op == six.u("PUSH"): if inst.op == six.u("PUSH"):
@ -583,6 +590,17 @@ class Tester:
prefix = inst.pop() prefix = inst.pop()
Tester.wait_empty(self.db, prefix) Tester.wait_empty(self.db, prefix)
inst.push(b"WAITED_FOR_EMPTY") inst.push(b"WAITED_FOR_EMPTY")
elif inst.op == six.u("TENANT_CREATE"):
name = inst.pop()
inst.push(self.db.allocate_tenant(name))
elif inst.op == six.u("TENANT_DELETE"):
name = inst.pop()
inst.push(self.db.delete_tenant(name))
elif inst.op == six.u("TENANT_SET_ACTIVE"):
name = inst.pop()
self.tenant = self.db.open_tenant(name)
elif inst.op == six.u("TENANT_CLEAR_ACTIVE"):
self.tenant = None
elif inst.op == six.u("UNIT_TESTS"): elif inst.op == six.u("UNIT_TESTS"):
try: try:
test_db_options(db) test_db_options(db)