Checkpoint restore with shard (#8667)

* Added SSPhysicalShard (a simplified standalone sketch of the new bookkeeping follows the commit metadata below).

* Updated physicalShards in StorageServer::addShard().

* Handled notAssigned shards.

* fetchKeys() actors are not stopped during TerminateStorageServer since physicalShards are not cleared.

* Fixed addingSplitLeft leaving shardId unset.

* Increased the timeout for RocksDB reads in simulation.

* Cleanup.

* Set SERVE_AUDIT_STORAGE_PARALLELISM to 1.

* Disabled the ValidateStorage test.

* Resolved review comments.

Co-authored-by: He Liu <heliu@apple.com>
Author: He Liu, 2022-11-07 10:02:35 -08:00 (committed by GitHub)
Parent: 0adcd92d33
Commit: bffa838398
4 changed files with 141 additions and 9 deletions
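
As a reading aid for the hunks below, here is a minimal, self-contained sketch of the bookkeeping pattern this commit introduces. It is not FDB code: PhysicalShardStub and ShardInfoStub are hypothetical stand-ins for SSPhysicalShard and ShardInfo, std::shared_ptr replaces FDB's Reference, and tracing/validation are reduced to asserts.

// Standalone sketch (hypothetical types, not FDB code). Builds with any C++17 compiler.
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Minimal stand-in for ShardInfo: a key range plus the physical-shard ids.
struct ShardInfoStub {
    std::pair<std::string, std::string> keys; // [begin, end)
    uint64_t shardId = 0;        // physical shard currently holding the range
    uint64_t desiredShardId = 0; // physical shard the range should end up in
    bool assigned = true;

    bool intersects(const ShardInfoStub& other) const {
        return keys.first < other.keys.second && other.keys.first < keys.second;
    }
};

// Simplified counterpart of SSPhysicalShard: an id plus the ranges it currently serves.
class PhysicalShardStub {
public:
    explicit PhysicalShardStub(uint64_t id) : id(id) {}

    void addRange(std::shared_ptr<ShardInfoStub> shard) {
        assert(shard->assigned);
        removeRange(*shard); // drop anything overlapping first, as the real addRange() does
        ranges.push_back(std::move(shard));
    }

    // Swap-and-pop removal of every range overlapping `range`.
    void removeRange(const ShardInfoStub& range) {
        for (size_t i = 0; i < ranges.size();) {
            if (ranges[i]->intersects(range)) {
                ranges[i] = ranges.back();
                ranges.pop_back();
            } else {
                ++i;
            }
        }
    }

    // A checkpoint is only possible once every range has physically landed in this shard.
    bool supportCheckpoint() const {
        for (const auto& r : ranges) {
            assert(r->desiredShardId == id);
            if (r->shardId != id)
                return false;
        }
        return true;
    }

private:
    const uint64_t id;
    std::vector<std::shared_ptr<ShardInfoStub>> ranges;
};

int main() {
    // Stand-in for StorageServer::physicalShards.
    std::unordered_map<uint64_t, PhysicalShardStub> physicalShards;

    // A range that is still moving in: desired shard 0x42, currently held by shard 0x7.
    auto moving = std::make_shared<ShardInfoStub>(ShardInfoStub{ { "a", "m" }, 0x7, 0x42, true });
    physicalShards.emplace(0x42, PhysicalShardStub(0x42)).first->second.addRange(moving);
    assert(!physicalShards.at(0x42).supportCheckpoint());

    // Once the fetch completes, shardId catches up with desiredShardId and checkpoints are allowed.
    moving->shardId = 0x42;
    assert(physicalShards.at(0x42).supportCheckpoint());
    return 0;
}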


@@ -768,7 +768,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FETCH_KEYS_PARALLELISM_FULL, 6 );
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
init( SERVE_FETCH_CHECKPOINT_PARALLELISM, 4 );
init( SERVE_AUDIT_STORAGE_PARALLELISM, 2 );
init( SERVE_AUDIT_STORAGE_PARALLELISM, 1 );
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_RECOVERY_VERSION_LAG_LIMIT, 2 * MAX_READ_TRANSACTION_LIFE_VERSIONS );
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;


@@ -495,7 +495,7 @@ public:
for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
const DataMoveMetaData& meta = it.value()->meta;
if (meta.ranges.empty()) {
TraceEvent(SevWarnAlways, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString());
TraceEvent(SevWarn, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString());
continue;
}
if (it.value()->isCancelled() || (it.value()->valid && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {


@@ -647,6 +647,91 @@ struct BusiestWriteTagContext {
busiestWriteTagEventHolder(makeReference<EventCacheHolder>(busiestWriteTagTrackingKey)), lastUpdateTime(-1) {}
};
// A SSPhysicalShard represents a physical shard, it contains a list of keyranges.
class SSPhysicalShard {
public:
SSPhysicalShard(const int64_t id) : id(id) {}
void addRange(Reference<ShardInfo> shard);
// Remove the shard if a shard to the same pointer (ShardInfo*) exists.
void removeRange(Reference<ShardInfo> shard);
// Clear all shards overlapping with `range`.
void removeRange(KeyRangeRef range);
bool supportCheckpoint() const;
bool hasRange(Reference<ShardInfo> shard) const;
private:
const int64_t id;
std::vector<Reference<ShardInfo>> ranges;
};
void SSPhysicalShard::addRange(Reference<ShardInfo> shard) {
TraceEvent(SevVerbose, "SSPhysicalShardAddShard")
.detail("ShardID", format("%016llx", this->id))
.detail("Assigned", !shard->notAssigned())
.detail("Range", shard->keys);
ASSERT(!shard->notAssigned());
removeRange(shard->keys);
ranges.push_back(shard);
}
void SSPhysicalShard::removeRange(Reference<ShardInfo> shard) {
TraceEvent(SevVerbose, "SSPhysicalShardRemoveShard")
.detail("ShardID", format("%016llx", this->id))
.detail("Assigned", !shard->notAssigned())
.detail("Range", shard->keys);
for (int i = 0; i < this->ranges.size(); ++i) {
const auto& r = this->ranges[i];
if (r.getPtr() == shard.getPtr()) {
this->ranges[i] = this->ranges.back();
this->ranges.pop_back();
return;
}
}
}
void SSPhysicalShard::removeRange(KeyRangeRef range) {
TraceEvent(SevDebug, "SSPhysicalShardRemoveRange")
.detail("ShardID", format("%016llx", this->id))
.detail("Range", range);
for (int i = 0; i < this->ranges.size();) {
const auto& r = this->ranges[i];
if (r->keys.intersects(range)) {
this->ranges[i] = this->ranges.back();
this->ranges.pop_back();
} else {
++i;
}
}
}
bool SSPhysicalShard::supportCheckpoint() const {
for (const auto& r : this->ranges) {
ASSERT(r->desiredShardId == this->id);
if (r->shardId != this->id) {
return false;
}
}
return true;
}
bool SSPhysicalShard::hasRange(Reference<ShardInfo> shard) const {
for (int i = 0; i < this->ranges.size(); ++i) {
if (this->ranges[i].getPtr() == shard.getPtr()) {
return true;
}
}
return false;
}
struct StorageServer : public IStorageMetricsService {
typedef VersionedMap<KeyRef, ValueOrClearToRef> VersionedData;
@@ -931,6 +1016,7 @@ public:
StorageServerDisk storage;
KeyRangeMap<Reference<ShardInfo>> shards;
std::unordered_map<int64_t, SSPhysicalShard> physicalShards;
uint64_t shardChangeCounter; // max( shards->changecounter )
KeyRangeMap<bool> cachedRangeMap; // indicates if a key-range is being cached
@@ -1357,6 +1443,25 @@ public:
}
//~StorageServer() { fclose(log); }
void addRangeToPhysicalShard(Reference<ShardInfo> newRange) {
if (!shardAware || newRange->notAssigned()) {
return;
}
auto [it, ignored] =
physicalShards.insert(std::make_pair(newRange->desiredShardId, SSPhysicalShard(newRange->desiredShardId)));
it->second.addRange(newRange);
}
void removeRangeFromPhysicalShard(Reference<ShardInfo> range) {
if (!range.isValid() || !shardAware || range->notAssigned()) {
return;
}
auto it = physicalShards.find(range->desiredShardId);
ASSERT(it != physicalShards.end());
it->second.removeRange(range);
}
// Puts the given shard into shards. The caller is responsible for adding shards
// for all ranges in shards.getAffectedRangesAfterInsertion(newShard->keys)), because these
@@ -1368,7 +1473,25 @@ public:
/*auto affected = shards.getAffectedRangesAfterInsertion( newShard->keys, Reference<ShardInfo>() );
for(auto i = affected.begin(); i != affected.end(); ++i)
shards.insert( *i, Reference<ShardInfo>() );*/
shards.insert(newShard->keys, Reference<ShardInfo>(newShard));
if (shardAware && newShard->notAssigned()) {
auto sh = shards.intersectingRanges(newShard->keys);
for (auto it = sh.begin(); it != sh.end(); ++it) {
if (it->value().isValid() && !it->value()->notAssigned()) {
TraceEvent(SevVerbose, "StorageServerAddShardClear")
.detail("NewShardRange", newShard->keys)
.detail("Range", it->value()->keys)
.detail("ShardID", format("%016llx", it->value()->desiredShardId))
.detail("NewShardID", format("%016llx", newShard->desiredShardId))
.detail("NewShardActualID", format("%016llx", newShard->shardId));
removeRangeFromPhysicalShard(it->value());
}
}
}
Reference<ShardInfo> rShard(newShard);
shards.insert(newShard->keys, rShard);
addRangeToPhysicalShard(rShard);
}
void addMutation(Version version,
bool fromFetch,
@@ -1655,6 +1778,8 @@ void validate(StorageServer* data, bool force = false) {
for (auto s = data->shards.ranges().begin(); s != data->shards.ranges().end(); ++s) {
TraceEvent(SevVerbose, "ValidateShard", data->thisServerID)
.detail("Range", s->range())
.detail("ShardID", format("%016llx", s->value()->shardId))
.detail("DesiredShardID", format("%016llx", s->value()->desiredShardId))
.detail("ShardRange", s->value()->keys)
.detail("ShardState", s->value()->debugDescribeState())
.log();
@@ -1662,6 +1787,11 @@ void validate(StorageServer* data, bool force = false) {
ASSERT(!s->value()->keys.empty());
if (data->shardAware) {
s->value()->validate();
if (!s->value()->notAssigned()) {
auto it = data->physicalShards.find(s->value()->desiredShardId);
ASSERT(it != data->physicalShards.end());
ASSERT(it->second.hasRange(s->value()));
}
}
}
@@ -7016,9 +7146,10 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
if (data->shardAware) {
StorageServerShard rightShard = data->shards[keys.begin]->toStorageServerShard();
rightShard.range = KeyRangeRef(nfk, keys.end);
shard->server->addShard(ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, nfk), shard));
auto* leftShard = ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, nfk), shard);
leftShard->populateShard(rightShard);
shard->server->addShard(leftShard);
shard->server->addShard(ShardInfo::newShard(data, rightShard));
data->shards[keys.begin]->populateShard(rightShard);
} else {
shard->server->addShard(ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, nfk), shard));
shard->server->addShard(ShardInfo::newAdding(data, KeyRangeRef(nfk, keys.end)));
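
The addingSplitLeft fix in the hunk above is an ordering issue: addShard() now files each range into physicalShards keyed by desiredShardId, so the left half of the split must have its ids populated (via populateShard(rightShard)) before it is inserted, not after. A minimal sketch of the failure mode, with hypothetical stand-in types (not FDB code):

// Standalone sketch (hypothetical types): registering a range before its ids are
// populated files it under id 0 instead of its real physical shard.
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct RangeStub {
    std::string begin, end;
    uint64_t desiredShardId = 0; // unset until populated from the right-hand shard
};

int main() {
    std::map<uint64_t, std::vector<RangeStub>> physicalShards;
    auto registerRange = [&](const RangeStub& r) { physicalShards[r.desiredShardId].push_back(r); };

    // Old order: register first, populate later -> the range is filed under id 0.
    RangeStub left{ "a", "m" };
    registerRange(left);
    left.desiredShardId = 0x42;
    assert(physicalShards.count(0) == 1 && physicalShards.count(0x42) == 0);

    // Fixed order: populate the ids first, then register.
    physicalShards.clear();
    RangeStub left2{ "a", "m" };
    left2.desiredShardId = 0x42; // what populateShard(rightShard) effectively does here
    registerRange(left2);
    assert(physicalShards.count(0x42) == 1);
    return 0;
}
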
@@ -7679,7 +7810,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
validate(data);
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys, data->thisServerID);
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys, data->thisServerID)
.detail("DataMoveID", dataMoveId);
const uint64_t desiredId = dataMoveId.first();
const Version cVer = version + 1;
@@ -7823,7 +7955,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
// TODO(psm): Replace this with moveInShard->cancel().
ASSERT(false);
} else {
TraceEvent(SevVerbose, "CSKMoveInToSameShard", data->thisServerID)
TraceEvent(SevInfo, "CSKMoveInToSameShard", data->thisServerID)
.detail("DataMoveID", dataMoveId)
.detailf("TargetShard", "%016llx", desiredId)
.detail("MoveRange", keys)
@@ -10731,7 +10863,7 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
// Clearing shards shuts down any fetchKeys actors; these may do things on cancellation that are best done with
// self still valid
self.shards.insert(allKeys, Reference<ShardInfo>());
self.addShard(ShardInfo::newNotAssigned(allKeys));
// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise
// just close it.
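
The storageServerTerminated() change in the hunk above keeps the cancellation behavior the old line relied on: replacing every entry in the range map drops the last references to the old ShardInfo objects, and tearing those down is what stops in-flight fetchKeys work while the server object is still valid, while routing the replacement through addShard(ShardInfo::newNotAssigned(allKeys)) additionally keeps the new physicalShards bookkeeping consistent. A small sketch of the replacement-as-cancellation pattern, with hypothetical stand-in types (not FDB code):

// Standalone sketch (hypothetical types): overwriting the map entries drops the last
// reference to each old shard, and the destructor is where in-flight work is torn down.
#include <cstdio>
#include <map>
#include <memory>
#include <string>

struct ShardStub {
    std::string name;
    explicit ShardStub(std::string n) : name(std::move(n)) {}
    ~ShardStub() { std::printf("cancelling work for shard %s\n", name.c_str()); }
};

int main() {
    // Stand-in for KeyRangeMap<Reference<ShardInfo>>: one entry per range.
    std::map<std::string, std::shared_ptr<ShardStub>> shards;
    shards["a-m"] = std::make_shared<ShardStub>("a-m (adding)");
    shards["m-z"] = std::make_shared<ShardStub>("m-z (adding)");

    // Termination: replace everything with a single not-assigned shard. The old entries
    // lose their last reference here, so their destructors run now, while the enclosing
    // server object is still alive -- which is the point of the comment in the hunk above.
    shards.clear();
    shards["all"] = std::make_shared<ShardStub>("allKeys (notAssigned)");

    std::puts("old shards torn down; map now covers allKeys with a not-assigned shard");
    return 0;
}
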


@@ -202,7 +202,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/WriteDuringReadClean.toml)
add_fdb_test(TEST_FILES noSim/RandomUnitTests.toml UNIT)
if(WITH_ROCKSDB_EXPERIMENTAL)
add_fdb_test(TEST_FILES fast/ValidateStorage.toml)
add_fdb_test(TEST_FILES fast/ValidateStorage.toml IGNORE)
add_fdb_test(TEST_FILES noSim/KeyValueStoreRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES noSim/ShardedRocksDBTest.toml UNIT)
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)