Track shard moves for version vector (#11977)

* Track shard moves for version vector

* Don't broadcast to all TL when a different CP had a metadata mutation, unless on shard moves

* update lastShardMove on resolver

* Respond to review comments

---------

Co-authored-by: Dan Lambright <hlambright@apple.com>
This commit is contained in:
Dan Lambright 2025-03-11 13:19:57 -04:00 committed by GitHub
parent 6ee6e0bd7f
commit 8c6f8c1403
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 34 additions and 11 deletions

View File

@ -124,6 +124,7 @@ struct ResolutionRequestBuilder {
Version version,
Version prevVersion,
Version lastReceivedVersion,
Version lastShardMove,
Span& parentSpan)
: self(self), requests(self->resolvers.size()) {
for (auto& req : requests) {
@ -131,6 +132,7 @@ struct ResolutionRequestBuilder {
req.prevVersion = prevVersion;
req.version = version;
req.lastReceivedVersion = lastReceivedVersion;
req.lastShardMove = lastShardMove;
}
}
@ -728,6 +730,8 @@ struct CommitBatchContext {
bool rangeLockEnabled();
Version lastShardMove;
private:
void evaluateBatchSize();
};
@ -809,6 +813,7 @@ inline bool shouldBackup(MutationRef const& m) {
std::set<Tag> CommitBatchContext::getWrittenTagsPreResolution() {
std::set<Tag> transactionTags;
std::vector<Tag> cacheVector = { cacheTag };
lastShardMove = pProxyCommitData->lastShardMove;
if (pProxyCommitData->txnStateStore->getReplaceContent()) {
// return empty set if txnStateStore will snapshot.
// empty sets are sent to all logs.
@ -1017,7 +1022,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
self->commitVersion = versionReply.version;
self->prevVersion = versionReply.prevVersion;
//TraceEvent("CPGetVersion", pProxyCommitData->dbgid).detail("Master", pProxyCommitData->master.id().toString()).detail("CommitVersion", self->commitVersion).detail("PrvVersion", self->prevVersion);
// TraceEvent("CPGetVersion", pProxyCommitData->dbgid).detail("Master", pProxyCommitData->master.id().toString()).detail("CommitVersion", self->commitVersion).detail("PrvVersion", self->prevVersion);
for (auto it : versionReply.resolverChanges) {
auto rs = pProxyCommitData->keyResolvers.modify(it.range);
@ -1093,8 +1098,12 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::vector<CommitTransactionRequest>& trs = self->trs;
state Span span("MP:getResolution"_loc, self->span.context);
ResolutionRequestBuilder requests(
pProxyCommitData, self->commitVersion, self->prevVersion, pProxyCommitData->version.get(), span);
ResolutionRequestBuilder requests(pProxyCommitData,
self->commitVersion,
self->prevVersion,
pProxyCommitData->version.get(),
self->lastShardMove,
span);
int conflictRangeCount = 0;
self->maxTransactionBytes = 0;
for (int t = 0; t < trs.size(); t++) {
@ -1775,6 +1784,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
// TraceEvent("ResolverReturn").detail("ReturnTags",reply.writtenTags).detail("TPCVsize",reply.tpcvMap.size()).detail("ReqTags",self->writtenTagsPreResolution);
self->tpcvMap = reply.tpcvMap;
self->pProxyCommitData->lastShardMove = reply.lastShardMove;
// extract push locations from tpcv
std::vector<int> fromLocations;

View File

@ -81,19 +81,18 @@ public:
Version firstUnseenVersion,
Version commitVersion,
bool initialShardChanged) {
bool shardChangedOrStateTxn = initialShardChanged;
bool shardChanged = initialShardChanged;
auto stateTransactionItr = recentStateTransactions.lower_bound(firstUnseenVersion);
auto endItr = recentStateTransactions.lower_bound(commitVersion);
// Resolver only sends back prior state txns back, because the proxy
// sends this request has them and will apply them via applyMetadataToCommittedTransactions();
// and other proxies will get this version's state txns as a prior version.
for (; stateTransactionItr != endItr; ++stateTransactionItr) {
shardChangedOrStateTxn =
shardChangedOrStateTxn || stateTransactionItr->value.first || stateTransactionItr->value.second.size();
shardChanged = shardChanged || stateTransactionItr->value.first;
reply->stateMutations.push_back(reply->arena, stateTransactionItr->value.second);
reply->arena.dependsOn(stateTransactionItr->value.second.arena());
}
return shardChangedOrStateTxn;
return shardChanged;
}
bool empty() const { return recentStateTransactionSizes.empty(); }
@ -189,6 +188,8 @@ struct Resolver : ReferenceCounted<Resolver> {
EncryptionAtRestMode encryptMode;
Version lastShardMove;
Resolver(UID dbgid, int commitProxyCount, int resolverCount, EncryptionAtRestMode encryptMode)
: dbgid(dbgid), commitProxyCount(commitProxyCount), resolverCount(resolverCount), encryptMode(encryptMode),
version(-1), conflictSet(newConflictSet()), iopsSample(SERVER_KNOBS->KEY_BYTES_PER_SAMPLE),
@ -206,7 +207,8 @@ struct Resolver : ReferenceCounted<Resolver> {
queueWaitLatencyDist(Histogram::getHistogram("Resolver"_sr, "QueueWait"_sr, Histogram::Unit::milliseconds)),
computeTimeDist(Histogram::getHistogram("Resolver"_sr, "ComputeTime"_sr, Histogram::Unit::milliseconds)),
// Distribution of queue depths, with knowledge that Histogram has 32 buckets, and each bucket will have size 1.
queueDepthDist(Histogram::getHistogram("Resolver"_sr, "QueueDepth"_sr, Histogram::Unit::countLinear, 0, 31)) {
queueDepthDist(Histogram::getHistogram("Resolver"_sr, "QueueDepth"_sr, Histogram::Unit::countLinear, 0, 31)),
lastShardMove(invalidVersion) {
specialCounter(cc, "Version", [this]() { return this->version.get(); });
specialCounter(cc, "NeededVersion", [this]() { return this->neededVersion.get(); });
specialCounter(cc, "TotalStateBytes", [this]() { return this->totalStateBytes.get(); });
@ -431,7 +433,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
// If shardChanged at or before this commit version, the proxy may have computed
// the wrong set of groups. Then we need to broadcast to all groups below.
stateTransactionsPair.first = toCommit && toCommit->isShardChanged();
bool shardChangedOrStateTxn = self->recentStateTransactionsInfo.applyStateTxnsToBatchReply(
bool shardChanged = self->recentStateTransactionsInfo.applyStateTxnsToBatchReply(
&reply, firstUnseenVersion, req.version, toCommit && toCommit->isShardChanged());
// Adds private mutation messages to the reply message.
@ -483,10 +485,14 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
reply.tpcvMap.clear();
} else {
std::set<uint16_t> writtenTLogs;
if (shardChangedOrStateTxn || req.txnStateTransactions.size() || !req.writtenTags.size()) {
if (req.lastShardMove < self->lastShardMove || shardChanged || req.txnStateTransactions.size() ||
!req.writtenTags.size()) {
for (int i = 0; i < self->numLogs; i++) {
writtenTLogs.insert(i);
}
if (shardChanged) {
self->lastShardMove = req.version;
}
} else {
toCommit->getLocations(reply.writtenTags, writtenTLogs);
}
@ -498,6 +504,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self,
self->tpcvVector[tLog] = req.version;
}
}
reply.lastShardMove = self->lastShardMove;
}
self->version.set(req.version);
bool breachedLimit = self->totalStateBytes.get() <= SERVER_KNOBS->RESOLVER_STATE_MEMORY_LIMIT &&

View File

@ -277,6 +277,8 @@ struct ProxyCommitData {
std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder = nullptr;
LogEpoch epoch;
Version lastShardMove;
std::shared_ptr<RangeLock> rangeLock = nullptr;
// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
@ -376,7 +378,7 @@ struct ProxyCommitData {
? std::make_shared<AccumulativeChecksumBuilder>(
getCommitProxyAccumulativeChecksumIndex(commitProxyIndex))
: nullptr),
epoch(epoch) {
lastShardMove(invalidVersion), epoch(epoch) {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
}
};

View File

@ -106,6 +106,7 @@ struct ResolveTransactionBatchReply {
std::unordered_map<uint16_t, Version> tpcvMap;
std::set<Tag> writtenTags;
Version lastShardMove;
template <class Archive>
void serialize(Archive& ar) {
@ -118,6 +119,7 @@ struct ResolveTransactionBatchReply {
privateMutationCount,
tpcvMap,
writtenTags,
lastShardMove,
arena);
}
};
@ -135,6 +137,7 @@ struct ResolveTransactionBatchRequest : TimedRequest {
txnStateTransactions; // Offsets of elements of transactions that have (transaction subsystem state) mutations
ReplyPromise<ResolveTransactionBatchReply> reply;
Optional<UID> debugID;
Version lastShardMove;
std::set<Tag> writtenTags;
@ -150,6 +153,7 @@ struct ResolveTransactionBatchRequest : TimedRequest {
debugID,
writtenTags,
spanContext,
lastShardMove,
arena);
}
};