mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-03 03:41:53 +08:00
Merge pull request #3999 from dongxinEric/misc/attach-primary-peek-location-to-logRouterMetrics
Add a way to decorate the trace event created by `traceCounters` with more details.
This commit is contained in:
commit
4086e3a275
@ -76,7 +76,9 @@ void CounterCollection::logToTraceEvent(TraceEvent &te) const {
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> traceCounters(std::string traceEventName, UID traceEventID, double interval, CounterCollection* counters, std::string trackLatestName) {
|
||||
ACTOR Future<Void> traceCounters(std::string traceEventName, UID traceEventID, double interval,
|
||||
CounterCollection* counters, std::string trackLatestName,
|
||||
std::function<void(TraceEvent&)> decorator) {
|
||||
wait(delay(0)); // Give an opportunity for all members used in special counters to be initialized
|
||||
|
||||
for (ICounter* c : counters->counters)
|
||||
@ -89,6 +91,7 @@ ACTOR Future<Void> traceCounters(std::string traceEventName, UID traceEventID, d
|
||||
te.detail("Elapsed", now() - last_interval);
|
||||
|
||||
counters->logToTraceEvent(te);
|
||||
decorator(te);
|
||||
|
||||
if (!trackLatestName.empty()) {
|
||||
te.trackLatest(trackLatestName);
|
||||
|
@ -132,7 +132,9 @@ struct SpecialCounter : ICounter, FastAllocated<SpecialCounter<F>>, NonCopyable
|
||||
template <class F>
|
||||
static void specialCounter(CounterCollection& collection, std::string const& name, F && f) { new SpecialCounter<F>(collection, name, std::move(f)); }
|
||||
|
||||
Future<Void> traceCounters(std::string const& traceEventName, UID const& traceEventID, double const& interval, CounterCollection* const& counters, std::string const& trackLatestName = std::string());
|
||||
Future<Void> traceCounters(std::string const& traceEventName, UID const& traceEventID, double const& interval,
|
||||
CounterCollection* const& counters, std::string const& trackLatestName = std::string(),
|
||||
std::function<void(TraceEvent&)> const& decorator = [](TraceEvent& te) {});
|
||||
|
||||
class LatencyBands {
|
||||
public:
|
||||
|
@ -75,6 +75,7 @@ struct LogRouterData {
|
||||
|
||||
UID dbgid;
|
||||
Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
|
||||
Optional<UID> primaryPeekLocation;
|
||||
NotifiedVersion version;
|
||||
NotifiedVersion minPopped;
|
||||
Version startVersion;
|
||||
@ -89,6 +90,7 @@ struct LogRouterData {
|
||||
double maxWaitForVersionTime = 0;
|
||||
double getMoreTime = 0;
|
||||
double maxGetMoreTime = 0;
|
||||
int64_t generation = -1;
|
||||
|
||||
struct PeekTrackerData {
|
||||
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
|
||||
@ -119,9 +121,12 @@ struct LogRouterData {
|
||||
return newTagData;
|
||||
}
|
||||
|
||||
LogRouterData(UID dbgid, const InitializeLogRouterRequest& req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()),
|
||||
version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false),
|
||||
cc("LogRouter", dbgid.toString()), getMoreCount("GetMoreCount", cc), getMoreBlockedCount("GetMoreBlockedCount", cc) {
|
||||
LogRouterData(UID dbgid, const InitializeLogRouterRequest& req)
|
||||
: dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()),
|
||||
version(req.startVersion - 1), minPopped(0), generation(req.recoveryCount), startVersion(req.startVersion),
|
||||
allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false),
|
||||
cc("LogRouter", dbgid.toString()), getMoreCount("GetMoreCount", cc),
|
||||
getMoreBlockedCount("GetMoreBlockedCount", cc) {
|
||||
//setup just enough of a logSet to be able to call getPushLocations
|
||||
logSet.logServers.resize(req.tLogLocalities.size());
|
||||
logSet.tLogPolicy = req.tLogPolicy;
|
||||
@ -148,7 +153,12 @@ struct LogRouterData {
|
||||
specialCounter(cc, "WaitForVersionMaxMS", [this](){ double val = this->maxWaitForVersionTime; this->maxWaitForVersionTime = 0; return 1000*val; });
|
||||
specialCounter(cc, "GetMoreMS", [this](){ double val = this->getMoreTime; this->getMoreTime = 0; return 1000*val; });
|
||||
specialCounter(cc, "GetMoreMaxMS", [this](){ double val = this->maxGetMoreTime; this->maxGetMoreTime = 0; return 1000*val; });
|
||||
logger = traceCounters("LogRouterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "LogRouterMetrics");
|
||||
specialCounter(cc, "Generation", [this]() { return this->generation; });
|
||||
logger = traceCounters("LogRouterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc,
|
||||
"LogRouterMetrics", [this](TraceEvent& te) {
|
||||
te.detail("PrimaryPeekLocation", this->primaryPeekLocation);
|
||||
te.detail("RouterTag", this->routerTag.toString());
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@ -264,6 +274,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
|
||||
if(r) tagPopped = std::max(tagPopped, r->popped());
|
||||
if( self->logSystem->get() ) {
|
||||
r = self->logSystem->get()->peekLogRouter( self->dbgid, tagAt, self->routerTag );
|
||||
self->primaryPeekLocation = r->getPrimaryPeekLocation();
|
||||
TraceEvent("LogRouterPeekLocation", self->dbgid).detail("LogID", r->getPrimaryPeekLocation()).trackLatest(self->eventCacheHolder->trackingKey);
|
||||
} else {
|
||||
r = Reference<ILogSystem::IPeekCursor>();
|
||||
|
@ -576,6 +576,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
specialCounter(cc, "QueueDiskBytesTotal", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().total; });
|
||||
specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); });
|
||||
specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); });
|
||||
specialCounter(cc, "Geneartion", [this]() { return this->recoveryCount; });
|
||||
}
|
||||
|
||||
~LogData() {
|
||||
|
@ -3406,7 +3406,10 @@ ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi
|
||||
|
||||
wait( self->byteSampleRecovery );
|
||||
|
||||
actors.add(traceCounters("StorageMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &self->counters.cc, self->thisServerID.toString() + "/StorageMetrics"));
|
||||
Tag tag = self->tag;
|
||||
actors.add(traceCounters("StorageMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY,
|
||||
&self->counters.cc, self->thisServerID.toString() + "/StorageMetrics",
|
||||
[tag](TraceEvent& te) { te.detail("Tag", tag.toString()); }));
|
||||
|
||||
loop {
|
||||
choose {
|
||||
|
Loading…
x
Reference in New Issue
Block a user