Make the TagSet sent to the storage servers optional so we can distinguish no tags from unsampled.

This commit is contained in:
A.J. Beamon 2020-04-10 13:29:28 -07:00
parent 29b2c2f3aa
commit 6508c891fc
4 changed files with 28 additions and 27 deletions

View File

@ -1364,7 +1364,7 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetValueReply _reply =
wait(loadBalance(ssi.second, &StorageServerInterface::getValue,
GetValueRequest(key, ver, cx->sampleReadTags() ? tags : TagSet(), getValueID), TaskPriority::DefaultPromiseEndpoint, false,
GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID), TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
@ -1452,7 +1452,7 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetKeyReply _reply =
wait(loadBalance(ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get(), cx->sampleReadTags() ? tags : TagSet(), info.debugID),
wait(loadBalance(ssi.second, &StorageServerInterface::getKey, GetKeyRequest(k, version.get(), cx->sampleReadTags() ? tags : Optional<TagSet>(), info.debugID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
@ -1543,7 +1543,7 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
state WatchValueReply resp;
choose {
when(WatchValueReply r = wait(loadBalance(ssi.second, &StorageServerInterface::watchValue,
WatchValueRequest(key, value, ver, cx->sampleReadTags() ? tags : TagSet(), watchValueID),
WatchValueRequest(key, value, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), watchValueID),
TaskPriority::DefaultPromiseEndpoint))) {
resp = r;
}
@ -1626,7 +1626,7 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
//FIXME: buggify byte limits on internal functions that use them, instead of globally
req.tags = cx->sampleReadTags() ? tags : TagSet();
req.tags = cx->sampleReadTags() ? tags : Optional<TagSet>();
req.debugID = info.debugID;
try {
@ -1915,7 +1915,7 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
transformRangeLimits(limits, reverse, req);
ASSERT(req.limitBytes > 0 && req.limit != 0 && req.limit < 0 == reverse);
req.tags = cx->sampleReadTags() ? tags : TagSet();
req.tags = cx->sampleReadTags() ? tags : Optional<TagSet>();
req.debugID = info.debugID;
try {
if( info.debugID.present() ) {

View File

@ -143,12 +143,12 @@ struct GetValueRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 8454530;
Key key;
Version version;
TagSet tags;
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<GetValueReply> reply;
GetValueRequest(){}
GetValueRequest(const Key& key, Version ver, TagSet tags, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
GetValueRequest(const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
@ -174,12 +174,12 @@ struct WatchValueRequest {
Key key;
Optional<Value> value;
Version version;
TagSet tags;
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<WatchValueReply> reply;
WatchValueRequest(){}
WatchValueRequest(const Key& key, Optional<Value> value, Version ver, TagSet tags, Optional<UID> debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
WatchValueRequest(const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
@ -210,12 +210,11 @@ struct GetKeyValuesRequest : TimedRequest {
Version version; // or latestVersion
int limit, limitBytes;
bool isFetchKeys;
TagSet tags;
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<GetKeyValuesReply> reply;
GetKeyValuesRequest() : isFetchKeys(false) {}
// GetKeyValuesRequest(const KeySelectorRef& begin, const KeySelectorRef& end, Version version, int limit, int limitBytes, Optional<UID> debugID) : begin(begin), end(end), version(version), limit(limit), limitBytes(limitBytes) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, begin, end, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, arena);
@ -240,12 +239,12 @@ struct GetKeyRequest : TimedRequest {
Arena arena;
KeySelectorRef sel;
Version version; // or latestVersion
TagSet tags;
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<GetKeyReply> reply;
GetKeyRequest() {}
GetKeyRequest(KeySelectorRef const& sel, Version version, TagSet tags, Optional<UID> debugID) : sel(sel), version(version), debugID(debugID) {}
GetKeyRequest(KeySelectorRef const& sel, Version version, Optional<TagSet> tags, Optional<UID> debugID) : sel(sel), version(version), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {

View File

@ -146,7 +146,7 @@ struct dynamic_size_traits<TagSet> : std::true_type {
t.bytes += tag.size();
}
t.arena = context.arena();
t.arena = context.arena(); // TODO: this arena could be big
}
};

View File

@ -461,30 +461,32 @@ public:
};
std::unordered_map<Standalone<StringRef>, int64_t, std::hash<StringRef>> intervalCounts;
int64_t intervalTotalCount = 0;
int64_t intervalTotalSampledCount = 0;
Standalone<StringRef> busiestTag;
int64_t busiestTagCount = 0;
double intervalStart = 0;
Optional<TagInfo> previousBusiestTag;
void increment(TagSet const& tags, int64_t delta) {
for(auto& tag : tags) {
int64_t &count = intervalCounts[Standalone<StringRef>(tag, tags.arena)];
count += delta;
if(count > busiestTagCount) {
busiestTagCount = count;
busiestTag = tag;
void increment(Optional<TagSet> const& tags, int64_t delta) {
if(tags.present()) {
for(auto& tag : tags.get()) {
int64_t &count = intervalCounts[Standalone<StringRef>(tag, tags.get().arena)];
count += delta;
if(count > busiestTagCount) {
busiestTagCount = count;
busiestTag = tag;
}
}
}
intervalTotalCount += delta;
intervalTotalSampledCount += delta;
}
}
void startNewInterval(UID id) {
double elapsed = now() - intervalStart;
if(intervalStart > 0 && busiestTagCount >= SERVER_KNOBS->MIN_TAG_PAGES_READ_RATE * elapsed) {
previousBusiestTag = TagInfo(busiestTag, busiestTagCount, intervalTotalCount, elapsed);
previousBusiestTag = TagInfo(busiestTag, busiestTagCount, intervalTotalSampledCount, elapsed);
}
else {
previousBusiestTag.reset();
@ -495,11 +497,11 @@ public:
.detail("Elapsed", elapsed)
.detail("Tag", printable(busiestTag))
.detail("TagCount", busiestTagCount)
.detail("TotalCount", intervalTotalCount)
.detail("TotalSampledCount", intervalTotalSampledCount)
.detail("Reported", previousBusiestTag.present());
intervalCounts.clear();
intervalTotalCount = 0;
intervalTotalSampledCount = 0;
busiestTagCount = 0;
intervalStart = now();
}