mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 02:18:39 +08:00
A hack way to call API through getRange("\xff\xff/conflicting_keys\<start_key>", "\xff\xff/conflicting_keys\<end_key>").
This commit is contained in:
parent
9dc79488a8
commit
10719200c3
@ -138,9 +138,6 @@ struct CommitTransactionRequest : TimedRequest {
|
|||||||
serializer(ar, transaction, reply, arena, flags, debugID);
|
serializer(ar, transaction, reply, arena, flags, debugID);
|
||||||
}
|
}
|
||||||
|
|
||||||
void reportConflictingKeys(){
|
|
||||||
transaction.report_conflicting_keys = true;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
static inline int getBytes( CommitTransactionRequest const& r ) {
|
static inline int getBytes( CommitTransactionRequest const& r ) {
|
||||||
|
@ -2673,8 +2673,15 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||||||
}
|
}
|
||||||
return Void();
|
return Void();
|
||||||
} else {
|
} else {
|
||||||
|
// clear previous conflicting KeyRanges
|
||||||
|
tr->info.conflictingKeyRanges.reset();
|
||||||
if (ci.conflictingKeyRanges.present()){
|
if (ci.conflictingKeyRanges.present()){
|
||||||
tr->info.conflictingKeyRanges = ci.conflictingKeyRanges.get();
|
Standalone<VectorRef<KeyValueRef>> conflictingKeyRanges;
|
||||||
|
for (auto const & kr : ci.conflictingKeyRanges.get()) {
|
||||||
|
conflictingKeyRanges.push_back_deep(conflictingKeyRanges.arena(), KeyValueRef(kr.begin, conflictingKeysTrue));
|
||||||
|
conflictingKeyRanges.push_back_deep(conflictingKeyRanges.arena(), KeyValueRef(kr.end, conflictingKeysFalse));
|
||||||
|
}
|
||||||
|
tr->info.conflictingKeyRanges = conflictingKeyRanges;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info.debugID.present())
|
if (info.debugID.present())
|
||||||
@ -2789,7 +2796,7 @@ Future<Void> Transaction::commitMutations() {
|
|||||||
tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
|
tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
|
||||||
}
|
}
|
||||||
if(options.reportConflictingKeys) {
|
if(options.reportConflictingKeys) {
|
||||||
tr.reportConflictingKeys();
|
tr.transaction.report_conflicting_keys = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
|
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
|
||||||
|
@ -144,7 +144,7 @@ struct TransactionInfo {
|
|||||||
Optional<UID> debugID;
|
Optional<UID> debugID;
|
||||||
TaskPriority taskID;
|
TaskPriority taskID;
|
||||||
bool useProvisionalProxies;
|
bool useProvisionalProxies;
|
||||||
Optional<Standalone<VectorRef<KeyRangeRef>>> conflictingKeyRanges;
|
Optional<Standalone<VectorRef<KeyValueRef>>> conflictingKeyRanges;
|
||||||
|
|
||||||
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
|
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
|
||||||
};
|
};
|
||||||
@ -273,7 +273,6 @@ public:
|
|||||||
void reset();
|
void reset();
|
||||||
void fullReset();
|
void fullReset();
|
||||||
double getBackoff(int errCode);
|
double getBackoff(int errCode);
|
||||||
|
|
||||||
void debugTransaction(UID dID) { info.debugID = dID; }
|
void debugTransaction(UID dID) { info.debugID = dID; }
|
||||||
|
|
||||||
Future<Void> commitMutations();
|
Future<Void> commitMutations();
|
||||||
|
@ -23,7 +23,6 @@
|
|||||||
#include "fdbclient/DatabaseContext.h"
|
#include "fdbclient/DatabaseContext.h"
|
||||||
#include "fdbclient/StatusClient.h"
|
#include "fdbclient/StatusClient.h"
|
||||||
#include "fdbclient/MonitorLeader.h"
|
#include "fdbclient/MonitorLeader.h"
|
||||||
#include "fdbclient/libb64/encode.h"
|
|
||||||
#include "flow/Util.h"
|
#include "flow/Util.h"
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
@ -1190,6 +1189,43 @@ ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces (Reference<ClusterC
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ACTOR Future<Standalone<RangeResultRef>> getConflictingKeys(
|
||||||
|
VectorRef<KeyValueRef> conflictingKeys,
|
||||||
|
ReadYourWritesTransaction* ryw,
|
||||||
|
KeySelector begin,
|
||||||
|
KeySelector end,
|
||||||
|
GetRangeLimits limits,
|
||||||
|
bool snapshot,
|
||||||
|
bool reverse
|
||||||
|
) {
|
||||||
|
// In general, if we want to use getRange to expose conflicting keys, we need to support all the parameters it provides.
|
||||||
|
// It is kind of difficult to take care of each corner cases of what getRange does
|
||||||
|
// Thus, we use a hack way here to achieve it.
|
||||||
|
// We create an empty RYWTransaction and write all conflicting key/values to it.
|
||||||
|
// Since it is RYWTr, we can call getRange on it with same parameters given to the original getRange
|
||||||
|
state ReadYourWritesTransaction hackTr(ryw->getDatabase());
|
||||||
|
// One issue here is that, although the write itself is local. getRange will look through the database.
|
||||||
|
// To make sure there is no keys other than conflictings keys are read,
|
||||||
|
// We write it under system key prefix "\xff/conflicting_keys/", which should always be empty in the database.
|
||||||
|
// Thus, only the local written keys are under this prefix and will be looked through.
|
||||||
|
// One pain point is that it adds latency overhead here
|
||||||
|
state const KeyRef conflictingKeysPrefix = LiteralStringRef("\xff/conflicting_keys/");
|
||||||
|
hackTr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
|
for (const KeyValueRef kv : conflictingKeys) {
|
||||||
|
hackTr.set(kv.key.withPrefix(conflictingKeysPrefix), kv.value);
|
||||||
|
}
|
||||||
|
state KeySelector conflictingKeysBegin = KeySelectorRef(begin.getKey().withPrefix(conflictingKeysPrefix), begin.orEqual, begin.offset);
|
||||||
|
state KeySelector conflictingKeysEnd = KeySelectorRef(end.getKey().withPrefix(conflictingKeysPrefix), end.orEqual, end.offset);
|
||||||
|
Standalone<RangeResultRef> resultWithKeyPrefix = wait(hackTr.getRange(conflictingKeysBegin, conflictingKeysEnd, limits, snapshot, reverse));
|
||||||
|
// Remove the prefix we added before and then return the result
|
||||||
|
Standalone<RangeResultRef> result;
|
||||||
|
for (const KeyValueRef kv : resultWithKeyPrefix) {
|
||||||
|
result.push_back_deep(result.arena(), KeyValueRef(kv.key.removePrefix(conflictingKeysPrefix), kv.value));
|
||||||
|
}
|
||||||
|
hackTr.reset();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool snapshot ) {
|
Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool snapshot ) {
|
||||||
TEST(true);
|
TEST(true);
|
||||||
|
|
||||||
@ -1229,24 +1265,6 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
|
|||||||
return Optional<Value>();
|
return Optional<Value>();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add conflicting keys to special key space
|
|
||||||
if (key == LiteralStringRef("\xff\xff/conflicting_keys/json")){
|
|
||||||
if (tr.info.conflictingKeyRanges.present()){
|
|
||||||
json_spirit::mArray root;
|
|
||||||
json_spirit::mObject keyrange;
|
|
||||||
for (const auto & kr : tr.info.conflictingKeyRanges.get()) {
|
|
||||||
keyrange["begin"] = base64::encoder::from_string(kr.begin.toString());
|
|
||||||
keyrange["end"] = base64::encoder::from_string(kr.end.toString());
|
|
||||||
root.push_back(keyrange);
|
|
||||||
keyrange.clear();
|
|
||||||
}
|
|
||||||
Optional<Value> output = StringRef(json_spirit::write_string(json_spirit::mValue(root), json_spirit::Output_options::raw_utf8));
|
|
||||||
return output;
|
|
||||||
} else {
|
|
||||||
return Optional<Value>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(checkUsedDuringCommit()) {
|
if(checkUsedDuringCommit()) {
|
||||||
return used_during_commit();
|
return used_during_commit();
|
||||||
}
|
}
|
||||||
@ -1298,6 +1316,33 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Use special key prefix "\xff\xff/conflicting_keys/<some_key>",
|
||||||
|
// to retrieve keys which caused latest not_committed(conflicting with another transaction) error.
|
||||||
|
// The returned key value pairs are interpretted as :
|
||||||
|
// <key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
|
||||||
|
// <key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
|
||||||
|
// Due to the implementation of resolver, currently,
|
||||||
|
// we can only give key ranges that contain at least one key is conflicting.
|
||||||
|
Key conflictingKeysPreifx = LiteralStringRef("\xff\xff/conflicting_keys/");
|
||||||
|
if (begin.getKey().startsWith(conflictingKeysPreifx) && end.getKey().startsWith(conflictingKeysPreifx)) {
|
||||||
|
// Remove the special key prefix "\xff\xff/conflicting_keys/"
|
||||||
|
Key beginConflictingKey = begin.getKey().removePrefix(conflictingKeysPreifx);
|
||||||
|
Key endConflictingKey = end.getKey().removePrefix(conflictingKeysPreifx);
|
||||||
|
|
||||||
|
// Check if the conflicting key range to be read is valid
|
||||||
|
KeyRef maxKey = getMaxReadKey();
|
||||||
|
if(beginConflictingKey > maxKey || endConflictingKey > maxKey)
|
||||||
|
return key_outside_legal_range();
|
||||||
|
|
||||||
|
begin.setKey(beginConflictingKey);
|
||||||
|
end.setKey(endConflictingKey);
|
||||||
|
if (tr.info.conflictingKeyRanges.present()) {
|
||||||
|
return getConflictingKeys(tr.info.conflictingKeyRanges.get(), this, begin, end, limits, snapshot, reverse);
|
||||||
|
} else {
|
||||||
|
return Standalone<RangeResultRef>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if(checkUsedDuringCommit()) {
|
if(checkUsedDuringCommit()) {
|
||||||
return used_during_commit();
|
return used_during_commit();
|
||||||
}
|
}
|
||||||
|
@ -58,6 +58,9 @@ void decodeKeyServersValue( const ValueRef& value, vector<UID>& src, vector<UID>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ValueRef conflictingKeysTrue = LiteralStringRef("1");
|
||||||
|
const ValueRef conflictingKeysFalse = LiteralStringRef("0");
|
||||||
|
|
||||||
// "\xff/storageCache/[[begin]]" := "[[vector<uint16_t>]]"
|
// "\xff/storageCache/[[begin]]" := "[[vector<uint16_t>]]"
|
||||||
const KeyRangeRef storageCacheKeys( LiteralStringRef("\xff/storageCache/"), LiteralStringRef("\xff/storageCache0") );
|
const KeyRangeRef storageCacheKeys( LiteralStringRef("\xff/storageCache/"), LiteralStringRef("\xff/storageCache0") );
|
||||||
const KeyRef storageCachePrefix = storageCacheKeys.begin;
|
const KeyRef storageCachePrefix = storageCacheKeys.begin;
|
||||||
|
@ -64,6 +64,8 @@ const Key serverKeysPrefixFor( UID serverID );
|
|||||||
UID serverKeysDecodeServer( const KeyRef& key );
|
UID serverKeysDecodeServer( const KeyRef& key );
|
||||||
bool serverHasKey( ValueRef storedValue );
|
bool serverHasKey( ValueRef storedValue );
|
||||||
|
|
||||||
|
extern const ValueRef conflictingKeysTrue, conflictingKeysFalse;
|
||||||
|
|
||||||
extern const KeyRef cacheKeysPrefix;
|
extern const KeyRef cacheKeysPrefix;
|
||||||
|
|
||||||
const Key cacheKeysKey( uint16_t idx, const KeyRef& key );
|
const Key cacheKeysKey( uint16_t idx, const KeyRef& key );
|
||||||
|
@ -9,9 +9,6 @@ For details, see http://sourceforge.net/projects/libb64
|
|||||||
#define BASE64_DECODE_H
|
#define BASE64_DECODE_H
|
||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <sstream>
|
|
||||||
|
|
||||||
#define BUFFERSIZE 8192
|
|
||||||
|
|
||||||
namespace base64
|
namespace base64
|
||||||
{
|
{
|
||||||
@ -63,15 +60,6 @@ namespace base64
|
|||||||
delete [] code;
|
delete [] code;
|
||||||
delete [] plaintext;
|
delete [] plaintext;
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::string from_string(std::string s)
|
|
||||||
{
|
|
||||||
std::stringstream in(s);
|
|
||||||
std::stringstream out;
|
|
||||||
decoder dec;
|
|
||||||
dec.decode(in, out);
|
|
||||||
return out.str();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace base64
|
} // namespace base64
|
||||||
|
@ -251,7 +251,7 @@ description is not currently required but encouraged.
|
|||||||
<Option name="use_provisional_proxies" code="711"
|
<Option name="use_provisional_proxies" code="711"
|
||||||
description="This option should only be used by tools which change the database configuration." />
|
description="This option should only be used by tools which change the database configuration." />
|
||||||
<Option name="report_conflicting_keys" code="712"
|
<Option name="report_conflicting_keys" code="712"
|
||||||
description="The transaction will save conflicting keys if conflicts happen." />
|
description="The transaction can retrieve keys that are conflicting with other transactions." />
|
||||||
</Scope>
|
</Scope>
|
||||||
|
|
||||||
<!-- The enumeration values matter - do not change them without
|
<!-- The enumeration values matter - do not change them without
|
||||||
|
@ -54,7 +54,7 @@ private:
|
|||||||
std::vector< std::pair<StringRef,StringRef> > combinedWriteConflictRanges;
|
std::vector< std::pair<StringRef,StringRef> > combinedWriteConflictRanges;
|
||||||
std::vector< struct ReadConflictRange > combinedReadConflictRanges;
|
std::vector< struct ReadConflictRange > combinedReadConflictRanges;
|
||||||
bool* transactionConflictStatus;
|
bool* transactionConflictStatus;
|
||||||
std::map< int, Standalone<VectorRef<KeyRangeRef>> >* conflictingKeyRangeMap;
|
std::map< int, Standalone< VectorRef< KeyRangeRef > > >* conflictingKeyRangeMap;
|
||||||
|
|
||||||
void checkIntraBatchConflicts();
|
void checkIntraBatchConflicts();
|
||||||
void combineWriteConflictRanges();
|
void combineWriteConflictRanges();
|
||||||
|
@ -1039,11 +1039,11 @@ ACTOR Future<Void> commitBatch(
|
|||||||
for (int resolverInd : transactionResolverMap[t]) {
|
for (int resolverInd : transactionResolverMap[t]) {
|
||||||
for (const KeyRangeRef & kr : resolution[resolverInd].conflictingKeyRangeMap[nextTr[resolverInd]]){
|
for (const KeyRangeRef & kr : resolution[resolverInd].conflictingKeyRangeMap[nextTr[resolverInd]]){
|
||||||
unmergedConflictingKRs.emplace_back(kr);
|
unmergedConflictingKRs.emplace_back(kr);
|
||||||
// TraceEvent("ConflictingKeyRange").detail("ResolverIndex", resolverInd).detail("TrasactionIndex", t).detail("StartKey", kr.begin.toString()).detail("EndKey", kr.end.toString());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// At least one keyRange should be returned
|
||||||
ASSERT(unmergedConflictingKRs.size());
|
ASSERT(unmergedConflictingKRs.size());
|
||||||
// Sort the keyranges by begin key, then union overlap ranges from left to right
|
// Sort the keyranges by begin key, then union overlapping ranges from left to right
|
||||||
std::sort(unmergedConflictingKRs.begin(), unmergedConflictingKRs.end(), [](KeyRange a, KeyRange b){
|
std::sort(unmergedConflictingKRs.begin(), unmergedConflictingKRs.end(), [](KeyRange a, KeyRange b){
|
||||||
return a.begin < b.begin;
|
return a.begin < b.begin;
|
||||||
});
|
});
|
||||||
@ -1066,7 +1066,7 @@ ACTOR Future<Void> commitBatch(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update corresponding transaction index on each resolver
|
// Update corresponding transaction indices on each resolver
|
||||||
for (int resolverInd : transactionResolverMap[t])
|
for (int resolverInd : transactionResolverMap[t])
|
||||||
nextTr[resolverInd]++;
|
nextTr[resolverInd]++;
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ ACTOR Future<Void> resolveBatch(
|
|||||||
|
|
||||||
vector<int> commitList;
|
vector<int> commitList;
|
||||||
vector<int> tooOldList;
|
vector<int> tooOldList;
|
||||||
std::map<int, Standalone<VectorRef<KeyRangeRef>> > conflictingKeyRangeMap;
|
std::map< int, Standalone< VectorRef< KeyRangeRef > > > conflictingKeyRangeMap;
|
||||||
|
|
||||||
// Detect conflicts
|
// Detect conflicts
|
||||||
double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
|
double expire = now() + SERVER_KNOBS->SAMPLE_EXPIRATION_TIME;
|
||||||
|
@ -123,7 +123,7 @@ struct ReadConflictRange {
|
|||||||
StringRef begin, end;
|
StringRef begin, end;
|
||||||
Version version;
|
Version version;
|
||||||
int transaction;
|
int transaction;
|
||||||
Standalone<VectorRef<KeyRangeRef>> * conflictingKeyRange;
|
Standalone< VectorRef< KeyRangeRef > >* conflictingKeyRange;
|
||||||
ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, Standalone<VectorRef<KeyRangeRef>> * cKR = nullptr )
|
ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, Standalone<VectorRef<KeyRangeRef>> * cKR = nullptr )
|
||||||
: begin(begin), end(end), version(version), transaction(transaction), conflictingKeyRange(cKR)
|
: begin(begin), end(end), version(version), transaction(transaction), conflictingKeyRange(cKR)
|
||||||
{
|
{
|
||||||
@ -760,7 +760,7 @@ private:
|
|||||||
Version version;
|
Version version;
|
||||||
bool *result;
|
bool *result;
|
||||||
int state;
|
int state;
|
||||||
Standalone<VectorRef<KeyRangeRef>>* conflictingKeyRange;
|
Standalone<VectorRef<KeyRangeRef>>* conflictingKeyRange; // null if report_conflicting_keys is not enabled.
|
||||||
|
|
||||||
void init( const ReadConflictRange& r, Node* header, bool* tCS, Standalone<VectorRef<KeyRangeRef>>* cKR) {
|
void init( const ReadConflictRange& r, Node* header, bool* tCS, Standalone<VectorRef<KeyRangeRef>>* cKR) {
|
||||||
this->start.init( r.begin, header );
|
this->start.init( r.begin, header );
|
||||||
@ -783,7 +783,6 @@ private:
|
|||||||
if (end_node != nullptr) {
|
if (end_node != nullptr) {
|
||||||
endKey = StringRef(end_node->value(), end_node->length());
|
endKey = StringRef(end_node->value(), end_node->length());
|
||||||
}
|
}
|
||||||
|
|
||||||
conflictingKeyRange->push_back_deep(conflictingKeyRange->arena(), KeyRangeRef(startKey, endKey));
|
conflictingKeyRange->push_back_deep(conflictingKeyRange->arena(), KeyRangeRef(startKey, endKey));
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@ -809,7 +808,7 @@ private:
|
|||||||
if (start.finger[l]->getMaxVersion(l) <= version)
|
if (start.finger[l]->getMaxVersion(l) <= version)
|
||||||
return noConflict();
|
return noConflict();
|
||||||
if (l==0)
|
if (l==0)
|
||||||
return conflict(nullptr, nullptr); // The whole readConflictRange is conflicting with other transactions with higher version number
|
return conflict(nullptr, nullptr); // The whole read_conflict_range is conflicting with other transactions with higher version number
|
||||||
}
|
}
|
||||||
state = 1;
|
state = 1;
|
||||||
case 1:
|
case 1:
|
||||||
@ -1039,10 +1038,12 @@ public:
|
|||||||
for(int i=begin; i<end; i++)
|
for(int i=begin; i<end; i++)
|
||||||
values[i] = true;
|
values[i] = true;
|
||||||
}
|
}
|
||||||
bool any( int begin, int end ) {
|
bool any( int begin, int end, int* conflictingIndex = nullptr ) {
|
||||||
for(int i=begin; i<end; i++)
|
for(int i=begin; i<end; i++)
|
||||||
if (values[i])
|
if (values[i]) {
|
||||||
|
if (conflictingIndex != nullptr) *conflictingIndex = i;
|
||||||
return true;
|
return true;
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -1133,9 +1134,9 @@ public:
|
|||||||
setBits(orValues, beginWord, lastWord+1, true);
|
setBits(orValues, beginWord, lastWord+1, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool any(int begin, int end) {
|
bool any(int begin, int end, int* conflictingIndex = nullptr) {
|
||||||
bool a = orImpl(begin,end);
|
bool a = orImpl(begin,end);
|
||||||
bool b = debug.any(begin,end);
|
bool b = debug.any(begin,end, conflictingIndex);
|
||||||
ASSERT( a == b );
|
ASSERT( a == b );
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
@ -1164,8 +1165,14 @@ void ConflictBatch::checkIntraBatchConflicts() {
|
|||||||
for(int i=0; i<tr.readRanges.size(); i++){
|
for(int i=0; i<tr.readRanges.size(); i++){
|
||||||
int startPointIndex = tr.readRanges[i].first;
|
int startPointIndex = tr.readRanges[i].first;
|
||||||
int endPointIndex = tr.readRanges[i].second;
|
int endPointIndex = tr.readRanges[i].second;
|
||||||
if ( mcs.any(startPointIndex , endPointIndex ) ) {
|
int conflictingIndex;
|
||||||
|
if ( mcs.any( startPointIndex , endPointIndex, tr.reportConflictingKeys ? &conflictingIndex : nullptr ) ) {
|
||||||
if (tr.reportConflictingKeys){
|
if (tr.reportConflictingKeys){
|
||||||
|
// The MiniConflictSet is difficult to change, use MiniConflictSet2 to hack here. (Future: use MiniConflictSet)
|
||||||
|
if (points[conflictingIndex].begin)
|
||||||
|
startPointIndex = conflictingIndex;
|
||||||
|
else
|
||||||
|
endPointIndex = conflictingIndex;
|
||||||
(*conflictingKeyRangeMap)[t].push_back_deep((*conflictingKeyRangeMap)[t].arena(), KeyRangeRef(points[startPointIndex].key, points[endPointIndex].key));
|
(*conflictingKeyRangeMap)[t].push_back_deep((*conflictingKeyRangeMap)[t].arena(), KeyRangeRef(points[startPointIndex].key, points[endPointIndex].key));
|
||||||
}
|
}
|
||||||
conflict = true;
|
conflict = true;
|
||||||
|
@ -19,11 +19,11 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "fdbclient/NativeAPI.actor.h"
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
|
#include "fdbclient/SystemData.h"
|
||||||
#include "fdbserver/TesterInterface.actor.h"
|
#include "fdbserver/TesterInterface.actor.h"
|
||||||
#include "fdbserver/workloads/workloads.actor.h"
|
#include "fdbserver/workloads/workloads.actor.h"
|
||||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||||
#include "fdbclient/ReadYourWrites.h"
|
|
||||||
#include "fdbclient/libb64/decode.h"
|
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
struct ReportConflictingKeysWorkload : TestWorkload {
|
struct ReportConflictingKeysWorkload : TestWorkload {
|
||||||
@ -31,8 +31,9 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||||||
double testDuration, transactionsPerSecond, addReadConflictRangeProb, addWriteConflictRangeProb;
|
double testDuration, transactionsPerSecond, addReadConflictRangeProb, addWriteConflictRangeProb;
|
||||||
Key keyPrefix;
|
Key keyPrefix;
|
||||||
|
|
||||||
int nodeCount, actorCount, keyBytes, valueBytes, readConflictRangeCount, writeConflictRangeCount;
|
int nodeCountPerPrefix, actorCount, keyBytes, valueBytes, readConflictRangeCount, writeConflictRangeCount;
|
||||||
bool reportConflictingKeys, skipCorrectnessCheck;
|
bool reportConflictingKeys, skipCorrectnessCheck;
|
||||||
|
uint64_t keyPrefixBytes, prefixCount;
|
||||||
|
|
||||||
PerfIntCounter invalidReports, commits, conflicts, retries, xacts;
|
PerfIntCounter invalidReports, commits, conflicts, retries, xacts;
|
||||||
|
|
||||||
@ -42,12 +43,11 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
testDuration = getOption(options, LiteralStringRef("testDuration"), 10.0);
|
||||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
|
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 5000.0) / clientCount;
|
||||||
actorCount = getOption(options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5);
|
actorCount = getOption(options, LiteralStringRef("actorsPerClient"), transactionsPerSecond / 5);
|
||||||
nodeCount = getOption(options, LiteralStringRef("nodeCount"), transactionsPerSecond * clientCount);
|
|
||||||
keyPrefix = unprintable(
|
keyPrefix = unprintable(
|
||||||
getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("ReportConflictingKeysWorkload"))
|
getOption(options, LiteralStringRef("keyPrefix"), LiteralStringRef("ReportConflictingKeysWorkload"))
|
||||||
.toString());
|
.toString());
|
||||||
keyBytes = getOption(options, LiteralStringRef("keyBytes"), 16);
|
keyBytes = getOption(options, LiteralStringRef("keyBytes"), 16);
|
||||||
ASSERT(keyPrefix.size() + 16 <= keyBytes); // make sure the string format is valid
|
|
||||||
readConflictRangeCount = getOption(options, LiteralStringRef("readConflictRangeCountPerTx"), 1);
|
readConflictRangeCount = getOption(options, LiteralStringRef("readConflictRangeCountPerTx"), 1);
|
||||||
writeConflictRangeCount = getOption(options, LiteralStringRef("writeConflictRangeCountPerTx"), 1);
|
writeConflictRangeCount = getOption(options, LiteralStringRef("writeConflictRangeCountPerTx"), 1);
|
||||||
// modeled by geometric distribution: (1 - prob) / prob = mean
|
// modeled by geometric distribution: (1 - prob) / prob = mean
|
||||||
@ -56,6 +56,16 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||||||
// If true, store key ranges conflicting with other txs
|
// If true, store key ranges conflicting with other txs
|
||||||
reportConflictingKeys = getOption(options, LiteralStringRef("reportConflictingKeys"), false);
|
reportConflictingKeys = getOption(options, LiteralStringRef("reportConflictingKeys"), false);
|
||||||
skipCorrectnessCheck = getOption(options, LiteralStringRef("skipCorrectnessCheck"), false);
|
skipCorrectnessCheck = getOption(options, LiteralStringRef("skipCorrectnessCheck"), false);
|
||||||
|
// used for generating keyPrefix
|
||||||
|
keyPrefixBytes = getOption(options, LiteralStringRef("keyPrefixBytes"), 0);
|
||||||
|
if (keyPrefixBytes) {
|
||||||
|
prefixCount = 255 * std::round(std::exp2(8*(keyPrefixBytes-1)));
|
||||||
|
ASSERT(keyPrefixBytes + 16 <= keyBytes);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ASSERT(keyPrefix.size() + 16 <= keyBytes); // make sure the string format is valid
|
||||||
|
}
|
||||||
|
nodeCountPerPrefix = getOption(options, LiteralStringRef("nodeCountPerPrefix"), 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string description() override { return "ReportConflictingKeysWorkload"; }
|
std::string description() override { return "ReportConflictingKeysWorkload"; }
|
||||||
@ -92,34 +102,51 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||||||
double getCheckTimeout() override { return std::numeric_limits<double>::max(); }
|
double getCheckTimeout() override { return std::numeric_limits<double>::max(); }
|
||||||
|
|
||||||
// Copied from tester.actor.cpp, added parameter to determine the key's length
|
// Copied from tester.actor.cpp, added parameter to determine the key's length
|
||||||
Key keyForIndex(int n) {
|
Key keyForIndex(int prefixIdx, int n) {
|
||||||
double p = (double)n / nodeCount;
|
double p = (double)n / nodeCountPerPrefix;
|
||||||
int paddingLen = keyBytes - 16 - keyPrefix.size();
|
int paddingLen = keyBytes - 16 - keyPrefixBytes;
|
||||||
// left padding by zero
|
// left padding by zero
|
||||||
return StringRef(format("%0*llx", paddingLen, *(uint64_t*)&p)).withPrefix(keyPrefix);
|
return StringRef(format("%0*llx", paddingLen, *(uint64_t*)&p)).withPrefix( prefixIdx >= 0 ? keyPrefixForIndex( prefixIdx) : keyPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
Key keyPrefixForIndex(uint64_t n) {
|
||||||
|
Key prefix = makeString(keyPrefixBytes);
|
||||||
|
uint8_t * head = mutateString(prefix);
|
||||||
|
memset(head, 0, keyPrefixBytes);
|
||||||
|
int offset = keyPrefixBytes - 1;
|
||||||
|
while (n) {
|
||||||
|
*(head + offset) = static_cast<uint8_t>(n % 256);
|
||||||
|
n /= 256;
|
||||||
|
offset -= 1;
|
||||||
|
}
|
||||||
|
return prefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
void addRandomReadConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>& readConflictRanges) {
|
void addRandomReadConflictRange(ReadYourWritesTransaction* tr, std::vector<KeyRange>& readConflictRanges) {
|
||||||
int startIdx, endIdx;
|
int startIdx, endIdx, startPrefixIdx, endPrefixIdx;
|
||||||
Key startKey, endKey;
|
Key startKey, endKey;
|
||||||
while (deterministicRandom()->random01() < addReadConflictRangeProb) {
|
while (deterministicRandom()->random01() < addReadConflictRangeProb) {
|
||||||
startIdx = deterministicRandom()->randomInt(0, nodeCount);
|
startPrefixIdx = keyPrefixBytes ? deterministicRandom()->randomInt(0, prefixCount) : -1;
|
||||||
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount);
|
endPrefixIdx = keyPrefixBytes ? deterministicRandom()->randomInt(startPrefixIdx, prefixCount) : -1;
|
||||||
startKey = keyForIndex(startIdx);
|
startIdx = deterministicRandom()->randomInt(0, nodeCountPerPrefix);
|
||||||
endKey = keyForIndex(endIdx);
|
endIdx = deterministicRandom()->randomInt(startPrefixIdx < endPrefixIdx ? 0 : startIdx, nodeCountPerPrefix);
|
||||||
|
startKey = keyForIndex(startPrefixIdx, startIdx);
|
||||||
|
endKey = keyForIndex(endPrefixIdx, endIdx);
|
||||||
tr->addReadConflictRange(KeyRangeRef(startKey, endKey));
|
tr->addReadConflictRange(KeyRangeRef(startKey, endKey));
|
||||||
readConflictRanges.push_back(KeyRangeRef(startKey, endKey));
|
readConflictRanges.push_back(KeyRangeRef(startKey, endKey));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addRandomWriteConflictRange(ReadYourWritesTransaction* tr) {
|
void addRandomWriteConflictRange(ReadYourWritesTransaction* tr) {
|
||||||
int startIdx, endIdx;
|
int startIdx, endIdx, startPrefixIdx, endPrefixIdx;
|
||||||
Key startKey, endKey;
|
Key startKey, endKey;
|
||||||
while (deterministicRandom()->random01() < addWriteConflictRangeProb) {
|
while (deterministicRandom()->random01() < addWriteConflictRangeProb) {
|
||||||
startIdx = deterministicRandom()->randomInt(0, nodeCount);
|
startPrefixIdx = keyPrefixBytes ? deterministicRandom()->randomInt(0, prefixCount) : -1;
|
||||||
endIdx = deterministicRandom()->randomInt(startIdx, nodeCount);
|
endPrefixIdx = keyPrefixBytes ? deterministicRandom()->randomInt(startPrefixIdx, prefixCount) : -1;
|
||||||
startKey = keyForIndex(startIdx);
|
startIdx = deterministicRandom()->randomInt(0, nodeCountPerPrefix);
|
||||||
endKey = keyForIndex(endIdx);
|
endIdx = deterministicRandom()->randomInt(startPrefixIdx < endPrefixIdx ? 0 : startIdx, nodeCountPerPrefix);
|
||||||
|
startKey = keyForIndex(startPrefixIdx, startIdx);
|
||||||
|
endKey = keyForIndex(endPrefixIdx, endIdx);
|
||||||
tr->addWriteConflictRange(KeyRangeRef(startKey, endKey));
|
tr->addWriteConflictRange(KeyRangeRef(startKey, endKey));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -153,33 +180,34 @@ struct ReportConflictingKeysWorkload : TestWorkload {
|
|||||||
wait(tr.onError(e));
|
wait(tr.onError(e));
|
||||||
// check API correctness
|
// check API correctness
|
||||||
if (!self->skipCorrectnessCheck && self->reportConflictingKeys && isConflict) {
|
if (!self->skipCorrectnessCheck && self->reportConflictingKeys && isConflict) {
|
||||||
Optional<Standalone<StringRef>> temp =
|
state KeyRange ckr = KeyRangeRef(LiteralStringRef("\xff\xff/conflicting_keys/"), LiteralStringRef("\xff\xff/conflicting_keys/\xff"));
|
||||||
wait(tr.get(LiteralStringRef("\xff\xff/conflicting_keys/json")));
|
loop {
|
||||||
auto jsonStr = temp.get().toString();
|
try {
|
||||||
json_spirit::mValue val;
|
Standalone<RangeResultRef> conflictingKeyRanges = wait(tr.getRange(ckr, readConflictRanges.size() * 2));
|
||||||
if (json_spirit::read_string(jsonStr, val)) {
|
ASSERT( conflictingKeyRanges.size() && ( conflictingKeyRanges.size() % 2 == 0 ) );
|
||||||
auto root = val.get_array();
|
for (int i = 0; i < conflictingKeyRanges.size(); i += 2) {
|
||||||
ASSERT(root.size() > 0);
|
KeyValueRef startKey = conflictingKeyRanges[i];
|
||||||
// Only use the last entry which contains the read_conflict_ranges corresponding to current
|
ASSERT(startKey.value == conflictingKeysTrue);
|
||||||
// conflicts
|
KeyValueRef endKey = conflictingKeyRanges[i+1];
|
||||||
for (const auto& pair : root) {
|
ASSERT(endKey.value == conflictingKeysFalse);
|
||||||
json_spirit::mObject kr_obj = pair.get_obj();
|
KeyRange kr = KeyRangeRef(startKey.key, endKey.key);
|
||||||
std::string start_key = base64::decoder::from_string(kr_obj["begin"].get_str());
|
if (!std::any_of(readConflictRanges.begin(), readConflictRanges.end(), [&kr](KeyRange rCR) {
|
||||||
std::string end_key = base64::decoder::from_string(kr_obj["end"].get_str());
|
// Read_conflict_range remains same in the resolver.
|
||||||
KeyRange kr = KeyRangeRef(start_key, end_key);
|
// Thus, the returned keyrange is either the original read_conflict_range or merged
|
||||||
if (!std::any_of(readConflictRanges.begin(), readConflictRanges.end(), [&kr](KeyRange rCR) {
|
// by several overlapped ones In either case, it contains at least one original
|
||||||
// Returned KeyRange is an intersection of Read_conflict_range and Write_conflict_range saved in the resolver.
|
// read_conflict_range
|
||||||
// Thus, it intersects with at least one original read_conflict_range
|
return kr.intersects(rCR);
|
||||||
return kr.intersects(rCR);
|
})) {
|
||||||
})) {
|
++self->invalidReports;
|
||||||
++self->invalidReports;
|
TraceEvent(SevError, "TestFailure").detail("Reason", "InvalidKeyRangeReturned");
|
||||||
TraceEvent(SevError, "TestFailure").detail("Reason", "InvalidKeyRangeReturned");
|
}
|
||||||
}
|
}
|
||||||
|
break; // leave the loop once we finish readRange
|
||||||
|
} catch (Error& err) {
|
||||||
|
TraceEvent("FailedToGetConflictingKeys").error(err);
|
||||||
|
wait(tr.onError(err));
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
++self->invalidReports;
|
|
||||||
TraceEvent(SevError, "TestFailure").detail("Reason", "FailedToParseJson");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
++self->retries;
|
++self->retries;
|
||||||
}
|
}
|
||||||
|
@ -65,8 +65,8 @@ add_fdb_test(TEST_FILES ReadHalfAbsent.txt IGNORE)
|
|||||||
add_fdb_test(TEST_FILES RedwoodCorrectnessUnits.txt IGNORE)
|
add_fdb_test(TEST_FILES RedwoodCorrectnessUnits.txt IGNORE)
|
||||||
add_fdb_test(TEST_FILES RedwoodCorrectnessBTree.txt IGNORE)
|
add_fdb_test(TEST_FILES RedwoodCorrectnessBTree.txt IGNORE)
|
||||||
add_fdb_test(TEST_FILES RedwoodCorrectnessPager.txt IGNORE)
|
add_fdb_test(TEST_FILES RedwoodCorrectnessPager.txt IGNORE)
|
||||||
add_fdb_test(TEST_FILES ReportConflictingKeys.txt IGNORE)
|
|
||||||
add_fdb_test(TEST_FILES fast/RedwoodCorrectnessBTree.txt IGNORE)
|
add_fdb_test(TEST_FILES fast/RedwoodCorrectnessBTree.txt IGNORE)
|
||||||
|
add_fdb_test(TEST_FILES fast/ReportConflictingKeys.txt IGNORE)
|
||||||
add_fdb_test(TEST_FILES RedwoodCorrectness.txt IGNORE)
|
add_fdb_test(TEST_FILES RedwoodCorrectness.txt IGNORE)
|
||||||
add_fdb_test(TEST_FILES RedwoodPerfTests.txt IGNORE)
|
add_fdb_test(TEST_FILES RedwoodPerfTests.txt IGNORE)
|
||||||
add_fdb_test(TEST_FILES SampleNoSimAttrition.txt IGNORE)
|
add_fdb_test(TEST_FILES SampleNoSimAttrition.txt IGNORE)
|
||||||
|
@ -2,10 +2,11 @@ testTitle=ReportConflictingKeysTest
|
|||||||
testName=ReportConflictingKeys
|
testName=ReportConflictingKeys
|
||||||
testDuration=10.0
|
testDuration=10.0
|
||||||
transactionsPerSecond=100000
|
transactionsPerSecond=100000
|
||||||
nodeCount=10000
|
nodeCountPerPrefix=10000
|
||||||
actorsPerClient=16
|
actorsPerClient=256
|
||||||
keyPrefix=RCK
|
keyPrefix=RCK
|
||||||
keyBytes=64
|
keyBytes=64
|
||||||
readConflictRangeCountPerTx=1
|
readConflictRangeCountPerTx=1
|
||||||
writeConflictRangeCountPerTx=1
|
writeConflictRangeCountPerTx=1
|
||||||
reportConflictingKeys=true
|
reportConflictingKeys=true
|
||||||
|
skipCorrectnessCheck=false
|
Loading…
x
Reference in New Issue
Block a user