mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-16 10:52:20 +08:00
Added get, scan, and commit counters.
This commit is contained in:
parent
da8453eb5d
commit
b7388a714b
@ -208,23 +208,27 @@ struct StorageServerDisk {
|
|||||||
|
|
||||||
// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
|
// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
|
||||||
Future<Key> readNextKeyInclusive(KeyRef key, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
|
Future<Key> readNextKeyInclusive(KeyRef key, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
|
||||||
|
++(*kvScans);
|
||||||
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), type);
|
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), type);
|
||||||
}
|
}
|
||||||
Future<Optional<Value>> readValue(KeyRef key,
|
Future<Optional<Value>> readValue(KeyRef key,
|
||||||
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
|
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
|
||||||
Optional<UID> debugID = Optional<UID>()) {
|
Optional<UID> debugID = Optional<UID>()) {
|
||||||
|
++(*kvGets);
|
||||||
return storage->readValue(key, type, debugID);
|
return storage->readValue(key, type, debugID);
|
||||||
}
|
}
|
||||||
Future<Optional<Value>> readValuePrefix(KeyRef key,
|
Future<Optional<Value>> readValuePrefix(KeyRef key,
|
||||||
int maxLength,
|
int maxLength,
|
||||||
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
|
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
|
||||||
Optional<UID> debugID = Optional<UID>()) {
|
Optional<UID> debugID = Optional<UID>()) {
|
||||||
|
++(*kvGets);
|
||||||
return storage->readValuePrefix(key, maxLength, type, debugID);
|
return storage->readValuePrefix(key, maxLength, type, debugID);
|
||||||
}
|
}
|
||||||
Future<RangeResult> readRange(KeyRangeRef keys,
|
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||||
int rowLimit = 1 << 30,
|
int rowLimit = 1 << 30,
|
||||||
int byteLimit = 1 << 30,
|
int byteLimit = 1 << 30,
|
||||||
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
|
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
|
||||||
|
++(*kvScans);
|
||||||
return storage->readRange(keys, rowLimit, byteLimit, type);
|
return storage->readRange(keys, rowLimit, byteLimit, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -234,6 +238,9 @@ struct StorageServerDisk {
|
|||||||
|
|
||||||
Counter* kvBytesCommit;
|
Counter* kvBytesCommit;
|
||||||
Counter* kvClearRange;
|
Counter* kvClearRange;
|
||||||
|
Counter* kvGets;
|
||||||
|
Counter* kvScans;
|
||||||
|
Counter* kvCommit;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct StorageServer* data;
|
struct StorageServer* data;
|
||||||
@ -840,6 +847,10 @@ public:
|
|||||||
|
|
||||||
Counter kvBytesScan;
|
Counter kvBytesScan;
|
||||||
Counter kvBytesGet;
|
Counter kvBytesGet;
|
||||||
|
Counter eagerReadsKeys;
|
||||||
|
Counter kvGets;
|
||||||
|
Counter kvScans;
|
||||||
|
Counter kvCommit;
|
||||||
|
|
||||||
LatencySample readLatencySample;
|
LatencySample readLatencySample;
|
||||||
LatencyBands readLatencyBands;
|
LatencyBands readLatencyBands;
|
||||||
@ -865,7 +876,9 @@ public:
|
|||||||
fetchesFromLogs("FetchesFromLogs", cc), quickGetValueHit("QuickGetValueHit", cc),
|
fetchesFromLogs("FetchesFromLogs", cc), quickGetValueHit("QuickGetValueHit", cc),
|
||||||
quickGetValueMiss("QuickGetValueMiss", cc), quickGetKeyValuesHit("QuickGetKeyValuesHit", cc),
|
quickGetValueMiss("QuickGetValueMiss", cc), quickGetKeyValuesHit("QuickGetKeyValuesHit", cc),
|
||||||
quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc), kvBytesScan("KVBytesScan", cc),
|
quickGetKeyValuesMiss("QuickGetKeyValuesMiss", cc), kvBytesScan("KVBytesScan", cc),
|
||||||
kvBytesGet("KVBytesGet", cc), readLatencySample("ReadLatencyMetrics",
|
eagerReadsKeys("EagerReadsKeys", cc), kvGets("KVGets", cc), kvScans("KVScans", cc),
|
||||||
|
kvCommit("KVCommit", cc), kvBytesGet("KVBytesGet", cc),
|
||||||
|
readLatencySample("ReadLatencyMetrics",
|
||||||
self->thisServerID,
|
self->thisServerID,
|
||||||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||||
@ -892,6 +905,8 @@ public:
|
|||||||
}
|
}
|
||||||
} counters;
|
} counters;
|
||||||
|
|
||||||
|
int64_t bytesRestored;
|
||||||
|
|
||||||
Reference<EventCacheHolder> storageServerSourceTLogIDEventHolder;
|
Reference<EventCacheHolder> storageServerSourceTLogIDEventHolder;
|
||||||
|
|
||||||
StorageServer(IKeyValueStore* storage,
|
StorageServer(IKeyValueStore* storage,
|
||||||
@ -949,6 +964,9 @@ public:
|
|||||||
|
|
||||||
this->storage.kvBytesCommit = &counters.kvBytesCommit;
|
this->storage.kvBytesCommit = &counters.kvBytesCommit;
|
||||||
this->storage.kvClearRange = &counters.kvClearRange;
|
this->storage.kvClearRange = &counters.kvClearRange;
|
||||||
|
this->storage.kvGets = &counters.kvGets;
|
||||||
|
this->storage.kvScans = &counters.kvScans;
|
||||||
|
this->storage.kvCommit = &counters.kvCommit;
|
||||||
}
|
}
|
||||||
|
|
||||||
//~StorageServer() { fclose(log); }
|
//~StorageServer() { fclose(log); }
|
||||||
@ -1362,6 +1380,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||||||
path = 2;
|
path = 2;
|
||||||
Optional<Value> vv = wait(data->storage.readValue(req.key, IKeyValueStore::ReadType::NORMAL, req.debugID));
|
Optional<Value> vv = wait(data->storage.readValue(req.key, IKeyValueStore::ReadType::NORMAL, req.debugID));
|
||||||
data->counters.kvBytesGet += vv.expectedSize();
|
data->counters.kvBytesGet += vv.expectedSize();
|
||||||
|
++data->counters.kvGets;
|
||||||
// Validate that while we were reading the data we didn't lose the version or shard
|
// Validate that while we were reading the data we didn't lose the version or shard
|
||||||
if (version < data->storageVersion()) {
|
if (version < data->storageVersion()) {
|
||||||
TEST(true); // transaction_too_old after readValue
|
TEST(true); // transaction_too_old after readValue
|
||||||
@ -1628,7 +1647,7 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
|
|||||||
if (feed->second->storageVersion != invalidVersion) {
|
if (feed->second->storageVersion != invalidVersion) {
|
||||||
self->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
|
self->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
|
||||||
changeFeedDurableKey(feed->second->id, req.version)));
|
changeFeedDurableKey(feed->second->id, req.version)));
|
||||||
self->counters.systemClearRange++;
|
++self->counters.systemClearRange;
|
||||||
if (req.version > feed->second->storageVersion) {
|
if (req.version > feed->second->storageVersion) {
|
||||||
feed->second->storageVersion = invalidVersion;
|
feed->second->storageVersion = invalidVersion;
|
||||||
feed->second->durableVersion = invalidVersion;
|
feed->second->durableVersion = invalidVersion;
|
||||||
@ -3390,6 +3409,7 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
|
|||||||
std::vector<Future<Key>> keyEnd(eager->keyBegin.size());
|
std::vector<Future<Key>> keyEnd(eager->keyBegin.size());
|
||||||
for (int i = 0; i < keyEnd.size(); i++)
|
for (int i = 0; i < keyEnd.size(); i++)
|
||||||
keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i], IKeyValueStore::ReadType::EAGER);
|
keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i], IKeyValueStore::ReadType::EAGER);
|
||||||
|
data->counters.eagerReadsKeys += keyEnd.size();
|
||||||
|
|
||||||
state Future<std::vector<Key>> futureKeyEnds = getAll(keyEnd);
|
state Future<std::vector<Key>> futureKeyEnds = getAll(keyEnd);
|
||||||
state std::vector<Key> keyEndVal = wait(futureKeyEnds);
|
state std::vector<Key> keyEndVal = wait(futureKeyEnds);
|
||||||
@ -3411,6 +3431,7 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
|
|||||||
data->counters.kvBytesGet += value.expectedSize();
|
data->counters.kvBytesGet += value.expectedSize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
data->counters.eagerReadsKeys += eager->keys.size();
|
||||||
eager->value = optionalValues;
|
eager->value = optionalValues;
|
||||||
|
|
||||||
return Void();
|
return Void();
|
||||||
@ -3701,7 +3722,7 @@ void removeDataRange(StorageServer* ss,
|
|||||||
MutationRef m(MutationRef::ClearRange, range.end, endClear->getEndKey());
|
MutationRef m(MutationRef::ClearRange, range.end, endClear->getEndKey());
|
||||||
m = ss->addMutationToMutationLog(mLV, m);
|
m = ss->addMutationToMutationLog(mLV, m);
|
||||||
data.insert(m.param1, ValueOrClearToRef::clearTo(m.param2));
|
data.insert(m.param1, ValueOrClearToRef::clearTo(m.param2));
|
||||||
ss->counters.systemClearRange++;
|
++ss->counters.systemClearRange;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto beginClear = data.atLatest().lastLess(range.begin);
|
auto beginClear = data.atLatest().lastLess(range.begin);
|
||||||
@ -4420,7 +4441,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||||||
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
|
if (e.code() == error_code_actor_cancelled && !data->shuttingDown && shard->phase >= AddingShard::Fetching) {
|
||||||
if (shard->phase < AddingShard::Waiting) {
|
if (shard->phase < AddingShard::Waiting) {
|
||||||
data->storage.clearRange(keys);
|
data->storage.clearRange(keys);
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
data->byteSampleApplyClear(keys, invalidVersion);
|
data->byteSampleApplyClear(keys, invalidVersion);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(data->data().getLatestVersion() > data->version.get());
|
ASSERT(data->data().getLatestVersion() > data->version.get());
|
||||||
@ -4660,7 +4681,7 @@ void changeServerKeys(StorageServer* data,
|
|||||||
data->addMutation(data->data().getLatestVersion(), true, clearRange, range, data->updateEagerReads);
|
data->addMutation(data->data().getLatestVersion(), true, clearRange, range, data->updateEagerReads);
|
||||||
data->newestAvailableVersion.insert(range, latestVersion);
|
data->newestAvailableVersion.insert(range, latestVersion);
|
||||||
setAvailableStatus(data, range, true);
|
setAvailableStatus(data, range, true);
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
}
|
}
|
||||||
validate(data);
|
validate(data);
|
||||||
|
|
||||||
@ -4686,12 +4707,12 @@ void changeServerKeys(StorageServer* data,
|
|||||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||||
data->addMutationToMutationLog(
|
data->addMutationToMutationLog(
|
||||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
data->addMutationToMutationLog(mLV,
|
data->addMutationToMutationLog(mLV,
|
||||||
MutationRef(MutationRef::ClearRange,
|
MutationRef(MutationRef::ClearRange,
|
||||||
changeFeedDurableKey(f.first, 0),
|
changeFeedDurableKey(f.first, 0),
|
||||||
changeFeedDurableKey(f.first, version)));
|
changeFeedDurableKey(f.first, version)));
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
auto rs = data->keyChangeFeed.modify(f.second);
|
auto rs = data->keyChangeFeed.modify(f.second);
|
||||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||||
auto& feedList = r->value();
|
auto& feedList = r->value();
|
||||||
@ -4939,12 +4960,12 @@ private:
|
|||||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||||
data->addMutationToMutationLog(
|
data->addMutationToMutationLog(
|
||||||
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
data->addMutationToMutationLog(mLV,
|
data->addMutationToMutationLog(mLV,
|
||||||
MutationRef(MutationRef::ClearRange,
|
MutationRef(MutationRef::ClearRange,
|
||||||
changeFeedDurableKey(feed->second->id, 0),
|
changeFeedDurableKey(feed->second->id, 0),
|
||||||
changeFeedDurableKey(feed->second->id, currentVersion)));
|
changeFeedDurableKey(feed->second->id, currentVersion)));
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
auto rs = data->keyChangeFeed.modify(feed->second->range);
|
auto rs = data->keyChangeFeed.modify(feed->second->range);
|
||||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||||
auto& feedList = r->value();
|
auto& feedList = r->value();
|
||||||
@ -4965,7 +4986,7 @@ private:
|
|||||||
if (feed->second->storageVersion != invalidVersion) {
|
if (feed->second->storageVersion != invalidVersion) {
|
||||||
data->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
|
data->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
|
||||||
changeFeedDurableKey(feed->second->id, popVersion)));
|
changeFeedDurableKey(feed->second->id, popVersion)));
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
if (popVersion > feed->second->storageVersion) {
|
if (popVersion > feed->second->storageVersion) {
|
||||||
feed->second->storageVersion = invalidVersion;
|
feed->second->storageVersion = invalidVersion;
|
||||||
feed->second->durableVersion = invalidVersion;
|
feed->second->durableVersion = invalidVersion;
|
||||||
@ -5547,6 +5568,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||||||
debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
|
debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
|
||||||
state double beforeStorageCommit = now();
|
state double beforeStorageCommit = now();
|
||||||
state Future<Void> durable = data->storage.commit();
|
state Future<Void> durable = data->storage.commit();
|
||||||
|
++data->counters.kvCommit;
|
||||||
state Future<Void> durableDelay = Void();
|
state Future<Void> durableDelay = Void();
|
||||||
|
|
||||||
if (bytesLeft > 0) {
|
if (bytesLeft > 0) {
|
||||||
@ -5650,7 +5672,7 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
|
|||||||
//TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", availableKeys.begin).detail("RangeEnd", availableKeys.end);
|
//TraceEvent("SetAvailableStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", availableKeys.begin).detail("RangeEnd", availableKeys.end);
|
||||||
|
|
||||||
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, availableKeys.begin, availableKeys.end));
|
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, availableKeys.begin, availableKeys.end));
|
||||||
self->counters.systemClearRange++;
|
++self->counters.systemClearRange;
|
||||||
self->addMutationToMutationLog(mLV,
|
self->addMutationToMutationLog(mLV,
|
||||||
MutationRef(MutationRef::SetValue,
|
MutationRef(MutationRef::SetValue,
|
||||||
availableKeys.begin,
|
availableKeys.begin,
|
||||||
@ -5671,7 +5693,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
|
|||||||
persistShardAssignedKeys.begin.toString() + keys.end.toString());
|
persistShardAssignedKeys.begin.toString() + keys.end.toString());
|
||||||
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
|
//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
|
||||||
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end));
|
self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end));
|
||||||
self->counters.systemClearRange++;
|
++self->counters.systemClearRange;
|
||||||
self->addMutationToMutationLog(mLV,
|
self->addMutationToMutationLog(mLV,
|
||||||
MutationRef(MutationRef::SetValue,
|
MutationRef(MutationRef::SetValue,
|
||||||
assignedKeys.begin,
|
assignedKeys.begin,
|
||||||
@ -5687,7 +5709,7 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
|
|||||||
|
|
||||||
void StorageServerDisk::clearRange(KeyRangeRef keys) {
|
void StorageServerDisk::clearRange(KeyRangeRef keys) {
|
||||||
storage->clear(keys);
|
storage->clear(keys);
|
||||||
*kvClearRange++;
|
++(*kvClearRange);
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageServerDisk::writeKeyValue(KeyValueRef kv) {
|
void StorageServerDisk::writeKeyValue(KeyValueRef kv) {
|
||||||
@ -5701,7 +5723,7 @@ void StorageServerDisk::writeMutation(MutationRef mutation) {
|
|||||||
*kvBytesCommit += mutation.expectedSize();
|
*kvBytesCommit += mutation.expectedSize();
|
||||||
} else if (mutation.type == MutationRef::ClearRange) {
|
} else if (mutation.type == MutationRef::ClearRange) {
|
||||||
storage->clear(KeyRangeRef(mutation.param1, mutation.param2));
|
storage->clear(KeyRangeRef(mutation.param1, mutation.param2));
|
||||||
*kvClearRange++;
|
++(*kvClearRange);
|
||||||
} else
|
} else
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
}
|
}
|
||||||
@ -5716,7 +5738,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
|
|||||||
*kvBytesCommit += m.expectedSize();
|
*kvBytesCommit += m.expectedSize();
|
||||||
} else if (m.type == MutationRef::ClearRange) {
|
} else if (m.type == MutationRef::ClearRange) {
|
||||||
storage->clear(KeyRangeRef(m.param1, m.param2));
|
storage->clear(KeyRangeRef(m.param1, m.param2));
|
||||||
*kvClearRange++;
|
++(*kvClearRange);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -5780,6 +5802,7 @@ ACTOR Future<Void> applyByteSampleResult(StorageServer* data,
|
|||||||
KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES));
|
KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES));
|
||||||
if (results) {
|
if (results) {
|
||||||
results->push_back(bs.castTo<VectorRef<KeyValueRef>>());
|
results->push_back(bs.castTo<VectorRef<KeyValueRef>>());
|
||||||
|
data->bytesRestored += bs.logicalSize();
|
||||||
data->counters.kvBytesScan += bs.logicalSize();
|
data->counters.kvBytesScan += bs.logicalSize();
|
||||||
}
|
}
|
||||||
int rangeSize = bs.expectedSize();
|
int rangeSize = bs.expectedSize();
|
||||||
@ -5950,6 +5973,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||||||
data->sk = Key();
|
data->sk = Key();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
data->bytesRestored += fFormat.get().expectedSize();
|
||||||
if (!persistFormatReadableRange.contains(fFormat.get().get())) {
|
if (!persistFormatReadableRange.contains(fFormat.get().get())) {
|
||||||
TraceEvent(SevError, "UnsupportedDBFormat")
|
TraceEvent(SevError, "UnsupportedDBFormat")
|
||||||
.detail("Format", fFormat.get().get().toString())
|
.detail("Format", fFormat.get().get().toString())
|
||||||
@ -5957,12 +5981,15 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||||||
throw worker_recovery_failed();
|
throw worker_recovery_failed();
|
||||||
}
|
}
|
||||||
data->thisServerID = BinaryReader::fromStringRef<UID>(fID.get().get(), Unversioned());
|
data->thisServerID = BinaryReader::fromStringRef<UID>(fID.get().get(), Unversioned());
|
||||||
|
data->bytesRestored += fID.get().expectedSize();
|
||||||
if (ftssPairID.get().present()) {
|
if (ftssPairID.get().present()) {
|
||||||
data->setTssPair(BinaryReader::fromStringRef<UID>(ftssPairID.get().get(), Unversioned()));
|
data->setTssPair(BinaryReader::fromStringRef<UID>(ftssPairID.get().get(), Unversioned()));
|
||||||
|
data->bytesRestored += ftssPairID.get().expectedSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fClusterID.get().present()) {
|
if (fClusterID.get().present()) {
|
||||||
data->clusterId.send(BinaryReader::fromStringRef<UID>(fClusterID.get().get(), Unversioned()));
|
data->clusterId.send(BinaryReader::fromStringRef<UID>(fClusterID.get().get(), Unversioned()));
|
||||||
|
data->bytesRestored += fClusterID.get().expectedSize();
|
||||||
} else {
|
} else {
|
||||||
TEST(true); // storage server upgraded to version supporting cluster IDs
|
TEST(true); // storage server upgraded to version supporting cluster IDs
|
||||||
data->actors.add(persistClusterId(data));
|
data->actors.add(persistClusterId(data));
|
||||||
@ -5974,22 +6001,29 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||||||
if (fTssQuarantine.get().present()) {
|
if (fTssQuarantine.get().present()) {
|
||||||
TEST(true); // TSS restarted while quarantined
|
TEST(true); // TSS restarted while quarantined
|
||||||
data->tssInQuarantine = true;
|
data->tssInQuarantine = true;
|
||||||
|
data->bytesRestored += fTssQuarantine.get().expectedSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
data->sk = serverKeysPrefixFor((data->tssPairID.present()) ? data->tssPairID.get() : data->thisServerID)
|
data->sk = serverKeysPrefixFor((data->tssPairID.present()) ? data->tssPairID.get() : data->thisServerID)
|
||||||
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
|
.withPrefix(systemKeys.begin); // FFFF/serverKeys/[this server]/
|
||||||
|
|
||||||
if (fLogProtocol.get().present())
|
if (fLogProtocol.get().present()) {
|
||||||
data->logProtocol = BinaryReader::fromStringRef<ProtocolVersion>(fLogProtocol.get().get(), Unversioned());
|
data->logProtocol = BinaryReader::fromStringRef<ProtocolVersion>(fLogProtocol.get().get(), Unversioned());
|
||||||
|
data->bytesRestored += fLogProtocol.get().expectedSize();
|
||||||
|
}
|
||||||
|
|
||||||
if (fPrimaryLocality.get().present())
|
if (fPrimaryLocality.get().present()) {
|
||||||
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned());
|
data->primaryLocality = BinaryReader::fromStringRef<int8_t>(fPrimaryLocality.get().get(), Unversioned());
|
||||||
|
data->bytesRestored += fPrimaryLocality.get().expectedSize();
|
||||||
|
}
|
||||||
|
|
||||||
state Version version = BinaryReader::fromStringRef<Version>(fVersion.get().get(), Unversioned());
|
state Version version = BinaryReader::fromStringRef<Version>(fVersion.get().get(), Unversioned());
|
||||||
debug_checkRestoredVersion(data->thisServerID, version, "StorageServer");
|
debug_checkRestoredVersion(data->thisServerID, version, "StorageServer");
|
||||||
data->setInitialVersion(version);
|
data->setInitialVersion(version);
|
||||||
|
data->bytesRestored += fVersion.get().expectedSize();
|
||||||
|
|
||||||
state RangeResult available = fShardAvailable.get();
|
state RangeResult available = fShardAvailable.get();
|
||||||
|
data->bytesRestored += available.logicalSize();
|
||||||
state int availableLoc;
|
state int availableLoc;
|
||||||
for (availableLoc = 0; availableLoc < available.size(); availableLoc++) {
|
for (availableLoc = 0; availableLoc < available.size(); availableLoc++) {
|
||||||
KeyRangeRef keys(available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
|
KeyRangeRef keys(available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
|
||||||
@ -6006,6 +6040,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||||||
}
|
}
|
||||||
|
|
||||||
state RangeResult assigned = fShardAssigned.get();
|
state RangeResult assigned = fShardAssigned.get();
|
||||||
|
data->bytesRestored += assigned.logicalSize();
|
||||||
state int assignedLoc;
|
state int assignedLoc;
|
||||||
for (assignedLoc = 0; assignedLoc < assigned.size(); assignedLoc++) {
|
for (assignedLoc = 0; assignedLoc < assigned.size(); assignedLoc++) {
|
||||||
KeyRangeRef keys(assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
|
KeyRangeRef keys(assigned[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
|
||||||
@ -6024,6 +6059,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||||||
}
|
}
|
||||||
|
|
||||||
state RangeResult changeFeeds = fChangeFeeds.get();
|
state RangeResult changeFeeds = fChangeFeeds.get();
|
||||||
|
data->bytesRestored += changeFeeds.logicalSize();
|
||||||
state int feedLoc;
|
state int feedLoc;
|
||||||
for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) {
|
for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) {
|
||||||
Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin);
|
Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin);
|
||||||
@ -6061,10 +6097,11 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
|||||||
++it) {
|
++it) {
|
||||||
if (it->value() == invalidVersion) {
|
if (it->value() == invalidVersion) {
|
||||||
KeyRangeRef clearRange(it->begin(), it->end());
|
KeyRangeRef clearRange(it->begin(), it->end());
|
||||||
data->counters.systemClearRange++;
|
++data->counters.systemClearRange;
|
||||||
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
|
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
|
||||||
// DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange);
|
// DEBUG_KEY_RANGE("clearInvalidVersion", invalidVersion, clearRange);
|
||||||
storage->clear(clearRange);
|
storage->clear(clearRange);
|
||||||
|
++data->counters.systemClearRange;
|
||||||
data->byteSampleApplyClear(clearRange, invalidVersion);
|
data->byteSampleApplyClear(clearRange, invalidVersion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -6142,7 +6179,7 @@ void StorageServer::byteSampleApplySet(KeyValueRef kv, Version ver) {
|
|||||||
auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin));
|
auto diskRange = singleKeyRange(key.withPrefix(persistByteSampleKeys.begin));
|
||||||
addMutationToMutationLogOrStorage(ver,
|
addMutationToMutationLogOrStorage(ver,
|
||||||
MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
|
MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
|
||||||
counters.systemClearRange++;
|
++counters.systemClearRange;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -6189,7 +6226,7 @@ void StorageServer::byteSampleApplyClear(KeyRangeRef range, Version ver) {
|
|||||||
byteSample.eraseAsync(range.begin, range.end);
|
byteSample.eraseAsync(range.begin, range.end);
|
||||||
auto diskRange = range.withPrefix(persistByteSampleKeys.begin);
|
auto diskRange = range.withPrefix(persistByteSampleKeys.begin);
|
||||||
addMutationToMutationLogOrStorage(ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
|
addMutationToMutationLogOrStorage(ver, MutationRef(MutationRef::ClearRange, diskRange.begin, diskRange.end));
|
||||||
counters.systemClearRange++;
|
++counters.systemClearRange;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -6301,6 +6338,7 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
|
|||||||
state Future<Void> doPollMetrics = Void();
|
state Future<Void> doPollMetrics = Void();
|
||||||
|
|
||||||
wait(self->byteSampleRecovery);
|
wait(self->byteSampleRecovery);
|
||||||
|
TraceEvent("StorageServerRestoreDurableState", self->thisServerID).detail("RestoredBytes", self->bytesRestored);
|
||||||
|
|
||||||
// Logs all counters in `counters.cc` and reset the interval.
|
// Logs all counters in `counters.cc` and reset the interval.
|
||||||
self->actors.add(traceCounters("StorageMetrics",
|
self->actors.add(traceCounters("StorageMetrics",
|
||||||
@ -6852,6 +6890,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
|
|||||||
try {
|
try {
|
||||||
wait(self.storage.init());
|
wait(self.storage.init());
|
||||||
wait(self.storage.commit());
|
wait(self.storage.commit());
|
||||||
|
++self.counters.kvCommit;
|
||||||
|
|
||||||
if (seedTag == invalidTag) {
|
if (seedTag == invalidTag) {
|
||||||
std::pair<Version, Tag> verAndTag = wait(addStorageServer(
|
std::pair<Version, Tag> verAndTag = wait(addStorageServer(
|
||||||
@ -6868,6 +6907,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
|
|||||||
|
|
||||||
self.storage.makeNewStorageServerDurable();
|
self.storage.makeNewStorageServerDurable();
|
||||||
wait(self.storage.commit());
|
wait(self.storage.commit());
|
||||||
|
++self.counters.kvCommit;
|
||||||
|
|
||||||
TraceEvent("StorageServerInit", ssi.id())
|
TraceEvent("StorageServerInit", ssi.id())
|
||||||
.detail("Version", self.version.get())
|
.detail("Version", self.version.get())
|
||||||
@ -7067,6 +7107,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
|
|||||||
throw worker_removed();
|
throw worker_removed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
++self.counters.kvCommit;
|
||||||
|
|
||||||
bool ok = wait(self.storage.restoreDurableState());
|
bool ok = wait(self.storage.restoreDurableState());
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user