Fix arena issue and change std::vector back to VectorRef

This commit is contained in:
chaoguang 2020-01-17 14:24:13 -08:00
parent d17d1c88cd
commit 670c3e629f
6 changed files with 32 additions and 28 deletions

View File

@ -104,7 +104,7 @@ struct CommitID {
Version version; // returns invalidVersion if transaction conflicts Version version; // returns invalidVersion if transaction conflicts
uint16_t txnBatchId; uint16_t txnBatchId;
Optional<Value> metadataVersion; Optional<Value> metadataVersion;
Optional<std::vector<int>> conflictingKRIndices; Optional<Standalone<VectorRef<int>>> conflictingKRIndices;
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
@ -112,7 +112,7 @@ struct CommitID {
} }
CommitID() : version(invalidVersion), txnBatchId(0) {} CommitID() : version(invalidVersion), txnBatchId(0) {}
CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion, const Optional<std::vector<int>>& conflictingKRIndices = Optional<std::vector<int>>() ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion), conflictingKRIndices(conflictingKRIndices) {} CommitID( Version version, uint16_t txnBatchId, const Optional<Value>& metadataVersion, const Optional<Standalone<VectorRef<int>>>& conflictingKRIndices = Optional<Standalone<VectorRef<int>>>() ) : version(version), txnBatchId(txnBatchId), metadataVersion(metadataVersion), conflictingKRIndices(conflictingKRIndices) {}
}; };
struct CommitTransactionRequest : TimedRequest { struct CommitTransactionRequest : TimedRequest {

View File

@ -33,7 +33,7 @@ void clearConflictSet( ConflictSet*, Version );
void destroyConflictSet(ConflictSet*); void destroyConflictSet(ConflictSet*);
struct ConflictBatch { struct ConflictBatch {
explicit ConflictBatch( ConflictSet*, std::map< int, std::vector< int > >* conflictingKeyRangeMap = nullptr); explicit ConflictBatch( ConflictSet*, std::map< int, VectorRef< int > >* conflictingKeyRangeMap = nullptr, Arena* resolveBatchReplyArena = nullptr);
~ConflictBatch(); ~ConflictBatch();
enum TransactionCommitResult { enum TransactionCommitResult {
@ -54,7 +54,8 @@ private:
std::vector< std::pair<StringRef,StringRef> > combinedWriteConflictRanges; std::vector< std::pair<StringRef,StringRef> > combinedWriteConflictRanges;
std::vector< struct ReadConflictRange > combinedReadConflictRanges; std::vector< struct ReadConflictRange > combinedReadConflictRanges;
bool* transactionConflictStatus; bool* transactionConflictStatus;
std::map< int, std::vector< int > >* conflictingKeyRangeMap; std::map< int, VectorRef< int > >* conflictingKeyRangeMap;
Arena* resolveBatchReplyArena;
void checkIntraBatchConflicts(); void checkIntraBatchConflicts();
void combineWriteConflictRanges(); void combineWriteConflictRanges();

View File

@ -1034,15 +1034,15 @@ ACTOR Future<Void> commitBatch(
else { else {
// If enable the option to report conflicting keys from resolvers, we union all conflicting key ranges here and send back through CommitID // If enable the option to report conflicting keys from resolvers, we union all conflicting key ranges here and send back through CommitID
if (trs[t].transaction.report_conflicting_keys) { if (trs[t].transaction.report_conflicting_keys) {
std::vector<int> conflictingKRIndices; Standalone<VectorRef<int>> conflictingKRIndices;
for (int resolverInd : transactionResolverMap[t]) { for (int resolverInd : transactionResolverMap[t]) {
const std::vector<int>& cKRs = resolution[resolverInd].conflictingKeyRangeMap.get()[nextTr[resolverInd]]; auto const & cKRs = resolution[resolverInd].conflictingKeyRangeMap[nextTr[resolverInd]];
for (auto const & rCRIndex : cKRs) for (auto const & rCRIndex : cKRs)
conflictingKRIndices.emplace_back(rCRIndex); conflictingKRIndices.push_back(conflictingKRIndices.arena(), rCRIndex);
} }
// At least one keyRange index should be returned // At least one keyRange index should be returned
ASSERT(conflictingKRIndices.size()); ASSERT(conflictingKRIndices.size());
trs[t].reply.send(CommitID(invalidVersion, t, Optional<Value>(), Optional<std::vector<int>>(conflictingKRIndices))); trs[t].reply.send(CommitID(invalidVersion, t, Optional<Value>(), Optional<Standalone<VectorRef<int>>>(conflictingKRIndices)));
} else { } else {
trs[t].reply.sendError(not_committed()); trs[t].reply.sendError(not_committed());
} }

View File

@ -130,14 +130,15 @@ ACTOR Future<Void> resolveBatch(
if(req.debugID.present()) if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer"); g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer");
ResolveTransactionBatchReply &reply = proxyInfo.outstandingBatches[req.version];
vector<int> commitList; vector<int> commitList;
vector<int> tooOldList; vector<int> tooOldList;
std::map< int, std::vector< int > > conflictingKeyRangeMap;
// Detect conflicts // Detect conflicts
double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME; double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
double tstart = timer(); double tstart = timer();
ConflictBatch conflictBatch(self->conflictSet, &conflictingKeyRangeMap); ConflictBatch conflictBatch(self->conflictSet, &reply.conflictingKeyRangeMap, &reply.arena);
int keys = 0; int keys = 0;
for(int t=0; t<req.transactions.size(); t++) { for(int t=0; t<req.transactions.size(); t++) {
conflictBatch.addTransaction( req.transactions[t] ); conflictBatch.addTransaction( req.transactions[t] );
@ -156,7 +157,6 @@ ACTOR Future<Void> resolveBatch(
g_counters.conflictTransactions += req.transactions.size(); g_counters.conflictTransactions += req.transactions.size();
g_counters.conflictKeys += keys; g_counters.conflictKeys += keys;
ResolveTransactionBatchReply &reply = proxyInfo.outstandingBatches[req.version];
reply.debugID = req.debugID; reply.debugID = req.debugID;
reply.committed.resize( reply.arena, req.transactions.size() ); reply.committed.resize( reply.arena, req.transactions.size() );
for(int c=0; c<commitList.size(); c++) for(int c=0; c<commitList.size(); c++)
@ -164,9 +164,6 @@ ACTOR Future<Void> resolveBatch(
for (int c = 0; c<tooOldList.size(); c++) for (int c = 0; c<tooOldList.size(); c++)
reply.committed[tooOldList[c]] = ConflictBatch::TransactionTooOld; reply.committed[tooOldList[c]] = ConflictBatch::TransactionTooOld;
if (!conflictingKeyRangeMap.empty())
reply.conflictingKeyRangeMap = std::move(conflictingKeyRangeMap);
ASSERT(req.prevVersion >= 0 || req.txnStateTransactions.size() == 0); // The master's request should not have any state transactions ASSERT(req.prevVersion >= 0 || req.txnStateTransactions.size() == 0); // The master's request should not have any state transactions

View File

@ -77,11 +77,11 @@ struct ResolveTransactionBatchReply {
VectorRef<uint8_t> committed; VectorRef<uint8_t> committed;
Optional<UID> debugID; Optional<UID> debugID;
VectorRef<VectorRef<StateTransactionRef>> stateMutations; // [version][transaction#] -> (committed, [mutation#]) VectorRef<VectorRef<StateTransactionRef>> stateMutations; // [version][transaction#] -> (committed, [mutation#])
Optional<std::map<int, std::vector<int>>> conflictingKeyRangeMap; // transaction index -> conflicting read_conflict_range ids given by the resolver std::map<int, VectorRef<int>> conflictingKeyRangeMap; // transaction index -> conflicting read_conflict_range ids given by the resolver
template <class Archive> template <class Archive>
void serialize(Archive& ar) { void serialize(Archive& ar) {
serializer(ar, committed, stateMutations, arena, debugID, conflictingKeyRangeMap); serializer(ar, committed, stateMutations, debugID, conflictingKeyRangeMap, arena);
} }
}; };

View File

@ -123,10 +123,12 @@ struct ReadConflictRange {
StringRef begin, end; StringRef begin, end;
Version version; Version version;
int transaction; int transaction;
std::vector<int>* conflictingKeyRange;
int indexInTx; int indexInTx;
ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, int indexInTx, std::vector<int> * cKR = nullptr ) VectorRef<int>* conflictingKeyRange;
: begin(begin), end(end), version(version), transaction(transaction), indexInTx(indexInTx), conflictingKeyRange(cKR) Arena* cKRArena;
ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, int indexInTx, VectorRef<int> * cKR = nullptr, Arena* cKRArena = nullptr)
: begin(begin), end(end), version(version), transaction(transaction), indexInTx(indexInTx), conflictingKeyRange(cKR), cKRArena(cKRArena)
{ {
} }
bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin)<0; } bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin)<0; }
@ -531,7 +533,7 @@ public:
int started = min(M,count); int started = min(M,count);
for(int i=0; i<started; i++){ for(int i=0; i<started; i++){
inProgress[i].init( ranges[i], header, transactionConflictStatus, ranges[i].indexInTx, ranges[i].conflictingKeyRange ); inProgress[i].init( ranges[i], header, transactionConflictStatus, ranges[i].indexInTx, ranges[i].conflictingKeyRange, ranges[i].cKRArena );
nextJob[i] = i+1; nextJob[i] = i+1;
} }
nextJob[started-1] = 0; nextJob[started-1] = 0;
@ -548,7 +550,7 @@ public:
} }
else { else {
int temp = started++; int temp = started++;
inProgress[job].init( ranges[temp], header, transactionConflictStatus, ranges[temp].indexInTx, ranges[temp].conflictingKeyRange ); inProgress[job].init( ranges[temp], header, transactionConflictStatus, ranges[temp].indexInTx, ranges[temp].conflictingKeyRange, ranges[temp].cKRArena );
} }
} }
prevJob = job; prevJob = job;
@ -762,13 +764,15 @@ private:
bool *result; bool *result;
int state; int state;
int indexInTx; int indexInTx;
std::vector<int>* conflictingKeyRange; // null if report_conflicting_keys is not enabled. VectorRef<int>* conflictingKeyRange; // null if report_conflicting_keys is not enabled.
Arena* cKRArena; // null if report_conflicting_keys is not enabled.
void init( const ReadConflictRange& r, Node* header, bool* tCS, int indexInTx, std::vector<int>* cKR) { void init( const ReadConflictRange& r, Node* header, bool* tCS, int indexInTx, VectorRef<int>* cKR, Arena* cKRArena) {
this->start.init( r.begin, header ); this->start.init( r.begin, header );
this->end.init( r.end, header ); this->end.init( r.end, header );
this->version = r.version; this->version = r.version;
this->indexInTx = indexInTx; this->indexInTx = indexInTx;
this->cKRArena = cKRArena;
result = &tCS[ r.transaction ]; result = &tCS[ r.transaction ];
conflictingKeyRange = cKR; conflictingKeyRange = cKR;
this->state = 0; this->state = 0;
@ -778,7 +782,7 @@ private:
bool conflict() { bool conflict() {
*result = true; *result = true;
if(conflictingKeyRange != nullptr) if(conflictingKeyRange != nullptr)
conflictingKeyRange->push_back(indexInTx); conflictingKeyRange->push_back(*cKRArena, indexInTx);
return true; return true;
} }
@ -974,8 +978,8 @@ void destroyConflictSet(ConflictSet* cs) {
delete cs; delete cs;
} }
ConflictBatch::ConflictBatch( ConflictSet* cs, std::map< int, std::vector< int > >* conflictingKeyRangeMap ) ConflictBatch::ConflictBatch( ConflictSet* cs, std::map< int, VectorRef< int > >* conflictingKeyRangeMap, Arena* resolveBatchReplyArena )
: cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap) : cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap),resolveBatchReplyArena(resolveBatchReplyArena)
{ {
} }
@ -1010,7 +1014,9 @@ void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) {
points.emplace_back(range.begin, false, true, false, t, &info->readRanges[r].first); points.emplace_back(range.begin, false, true, false, t, &info->readRanges[r].first);
//points.back().keyEnd = StringRef(buf,range.second); //points.back().keyEnd = StringRef(buf,range.second);
points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second); points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second);
combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, r, tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t] : nullptr); combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, r,
tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t] : nullptr,
tr.report_conflicting_keys ? resolveBatchReplyArena : nullptr);
} }
for(int r=0; r<tr.write_conflict_ranges.size(); r++) { for(int r=0; r<tr.write_conflict_ranges.size(); r++) {
const KeyRangeRef& range = tr.write_conflict_ranges[r]; const KeyRangeRef& range = tr.write_conflict_ranges[r];
@ -1158,7 +1164,7 @@ void ConflictBatch::checkIntraBatchConflicts() {
for(int i=0; i<tr.readRanges.size(); i++){ for(int i=0; i<tr.readRanges.size(); i++){
if ( mcs.any( tr.readRanges[i].first , tr.readRanges[i].second ) ) { if ( mcs.any( tr.readRanges[i].first , tr.readRanges[i].second ) ) {
if (tr.reportConflictingKeys){ if (tr.reportConflictingKeys){
(*conflictingKeyRangeMap)[t].push_back(i); (*conflictingKeyRangeMap)[t].push_back(*resolveBatchReplyArena, i);
} }
conflict = true; conflict = true;
break; break;