1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-26 09:22:34 +08:00

Rename class RatekeeperData to Ratekeeper

This commit is contained in:
sfc-gh-tclinkenbeard 2022-02-14 12:42:25 -08:00
parent d6c5239080
commit 8074630530
2 changed files with 34 additions and 35 deletions

@ -65,9 +65,9 @@ ACTOR static Future<Void> splitError(Future<Void> in, Promise<Void> errOut) {
}
}
class RatekeeperDataImpl {
class RatekeeperImpl {
public:
ACTOR static Future<Void> configurationMonitor(RatekeeperData* self) {
ACTOR static Future<Void> configurationMonitor(Ratekeeper* self) {
loop {
state ReadYourWritesTransaction tr(self->db);
@ -95,7 +95,7 @@ public:
}
ACTOR static Future<Void> monitorServerListChange(
RatekeeperData* self,
Ratekeeper* self,
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
state std::map<UID, StorageServerInterface> oldServers;
state Transaction tr(self->db);
@ -140,7 +140,7 @@ public:
}
}
ACTOR static Future<Void> trackStorageServerQueueInfo(RatekeeperData* self, StorageServerInterface ssi) {
ACTOR static Future<Void> trackStorageServerQueueInfo(Ratekeeper* self, StorageServerInterface ssi) {
self->storageQueueInfo.insert(mapPair(ssi.id(), StorageQueueInfo(ssi.id(), ssi.locality)));
state Map<UID, StorageQueueInfo>::iterator myQueueInfo = self->storageQueueInfo.find(ssi.id());
TraceEvent("RkTracking", self->id)
@ -195,7 +195,7 @@ public:
}
}
ACTOR static Future<Void> trackTLogQueueInfo(RatekeeperData* self, TLogInterface tli) {
ACTOR static Future<Void> trackTLogQueueInfo(Ratekeeper* self, TLogInterface tli) {
self->tlogQueueInfo.insert(mapPair(tli.id(), TLogQueueInfo(tli.id())));
state Map<UID, TLogQueueInfo>::iterator myQueueInfo = self->tlogQueueInfo.find(tli.id());
TraceEvent("RkTracking", self->id).detail("TransactionLog", tli.id());
@ -241,7 +241,7 @@ public:
}
ACTOR static Future<Void> trackEachStorageServer(
RatekeeperData* self,
Ratekeeper* self,
FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
state Map<UID, Future<Void>> actors;
state Promise<Void> err;
@ -261,7 +261,7 @@ public:
}
}
ACTOR static Future<Void> monitorThrottlingChanges(RatekeeperData* self) {
ACTOR static Future<Void> monitorThrottlingChanges(Ratekeeper* self) {
state bool committed = false;
loop {
state ReadYourWritesTransaction tr(self->db);
@ -373,8 +373,7 @@ public:
}
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state RatekeeperData self(rkInterf.id(),
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;
state std::vector<TLogInterface> tlogInterfs;
@ -391,7 +390,7 @@ public:
self.addActor.send(traceRole(Role::RATEKEEPER, rkInterf.id()));
self.addActor.send(self.monitorThrottlingChanges());
RatekeeperData* selfPtr = &self; // let flow compiler capture self
Ratekeeper* selfPtr = &self; // let flow compiler capture self
self.addActor.send(recurring([selfPtr]() { selfPtr->refreshStorageServerCommitCost(); },
SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
@ -513,39 +512,39 @@ public:
return Void();
}
}; // class RatekeeperDataImpl
}; // class RatekeeperImpl
Future<Void> RatekeeperData::configurationMonitor() {
return RatekeeperDataImpl::configurationMonitor(this);
Future<Void> Ratekeeper::configurationMonitor() {
return RatekeeperImpl::configurationMonitor(this);
}
Future<Void> RatekeeperData::monitorServerListChange(
Future<Void> Ratekeeper::monitorServerListChange(
PromiseStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
return RatekeeperDataImpl::monitorServerListChange(this, serverChanges);
return RatekeeperImpl::monitorServerListChange(this, serverChanges);
}
Future<Void> RatekeeperData::trackEachStorageServer(
Future<Void> Ratekeeper::trackEachStorageServer(
FutureStream<std::pair<UID, Optional<StorageServerInterface>>> serverChanges) {
return RatekeeperDataImpl::trackEachStorageServer(this, serverChanges);
return RatekeeperImpl::trackEachStorageServer(this, serverChanges);
}
Future<Void> RatekeeperData::trackStorageServerQueueInfo(StorageServerInterface ssi) {
return RatekeeperDataImpl::trackStorageServerQueueInfo(this, ssi);
Future<Void> Ratekeeper::trackStorageServerQueueInfo(StorageServerInterface ssi) {
return RatekeeperImpl::trackStorageServerQueueInfo(this, ssi);
}
Future<Void> RatekeeperData::trackTLogQueueInfo(TLogInterface tli) {
return RatekeeperDataImpl::trackTLogQueueInfo(this, tli);
Future<Void> Ratekeeper::trackTLogQueueInfo(TLogInterface tli) {
return RatekeeperImpl::trackTLogQueueInfo(this, tli);
}
Future<Void> RatekeeperData::monitorThrottlingChanges() {
return RatekeeperDataImpl::monitorThrottlingChanges(this);
Future<Void> Ratekeeper::monitorThrottlingChanges() {
return RatekeeperImpl::monitorThrottlingChanges(this);
}
Future<Void> RatekeeperData::run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return RatekeeperDataImpl::run(rkInterf, dbInfo);
Future<Void> Ratekeeper::run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return RatekeeperImpl::run(rkInterf, dbInfo);
}
RatekeeperData::RatekeeperData(UID id, Database db)
Ratekeeper::Ratekeeper(UID id, Database db)
: id(id), db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
@ -571,7 +570,7 @@ RatekeeperData::RatekeeperData(UID id, Database db)
SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL);
}
void RatekeeperData::updateCommitCostEstimation(
void Ratekeeper::updateCommitCostEstimation(
UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation) {
for (auto it = storageQueueInfo.begin(); it != storageQueueInfo.end(); ++it) {
auto tagCostIt = costEstimation.find(it->key);
@ -585,7 +584,7 @@ void RatekeeperData::updateCommitCostEstimation(
}
}
void RatekeeperData::updateRate(RatekeeperLimits* limits) {
void Ratekeeper::updateRate(RatekeeperLimits* limits) {
// double controlFactor = ; // dt / eFoldingTime
double actualTps = smoothReleasedTransactions.smoothRate();
@ -1066,7 +1065,7 @@ void RatekeeperData::updateRate(RatekeeperLimits* limits) {
}
}
Future<Void> RatekeeperData::refreshStorageServerCommitCost() {
Future<Void> Ratekeeper::refreshStorageServerCommitCost() {
if (lastBusiestCommitTagPick == 0) { // the first call should be skipped
lastBusiestCommitTagPick = now();
return Void();
@ -1113,7 +1112,7 @@ Future<Void> RatekeeperData::refreshStorageServerCommitCost() {
return Void();
}
void RatekeeperData::tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) {
void Ratekeeper::tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) {
// NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE
// currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs.
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
@ -1136,7 +1135,7 @@ void RatekeeperData::tryAutoThrottleTag(TransactionTag tag, double rate, double
}
}
void RatekeeperData::tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQueue, int64_t storageDurabilityLag) {
void Ratekeeper::tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQueue, int64_t storageDurabilityLag) {
// NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In most
// of situation, this works. More indicators besides queue size and durability lag could be investigated in the
// future
@ -1158,6 +1157,6 @@ void RatekeeperData::tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQue
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
wait(RatekeeperData::run(rkInterf, dbInfo));
wait(Ratekeeper::run(rkInterf, dbInfo));
return Void();
}

@ -502,7 +502,7 @@ struct RatekeeperLimits {
context(context), rkUpdateEventCacheHolder(makeReference<EventCacheHolder>("RkUpdate" + context)) {}
};
struct RatekeeperData {
struct Ratekeeper {
// Differentiate from GrvProxyInfo in DatabaseContext.h
struct GrvProxyInfo {
int64_t totalTransactions;
@ -523,7 +523,7 @@ struct RatekeeperData {
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, RatekeeperData::GrvProxyInfo> grvProxyInfo;
std::map<UID, Ratekeeper::GrvProxyInfo> grvProxyInfo;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
DatabaseConfiguration configuration;
@ -548,7 +548,7 @@ struct RatekeeperData {
bool autoThrottlingEnabled;
RatekeeperData(UID id, Database db);
Ratekeeper(UID id, Database db);
Future<Void> configurationMonitor();
void updateCommitCostEstimation(UIDTransactionTagMap<TransactionCommitCostEstimation> const& costEstimation);