Add tenant support to the Java bindings

This commit is contained in:
A.J. Beamon 2022-02-19 15:48:49 -08:00
parent fc68cdf45f
commit a2a97e7176
12 changed files with 777 additions and 58 deletions

View File

@ -61,8 +61,8 @@ testers = {
'python': Tester('python', 'python ' + _absolute_path('python/tests/tester.py'), 2040, 23, MAX_API_VERSION, types=ALL_TYPES, tenants_enabled=True),
'python3': Tester('python3', 'python3 ' + _absolute_path('python/tests/tester.py'), 2040, 23, MAX_API_VERSION, types=ALL_TYPES, tenants_enabled=True),
'ruby': Tester('ruby', _absolute_path('ruby/tests/tester.rb'), 2040, 23, MAX_API_VERSION),
'java': Tester('java', _java_cmd + 'StackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),
'java_async': Tester('java', _java_cmd + 'AsyncStackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES),
'java': Tester('java', _java_cmd + 'StackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES, tenants_enabled=True),
'java_async': Tester('java', _java_cmd + 'AsyncStackTester', 2040, 510, MAX_API_VERSION, types=ALL_TYPES, tenants_enabled=True),
'go': Tester('go', _absolute_path('go/build/bin/_stacktester'), 2040, 200, MAX_API_VERSION, types=ALL_TYPES),
'flow': Tester('flow', _absolute_path('flow/bin/fdb_flow_tester'), 63, 500, MAX_API_VERSION, directory_snapshot_ops_enabled=False),
}

View File

@ -32,6 +32,7 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/DirectBufferPool.java
src/main/com/apple/foundationdb/FDB.java
src/main/com/apple/foundationdb/FDBDatabase.java
src/main/com/apple/foundationdb/FDBTenant.java
src/main/com/apple/foundationdb/FDBTransaction.java
src/main/com/apple/foundationdb/FutureInt64.java
src/main/com/apple/foundationdb/FutureKey.java
@ -64,6 +65,7 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/ReadTransactionContext.java
src/main/com/apple/foundationdb/subspace/package-info.java
src/main/com/apple/foundationdb/subspace/Subspace.java
src/main/com/apple/foundationdb/Tenant.java
src/main/com/apple/foundationdb/Transaction.java
src/main/com/apple/foundationdb/TransactionContext.java
src/main/com/apple/foundationdb/EventKeeper.java

View File

@ -663,6 +663,78 @@ JNIEXPORT jbyteArray JNICALL Java_com_apple_foundationdb_FutureKey_FutureKey_1ge
return result;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1allocateTenant(JNIEnv* jenv,
jobject,
jlong dbPtr,
jbyteArray tenantNameBytes) {
if (!dbPtr || !tenantNameBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBDatabase* database = (FDBDatabase*)dbPtr;
uint8_t* barr = (uint8_t*)jenv->GetByteArrayElements(tenantNameBytes, JNI_NULL);
if (!barr) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
FDBFuture* f = fdb_database_allocate_tenant(database, barr, jenv->GetArrayLength(tenantNameBytes));
jenv->ReleaseByteArrayElements(tenantNameBytes, (jbyte*)barr, JNI_ABORT);
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1deleteTenant(JNIEnv* jenv,
jobject,
jlong dbPtr,
jbyteArray tenantNameBytes) {
if (!dbPtr || !tenantNameBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBDatabase* database = (FDBDatabase*)dbPtr;
uint8_t* barr = (uint8_t*)jenv->GetByteArrayElements(tenantNameBytes, JNI_NULL);
if (!barr) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
FDBFuture* f = fdb_database_remove_tenant(database, barr, jenv->GetArrayLength(tenantNameBytes));
jenv->ReleaseByteArrayElements(tenantNameBytes, (jbyte*)barr, JNI_ABORT);
return (jlong)f;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1openTenant(JNIEnv* jenv,
jobject,
jlong dbPtr,
jbyteArray tenantNameBytes) {
if (!dbPtr || !tenantNameBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBDatabase* database = (FDBDatabase*)dbPtr;
FDBTenant* tenant;
uint8_t* barr = (uint8_t*)jenv->GetByteArrayElements(tenantNameBytes, JNI_NULL);
if (!barr) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}
fdb_error_t err = fdb_database_open_tenant(database, barr, jenv->GetArrayLength(tenantNameBytes), &tenant);
if (err) {
safeThrow(jenv, getThrowable(jenv, err));
return 0;
}
jenv->ReleaseByteArrayElements(tenantNameBytes, (jbyte*)barr, JNI_ABORT);
return (jlong)tenant;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBDatabase_Database_1createTransaction(JNIEnv* jenv,
jobject,
jlong dbPtr) {
@ -764,6 +836,31 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDB_Database_1create(JNIEnv*
return (jlong)db;
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1createTransaction(JNIEnv* jenv,
jobject,
jlong tPtr) {
if (!tPtr) {
throwParamNotNull(jenv);
return 0;
}
FDBTenant* tenant = (FDBTenant*)tPtr;
FDBTransaction* tr;
fdb_error_t err = fdb_tenant_create_transaction(tenant, &tr);
if (err) {
safeThrow(jenv, getThrowable(jenv, err));
return 0;
}
return (jlong)tr;
}
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTenant_Tenant_1dispose(JNIEnv* jenv, jobject, jlong tPtr) {
if (!tPtr) {
throwParamNotNull(jenv);
return;
}
fdb_tenant_destroy((FDBTenant*)tPtr);
}
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1setVersion(JNIEnv* jenv,
jobject,
jlong tPtr,

View File

@ -41,11 +41,67 @@ import java.util.function.Function;
*/
public interface Database extends AutoCloseable, TransactionContext {
/**
* Creates a {@link Transaction} that operates on this {@code Database}.<br>
* Creates a new tenant in the cluster.
*
* @param tenantName The name of the tenant. Can be any byte string that does not begin a 0xFF byte.
* @return a {@code CompletableFuture} that when set without error will indicate that the tenant has
* been created.
*/
CompletableFuture<Void> allocateTenant(byte[] tenantName);
/**
* Deletes a tenant from the cluster.<br>
* <br>
* <b>Note:</b> A tenant cannot be deleted if it has any data in it. To delete a non-empty tenant, you must
* first use a clear operation to delete all of its keys.
*
* @param tenantName The name of the tenant being deleted.
* @return a {@code CompletableFuture} that when set without error will indicate that the tenant has
* been deleted.
*/
CompletableFuture<Void> deleteTenant(byte[] tenantName);
/**
* Opens an existing tenant to be used for running transactions.
*
* @param tenantName The name of the tenant to open.
* @return a {@link Tenant} that can be used to create transactions that will operate in the tenant's key-space.
*/
default Tenant openTenant(byte[] tenantName) {
return openTenant(tenantName, getExecutor());
}
/**
* Opens an existing tenant to be used for running transactions.
*
* @param tenantName The name of the tenant to open.
* @param e the {@link Executor} to use when executing asynchronous callbacks.
* @return a {@link Tenant} that can be used to create transactions that will operate in the tenant's key-space.
*/
Tenant openTenant(byte[] tenantName, Executor e);
/**
* Opens an existing tenant to be used for running transactions.
*
* @param tenantName The name of the tenant to open.
* @param e the {@link Executor} to use when executing asynchronous callbacks.
* @param eventKeeper the {@link EventKeeper} to use when tracking instrumented calls for the tenant's transactions.
* @return a {@link Tenant} that can be used to create transactions that will operate in the tenant's key-space.
*/
Tenant openTenant(byte[] tenantName, Executor e, EventKeeper eventKeeper);
/**
* Creates a {@link Transaction} that operates on this {@code Database}. Creating a transaction
* in this way does not associate it with a {@code Tenant}, and as a result the transaction will
* operate on the entire key-space for the database.<br>
* <br>
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.
* {@link Transaction#onError} is called.<br>
* <br>
* <b>Note:</b> Transactions created directly on a {@code Database} object cannot be used in a cluster
* that requires tenant-based access. To run transactions in those clusters, you must first open a tenant
* with {@link #openTenant(byte[])}.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Database}.
*/

View File

@ -116,6 +116,49 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume
}
}
@Override
public CompletableFuture<Void> allocateTenant(byte[] tenantName) {
pointerReadLock.lock();
try {
return new FutureVoid(Database_allocateTenant(getPtr(), tenantName), executor);
} finally {
pointerReadLock.unlock();
}
}
@Override
public CompletableFuture<Void> deleteTenant(byte[] tenantName) {
pointerReadLock.lock();
try {
return new FutureVoid(Database_deleteTenant(getPtr(), tenantName), executor);
} finally {
pointerReadLock.unlock();
}
}
@Override
public Tenant openTenant(byte[] tenantName, Executor e) {
return openTenant(tenantName, e, eventKeeper);
}
@Override
public Tenant openTenant(byte[] tenantName, Executor e, EventKeeper eventKeeper) {
pointerReadLock.lock();
Tenant tenant = null;
try {
tenant = new FDBTenant(Database_openTenant(getPtr(), tenantName), this, tenantName, e, eventKeeper);
return tenant;
} catch (RuntimeException err) {
if (tenant != null) {
tenant.close();
}
throw err;
} finally {
pointerReadLock.unlock();
}
}
@Override
public Transaction createTransaction(Executor e) {
return createTransaction(e, eventKeeper);
@ -170,6 +213,9 @@ class FDBDatabase extends NativeObjectWrapper implements Database, OptionConsume
Database_dispose(cPtr);
}
private native long Database_allocateTenant(long cPtr, byte[] tenantName);
private native long Database_deleteTenant(long cPtr, byte[] tenantName);
private native long Database_openTenant(long cPtr, byte[] tenantName);
private native long Database_createTransaction(long cPtr);
private native void Database_dispose(long cPtr);
private native void Database_setOption(long cPtr, int code, byte[] value) throws FDBException;

View File

@ -0,0 +1,157 @@
/*
* FDBTenant.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import com.apple.foundationdb.async.AsyncUtil;
class FDBTenant extends NativeObjectWrapper implements Tenant {
private final Database database;
private final byte[] name;
private final Executor executor;
private final EventKeeper eventKeeper;
protected FDBTenant(long cPtr, Database database, byte[] name, Executor executor) {
this(cPtr, database, name, executor, null);
}
protected FDBTenant(long cPtr, Database database, byte[] name, Executor executor, EventKeeper eventKeeper) {
super(cPtr);
this.database = database;
this.name = name;
this.executor = executor;
this.eventKeeper = eventKeeper;
}
@Override
public <T> T run(Function<? super Transaction, T> retryable, Executor e) {
Transaction t = this.createTransaction(e);
try {
while (true) {
try {
T returnVal = retryable.apply(t);
t.commit().join();
return returnVal;
} catch (RuntimeException err) {
t = t.onError(err).join();
}
}
} finally {
t.close();
}
}
@Override
public <T> T read(Function<? super ReadTransaction, T> retryable, Executor e) {
return this.run(retryable, e);
}
@Override
public <T> CompletableFuture<T> runAsync(final Function<? super Transaction, ? extends CompletableFuture<T>> retryable, Executor e) {
final AtomicReference<Transaction> trRef = new AtomicReference<>(createTransaction(e));
final AtomicReference<T> returnValue = new AtomicReference<>();
return AsyncUtil.whileTrue(() -> {
CompletableFuture<T> process = AsyncUtil.applySafely(retryable, trRef.get());
return AsyncUtil.composeHandleAsync(process.thenComposeAsync(returnVal ->
trRef.get().commit().thenApply(o -> {
returnValue.set(returnVal);
return false;
}), e),
(value, t) -> {
if(t == null)
return CompletableFuture.completedFuture(value);
if(!(t instanceof RuntimeException))
throw new CompletionException(t);
return trRef.get().onError(t).thenApply(newTr -> {
trRef.set(newTr);
return true;
});
}, e);
}, e)
.thenApply(o -> returnValue.get())
.whenComplete((v, t) -> trRef.get().close());
}
@Override
public <T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable, Executor e) {
return this.runAsync(retryable, e);
}
@Override
protected void finalize() throws Throwable {
try {
checkUnclosed("Tenant");
close();
}
finally {
super.finalize();
}
}
@Override
public Transaction createTransaction(Executor e) {
return createTransaction(e, eventKeeper);
}
@Override
public Transaction createTransaction(Executor e, EventKeeper eventKeeper) {
pointerReadLock.lock();
Transaction tr = null;
try {
tr = new FDBTransaction(Tenant_createTransaction(getPtr()), database, e, eventKeeper);
tr.options().setUsedDuringCommitProtectionDisable();
return tr;
} catch (RuntimeException err) {
if (tr != null) {
tr.close();
}
throw err;
} finally {
pointerReadLock.unlock();
}
}
@Override
public byte[] getName() {
return name;
}
@Override
public Executor getExecutor() {
return executor;
}
@Override
protected void closeInternal(long cPtr) {
Tenant_dispose(cPtr);
}
private native long Tenant_createTransaction(long cPtr);
private native void Tenant_dispose(long cPtr);
}

View File

@ -0,0 +1,257 @@
/*
* Tenant.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
/**
* A tenant represents a named key-space within a database that can be interacted with
* transactionally.<br>
* <br>
* The simplest correct programs using tenants will make use of the methods defined
* in the {@link TransactionContext} interface. When used on a {@code Tenant} these
* methods will call {@code Transaction#commit()} after user code has been
* executed. These methods will not return successfully until {@code commit()} has
* returned successfully.<br>
* <br>
* <b>Note:</b> {@code Tenant} objects must be {@link #close closed} when no longer
* in use in order to free any associated resources.
*/
public interface Tenant extends AutoCloseable, TransactionContext {
/**
* Creates a {@link Transaction} that operates on this {@code Tenant}.<br>
* <br>
* <b>Note:</b> Java transactions automatically set the {@link TransactionOptions#setUsedDuringCommitProtectionDisable}
* option. This is because the Java bindings disallow use of {@code Transaction} objects after
* {@link Transaction#onError} is called.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Tenant}.
*/
default Transaction createTransaction() {
return createTransaction(getExecutor());
}
/**
* Creates a {@link Transaction} that operates on this {@code Tenant} with the given {@link Executor}
* for asynchronous callbacks.
*
* @param e the {@link Executor} to use when executing asynchronous callbacks.
* @return a newly created {@code Transaction} that reads from and writes to this {@code Tenant}.
*/
Transaction createTransaction(Executor e);
/**
* Creates a {@link Transaction} that operates on this {@code Tenant} with the given {@link Executor}
* for asynchronous callbacks.
*
* @param e the {@link Executor} to use when executing asynchronous callbacks.
* @param eventKeeper the {@link EventKeeper} to use when tracking instrumented calls for the transaction.
*
* @return a newly created {@code Transaction} that reads from and writes to this {@code Tenant}.
*/
Transaction createTransaction(Executor e, EventKeeper eventKeeper);
/**
* Returns the name of this {@code Tenant}.
*
* @return the name of this {@code Tenant} as a byte string.
*/
byte[] getName();
/**
* Runs a read-only transactional function against this {@code Tenant} with retry logic.
* {@link Function#apply(Object) apply(ReadTransaction)} will be called on the
* supplied {@link Function} until a non-retryable
* {@link FDBException} (or any {@code Throwable} other than an {@code FDBException})
* is thrown. This call is blocking -- this
* method will not return until the {@code Function} has been called and completed without error.<br>
*
* @param retryable the block of logic to execute in a {@link Transaction} against
* this tenant
* @param <T> the return type of {@code retryable}
*
* @return the result of the last run of {@code retryable}
*/
@Override
default <T> T read(Function<? super ReadTransaction, T> retryable) {
return read(retryable, getExecutor());
}
/**
* Runs a read-only transactional function against this {@code Tenant} with retry logic. Use
* this formulation of {@link #read(Function)} if one wants to set a custom {@link Executor}
* for the transaction when run.
*
* @param retryable the block of logic to execute in a {@link Transaction} against
* this tenant
* @param e the {@link Executor} to use for asynchronous callbacks
* @param <T> the return type of {@code retryable}
* @return the result of the last run of {@code retryable}
*
* @see #read(Function)
*/
<T> T read(Function<? super ReadTransaction, T> retryable, Executor e);
/**
* Runs a read-only transactional function against this {@code Tenant} with retry logic.
* {@link Function#apply(Object) apply(ReadTransaction)} will be called on the
* supplied {@link Function} until a non-retryable
* {@link FDBException} (or any {@code Throwable} other than an {@code FDBException})
* is thrown. This call is non-blocking -- this
* method will return immediately and with a {@link CompletableFuture} that will be
* set when the {@code Function} has been called and completed without error.<br>
* <br>
* Any errors encountered executing {@code retryable}, or received from the
* database, will be set on the returned {@code CompletableFuture}.
*
* @param retryable the block of logic to execute in a {@link ReadTransaction} against
* this tenant
* @param <T> the return type of {@code retryable}
*
* @return a {@code CompletableFuture} that will be set to the value returned by the last call
* to {@code retryable}
*/
@Override
default <T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable) {
return readAsync(retryable, getExecutor());
}
/**
* Runs a read-only transactional function against this {@code Tenant} with retry logic.
* Use this version of {@link #readAsync(Function)} if one wants to set a custom
* {@link Executor} for the transaction when run.
*
* @param retryable the block of logic to execute in a {@link ReadTransaction} against
* this tenant
* @param e the {@link Executor} to use for asynchronous callbacks
* @param <T> the return type of {@code retryable}
*
* @return a {@code CompletableFuture} that will be set to the value returned by the last call
* to {@code retryable}
*
* @see #readAsync(Function)
*/
<T> CompletableFuture<T> readAsync(
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable, Executor e);
/**
* Runs a transactional function against this {@code Tenant} with retry logic.
* {@link Function#apply(Object) apply(Transaction)} will be called on the
* supplied {@link Function} until a non-retryable
* {@link FDBException} (or any {@code Throwable} other than an {@code FDBException})
* is thrown or {@link Transaction#commit() commit()},
* when called after {@code apply()}, returns success. This call is blocking -- this
* method will not return until {@code commit()} has been called and returned success.<br>
* <br>
* As with other client/server databases, in some failure scenarios a client may
* be unable to determine whether a transaction succeeded. In these cases, your
* transaction may be executed twice. For more information about how to reason
* about these situations see
* <a href="/foundationdb/developer-guide.html#transactions-with-unknown-results"
* target="_blank">the FounationDB Developer Guide</a>
*
* @param retryable the block of logic to execute in a {@link Transaction} against
* this tenant
* @param <T> the return type of {@code retryable}
*
* @return the result of the last run of {@code retryable}
*/
@Override
default <T> T run(Function<? super Transaction, T> retryable) {
return run(retryable, getExecutor());
}
/**
* Runs a transactional function against this {@code Tenant} with retry logic.
* Use this formulation of {@link #run(Function)} if one would like to set a
* custom {@link Executor} for the transaction when run.
*
* @param retryable the block of logic to execute in a {@link Transaction} against
* this tenant
* @param e the {@link Executor} to use for asynchronous callbacks
* @param <T> the return type of {@code retryable}
*
* @return the result of the last run of {@code retryable}
*/
<T> T run(Function<? super Transaction, T> retryable, Executor e);
/**
* Runs a transactional function against this {@code Tenant} with retry logic.
* {@link Function#apply(Object) apply(Transaction)} will be called on the
* supplied {@link Function} until a non-retryable
* {@link FDBException} (or any {@code Throwable} other than an {@code FDBException})
* is thrown or {@link Transaction#commit() commit()},
* when called after {@code apply()}, returns success. This call is non-blocking -- this
* method will return immediately and with a {@link CompletableFuture} that will be
* set when {@code commit()} has been called and returned success.<br>
* <br>
* As with other client/server databases, in some failure scenarios a client may
* be unable to determine whether a transaction succeeded. In these cases, your
* transaction may be executed twice. For more information about how to reason
* about these situations see
* <a href="/foundationdb/developer-guide.html#transactions-with-unknown-results"
* target="_blank">the FounationDB Developer Guide</a><br>
* <br>
* Any errors encountered executing {@code retryable}, or received from the
* database, will be set on the returned {@code CompletableFuture}.
*
* @param retryable the block of logic to execute in a {@link Transaction} against
* this tenant
* @param <T> the return type of {@code retryable}
*
* @return a {@code CompletableFuture} that will be set to the value returned by the last call
* to {@code retryable}
*/
@Override
default <T> CompletableFuture<T> runAsync(
Function<? super Transaction, ? extends CompletableFuture<T>> retryable) {
return runAsync(retryable, getExecutor());
}
/**
* Runs a transactional function against this {@code Tenant} with retry logic. Use
* this formulation of the non-blocking {@link #runAsync(Function)} if one wants
* to set a custom {@link Executor} for the transaction when run.
*
* @param retryable the block of logic to execute in a {@link Transaction} against
* this tenant
* @param e the {@link Executor} to use for asynchronous callbacks
* @param <T> the return type of {@code retryable}
*
* @return a {@code CompletableFuture} that will be set to the value returned by the last call
* to {@code retryable}
*
* @see #run(Function)
*/
<T> CompletableFuture<T> runAsync(
Function<? super Transaction, ? extends CompletableFuture<T>> retryable, Executor e);
/**
* Close the {@code Tenant} object and release any associated resources. This must be called at
* least once after the {@code Tenant} object is no longer in use. This can be called multiple
* times, but care should be taken that it is not in use in another thread at the time of the call.
*/
@Override
void close();
}

View File

@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@ -184,7 +185,7 @@ public class AsyncStackTester {
return AsyncUtil.DONE;
}
else if(op == StackOperation.RESET) {
inst.context.newTransaction();
inst.context.resetTransaction();
return AsyncUtil.DONE;
}
else if(op == StackOperation.CANCEL) {
@ -332,9 +333,9 @@ public class AsyncStackTester {
final Transaction oldTr = inst.tr;
CompletableFuture<Void> f = oldTr.onError(err).whenComplete((tr, t) -> {
if(t != null) {
inst.context.newTransaction(oldTr); // Other bindings allow reuse of non-retryable transactions, so we need to emulate that behavior.
inst.context.resetTransaction(oldTr); // Other bindings allow reuse of non-retryable transactions, so we need to emulate that behavior.
}
else if(!inst.setTransaction(oldTr, tr)) {
else if(!inst.replaceTransaction(oldTr, tr)) {
tr.close();
}
}).thenApply(v -> null);
@ -469,6 +470,28 @@ public class AsyncStackTester {
inst.push(ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putDouble(value).array());
}, FDB.DEFAULT_EXECUTOR);
}
else if (op == StackOperation.TENANT_CREATE) {
return inst.popParam().thenAcceptAsync(param -> {
byte[] tenantName = (byte[])param;
inst.push(inst.context.db.allocateTenant(tenantName));
}, FDB.DEFAULT_EXECUTOR);
}
else if (op == StackOperation.TENANT_DELETE) {
return inst.popParam().thenAcceptAsync(param -> {
byte[] tenantName = (byte[])param;
inst.push(inst.context.db.deleteTenant(tenantName));
}, FDB.DEFAULT_EXECUTOR);
}
else if (op == StackOperation.TENANT_SET_ACTIVE) {
return inst.popParam().thenAcceptAsync(param -> {
byte[] tenantName = (byte[])param;
inst.context.setTenant(Optional.of(tenantName));
}, FDB.DEFAULT_EXECUTOR);
}
else if (op == StackOperation.TENANT_CLEAR_ACTIVE) {
inst.context.setTenant(Optional.empty());
return AsyncUtil.DONE;
}
else if(op == StackOperation.UNIT_TESTS) {
inst.context.db.options().setLocationCacheSize(100001);
return inst.context.db.runAsync(tr -> {
@ -554,7 +577,7 @@ public class AsyncStackTester {
private static CompletableFuture<Void> executeMutation(final Instruction inst, Function<Transaction, CompletableFuture<Void>> r) {
// run this with a retry loop
return inst.tcx.runAsync(r).thenRunAsync(() -> {
if(inst.isDatabase)
if(inst.isDatabase || inst.isTenant)
inst.push("RESULT_NOT_PRESENT".getBytes());
}, FDB.DEFAULT_EXECUTOR);
}

View File

@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CompletableFuture;
@ -35,6 +36,7 @@ import com.apple.foundationdb.FDBException;
import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Tenant;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.Tuple;
@ -42,15 +44,27 @@ import com.apple.foundationdb.tuple.Tuple;
abstract class Context implements Runnable, AutoCloseable {
final Stack stack = new Stack();
final Database db;
Optional<Tenant> tenant = Optional.empty();
final String preStr;
int instructionIndex = 0;
KeySelector nextKey, endKey;
Long lastVersion = null;
private static class TransactionState {
public Transaction transaction;
public Optional<Tenant> tenant;
public TransactionState(Transaction transaction, Optional<Tenant> tenant) {
this.transaction = transaction;
this.tenant = tenant;
}
}
private String trName;
private List<Thread> children = new LinkedList<>();
private static Map<String, Transaction> transactionMap = new HashMap<>();
private static Map<String, TransactionState> transactionMap = new HashMap<>();
private static Map<Transaction, AtomicInteger> transactionRefCounts = new HashMap<>();
private static Map<byte[], Tenant> tenantMap = new HashMap<>();
Context(Database db, byte[] prefix) {
this.db = db;
@ -86,15 +100,24 @@ abstract class Context implements Runnable, AutoCloseable {
}
}
public synchronized void setTenant(Optional<byte[]> tenantName) {
if (tenantName.isPresent()) {
tenant = Optional.of(tenantMap.computeIfAbsent(tenantName.get(), tn -> db.openTenant(tenantName.get())));
}
else {
tenant = Optional.empty();
}
}
public static synchronized void addTransactionReference(Transaction tr) {
transactionRefCounts.computeIfAbsent(tr, x -> new AtomicInteger(0)).incrementAndGet();
}
private static synchronized Transaction getTransaction(String trName) {
Transaction tr = transactionMap.get(trName);
assert tr != null : "Null transaction";
addTransactionReference(tr);
return tr;
TransactionState state = transactionMap.get(trName);
assert state != null : "Null transaction";
addTransactionReference(state.transaction);
return state.transaction;
}
public Transaction getCurrentTransaction() {
@ -105,59 +128,78 @@ abstract class Context implements Runnable, AutoCloseable {
if(tr != null) {
AtomicInteger count = transactionRefCounts.get(tr);
if(count.decrementAndGet() == 0) {
assert !transactionMap.containsValue(tr);
transactionRefCounts.remove(tr);
tr.close();
}
}
}
private static synchronized void updateTransaction(String trName, Transaction tr) {
releaseTransaction(transactionMap.put(trName, tr));
addTransactionReference(tr);
}
private static synchronized boolean updateTransaction(String trName, Transaction oldTr, Transaction newTr) {
boolean added;
if(oldTr == null) {
added = (transactionMap.putIfAbsent(trName, newTr) == null);
private static Transaction createTransaction(Database db, Optional<Tenant> creatingTenant) {
if (creatingTenant.isPresent()) {
return creatingTenant.get().createTransaction();
}
else {
added = transactionMap.replace(trName, oldTr, newTr);
return db.createTransaction();
}
}
private static synchronized boolean newTransaction(Database db, Optional<Tenant> tenant, String trName, boolean allowReplace) {
TransactionState oldState = transactionMap.get(trName);
if (oldState != null) {
releaseTransaction(oldState.transaction);
}
else if (!allowReplace) {
return false;
}
if(added) {
TransactionState newState = new TransactionState(createTransaction(db, tenant), tenant);
transactionMap.put(trName, newState);
addTransactionReference(newState.transaction);
return true;
}
private static synchronized boolean replaceTransaction(Database db, String trName, Transaction oldTr, Transaction newTr) {
TransactionState trState = transactionMap.get(trName);
assert trState != null : "Null transaction";
if(oldTr == null || trState.transaction == oldTr) {
if(newTr == null) {
newTr = createTransaction(db, trState.tenant);
}
releaseTransaction(trState.transaction);
addTransactionReference(newTr);
releaseTransaction(oldTr);
trState.transaction = newTr;
return true;
}
return false;
}
public void updateCurrentTransaction(Transaction tr) {
updateTransaction(trName, tr);
}
public boolean updateCurrentTransaction(Transaction oldTr, Transaction newTr) {
return updateTransaction(trName, oldTr, newTr);
}
public void newTransaction() {
Transaction tr = db.createTransaction();
updateCurrentTransaction(tr);
newTransaction(db, tenant, trName, true);
}
public void newTransaction(Transaction oldTr) {
Transaction newTr = db.createTransaction();
if(!updateCurrentTransaction(oldTr, newTr)) {
newTr.close();
}
public void replaceTransaction(Transaction tr) {
replaceTransaction(db, trName, null, tr);
}
public boolean replaceTransaction(Transaction oldTr, Transaction newTr) {
return replaceTransaction(db, trName, oldTr, newTr);
}
public void resetTransaction() {
replaceTransaction(db, trName, null, null);
}
public boolean resetTransaction(Transaction oldTr) {
return replaceTransaction(db, trName, oldTr, null);
}
public void switchTransaction(byte[] rawTrName) {
trName = ByteArrayUtil.printable(rawTrName);
newTransaction(null);
newTransaction(db, tenant, trName, false);
}
abstract void executeOperations() throws Throwable;
@ -224,8 +266,12 @@ abstract class Context implements Runnable, AutoCloseable {
@Override
public void close() {
for(Transaction tr : transactionMap.values()) {
tr.close();
for(TransactionState tr : transactionMap.values()) {
tr.transaction.close();
}
for(Tenant tenant : tenantMap.values()) {
tenant.close();
}
}
}

View File

@ -33,11 +33,13 @@ import com.apple.foundationdb.tuple.Tuple;
class Instruction extends Stack {
private static final String SUFFIX_SNAPSHOT = "_SNAPSHOT";
private static final String SUFFIX_DATABASE = "_DATABASE";
private static final String SUFFIX_TENANT = "_TENANT";
final String op;
final Tuple tokens;
final Context context;
final boolean isDatabase;
final boolean isTenant;
final boolean isSnapshot;
final Transaction tr;
final ReadTransaction readTr;
@ -49,14 +51,23 @@ class Instruction extends Stack {
this.tokens = tokens;
String fullOp = tokens.getString(0);
isDatabase = fullOp.endsWith(SUFFIX_DATABASE);
boolean isDatabaseLocal = fullOp.endsWith(SUFFIX_DATABASE);
isTenant = fullOp.endsWith(SUFFIX_TENANT);
isSnapshot = fullOp.endsWith(SUFFIX_SNAPSHOT);
if(isDatabase) {
if(isDatabaseLocal) {
tr = null;
readTr = null;
op = fullOp.substring(0, fullOp.length() - SUFFIX_DATABASE.length());
}
else if(isTenant) {
tr = null;
readTr = null;
op = fullOp.substring(0, fullOp.length() - SUFFIX_TENANT.length());
if (!context.tenant.isPresent()) {
isDatabaseLocal = true;
}
}
else if(isSnapshot) {
tr = context.getCurrentTransaction();
readTr = tr.snapshot();
@ -68,22 +79,24 @@ class Instruction extends Stack {
op = fullOp;
}
tcx = isDatabase ? context.db : tr;
readTcx = isDatabase ? context.db : readTr;
isDatabase = isDatabaseLocal;
tcx = isDatabase ? context.db : isTenant ? context.tenant.get() : tr;
readTcx = isDatabase ? context.db : isTenant ? context.tenant.get() : readTr;
}
boolean setTransaction(Transaction newTr) {
if(!isDatabase) {
context.updateCurrentTransaction(newTr);
boolean replaceTransaction(Transaction newTr) {
if(!isDatabase && !isTenant) {
context.replaceTransaction(newTr);
return true;
}
return false;
}
boolean setTransaction(Transaction oldTr, Transaction newTr) {
if(!isDatabase) {
return context.updateCurrentTransaction(oldTr, newTr);
boolean replaceTransaction(Transaction oldTr, Transaction newTr) {
if(!isDatabase && !isTenant) {
return context.replaceTransaction(oldTr, newTr);
}
return false;

View File

@ -73,5 +73,11 @@ enum StackOperation {
DECODE_DOUBLE,
UNIT_TESTS, /* Possibly unimplemented */
// Tenants
TENANT_CREATE,
TENANT_DELETE,
TENANT_SET_ACTIVE,
TENANT_CLEAR_ACTIVE,
LOG_STACK
}

View File

@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
@ -197,7 +198,7 @@ public class StackTester {
inst.tr.options().setNextWriteNoWriteConflictRange();
}
else if(op == StackOperation.RESET) {
inst.context.newTransaction();
inst.context.resetTransaction();
}
else if(op == StackOperation.CANCEL) {
inst.tr.cancel();
@ -300,12 +301,12 @@ public class StackTester {
try {
Transaction tr = inst.tr.onError(err).join();
if(!inst.setTransaction(tr)) {
if(!inst.replaceTransaction(tr)) {
tr.close();
}
}
catch(Throwable t) {
inst.context.newTransaction(); // Other bindings allow reuse of non-retryable transactions, so we need to emulate that behavior.
inst.context.resetTransaction(); // Other bindings allow reuse of non-retryable transactions, so we need to emulate that behavior.
throw t;
}
@ -418,6 +419,21 @@ public class StackTester {
double value = ((Number)param).doubleValue();
inst.push(ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putDouble(value).array());
}
else if (op == StackOperation.TENANT_CREATE) {
byte[] tenantName = (byte[])inst.popParam().join();
inst.push(inst.context.db.allocateTenant(tenantName));
}
else if (op == StackOperation.TENANT_DELETE) {
byte[] tenantName = (byte[])inst.popParam().join();
inst.push(inst.context.db.deleteTenant(tenantName));
}
else if (op == StackOperation.TENANT_SET_ACTIVE) {
byte[] tenantName = (byte[])inst.popParam().join();
inst.context.setTenant(Optional.of(tenantName));
}
else if (op == StackOperation.TENANT_CLEAR_ACTIVE) {
inst.context.setTenant(Optional.empty());
}
else if(op == StackOperation.UNIT_TESTS) {
try {
inst.context.db.options().setLocationCacheSize(100001);
@ -579,7 +595,7 @@ public class StackTester {
private static void executeMutation(Instruction inst, Function<Transaction, Void> r) {
// run this with a retry loop (and commit)
inst.tcx.run(r);
if(inst.isDatabase)
if(inst.isDatabase || inst.isTenant)
inst.push("RESULT_NOT_PRESENT".getBytes());
}