diff --git a/bindings/bindingtester/tests/api.py b/bindings/bindingtester/tests/api.py index 5e8d2d66a2..934baa0798 100644 --- a/bindings/bindingtester/tests/api.py +++ b/bindings/bindingtester/tests/api.py @@ -157,6 +157,7 @@ class ApiTest(Test): read_conflicts = ['READ_CONFLICT_RANGE', 'READ_CONFLICT_KEY'] write_conflicts = ['WRITE_CONFLICT_RANGE', 'WRITE_CONFLICT_KEY', 'DISABLE_WRITE_CONFLICT'] txn_sizes = ['GET_APPROXIMATE_SIZE'] + # storage_metrics = ['GET_ESTIMATED_RANGE_SIZE', 'GET_RANGE_SPLIT_POINTS'] storage_metrics = ['GET_ESTIMATED_RANGE_SIZE'] op_choices += reads @@ -553,6 +554,23 @@ class ApiTest(Test): instructions.push_args(key1, key2) instructions.append(op) self.add_strings(1) + elif op == 'GET_RANGE_SPLIT_POINTS': + # Protect against inverted range and identical keys + key1 = self.workspace.pack(self.random.random_tuple(1)) + key2 = self.workspace.pack(self.random.random_tuple(1)) + + while key1 == key2: + key1 = self.workspace.pack(self.random.random_tuple(1)) + key2 = self.workspace.pack(self.random.random_tuple(1)) + + if key1 > key2: + key1, key2 = key2, key1 + + # TODO: randomize chunkSize but should not exceed 100M(shard limit) + chunkSize = 10000000 # 10M + instructions.push_args(key1, key2, chunkSize) + instructions.append(op) + self.add_strings(1) else: assert False, 'Unknown operation: ' + op diff --git a/bindings/flow/tester/Tester.actor.cpp b/bindings/flow/tester/Tester.actor.cpp index 578f159f8c..0cbe6480fc 100644 --- a/bindings/flow/tester/Tester.actor.cpp +++ b/bindings/flow/tester/Tester.actor.cpp @@ -661,6 +661,33 @@ struct GetEstimatedRangeSize : InstructionFunc { const char* GetEstimatedRangeSize::name = "GET_ESTIMATED_RANGE_SIZE"; REGISTER_INSTRUCTION_FUNC(GetEstimatedRangeSize); +struct GetRangeSplitPoints : InstructionFunc { + static const char* name; + + ACTOR static Future call(Reference data, Reference instruction) { + state std::vector items = data->stack.pop(3); + if (items.size() != 3) + return Void(); + + Standalone s1 = wait(items[0].value); + state Standalone beginKey = Tuple::unpack(s1).getString(0); + + Standalone s2 = wait(items[1].value); + state Standalone endKey = Tuple::unpack(s2).getString(0); + + Standalone s3 = wait(items[2].value); + state int64_t chunkSize = Tuple::unpack(s3).getInt(0); + + Future>> fsplitPoints = instruction->tr->getRangeSplitPoints(KeyRangeRef(beginKey, endKey), chunkSize); + FDBStandalone> splitPoints = wait(fsplitPoints); + data->stack.pushTuple(LiteralStringRef("GOT_RANGE_SPLIT_POINTS")); + + return Void(); + } +}; +const char* GetRangeSplitPoints::name = "GET_RANGE_SPLIT_POINTS"; +REGISTER_INSTRUCTION_FUNC(GetRangeSplitPoints); + struct GetKeyFunc : InstructionFunc { static const char* name; diff --git a/bindings/go/go.sum b/bindings/go/go.sum new file mode 100644 index 0000000000..3ab73eafae --- /dev/null +++ b/bindings/go/go.sum @@ -0,0 +1,2 @@ +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/bindings/go/src/_stacktester/stacktester.go b/bindings/go/src/_stacktester/stacktester.go index 9737391569..d986ddec53 100644 --- a/bindings/go/src/_stacktester/stacktester.go +++ b/bindings/go/src/_stacktester/stacktester.go @@ -579,6 +579,17 @@ func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) { if e != nil { panic(e) } + case op == "GET_RANGE_SPLIT_POINTS": + r := sm.popKeyRange() + chunkSize := sm.waitAndPop().item.(int64) + _, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) { + _ = rtr.GetRangeSplitPoints(r, chunkSize).MustGet() + sm.store(idx, []byte("GOT_RANGE_SPLIT_POINTS")) + return nil, nil + }) + if e != nil { + panic(e) + } case op == "COMMIT": sm.store(idx, sm.currentTransaction().Commit()) case op == "RESET": diff --git a/bindings/java/src/main/com/apple/foundationdb/KeyArrayResult.java b/bindings/java/src/main/com/apple/foundationdb/KeyArrayResult.java index 244435eb00..f63fc16d62 100644 --- a/bindings/java/src/main/com/apple/foundationdb/KeyArrayResult.java +++ b/bindings/java/src/main/com/apple/foundationdb/KeyArrayResult.java @@ -23,7 +23,7 @@ package com.apple.foundationdb; import java.util.ArrayList; import java.util.List; -class KeyArrayResult { +public class KeyArrayResult { final List keys; KeyArrayResult(byte[] keyBytes, int[] keyLengths) { diff --git a/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java b/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java index 97defab88f..f584f452a9 100644 --- a/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java +++ b/bindings/java/src/test/com/apple/foundationdb/test/AsyncStackTester.java @@ -38,6 +38,7 @@ import com.apple.foundationdb.FDB; import com.apple.foundationdb.FDBException; import com.apple.foundationdb.KeySelector; import com.apple.foundationdb.KeyValue; +import com.apple.foundationdb.KeyArrayResult; import com.apple.foundationdb.MutationType; import com.apple.foundationdb.Range; import com.apple.foundationdb.StreamingMode; @@ -229,6 +230,12 @@ public class AsyncStackTester { inst.push("GOT_ESTIMATED_RANGE_SIZE".getBytes()); }, FDB.DEFAULT_EXECUTOR); } + else if (op == StackOperation.GET_RANGE_SPLIT_POINTS) { + List params = inst.popParams(3).join(); + return inst.readTr.getRangeSplitPoints((byte[])params.get(0), (byte[])params.get(1), (long)params.get(2)).thenAcceptAsync(splitPoints -> { + inst.push("GOT_RANGE_SPLIT_POINTS".getBytes()); + }, FDB.DEFAULT_EXECUTOR); + } else if(op == StackOperation.GET_RANGE) { return inst.popParams(5).thenComposeAsync(params -> { int limit = StackUtils.getInt(params.get(2)); diff --git a/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java b/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java index 634a217c7f..bece744605 100644 --- a/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java +++ b/bindings/java/src/test/com/apple/foundationdb/test/StackOperation.java @@ -57,6 +57,7 @@ enum StackOperation { GET_APPROXIMATE_SIZE, GET_VERSIONSTAMP, GET_ESTIMATED_RANGE_SIZE, + GET_RANGE_SPLIT_POINTS, SET_READ_VERSION, ON_ERROR, SUB, diff --git a/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java b/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java index f196301865..0490e2a5fb 100644 --- a/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java +++ b/bindings/java/src/test/com/apple/foundationdb/test/StackTester.java @@ -39,6 +39,7 @@ import com.apple.foundationdb.FDB; import com.apple.foundationdb.FDBException; import com.apple.foundationdb.KeySelector; import com.apple.foundationdb.KeyValue; +import com.apple.foundationdb.KeyArrayResult; import com.apple.foundationdb.LocalityUtil; import com.apple.foundationdb.MutationType; import com.apple.foundationdb.Range; @@ -211,6 +212,11 @@ public class StackTester { Long size = inst.readTr.getEstimatedRangeSizeBytes((byte[])params.get(0), (byte[])params.get(1)).join(); inst.push("GOT_ESTIMATED_RANGE_SIZE".getBytes()); } + else if (op == StackOperation.GET_RANGE_SPLIT_POINTS) { + List params = inst.popParams(3).join(); + KeyArrayResult splitPoints = inst.readTr.getRangeSplitPoints((byte[])params.get(0), (byte[])params.get(1), (long)params.get(2)).join(); + inst.push("GOT_RANGE_SPLIT_POINTS".getBytes()); + } else if(op == StackOperation.GET_RANGE) { List params = inst.popParams(5).join(); diff --git a/bindings/python/tests/tester.py b/bindings/python/tests/tester.py index f6eab9c207..6aa41dea4a 100644 --- a/bindings/python/tests/tester.py +++ b/bindings/python/tests/tester.py @@ -393,6 +393,10 @@ class Tester: begin, end = inst.pop(2) estimatedSize = obj.get_estimated_range_size_bytes(begin, end).wait() inst.push(b"GOT_ESTIMATED_RANGE_SIZE") + elif inst.op == six.u("GET_RANGE_SPLIT_POINTS"): + begin, end, chunkSize = inst.pop(3) + estimatedSize = obj.get_range_split_points(begin, end, chunkSize).wait() + inst.push(b"GOT_RANGE_SPLIT_POINTS") elif inst.op == six.u("GET_KEY"): key, or_equal, offset, prefix = inst.pop(4) result = obj.get_key(fdb.KeySelector(key, or_equal, offset)) diff --git a/bindings/ruby/tests/tester.rb b/bindings/ruby/tests/tester.rb index 3860e7d190..e653bdaf93 100755 --- a/bindings/ruby/tests/tester.rb +++ b/bindings/ruby/tests/tester.rb @@ -320,6 +320,9 @@ class Tester when "GET_ESTIMATED_RANGE_SIZE" inst.tr.get_estimated_range_size_bytes(inst.wait_and_pop, inst.wait_and_pop).to_i inst.push("GOT_ESTIMATED_RANGE_SIZE") + when "GET_RANGE_SPLIT_POINTS" + inst.tr.get_range_split_points(inst.wait_and_pop, inst.wait_and_pop, inst.wait_and_pop).length() + inst.push("GOT_RANGE_SPLIT_POINTS") when "GET_KEY" selector = FDB::KeySelector.new(inst.wait_and_pop, inst.wait_and_pop, inst.wait_and_pop) prefix = inst.wait_and_pop diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index a3a5680803..747700491b 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -4018,7 +4018,7 @@ ACTOR Future>> getReadHotRanges(Database cx, K partEnd = locations[i].first.end; } ReadHotSubRangeRequest req(KeyRangeRef(partBegin, partEnd)); - fReplies[i] = loadBalance(locations[i].second, &StorageServerInterface::getReadHotRanges, req, + fReplies[i] = loadBalance(locations[i].second->locations(), &StorageServerInterface::getReadHotRanges, req, TaskPriority::DataDistribution); } @@ -4156,7 +4156,7 @@ ACTOR Future>> getRangeSplitPoints(Database cx, Key partEnd = locations[i].first.end; } SplitRangeRequest req(KeyRangeRef(partBegin, partEnd), chunkSize); - fReplies[i] = loadBalance(locations[i].second, &StorageServerInterface::getRangeSplitPoints, req, + fReplies[i] = loadBalance(locations[i].second->locations(), &StorageServerInterface::getRangeSplitPoints, req, TaskPriority::DataDistribution); } diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index a89018ce98..41dcf4b7de 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -100,8 +100,10 @@ struct StorageServerInterface { getKeyValueStoreType = RequestStream>( getValue.getEndpoint().getAdjustedEndpoint(9) ); watchValue = RequestStream( getValue.getEndpoint().getAdjustedEndpoint(10) ); getReadHotRanges = RequestStream( getValue.getEndpoint().getAdjustedEndpoint(11) ); - getRangeSplitPoints = - RequestStream(getValue.getEndpoint().getAdjustedEndpoint(12)); + if(ar.protocolVersion().hasRangeSplit()) { + getRangeSplitPoints = + RequestStream(getValue.getEndpoint().getAdjustedEndpoint(12)); + } } } else { ASSERT(Ar::isDeserializing); @@ -129,6 +131,7 @@ struct StorageServerInterface { streams.push_back(getKeyValueStoreType.getReceiver()); streams.push_back(watchValue.getReceiver()); streams.push_back(getReadHotRanges.getReceiver()); + streams.push_back(getRangeSplitPoints.getReceiver()); FlowTransport::transport().addEndpoints(streams); } }; diff --git a/fdbserver/StorageMetrics.actor.h b/fdbserver/StorageMetrics.actor.h index 0ac479bcd1..f44c2b1d7a 100644 --- a/fdbserver/StorageMetrics.actor.h +++ b/fdbserver/StorageMetrics.actor.h @@ -474,7 +474,7 @@ struct StorageServerMetrics { std::vector v = getReadHotRanges(req.keys, SERVER_KNOBS->SHARD_MAX_READ_DENSITY_RATIO, SERVER_KNOBS->READ_HOT_SUB_RANGE_CHUNK_SIZE, SERVER_KNOBS->SHARD_READ_HOT_BANDWITH_MIN_PER_KSECONDS); - reply.readHotRanges = VectorRef(v.data(), v.size()); + reply.readHotRanges.append_deep(reply.readHotRanges.arena(), v.data(), v.size()); req.reply.send(reply); } @@ -503,7 +503,7 @@ struct StorageServerMetrics { SplitRangeReply reply; std::vector points = getSplitPoints(req.keys, req.chunkSize); - reply.splitPoints = VectorRef(points.data(), points.size()); + reply.splitPoints.append_deep(reply.splitPoints.arena(), points.data(), points.size()); req.reply.send(reply); } @@ -570,6 +570,28 @@ TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/multipleReturnedPoint return Void(); } +TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/noneSplitable") { + + int64_t sampleUnit = SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE; + StorageServerMetrics ssm; + + ssm.byteSample.sample.insert(LiteralStringRef("A"), 200 * sampleUnit); + ssm.byteSample.sample.insert(LiteralStringRef("Absolute"), 800 * sampleUnit); + ssm.byteSample.sample.insert(LiteralStringRef("Apple"), 1000 * sampleUnit); + ssm.byteSample.sample.insert(LiteralStringRef("Bah"), 20 * sampleUnit); + ssm.byteSample.sample.insert(LiteralStringRef("Banana"), 80 * sampleUnit); + ssm.byteSample.sample.insert(LiteralStringRef("Bob"), 200 * sampleUnit); + ssm.byteSample.sample.insert(LiteralStringRef("But"), 100 * sampleUnit); + ssm.byteSample.sample.insert(LiteralStringRef("Cat"), 300 * sampleUnit); + + vector t = ssm.getSplitPoints(KeyRangeRef(LiteralStringRef("A"), LiteralStringRef("C")), 10000 * sampleUnit); + + ASSERT(t.size() == 0); + + return Void(); +} + + TEST_CASE("/fdbserver/StorageMetricSample/rangeSplitPoints/chunkTooLarge") { int64_t sampleUnit = SERVER_KNOBS->BYTES_READ_UNITS_PER_SAMPLE; diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index a7307df52c..ea7888992c 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -679,6 +679,7 @@ ACTOR Future storageServerRollbackRebooter( Future prevStorageServer DUMPTOKEN(recruited.waitMetrics); DUMPTOKEN(recruited.splitMetrics); DUMPTOKEN(recruited.getReadHotRanges); + DUMPTOKEN(recruited.getRangeSplitPoints); DUMPTOKEN(recruited.getStorageMetrics); DUMPTOKEN(recruited.waitFailure); DUMPTOKEN(recruited.getQueuingMetrics); @@ -1008,6 +1009,7 @@ ACTOR Future workerServer( DUMPTOKEN(recruited.waitMetrics); DUMPTOKEN(recruited.splitMetrics); DUMPTOKEN(recruited.getReadHotRanges); + DUMPTOKEN(recruited.getRangeSplitPoints); DUMPTOKEN(recruited.getStorageMetrics); DUMPTOKEN(recruited.waitFailure); DUMPTOKEN(recruited.getQueuingMetrics); @@ -1318,6 +1320,7 @@ ACTOR Future workerServer( DUMPTOKEN(recruited.waitMetrics); DUMPTOKEN(recruited.splitMetrics); DUMPTOKEN(recruited.getReadHotRanges); + DUMPTOKEN(recruited.getRangeSplitPoints); DUMPTOKEN(recruited.getStorageMetrics); DUMPTOKEN(recruited.waitFailure); DUMPTOKEN(recruited.getQueuingMetrics); diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index c2bb47f78c..785f8c24c6 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -128,6 +128,7 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ReportConflictingKeys); PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints); PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole); + PROTOCOL_VERSION_FEATURE(0x0FDB00B070000000LL, RangeSplit); }; // These impact both communications and the deserialization of certain database and IKeyValueStore keys. diff --git a/tests/StorageMetricsSampleTests.txt b/tests/StorageMetricsSampleTests.txt index 2207e0a5d3..0b0af55a65 100644 --- a/tests/StorageMetricsSampleTests.txt +++ b/tests/StorageMetricsSampleTests.txt @@ -3,4 +3,4 @@ testName=UnitTests startDelay=0 useDB=false maxTestCases=0 -testsMatching=/fdbserver/StorageMetricSample \ No newline at end of file +testsMatching=/fdbserver/StorageMetricSample/rangeSplitPoints \ No newline at end of file