mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 18:02:31 +08:00
Added bindings tests; Protected new SSI endpoints under new 7.0 ProtocolVersion
This commit is contained in:
parent
eb28492900
commit
00e9f8b9bf
@ -157,6 +157,7 @@ class ApiTest(Test):
|
||||
read_conflicts = ['READ_CONFLICT_RANGE', 'READ_CONFLICT_KEY']
|
||||
write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT']
|
||||
txn_sizes = ['GET_APPROXIMATE_SIZE']
|
||||
# storage_metrics = ['GET_ESTIMATED_RANGE_SIZE', 'GET_RANGE_SPLIT_POINTS']
|
||||
storage_metrics = ['GET_ESTIMATED_RANGE_SIZE']
|
||||
|
||||
op_choices += reads
|
||||
@ -553,6 +554,23 @@ class ApiTest(Test):
|
||||
instructions.push_args(key1, key2)
|
||||
instructions.append(op)
|
||||
self.add_strings(1)
|
||||
elif op == 'GET_RANGE_SPLIT_POINTS':
|
||||
# Protect against inverted range and identical keys
|
||||
key1 = self.workspace.pack(self.random.random_tuple(1))
|
||||
key2 = self.workspace.pack(self.random.random_tuple(1))
|
||||
|
||||
while key1 == key2:
|
||||
key1 = self.workspace.pack(self.random.random_tuple(1))
|
||||
key2 = self.workspace.pack(self.random.random_tuple(1))
|
||||
|
||||
if key1 > key2:
|
||||
key1, key2 = key2, key1
|
||||
|
||||
# TODO: randomize chunkSize but should not exceed 100M(shard limit)
|
||||
chunkSize = 10000000 # 10M
|
||||
instructions.push_args(key1, key2, chunkSize)
|
||||
instructions.append(op)
|
||||
self.add_strings(1)
|
||||
|
||||
else:
|
||||
assert False, 'Unknown operation: ' + op
|
||||
|
@ -661,6 +661,33 @@ struct GetEstimatedRangeSize : InstructionFunc {
|
||||
const char* GetEstimatedRangeSize::name = "GET_ESTIMATED_RANGE_SIZE";
|
||||
REGISTER_INSTRUCTION_FUNC(GetEstimatedRangeSize);
|
||||
|
||||
struct GetRangeSplitPoints : InstructionFunc {
|
||||
static const char* name;
|
||||
|
||||
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
|
||||
state std::vector<StackItem> items = data->stack.pop(3);
|
||||
if (items.size() != 3)
|
||||
return Void();
|
||||
|
||||
Standalone<StringRef> s1 = wait(items[0].value);
|
||||
state Standalone<StringRef> beginKey = Tuple::unpack(s1).getString(0);
|
||||
|
||||
Standalone<StringRef> s2 = wait(items[1].value);
|
||||
state Standalone<StringRef> endKey = Tuple::unpack(s2).getString(0);
|
||||
|
||||
Standalone<StringRef> s3 = wait(items[2].value);
|
||||
state int64_t chunkSize = Tuple::unpack(s3).getInt(0);
|
||||
|
||||
Future<FDBStandalone<VectorRef<KeyRef>>> fsplitPoints = instruction->tr->getRangeSplitPoints(KeyRangeRef(beginKey, endKey), chunkSize);
|
||||
FDBStandalone<VectorRef<KeyRef>> splitPoints = wait(fsplitPoints);
|
||||
data->stack.pushTuple(LiteralStringRef("GOT_RANGE_SPLIT_POINTS"));
|
||||
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
const char* GetRangeSplitPoints::name = "GET_RANGE_SPLIT_POINTS";
|
||||
REGISTER_INSTRUCTION_FUNC(GetRangeSplitPoints);
|
||||
|
||||
struct GetKeyFunc : InstructionFunc {
|
||||
static const char* name;
|
||||
|
||||
|
2
bindings/go/go.sum
Normal file
2
bindings/go/go.sum
Normal file
@ -0,0 +1,2 @@
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
@ -579,6 +579,17 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
|
||||
if e != nil {
|
||||
panic(e)
|
||||
}
|
||||
case op == "GET_RANGE_SPLIT_POINTS":
|
||||
r := sm.popKeyRange()
|
||||
chunkSize := sm.waitAndPop().item.(int64)
|
||||
_, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
|
||||
_ = rtr.GetRangeSplitPoints(r, chunkSize).MustGet()
|
||||
sm.store(idx, []byte("GOT_RANGE_SPLIT_POINTS"))
|
||||
return nil, nil
|
||||
})
|
||||
if e != nil {
|
||||
panic(e)
|
||||
}
|
||||
case op == "COMMIT":
|
||||
sm.store(idx, sm.currentTransaction().Commit())
|
||||
case op == "RESET":
|
||||
|
@ -23,7 +23,7 @@ package com.apple.foundationdb;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
class KeyArrayResult {
|
||||
public class KeyArrayResult {
|
||||
final List<byte[]> keys;
|
||||
|
||||
KeyArrayResult(byte[] keyBytes, int[] keyLengths) {
|
||||
|
@ -38,6 +38,7 @@ import com.apple.foundationdb.FDB;
|
||||
import com.apple.foundationdb.FDBException;
|
||||
import com.apple.foundationdb.KeySelector;
|
||||
import com.apple.foundationdb.KeyValue;
|
||||
import com.apple.foundationdb.KeyArrayResult;
|
||||
import com.apple.foundationdb.MutationType;
|
||||
import com.apple.foundationdb.Range;
|
||||
import com.apple.foundationdb.StreamingMode;
|
||||
@ -229,6 +230,12 @@ public class AsyncStackTester {
|
||||
inst.push("GOT_ESTIMATED_RANGE_SIZE".getBytes());
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
}
|
||||
else if (op == StackOperation.GET_RANGE_SPLIT_POINTS) {
|
||||
List<Object> params = inst.popParams(3).join();
|
||||
return inst.readTr.getRangeSplitPoints((byte[])params.get(0), (byte[])params.get(1), (long)params.get(2)).thenAcceptAsync(splitPoints -> {
|
||||
inst.push("GOT_RANGE_SPLIT_POINTS".getBytes());
|
||||
}, FDB.DEFAULT_EXECUTOR);
|
||||
}
|
||||
else if(op == StackOperation.GET_RANGE) {
|
||||
return inst.popParams(5).thenComposeAsync(params -> {
|
||||
int limit = StackUtils.getInt(params.get(2));
|
||||
|
@ -57,6 +57,7 @@ enum StackOperation {
|
||||
GET_APPROXIMATE_SIZE,
|
||||
GET_VERSIONSTAMP,
|
||||
GET_ESTIMATED_RANGE_SIZE,
|
||||
GET_RANGE_SPLIT_POINTS,
|
||||
SET_READ_VERSION,
|
||||
ON_ERROR,
|
||||
SUB,
|
||||
|
@ -39,6 +39,7 @@ import com.apple.foundationdb.FDB;
|
||||
import com.apple.foundationdb.FDBException;
|
||||
import com.apple.foundationdb.KeySelector;
|
||||
import com.apple.foundationdb.KeyValue;
|
||||
import com.apple.foundationdb.KeyArrayResult;
|
||||
import com.apple.foundationdb.LocalityUtil;
|
||||
import com.apple.foundationdb.MutationType;
|
||||
import com.apple.foundationdb.Range;
|
||||
@ -211,6 +212,11 @@ public class StackTester {
|
||||
Long size = inst.readTr.getEstimatedRangeSizeBytes((byte[])params.get(0), (byte[])params.get(1)).join();
|
||||
inst.push("GOT_ESTIMATED_RANGE_SIZE".getBytes());
|
||||
}
|
||||
else if (op == StackOperation.GET_RANGE_SPLIT_POINTS) {
|
||||
List<Object> params = inst.popParams(3).join();
|
||||
KeyArrayResult splitPoints = inst.readTr.getRangeSplitPoints((byte[])params.get(0), (byte[])params.get(1), (long)params.get(2)).join();
|
||||
inst.push("GOT_RANGE_SPLIT_POINTS".getBytes());
|
||||
}
|
||||
else if(op == StackOperation.GET_RANGE) {
|
||||
List<Object> params = inst.popParams(5).join();
|
||||
|
||||
|
@ -393,6 +393,10 @@ class Tester:
|
||||
begin, end = inst.pop(2)
|
||||
estimatedSize = obj.get_estimated_range_size_bytes(begin, end).wait()
|
||||
inst.push(b"GOT_ESTIMATED_RANGE_SIZE")
|
||||
elif inst.op == six.u("GET_RANGE_SPLIT_POINTS"):
|
||||
begin, end, chunkSize = inst.pop(3)
|
||||
estimatedSize = obj.get_range_split_points(begin, end, chunkSize).wait()
|
||||
inst.push(b"GOT_RANGE_SPLIT_POINTS")
|
||||
elif inst.op == six.u("GET_KEY"):
|
||||
key, or_equal, offset, prefix = inst.pop(4)
|
||||
result = obj.get_key(fdb.KeySelector(key, or_equal, offset))
|
||||
|
@ -320,6 +320,9 @@ class Tester
|
||||
when "GET_ESTIMATED_RANGE_SIZE"
|
||||
inst.tr.get_estimated_range_size_bytes(inst.wait_and_pop, inst.wait_and_pop).to_i
|
||||
inst.push("GOT_ESTIMATED_RANGE_SIZE")
|
||||
when "GET_RANGE_SPLIT_POINTS"
|
||||
inst.tr.get_range_split_points(inst.wait_and_pop, inst.wait_and_pop, inst.wait_and_pop).length()
|
||||
inst.push("GOT_RANGE_SPLIT_POINTS")
|
||||
when "GET_KEY"
|
||||
selector = FDB::KeySelector.new(inst.wait_and_pop, inst.wait_and_pop, inst.wait_and_pop)
|
||||
prefix = inst.wait_and_pop
|
||||
|
@ -4018,7 +4018,7 @@ ACTOR Future<Standalone<VectorRef<KeyRangeRef>>> getReadHotRanges(Database cx, K
|
||||
partEnd = locations[i].first.end;
|
||||
}
|
||||
ReadHotSubRangeRequest req(KeyRangeRef(partBegin, partEnd));
|
||||
fReplies[i] = loadBalance(locations[i].second, &StorageServerInterface::getReadHotRanges, req,
|
||||
fReplies[i] = loadBalance(locations[i].second->locations(), &StorageServerInterface::getReadHotRanges, req,
|
||||
TaskPriority::DataDistribution);
|
||||
}
|
||||
|
||||
@ -4156,7 +4156,7 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Database cx, Key
|
||||
partEnd = locations[i].first.end;
|
||||
}
|
||||
SplitRangeRequest req(KeyRangeRef(partBegin, partEnd), chunkSize);
|
||||
fReplies[i] = loadBalance(locations[i].second, &StorageServerInterface::getRangeSplitPoints, req,
|
||||
fReplies[i] = loadBalance(locations[i].second->locations(), &StorageServerInterface::getRangeSplitPoints, req,
|
||||
TaskPriority::DataDistribution);
|
||||
}
|
||||
|
||||
|
@ -100,9 +100,11 @@ struct StorageServerInterface {
|
||||
getKeyValueStoreType = RequestStream<ReplyPromise<KeyValueStoreType>>( getValue.getEndpoint().getAdjustedEndpoint(9) );
|
||||
watchValue = RequestStream<struct WatchValueRequest>( getValue.getEndpoint().getAdjustedEndpoint(10) );
|
||||
getReadHotRanges = RequestStream<struct ReadHotSubRangeRequest>( getValue.getEndpoint().getAdjustedEndpoint(11) );
|
||||
if(ar.protocolVersion().hasRangeSplit()) {
|
||||
getRangeSplitPoints =
|
||||
RequestStream<struct SplitRangeRequest>(getValue.getEndpoint().getAdjustedEndpoint(12));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(Ar::isDeserializing);
|
||||
if constexpr (is_fb_function<Ar>) {
|
||||
@ -129,6 +131,7 @@ struct StorageServerInterface {
|
||||
streams.push_back(getKeyValueStoreType.getReceiver());
|
||||
streams.push_back(watchValue.getReceiver());
|
||||
streams.push_back(getReadHotRanges.getReceiver());
|
||||
streams.push_back(getRangeSplitPoints.getReceiver());
|
||||
FlowTransport::transport().addEndpoints(streams);
|
||||
}
|
||||
};
|
||||
|
@ -474,7 +474,7 @@ struct StorageServerMetrics {
|
||||
std::vector<KeyRangeRef> v = getReadHotRanges(req.keys, SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO,
|
||||
SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE,
|
||||
SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS);
|
||||
reply.readHotRanges = VectorRef<KeyRangeRef>(v.data(), v.size());
|
||||
reply.readHotRanges.append_deep(reply.readHotRanges.arena(), v.data(), v.size());
|
||||
req.reply.send(reply);
|
||||
}
|
||||
|
||||
@ -503,7 +503,7 @@ struct StorageServerMetrics {
|
||||
SplitRangeReply reply;
|
||||
std::vector<KeyRef> points = getSplitPoints(req.keys, req.chunkSize);
|
||||
|
||||
reply.splitPoints = VectorRef<KeyRef>(points.data(), points.size());
|
||||
reply.splitPoints.append_deep(reply.splitPoints.arena(), points.data(), points.size());
|
||||
req.reply.send(reply);
|
||||
}
|
||||
|
||||
@ -570,6 +570,28 @@ TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/multipleReturnedPoint
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/noneSplitable") {
|
||||
|
||||
int64_t sampleUnit = SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE;
|
||||
StorageServerMetrics ssm;
|
||||
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("A"), 200 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Absolute"), 800 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Apple"), 1000 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Bah"), 20 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Banana"), 80 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Bob"), 200 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("But"), 100 * sampleUnit);
|
||||
ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit);
|
||||
|
||||
vector<KeyRef> t = ssm.getSplitPoints(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 10000 * sampleUnit);
|
||||
|
||||
ASSERT(t.size() == 0);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
||||
TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/chunkTooLarge") {
|
||||
|
||||
int64_t sampleUnit = SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE;
|
||||
|
@ -679,6 +679,7 @@ ACTOR Future<Void> storageServerRollbackRebooter( Future<Void> prevStorageServer
|
||||
DUMPTOKEN(recruited.waitMetrics);
|
||||
DUMPTOKEN(recruited.splitMetrics);
|
||||
DUMPTOKEN(recruited.getReadHotRanges);
|
||||
DUMPTOKEN(recruited.getRangeSplitPoints);
|
||||
DUMPTOKEN(recruited.getStorageMetrics);
|
||||
DUMPTOKEN(recruited.waitFailure);
|
||||
DUMPTOKEN(recruited.getQueuingMetrics);
|
||||
@ -1008,6 +1009,7 @@ ACTOR Future<Void> workerServer(
|
||||
DUMPTOKEN(recruited.waitMetrics);
|
||||
DUMPTOKEN(recruited.splitMetrics);
|
||||
DUMPTOKEN(recruited.getReadHotRanges);
|
||||
DUMPTOKEN(recruited.getRangeSplitPoints);
|
||||
DUMPTOKEN(recruited.getStorageMetrics);
|
||||
DUMPTOKEN(recruited.waitFailure);
|
||||
DUMPTOKEN(recruited.getQueuingMetrics);
|
||||
@ -1318,6 +1320,7 @@ ACTOR Future<Void> workerServer(
|
||||
DUMPTOKEN(recruited.waitMetrics);
|
||||
DUMPTOKEN(recruited.splitMetrics);
|
||||
DUMPTOKEN(recruited.getReadHotRanges);
|
||||
DUMPTOKEN(recruited.getRangeSplitPoints);
|
||||
DUMPTOKEN(recruited.getStorageMetrics);
|
||||
DUMPTOKEN(recruited.waitFailure);
|
||||
DUMPTOKEN(recruited.getQueuingMetrics);
|
||||
|
@ -128,6 +128,7 @@ public: // introduced features
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ReportConflictingKeys);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B070000000LL, RangeSplit);
|
||||
};
|
||||
|
||||
// These impact both communications and the deserialization of certain database and IKeyValueStore keys.
|
||||
|
@ -3,4 +3,4 @@ testName=UnitTests
|
||||
startDelay=0
|
||||
useDB=false
|
||||
maxTestCases=0
|
||||
testsMatching=/fdbserver/StorageMetricSample
|
||||
testsMatching=/fdbserver/StorageMetricSample/rangeSplitPoints
|
Loading…
x
Reference in New Issue
Block a user