diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index 784331384b..89eb9d5eca 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -104,7 +104,7 @@ struct CommitID { Version version; // returns invalidVersion if transaction conflicts uint16_t txnBatchId; Optional metadataVersion; - Optional> conflictingKRIndices; + Optional>> conflictingKRIndices; template void serialize(Ar& ar) { @@ -112,7 +112,7 @@ struct CommitID { } CommitID() : version(invalidVersion), txnBatchId(0) {} - CommitID( Version version, uint16_t txnBatchId, const Optional& metadataVersion, const Optional>& conflictingKRIndices = Optional>() ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion), conflictingKRIndices(conflictingKRIndices) {} + CommitID( Version version, uint16_t txnBatchId, const Optional& metadataVersion, const Optional>>& conflictingKRIndices = Optional>>() ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion), conflictingKRIndices(conflictingKRIndices) {} }; struct CommitTransactionRequest : TimedRequest { diff --git a/fdbserver/ConflictSet.h b/fdbserver/ConflictSet.h index ffe80e0e6d..3284f9f4ab 100644 --- a/fdbserver/ConflictSet.h +++ b/fdbserver/ConflictSet.h @@ -33,7 +33,7 @@ void clearConflictSet( ConflictSet*, Version ); void destroyConflictSet(ConflictSet*); struct ConflictBatch { - explicit ConflictBatch( ConflictSet*, std::map< int, std::vector< int > >* conflictingKeyRangeMap = nullptr); + explicit ConflictBatch( ConflictSet*, std::map< int, VectorRef< int > >* conflictingKeyRangeMap = nullptr, Arena* resolveBatchReplyArena = nullptr); ~ConflictBatch(); enum TransactionCommitResult { @@ -54,7 +54,8 @@ private: std::vector< std::pair > combinedWriteConflictRanges; std::vector< struct ReadConflictRange > combinedReadConflictRanges; bool* transactionConflictStatus; - std::map< int, std::vector< int > >* conflictingKeyRangeMap; + std::map< int, VectorRef< int > >* conflictingKeyRangeMap; + Arena* resolveBatchReplyArena; void checkIntraBatchConflicts(); void combineWriteConflictRanges(); diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 7b4aaab6bc..01be822575 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -1034,15 +1034,15 @@ ACTOR Future commitBatch( else { // If enable the option to report conflicting keys from resolvers, we union all conflicting key ranges here and send back through CommitID if (trs[t].transaction.report_conflicting_keys) { - std::vector conflictingKRIndices; + Standalone> conflictingKRIndices; for (int resolverInd : transactionResolverMap[t]) { - const std::vector& cKRs = resolution[resolverInd].conflictingKeyRangeMap.get()[nextTr[resolverInd]]; + auto const & cKRs = resolution[resolverInd].conflictingKeyRangeMap[nextTr[resolverInd]]; for (auto const & rCRIndex : cKRs) - conflictingKRIndices.emplace_back(rCRIndex); + conflictingKRIndices.push_back(conflictingKRIndices.arena(), rCRIndex); } // At least one keyRange index should be returned ASSERT(conflictingKRIndices.size()); - trs[t].reply.send(CommitID(invalidVersion, t, Optional(), Optional>(conflictingKRIndices))); + trs[t].reply.send(CommitID(invalidVersion, t, Optional(), Optional>>(conflictingKRIndices))); } else { trs[t].reply.sendError(not_committed()); } diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index 2f0933d5c8..57700e4105 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -130,14 +130,15 @@ ACTOR Future resolveBatch( if(req.debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer"); + ResolveTransactionBatchReply &reply = proxyInfo.outstandingBatches[req.version]; + vector commitList; vector tooOldList; - std::map< int, std::vector< int > > conflictingKeyRangeMap; // Detect conflicts double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME; double tstart = timer(); - ConflictBatch conflictBatch(self->conflictSet, &conflictingKeyRangeMap); + ConflictBatch conflictBatch(self->conflictSet, &reply.conflictingKeyRangeMap, &reply.arena); int keys = 0; for(int t=0; t resolveBatch( g_counters.conflictTransactions += req.transactions.size(); g_counters.conflictKeys += keys; - ResolveTransactionBatchReply &reply = proxyInfo.outstandingBatches[req.version]; reply.debugID = req.debugID; reply.committed.resize( reply.arena, req.transactions.size() ); for(int c=0; c resolveBatch( for (int c = 0; c= 0 || req.txnStateTransactions.size() == 0); // The master's request should not have any state transactions diff --git a/fdbserver/ResolverInterface.h b/fdbserver/ResolverInterface.h index d53779a5c7..e1f74707b9 100644 --- a/fdbserver/ResolverInterface.h +++ b/fdbserver/ResolverInterface.h @@ -77,11 +77,11 @@ struct ResolveTransactionBatchReply { VectorRef committed; Optional debugID; VectorRef> stateMutations; // [version][transaction#] -> (committed, [mutation#]) - Optional>> conflictingKeyRangeMap; // transaction index -> conflicting read_conflict_range ids given by the resolver + std::map> conflictingKeyRangeMap; // transaction index -> conflicting read_conflict_range ids given by the resolver template void serialize(Archive& ar) { - serializer(ar, committed, stateMutations, arena, debugID, conflictingKeyRangeMap); + serializer(ar, committed, stateMutations, debugID, conflictingKeyRangeMap, arena); } }; diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp index 038214cd60..3f1b6af859 100644 --- a/fdbserver/SkipList.cpp +++ b/fdbserver/SkipList.cpp @@ -123,10 +123,12 @@ struct ReadConflictRange { StringRef begin, end; Version version; int transaction; - std::vector* conflictingKeyRange; int indexInTx; - ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, int indexInTx, std::vector * cKR = nullptr ) - : begin(begin), end(end), version(version), transaction(transaction), indexInTx(indexInTx), conflictingKeyRange(cKR) + VectorRef* conflictingKeyRange; + Arena* cKRArena; + + ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, int indexInTx, VectorRef * cKR = nullptr, Arena* cKRArena = nullptr) + : begin(begin), end(end), version(version), transaction(transaction), indexInTx(indexInTx), conflictingKeyRange(cKR), cKRArena(cKRArena) { } bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin)<0; } @@ -531,7 +533,7 @@ public: int started = min(M,count); for(int i=0; i* conflictingKeyRange; // null if report_conflicting_keys is not enabled. + VectorRef* conflictingKeyRange; // null if report_conflicting_keys is not enabled. + Arena* cKRArena; // null if report_conflicting_keys is not enabled. - void init( const ReadConflictRange& r, Node* header, bool* tCS, int indexInTx, std::vector* cKR) { + void init( const ReadConflictRange& r, Node* header, bool* tCS, int indexInTx, VectorRef* cKR, Arena* cKRArena) { this->start.init( r.begin, header ); this->end.init( r.end, header ); this->version = r.version; this->indexInTx = indexInTx; + this->cKRArena = cKRArena; result = &tCS[ r.transaction ]; conflictingKeyRange = cKR; this->state = 0; @@ -778,7 +782,7 @@ private: bool conflict() { *result = true; if(conflictingKeyRange != nullptr) - conflictingKeyRange->push_back(indexInTx); + conflictingKeyRange->push_back(*cKRArena, indexInTx); return true; } @@ -974,8 +978,8 @@ void destroyConflictSet(ConflictSet* cs) { delete cs; } -ConflictBatch::ConflictBatch( ConflictSet* cs, std::map< int, std::vector< int > >* conflictingKeyRangeMap ) - : cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap) +ConflictBatch::ConflictBatch( ConflictSet* cs, std::map< int, VectorRef< int > >* conflictingKeyRangeMap, Arena* resolveBatchReplyArena ) + : cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap),resolveBatchReplyArena(resolveBatchReplyArena) { } @@ -1010,7 +1014,9 @@ void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) { points.emplace_back(range.begin, false, true, false, t, &info->readRanges[r].first); //points.back().keyEnd = StringRef(buf,range.second); points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second); - combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, r, tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t] : nullptr); + combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, r, + tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t] : nullptr, + tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr); } for(int r=0; r