diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index ee57a1b87f..876c4165e5 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -208,23 +208,27 @@ struct StorageServerDisk { // SOMEDAY: Put readNextKeyInclusive in IKeyValueStore Future<Key> readNextKeyInclusive(KeyRef key, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) { + ++(*kvScans); return readFirstKey(storage, KeyRangeRef(key, allKeys.end), type); } Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL, Optional<UID> debugID = Optional<UID>()) { + ++(*kvGets); return storage->readValue(key, type, debugID); } Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL, Optional<UID> debugID = Optional<UID>()) { + ++(*kvGets); return storage->readValuePrefix(key, maxLength, type, debugID); } Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) { + ++(*kvScans); return storage->readRange(keys, rowLimit, byteLimit, type); } @@ -234,6 +238,9 @@ struct StorageServerDisk { Counter* kvBytesCommit; Counter* kvClearRange; + Counter* kvGets; + Counter* kvScans; + Counter* kvCommit; private: struct StorageServer* data; @@ -840,6 +847,10 @@ public: Counter kvBytesScan; Counter kvBytesGet; + Counter eagerReadsKeys; + Counter kvGets; + Counter kvScans; + Counter kvCommit; LatencySample readLatencySample; LatencyBands readLatencyBands; @@ -865,10 +876,12 @@ public: fetchesFromLogs("FetchesFromLogs", cc), quickGetValueHit("QuickGetValueHit", cc), quickGetValueMiss("QuickGetValueMiss", cc), quickGetKeyValuesHit("QuickGetKeyValuesHit", cc), quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc), kvBytesScan("KVBytesScan", cc), - kvBytesGet("KVBytesGet", cc), readLatencySample("ReadLatencyMetrics", - self->thisServerID, - SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, - SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + eagerReadsKeys("EagerReadsKeys", cc), kvGets("KVGets", cc), kvScans("KVScans", cc), + kvCommit("KVCommit", cc), kvBytesGet("KVBytesGet", cc), + readLatencySample("ReadLatencyMetrics", + self->thisServerID, + SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { specialCounter(cc, "LastTLogVersion", [self]() { return self->lastTLogVersion; }); specialCounter(cc, "Version", [self]() { return self->version.get(); }); @@ -892,6 +905,8 @@ public: } } counters; + int64_t bytesRestored; + Reference<EventCacheHolder> storageServerSourceTLogIDEventHolder; StorageServer(IKeyValueStore* storage, @@ -949,6 +964,9 @@ public: this->storage.kvBytesCommit = &counters.kvBytesCommit; this->storage.kvClearRange = &counters.kvClearRange; + this->storage.kvGets = &counters.kvGets; + this->storage.kvScans = &counters.kvScans; + this->storage.kvCommit = &counters.kvCommit; } //~StorageServer() { fclose(log); } @@ -1362,6 +1380,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) { path = 2; Optional<Value> vv = wait(data->storage.readValue(req.key, IKeyValueStore::ReadType::NORMAL, req.debugID)); data->counters.kvBytesGet += vv.expectedSize(); + ++data->counters.kvGets; // Validate that while we were reading the data we didn't lose the version or shard if (version < data->storageVersion()) { TEST(true); // transaction_too_old after readValue @@ -1628,7 +1647,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) if (feed->second->storageVersion != invalidVersion) { self->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0), changeFeedDurableKey(feed->second->id, req.version))); - self->counters.systemClearRange++; + ++self->counters.systemClearRange; if (req.version > feed->second->storageVersion) { feed->second->storageVersion = invalidVersion; feed->second->durableVersion = invalidVersion; @@ -3390,6 +3409,7 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager) std::vector<Future<Key>> keyEnd(eager->keyBegin.size()); for (int i = 0; i < keyEnd.size(); i++) keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i], IKeyValueStore::ReadType::EAGER); + data->counters.eagerReadsKeys += keyEnd.size(); state Future<std::vector<Key>> futureKeyEnds = getAll(keyEnd); state std::vector<Key> keyEndVal = wait(futureKeyEnds); @@ -3411,6 +3431,7 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager) data->counters.kvBytesGet += value.expectedSize(); } } + data->counters.eagerReadsKeys += eager->keys.size(); eager->value = optionalValues; return Void(); @@ -3701,7 +3722,7 @@ void removeDataRange(StorageServer* ss, MutationRef m(MutationRef::ClearRange, range.end, endClear->getEndKey()); m = ss->addMutationToMutationLog(mLV, m); data.insert(m.param1, ValueOrClearToRef::clearTo(m.param2)); - ss->counters.systemClearRange++; + ++ss->counters.systemClearRange; } auto beginClear = data.atLatest().lastLess(range.begin); @@ -4420,7 +4441,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) { if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) { if (shard->phase < AddingShard::Waiting) { data->storage.clearRange(keys); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; data->byteSampleApplyClear(keys, invalidVersion); } else { ASSERT(data->data().getLatestVersion() > data->version.get()); @@ -4660,7 +4681,7 @@ void changeServerKeys(StorageServer* data, data->addMutation(data->data().getLatestVersion(), true, clearRange, range, data->updateEagerReads); data->newestAvailableVersion.insert(range, latestVersion); setAvailableStatus(data, range, true); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; } validate(data); @@ -4686,12 +4707,12 @@ void changeServerKeys(StorageServer* data, auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); data->addMutationToMutationLog( mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey))); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; data->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, changeFeedDurableKey(f.first, 0), changeFeedDurableKey(f.first, version))); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; auto rs = data->keyChangeFeed.modify(f.second); for (auto r = rs.begin(); r != rs.end(); ++r) { auto& feedList = r->value(); @@ -4939,12 +4960,12 @@ private: auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); data->addMutationToMutationLog( mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey))); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; data->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, changeFeedDurableKey(feed->second->id, 0), changeFeedDurableKey(feed->second->id, currentVersion))); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; auto rs = data->keyChangeFeed.modify(feed->second->range); for (auto r = rs.begin(); r != rs.end(); ++r) { auto& feedList = r->value(); @@ -4965,7 +4986,7 @@ private: if (feed->second->storageVersion != invalidVersion) { data->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0), changeFeedDurableKey(feed->second->id, popVersion))); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; if (popVersion > feed->second->storageVersion) { feed->second->storageVersion = invalidVersion; feed->second->durableVersion = invalidVersion; @@ -5547,6 +5568,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) { debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion); state double beforeStorageCommit = now(); state Future<Void> durable = data->storage.commit(); + ++data->counters.kvCommit; state Future<Void> durableDelay = Void(); if (bytesLeft > 0) { @@ -5650,7 +5672,7 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) { //TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", availableKeys.begin).detail("RangeEnd", availableKeys.end); self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, availableKeys.begin, availableKeys.end)); - self->counters.systemClearRange++; + ++self->counters.systemClearRange; self->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, availableKeys.begin, @@ -5671,7 +5693,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) persistShardAssignedKeys.begin.toString() + keys.end.toString()); //TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end); self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end)); - self->counters.systemClearRange++; + ++self->counters.systemClearRange; self->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, assignedKeys.begin, @@ -5687,7 +5709,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) void StorageServerDisk::clearRange(KeyRangeRef keys) { storage->clear(keys); - *kvClearRange++; + ++(*kvClearRange); } void StorageServerDisk::writeKeyValue(KeyValueRef kv) { @@ -5701,7 +5723,7 @@ void StorageServerDisk::writeMutation(MutationRef mutation) { *kvBytesCommit += mutation.expectedSize(); } else if (mutation.type == MutationRef::ClearRange) { storage->clear(KeyRangeRef(mutation.param1, mutation.param2)); - *kvClearRange++; + ++(*kvClearRange); } else ASSERT(false); } @@ -5716,7 +5738,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations, *kvBytesCommit += m.expectedSize(); } else if (m.type == MutationRef::ClearRange) { storage->clear(KeyRangeRef(m.param1, m.param2)); - *kvClearRange++; + ++(*kvClearRange); } } } @@ -5780,6 +5802,7 @@ ACTOR Future<Void> applyByteSampleResult(StorageServer* data, KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES)); if (results) { results->push_back(bs.castTo<VectorRef<KeyValueRef>>()); + data->bytesRestored += bs.logicalSize(); data->counters.kvBytesScan += bs.logicalSize(); } int rangeSize = bs.expectedSize(); @@ -5950,6 +5973,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor data->sk = Key(); return false; } + data->bytesRestored += fFormat.get().expectedSize(); if (!persistFormatReadableRange.contains(fFormat.get().get())) { TraceEvent(SevError, "UnsupportedDBFormat") .detail("Format", fFormat.get().get().toString()) @@ -5957,12 +5981,15 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor throw worker_recovery_failed(); } data->thisServerID = BinaryReader::fromStringRef<UID>(fID.get().get(), Unversioned()); + data->bytesRestored += fID.get().expectedSize(); if (ftssPairID.get().present()) { data->setTssPair(BinaryReader::fromStringRef<UID>(ftssPairID.get().get(), Unversioned())); + data->bytesRestored += ftssPairID.get().expectedSize(); } if (fClusterID.get().present()) { data->clusterId.send(BinaryReader::fromStringRef<UID>(fClusterID.get().get(), Unversioned())); + data->bytesRestored += fClusterID.get().expectedSize(); } else { TEST(true); // storage server upgraded to version supporting cluster IDs data->actors.add(persistClusterId(data)); @@ -5974,22 +6001,29 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor if (fTssQuarantine.get().present()) { TEST(true); // TSS restarted while quarantined data->tssInQuarantine = true; + data->bytesRestored += fTssQuarantine.get().expectedSize(); } data->sk = serverKeysPrefixFor((data->tssPairID.present()) ? data->tssPairID.get() : data->thisServerID) .withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/ - if (fLogProtocol.get().present()) + if (fLogProtocol.get().present()) { data->logProtocol = BinaryReader::fromStringRef<ProtocolVersion>(fLogProtocol.get().get(), Unversioned()); + data->bytesRestored += fLogProtocol.get().expectedSize(); + } - if (fPrimaryLocality.get().present()) + if (fPrimaryLocality.get().present()) { data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned()); + data->bytesRestored += fPrimaryLocality.get().expectedSize(); + } state Version version = BinaryReader::fromStringRef<Version>(fVersion.get().get(), Unversioned()); debug_checkRestoredVersion(data->thisServerID, version, "StorageServer"); data->setInitialVersion(version); + data->bytesRestored += fVersion.get().expectedSize(); state RangeResult available = fShardAvailable.get(); + data->bytesRestored += available.logicalSize(); state int availableLoc; for (availableLoc = 0; availableLoc < available.size(); availableLoc++) { KeyRangeRef keys(available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin), @@ -6006,6 +6040,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor } state RangeResult assigned = fShardAssigned.get(); + data->bytesRestored += assigned.logicalSize(); state int assignedLoc; for (assignedLoc = 0; assignedLoc < assigned.size(); assignedLoc++) { KeyRangeRef keys(assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin), @@ -6024,6 +6059,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor } state RangeResult changeFeeds = fChangeFeeds.get(); + data->bytesRestored += changeFeeds.logicalSize(); state int feedLoc; for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) { Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin); @@ -6061,10 +6097,11 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor ++it) { if (it->value() == invalidVersion) { KeyRangeRef clearRange(it->begin(), it->end()); - data->counters.systemClearRange++; + ++data->counters.systemClearRange; // TODO(alexmiller): Figure out how to selectively enable spammy data distribution events. // DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange); storage->clear(clearRange); + ++data->counters.systemClearRange; data->byteSampleApplyClear(clearRange, invalidVersion); } } @@ -6142,7 +6179,7 @@ void StorageServer::byteSampleApplySet(KeyValueRef kv, Version ver) { auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin)); addMutationToMutationLogOrStorage(ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end)); - counters.systemClearRange++; + ++counters.systemClearRange; } } @@ -6189,7 +6226,7 @@ void StorageServer::byteSampleApplyClear(KeyRangeRef range, Version ver) { byteSample.eraseAsync(range.begin, range.end); auto diskRange = range.withPrefix(persistByteSampleKeys.begin); addMutationToMutationLogOrStorage(ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end)); - counters.systemClearRange++; + ++counters.systemClearRange; } } @@ -6301,6 +6338,7 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi) state Future<Void> doPollMetrics = Void(); wait(self->byteSampleRecovery); + TraceEvent("StorageServerRestoreDurableState", self->thisServerID).detail("RestoredBytes", self->bytesRestored); // Logs all counters in `counters.cc` and reset the interval. self->actors.add(traceCounters("StorageMetrics", @@ -6852,6 +6890,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, try { wait(self.storage.init()); wait(self.storage.commit()); + ++self.counters.kvCommit; if (seedTag == invalidTag) { std::pair<Version, Tag> verAndTag = wait(addStorageServer( @@ -6868,6 +6907,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, self.storage.makeNewStorageServerDurable(); wait(self.storage.commit()); + ++self.counters.kvCommit; TraceEvent("StorageServerInit", ssi.id()) .detail("Version", self.version.get()) @@ -7067,6 +7107,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, throw worker_removed(); } } + ++self.counters.kvCommit; bool ok = wait(self.storage.restoreDurableState()); if (!ok) {