mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 09:58:50 +08:00
Add ratekeeper ID for storage server busiest write tag report
This commit is contained in:
parent
ca482784d4
commit
195890dd7b
@ -58,6 +58,9 @@ struct VersionReply {
|
||||
struct UpdateCommitCostRequest {
|
||||
constexpr static FileIdentifier file_identifier = 4159439;
|
||||
|
||||
// Ratekeeper ID, it is only reasonable to compare postTime from the same Ratekeeper
|
||||
UID ratekeeperID;
|
||||
|
||||
// The time the request being posted
|
||||
double postTime;
|
||||
|
||||
@ -75,7 +78,7 @@ struct UpdateCommitCostRequest {
|
||||
|
||||
template <typename Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, postTime, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply);
|
||||
serializer(ar, ratekeeperID, postTime, elapsed, busiestTag, opsSum, costSum, totalWriteCosts, reported, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -150,7 +150,7 @@ public:
|
||||
|
||||
ACTOR static Future<Void> trackStorageServerQueueInfo(ActorWeakSelfRef<Ratekeeper> self,
|
||||
StorageServerInterface ssi) {
|
||||
self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality)));
|
||||
self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(self->id, ssi.id(), ssi.locality)));
|
||||
TraceEvent("RkTracking", self->id)
|
||||
.detail("StorageServer", ssi.id())
|
||||
.detail("Locality", ssi.locality.toString());
|
||||
@ -952,8 +952,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
||||
return Void();
|
||||
}
|
||||
|
||||
StorageQueueInfo::StorageQueueInfo(UID id, LocalityData locality)
|
||||
: valid(false), id(id), locality(locality), acceptingRequests(false),
|
||||
StorageQueueInfo::StorageQueueInfo(const UID& ratekeeperID_, const UID& id_, const LocalityData& locality_)
|
||||
: valid(false), ratekeeperID(ratekeeperID_), id(id_), locality(locality_), acceptingRequests(false),
|
||||
smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothDurableVersion(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothLatestVersion(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
@ -962,6 +962,9 @@ StorageQueueInfo::StorageQueueInfo(UID id, LocalityData locality)
|
||||
lastReply.instanceID = -1;
|
||||
}
|
||||
|
||||
StorageQueueInfo::StorageQueueInfo(const UID& id_, const LocalityData& locality_)
|
||||
: StorageQueueInfo(UID(), id_, locality_) {}
|
||||
|
||||
void StorageQueueInfo::addCommitCost(TransactionTagRef tagName, TransactionCommitCostEstimation const& cost) {
|
||||
tagCostEst[tagName] += cost;
|
||||
totalWriteCosts += cost.getCostSum();
|
||||
@ -1014,14 +1017,15 @@ UpdateCommitCostRequest StorageQueueInfo::refreshCommitCost(double elapsed) {
|
||||
busiestWriteTags.emplace_back(busiestTag, maxRate, maxBusyness);
|
||||
}
|
||||
|
||||
UpdateCommitCostRequest updateCommitCostRequest;
|
||||
updateCommitCostRequest.postTime = now();
|
||||
updateCommitCostRequest.elapsed = elapsed;
|
||||
updateCommitCostRequest.busiestTag = busiestTag;
|
||||
updateCommitCostRequest.opsSum = maxCost.getOpsSum();
|
||||
updateCommitCostRequest.costSum = maxCost.getCostSum();
|
||||
updateCommitCostRequest.totalWriteCosts = totalWriteCosts;
|
||||
updateCommitCostRequest.reported = !busiestWriteTags.empty();
|
||||
UpdateCommitCostRequest updateCommitCostRequest{ ratekeeperID,
|
||||
now(),
|
||||
elapsed,
|
||||
busiestTag,
|
||||
maxCost.getOpsSum(),
|
||||
maxCost.getCostSum(),
|
||||
totalWriteCosts,
|
||||
!busiestWriteTags.empty(),
|
||||
ReplyPromise<Void>() };
|
||||
|
||||
// reset statistics
|
||||
tagCostEst.clear();
|
||||
|
@ -58,6 +58,7 @@ class StorageQueueInfo {
|
||||
|
||||
public:
|
||||
bool valid;
|
||||
UID ratekeeperID;
|
||||
UID id;
|
||||
LocalityData locality;
|
||||
StorageQueuingMetricsReply lastReply;
|
||||
@ -69,7 +70,8 @@ public:
|
||||
limitReason_t limitReason;
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> busiestReadTags, busiestWriteTags;
|
||||
|
||||
StorageQueueInfo(UID id, LocalityData locality);
|
||||
StorageQueueInfo(const UID& id, const LocalityData& locality);
|
||||
StorageQueueInfo(const UID& rateKeeperID, const UID& id, const LocalityData& locality);
|
||||
// Summarizes up the commit cost per storage server. Returns the UpdateCommitCostRequest for corresponding SS.
|
||||
UpdateCommitCostRequest refreshCommitCost(double elapsed);
|
||||
int64_t getStorageQueueBytes() const { return lastReply.bytesInput - smoothDurableBytes.smoothTotal(); }
|
||||
|
@ -622,12 +622,13 @@ public:
|
||||
|
||||
struct BusiestWriteTagContext {
|
||||
const std::string busiestWriteTagTrackingKey;
|
||||
UID ratekeeperID;
|
||||
Reference<EventCacheHolder> busiestWriteTagEventHolder;
|
||||
double lastUpdateTime;
|
||||
|
||||
BusiestWriteTagContext(const UID& thisServerID)
|
||||
: busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"),
|
||||
busiestWriteTagEventHolder(makeReference<EventCacheHolder>(busiestWriteTagTrackingKey)), lastUpdateTime(0.0) {}
|
||||
: busiestWriteTagTrackingKey(thisServerID.toString() + "/BusiestWriteTag"), ratekeeperID(UID()),
|
||||
busiestWriteTagEventHolder(makeReference<EventCacheHolder>(busiestWriteTagTrackingKey)), lastUpdateTime(-1) {}
|
||||
};
|
||||
|
||||
struct StorageServer {
|
||||
@ -1732,7 +1733,9 @@ ACTOR Future<Version> waitForVersionNoTooOld(StorageServer* data, Version versio
|
||||
if (version <= data->version.get())
|
||||
return version;
|
||||
choose {
|
||||
when(wait(data->version.whenAtLeast(version))) { return version; }
|
||||
when(wait(data->version.whenAtLeast(version))) {
|
||||
return version;
|
||||
}
|
||||
when(wait(delay(SERVER_KNOBS->FUTURE_VERSION_DELAY))) {
|
||||
if (deterministicRandom()->random01() < 0.001)
|
||||
TraceEvent(SevWarn, "ShardServerFutureVersion1000x", data->thisServerID)
|
||||
@ -9519,7 +9522,9 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
|
||||
|
||||
}*/
|
||||
}
|
||||
when(wait(timeout)) { timedout = true; }
|
||||
when(wait(timeout)) {
|
||||
timedout = true;
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
@ -10079,7 +10084,18 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
|
||||
self->actors.add(fetchCheckpointQ(self, req));
|
||||
}
|
||||
when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) {
|
||||
// Ratekeeper might change with a new ID. In this case, always accept the data.
|
||||
if (req.ratekeeperID != self->busiestWriteTagContext.ratekeeperID) {
|
||||
TraceEvent("RatekeeperIDChange")
|
||||
.detail("OldID", self->busiestWriteTagContext.ratekeeperID)
|
||||
.detail("OldLastUpdateTime", self->busiestWriteTagContext.lastUpdateTime)
|
||||
.detail("NewID", req.ratekeeperID)
|
||||
.detail("LastUpdateTime", req.postTime);
|
||||
self->busiestWriteTagContext.ratekeeperID = req.ratekeeperID;
|
||||
self->busiestWriteTagContext.lastUpdateTime = -1;
|
||||
}
|
||||
// In case we received an old request/duplicate request, due to, e.g. network problem
|
||||
ASSERT(req.postTime > 0);
|
||||
if (req.postTime < self->busiestWriteTagContext.lastUpdateTime) {
|
||||
continue;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user