Pass along versions with GetKeyServerLocationsRequests, if we have them.

This commit is contained in:
A.J. Beamon 2022-03-08 15:41:22 -08:00
parent 502209229c
commit 06f088e088

View File

@ -2668,7 +2668,8 @@ ACTOR Future<KeyRangeLocationInfo> getKeyLocation_internal(Database cx,
SpanID spanID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Reverse isBackward) {
Reverse isBackward,
Version version) {
state Span span("NAPI:getKeyLocation"_loc, spanID);
if (isBackward) {
@ -2694,6 +2695,7 @@ ACTOR Future<KeyRangeLocationInfo> getKeyLocation_internal(Database cx,
Optional<KeyRef>(),
100,
isBackward,
version,
key.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
@ -2760,11 +2762,12 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
SpanID spanID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies,
Reverse isBackward = Reverse::False) {
Reverse isBackward,
Version version) {
// we first check whether this range is cached
Optional<KeyRangeLocationInfo> locationInfo = cx->getCachedLocation(tenant, key, isBackward);
if (!locationInfo.present()) {
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward);
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version);
}
bool onlyEndpointFailedAndNeedRefresh = false;
@ -2778,7 +2781,7 @@ Future<KeyRangeLocationInfo> getKeyLocation(Database const& cx,
cx->invalidateCache(locationInfo.get().tenantEntry.prefix, key);
// Refresh the cache with a new getKeyLocations made to proxies.
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward);
return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version);
}
return locationInfo.get();
@ -2788,8 +2791,9 @@ template <class F>
Future<KeyRangeLocationInfo> getKeyLocation(Reference<TransactionState> trState,
Key const& key,
F StorageServerInterface::*member,
Reverse isBackward = Reverse::False,
UseTenant useTenant = UseTenant::True) {
Reverse isBackward,
UseTenant useTenant,
Version version) {
auto f = getKeyLocation(trState->cx,
useTenant ? trState->tenant : Optional<TenantName>(),
key,
@ -2797,7 +2801,8 @@ Future<KeyRangeLocationInfo> getKeyLocation(Reference<TransactionState> trState,
trState->spanID,
trState->debugID,
trState->useProvisionalProxies,
isBackward);
isBackward,
version);
if (trState->tenant.present() && useTenant) {
return map(f, [trState](const KeyRangeLocationInfo& locationInfo) {
@ -2817,7 +2822,8 @@ ACTOR Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations_internal(
Reverse reverse,
SpanID spanID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies) {
UseProvisionalProxies useProvisionalProxies,
Version version) {
state Span span("NAPI:getKeyRangeLocations"_loc, spanID);
if (debugID.present())
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before");
@ -2836,6 +2842,7 @@ ACTOR Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations_internal(
keys.end,
limit,
reverse,
version,
keys.arena()),
TaskPriority::DefaultPromiseEndpoint))) {
++cx->transactionKeyServerLocationRequestsCompleted;
@ -2888,13 +2895,15 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
F StorageServerInterface::*member,
SpanID const& spanID,
Optional<UID> const& debugID,
UseProvisionalProxies useProvisionalProxies) {
UseProvisionalProxies useProvisionalProxies,
Version version) {
ASSERT(!keys.empty());
std::vector<KeyRangeLocationInfo> locations;
if (!cx->getCachedLocations(tenant, keys, locations, limit, reverse)) {
return getKeyRangeLocations_internal(cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies);
return getKeyRangeLocations_internal(
cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version);
}
bool foundFailed = false;
@ -2914,7 +2923,8 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Database const& c
if (foundFailed) {
// Refresh the cache with a new getKeyRangeLocations made to proxies.
return getKeyRangeLocations_internal(cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies);
return getKeyRangeLocations_internal(
cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version);
}
return locations;
@ -2926,7 +2936,8 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Reference<Transac
int limit,
Reverse reverse,
F StorageServerInterface::*member,
UseTenant useTenant = UseTenant::True) {
UseTenant useTenant,
Version version) {
auto f = getKeyRangeLocations(trState->cx,
useTenant ? trState->tenant : Optional<TenantName>(),
keys,
@ -2935,7 +2946,8 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Reference<Transac
member,
trState->spanID,
trState->debugID,
trState->useProvisionalProxies);
trState->useProvisionalProxies,
version);
if (trState->tenant.present() && useTenant) {
return map(f, [trState](const std::vector<KeyRangeLocationInfo>& locationInfo) {
@ -2948,9 +2960,12 @@ Future<std::vector<KeyRangeLocationInfo>> getKeyRangeLocations(Reference<Transac
}
}
ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange keys) {
ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange keys, Future<Version> fVersion) {
state int totalRanges = 0;
state int totalRequests = 0;
state Version version = wait(fVersion);
loop {
std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations_internal(trState->cx,
@ -2960,7 +2975,8 @@ ACTOR Future<Void> warmRange_impl(Reference<TransactionState> trState, KeyRange
Reverse::False,
trState->spanID,
trState->debugID,
trState->useProvisionalProxies));
trState->useProvisionalProxies,
version));
totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT;
totalRequests++;
if (locations.size() == 0 || totalRanges >= trState->cx->locationCacheSize ||
@ -3047,7 +3063,7 @@ TenantInfo TransactionState::getTenantInfo() const {
}
Future<Void> Transaction::warmRange(KeyRange keys) {
return warmRange_impl(trState, keys);
return warmRange_impl(trState, keys, getReadVersion());
}
ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
@ -3066,7 +3082,7 @@ ACTOR Future<Optional<Value>> getValue(Reference<TransactionState> trState,
loop {
state KeyRangeLocationInfo locationInfo =
wait(getKeyLocation(trState, key, &StorageServerInterface::getValue, Reverse::False, useTenant));
wait(getKeyLocation(trState, key, &StorageServerInterface::getValue, Reverse::False, useTenant, ver));
state Optional<UID> getValueID = Optional<UID>();
state uint64_t startTime;
@ -3207,8 +3223,12 @@ ACTOR Future<Key> getKey(Reference<TransactionState> trState,
}
Key locationKey(k.getKey(), k.arena());
state KeyRangeLocationInfo locationInfo = wait(getKeyLocation(
trState, locationKey, &StorageServerInterface::getKey, Reverse{ k.isBackward() }, useTenant));
state KeyRangeLocationInfo locationInfo = wait(getKeyLocation(trState,
locationKey,
&StorageServerInterface::getKey,
Reverse{ k.isBackward() },
useTenant,
version.get()));
try {
if (getKeyID.present())
@ -3337,7 +3357,9 @@ ACTOR Future<Version> watchValue(Database cx, Reference<const WatchParameters> p
&StorageServerInterface::watchValue,
parameters->spanID,
parameters->debugID,
parameters->useProvisionalProxies));
parameters->useProvisionalProxies,
Reverse::False,
parameters->version));
try {
state Optional<UID> watchValueID = Optional<UID>();
@ -3620,7 +3642,8 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT,
reverse,
getRangeRequestStream<GetKeyValuesFamilyRequest>(),
useTenant));
useTenant,
version));
ASSERT(locations.size());
state int shard = 0;
loop {
@ -3989,8 +4012,13 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
Key locationKey = reverse ? Key(end.getKey(), end.arena()) : Key(begin.getKey(), begin.arena());
Reverse locationBackward{ reverse ? (end - 1).isBackward() : begin.isBackward() };
state KeyRangeLocationInfo beginServer = wait(getKeyLocation(
trState, locationKey, getRangeRequestStream<GetKeyValuesFamilyRequest>(), locationBackward, useTenant));
state KeyRangeLocationInfo beginServer =
wait(getKeyLocation(trState,
locationKey,
getRangeRequestStream<GetKeyValuesFamilyRequest>(),
locationBackward,
useTenant,
version));
state KeyRange shard = beginServer.range;
state bool modifiedSelectors = false;
state GetKeyValuesFamilyRequest req;
@ -4424,8 +4452,14 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
Reverse reverse,
SpanID spanContext) {
loop {
state std::vector<KeyRangeLocationInfo> locations = wait(getKeyRangeLocations(
trState, keys, CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT, reverse, &StorageServerInterface::getKeyValuesStream));
state std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations(trState,
keys,
CLIENT_KNOBS->GET_RANGE_SHARD_LIMIT,
reverse,
&StorageServerInterface::getKeyValuesStream,
UseTenant::True,
version));
ASSERT(locations.size());
state int shard = 0;
loop {
@ -4685,7 +4719,8 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Reference<TransactionState> trState,
KeyRange keys,
int64_t chunkSize);
int64_t chunkSize,
Version version);
static KeyRange intersect(KeyRangeRef lhs, KeyRangeRef rhs) {
return KeyRange(KeyRangeRef(std::max(lhs.begin, rhs.begin), std::min(lhs.end, rhs.end)));
@ -4734,11 +4769,11 @@ ACTOR Future<Void> getRangeStream(Reference<TransactionState> trState,
state std::vector<Future<Void>> outstandingRequests;
while (b < e) {
state KeyRangeLocationInfo locationInfo =
wait(getKeyLocation(trState, reverse ? e : b, &StorageServerInterface::getKeyValuesStream, reverse));
state KeyRangeLocationInfo locationInfo = wait(getKeyLocation(
trState, reverse ? e : b, &StorageServerInterface::getKeyValuesStream, reverse, UseTenant::True, version));
state KeyRange shardIntersection = intersect(locationInfo.range, KeyRangeRef(b, e));
state Standalone<VectorRef<KeyRef>> splitPoints =
wait(getRangeSplitPoints(trState, shardIntersection, CLIENT_KNOBS->RANGESTREAM_FRAGMENT_SIZE));
wait(getRangeSplitPoints(trState, shardIntersection, CLIENT_KNOBS->RANGESTREAM_FRAGMENT_SIZE, version));
state std::vector<KeyRange> toSend;
// state std::vector<Future<std::list<KeyRangeRef>::iterator>> outstandingRequests;
@ -4933,18 +4968,20 @@ void Watch::setWatch(Future<Void> watchFuture) {
onSetWatchTrigger.send(Void());
}
ACTOR Future<TenantInfo> getTenantMetadata(Reference<TransactionState> trState, Key key) {
KeyRangeLocationInfo locationInfo = wait(getKeyLocation(trState, key, &StorageServerInterface::getValue));
ACTOR Future<TenantInfo> getTenantMetadata(Reference<TransactionState> trState, Key key, Future<Version> fVersion) {
state Version version = wait(fVersion);
KeyRangeLocationInfo locationInfo =
wait(getKeyLocation(trState, key, &StorageServerInterface::getValue, Reverse::False, UseTenant::True, version));
return trState->getTenantInfo();
}
Future<TenantInfo> populateAndGetTenant(Reference<TransactionState> trState, Key const& key) {
Future<TenantInfo> populateAndGetTenant(Reference<TransactionState> trState, Key const& key, Future<Version> version) {
if (!trState->tenant.present()) {
return TenantInfo();
} else if (trState->tenantId != TenantInfo::INVALID_TENANT) {
return trState->getTenantInfo();
} else {
return getTenantMetadata(trState, key);
return getTenantMetadata(trState, key, version);
}
}
@ -5008,14 +5045,15 @@ Future<Void> Transaction::watch(Reference<Watch> watch) {
trState->cx->addWatch();
watches.push_back(watch);
return ::watch(watch,
trState->cx,
populateAndGetTenant(trState, watch->key),
trState->options.readTags,
trState->spanID,
trState->taskID,
trState->debugID,
trState->useProvisionalProxies);
return ::watch(
watch,
trState->cx,
populateAndGetTenant(trState, watch->key, readVersion.isValid() ? readVersion : Future<Version>(latestVersion)),
trState->options.readTags,
trState->spanID,
trState->taskID,
trState->debugID,
trState->useProvisionalProxies);
}
ACTOR Future<Standalone<VectorRef<const char*>>> getAddressesForKeyActor(Reference<TransactionState> trState,
@ -5025,7 +5063,9 @@ ACTOR Future<Standalone<VectorRef<const char*>>> getAddressesForKeyActor(Referen
state Key resolvedKey = key;
if (trState->tenant.present()) {
KeyRangeLocationInfo locationInfo = wait(getKeyLocation(trState, ""_sr, &StorageServerInterface::getValue));
state Version version = wait(ver);
KeyRangeLocationInfo locationInfo = wait(getKeyLocation(
trState, ""_sr, &StorageServerInterface::getValue, Reverse::False, UseTenant::True, version));
resolvedKey = key.withPrefix(locationInfo.tenantEntry.prefix);
}
@ -5712,8 +5752,14 @@ ACTOR Future<Optional<ClientTrCommitCostEstimation>> estimateCommitCosts(Referen
++trCommitCosts.expensiveCostEstCount;
++trState->cx->transactionsExpensiveClearCostEstCount;
} else {
std::vector<KeyRangeLocationInfo> locations = wait(getKeyRangeLocations(
trState, keyRange, CLIENT_KNOBS->TOO_MANY, Reverse::False, &StorageServerInterface::getShardState));
std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations(trState,
keyRange,
CLIENT_KNOBS->TOO_MANY,
Reverse::False,
&StorageServerInterface::getShardState,
UseTenant::True,
latestVersion));
if (locations.empty()) {
continue;
}
@ -5809,7 +5855,12 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
state Key tenantPrefix;
if (trState->tenant.present()) {
KeyRangeLocationInfo locationInfo = wait(getKeyLocation(trState, ""_sr, &StorageServerInterface::getValue));
KeyRangeLocationInfo locationInfo = wait(getKeyLocation(trState,
""_sr,
&StorageServerInterface::getValue,
Reverse::False,
UseTenant::True,
req.transaction.read_snapshot));
applyTenantPrefix(req, locationInfo.tenantEntry.prefix);
tenantPrefix = locationInfo.tenantEntry.prefix;
}
@ -6848,7 +6899,8 @@ ACTOR Future<StorageMetrics> getStorageMetricsLargeKeyRange(Database cx, KeyRang
&StorageServerInterface::waitMetrics,
span.context,
Optional<UID>(),
UseProvisionalProxies::False));
UseProvisionalProxies::False,
latestVersion));
state int nLocs = locations.size();
state std::vector<Future<StorageMetrics>> fx(nLocs);
state StorageMetrics total;
@ -6949,7 +7001,8 @@ ACTOR Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> getReadHotRanges(Da
&StorageServerInterface::getReadHotRanges,
span.context,
Optional<UID>(),
UseProvisionalProxies::False));
UseProvisionalProxies::False,
latestVersion));
try {
// TODO: how to handle this?
// This function is called whenever a shard becomes read-hot. But somehow the shard was splitted across more
@ -7019,7 +7072,8 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(Databa
&StorageServerInterface::waitMetrics,
span.context,
Optional<UID>(),
UseProvisionalProxies::False));
UseProvisionalProxies::False,
latestVersion));
if (expectedShardCount >= 0 && locations.size() != expectedShardCount) {
return std::make_pair(Optional<StorageMetrics>(), locations.size());
}
@ -7117,12 +7171,19 @@ Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> DatabaseContext::getReadH
ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Reference<TransactionState> trState,
KeyRange keys,
int64_t chunkSize) {
int64_t chunkSize,
Version version) {
state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanID);
loop {
state std::vector<KeyRangeLocationInfo> locations = wait(getKeyRangeLocations(
trState, keys, CLIENT_KNOBS->TOO_MANY, Reverse::False, &StorageServerInterface::getRangeSplitPoints));
state std::vector<KeyRangeLocationInfo> locations =
wait(getKeyRangeLocations(trState,
keys,
CLIENT_KNOBS->TOO_MANY,
Reverse::False,
&StorageServerInterface::getRangeSplitPoints,
UseTenant::True,
version));
try {
state int nLocs = locations.size();
state std::vector<Future<SplitRangeReply>> fReplies(nLocs);
@ -7174,7 +7235,8 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getRangeSplitPoints(Reference<Transa
}
Future<Standalone<VectorRef<KeyRef>>> Transaction::getRangeSplitPoints(KeyRange const& keys, int64_t chunkSize) {
return ::getRangeSplitPoints(trState, keys, chunkSize);
return ::getRangeSplitPoints(
trState, keys, chunkSize, readVersion.isValid() && readVersion.isReady() ? readVersion.get() : latestVersion);
}
#define BG_REQUEST_DEBUG false
@ -7441,7 +7503,8 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> splitStorageMetrics(Database cx,
&StorageServerInterface::splitMetrics,
span.context,
Optional<UID>(),
UseProvisionalProxies::False));
UseProvisionalProxies::False,
latestVersion));
state StorageMetrics used;
state Standalone<VectorRef<KeyRef>> results;
@ -8026,7 +8089,8 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
&StorageServerInterface::changeFeedStream,
span.context,
Optional<UID>(),
UseProvisionalProxies::False));
UseProvisionalProxies::False,
latestVersion));
if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) {
ASSERT_WE_THINK(false);
@ -8210,7 +8274,8 @@ ACTOR Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeedsA
&StorageServerInterface::overlappingChangeFeeds,
span.context,
Optional<UID>(),
UseProvisionalProxies::False));
UseProvisionalProxies::False,
latestVersion));
if (locations.size() >= CLIENT_KNOBS->CHANGE_FEED_LOCATION_LIMIT) {
TraceEvent(SevError, "OverlappingRangeTooLarge")
@ -8292,7 +8357,8 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Ke
&StorageServerInterface::changeFeedPop,
span.context,
Optional<UID>(),
UseProvisionalProxies::False));
UseProvisionalProxies::False,
latestVersion));
if (locations.size() > 2) {
wait(popChangeFeedBackup(cx, rangeID, version));