diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index 1fbb8d532d..3d3a0d4ecd 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -128,6 +128,7 @@ set(FDBCLIENT_SRCS StatusClient.h StorageServerInterface.cpp StorageServerInterface.h + StorageCheckpoint.h Subspace.cpp Subspace.h StackLineage.h diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 8f5c9efb44..68b32c84bc 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -7630,6 +7630,167 @@ ACTOR Future snapCreate(Database cx, Standalone snapCmd, UID sn } } +ACTOR template +static Future createCheckpointImpl(T tr, KeyRangeRef range, CheckpointFormat format) { + TraceEvent("CreateCheckpointTransactionBegin").detail("Range", range.toString()); + + state RangeResult keyServers = wait(krmGetRanges(tr, keyServersPrefix, range)); + ASSERT(!keyServers.more); + + state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); + + for (int i = 0; i < keyServers.size() - 1; ++i) { + KeyRangeRef shard(keyServers[i].key, keyServers[i + 1].key); + std::vector src; + std::vector dest; + decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest); + + // The checkpoint request is sent to all replicas, in case any of them is unhealthy. + // An alternative is to choose a healthy replica. + const UID checkpointID = deterministicRandom()->randomUniqueID(); + for (int idx = 0; idx < src.size(); ++idx) { + CheckpointMetaData checkpoint(shard & range, format, src[idx], checkpointID); + checkpoint.setState(CheckpointMetaData::Pending); + tr->set(checkpointKeyFor(checkpointID), checkpointValue(checkpoint)); + } + + TraceEvent("CreateCheckpointTransactionShard") + .detail("Shard", shard.toString()) + .detail("SrcServers", describe(src)) + .detail("ServerSelected", describe(src)) + .detail("CheckpointKey", checkpointKeyFor(checkpointID)) + .detail("ReadVersion", tr->getReadVersion().get()); + } + + return Void(); +} + +Future createCheckpoint(Reference tr, KeyRangeRef range, CheckpointFormat format) { + return holdWhile(tr, createCheckpointImpl(tr, range, format)); +} + +Future createCheckpoint(Transaction* tr, KeyRangeRef range, CheckpointFormat format) { + return createCheckpointImpl(tr, range, format); +} + +// Gets CheckpointMetaData of the specific keyrange, version and format from one of the storage servers, if none of the +// servers have the checkpoint, a checkpoint_not_found error is returned. +ACTOR static Future getCheckpointMetaDataInternal(GetCheckpointRequest req, + Reference alternatives, + double timeout) { + TraceEvent("GetCheckpointMetaDataInternalBegin") + .detail("Range", req.range.toString()) + .detail("Version", req.version) + .detail("Format", static_cast(req.format)) + .detail("Locations", alternatives->description()); + + state std::vector>> fs; + state int i = 0; + for (i = 0; i < alternatives->size(); ++i) { + // For each shard, all storage servers are checked, only one is required. 
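+ // Wrapping each reply in errorOr() keeps the waitForAll() below from
+ // short-circuiting on the first failed replica, and timeoutError() bounds
+ // how long a single unresponsive replica can stall the whole fan-out.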
+ fs.push_back(errorOr(timeoutError(alternatives->getInterface(i).checkpoint.getReply(req), timeout))); + } + + state Optional error; + wait(waitForAll(fs)); + TraceEvent("GetCheckpointMetaDataInternalWaitEnd") + .detail("Range", req.range.toString()) + .detail("Version", req.version); + + for (i = 0; i < fs.size(); ++i) { + if (!fs[i].isReady()) { + error = timed_out(); + TraceEvent("GetCheckpointMetaDataInternalSSTimeout") + .detail("Range", req.range.toString()) + .detail("Version", req.version) + .detail("StorageServer", alternatives->getInterface(i).uniqueID); + continue; + } + + if (fs[i].get().isError()) { + const Error& e = fs[i].get().getError(); + TraceEvent("GetCheckpointMetaDataInternalError") + .errorUnsuppressed(e) + .detail("Range", req.range.toString()) + .detail("Version", req.version) + .detail("StorageServer", alternatives->getInterface(i).uniqueID); + if (e.code() != error_code_checkpoint_not_found || !error.present()) { + error = e; + } + } else { + return fs[i].get().get(); + } + } + + ASSERT(error.present()); + throw error.get(); +} + +ACTOR Future> getCheckpointMetaData(Database cx, + KeyRange keys, + Version version, + CheckpointFormat format, + double timeout) { + state Span span("NAPI:GetCheckpoint"_loc); + + loop { + TraceEvent("GetCheckpointBegin") + .detail("Range", keys.toString()) + .detail("Version", version) + .detail("Format", static_cast(format)); + + state std::vector> fs; + state int i = 0; + + try { + state std::vector>> locations = + wait(getKeyRangeLocations(cx, + keys, + CLIENT_KNOBS->TOO_MANY, + Reverse::False, + &StorageServerInterface::checkpoint, + span.context, + Optional(), + UseProvisionalProxies::False)); + + fs.clear(); + for (i = 0; i < locations.size(); ++i) { + fs.push_back(getCheckpointMetaDataInternal( + GetCheckpointRequest(version, keys, format), locations[i].second, timeout)); + TraceEvent("GetCheckpointShardBegin") + .detail("Range", locations[i].first.toString()) + .detail("Version", version) + .detail("StorageServers", locations[i].second->description()); + } + + choose { + when(wait(cx->connectionFileChanged())) { cx->invalidateCache(keys); } + when(wait(waitForAll(fs))) { break; } + when(wait(delay(timeout))) { + TraceEvent("GetCheckpointTimeout").detail("Range", keys.toString()).detail("Version", version); + } + } + } catch (Error& e) { + TraceEvent("GetCheckpointError").errorUnsuppressed(e).detail("Range", keys.toString()); + if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed || + e.code() == error_code_connection_failed || e.code() == error_code_broken_promise) { + cx->invalidateCache(keys); + wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY)); + } else { + throw; + } + } + } + + std::vector res; + for (i = 0; i < fs.size(); ++i) { + TraceEvent("GetCheckpointShardEnd").detail("Checkpoint", fs[i].get().toString()); + res.push_back(fs[i].get()); + } + return res; +} + ACTOR Future checkSafeExclusions(Database cx, std::vector exclusions) { TraceEvent("ExclusionSafetyCheckBegin") .detail("NumExclusion", exclusions.size()) diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index fd3052d638..9bbe1073a7 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -490,6 +490,25 @@ int64_t extractIntOption(Optional value, // states: coordinator, TLog and storage state ACTOR Future snapCreate(Database cx, Standalone snapCmd, UID snapUID); +// Adds necessary mutation(s) to the transaction, so that *one* checkpoint will be created for +// each and every 
shard overlapping with `range`. The checkpoint for a shard is requested on
+// every storage server holding a replica of that shard, in case some replicas are unhealthy.
+// All checkpoint(s) will be created at the transaction's commit version.
+Future<Void> createCheckpoint(Transaction* tr, KeyRangeRef range, CheckpointFormat format);
+
+// Same as above, but for a ReadYourWritesTransaction.
+Future<Void> createCheckpoint(Reference<ReadYourWritesTransaction> tr, KeyRangeRef range, CheckpointFormat format);
+
+// Gets checkpoint metadata for `keys` at the specific version, in the particular format.
+// One CheckpointMetaData will be returned for each distinct shard.
+// The collective keyrange of the returned checkpoint(s) is a superset of `keys`.
+// A checkpoint_not_found() error is returned if the requested checkpoint(s) cannot be found.
+ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
+                                                                    KeyRange keys,
+                                                                    Version version,
+                                                                    CheckpointFormat format,
+                                                                    double timeout = 5.0);
+
 // Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
 ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion> exclusions);
diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index bbfdcece5d..c87d9f5ce6 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -367,6 +367,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 );
 	// If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES according to the recent demand of background IO.
 	init( ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE, true );
+	init( DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, "fdb");
+
 	init( ROCKSDB_PERFCONTEXT_ENABLE, false ); if( randomize && BUGGIFY ) ROCKSDB_PERFCONTEXT_ENABLE = deterministicRandom()->coinflip() ? false : true;
 	init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
 	init( ROCKSDB_MAX_SUBCOMPACTIONS, 2 );
@@ -678,6 +680,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( MAX_STORAGE_COMMIT_TIME, 120.0 ); //The max fsync stall time on the storage server and tlog before marking a disk as failed
 	init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1;
 	init( ENABLE_CLEAR_RANGE_EAGER_READS, true );
+	init( CHECKPOINT_TRANSFER_BLOCK_BYTES, 40e6 );
 	init( QUICK_GET_VALUE_FALLBACK, true );
 	init( QUICK_GET_KEY_VALUES_FALLBACK, true );
 	init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h
index 532f70c19a..257746a0c5 100644
--- a/fdbclient/ServerKnobs.h
+++ b/fdbclient/ServerKnobs.h
@@ -298,6 +298,7 @@ public:
 	bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
 	int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC;
 	bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE;
+	std::string DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY;
 	bool ROCKSDB_PERFCONTEXT_ENABLE; // Enable rocks perf context metrics. May cause performance overhead
 	double ROCKSDB_PERFCONTEXT_SAMPLE_RATE;
 	int ROCKSDB_MAX_SUBCOMPACTIONS;
@@ -617,6 +618,7 @@ public:
 	bool ENABLE_CLEAR_RANGE_EAGER_READS;
 	bool QUICK_GET_VALUE_FALLBACK;
 	bool QUICK_GET_KEY_VALUES_FALLBACK;
+	int CHECKPOINT_TRANSFER_BLOCK_BYTES;
 	int QUICK_GET_KEY_VALUES_LIMIT;
 	int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
diff --git a/fdbclient/StorageCheckpoint.h b/fdbclient/StorageCheckpoint.h
new file mode 100644
index 0000000000..7c83d71a3f
--- /dev/null
+++ b/fdbclient/StorageCheckpoint.h
@@ -0,0 +1,88 @@
+/*
+ * StorageCheckpoint.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FDBCLIENT_STORAGE_CHECKPOINT_H
+#define FDBCLIENT_STORAGE_CHECKPOINT_H
+#pragma once
+
+#include "fdbclient/FDBTypes.h"
+
+// FDB storage checkpoint format.
+enum CheckpointFormat {
+	InvalidFormat = 0,
+	// For RocksDB, a checkpoint generated via rocksdb::Checkpoint::ExportColumnFamily().
+	RocksDBColumnFamily = 1,
+	// For RocksDB, a checkpoint generated via rocksdb::Checkpoint::CreateCheckpoint().
+	RocksDB = 2,
+};
+
+// Metadata of an FDB checkpoint.
+struct CheckpointMetaData {
+	enum CheckpointState {
+		InvalidState = 0,
+		Pending = 1, // Checkpoint creation pending.
+		Complete = 2, // Checkpoint is created and ready to be read.
+		Deleting = 3, // Checkpoint deletion requested.
+		Fail = 4, // Checkpoint creation failed.
+	};
+
+	constexpr static FileIdentifier file_identifier = 13804342;
+	Version version;
+	KeyRange range;
+	int16_t format; // CheckpointFormat.
+	UID ssID; // ID of the storage server on which this checkpoint is created.
+	UID checkpointID; // A unique id for this checkpoint.
+	int16_t state; // CheckpointState.
+	int referenceCount; // A reference count on the checkpoint; it can only be deleted once this drops to 0.
+	int64_t gcTime; // Time after which this checkpoint may be deleted, a Unix timestamp in seconds.
+
+	// Serialized metadata associated with `format`; this data can be understood by the corresponding KVS.
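+	// (For RocksDBColumnFamily checkpoints this holds an ObjectWriter-serialized
+	// RocksDBColumnFamilyCheckpoint, written and read back elsewhere in this patch as:
+	//
+	//   serializedCheckpoint = ObjectWriter::toValue(rocksCF, IncludeVersion());
+	//   ObjectReader reader(serializedCheckpoint.begin(), IncludeVersion());
+	//   reader.deserialize(rocksCF);
+	//
+	// See getRocksCF() in fdbserver/RocksDBCheckpointUtils.actor.cpp.)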
+ Standalone serializedCheckpoint; + + CheckpointMetaData() : format(InvalidFormat), state(InvalidState), referenceCount(0) {} + CheckpointMetaData(KeyRange const& range, CheckpointFormat format, UID const& ssID, UID const& checkpointID) + : version(invalidVersion), range(range), format(format), ssID(ssID), checkpointID(checkpointID), state(Pending), + referenceCount(0) {} + CheckpointMetaData(Version version, KeyRange const& range, CheckpointFormat format, UID checkpointID) + : version(version), range(range), format(format), checkpointID(checkpointID), referenceCount(0) {} + + CheckpointState getState() const { return static_cast(state); } + + void setState(CheckpointState state) { this->state = static_cast(state); } + + CheckpointFormat getFormat() const { return static_cast(format); } + + void setFormat(CheckpointFormat format) { this->format = static_cast(format); } + + std::string toString() const { + std::string res = "Checkpoint MetaData:\nRange: " + range.toString() + "\nVersion: " + std::to_string(version) + + "\nFormat: " + std::to_string(format) + "\nServer: " + ssID.toString() + + "\nID: " + checkpointID.toString() + "\nState: " + std::to_string(static_cast(state)) + + "\n"; + return res; + } + + template + void serialize(Ar& ar) { + serializer(ar, version, range, format, state, checkpointID, ssID, gcTime, serializedCheckpoint); + } +}; + +#endif diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 75c9411f18..592d2dd167 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -24,6 +24,7 @@ #include #include "fdbclient/FDBTypes.h" +#include "fdbclient/StorageCheckpoint.h" #include "fdbrpc/Locality.h" #include "fdbrpc/QueueModel.h" #include "fdbrpc/fdbrpc.h" @@ -85,6 +86,8 @@ struct StorageServerInterface { RequestStream overlappingChangeFeeds; RequestStream changeFeedPop; RequestStream changeFeedVersionUpdate; + RequestStream checkpoint; + RequestStream fetchCheckpoint; explicit StorageServerInterface(UID uid) : uniqueID(uid) {} StorageServerInterface() : uniqueID(deterministicRandom()->randomUniqueID()) {} @@ -137,6 +140,9 @@ struct StorageServerInterface { RequestStream(getValue.getEndpoint().getAdjustedEndpoint(17)); changeFeedVersionUpdate = RequestStream( getValue.getEndpoint().getAdjustedEndpoint(18)); + checkpoint = RequestStream(getValue.getEndpoint().getAdjustedEndpoint(19)); + fetchCheckpoint = + RequestStream(getValue.getEndpoint().getAdjustedEndpoint(20)); } } else { ASSERT(Ar::isDeserializing); @@ -184,6 +190,8 @@ struct StorageServerInterface { streams.push_back(overlappingChangeFeeds.getReceiver()); streams.push_back(changeFeedPop.getReceiver()); streams.push_back(changeFeedVersionUpdate.getReceiver()); + streams.push_back(checkpoint.getReceiver()); + streams.push_back(fetchCheckpoint.getReceiver()); FlowTransport::transport().addEndpoints(streams); } }; @@ -816,6 +824,60 @@ struct ChangeFeedPopRequest { } }; +// Request to search for a checkpoint for a minimum keyrange: `range`, at the specific version, +// in the specific format. +// A CheckpointMetaData will be returned if the specific checkpoint is found. +struct GetCheckpointRequest { + constexpr static FileIdentifier file_identifier = 13804343; + Version version; // The FDB version at which the checkpoint is created. + KeyRange range; + int16_t format; // CheckpointFormat. + Optional checkpointID; // When present, look for the checkpoint with the exact UID. 
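+ // Note: the (version, range, format) constructor below leaves checkpointID
+ // unset, in which case a storage server is expected to match checkpoints on
+ // version, range, and format alone.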
+ ReplyPromise reply; + + GetCheckpointRequest() {} + GetCheckpointRequest(Version version, KeyRange const& range, CheckpointFormat format) + : version(version), range(range), format(format) {} + + template + void serialize(Ar& ar) { + serializer(ar, version, range, format, checkpointID, reply); + } +}; + +// Reply to FetchCheckpointRequest, transfers checkpoint back to client. +struct FetchCheckpointReply : public ReplyPromiseStreamReply { + constexpr static FileIdentifier file_identifier = 13804345; + Standalone token; // Serialized data specific to a particular checkpoint format. + Standalone data; + + FetchCheckpointReply() {} + FetchCheckpointReply(StringRef token) : token(token) {} + + int expectedSize() const { return data.expectedSize(); } + + template + void serialize(Ar& ar) { + serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, token, data); + } +}; + +// Request to fetch checkpoint from a storage server. +struct FetchCheckpointRequest { + constexpr static FileIdentifier file_identifier = 13804344; + UID checkpointID; + Standalone token; // Serialized data specific to a particular checkpoint format. + ReplyPromiseStream reply; + + FetchCheckpointRequest() = default; + FetchCheckpointRequest(UID checkpointID, StringRef token) : checkpointID(checkpointID), token(token) {} + + template + void serialize(Ar& ar) { + serializer(ar, checkpointID, token, reply); + } +}; + struct OverlappingChangeFeedEntry { Key rangeId; KeyRange range; diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 4d1bc23574..9d1329f98b 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -215,6 +215,33 @@ const KeyRangeRef writeConflictRangeKeysRange = const KeyRef clusterIdKey = LiteralStringRef("\xff/clusterId"); +const KeyRef checkpointPrefix = "\xff/checkpoint/"_sr; + +const Key checkpointKeyFor(UID checkpointID) { + BinaryWriter wr(Unversioned()); + wr.serializeBytes(checkpointPrefix); + wr << checkpointID; + return wr.toValue(); +} + +const Value checkpointValue(const CheckpointMetaData& checkpoint) { + return ObjectWriter::toValue(checkpoint, IncludeVersion()); +} + +UID decodeCheckpointKey(const KeyRef& key) { + UID checkpointID; + BinaryReader rd(key.removePrefix(checkpointPrefix), Unversioned()); + rd >> checkpointID; + return checkpointID; +} + +CheckpointMetaData decodeCheckpointValue(const ValueRef& value) { + CheckpointMetaData checkpoint; + ObjectReader reader(value.begin(), IncludeVersion()); + reader.deserialize(checkpoint); + return checkpoint; +} + // "\xff/cacheServer/[[UID]] := StorageServerInterface" const KeyRangeRef storageCacheServerKeys(LiteralStringRef("\xff/cacheServer/"), LiteralStringRef("\xff/cacheServer0")); const KeyRef storageCacheServersPrefix = storageCacheServerKeys.begin; diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index a7aa733f65..228c058d77 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -70,6 +70,13 @@ void decodeKeyServersValue(std::map const& tag_uid, extern const KeyRef clusterIdKey; +// "\xff/checkpoint/[[UID]] := [[CheckpointMetaData]]" +extern const KeyRef checkpointPrefix; +const Key checkpointKeyFor(UID checkpointID); +const Value checkpointValue(const CheckpointMetaData& checkpoint); +UID decodeCheckpointKey(const KeyRef& key); +CheckpointMetaData decodeCheckpointValue(const ValueRef& value); + // "\xff/storageCacheServer/[[UID]] := StorageServerInterface" // This will be added by the cache server on initialization and removed by DD // 
TODO[mpilman]: We will need a way to map uint16_t ids to UIDs in a future diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 471652dcda..5afc862b92 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -541,6 +541,29 @@ private: toCommit->writeTypedMessage(privatized); } + // Generates private mutations for the target storage server, instructing it to create a checkpoint. + void checkSetCheckpointKeys(MutationRef m) { + if (!m.param1.startsWith(checkpointPrefix)) { + return; + } + if (toCommit) { + CheckpointMetaData checkpoint = decodeCheckpointValue(m.param2); + Tag tag = decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(checkpoint.ssID)).get().get()); + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent("SendingPrivateMutationCheckpoint", dbgid) + .detail("Original", m) + .detail("Privatized", privatized) + .detail("Server", checkpoint.ssID) + .detail("TagKey", serverTagKeyFor(checkpoint.ssID)) + .detail("Tag", tag.toString()) + .detail("Checkpoint", checkpoint.toString()); + + toCommit->addTag(tag); + toCommit->writeTypedMessage(privatized); + } + } + void checkSetOtherKeys(MutationRef m) { if (initialCommit) return; @@ -1081,6 +1104,7 @@ public: if (m.type == MutationRef::SetValue && isSystemKey(m.param1)) { checkSetKeyServersPrefix(m); checkSetServerKeysPrefix(m); + checkSetCheckpointKeys(m); checkSetServerTagsPrefix(m); checkSetStorageCachePrefix(m); checkSetCacheKeysPrefix(m); diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 728b4f9ab1..f548fba0b5 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -50,6 +50,10 @@ set(FDBSERVER_SRCS KeyValueStoreMemory.actor.cpp KeyValueStoreRocksDB.actor.cpp KeyValueStoreSQLite.actor.cpp + ServerCheckpoint.actor.cpp + ServerCheckpoint.actor.h + RocksDBCheckpointUtils.actor.cpp + RocksDBCheckpointUtils.actor.h Knobs.h LatencyBandConfig.cpp LatencyBandConfig.h @@ -191,6 +195,7 @@ set(FDBSERVER_SRCS workloads/ChangeFeeds.actor.cpp workloads/DataDistributionMetrics.actor.cpp workloads/DataLossRecovery.actor.cpp + workloads/PhysicalShardMove.actor.cpp workloads/DDBalance.actor.cpp workloads/DDMetrics.actor.cpp workloads/DDMetricsExclude.actor.cpp diff --git a/fdbserver/IKeyValueStore.h b/fdbserver/IKeyValueStore.h index 6217cada9c..a295a55004 100644 --- a/fdbserver/IKeyValueStore.h +++ b/fdbserver/IKeyValueStore.h @@ -24,6 +24,22 @@ #include "fdbclient/FDBTypes.h" #include "fdbserver/Knobs.h" +#include "fdbclient/StorageCheckpoint.h" + +struct CheckpointRequest { + const Version version; // The FDB version at which the checkpoint is created. + const KeyRange range; // Keyrange this checkpoint must contain. + const CheckpointFormat format; + const UID checkpointID; + const std::string checkpointDir; // The local directory where the checkpoint file will be created. + + CheckpointRequest(const Version version, + const KeyRange& range, + const CheckpointFormat format, + const UID& id, + const std::string& checkpointDir) + : version(version), range(range), format(format), checkpointID(id), checkpointDir(checkpointDir) {} +}; class IClosable { public: @@ -87,6 +103,15 @@ public: virtual void enableSnapshot() {} + // Create a checkpoint. + virtual Future checkpoint(const CheckpointRequest& request) { throw not_implemented(); } + + // Restore from a checkpoint. 
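+ // A minimal sketch of the intended round trip, mirroring the CheckpointRestore
+ // unit test later in this patch (hypothetical caller code; `store` is an open
+ // IKeyValueStore, `freshStore` a newly created one, `dir` a scratch directory):
+ //
+ //   CheckpointRequest req(latestVersion, allKeys, RocksDBColumnFamily,
+ //                         deterministicRandom()->randomUniqueID(), dir);
+ //   CheckpointMetaData md = wait(store->checkpoint(req));
+ //   wait(freshStore->restore(std::vector<CheckpointMetaData>{ md }));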
+ virtual Future restore(const std::vector& checkpoints) { throw not_implemented(); } + + // Delete a checkpoint. + virtual Future deleteCheckpoint(const CheckpointMetaData& checkpoint) { throw not_implemented(); } + /* Concurrency contract Causal consistency: diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 39ca3787ec..c157458d81 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -5,11 +5,21 @@ #include #include #include +#include #include +#include +#include +#include +#include +#include #include #include #include +#include +#include #include +#include + #include #include #include @@ -32,6 +42,8 @@ #endif // SSD_ROCKSDB_EXPERIMENTAL #include "fdbserver/IKeyValueStore.h" +#include "fdbserver/RocksDBCheckpointUtils.actor.h" + #include "flow/actorcompiler.h" // has to be last include #ifdef SSD_ROCKSDB_EXPERIMENTAL @@ -114,7 +126,10 @@ private: std::mutex mutex; }; using DB = rocksdb::DB*; +using CF = rocksdb::ColumnFamilyHandle*; +#define PERSIST_PREFIX "\xff\xff" +const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version"); const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage"); const StringRef ROCKSDB_COMMIT_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBCommitLatency"); const StringRef ROCKSDB_COMMIT_ACTION_HISTOGRAM = LiteralStringRef("RocksDBCommitAction"); @@ -134,6 +149,74 @@ const StringRef ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM = LiteralStringRef("Rock const StringRef ROCKSDB_READVALUE_GET_HISTOGRAM = LiteralStringRef("RocksDBReadValueGet"); const StringRef ROCKSDB_READPREFIX_GET_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixGet"); +rocksdb::ExportImportFilesMetaData getMetaData(const CheckpointMetaData& checkpoint) { + rocksdb::ExportImportFilesMetaData metaData; + if (checkpoint.getFormat() != RocksDBColumnFamily) { + return metaData; + } + + RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(checkpoint); + metaData.db_comparator_name = rocksCF.dbComparatorName; + + for (const LiveFileMetaData& fileMetaData : rocksCF.sstFiles) { + rocksdb::LiveFileMetaData liveFileMetaData; + liveFileMetaData.size = fileMetaData.size; + liveFileMetaData.name = fileMetaData.name; + liveFileMetaData.file_number = fileMetaData.file_number; + liveFileMetaData.db_path = fileMetaData.db_path; + liveFileMetaData.smallest_seqno = fileMetaData.smallest_seqno; + liveFileMetaData.largest_seqno = fileMetaData.largest_seqno; + liveFileMetaData.smallestkey = fileMetaData.smallestkey; + liveFileMetaData.largestkey = fileMetaData.largestkey; + liveFileMetaData.num_reads_sampled = fileMetaData.num_reads_sampled; + liveFileMetaData.being_compacted = fileMetaData.being_compacted; + liveFileMetaData.num_entries = fileMetaData.num_entries; + liveFileMetaData.num_deletions = fileMetaData.num_deletions; + liveFileMetaData.temperature = static_cast(fileMetaData.temperature); + liveFileMetaData.oldest_blob_file_number = fileMetaData.oldest_blob_file_number; + liveFileMetaData.oldest_ancester_time = fileMetaData.oldest_ancester_time; + liveFileMetaData.file_creation_time = fileMetaData.file_creation_time; + liveFileMetaData.file_checksum = fileMetaData.file_checksum; + liveFileMetaData.file_checksum_func_name = fileMetaData.file_checksum_func_name; + liveFileMetaData.column_family_name = fileMetaData.column_family_name; + liveFileMetaData.level = fileMetaData.level; + metaData.files.push_back(liveFileMetaData); + } + + return metaData; +} + +void 
populateMetaData(CheckpointMetaData* checkpoint, const rocksdb::ExportImportFilesMetaData& metaData) { + RocksDBColumnFamilyCheckpoint rocksCF; + rocksCF.dbComparatorName = metaData.db_comparator_name; + for (const rocksdb::LiveFileMetaData& fileMetaData : metaData.files) { + LiveFileMetaData liveFileMetaData; + liveFileMetaData.size = fileMetaData.size; + liveFileMetaData.name = fileMetaData.name; + liveFileMetaData.file_number = fileMetaData.file_number; + liveFileMetaData.db_path = fileMetaData.db_path; + liveFileMetaData.smallest_seqno = fileMetaData.smallest_seqno; + liveFileMetaData.largest_seqno = fileMetaData.largest_seqno; + liveFileMetaData.smallestkey = fileMetaData.smallestkey; + liveFileMetaData.largestkey = fileMetaData.largestkey; + liveFileMetaData.num_reads_sampled = fileMetaData.num_reads_sampled; + liveFileMetaData.being_compacted = fileMetaData.being_compacted; + liveFileMetaData.num_entries = fileMetaData.num_entries; + liveFileMetaData.num_deletions = fileMetaData.num_deletions; + liveFileMetaData.temperature = static_cast(fileMetaData.temperature); + liveFileMetaData.oldest_blob_file_number = fileMetaData.oldest_blob_file_number; + liveFileMetaData.oldest_ancester_time = fileMetaData.oldest_ancester_time; + liveFileMetaData.file_creation_time = fileMetaData.file_creation_time; + liveFileMetaData.file_checksum = fileMetaData.file_checksum; + liveFileMetaData.file_checksum_func_name = fileMetaData.file_checksum_func_name; + liveFileMetaData.column_family_name = fileMetaData.column_family_name; + liveFileMetaData.level = fileMetaData.level; + rocksCF.sstFiles.push_back(liveFileMetaData); + } + checkpoint->setFormat(RocksDBColumnFamily); + checkpoint->serializedCheckpoint = ObjectWriter::toValue(rocksCF, IncludeVersion()); +} + rocksdb::Slice toSlice(StringRef s) { return rocksdb::Slice(reinterpret_cast(s.begin()), s.size()); } @@ -219,12 +302,13 @@ rocksdb::ReadOptions getReadOptions() { } struct ReadIterator { + CF& cf; uint64_t index; // incrementing counter to uniquely identify read iterator. bool inUse; std::shared_ptr iter; double creationTime; - ReadIterator(uint64_t index, DB& db, rocksdb::ReadOptions& options) - : index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options)) {} + ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions& options) + : cf(cf), index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {} }; /* @@ -241,8 +325,8 @@ gets deleted as the ref count becomes 0. 
*/ class ReadIteratorPool { public: - ReadIteratorPool(DB& db, const std::string& path) - : db(db), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) { + ReadIteratorPool(DB& db, CF& cf, const std::string& path) + : db(db), cf(cf), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) { readRangeOptions.background_purge_on_iterator_cleanup = true; readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0); TraceEvent("ReadIteratorPool") @@ -271,12 +355,12 @@ public: } } index++; - ReadIterator iter(index, db, readRangeOptions); + ReadIterator iter(cf, index, db, readRangeOptions); iteratorsMap.insert({ index, iter }); return iter; } else { index++; - ReadIterator iter(index, db, readRangeOptions); + ReadIterator iter(cf, index, db, readRangeOptions); return iter; } } @@ -316,6 +400,7 @@ private: std::unordered_map iteratorsMap; std::unordered_map::iterator it; DB& db; + CF& cf; rocksdb::ReadOptions readRangeOptions; std::mutex mutex; // incrementing counter for every new iterator creation, to uniquely identify the iterator in returnIterator(). @@ -735,10 +820,9 @@ Error statusToError(const rocksdb::Status& s) { } struct RocksDBKeyValueStore : IKeyValueStore { - using CF = rocksdb::ColumnFamilyHandle*; - struct Writer : IThreadPoolReceiver { DB& db; + CF& cf; UID id; std::shared_ptr rateLimiter; @@ -752,11 +836,12 @@ struct RocksDBKeyValueStore : IKeyValueStore { int threadIndex; explicit Writer(DB& db, + CF& cf, UID id, std::shared_ptr readIterPool, std::shared_ptr perfContextMetrics, int threadIndex) - : db(db), id(id), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics), + : db(db), cf(cf), id(id), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics), threadIndex(threadIndex), rateLimiter(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0 ? 
rocksdb::NewGenericRateLimiter( @@ -814,40 +899,71 @@ struct RocksDBKeyValueStore : IKeyValueStore { double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } }; void action(OpenAction& a) { - std::vector defaultCF = { rocksdb::ColumnFamilyDescriptor{ - "default", getCFOptions() } }; - std::vector handle; - auto options = getOptions(); + ASSERT(cf == nullptr); + + std::vector columnFamilies; + rocksdb::Options options = getOptions(); + rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, a.path, &columnFamilies); + if (std::find(columnFamilies.begin(), columnFamilies.end(), "default") == columnFamilies.end()) { + columnFamilies.push_back("default"); + } + + rocksdb::ColumnFamilyOptions cfOptions = getCFOptions(); + std::vector descriptors; + for (const std::string& name : columnFamilies) { + descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, cfOptions }); + } + options.listeners.push_back(a.errorListener); if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) { options.rate_limiter = rateLimiter; } - auto status = rocksdb::DB::Open(options, a.path, defaultCF, &handle, &db); + + std::vector handles; + status = rocksdb::DB::Open(options, a.path, descriptors, &handles, &db); + if (!status.ok()) { logRocksDBError(status, "Open"); a.done.sendError(statusToError(status)); + return; + } + + for (rocksdb::ColumnFamilyHandle* handle : handles) { + if (handle->GetName() == SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY) { + cf = handle; + break; + } + } + + if (cf == nullptr) { + status = db->CreateColumnFamily(cfOptions, SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, &cf); + if (!status.ok()) { + logRocksDBError(status, "Open"); + a.done.sendError(statusToError(status)); + } + } + + TraceEvent(SevInfo, "RocksDB") + .detail("Path", a.path) + .detail("Method", "Open") + .detail("KnobRocksDBWriteRateLimiterBytesPerSec", + SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC) + .detail("KnobRocksDBWriteRateLimiterAutoTune", SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE) + .detail("ColumnFamily", cf->GetName()); + if (g_network->isSimulated()) { + // The current thread and main thread are same when the code runs in simulation. + // blockUntilReady() is getting the thread into deadlock state, so directly calling + // the metricsLogger. + a.metrics = rocksDBMetricLogger(options.statistics, perfContextMetrics, db, readIterPool) && + flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool); } else { - TraceEvent(SevInfo, "RocksDB") - .detail("Path", a.path) - .detail("Method", "Open") - .detail("KnobRocksDBWriteRateLimiterBytesPerSec", - SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC) - .detail("KnobRocksDBWriteRateLimiterAutoTune", SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE); - if (g_network->isSimulated()) { - // The current thread and main thread are same when the code runs in simulation. - // blockUntilReady() is getting the thread into deadlock state, so directly calling - // the metricsLogger. 
+ onMainThread([&] { a.metrics = rocksDBMetricLogger(options.statistics, perfContextMetrics, db, readIterPool) && flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool); - } else { - onMainThread([&] { - a.metrics = rocksDBMetricLogger(options.statistics, perfContextMetrics, db, readIterPool) && - flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool); - return Future(true); - }).blockUntilReady(); - } - a.done.send(Void()); + return Future(true); + }).blockUntilReady(); } + a.done.send(Void()); } struct DeleteVisitor : public rocksdb::WriteBatch::Handler { @@ -863,6 +979,26 @@ struct RocksDBKeyValueStore : IKeyValueStore { deletes.push_back_deep(arena, kr); return rocksdb::Status::OK(); } + + rocksdb::Status PutCF(uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + return rocksdb::Status::OK(); + } + + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return rocksdb::Status::OK(); + } + + rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override { + return rocksdb::Status::OK(); + } + + rocksdb::Status MergeCF(uint32_t column_family_id, + const rocksdb::Slice& key, + const rocksdb::Slice& value) override { + return rocksdb::Status::OK(); + } }; struct CommitAction : TypedAction { @@ -894,7 +1030,12 @@ struct RocksDBKeyValueStore : IKeyValueStore { } Standalone> deletes; DeleteVisitor dv(deletes, deletes.arena()); - ASSERT(a.batchToCommit->Iterate(&dv).ok()); + rocksdb::Status s = a.batchToCommit->Iterate(&dv); + if (!s.ok()) { + logRocksDBError(s, "CommitDeleteVisitor"); + a.done.sendError(statusToError(s)); + return; + } // If there are any range deletes, we should have added them to be deleted. ASSERT(!deletes.empty() || !a.batchToCommit->HasDeleteRange()); rocksdb::WriteOptions options; @@ -906,7 +1047,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { // Request for batchToCommit bytes. If this request cannot be satisfied, the call is blocked. 
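 // (IO_HIGH asks a shared rate limiter to serve commit traffic ahead of
 // lower-priority background I/O such as compactions.)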
rateLimiter->Request(a.batchToCommit->GetDataSize() /* bytes */, rocksdb::Env::IO_HIGH); } - auto s = db->Write(options, a.batchToCommit.get()); + s = db->Write(options, a.batchToCommit.get()); readIterPool->update(); if (a.getHistograms) { writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime); @@ -922,7 +1063,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { for (const auto& keyRange : deletes) { auto begin = toSlice(keyRange.begin); auto end = toSlice(keyRange.end); - ASSERT(db->SuggestCompactRange(db->DefaultColumnFamily(), &begin, &end).ok()); + ASSERT(db->SuggestCompactRange(cf, &begin, &end).ok()); } if (a.getHistograms) { deleteCompactRangeHistogram->sampleSeconds(timer_monotonic() - compactRangeBeginTime); @@ -956,9 +1097,13 @@ struct RocksDBKeyValueStore : IKeyValueStore { logRocksDBError(s, "Close"); } if (a.deleteOnClose) { - std::vector defaultCF = { rocksdb::ColumnFamilyDescriptor{ - "default", getCFOptions() } }; - s = rocksdb::DestroyDB(a.path, getOptions(), defaultCF); + std::set columnFamilies{ "default" }; + columnFamilies.insert(SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY); + std::vector descriptors; + for (const std::string name : columnFamilies) { + descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, getCFOptions() }); + } + s = rocksdb::DestroyDB(a.path, getOptions(), descriptors); if (!s.ok()) { logRocksDBError(s, "Destroy"); } else { @@ -968,10 +1113,133 @@ struct RocksDBKeyValueStore : IKeyValueStore { TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Close"); a.done.send(Void()); } + + struct CheckpointAction : TypedAction { + CheckpointAction(const CheckpointRequest& request) : request(request) {} + + double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } + + const CheckpointRequest request; + ThreadReturnPromise reply; + }; + + void action(CheckpointAction& a) { + TraceEvent("RocksDBServeCheckpointBegin", id) + .detail("MinVersion", a.request.version) + .detail("Range", a.request.range.toString()) + .detail("Format", static_cast(a.request.format)) + .detail("CheckpointDir", a.request.checkpointDir); + + rocksdb::Checkpoint* checkpoint; + rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint); + if (!s.ok()) { + logRocksDBError(s, "Checkpoint"); + a.reply.sendError(statusToError(s)); + return; + } + + rocksdb::PinnableSlice value; + rocksdb::ReadOptions readOptions = getReadOptions(); + s = db->Get(readOptions, cf, toSlice(persistVersion), &value); + + if (!s.ok() && !s.IsNotFound()) { + logRocksDBError(s, "Checkpoint"); + a.reply.sendError(statusToError(s)); + return; + } + + const Version version = s.IsNotFound() + ? latestVersion + : BinaryReader::fromStringRef(toStringRef(value), Unversioned()); + + TraceEvent("RocksDBServeCheckpointVersion", id) + .detail("CheckpointVersion", a.request.version) + .detail("PersistVersion", version); + + // TODO: set the range as the actual shard range. 
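+ // (ExportColumnFamily below exports the entire column family regardless of
+ // a.request.range, so the range recorded here is advisory for now.)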
+ CheckpointMetaData res(version, a.request.range, a.request.format, a.request.checkpointID); + const std::string& checkpointDir = a.request.checkpointDir; + + if (a.request.format == RocksDBColumnFamily) { + rocksdb::ExportImportFilesMetaData* pMetadata; + platform::eraseDirectoryRecursive(checkpointDir); + const std::string cwd = platform::getWorkingDirectory() + "/"; + s = checkpoint->ExportColumnFamily(cf, checkpointDir, &pMetadata); + + if (!s.ok()) { + logRocksDBError(s, "Checkpoint"); + a.reply.sendError(statusToError(s)); + return; + } + + populateMetaData(&res, *pMetadata); + delete pMetadata; + TraceEvent("RocksDBServeCheckpointSuccess", id) + .detail("CheckpointMetaData", res.toString()) + .detail("RocksDBCF", getRocksCF(res).toString()); + } else { + throw not_implemented(); + } + + res.setState(CheckpointMetaData::Complete); + a.reply.send(res); + } + + struct RestoreAction : TypedAction { + RestoreAction(const std::string& path, const std::vector& checkpoints) + : path(path), checkpoints(checkpoints) {} + + double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; } + + const std::string path; + const std::vector checkpoints; + ThreadReturnPromise done; + }; + + void action(RestoreAction& a) { + TraceEvent("RocksDBServeRestoreBegin", id).detail("Path", a.path); + + // TODO: Fail gracefully. + ASSERT(!a.checkpoints.empty()); + + if (a.checkpoints[0].format == RocksDBColumnFamily) { + ASSERT_EQ(a.checkpoints.size(), 1); + TraceEvent("RocksDBServeRestoreCF", id) + .detail("Path", a.path) + .detail("Checkpoint", a.checkpoints[0].toString()) + .detail("RocksDBCF", getRocksCF(a.checkpoints[0]).toString()); + + auto options = getOptions(); + rocksdb::Status status = rocksdb::DB::Open(options, a.path, &db); + + if (!status.ok()) { + logRocksDBError(status, "Restore"); + a.done.sendError(statusToError(status)); + return; + } + + rocksdb::ExportImportFilesMetaData metaData = getMetaData(a.checkpoints[0]); + rocksdb::ImportColumnFamilyOptions importOptions; + importOptions.move_files = true; + status = db->CreateColumnFamilyWithImport( + getCFOptions(), SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, importOptions, metaData, &cf); + + if (!status.ok()) { + logRocksDBError(status, "Restore"); + a.done.sendError(statusToError(status)); + } else { + TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Restore"); + a.done.send(Void()); + } + } else { + throw not_implemented(); + } + } }; struct Reader : IThreadPoolReceiver { DB& db; + CF& cf; double readValueTimeout; double readValuePrefixTimeout; double readRangeTimeout; @@ -992,10 +1260,12 @@ struct RocksDBKeyValueStore : IKeyValueStore { int threadIndex; explicit Reader(DB& db, + CF& cf, std::shared_ptr readIterPool, std::shared_ptr perfContextMetrics, int threadIndex) - : db(db), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics), threadIndex(threadIndex), + : db(db), cf(cf), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics), + threadIndex(threadIndex), readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP, ROCKSDB_READRANGE_LATENCY_HISTOGRAM, Histogram::Unit::microseconds)), @@ -1066,6 +1336,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; } }; void action(ReadValueAction& a) { + ASSERT(cf != nullptr); bool doPerfContextMetrics = SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE && (deterministicRandom()->random01() < 
SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE); @@ -1098,7 +1369,13 @@ struct RocksDBKeyValueStore : IKeyValueStore { options.deadline = std::chrono::duration_cast(deadlineSeconds); double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0; - auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value); + auto s = db->Get(options, cf, toSlice(a.key), &value); + if (!s.ok() && !s.IsNotFound()) { + logRocksDBError(s, "ReadValue"); + a.result.sendError(statusToError(s)); + return; + } + if (a.getHistograms) { readValueGetHistogram->sampleSeconds(timer_monotonic() - dbGetBeginTime); } @@ -1175,7 +1452,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { options.deadline = std::chrono::duration_cast(deadlineSeconds); double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0; - auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value); + auto s = db->Get(options, cf, toSlice(a.key), &value); if (a.getHistograms) { readPrefixGetHistogram->sampleSeconds(timer_monotonic() - dbGetBeginTime); } @@ -1330,6 +1607,7 @@ struct RocksDBKeyValueStore : IKeyValueStore { DB db = nullptr; std::shared_ptr perfContextMetrics; std::string path; + rocksdb::ColumnFamilyHandle* defaultFdbCF = nullptr; UID id; Reference writeThread; Reference readThreads; @@ -1357,7 +1635,8 @@ struct RocksDBKeyValueStore : IKeyValueStore { Counters counters; explicit RocksDBKeyValueStore(const std::string& path, UID id) - : path(path), id(id), perfContextMetrics(new PerfContextMetrics()), readIterPool(new ReadIteratorPool(db, path)), + : path(path), id(id), perfContextMetrics(new PerfContextMetrics()), + readIterPool(new ReadIteratorPool(db, defaultFdbCF, path)), readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX), numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX), @@ -1381,11 +1660,11 @@ struct RocksDBKeyValueStore : IKeyValueStore { readThreads = createGenericThreadPool(); } writeThread->addThread( - new Writer(db, id, readIterPool, perfContextMetrics, SERVER_KNOBS->ROCKSDB_READ_PARALLELISM), + new Writer(db, defaultFdbCF, id, readIterPool, perfContextMetrics, SERVER_KNOBS->ROCKSDB_READ_PARALLELISM), "fdb-rocksdb-wr"); TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM); for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) { - readThreads->addThread(new Reader(db, readIterPool, perfContextMetrics, i), "fdb-rocksdb-re"); + readThreads->addThread(new Reader(db, defaultFdbCF, readIterPool, perfContextMetrics, i), "fdb-rocksdb-re"); } } @@ -1429,7 +1708,8 @@ struct RocksDBKeyValueStore : IKeyValueStore { if (writeBatch == nullptr) { writeBatch.reset(new rocksdb::WriteBatch()); } - writeBatch->Put(toSlice(kv.key), toSlice(kv.value)); + ASSERT(defaultFdbCF != nullptr); + writeBatch->Put(defaultFdbCF, toSlice(kv.key), toSlice(kv.value)); } void clear(KeyRangeRef keyRange, const Arena*) override { @@ -1437,10 +1717,12 @@ struct RocksDBKeyValueStore : IKeyValueStore { writeBatch.reset(new rocksdb::WriteBatch()); } + ASSERT(defaultFdbCF != nullptr); + if (keyRange.singleKeyRange()) { - writeBatch->Delete(toSlice(keyRange.begin)); + writeBatch->Delete(defaultFdbCF, toSlice(keyRange.begin)); } else { - writeBatch->DeleteRange(toSlice(keyRange.begin), toSlice(keyRange.end)); + writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end)); } } @@ -1587,6 +1869,46 @@ struct 
RocksDBKeyValueStore : IKeyValueStore { return StorageBytes(free, total, live, free); } + + Future checkpoint(const CheckpointRequest& request) override { + auto a = new Writer::CheckpointAction(request); + + auto res = a->reply.getFuture(); + writeThread->post(a); + return res; + } + + Future restore(const std::vector& checkpoints) override { + auto a = new Writer::RestoreAction(path, checkpoints); + auto res = a->done.getFuture(); + writeThread->post(a); + return res; + } + + // Delete a checkpoint. + Future deleteCheckpoint(const CheckpointMetaData& checkpoint) override { + if (checkpoint.format == RocksDBColumnFamily) { + RocksDBColumnFamilyCheckpoint rocksCF; + ObjectReader reader(checkpoint.serializedCheckpoint.begin(), IncludeVersion()); + reader.deserialize(rocksCF); + + std::unordered_set dirs; + for (const LiveFileMetaData& file : rocksCF.sstFiles) { + dirs.insert(file.db_path); + } + for (const std::string dir : dirs) { + platform::eraseDirectoryRecursive(dir); + TraceEvent("DeleteCheckpointRemovedDir", id) + .detail("CheckpointID", checkpoint.checkpointID) + .detail("Dir", dir); + } + } else if (checkpoint.format == RocksDB) { + throw not_implemented(); + } else { + throw internal_error(); + } + return Void(); + } }; } // namespace @@ -1701,6 +2023,61 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/RocksDBReopen") { return Void(); } +TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestore") { + state std::string cwd = platform::getWorkingDirectory() + "/"; + state std::string rocksDBTestDir = "rocksdb-kvstore-br-test-db"; + platform::eraseDirectoryRecursive(rocksDBTestDir); + + state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID()); + wait(kvStore->init()); + + kvStore->set({ LiteralStringRef("foo"), LiteralStringRef("bar") }); + wait(kvStore->commit(false)); + + Optional val = wait(kvStore->readValue(LiteralStringRef("foo"))); + ASSERT(Optional(LiteralStringRef("bar")) == val); + + platform::eraseDirectoryRecursive("checkpoint"); + state std::string checkpointDir = cwd + "checkpoint"; + + CheckpointRequest request( + latestVersion, allKeys, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), checkpointDir); + CheckpointMetaData metaData = wait(kvStore->checkpoint(request)); + + state std::string rocksDBRestoreDir = "rocksdb-kvstore-br-restore-db"; + platform::eraseDirectoryRecursive(rocksDBRestoreDir); + + state IKeyValueStore* kvStoreCopy = + new RocksDBKeyValueStore(rocksDBRestoreDir, deterministicRandom()->randomUniqueID()); + + std::vector checkpoints; + checkpoints.push_back(metaData); + wait(kvStoreCopy->restore(checkpoints)); + + Optional val = wait(kvStoreCopy->readValue(LiteralStringRef("foo"))); + ASSERT(Optional(LiteralStringRef("bar")) == val); + + std::vector> closes; + closes.push_back(kvStore->onClosed()); + closes.push_back(kvStoreCopy->onClosed()); + kvStore->close(); + kvStoreCopy->close(); + wait(waitForAll(closes)); + + platform::eraseDirectoryRecursive(rocksDBTestDir); + platform::eraseDirectoryRecursive(rocksDBRestoreDir); + + return Void(); +} + +TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/RocksDBTypes") { + // If the following assertion fails, update SstFileMetaData and LiveFileMetaData in RocksDBCheckpointUtils.actor.h + // to be the same as rocksdb::SstFileMetaData and rocksdb::LiveFileMetaData. 
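+ // These sizes are tied to the currently pinned RocksDB release; an upgrade
+ // that changes either struct should fail here and prompt re-syncing the
+ // copies in RocksDBCheckpointUtils.actor.h.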
+ ASSERT_EQ(sizeof(rocksdb::LiveFileMetaData), 184); + ASSERT_EQ(sizeof(rocksdb::ExportImportFilesMetaData), 32); + return Void(); +} + } // namespace #endif // SSD_ROCKSDB_EXPERIMENTAL diff --git a/fdbserver/RocksDBCheckpointUtils.actor.cpp b/fdbserver/RocksDBCheckpointUtils.actor.cpp new file mode 100644 index 0000000000..612f8b1f20 --- /dev/null +++ b/fdbserver/RocksDBCheckpointUtils.actor.cpp @@ -0,0 +1,283 @@ +/* + *RocksDBCheckpointUtils.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/RocksDBCheckpointUtils.actor.h" + +#include "fdbclient/FDBTypes.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbclient/StorageCheckpoint.h" +#include "flow/Trace.h" +#include "flow/flow.h" + +#include "flow/actorcompiler.h" // has to be last include + +namespace { + +class RocksDBCheckpointReader : public ICheckpointReader { +public: + RocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) + : checkpoint_(checkpoint), id_(logID), file_(Reference()), offset_(0) {} + + Future init(StringRef token) override; + + Future nextKeyValues(const int rowLimit, const int byteLimit) override { throw not_implemented(); } + + // Returns the next chunk of serialized checkpoint. + Future> nextChunk(const int byteLimit) override; + + Future close() override; + +private: + ACTOR static Future doInit(RocksDBCheckpointReader* self) { + ASSERT(self != nullptr); + try { + state Reference _file = wait(IAsyncFileSystem::filesystem()->open( + self->path_, IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO, 0)); + self->file_ = _file; + TraceEvent("RocksDBCheckpointReaderOpenFile").detail("File", self->path_); + } catch (Error& e) { + TraceEvent(SevWarnAlways, "ServerGetCheckpointFileFailure") + .errorUnsuppressed(e) + .detail("File", self->path_); + throw e; + } + + return Void(); + } + + ACTOR static Future> getNextChunk(RocksDBCheckpointReader* self, int byteLimit) { + int blockSize = std::min(64 * 1024, byteLimit); // Block size read from disk. 
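+ // The buffer is page-aligned because the checkpoint file is opened with
+ // OPEN_UNCACHED (direct I/O) in doInit() above.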
+ state Standalone buf = makeAlignedString(_PAGE_SIZE, blockSize); + int bytesRead = wait(self->file_->read(mutateString(buf), blockSize, self->offset_)); + if (bytesRead == 0) { + throw end_of_stream(); + } + + self->offset_ += bytesRead; + return buf.substr(0, bytesRead); + } + + ACTOR static Future doClose(RocksDBCheckpointReader* self) { + wait(delay(0, TaskPriority::FetchKeys)); + delete self; + return Void(); + } + + CheckpointMetaData checkpoint_; + UID id_; + Reference file_; + int offset_; + std::string path_; +}; + +Future RocksDBCheckpointReader::init(StringRef token) { + ASSERT_EQ(this->checkpoint_.getFormat(), RocksDBColumnFamily); + const std::string name = token.toString(); + this->offset_ = 0; + this->path_.clear(); + const RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(this->checkpoint_); + for (const auto& sstFile : rocksCF.sstFiles) { + if (sstFile.name == name) { + this->path_ = sstFile.db_path + sstFile.name; + break; + } + } + + if (this->path_.empty()) { + TraceEvent("RocksDBCheckpointReaderInitFileNotFound").detail("File", this->path_); + return checkpoint_not_found(); + } + + return doInit(this); +} + +Future> RocksDBCheckpointReader::nextChunk(const int byteLimit) { + return getNextChunk(this, byteLimit); +} + +Future RocksDBCheckpointReader::close() { + return doClose(this); +} + +// Fetch a single sst file from storage server. If the file is fetch successfully, it will be recorded via cFun. +ACTOR Future fetchCheckpointFile(Database cx, + std::shared_ptr metaData, + int idx, + std::string dir, + std::function(const CheckpointMetaData&)> cFun, + int maxRetries = 3) { + state RocksDBColumnFamilyCheckpoint rocksCF; + ObjectReader reader(metaData->serializedCheckpoint.begin(), IncludeVersion()); + reader.deserialize(rocksCF); + + // Skip fetched file. 
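+ // (A file counts as fetched only once an earlier attempt downloaded it into
+ // the same target directory and recorded that fact, optionally persisting it
+ // via cFun, so interrupted transfers resume cleanly.)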
+ if (rocksCF.sstFiles[idx].fetched && rocksCF.sstFiles[idx].db_path == dir) { + return Void(); + } + + state std::string remoteFile = rocksCF.sstFiles[idx].name; + state std::string localFile = dir + rocksCF.sstFiles[idx].name; + state UID ssID = metaData->ssID; + + state Transaction tr(cx); + state StorageServerInterface ssi; + loop { + try { + Optional ss = wait(tr.get(serverListKeyFor(ssID))); + if (!ss.present()) { + throw checkpoint_not_found(); + } + ssi = decodeServerListValue(ss.get()); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + + state int attempt = 0; + loop { + try { + ++attempt; + TraceEvent("FetchCheckpointFileBegin") + .detail("RemoteFile", remoteFile) + .detail("TargetUID", ssID.toString()) + .detail("StorageServer", ssi.id().toString()) + .detail("LocalFile", localFile) + .detail("Attempt", attempt); + + wait(IAsyncFileSystem::filesystem()->deleteFile(localFile, true)); + const int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | + IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO; + state int64_t offset = 0; + state Reference asyncFile = wait(IAsyncFileSystem::filesystem()->open(localFile, flags, 0666)); + + state ReplyPromiseStream stream = + ssi.fetchCheckpoint.getReplyStream(FetchCheckpointRequest(metaData->checkpointID, remoteFile)); + TraceEvent("FetchCheckpointFileReceivingData") + .detail("RemoteFile", remoteFile) + .detail("TargetUID", ssID.toString()) + .detail("StorageServer", ssi.id().toString()) + .detail("LocalFile", localFile) + .detail("Attempt", attempt); + loop { + state FetchCheckpointReply rep = waitNext(stream.getFuture()); + wait(asyncFile->write(rep.data.begin(), rep.data.size(), offset)); + wait(asyncFile->flush()); + offset += rep.data.size(); + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) { + TraceEvent("FetchCheckpointFileError") + .errorUnsuppressed(e) + .detail("RemoteFile", remoteFile) + .detail("StorageServer", ssi.toString()) + .detail("LocalFile", localFile) + .detail("Attempt", attempt); + if (attempt >= maxRetries) { + throw e; + } + } else { + wait(asyncFile->sync()); + int64_t fileSize = wait(asyncFile->size()); + TraceEvent("FetchCheckpointFileEnd") + .detail("RemoteFile", remoteFile) + .detail("StorageServer", ssi.toString()) + .detail("LocalFile", localFile) + .detail("Attempt", attempt) + .detail("DataSize", offset) + .detail("FileSize", fileSize); + rocksCF.sstFiles[idx].db_path = dir; + rocksCF.sstFiles[idx].fetched = true; + metaData->serializedCheckpoint = ObjectWriter::toValue(rocksCF, IncludeVersion()); + if (cFun) { + wait(cFun(*metaData)); + } + return Void(); + } + } + } +} + +} // namespace + +ACTOR Future fetchRocksDBCheckpoint(Database cx, + CheckpointMetaData initialState, + std::string dir, + std::function(const CheckpointMetaData&)> cFun) { + TraceEvent("FetchRocksCheckpointBegin") + .detail("InitialState", initialState.toString()) + .detail("CheckpointDir", dir); + + state std::shared_ptr metaData = std::make_shared(initialState); + + if (metaData->format == RocksDBColumnFamily) { + state RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(initialState); + TraceEvent("RocksDBCheckpointMetaData").detail("RocksCF", rocksCF.toString()); + + state int i = 0; + state std::vector> fs; + for (; i < rocksCF.sstFiles.size(); ++i) { + fs.push_back(fetchCheckpointFile(cx, metaData, i, dir, cFun)); + TraceEvent("GetCheckpointFetchingFile") + .detail("FileName", rocksCF.sstFiles[i].name) + .detail("Server", 
                        metaData->ssID.toString());
+        }
+        wait(waitForAll(fs));
+    } else {
+        throw not_implemented();
+    }
+
+    return *metaData;
+}
+
+ACTOR Future<Void> deleteRocksCFCheckpoint(CheckpointMetaData checkpoint) {
+    ASSERT_EQ(checkpoint.getFormat(), RocksDBColumnFamily);
+    RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(checkpoint);
+    TraceEvent("DeleteRocksColumnFamilyCheckpoint", checkpoint.checkpointID)
+        .detail("CheckpointID", checkpoint.checkpointID)
+        .detail("RocksCF", rocksCF.toString());
+
+    state std::unordered_set<std::string> dirs;
+    for (const LiveFileMetaData& file : rocksCF.sstFiles) {
+        dirs.insert(file.db_path);
+    }
+
+    state std::unordered_set<std::string>::iterator it = dirs.begin();
+    for (; it != dirs.end(); ++it) {
+        const std::string dir = *it;
+        platform::eraseDirectoryRecursive(dir);
+        TraceEvent("DeleteCheckpointRemovedDir", checkpoint.checkpointID)
+            .detail("CheckpointID", checkpoint.checkpointID)
+            .detail("Dir", dir);
+        wait(delay(0, TaskPriority::FetchKeys));
+    }
+    return Void();
+}
+
+ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) {
+    return new RocksDBCheckpointReader(checkpoint, logID);
+}
+
+RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint) {
+    RocksDBColumnFamilyCheckpoint rocksCF;
+    ObjectReader reader(checkpoint.serializedCheckpoint.begin(), IncludeVersion());
+    reader.deserialize(rocksCF);
+    return rocksCF;
+}
\ No newline at end of file
diff --git a/fdbserver/RocksDBCheckpointUtils.actor.h b/fdbserver/RocksDBCheckpointUtils.actor.h
new file mode 100644
index 0000000000..3d0c157bf3
--- /dev/null
+++ b/fdbserver/RocksDBCheckpointUtils.actor.h
@@ -0,0 +1,209 @@
+/*
+ * RocksDBCheckpointUtils.actor.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_G_H)
+#define FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_G_H
+#include "fdbserver/RocksDBCheckpointUtils.actor.g.h"
+#elif !defined(FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_H)
+#define FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_H
+
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbserver/ServerCheckpoint.actor.h"
+#include "flow/flow.h"
+
+#include "flow/actorcompiler.h" // has to be last include
+
+// Copied from rocksdb/metadata.h, so that we can add serializer.
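+// Apart from the FileIdentifier and the serializers, the fields and comments below are verbatim from
+// rocksdb::SstFileMetaData; the FDB-specific additions (e.g., the `fetched` flag) live in LiveFileMetaData.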
+struct SstFileMetaData {
+    constexpr static FileIdentifier file_identifier = 3804347;
+    SstFileMetaData()
+      : size(0), file_number(0), smallest_seqno(0), largest_seqno(0), num_reads_sampled(0), being_compacted(false),
+        num_entries(0), num_deletions(0), temperature(0), oldest_blob_file_number(0), oldest_ancester_time(0),
+        file_creation_time(0) {}
+
+    SstFileMetaData(const std::string& _file_name,
+                    uint64_t _file_number,
+                    const std::string& _path,
+                    size_t _size,
+                    uint64_t _smallest_seqno,
+                    uint64_t _largest_seqno,
+                    const std::string& _smallestkey,
+                    const std::string& _largestkey,
+                    uint64_t _num_reads_sampled,
+                    bool _being_compacted,
+                    int _temperature,
+                    uint64_t _oldest_blob_file_number,
+                    uint64_t _oldest_ancester_time,
+                    uint64_t _file_creation_time,
+                    std::string& _file_checksum,
+                    std::string& _file_checksum_func_name)
+      : size(_size), name(_file_name), file_number(_file_number), db_path(_path), smallest_seqno(_smallest_seqno),
+        largest_seqno(_largest_seqno), smallestkey(_smallestkey), largestkey(_largestkey),
+        num_reads_sampled(_num_reads_sampled), being_compacted(_being_compacted), num_entries(0), num_deletions(0),
+        temperature(_temperature), oldest_blob_file_number(_oldest_blob_file_number),
+        oldest_ancester_time(_oldest_ancester_time), file_creation_time(_file_creation_time),
+        file_checksum(_file_checksum), file_checksum_func_name(_file_checksum_func_name) {}
+
+    // File size in bytes.
+    size_t size;
+    // The name of the file.
+    std::string name;
+    // The id of the file.
+    uint64_t file_number;
+    // The full path where the file locates.
+    std::string db_path;
+
+    uint64_t smallest_seqno; // Smallest sequence number in file.
+    uint64_t largest_seqno; // Largest sequence number in file.
+    std::string smallestkey; // Smallest user defined key in the file.
+    std::string largestkey; // Largest user defined key in the file.
+    uint64_t num_reads_sampled; // How many times the file is read.
+    bool being_compacted; // true if the file is currently being compacted.
+
+    uint64_t num_entries;
+    uint64_t num_deletions;
+
+    // This feature is experimental and subject to change.
+    int temperature;
+
+    uint64_t oldest_blob_file_number; // The id of the oldest blob file
+                                      // referenced by the file.
+    // An SST file may be generated by compactions whose input files may
+    // in turn be generated by earlier compactions. The creation time of the
+    // oldest SST file that is the compaction ancestor of this file.
+    // The timestamp is provided SystemClock::GetCurrentTime().
+    // 0 if the information is not available.
+    //
+    // Note: for TTL blob files, it contains the start of the expiration range.
+    uint64_t oldest_ancester_time;
+    // Timestamp when the SST file is created, provided by
+    // SystemClock::GetCurrentTime(). 0 if the information is not available.
+    uint64_t file_creation_time;
+
+    // The checksum of a SST file, the value is decided by the file content and
+    // the checksum algorithm used for this SST file. The checksum function is
+    // identified by the file_checksum_func_name. If the checksum function is
+    // not specified, file_checksum is "0" by default.
+    std::string file_checksum;
+
+    // The name of the checksum function used to generate the file checksum
+    // value. If file checksum is not enabled (e.g., sst_file_checksum_func is
+    // null), file_checksum_func_name is UnknownFileChecksumFuncName, which is
+    // "Unknown".
+    std::string file_checksum_func_name;
+
+    template <class Ar>
+    void serialize(Ar& ar) {
+        serializer(ar,
+                   size,
+                   name,
+                   file_number,
+                   db_path,
+                   smallest_seqno,
+                   largest_seqno,
+                   smallestkey,
+                   largestkey,
+                   num_reads_sampled,
+                   being_compacted,
+                   num_entries,
+                   num_deletions,
+                   temperature,
+                   oldest_blob_file_number,
+                   oldest_ancester_time,
+                   file_creation_time,
+                   file_checksum,
+                   file_checksum_func_name);
+    }
+};
+
+// Copied from rocksdb::LiveFileMetaData.
+struct LiveFileMetaData : public SstFileMetaData {
+    constexpr static FileIdentifier file_identifier = 3804346;
+    std::string column_family_name; // Name of the column family
+    int level; // Level at which this file resides.
+    bool fetched;
+    LiveFileMetaData() : column_family_name(), level(0), fetched(false) {}
+
+    template <class Ar>
+    void serialize(Ar& ar) {
+        serializer(ar,
+                   SstFileMetaData::size,
+                   SstFileMetaData::name,
+                   SstFileMetaData::file_number,
+                   SstFileMetaData::db_path,
+                   SstFileMetaData::smallest_seqno,
+                   SstFileMetaData::largest_seqno,
+                   SstFileMetaData::smallestkey,
+                   SstFileMetaData::largestkey,
+                   SstFileMetaData::num_reads_sampled,
+                   SstFileMetaData::being_compacted,
+                   SstFileMetaData::num_entries,
+                   SstFileMetaData::num_deletions,
+                   SstFileMetaData::temperature,
+                   SstFileMetaData::oldest_blob_file_number,
+                   SstFileMetaData::oldest_ancester_time,
+                   SstFileMetaData::file_creation_time,
+                   SstFileMetaData::file_checksum,
+                   SstFileMetaData::file_checksum_func_name,
+                   column_family_name,
+                   level,
+                   fetched);
+    }
+};
+
+// Checkpoint metadata associated with the RocksDBColumnFamily format.
+// Based on rocksdb::ExportImportFilesMetaData.
+struct RocksDBColumnFamilyCheckpoint {
+    constexpr static FileIdentifier file_identifier = 13804346;
+    std::string dbComparatorName;
+
+    std::vector<LiveFileMetaData> sstFiles;
+
+    CheckpointFormat format() const { return RocksDBColumnFamily; }
+
+    std::string toString() const {
+        std::string res = "RocksDBColumnFamilyCheckpoint:\nSST Files:\n";
+        for (const auto& file : sstFiles) {
+            res += file.db_path + file.name + "\n";
+        }
+        return res;
+    }
+
+    template <class Ar>
+    void serialize(Ar& ar) {
+        serializer(ar, dbComparatorName, sstFiles);
+    }
+};
+
+// Fetches the checkpoint file(s) specified by initialState to the local directory `dir`.
+// If cFun is provided, the fetch progress is checkpointed via cFun, so that an interrupted
+// fetch can be resumed after a crash instead of starting over.
+ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
+                                                        CheckpointMetaData initialState,
+                                                        std::string dir,
+                                                        std::function<Future<Void>(const CheckpointMetaData&)> cFun);
+
+ACTOR Future<Void> deleteRocksCFCheckpoint(CheckpointMetaData checkpoint);
+
+ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID);
+
+RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint);
+#endif
\ No newline at end of file
diff --git a/fdbserver/ServerCheckpoint.actor.cpp b/fdbserver/ServerCheckpoint.actor.cpp
new file mode 100644
index 0000000000..15047b62eb
--- /dev/null
+++ b/fdbserver/ServerCheckpoint.actor.cpp
@@ -0,0 +1,67 @@
+/*
+ * ServerCheckpoint.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbserver/ServerCheckpoint.actor.h"
+#include "fdbserver/RocksDBCheckpointUtils.actor.h"
+
+#include "flow/actorcompiler.h" // has to be last include
+
+ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) {
+    if (checkpoint.getFormat() == RocksDBColumnFamily) {
+        return newRocksDBCheckpointReader(checkpoint, logID);
+    } else if (checkpoint.getFormat() == RocksDB) {
+        throw not_implemented();
+    } else {
+        ASSERT(false);
+    }
+
+    return nullptr;
+}
+
+ACTOR Future<Void> deleteCheckpoint(CheckpointMetaData checkpoint) {
+    wait(delay(0, TaskPriority::FetchKeys));
+
+    if (checkpoint.getFormat() == RocksDBColumnFamily) {
+        wait(deleteRocksCFCheckpoint(checkpoint));
+    } else if (checkpoint.getFormat() == RocksDB) {
+        throw not_implemented();
+    } else {
+        ASSERT(false);
+    }
+
+    return Void();
+}
+
+ACTOR Future<CheckpointMetaData> fetchCheckpoint(Database cx,
+                                                 CheckpointMetaData initialState,
+                                                 std::string dir,
+                                                 std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
+    state CheckpointMetaData result;
+    if (initialState.getFormat() == RocksDBColumnFamily) {
+        CheckpointMetaData _result = wait(fetchRocksDBCheckpoint(cx, initialState, dir, cFun));
+        result = _result;
+    } else if (initialState.getFormat() == RocksDB) {
+        throw not_implemented();
+    } else {
+        ASSERT(false);
+    }
+
+    return result;
+}
\ No newline at end of file
diff --git a/fdbserver/ServerCheckpoint.actor.h b/fdbserver/ServerCheckpoint.actor.h
new file mode 100644
index 0000000000..b7a5a2d50c
--- /dev/null
+++ b/fdbserver/ServerCheckpoint.actor.h
@@ -0,0 +1,66 @@
+/*
+ * ServerCheckpoint.actor.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_SERVER_CHECKPOINT_ACTOR_G_H)
+#define FDBSERVER_SERVER_CHECKPOINT_ACTOR_G_H
+#include "fdbserver/ServerCheckpoint.actor.g.h"
+#elif !defined(FDBSERVER_SERVER_CHECKPOINT_ACTOR_H)
+#define FDBSERVER_SERVER_CHECKPOINT_ACTOR_H
+
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbclient/StorageCheckpoint.h"
+#include "flow/flow.h"
+
+#include "flow/actorcompiler.h" // has to be last include
+
+// An ICheckpointReader can read the contents of a checkpoint created from a KV store,
+// i.e., by IKeyValueStore::checkpoint().
+class ICheckpointReader {
+public:
+    // `token` is a serialized object defined by each derived ICheckpointReader class, to specify the
+    // starting point for the underlying checkpoint.
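+    // For example, RocksDBCheckpointReader expects the token to be the name of one of the checkpoint's sst files.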
+    virtual Future<Void> init(StringRef token) = 0;
+
+    // Scans the checkpoint, and returns the key-value pairs.
+    virtual Future<RangeResult> nextKeyValues(const int rowLimit, const int byteLimit) = 0;
+
+    // Returns the next chunk of the serialized checkpoint.
+    virtual Future<Standalone<StringRef>> nextChunk(const int byteLimit) = 0;
+
+    virtual Future<Void> close() = 0;
+
+protected:
+    virtual ~ICheckpointReader() {}
+};
+
+ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint, UID logID);
+
+// Delete a checkpoint.
+ACTOR Future<Void> deleteCheckpoint(CheckpointMetaData checkpoint);
+
+// Fetches a checkpoint to the local directory `dir`; `initialState` provides the checkpoint format,
+// location, restart point, etc.
+// If cFun is provided, the progress can be checkpointed.
+// Returns a CheckpointMetaData, which could contain KVS-specific results, e.g., the list of fetched checkpoint files.
+ACTOR Future<CheckpointMetaData> fetchCheckpoint(Database cx,
+                                                 CheckpointMetaData initialState,
+                                                 std::string dir,
+                                                 std::function<Future<Void>(const CheckpointMetaData&)> cFun = nullptr);
+#endif
\ No newline at end of file
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index d7a8dfb0a9..77a5b904b5 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -58,6 +58,7 @@
 #include "fdbserver/MutationTracking.h"
 #include "fdbserver/RecoveryState.h"
 #include "fdbserver/StorageMetrics.h"
+#include "fdbserver/ServerCheckpoint.actor.h"
 #include "fdbserver/ServerDBInfo.h"
 #include "fdbserver/TLogInterface.h"
 #include "fdbserver/WaitFailure.h"
@@ -104,6 +105,46 @@ bool canReplyWith(Error e) {
 }
 } // namespace
 
+#define PERSIST_PREFIX "\xff\xff"
+
+// Immutable
+static const KeyValueRef persistFormat(LiteralStringRef(PERSIST_PREFIX "Format"),
+                                       LiteralStringRef("FoundationDB/StorageServer/1/4"));
+static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/StorageServer/1/2"),
+                                                    LiteralStringRef("FoundationDB/StorageServer/1/5"));
+static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID");
+static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID");
+static const KeyRef persistSSPairID = LiteralStringRef(PERSIST_PREFIX "ssWithTSSPairID");
+static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
+static const KeyRef persistClusterIdKey = LiteralStringRef(PERSIST_PREFIX "clusterId");
+
+// (Potentially) change with the durable version or when fetchKeys completes
+static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
+static const KeyRangeRef persistShardAssignedKeys =
+    KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAssigned/"), LiteralStringRef(PERSIST_PREFIX "ShardAssigned0"));
+static const KeyRangeRef persistShardAvailableKeys =
+    KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAvailable/"), LiteralStringRef(PERSIST_PREFIX "ShardAvailable0"));
+static const KeyRangeRef persistByteSampleKeys =
+    KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/"), LiteralStringRef(PERSIST_PREFIX "BS0"));
+static const KeyRangeRef persistByteSampleSampleKeys =
+    KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/"),
+                LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0"));
+static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
+static const KeyRef persistPrimaryLocality = LiteralStringRef(PERSIST_PREFIX "PrimaryLocality");
+static const KeyRangeRef persistChangeFeedKeys =
+    KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "RF/"), LiteralStringRef(PERSIST_PREFIX
"RF0")); +static const KeyRangeRef persistTenantMapKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "TM/"), LiteralStringRef(PERSIST_PREFIX "TM0")); +// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys) + +// Checkpoint related prefixes. +static const KeyRangeRef persistCheckpointKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "Checkpoint/"), LiteralStringRef(PERSIST_PREFIX "Checkpoint0")); +static const KeyRangeRef persistPendingCheckpointKeys = + KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "PendingCheckpoint/"), + LiteralStringRef(PERSIST_PREFIX "PendingCheckpoint0")); +static const std::string rocksdbCheckpointDirPrefix = "/rockscheckpoints_"; + struct AddingShard : NonCopyable { KeyRange keys; Future fetchClient; // holds FetchKeys() actor @@ -241,6 +282,14 @@ struct StorageServerDisk { return storage->readRange(keys, rowLimit, byteLimit, type); } + Future checkpoint(const CheckpointRequest& request) { return storage->checkpoint(request); } + + Future restore(const std::vector& checkpoints) { return storage->restore(checkpoints); } + + Future deleteCheckpoint(const CheckpointMetaData& checkpoint) { + return storage->deleteCheckpoint(checkpoint); + } + KeyValueStoreType getKeyValueStoreType() const { return storage->getType(); } StorageBytes getStorageBytes() const { return storage->getStorageBytes(); } std::tuple getSize() const { return storage->getSize(); } @@ -418,6 +467,8 @@ private: std::unordered_map> watchMap; // keep track of server watches public: + std::map> pendingCheckpoints; // Pending checkpoint requests + std::unordered_map checkpoints; // Existing and deleting checkpoints TenantMap tenantMap; TenantPrefixIndex tenantPrefixIndex; @@ -1727,6 +1778,116 @@ ACTOR Future changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) return Void(); } +// Finds a checkpoint. +ACTOR Future getCheckpointQ(StorageServer* self, GetCheckpointRequest req) { + // Wait until the desired version is durable. + wait(self->durableVersion.whenAtLeast(req.version + 1)); + + TraceEvent(SevDebug, "ServeGetCheckpointVersionSatisfied", self->thisServerID) + .detail("Version", req.version) + .detail("Range", req.range.toString()) + .detail("Format", static_cast(req.format)); + + try { + std::unordered_map::iterator it = self->checkpoints.begin(); + for (; it != self->checkpoints.end(); ++it) { + const CheckpointMetaData& md = it->second; + if (md.version == req.version && md.format == req.format && md.range.contains(req.range) && + md.getState() == CheckpointMetaData::Complete) { + req.reply.send(md); + TraceEvent(SevDebug, "ServeGetCheckpointEnd", self->thisServerID).detail("Checkpoint", md.toString()); + break; + } + } + + if (it == self->checkpoints.end()) { + req.reply.sendError(checkpoint_not_found()); + } + } catch (Error& e) { + if (!canReplyWith(e)) { + throw; + } + req.reply.sendError(e); + } + return Void(); +} + +// Delete the checkpoint from disk, as well as all related presisted meta data. +ACTOR Future deleteCheckpointQ(StorageServer* self, Version version, CheckpointMetaData checkpoint) { + wait(self->durableVersion.whenAtLeast(version)); + + TraceEvent("DeleteCheckpointBegin", self->thisServerID).detail("Checkpoint", checkpoint.toString()); + + self->checkpoints.erase(checkpoint.checkpointID); + + try { + wait(deleteCheckpoint(checkpoint)); + } catch (Error& e) { + // TODO: Handle errors more gracefully. 
+		throw;
+	}
+
+	state Key persistCheckpointKey(persistCheckpointKeys.begin.toString() + checkpoint.checkpointID.toString());
+	state Key pendingCheckpointKey(persistPendingCheckpointKeys.begin.toString() + checkpoint.checkpointID.toString());
+	const Version latestVersion = self->data().getLatestVersion();
+	auto& mLV = self->addVersionToMutationLog(latestVersion);
+	self->addMutationToMutationLog(
+	    mLV, MutationRef(MutationRef::ClearRange, pendingCheckpointKey, keyAfter(pendingCheckpointKey)));
+	self->addMutationToMutationLog(
+	    mLV, MutationRef(MutationRef::ClearRange, persistCheckpointKey, keyAfter(persistCheckpointKey)));
+
+	return Void();
+}
+
+// Serves FetchCheckpointRequests.
+ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest req) {
+	TraceEvent("ServeFetchCheckpointBegin", self->thisServerID)
+	    .detail("CheckpointID", req.checkpointID)
+	    .detail("Token", req.token);
+
+	req.reply.setByteLimit(SERVER_KNOBS->CHECKPOINT_TRANSFER_BLOCK_BYTES);
+
+	// Returns an error if the checkpoint cannot be found.
+	const auto it = self->checkpoints.find(req.checkpointID);
+	if (it == self->checkpoints.end()) {
+		req.reply.sendError(checkpoint_not_found());
+		TraceEvent("ServeFetchCheckpointNotFound", self->thisServerID).detail("CheckpointID", req.checkpointID);
+		return Void();
+	}
+
+	try {
+		state ICheckpointReader* reader = newCheckpointReader(it->second, deterministicRandom()->randomUniqueID());
+		wait(reader->init(req.token));
+
+		loop {
+			state Standalone<StringRef> data = wait(reader->nextChunk(CLIENT_KNOBS->REPLY_BYTE_LIMIT));
+			wait(req.reply.onReady());
+			FetchCheckpointReply reply(req.token);
+			reply.data = data;
+			req.reply.send(reply);
+		}
+	} catch (Error& e) {
+		if (e.code() == error_code_end_of_stream) {
+			req.reply.sendError(end_of_stream());
+			TraceEvent("ServeFetchCheckpointEnd", self->thisServerID)
+			    .detail("CheckpointID", req.checkpointID)
+			    .detail("Token", req.token);
+		} else {
+			TraceEvent(SevWarnAlways, "ServerFetchCheckpointFailure")
+			    .errorUnsuppressed(e)
+			    .detail("CheckpointID", req.checkpointID)
+			    .detail("Token", req.token);
+			if (!canReplyWith(e)) {
+				throw e;
+			}
+			req.reply.sendError(e);
+		}
+	}
+
+	wait(reader->close());
+	return Void();
+}
+
 ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) {
 	wait(delay(0));
 	wait(data->version.whenAtLeast(req.minVersion));
@@ -4165,38 +4326,6 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
 	}
 }
 
-#define PERSIST_PREFIX "\xff\xff"
-
-// Immutable
-static const KeyValueRef persistFormat(LiteralStringRef(PERSIST_PREFIX "Format"),
-                                       LiteralStringRef("FoundationDB/StorageServer/1/4"));
-static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/StorageServer/1/2"),
-                                                    LiteralStringRef("FoundationDB/StorageServer/1/5"));
-static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID");
-static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID");
-static const KeyRef persistSSPairID = LiteralStringRef(PERSIST_PREFIX "ssWithTSSPairID");
-static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
-static const KeyRef persistClusterIdKey = LiteralStringRef(PERSIST_PREFIX "clusterId");
-
-// (Potentially) change with the durable version or when fetchKeys completes
-static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
-static const KeyRangeRef persistShardAssignedKeys =
-    KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAssigned/"), LiteralStringRef(PERSIST_PREFIX
"ShardAssigned0")); -static const KeyRangeRef persistShardAvailableKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAvailable/"), LiteralStringRef(PERSIST_PREFIX "ShardAvailable0")); -static const KeyRangeRef persistByteSampleKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/"), LiteralStringRef(PERSIST_PREFIX "BS0")); -static const KeyRangeRef persistByteSampleSampleKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/"), - LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0")); -static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol"); -static const KeyRef persistPrimaryLocality = LiteralStringRef(PERSIST_PREFIX "PrimaryLocality"); -static const KeyRangeRef persistChangeFeedKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "RF/"), LiteralStringRef(PERSIST_PREFIX "RF0")); -static const KeyRangeRef persistTenantMapKeys = - KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "TM/"), LiteralStringRef(PERSIST_PREFIX "TM0")); -// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys) - ACTOR Future fetchChangeFeedApplier(StorageServer* data, Reference changeFeedInfo, Key rangeId, @@ -5064,9 +5193,11 @@ public: } if (m.param1.startsWith(systemKeys.end)) { - if ((m.type == MutationRef::SetValue) && m.param1.substr(1).startsWith(storageCachePrefix)) + if ((m.type == MutationRef::SetValue) && m.param1.substr(1).startsWith(storageCachePrefix)) { applyPrivateCacheData(data, m); - else { + } else if ((m.type == MutationRef::SetValue) && m.param1.substr(1).startsWith(checkpointPrefix)) { + registerPendingCheckpoint(data, m, ver); + } else { applyPrivateData(data, m); } } else { @@ -5353,6 +5484,24 @@ private: ASSERT(false); // Unknown private mutation } } + + // Registers a pending checkpoint request, it will be fullfilled when the desired version is durable. + void registerPendingCheckpoint(StorageServer* data, const MutationRef& m, Version ver) { + CheckpointMetaData checkpoint = decodeCheckpointValue(m.param2); + ASSERT(checkpoint.getState() == CheckpointMetaData::Pending); + const UID checkpointID = decodeCheckpointKey(m.param1.substr(1)); + checkpoint.version = ver; + data->pendingCheckpoints[ver].push_back(checkpoint); + + auto& mLV = data->addVersionToMutationLog(ver); + const Key pendingCheckpointKey(persistPendingCheckpointKeys.begin.toString() + checkpointID.toString()); + data->addMutationToMutationLog( + mLV, MutationRef(MutationRef::SetValue, pendingCheckpointKey, checkpointValue(checkpoint))); + + TraceEvent("RegisterPendingCheckpoint", data->thisServerID) + .detail("Key", pendingCheckpointKey) + .detail("Checkpoint", checkpoint.toString()); + } }; void StorageServer::insertTenant(TenantNameRef tenantName, @@ -5413,8 +5562,8 @@ ACTOR Future tssDelayForever() { ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { state double start; try { - // If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory - // This is often referred to as the storage server e-brake (emergency brake) + // If we are disk bound and durableVersion is very old, we need to block updates or we could run out of + // memory. 
		// We allow the storage server to make some progress between e-brake periods, referreed to as "overage", in
		// order to ensure that it advances desiredOldestVersion enough for updateStorage to make enough progress on
@@ -5452,8 +5601,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
 			    data->tssFaultInjectTime.get() < now()) {
 				if (deterministicRandom()->random01() < 0.01) {
 					TraceEvent(SevWarnAlways, "TSSInjectDelayForever", data->thisServerID).log();
-					// small random chance to just completely get stuck here, each tss should eventually hit this in this
-					// mode
+					// small random chance to just completely get stuck here, each tss should eventually hit this in
+					// this mode
 					wait(tssDelayForever());
 				} else {
 					// otherwise pause for part of a second
@@ -5550,14 +5699,14 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
 		}
 
 		// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
-		// If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in
-		// the middle of a rolled back version range.
+		// If there is an epoch end we skip this step, to increase testability and to prevent inserting a
+		// version in the middle of a rolled back version range.
 		while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
 			auto fk = data->readyFetchKeys.back();
 			data->readyFetchKeys.pop_back();
 			fk.send(&fii);
-			// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this actor
-			// until it was completed.
+			// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this
+			// actor until it was completed.
 		}
 
 		for (auto& c : fii.changes)
@@ -5566,9 +5715,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
 		wait(doEagerReads(data, &eager));
 		if (data->shardChangeCounter == changeCounter)
 			break;
-		TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it again.
-		// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads only
-		// selectively
+		TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
+		            // again.
+		// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads
+		// only selectively
 		eager = UpdateEagerReadInfo();
 	}
 	data->eagerReadsLatencyHistogram->sampleSeconds(now() - start);
@@ -5598,8 +5748,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
 		for (; mutationNum < pUpdate->mutations.size(); mutationNum++) {
 			updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version, true);
 			mutationBytes += pUpdate->mutations[mutationNum].totalSize();
-			// data->counters.mutationBytes or data->counters.mutations should not be updated because they should
-			// have counted when the mutations arrive from cursor initially.
+			// data->counters.mutationBytes or data->counters.mutations should not be updated because they
+			// should have counted when the mutations arrive from cursor initially.
 			injectedChanges = true;
 			if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
 				mutationBytes = 0;
@@ -5830,6 +5980,49 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
 	}
 }
 
+ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
+	ASSERT(metaData.ssID == data->thisServerID);
+	const CheckpointRequest req(metaData.version,
+	                            metaData.range,
+	                            static_cast<CheckpointFormat>(metaData.format),
+	                            metaData.checkpointID,
+	                            data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString());
+	state CheckpointMetaData checkpointResult;
+	try {
+		state CheckpointMetaData res = wait(data->storage.checkpoint(req));
+		checkpointResult = res;
+		checkpointResult.ssID = data->thisServerID;
+		ASSERT(checkpointResult.getState() == CheckpointMetaData::Complete);
+		data->checkpoints[checkpointResult.checkpointID] = checkpointResult;
+		TraceEvent("StorageCreatedCheckpoint", data->thisServerID).detail("Checkpoint", checkpointResult.toString());
+	} catch (Error& e) {
+		// If checkpoint creation fails, the failure is persisted.
+		checkpointResult = metaData;
+		checkpointResult.setState(CheckpointMetaData::Fail);
+		TraceEvent("StorageCreatedCheckpointFailure", data->thisServerID)
+		    .detail("PendingCheckpoint", checkpointResult.toString());
+	}
+
+	// Persist the checkpoint meta data.
+	try {
+		Key pendingCheckpointKey(persistPendingCheckpointKeys.begin.toString() +
+		                         checkpointResult.checkpointID.toString());
+		Key persistCheckpointKey(persistCheckpointKeys.begin.toString() + checkpointResult.checkpointID.toString());
+		data->storage.clearRange(singleKeyRange(pendingCheckpointKey));
+		data->storage.writeKeyValue(KeyValueRef(persistCheckpointKey, checkpointValue(checkpointResult)));
+		wait(data->storage.commit());
+	} catch (Error& e) {
+		// If the checkpoint meta data is not persisted successfully, remove the checkpoint.
+		TraceEvent("StorageCreateCheckpointPersistFailure", data->thisServerID)
+		    .errorUnsuppressed(e)
+		    .detail("Checkpoint", checkpointResult.toString());
+		data->checkpoints.erase(checkpointResult.checkpointID);
+		data->actors.add(deleteCheckpointQ(data, metaData.version, checkpointResult));
+	}
+
+	return Void();
+}
+
 ACTOR Future<Void> updateStorage(StorageServer* data) {
 	loop {
 		ASSERT(data->durableVersion.get() == data->storageVersion());
@@ -5851,6 +6044,33 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 		state Version desiredVersion = data->desiredOldestVersion.get();
 		state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
 
+		// Clean up stale checkpoint requests. This is not supposed to happen, since checkpoints are cleaned up on
+		// failures; it is kept here as a safeguard.
+		while (!data->pendingCheckpoints.empty() && data->pendingCheckpoints.begin()->first <= startOldestVersion) {
+			for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) {
+				auto& metaData = data->pendingCheckpoints.begin()->second[idx];
+				data->actors.add(deleteCheckpointQ(data, startOldestVersion, metaData));
+				TraceEvent(SevWarnAlways, "StorageStaleCheckpointRequest", data->thisServerID)
+				    .detail("PendingCheckpoint", metaData.toString())
+				    .detail("DurableVersion", startOldestVersion);
+			}
+			data->pendingCheckpoints.erase(data->pendingCheckpoints.begin());
+		}
+
+		// Create a checkpoint if the pending request version is within (startOldestVersion, desiredVersion].
+		// Versions newer than the checkpoint version won't be committed before the checkpoint is created.
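+		// This is enforced below by capping desiredVersion at the pending checkpoint version, so the commit that
+		// makes the checkpoint version durable includes nothing newer than it.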
+		state bool requireCheckpoint = false;
+		if (!data->pendingCheckpoints.empty()) {
+			const Version cVer = data->pendingCheckpoints.begin()->first;
+			if (cVer <= desiredVersion) {
+				TraceEvent("CheckpointVersionSatisfied", data->thisServerID)
+				    .detail("DesiredVersion", desiredVersion)
+				    .detail("CheckPointVersion", cVer);
+				desiredVersion = cVer;
+				requireCheckpoint = true;
+			}
+		}
+
 		// Write mutations to storage until we reach the desiredVersion or have written too much (bytesleft)
 		state double beforeStorageUpdates = now();
 		loop {
@@ -5920,13 +6140,24 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 
 		debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion);
 
+		if (requireCheckpoint) {
+			ASSERT(newOldestVersion == data->pendingCheckpoints.begin()->first);
+			std::vector<Future<Void>> createCheckpoints;
+			for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) {
+				createCheckpoints.push_back(createCheckpoint(data, data->pendingCheckpoints.begin()->second[idx]));
+			}
+			wait(waitForAll(createCheckpoints));
+			data->pendingCheckpoints.erase(data->pendingCheckpoints.begin());
+			requireCheckpoint = false;
+		}
+
 		if (newOldestVersion > data->rebootAfterDurableVersion) {
 			TraceEvent("RebootWhenDurableTriggered", data->thisServerID)
 			    .detail("NewOldestVersion", newOldestVersion)
 			    .detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
-			// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this process)
-			// never sets durableInProgress, we should set durableInProgress before send the please_reboot() error.
-			// Otherwise, in the race situation when storage server receives both reboot and
+			// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this
+			// process) never sets durableInProgress, we should set durableInProgress before send the
+			// please_reboot() error. Otherwise, in the race situation when storage server receives both reboot and
 			// brokenPromise of durableInProgress, the worker of the storage server will die.
 			// We will eventually end up with no worker for storage server role.
 			// The data distributor's buildTeam() will get stuck in building a team
@@ -5948,12 +6179,13 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 		}
 
 		durableInProgress.send(Void());
-		wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgress could cause the storage server to shut
-		                                             // down, so delay to check for cancellation
+		wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgress could cause the storage server to
+		                                             // shut down, so delay to check for cancellation
 
 		// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was
-		// effective and are applied after we change the durable version. Also ensure that we have to lock while calling
-		// changeDurableVersion, because otherwise the latest version of mutableData might be partially loaded.
+		// effective and are applied after we change the durable version. Also ensure that we have to lock while
+		// calling changeDurableVersion, because otherwise the latest version of mutableData might be partially
+		// loaded.
 		state double beforeSSDurableVersionUpdate = now();
 		wait(data->durableVersionLock.take());
 		data->popVersion(data->durableVersion.get() + 1);
@@ -6029,6 +6261,23 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
 			                                   availableKeys.end,
 			                                   endAvailable ?
LiteralStringRef("1") : LiteralStringRef("0"))); } + + // When a shard is moved out, delete all related checkpoints created for data move. + if (!available) { + for (auto& [id, checkpoint] : self->checkpoints) { + if (checkpoint.range.intersects(keys)) { + Key persistCheckpointKey(persistCheckpointKeys.begin.toString() + checkpoint.checkpointID.toString()); + checkpoint.setState(CheckpointMetaData::Deleting); + self->addMutationToMutationLog( + mLV, MutationRef(MutationRef::SetValue, persistCheckpointKey, checkpointValue(checkpoint))); + } + self->actors.add(deleteCheckpointQ(self, mLV.version + 1, checkpoint)); + TraceEvent("SSDeleteCheckpointScheduled", self->thisServerID) + .detail("MovedOutRange", keys.toString()) + .detail("Checkpoint", checkpoint.toString()) + .detail("DeleteVersion", mLV.version + 1); + } + } } void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) { @@ -6297,6 +6546,8 @@ ACTOR Future restoreDurableState(StorageServer* data, IKeyValueStore* stor state Future fShardAssigned = storage->readRange(persistShardAssignedKeys); state Future fShardAvailable = storage->readRange(persistShardAvailableKeys); state Future fChangeFeeds = storage->readRange(persistChangeFeedKeys); + state Future fPendingCheckpoints = storage->readRange(persistPendingCheckpointKeys); + state Future fCheckpoints = storage->readRange(persistCheckpointKeys); state Future fTenantMap = storage->readRange(persistTenantMapKeys); state Promise byteSampleSampleRecovered; @@ -6307,7 +6558,8 @@ ACTOR Future restoreDurableState(StorageServer* data, IKeyValueStore* stor TraceEvent("ReadingDurableState", data->thisServerID).log(); wait(waitForAll(std::vector{ fFormat, fID, fClusterID, ftssPairID, fssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality })); - wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable, fChangeFeeds, fTenantMap })); + wait(waitForAll( + std::vector{ fShardAssigned, fShardAvailable, fChangeFeeds, fPendingCheckpoints, fCheckpoints, fTenantMap })); wait(byteSampleSampleRecovered.getFuture()); TraceEvent("RestoringDurableState", data->thisServerID).log(); @@ -6347,8 +6599,8 @@ ACTOR Future restoreDurableState(StorageServer* data, IKeyValueStore* stor } // It's a bit sketchy to rely on an untrusted storage engine to persist its quarantine state when the quarantine - // state means the storage engine already had a durability or correctness error, but it should get re-quarantined - // very quickly because of a mismatch if it starts trying to do things again + // state means the storage engine already had a durability or correctness error, but it should get + // re-quarantined very quickly because of a mismatch if it starts trying to do things again if (fTssQuarantine.get().present()) { TEST(true); // TSS restarted while quarantined data->tssInQuarantine = true; @@ -6373,6 +6625,25 @@ ACTOR Future restoreDurableState(StorageServer* data, IKeyValueStore* stor data->setInitialVersion(version); data->bytesRestored += fVersion.get().expectedSize(); + state RangeResult pendingCheckpoints = fPendingCheckpoints.get(); + state int pCLoc; + for (pCLoc = 0; pCLoc < pendingCheckpoints.size(); ++pCLoc) { + CheckpointMetaData metaData = decodeCheckpointValue(pendingCheckpoints[pCLoc].value); + data->pendingCheckpoints[metaData.version].push_back(metaData); + wait(yield()); + } + + state RangeResult checkpoints = fCheckpoints.get(); + state int cLoc; + for (cLoc = 0; cLoc < checkpoints.size(); ++cLoc) { + CheckpointMetaData metaData = 
		    decodeCheckpointValue(checkpoints[cLoc].value);
+		data->checkpoints[metaData.checkpointID] = metaData;
+		if (metaData.getState() == CheckpointMetaData::Deleting) {
+			data->actors.add(deleteCheckpointQ(data, version, metaData));
+		}
+		wait(yield());
+	}
+
 	state RangeResult available = fShardAvailable.get();
 	data->bytesRestored += available.logicalSize();
 	state int availableLoc;
@@ -6624,15 +6895,15 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
 
 				// SOMEDAY: validation! The changes here are possibly partial changes (we receive multiple
 				// messages per
-				// update to our requested range). This means that the validation would have to occur after all
-				// the messages for one clear or set have been dispatched.
+				// update to our requested range). This means that the validation would have to occur after
+				// all the messages for one clear or set have been dispatched.
 
 				/*StorageMetrics m = getMetrics( data, req.keys );
 				  bool b = ( m.bytes != metrics.bytes || m.bytesPerKSecond != metrics.bytesPerKSecond ||
 				  m.iosPerKSecond != metrics.iosPerKSecond ); if (b) { printf("keys: '%s' - '%s' @%p\n",
 				  printable(req.keys.begin).c_str(), printable(req.keys.end).c_str(), this);
-				  printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n", b,
-				  m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond,
+				  printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n",
+				  b, m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond,
 				  metrics.iosPerKSecond, c.bytes, c.bytesPerKSecond, c.iosPerKSecond);
 				  }*/
@@ -6641,8 +6912,8 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
 			}
 		} catch (Error& e) {
 			if (e.code() == error_code_actor_cancelled)
-				throw; // This is only cancelled when the main loop had exited...no need in this case to clean up
-				// self
+				throw; // This is only cancelled when the main loop had exited...no need in this case to clean
+				       // up self
 			error = e;
 			break;
 		}
@@ -6822,8 +7093,8 @@ ACTOR Future<Void> serveGetValueRequests(StorageServer* self, FutureStream
 	getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetValue;
 	loop {
 		GetValueRequest req = waitNext(getValue);
-		// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
-		// before doing real work
+		// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
+		// downgrade before doing real work
 		if (req.debugID.present())
 			g_traceBatch.addEvent("GetValueDebug",
 			                      req.debugID.get().first(),
@@ -6841,8 +7112,8 @@ ACTOR Future<Void> serveGetKeyValuesRequests(StorageServer* self, FutureStream
 		self->actors.add(self->readGuard(req, getKeyValuesQ));
 	}
 }
@@ -6864,8 +7135,8 @@ ACTOR Future<Void> serveGetKeyValuesStreamRequests(StorageServer* self,
                                                    FutureStream<GetKeyValuesStreamRequest> getKeyValuesStream) {
 	loop {
 		GetKeyValuesStreamRequest req = waitNext(getKeyValuesStream);
-		// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
-		// before doing real work
+		// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
+		// downgrade before doing real work
 		// FIXME: add readGuard again
 		self->actors.add(getKeyValuesStreamQ(self, req));
 	}
}
@@ -6875,8 +7146,8 @@ ACTOR Future<Void> serveGetKeyRequests(StorageServer* self, FutureStream
 	getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKey;
 	loop {
 		GetKeyRequest req =
		    waitNext(getKey);
-		// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
-		// before doing real work
+		// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
+		// downgrade before doing real work
 		self->actors.add(self->readGuard(req, getKeyQ));
 	}
 }
@@ -7067,8 +7338,8 @@ ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
 
 ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface ssi) {
 	state Future<Void> doUpdate = Void();
-	state bool updateReceived =
-	    false; // true iff the current update() actor assigned to doUpdate has already received an update from the tlog
+	state bool updateReceived = false; // true iff the current update() actor assigned to doUpdate has already
+	                                   // received an update from the tlog
 	state double lastLoopTopTime = now();
 	state Future<Void> dbInfoChange = Void();
 	state Future<Void> checkLastUpdate = Void();
@@ -7134,8 +7405,8 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
 					self->popVersion(self->durableVersion.get() + 1, true);
 				}
 				// If update() is waiting for results from the tlog, it might never get them, so needs to be
-				// cancelled. But if it is waiting later, cancelling it could cause problems (e.g. fetchKeys that
-				// already committed to transitioning to waiting state)
+				// cancelled. But if it is waiting later, cancelling it could cause problems (e.g. fetchKeys
+				// that already committed to transitioning to waiting state)
 				if (!updateReceived) {
 					doUpdate = Void();
 				}
@@ -7178,6 +7449,17 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
 				else
 					doUpdate = update(self, &updateReceived);
 			}
+			when(GetCheckpointRequest req = waitNext(ssi.checkpoint.getFuture())) {
+				if (!self->isReadable(req.range)) {
+					req.reply.sendError(wrong_shard_server());
+					continue;
+				} else {
+					self->actors.add(getCheckpointQ(self, req));
+				}
+			}
+			when(FetchCheckpointRequest req = waitNext(ssi.fetchCheckpoint.getFuture())) {
+				self->actors.add(fetchCheckpointQ(self, req));
+			}
 			when(wait(updateProcessStatsTimer)) {
 				updateProcessStats(self);
 				updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
@@ -7190,16 +7472,17 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
 
 bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData, Error const& e) {
 	self.shuttingDown = true;
 
-	// Clearing shards shuts down any fetchKeys actors; these may do things on cancellation that are best done with self
-	// still valid
+	// Clearing shards shuts down any fetchKeys actors; these may do things on cancellation that are best done with
+	// self still valid
 	self.shards.insert(allKeys, Reference<ShardInfo>());
 
-	// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise just
-	// close it.
+	// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise
+	// just close it.
 	if (e.code() == error_code_please_reboot) {
 		// do nothing.
 	} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
-		// SOMEDAY: could close instead of dispose if tss in quarantine gets removed so it could still be investigated?
+		// SOMEDAY: could close instead of dispose if tss in quarantine gets removed so it could still be
+		// investigated?
 		persistentData->dispose();
 	} else {
 		persistentData->close();
diff --git a/fdbserver/workloads/PhysicalShardMove.actor.cpp b/fdbserver/workloads/PhysicalShardMove.actor.cpp
new file mode 100644
index 0000000000..333b7bfc79
--- /dev/null
+++ b/fdbserver/workloads/PhysicalShardMove.actor.cpp
@@ -0,0 +1,234 @@
+/*
+ * PhysicalShardMove.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbclient/ManagementAPI.actor.h"
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbrpc/simulator.h"
+#include "fdbserver/IKeyValueStore.h"
+#include "fdbserver/ServerCheckpoint.actor.h"
+#include "fdbserver/MoveKeys.actor.h"
+#include "fdbserver/QuietDatabase.h"
+#include "fdbserver/workloads/workloads.actor.h"
+#include "flow/Error.h"
+#include "flow/IRandom.h"
+#include "flow/flow.h"
+#include
+#include
+
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+namespace {
+std::string printValue(const ErrorOr<Optional<Value>>& value) {
+	if (value.isError()) {
+		return value.getError().name();
+	}
+	return value.get().present() ? value.get().get().toString() : "Value Not Found.";
+}
+} // namespace
+
+struct SSCheckpointWorkload : TestWorkload {
+	const bool enabled;
+	bool pass;
+
+	SSCheckpointWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}
+
+	void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
+		TraceEvent(SevError, "TestFailed")
+		    .detail("ExpectedValue", printValue(expectedValue))
+		    .detail("ActualValue", printValue(actualValue));
+		pass = false;
+	}
+
+	std::string description() const override { return "SSCheckpoint"; }
+
+	Future<Void> setup(Database const& cx) override { return Void(); }
+
+	Future<Void> start(Database const& cx) override {
+		if (!enabled) {
+			return Void();
+		}
+		return _start(this, cx);
+	}
+
+	ACTOR Future<Void> _start(SSCheckpointWorkload* self, Database cx) {
+		state Key key = "TestKey"_sr;
+		state Key endKey = "TestKey0"_sr;
+		state Value oldValue = "TestValue"_sr;
+
+		int ignore = wait(setDDMode(cx, 0));
+		state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));
+
+		// Create checkpoint.
+		state Transaction tr(cx);
+		tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+		tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+		state CheckpointFormat format = RocksDBColumnFamily;
+		loop {
+			try {
+				wait(createCheckpoint(&tr, KeyRangeRef(key, endKey), format));
+				wait(tr.commit());
+				version = tr.getCommittedVersion();
+				break;
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+
+		TraceEvent("TestCheckpointCreated")
+		    .detail("Range", KeyRangeRef(key, endKey).toString())
+		    .detail("Version", version);
+
+		// Fetch checkpoint meta data.
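+		// getCheckpointMetaData() is expected to return one CheckpointMetaData per shard overlapping the range.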
+		loop {
+			try {
+				state std::vector<CheckpointMetaData> records =
+				    wait(getCheckpointMetaData(cx, KeyRangeRef(key, endKey), version, format));
+				break;
+			} catch (Error& e) {
+				TraceEvent("TestFetchCheckpointMetadataError")
+				    .errorUnsuppressed(e)
+				    .detail("Range", KeyRangeRef(key, endKey).toString())
+				    .detail("Version", version);
+
+				// The checkpoint was just created; we don't expect this error.
+				ASSERT(e.code() != error_code_checkpoint_not_found);
+			}
+		}
+
+		TraceEvent("TestCheckpointFetched")
+		    .detail("Range", KeyRangeRef(key, endKey).toString())
+		    .detail("Version", version)
+		    .detail("Shards", records.size());
+
+		state std::string pwd = platform::getWorkingDirectory();
+		state std::string folder = pwd + "/checkpoints";
+		platform::eraseDirectoryRecursive(folder);
+		ASSERT(platform::createDirectory(folder));
+
+		// Fetch checkpoint.
+		state int i = 0;
+		for (; i < records.size(); ++i) {
+			loop {
+				TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
+				try {
+					state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[i], folder));
+					TraceEvent("TestCheckpointFetched").detail("Checkpoint", records[i].toString());
+					break;
+				} catch (Error& e) {
+					TraceEvent("TestFetchCheckpointError")
+					    .errorUnsuppressed(e)
+					    .detail("Checkpoint", records[i].toString());
+					wait(delay(1));
+				}
+			}
+		}
+
+		state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
+		platform::eraseDirectoryRecursive(rocksDBTestDir);
+
+		// Restore KVS.
+		state IKeyValueStore* kvStore = keyValueStoreRocksDB(
+		    rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
+		try {
+			wait(kvStore->restore(records));
+		} catch (Error& e) {
+			TraceEvent(SevError, "TestRestoreCheckpointError")
+			    .errorUnsuppressed(e)
+			    .detail("Checkpoint", describe(records));
+		}
+
+		// Compare the keyrange between the original database and the one restored from checkpoint.
+		// For now, it should contain only a single key.
+		tr.reset();
+		tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+		loop {
+			try {
+				state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
+				break;
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+
+		for (i = 0; i < res.size(); ++i) {
+			Optional<Value> value = wait(kvStore->readValue(res[i].key));
+			ASSERT(value.present());
+			ASSERT(value.get() == res[i].value);
+		}
+
+		int ignore = wait(setDDMode(cx, 1));
+		return Void();
+	}
+
+	ACTOR Future<Void> readAndVerify(SSCheckpointWorkload* self,
+	                                 Database cx,
+	                                 Key key,
+	                                 ErrorOr<Optional<Value>> expectedValue) {
+		state Transaction tr(cx);
+		tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+
+		loop {
+			try {
+				state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
+				const bool equal = !expectedValue.isError() && res == expectedValue.get();
+				if (!equal) {
+					self->validationFailed(expectedValue, ErrorOr<Optional<Value>>(res));
+				}
+				break;
+			} catch (Error& e) {
+				if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
+					break;
+				}
+				wait(tr.onError(e));
+			}
+		}
+
+		return Void();
+	}
+
+	ACTOR Future<Version> writeAndVerify(SSCheckpointWorkload* self, Database cx, Key key, Optional<Value> value) {
+		state Transaction tr(cx);
+		state Version version;
+		loop {
+			try {
+				if (value.present()) {
+					tr.set(key, value.get());
+				} else {
+					tr.clear(key);
+				}
+				wait(timeoutError(tr.commit(), 30.0));
+				version = tr.getCommittedVersion();
+				break;
+			} catch (Error& e) {
+				wait(tr.onError(e));
+			}
+		}
+
+		wait(self->readAndVerify(self, cx, key, value));
+
+		return version;
+	}
+
+	Future<bool> check(Database const& cx) override { return pass; }
+
+	void getMetrics(std::vector<PerfMetric>& m) override {}
+};
+
+WorkloadFactory<SSCheckpointWorkload> SSCheckpointWorkloadFactory("SSCheckpointWorkload");
\ No newline at end of file
diff --git a/flow/error_definitions.h b/flow/error_definitions.h
index 5864a31563..b0a5a25a57 100755
--- a/flow/error_definitions.h
+++ b/flow/error_definitions.h
@@ -174,6 +174,7 @@ ERROR( blob_granule_no_ryw, 2036, "Blob Granule Read Transactions must be specif
 ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read Transactions must be specified as ryw-disabled" )
 ERROR( get_mapped_key_values_has_more, 2038, "getMappedRange does not support continuation for now" )
 ERROR( get_mapped_range_reads_your_writes, 2039, "getMappedRange tries to read data that were previously written in the transaction" )
+ERROR( checkpoint_not_found, 2040, "Checkpoint not found" )
 
 ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
 ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )
@@ -283,6 +284,7 @@ ERROR( snap_invalid_uid_string, 2509, "The given uid string is not a 32-length h
 // 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
 ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error
 ERROR( internal_error, 4100, "An internal error occurred" )
+ERROR( not_implemented, 4200, "Not implemented yet" )
 
 // clang-format on
 #undef ERROR
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 1c6d535706..198ed4fb2d 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -138,6 +138,7 @@ if(WITH_PYTHON)
   add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
   add_fdb_test(TEST_FILES fast/CycleTest.toml)
   add_fdb_test(TEST_FILES fast/ChangeFeeds.toml)
+  add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
   add_fdb_test(TEST_FILES fast/DataLossRecovery.toml)
   add_fdb_test(TEST_FILES fast/EncryptionOps.toml)
   add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
diff --git a/tests/fast/PhysicalShardMove.toml b/tests/fast/PhysicalShardMove.toml new file mode 100644 index 0000000000..72d1f0331c --- /dev/null +++ b/tests/fast/PhysicalShardMove.toml @@ -0,0 +1,13 @@ +[configuration] +config = 'triple' +storageEngineType = 4 +processesPerMachine = 1 +coordinators = 3 +machineCount = 15 + +[[test]] +testTitle = 'PhysicalShardMove' +useDB = true + + [[test.workload]] + testName = 'SSCheckpointWorkload'
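
For reference, below is a minimal sketch of the end-to-end client workflow introduced by this change, mirroring the SSCheckpointWorkload above. Retry loops and error handling are elided, and the actor name, key range, and directory names are illustrative only:

    // Sketch: create a checkpoint over a range, locate its metadata, fetch the
    // files locally, and restore a fresh RocksDB key-value store from them.
    ACTOR Future<Void> checkpointRoundTrip(Database cx) {
        state KeyRange range = KeyRangeRef("a"_sr, "b"_sr); // illustrative range
        state Transaction tr(cx);
        tr.setOption(FDBTransactionOptions::LOCK_AWARE);
        tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
        // Adds the pending-checkpoint mutations; each storage server creates its
        // checkpoint once the commit version becomes durable.
        wait(createCheckpoint(&tr, range, RocksDBColumnFamily));
        wait(tr.commit());
        state Version version = tr.getCommittedVersion();

        // One CheckpointMetaData is returned per shard overlapping `range`.
        state std::vector<CheckpointMetaData> records =
            wait(getCheckpointMetaData(cx, range, version, RocksDBColumnFamily));

        // Pull the checkpoint files from the storage servers into a local directory.
        state int i = 0;
        for (; i < records.size(); ++i) {
            CheckpointMetaData fetched = wait(fetchCheckpoint(cx, records[i], "./checkpoints"));
            TraceEvent("ExampleCheckpointFetched").detail("Checkpoint", fetched.toString());
        }

        // Restore a fresh RocksDB instance from the fetched checkpoint files.
        state IKeyValueStore* kvStore = keyValueStoreRocksDB(
            "./restored-db", deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
        wait(kvStore->restore(records));
        return Void();
    }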