Physical Shard Move (#6264)
Physical Shard Move part I: Checkpoint creation, transfer and restore.
This commit is contained in: parent e8077b65e1, commit c3a68d661e
@@ -128,6 +128,7 @@ set(FDBCLIENT_SRCS
    StatusClient.h
    StorageServerInterface.cpp
    StorageServerInterface.h
    StorageCheckpoint.h
    Subspace.cpp
    Subspace.h
    StackLineage.h
@@ -7630,6 +7630,167 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
    }
}

ACTOR template <class T>
static Future<Void> createCheckpointImpl(T tr, KeyRangeRef range, CheckpointFormat format) {
    TraceEvent("CreateCheckpointTransactionBegin").detail("Range", range.toString());

    state RangeResult keyServers = wait(krmGetRanges(tr, keyServersPrefix, range));
    ASSERT(!keyServers.more);

    state RangeResult UIDtoTagMap = wait(tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
    ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);

    for (int i = 0; i < keyServers.size() - 1; ++i) {
        KeyRangeRef shard(keyServers[i].key, keyServers[i + 1].key);
        std::vector<UID> src;
        std::vector<UID> dest;
        decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest);

        // The checkpoint request is sent to all replicas, in case any of them is unhealthy.
        // An alternative is to choose a healthy replica.
        const UID checkpointID = deterministicRandom()->randomUniqueID();
        for (int idx = 0; idx < src.size(); ++idx) {
            CheckpointMetaData checkpoint(shard & range, format, src[idx], checkpointID);
            checkpoint.setState(CheckpointMetaData::Pending);
            tr->set(checkpointKeyFor(checkpointID), checkpointValue(checkpoint));
        }

        TraceEvent("CreateCheckpointTransactionShard")
            .detail("Shard", shard.toString())
            .detail("SrcServers", describe(src))
            .detail("ServerSelected", describe(src))
            .detail("CheckpointKey", checkpointKeyFor(checkpointID))
            .detail("ReadVersion", tr->getReadVersion().get());
    }

    return Void();
}

Future<Void> createCheckpoint(Reference<ReadYourWritesTransaction> tr, KeyRangeRef range, CheckpointFormat format) {
    return holdWhile(tr, createCheckpointImpl(tr, range, format));
}

Future<Void> createCheckpoint(Transaction* tr, KeyRangeRef range, CheckpointFormat format) {
    return createCheckpointImpl(tr, range, format);
}

// Gets CheckpointMetaData of the specific keyrange, version and format from one of the storage servers, if none of the
// servers have the checkpoint, a checkpoint_not_found error is returned.
ACTOR static Future<CheckpointMetaData> getCheckpointMetaDataInternal(GetCheckpointRequest req,
                                                                      Reference<LocationInfo> alternatives,
                                                                      double timeout) {
    TraceEvent("GetCheckpointMetaDataInternalBegin")
        .detail("Range", req.range.toString())
        .detail("Version", req.version)
        .detail("Format", static_cast<int>(req.format))
        .detail("Locations", alternatives->description());

    state std::vector<Future<ErrorOr<CheckpointMetaData>>> fs;
    state int i = 0;
    for (i = 0; i < alternatives->size(); ++i) {
        // For each shard, all storage servers are checked, only one is required.
        fs.push_back(errorOr(timeoutError(alternatives->getInterface(i).checkpoint.getReply(req), timeout)));
    }

    state Optional<Error> error;
    wait(waitForAll(fs));
    TraceEvent("GetCheckpointMetaDataInternalWaitEnd")
        .detail("Range", req.range.toString())
        .detail("Version", req.version);

    for (i = 0; i < fs.size(); ++i) {
        if (!fs[i].isReady()) {
            error = timed_out();
            TraceEvent("GetCheckpointMetaDataInternalSSTimeout")
                .detail("Range", req.range.toString())
                .detail("Version", req.version)
                .detail("StorageServer", alternatives->getInterface(i).uniqueID);
            continue;
        }

        if (fs[i].get().isError()) {
            const Error& e = fs[i].get().getError();
            TraceEvent("GetCheckpointMetaDataInternalError")
                .errorUnsuppressed(e)
                .detail("Range", req.range.toString())
                .detail("Version", req.version)
                .detail("StorageServer", alternatives->getInterface(i).uniqueID);
            if (e.code() != error_code_checkpoint_not_found || !error.present()) {
                error = e;
            }
        } else {
            return fs[i].get().get();
        }
    }

    ASSERT(error.present());
    throw error.get();
}

ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
                                                                    KeyRange keys,
                                                                    Version version,
                                                                    CheckpointFormat format,
                                                                    double timeout) {
    state Span span("NAPI:GetCheckpoint"_loc);

    loop {
        TraceEvent("GetCheckpointBegin")
            .detail("Range", keys.toString())
            .detail("Version", version)
            .detail("Format", static_cast<int>(format));

        state std::vector<Future<CheckpointMetaData>> fs;
        state int i = 0;

        try {
            state std::vector<std::pair<KeyRange, Reference<LocationInfo>>> locations =
                wait(getKeyRangeLocations(cx,
                                          keys,
                                          CLIENT_KNOBS->TOO_MANY,
                                          Reverse::False,
                                          &StorageServerInterface::checkpoint,
                                          span.context,
                                          Optional<UID>(),
                                          UseProvisionalProxies::False));

            fs.clear();
            for (i = 0; i < locations.size(); ++i) {
                fs.push_back(getCheckpointMetaDataInternal(
                    GetCheckpointRequest(version, keys, format), locations[i].second, timeout));
                TraceEvent("GetCheckpointShardBegin")
                    .detail("Range", locations[i].first.toString())
                    .detail("Version", version)
                    .detail("StorageServers", locations[i].second->description());
            }

            choose {
                when(wait(cx->connectionFileChanged())) { cx->invalidateCache(keys); }
                when(wait(waitForAll(fs))) { break; }
                when(wait(delay(timeout))) {
                    TraceEvent("GetCheckpointTimeout").detail("Range", keys.toString()).detail("Version", version);
                }
            }
        } catch (Error& e) {
            TraceEvent("GetCheckpointError").errorUnsuppressed(e).detail("Range", keys.toString());
            if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
                e.code() == error_code_connection_failed || e.code() == error_code_broken_promise) {
                cx->invalidateCache(keys);
                wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
            } else {
                throw;
            }
        }
    }

    std::vector<CheckpointMetaData> res;
    for (i = 0; i < fs.size(); ++i) {
        TraceEvent("GetCheckpointShardEnd").detail("Checkpoint", fs[i].get().toString());
        res.push_back(fs[i].get());
    }
    return res;
}

ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion> exclusions) {
    TraceEvent("ExclusionSafetyCheckBegin")
        .detail("NumExclusion", exclusions.size())
@@ -490,6 +490,25 @@ int64_t extractIntOption(Optional<StringRef> value,
// states: coordinator, TLog and storage state
ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);

// Adds the necessary mutation(s) to the transaction, so that *one* checkpoint will be created for
// each shard overlapping with `range`. Each checkpoint will be created at a random
// storage server for each shard.
// All checkpoint(s) will be created at the transaction's commit version.
Future<Void> createCheckpoint(Transaction* tr, KeyRangeRef range, CheckpointFormat format);

// Same as above.
Future<Void> createCheckpoint(Reference<ReadYourWritesTransaction> tr, KeyRangeRef range, CheckpointFormat format);

// Gets checkpoint metadata for `keys` at the specific version, with the particular format.
// One CheckpointMetaData will be returned for each distinct shard.
// The collective keyrange of the returned checkpoint(s) is a super-set of `keys`.
// A checkpoint_not_found() error will be returned if the specific checkpoint(s) cannot be found.
ACTOR Future<std::vector<CheckpointMetaData>> getCheckpointMetaData(Database cx,
                                                                    KeyRange keys,
                                                                    Version version,
                                                                    CheckpointFormat format,
                                                                    double timeout = 5.0);

// Checks with Data Distributor that it is safe to mark all servers in exclusions as failed
ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion> exclusions);
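For orientation, a minimal client-side sketch of how the two APIs declared above are intended to be combined. Only createCheckpoint() and getCheckpointMetaData() come from this commit; the retry loop, the chosen format and the trace event are illustrative assumptions.

// Illustrative only: checkpoint a key range, then read back the resulting metadata records.
ACTOR Future<Void> checkpointRangeExample(Database cx, KeyRange range) {
    state Transaction tr(cx);
    loop {
        try {
            // Adds one checkpoint mutation per shard replica; the checkpoint is created at the commit version.
            wait(createCheckpoint(&tr, range, RocksDBColumnFamily));
            wait(tr.commit());
            break;
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
    // One CheckpointMetaData per shard overlapping `range`, at the commit version.
    std::vector<CheckpointMetaData> records =
        wait(getCheckpointMetaData(cx, range, tr.getCommittedVersion(), RocksDBColumnFamily));
    for (const auto& record : records) {
        TraceEvent("CheckpointRangeExampleRecord").detail("Checkpoint", record.toString());
    }
    return Void();
}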
@@ -367,6 +367,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC, 0 );
    // If true, enables dynamic adjustment of ROCKSDB_WRITE_RATE_LIMITER_BYTES according to the recent demand of background IO.
    init( ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE, true );
    init( DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, "fdb");

    init( ROCKSDB_PERFCONTEXT_ENABLE, false ); if( randomize && BUGGIFY ) ROCKSDB_PERFCONTEXT_ENABLE = deterministicRandom()->coinflip() ? false : true;
    init( ROCKSDB_PERFCONTEXT_SAMPLE_RATE, 0.0001 );
    init( ROCKSDB_MAX_SUBCOMPACTIONS, 2 );
@@ -678,6 +680,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( MAX_STORAGE_COMMIT_TIME, 120.0 ); // The max fsync stall time on the storage server and tlog before marking a disk as failed
    init( RANGESTREAM_LIMIT_BYTES, 2e6 ); if( randomize && BUGGIFY ) RANGESTREAM_LIMIT_BYTES = 1;
    init( ENABLE_CLEAR_RANGE_EAGER_READS, true );
    init( CHECKPOINT_TRANSFER_BLOCK_BYTES, 40e6 );
    init( QUICK_GET_VALUE_FALLBACK, true );
    init( QUICK_GET_KEY_VALUES_FALLBACK, true );
    init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
@@ -298,6 +298,7 @@ public:
    bool ROCKSDB_READ_RANGE_REUSE_ITERATORS;
    int64_t ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC;
    bool ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE;
    std::string DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY;
    bool ROCKSDB_PERFCONTEXT_ENABLE; // Enable rocks perf context metrics. May cause performance overhead
    double ROCKSDB_PERFCONTEXT_SAMPLE_RATE;
    int ROCKSDB_MAX_SUBCOMPACTIONS;
@@ -617,6 +618,7 @@ public:
    bool ENABLE_CLEAR_RANGE_EAGER_READS;
    bool QUICK_GET_VALUE_FALLBACK;
    bool QUICK_GET_KEY_VALUES_FALLBACK;
    int CHECKPOINT_TRANSFER_BLOCK_BYTES;
    int QUICK_GET_KEY_VALUES_LIMIT;
    int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
fdbclient/StorageCheckpoint.h (new file, 88 lines)
@@ -0,0 +1,88 @@
/*
 * StorageCheckpoint.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FDBCLIENT_STORAGCHECKPOINT_H
#define FDBCLIENT_STORAGCHECKPOINT_H
#pragma once

#include "fdbclient/FDBTypes.h"

// FDB storage checkpoint format.
enum CheckpointFormat {
    InvalidFormat = 0,
    // For RocksDB, checkpoint generated via rocksdb::Checkpoint::ExportColumnFamily().
    RocksDBColumnFamily = 1,
    // For RocksDB, checkpoint generated via rocksdb::Checkpoint::CreateCheckpoint().
    RocksDB = 2,
};

// Metadata of a FDB checkpoint.
struct CheckpointMetaData {
    enum CheckpointState {
        InvalidState = 0,
        Pending = 1, // Checkpoint creation pending.
        Complete = 2, // Checkpoint is created and ready to be read.
        Deleting = 3, // Checkpoint deletion requested.
        Fail = 4,
    };

    constexpr static FileIdentifier file_identifier = 13804342;
    Version version;
    KeyRange range;
    int16_t format; // CheckpointFormat.
    UID ssID; // Storage server ID on which this checkpoint is created.
    UID checkpointID; // A unique id for this checkpoint.
    int16_t state; // CheckpointState.
    int referenceCount; // A reference count on the checkpoint; it can only be deleted when this is 0.
    int64_t gcTime; // Time to delete this checkpoint, a Unix timestamp in seconds.

    // Serialized metadata associated with `format`; this data can be understood by the corresponding KVS.
    Standalone<StringRef> serializedCheckpoint;

    CheckpointMetaData() : format(InvalidFormat), state(InvalidState), referenceCount(0) {}
    CheckpointMetaData(KeyRange const& range, CheckpointFormat format, UID const& ssID, UID const& checkpointID)
      : version(invalidVersion), range(range), format(format), ssID(ssID), checkpointID(checkpointID), state(Pending),
        referenceCount(0) {}
    CheckpointMetaData(Version version, KeyRange const& range, CheckpointFormat format, UID checkpointID)
      : version(version), range(range), format(format), checkpointID(checkpointID), referenceCount(0) {}

    CheckpointState getState() const { return static_cast<CheckpointState>(state); }

    void setState(CheckpointState state) { this->state = static_cast<int16_t>(state); }

    CheckpointFormat getFormat() const { return static_cast<CheckpointFormat>(format); }

    void setFormat(CheckpointFormat format) { this->format = static_cast<int16_t>(format); }

    std::string toString() const {
        std::string res = "Checkpoint MetaData:\nRange: " + range.toString() + "\nVersion: " + std::to_string(version) +
                          "\nFormat: " + std::to_string(format) + "\nServer: " + ssID.toString() +
                          "\nID: " + checkpointID.toString() + "\nState: " + std::to_string(static_cast<int>(state)) +
                          "\n";
        return res;
    }

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, version, range, format, state, checkpointID, ssID, gcTime, serializedCheckpoint);
    }
};

#endif
@@ -24,6 +24,7 @@

#include <ostream>
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageCheckpoint.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/QueueModel.h"
#include "fdbrpc/fdbrpc.h"
@@ -85,6 +86,8 @@ struct StorageServerInterface {
    RequestStream<struct OverlappingChangeFeedsRequest> overlappingChangeFeeds;
    RequestStream<struct ChangeFeedPopRequest> changeFeedPop;
    RequestStream<struct ChangeFeedVersionUpdateRequest> changeFeedVersionUpdate;
    RequestStream<struct GetCheckpointRequest> checkpoint;
    RequestStream<struct FetchCheckpointRequest> fetchCheckpoint;

    explicit StorageServerInterface(UID uid) : uniqueID(uid) {}
    StorageServerInterface() : uniqueID(deterministicRandom()->randomUniqueID()) {}
@@ -137,6 +140,9 @@ struct StorageServerInterface {
            RequestStream<struct ChangeFeedPopRequest>(getValue.getEndpoint().getAdjustedEndpoint(17));
        changeFeedVersionUpdate = RequestStream<struct ChangeFeedVersionUpdateRequest>(
            getValue.getEndpoint().getAdjustedEndpoint(18));
        checkpoint = RequestStream<struct GetCheckpointRequest>(getValue.getEndpoint().getAdjustedEndpoint(19));
        fetchCheckpoint =
            RequestStream<struct FetchCheckpointRequest>(getValue.getEndpoint().getAdjustedEndpoint(20));
    }
} else {
    ASSERT(Ar::isDeserializing);
@@ -184,6 +190,8 @@ struct StorageServerInterface {
    streams.push_back(overlappingChangeFeeds.getReceiver());
    streams.push_back(changeFeedPop.getReceiver());
    streams.push_back(changeFeedVersionUpdate.getReceiver());
    streams.push_back(checkpoint.getReceiver());
    streams.push_back(fetchCheckpoint.getReceiver());
    FlowTransport::transport().addEndpoints(streams);
}
};
@@ -816,6 +824,60 @@ struct ChangeFeedPopRequest {
    }
};

// Request to search for a checkpoint for a minimum keyrange: `range`, at the specific version,
// in the specific format.
// A CheckpointMetaData will be returned if the specific checkpoint is found.
struct GetCheckpointRequest {
    constexpr static FileIdentifier file_identifier = 13804343;
    Version version; // The FDB version at which the checkpoint is created.
    KeyRange range;
    int16_t format; // CheckpointFormat.
    Optional<UID> checkpointID; // When present, look for the checkpoint with the exact UID.
    ReplyPromise<CheckpointMetaData> reply;

    GetCheckpointRequest() {}
    GetCheckpointRequest(Version version, KeyRange const& range, CheckpointFormat format)
      : version(version), range(range), format(format) {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, version, range, format, checkpointID, reply);
    }
};

// Reply to FetchCheckpointRequest; transfers the checkpoint data back to the client.
struct FetchCheckpointReply : public ReplyPromiseStreamReply {
    constexpr static FileIdentifier file_identifier = 13804345;
    Standalone<StringRef> token; // Serialized data specific to a particular checkpoint format.
    Standalone<StringRef> data;

    FetchCheckpointReply() {}
    FetchCheckpointReply(StringRef token) : token(token) {}

    int expectedSize() const { return data.expectedSize(); }

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, token, data);
    }
};

// Request to fetch checkpoint from a storage server.
struct FetchCheckpointRequest {
    constexpr static FileIdentifier file_identifier = 13804344;
    UID checkpointID;
    Standalone<StringRef> token; // Serialized data specific to a particular checkpoint format.
    ReplyPromiseStream<FetchCheckpointReply> reply;

    FetchCheckpointRequest() = default;
    FetchCheckpointRequest(UID checkpointID, StringRef token) : checkpointID(checkpointID), token(token) {}

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, checkpointID, token, reply);
    }
};

struct OverlappingChangeFeedEntry {
    Key rangeId;
    KeyRange range;
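To illustrate how these request types fit together, below is a hypothetical client-side loop that streams one checkpoint file from a storage server over the new fetchCheckpoint endpoint. Only FetchCheckpointRequest/FetchCheckpointReply and the endpoint itself are defined by this commit; the helper name, its parameters and the output-file handling are assumptions for illustration.

// Hypothetical illustration: stream one checkpoint file from a storage server and append it to a local file.
// For a RocksDBColumnFamily checkpoint, `token` would be an sst file name from the serialized metadata.
ACTOR Future<Void> fetchCheckpointFileExample(StorageServerInterface ssi,
                                              UID checkpointID,
                                              Standalone<StringRef> token,
                                              Reference<IAsyncFile> out) {
    state ReplyPromiseStream<FetchCheckpointReply> stream =
        ssi.fetchCheckpoint.getReplyStream(FetchCheckpointRequest(checkpointID, token));
    state int64_t offset = 0;
    try {
        loop {
            // Each reply carries the next block of the file; the stream ends with end_of_stream.
            state FetchCheckpointReply rep = waitNext(stream.getFuture());
            wait(out->write(rep.data.begin(), rep.data.size(), offset));
            offset += rep.data.size();
        }
    } catch (Error& e) {
        if (e.code() != error_code_end_of_stream) {
            throw;
        }
    }
    return Void();
}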
@@ -215,6 +215,33 @@ const KeyRangeRef writeConflictRangeKeysRange =

const KeyRef clusterIdKey = LiteralStringRef("\xff/clusterId");

const KeyRef checkpointPrefix = "\xff/checkpoint/"_sr;

const Key checkpointKeyFor(UID checkpointID) {
    BinaryWriter wr(Unversioned());
    wr.serializeBytes(checkpointPrefix);
    wr << checkpointID;
    return wr.toValue();
}

const Value checkpointValue(const CheckpointMetaData& checkpoint) {
    return ObjectWriter::toValue(checkpoint, IncludeVersion());
}

UID decodeCheckpointKey(const KeyRef& key) {
    UID checkpointID;
    BinaryReader rd(key.removePrefix(checkpointPrefix), Unversioned());
    rd >> checkpointID;
    return checkpointID;
}

CheckpointMetaData decodeCheckpointValue(const ValueRef& value) {
    CheckpointMetaData checkpoint;
    ObjectReader reader(value.begin(), IncludeVersion());
    reader.deserialize(checkpoint);
    return checkpoint;
}

// "\xff/cacheServer/[[UID]] := StorageServerInterface"
const KeyRangeRef storageCacheServerKeys(LiteralStringRef("\xff/cacheServer/"), LiteralStringRef("\xff/cacheServer0"));
const KeyRef storageCacheServersPrefix = storageCacheServerKeys.begin;
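A small, hypothetical round-trip through the helpers above shows the intended encoding: a checkpoint record lives at "\xff/checkpoint/<checkpointID>" and its value is the ObjectWriter-serialized CheckpointMetaData. The sample key range and UIDs are placeholders.

// Hypothetical round-trip through checkpointKeyFor/checkpointValue and their decoders.
CheckpointMetaData original(KeyRangeRef("a"_sr, "b"_sr),
                            RocksDBColumnFamily,
                            deterministicRandom()->randomUniqueID(),
                            deterministicRandom()->randomUniqueID());
Key key = checkpointKeyFor(original.checkpointID); // "\xff/checkpoint/" + checkpointID bytes
Value value = checkpointValue(original); // ObjectWriter-serialized, IncludeVersion()
ASSERT(decodeCheckpointKey(key) == original.checkpointID);
ASSERT(decodeCheckpointValue(value).checkpointID == original.checkpointID);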
@@ -70,6 +70,13 @@ void decodeKeyServersValue(std::map<Tag, UID> const& tag_uid,

extern const KeyRef clusterIdKey;

// "\xff/checkpoint/[[UID]] := [[CheckpointMetaData]]"
extern const KeyRef checkpointPrefix;
const Key checkpointKeyFor(UID checkpointID);
const Value checkpointValue(const CheckpointMetaData& checkpoint);
UID decodeCheckpointKey(const KeyRef& key);
CheckpointMetaData decodeCheckpointValue(const ValueRef& value);

// "\xff/storageCacheServer/[[UID]] := StorageServerInterface"
// This will be added by the cache server on initialization and removed by DD
// TODO[mpilman]: We will need a way to map uint16_t ids to UIDs in a future
@@ -541,6 +541,29 @@ private:
        toCommit->writeTypedMessage(privatized);
    }

    // Generates private mutations for the target storage server, instructing it to create a checkpoint.
    void checkSetCheckpointKeys(MutationRef m) {
        if (!m.param1.startsWith(checkpointPrefix)) {
            return;
        }
        if (toCommit) {
            CheckpointMetaData checkpoint = decodeCheckpointValue(m.param2);
            Tag tag = decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(checkpoint.ssID)).get().get());
            MutationRef privatized = m;
            privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
            TraceEvent("SendingPrivateMutationCheckpoint", dbgid)
                .detail("Original", m)
                .detail("Privatized", privatized)
                .detail("Server", checkpoint.ssID)
                .detail("TagKey", serverTagKeyFor(checkpoint.ssID))
                .detail("Tag", tag.toString())
                .detail("Checkpoint", checkpoint.toString());

            toCommit->addTag(tag);
            toCommit->writeTypedMessage(privatized);
        }
    }

    void checkSetOtherKeys(MutationRef m) {
        if (initialCommit)
            return;
@@ -1081,6 +1104,7 @@ public:
    if (m.type == MutationRef::SetValue && isSystemKey(m.param1)) {
        checkSetKeyServersPrefix(m);
        checkSetServerKeysPrefix(m);
        checkSetCheckpointKeys(m);
        checkSetServerTagsPrefix(m);
        checkSetStorageCachePrefix(m);
        checkSetCacheKeysPrefix(m);
@@ -50,6 +50,10 @@ set(FDBSERVER_SRCS
    KeyValueStoreMemory.actor.cpp
    KeyValueStoreRocksDB.actor.cpp
    KeyValueStoreSQLite.actor.cpp
    ServerCheckpoint.actor.cpp
    ServerCheckpoint.actor.h
    RocksDBCheckpointUtils.actor.cpp
    RocksDBCheckpointUtils.actor.h
    Knobs.h
    LatencyBandConfig.cpp
    LatencyBandConfig.h
@@ -191,6 +195,7 @@ set(FDBSERVER_SRCS
    workloads/ChangeFeeds.actor.cpp
    workloads/DataDistributionMetrics.actor.cpp
    workloads/DataLossRecovery.actor.cpp
    workloads/PhysicalShardMove.actor.cpp
    workloads/DDBalance.actor.cpp
    workloads/DDMetrics.actor.cpp
    workloads/DDMetricsExclude.actor.cpp
@@ -24,6 +24,22 @@

#include "fdbclient/FDBTypes.h"
#include "fdbserver/Knobs.h"
#include "fdbclient/StorageCheckpoint.h"

struct CheckpointRequest {
    const Version version; // The FDB version at which the checkpoint is created.
    const KeyRange range; // Keyrange this checkpoint must contain.
    const CheckpointFormat format;
    const UID checkpointID;
    const std::string checkpointDir; // The local directory where the checkpoint file will be created.

    CheckpointRequest(const Version version,
                      const KeyRange& range,
                      const CheckpointFormat format,
                      const UID& id,
                      const std::string& checkpointDir)
      : version(version), range(range), format(format), checkpointID(id), checkpointDir(checkpointDir) {}
};

class IClosable {
public:
@@ -87,6 +103,15 @@ public:

    virtual void enableSnapshot() {}

    // Create a checkpoint.
    virtual Future<CheckpointMetaData> checkpoint(const CheckpointRequest& request) { throw not_implemented(); }

    // Restore from a checkpoint.
    virtual Future<Void> restore(const std::vector<CheckpointMetaData>& checkpoints) { throw not_implemented(); }

    // Delete a checkpoint.
    virtual Future<Void> deleteCheckpoint(const CheckpointMetaData& checkpoint) { throw not_implemented(); }

    /*
    Concurrency contract
    Causal consistency:
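A condensed sketch of how a caller drives these new IKeyValueStore methods; it mirrors the CheckpointRestore unit test added later in this commit, with placeholder directory names.

// Condensed from the CheckpointRestore test below; directory names are placeholders.
state IKeyValueStore* source = new RocksDBKeyValueStore("source-dir", deterministicRandom()->randomUniqueID());
wait(source->init());

CheckpointRequest request(
    latestVersion, allKeys, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), "checkpoint-dir");
CheckpointMetaData metaData = wait(source->checkpoint(request));

// Restore the exported column family into a fresh store.
state IKeyValueStore* target = new RocksDBKeyValueStore("target-dir", deterministicRandom()->randomUniqueID());
std::vector<CheckpointMetaData> checkpoints = { metaData };
wait(target->restore(checkpoints));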
@@ -5,11 +5,21 @@
#include <rocksdb/filter_policy.h>
#include <rocksdb/listener.h>
#include <rocksdb/options.h>
#include <rocksdb/metadata.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/sst_file_reader.h>
#include <rocksdb/sst_file_writer.h>
#include <rocksdb/slice.h>
#include <rocksdb/env.h>
#include <rocksdb/options.h>
#include <rocksdb/statistics.h>
#include <rocksdb/table.h>
#include <rocksdb/version.h>
#include <rocksdb/types.h>
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include <rocksdb/version.h>

#include <rocksdb/rate_limiter.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/c.h>
@@ -32,6 +42,8 @@
#endif // SSD_ROCKSDB_EXPERIMENTAL

#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/RocksDBCheckpointUtils.actor.h"

#include "flow/actorcompiler.h" // has to be last include

#ifdef SSD_ROCKSDB_EXPERIMENTAL
@@ -114,7 +126,10 @@ private:
    std::mutex mutex;
};
using DB = rocksdb::DB*;
using CF = rocksdb::ColumnFamilyHandle*;

#define PERSIST_PREFIX "\xff\xff"
const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
const StringRef ROCKSDBSTORAGE_HISTOGRAM_GROUP = LiteralStringRef("RocksDBStorage");
const StringRef ROCKSDB_COMMIT_LATENCY_HISTOGRAM = LiteralStringRef("RocksDBCommitLatency");
const StringRef ROCKSDB_COMMIT_ACTION_HISTOGRAM = LiteralStringRef("RocksDBCommitAction");
@@ -134,6 +149,74 @@ const StringRef ROCKSDB_READRANGE_NEWITERATOR_HISTOGRAM = LiteralStringRef("Rock
const StringRef ROCKSDB_READVALUE_GET_HISTOGRAM = LiteralStringRef("RocksDBReadValueGet");
const StringRef ROCKSDB_READPREFIX_GET_HISTOGRAM = LiteralStringRef("RocksDBReadPrefixGet");

rocksdb::ExportImportFilesMetaData getMetaData(const CheckpointMetaData& checkpoint) {
    rocksdb::ExportImportFilesMetaData metaData;
    if (checkpoint.getFormat() != RocksDBColumnFamily) {
        return metaData;
    }

    RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(checkpoint);
    metaData.db_comparator_name = rocksCF.dbComparatorName;

    for (const LiveFileMetaData& fileMetaData : rocksCF.sstFiles) {
        rocksdb::LiveFileMetaData liveFileMetaData;
        liveFileMetaData.size = fileMetaData.size;
        liveFileMetaData.name = fileMetaData.name;
        liveFileMetaData.file_number = fileMetaData.file_number;
        liveFileMetaData.db_path = fileMetaData.db_path;
        liveFileMetaData.smallest_seqno = fileMetaData.smallest_seqno;
        liveFileMetaData.largest_seqno = fileMetaData.largest_seqno;
        liveFileMetaData.smallestkey = fileMetaData.smallestkey;
        liveFileMetaData.largestkey = fileMetaData.largestkey;
        liveFileMetaData.num_reads_sampled = fileMetaData.num_reads_sampled;
        liveFileMetaData.being_compacted = fileMetaData.being_compacted;
        liveFileMetaData.num_entries = fileMetaData.num_entries;
        liveFileMetaData.num_deletions = fileMetaData.num_deletions;
        liveFileMetaData.temperature = static_cast<rocksdb::Temperature>(fileMetaData.temperature);
        liveFileMetaData.oldest_blob_file_number = fileMetaData.oldest_blob_file_number;
        liveFileMetaData.oldest_ancester_time = fileMetaData.oldest_ancester_time;
        liveFileMetaData.file_creation_time = fileMetaData.file_creation_time;
        liveFileMetaData.file_checksum = fileMetaData.file_checksum;
        liveFileMetaData.file_checksum_func_name = fileMetaData.file_checksum_func_name;
        liveFileMetaData.column_family_name = fileMetaData.column_family_name;
        liveFileMetaData.level = fileMetaData.level;
        metaData.files.push_back(liveFileMetaData);
    }

    return metaData;
}

void populateMetaData(CheckpointMetaData* checkpoint, const rocksdb::ExportImportFilesMetaData& metaData) {
    RocksDBColumnFamilyCheckpoint rocksCF;
    rocksCF.dbComparatorName = metaData.db_comparator_name;
    for (const rocksdb::LiveFileMetaData& fileMetaData : metaData.files) {
        LiveFileMetaData liveFileMetaData;
        liveFileMetaData.size = fileMetaData.size;
        liveFileMetaData.name = fileMetaData.name;
        liveFileMetaData.file_number = fileMetaData.file_number;
        liveFileMetaData.db_path = fileMetaData.db_path;
        liveFileMetaData.smallest_seqno = fileMetaData.smallest_seqno;
        liveFileMetaData.largest_seqno = fileMetaData.largest_seqno;
        liveFileMetaData.smallestkey = fileMetaData.smallestkey;
        liveFileMetaData.largestkey = fileMetaData.largestkey;
        liveFileMetaData.num_reads_sampled = fileMetaData.num_reads_sampled;
        liveFileMetaData.being_compacted = fileMetaData.being_compacted;
        liveFileMetaData.num_entries = fileMetaData.num_entries;
        liveFileMetaData.num_deletions = fileMetaData.num_deletions;
        liveFileMetaData.temperature = static_cast<uint8_t>(fileMetaData.temperature);
        liveFileMetaData.oldest_blob_file_number = fileMetaData.oldest_blob_file_number;
        liveFileMetaData.oldest_ancester_time = fileMetaData.oldest_ancester_time;
        liveFileMetaData.file_creation_time = fileMetaData.file_creation_time;
        liveFileMetaData.file_checksum = fileMetaData.file_checksum;
        liveFileMetaData.file_checksum_func_name = fileMetaData.file_checksum_func_name;
        liveFileMetaData.column_family_name = fileMetaData.column_family_name;
        liveFileMetaData.level = fileMetaData.level;
        rocksCF.sstFiles.push_back(liveFileMetaData);
    }
    checkpoint->setFormat(RocksDBColumnFamily);
    checkpoint->serializedCheckpoint = ObjectWriter::toValue(rocksCF, IncludeVersion());
}

rocksdb::Slice toSlice(StringRef s) {
    return rocksdb::Slice(reinterpret_cast<const char*>(s.begin()), s.size());
}
@@ -219,12 +302,13 @@ rocksdb::ReadOptions getReadOptions() {
}

struct ReadIterator {
    CF& cf;
    uint64_t index; // incrementing counter to uniquely identify read iterator.
    bool inUse;
    std::shared_ptr<rocksdb::Iterator> iter;
    double creationTime;
    ReadIterator(uint64_t index, DB& db, rocksdb::ReadOptions& options)
      : index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options)) {}
    ReadIterator(CF& cf, uint64_t index, DB& db, rocksdb::ReadOptions& options)
      : cf(cf), index(index), inUse(true), creationTime(now()), iter(db->NewIterator(options, cf)) {}
};

/*
@@ -241,8 +325,8 @@ gets deleted as the ref count becomes 0.
*/
class ReadIteratorPool {
public:
    ReadIteratorPool(DB& db, const std::string& path)
      : db(db), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) {
    ReadIteratorPool(DB& db, CF& cf, const std::string& path)
      : db(db), cf(cf), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) {
        readRangeOptions.background_purge_on_iterator_cleanup = true;
        readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
        TraceEvent("ReadIteratorPool")
@@ -271,12 +355,12 @@ public:
            }
        }
        index++;
        ReadIterator iter(index, db, readRangeOptions);
        ReadIterator iter(cf, index, db, readRangeOptions);
        iteratorsMap.insert({ index, iter });
        return iter;
    } else {
        index++;
        ReadIterator iter(index, db, readRangeOptions);
        ReadIterator iter(cf, index, db, readRangeOptions);
        return iter;
    }
}
@@ -316,6 +400,7 @@ private:
    std::unordered_map<int, ReadIterator> iteratorsMap;
    std::unordered_map<int, ReadIterator>::iterator it;
    DB& db;
    CF& cf;
    rocksdb::ReadOptions readRangeOptions;
    std::mutex mutex;
    // incrementing counter for every new iterator creation, to uniquely identify the iterator in returnIterator().
@@ -735,10 +820,9 @@ Error statusToError(const rocksdb::Status& s) {
}

struct RocksDBKeyValueStore : IKeyValueStore {
    using CF = rocksdb::ColumnFamilyHandle*;

    struct Writer : IThreadPoolReceiver {
        DB& db;
        CF& cf;

        UID id;
        std::shared_ptr<rocksdb::RateLimiter> rateLimiter;
@@ -752,11 +836,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
        int threadIndex;

        explicit Writer(DB& db,
                        CF& cf,
                        UID id,
                        std::shared_ptr<ReadIteratorPool> readIterPool,
                        std::shared_ptr<PerfContextMetrics> perfContextMetrics,
                        int threadIndex)
          : db(db), id(id), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics),
          : db(db), cf(cf), id(id), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics),
            threadIndex(threadIndex),
            rateLimiter(SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0
                            ? rocksdb::NewGenericRateLimiter(
@@ -814,25 +899,57 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
        };
        void action(OpenAction& a) {
            std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
                "default", getCFOptions() } };
            std::vector<rocksdb::ColumnFamilyHandle*> handle;
            auto options = getOptions();
            ASSERT(cf == nullptr);

            std::vector<std::string> columnFamilies;
            rocksdb::Options options = getOptions();
            rocksdb::Status status = rocksdb::DB::ListColumnFamilies(options, a.path, &columnFamilies);
            if (std::find(columnFamilies.begin(), columnFamilies.end(), "default") == columnFamilies.end()) {
                columnFamilies.push_back("default");
            }

            rocksdb::ColumnFamilyOptions cfOptions = getCFOptions();
            std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
            for (const std::string& name : columnFamilies) {
                descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, cfOptions });
            }

            options.listeners.push_back(a.errorListener);
            if (SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC > 0) {
                options.rate_limiter = rateLimiter;
            }
            auto status = rocksdb::DB::Open(options, a.path, defaultCF, &handle, &db);

            std::vector<rocksdb::ColumnFamilyHandle*> handles;
            status = rocksdb::DB::Open(options, a.path, descriptors, &handles, &db);

            if (!status.ok()) {
                logRocksDBError(status, "Open");
                a.done.sendError(statusToError(status));
            } else {
                return;
            }

            for (rocksdb::ColumnFamilyHandle* handle : handles) {
                if (handle->GetName() == SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY) {
                    cf = handle;
                    break;
                }
            }

            if (cf == nullptr) {
                status = db->CreateColumnFamily(cfOptions, SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, &cf);
                if (!status.ok()) {
                    logRocksDBError(status, "Open");
                    a.done.sendError(statusToError(status));
                }
            }

            TraceEvent(SevInfo, "RocksDB")
                .detail("Path", a.path)
                .detail("Method", "Open")
                .detail("KnobRocksDBWriteRateLimiterBytesPerSec",
                        SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_BYTES_PER_SEC)
                .detail("KnobRocksDBWriteRateLimiterAutoTune", SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE);
                .detail("KnobRocksDBWriteRateLimiterAutoTune", SERVER_KNOBS->ROCKSDB_WRITE_RATE_LIMITER_AUTO_TUNE)
                .detail("ColumnFamily", cf->GetName());
            if (g_network->isSimulated()) {
                // The current thread and main thread are same when the code runs in simulation.
                // blockUntilReady() is getting the thread into deadlock state, so directly calling
@@ -848,7 +965,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
                }
                a.done.send(Void());
            }
        }

        struct DeleteVisitor : public rocksdb::WriteBatch::Handler {
            VectorRef<KeyRangeRef>& deletes;
@@ -863,6 +979,26 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            deletes.push_back_deep(arena, kr);
            return rocksdb::Status::OK();
        }

        rocksdb::Status PutCF(uint32_t column_family_id,
                              const rocksdb::Slice& key,
                              const rocksdb::Slice& value) override {
            return rocksdb::Status::OK();
        }

        rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
            return rocksdb::Status::OK();
        }

        rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
            return rocksdb::Status::OK();
        }

        rocksdb::Status MergeCF(uint32_t column_family_id,
                                const rocksdb::Slice& key,
                                const rocksdb::Slice& value) override {
            return rocksdb::Status::OK();
        }
    };

    struct CommitAction : TypedAction<Writer, CommitAction> {
@@ -894,7 +1030,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
        }
        Standalone<VectorRef<KeyRangeRef>> deletes;
        DeleteVisitor dv(deletes, deletes.arena());
        ASSERT(a.batchToCommit->Iterate(&dv).ok());
        rocksdb::Status s = a.batchToCommit->Iterate(&dv);
        if (!s.ok()) {
            logRocksDBError(s, "CommitDeleteVisitor");
            a.done.sendError(statusToError(s));
            return;
        }
        // If there are any range deletes, we should have added them to be deleted.
        ASSERT(!deletes.empty() || !a.batchToCommit->HasDeleteRange());
        rocksdb::WriteOptions options;
@@ -906,7 +1047,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            // Request for batchToCommit bytes. If this request cannot be satisfied, the call is blocked.
            rateLimiter->Request(a.batchToCommit->GetDataSize() /* bytes */, rocksdb::Env::IO_HIGH);
        }
        auto s = db->Write(options, a.batchToCommit.get());
        s = db->Write(options, a.batchToCommit.get());
        readIterPool->update();
        if (a.getHistograms) {
            writeHistogram->sampleSeconds(timer_monotonic() - writeBeginTime);
@@ -922,7 +1063,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
        for (const auto& keyRange : deletes) {
            auto begin = toSlice(keyRange.begin);
            auto end = toSlice(keyRange.end);
            ASSERT(db->SuggestCompactRange(db->DefaultColumnFamily(), &begin, &end).ok());
            ASSERT(db->SuggestCompactRange(cf, &begin, &end).ok());
        }
        if (a.getHistograms) {
            deleteCompactRangeHistogram->sampleSeconds(timer_monotonic() - compactRangeBeginTime);
@@ -956,9 +1097,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            logRocksDBError(s, "Close");
        }
        if (a.deleteOnClose) {
            std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
                "default", getCFOptions() } };
            s = rocksdb::DestroyDB(a.path, getOptions(), defaultCF);
            std::set<std::string> columnFamilies{ "default" };
            columnFamilies.insert(SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY);
            std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
            for (const std::string name : columnFamilies) {
                descriptors.push_back(rocksdb::ColumnFamilyDescriptor{ name, getCFOptions() });
            }
            s = rocksdb::DestroyDB(a.path, getOptions(), descriptors);
            if (!s.ok()) {
                logRocksDBError(s, "Destroy");
            } else {
@@ -968,10 +1113,133 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Close");
            a.done.send(Void());
        }

        struct CheckpointAction : TypedAction<Writer, CheckpointAction> {
            CheckpointAction(const CheckpointRequest& request) : request(request) {}

            double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }

            const CheckpointRequest request;
            ThreadReturnPromise<CheckpointMetaData> reply;
        };

        void action(CheckpointAction& a) {
            TraceEvent("RocksDBServeCheckpointBegin", id)
                .detail("MinVersion", a.request.version)
                .detail("Range", a.request.range.toString())
                .detail("Format", static_cast<int>(a.request.format))
                .detail("CheckpointDir", a.request.checkpointDir);

            rocksdb::Checkpoint* checkpoint;
            rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
            if (!s.ok()) {
                logRocksDBError(s, "Checkpoint");
                a.reply.sendError(statusToError(s));
                return;
            }

            rocksdb::PinnableSlice value;
            rocksdb::ReadOptions readOptions = getReadOptions();
            s = db->Get(readOptions, cf, toSlice(persistVersion), &value);

            if (!s.ok() && !s.IsNotFound()) {
                logRocksDBError(s, "Checkpoint");
                a.reply.sendError(statusToError(s));
                return;
            }

            const Version version = s.IsNotFound()
                                        ? latestVersion
                                        : BinaryReader::fromStringRef<Version>(toStringRef(value), Unversioned());

            TraceEvent("RocksDBServeCheckpointVersion", id)
                .detail("CheckpointVersion", a.request.version)
                .detail("PersistVersion", version);

            // TODO: set the range as the actual shard range.
            CheckpointMetaData res(version, a.request.range, a.request.format, a.request.checkpointID);
            const std::string& checkpointDir = a.request.checkpointDir;

            if (a.request.format == RocksDBColumnFamily) {
                rocksdb::ExportImportFilesMetaData* pMetadata;
                platform::eraseDirectoryRecursive(checkpointDir);
                const std::string cwd = platform::getWorkingDirectory() + "/";
                s = checkpoint->ExportColumnFamily(cf, checkpointDir, &pMetadata);

                if (!s.ok()) {
                    logRocksDBError(s, "Checkpoint");
                    a.reply.sendError(statusToError(s));
                    return;
                }

                populateMetaData(&res, *pMetadata);
                delete pMetadata;
                TraceEvent("RocksDBServeCheckpointSuccess", id)
                    .detail("CheckpointMetaData", res.toString())
                    .detail("RocksDBCF", getRocksCF(res).toString());
            } else {
                throw not_implemented();
            }

            res.setState(CheckpointMetaData::Complete);
            a.reply.send(res);
        }

        struct RestoreAction : TypedAction<Writer, RestoreAction> {
            RestoreAction(const std::string& path, const std::vector<CheckpointMetaData>& checkpoints)
              : path(path), checkpoints(checkpoints) {}

            double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }

            const std::string path;
            const std::vector<CheckpointMetaData> checkpoints;
            ThreadReturnPromise<Void> done;
        };

        void action(RestoreAction& a) {
            TraceEvent("RocksDBServeRestoreBegin", id).detail("Path", a.path);

            // TODO: Fail gracefully.
            ASSERT(!a.checkpoints.empty());

            if (a.checkpoints[0].format == RocksDBColumnFamily) {
                ASSERT_EQ(a.checkpoints.size(), 1);
                TraceEvent("RocksDBServeRestoreCF", id)
                    .detail("Path", a.path)
                    .detail("Checkpoint", a.checkpoints[0].toString())
                    .detail("RocksDBCF", getRocksCF(a.checkpoints[0]).toString());

                auto options = getOptions();
                rocksdb::Status status = rocksdb::DB::Open(options, a.path, &db);

                if (!status.ok()) {
                    logRocksDBError(status, "Restore");
                    a.done.sendError(statusToError(status));
                    return;
                }

                rocksdb::ExportImportFilesMetaData metaData = getMetaData(a.checkpoints[0]);
                rocksdb::ImportColumnFamilyOptions importOptions;
                importOptions.move_files = true;
                status = db->CreateColumnFamilyWithImport(
                    getCFOptions(), SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, importOptions, metaData, &cf);

                if (!status.ok()) {
                    logRocksDBError(status, "Restore");
                    a.done.sendError(statusToError(status));
                } else {
                    TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Restore");
                    a.done.send(Void());
                }
            } else {
                throw not_implemented();
            }
        }
    };

    struct Reader : IThreadPoolReceiver {
        DB& db;
        CF& cf;
        double readValueTimeout;
        double readValuePrefixTimeout;
        double readRangeTimeout;
@@ -992,10 +1260,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
        int threadIndex;

        explicit Reader(DB& db,
                        CF& cf,
                        std::shared_ptr<ReadIteratorPool> readIterPool,
                        std::shared_ptr<PerfContextMetrics> perfContextMetrics,
                        int threadIndex)
          : db(db), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics), threadIndex(threadIndex),
          : db(db), cf(cf), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics),
            threadIndex(threadIndex),
            readRangeLatencyHistogram(Histogram::getHistogram(ROCKSDBSTORAGE_HISTOGRAM_GROUP,
                                                              ROCKSDB_READRANGE_LATENCY_HISTOGRAM,
                                                              Histogram::Unit::microseconds)),
@@ -1066,6 +1336,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
        };
        void action(ReadValueAction& a) {
            ASSERT(cf != nullptr);
            bool doPerfContextMetrics =
                SERVER_KNOBS->ROCKSDB_PERFCONTEXT_ENABLE &&
                (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_PERFCONTEXT_SAMPLE_RATE);
@@ -1098,7 +1369,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);

            double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
            auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
            auto s = db->Get(options, cf, toSlice(a.key), &value);
            if (!s.ok() && !s.IsNotFound()) {
                logRocksDBError(s, "ReadValue");
                a.result.sendError(statusToError(s));
                return;
            }

            if (a.getHistograms) {
                readValueGetHistogram->sampleSeconds(timer_monotonic() - dbGetBeginTime);
            }
@@ -1175,7 +1452,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
            options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);

            double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
            auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
            auto s = db->Get(options, cf, toSlice(a.key), &value);
            if (a.getHistograms) {
                readPrefixGetHistogram->sampleSeconds(timer_monotonic() - dbGetBeginTime);
            }
@@ -1330,6 +1607,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
    DB db = nullptr;
    std::shared_ptr<PerfContextMetrics> perfContextMetrics;
    std::string path;
    rocksdb::ColumnFamilyHandle* defaultFdbCF = nullptr;
    UID id;
    Reference<IThreadPool> writeThread;
    Reference<IThreadPool> readThreads;
@@ -1357,7 +1635,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
    Counters counters;

    explicit RocksDBKeyValueStore(const std::string& path, UID id)
      : path(path), id(id), perfContextMetrics(new PerfContextMetrics()), readIterPool(new ReadIteratorPool(db, path)),
      : path(path), id(id), perfContextMetrics(new PerfContextMetrics()),
        readIterPool(new ReadIteratorPool(db, defaultFdbCF, path)),
        readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
        fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
        numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
@@ -1381,11 +1660,11 @@ struct RocksDBKeyValueStore : IKeyValueStore {
        readThreads = createGenericThreadPool();
    }
    writeThread->addThread(
        new Writer(db, id, readIterPool, perfContextMetrics, SERVER_KNOBS->ROCKSDB_READ_PARALLELISM),
        new Writer(db, defaultFdbCF, id, readIterPool, perfContextMetrics, SERVER_KNOBS->ROCKSDB_READ_PARALLELISM),
        "fdb-rocksdb-wr");
    TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
    for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
        readThreads->addThread(new Reader(db, readIterPool, perfContextMetrics, i), "fdb-rocksdb-re");
        readThreads->addThread(new Reader(db, defaultFdbCF, readIterPool, perfContextMetrics, i), "fdb-rocksdb-re");
    }
}
@@ -1429,7 +1708,8 @@ struct RocksDBKeyValueStore : IKeyValueStore {
    if (writeBatch == nullptr) {
        writeBatch.reset(new rocksdb::WriteBatch());
    }
    writeBatch->Put(toSlice(kv.key), toSlice(kv.value));
    ASSERT(defaultFdbCF != nullptr);
    writeBatch->Put(defaultFdbCF, toSlice(kv.key), toSlice(kv.value));
}

void clear(KeyRangeRef keyRange, const Arena*) override {
@@ -1437,10 +1717,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
        writeBatch.reset(new rocksdb::WriteBatch());
    }

    ASSERT(defaultFdbCF != nullptr);

    if (keyRange.singleKeyRange()) {
        writeBatch->Delete(toSlice(keyRange.begin));
        writeBatch->Delete(defaultFdbCF, toSlice(keyRange.begin));
    } else {
        writeBatch->DeleteRange(toSlice(keyRange.begin), toSlice(keyRange.end));
        writeBatch->DeleteRange(defaultFdbCF, toSlice(keyRange.begin), toSlice(keyRange.end));
    }
}
@@ -1587,6 +1869,46 @@ struct RocksDBKeyValueStore : IKeyValueStore {

        return StorageBytes(free, total, live, free);
    }

    Future<CheckpointMetaData> checkpoint(const CheckpointRequest& request) override {
        auto a = new Writer::CheckpointAction(request);

        auto res = a->reply.getFuture();
        writeThread->post(a);
        return res;
    }

    Future<Void> restore(const std::vector<CheckpointMetaData>& checkpoints) override {
        auto a = new Writer::RestoreAction(path, checkpoints);
        auto res = a->done.getFuture();
        writeThread->post(a);
        return res;
    }

    // Delete a checkpoint.
    Future<Void> deleteCheckpoint(const CheckpointMetaData& checkpoint) override {
        if (checkpoint.format == RocksDBColumnFamily) {
            RocksDBColumnFamilyCheckpoint rocksCF;
            ObjectReader reader(checkpoint.serializedCheckpoint.begin(), IncludeVersion());
            reader.deserialize(rocksCF);

            std::unordered_set<std::string> dirs;
            for (const LiveFileMetaData& file : rocksCF.sstFiles) {
                dirs.insert(file.db_path);
            }
            for (const std::string dir : dirs) {
                platform::eraseDirectoryRecursive(dir);
                TraceEvent("DeleteCheckpointRemovedDir", id)
                    .detail("CheckpointID", checkpoint.checkpointID)
                    .detail("Dir", dir);
            }
        } else if (checkpoint.format == RocksDB) {
            throw not_implemented();
        } else {
            throw internal_error();
        }
        return Void();
    }
};

} // namespace
@@ -1701,6 +2023,61 @@ TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/RocksDBReopen") {
    return Void();
}

TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/CheckpointRestore") {
    state std::string cwd = platform::getWorkingDirectory() + "/";
    state std::string rocksDBTestDir = "rocksdb-kvstore-br-test-db";
    platform::eraseDirectoryRecursive(rocksDBTestDir);

    state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
    wait(kvStore->init());

    kvStore->set({ LiteralStringRef("foo"), LiteralStringRef("bar") });
    wait(kvStore->commit(false));

    Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
    ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);

    platform::eraseDirectoryRecursive("checkpoint");
    state std::string checkpointDir = cwd + "checkpoint";

    CheckpointRequest request(
        latestVersion, allKeys, RocksDBColumnFamily, deterministicRandom()->randomUniqueID(), checkpointDir);
    CheckpointMetaData metaData = wait(kvStore->checkpoint(request));

    state std::string rocksDBRestoreDir = "rocksdb-kvstore-br-restore-db";
    platform::eraseDirectoryRecursive(rocksDBRestoreDir);

    state IKeyValueStore* kvStoreCopy =
        new RocksDBKeyValueStore(rocksDBRestoreDir, deterministicRandom()->randomUniqueID());

    std::vector<CheckpointMetaData> checkpoints;
    checkpoints.push_back(metaData);
    wait(kvStoreCopy->restore(checkpoints));

    Optional<Value> val = wait(kvStoreCopy->readValue(LiteralStringRef("foo")));
    ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);

    std::vector<Future<Void>> closes;
    closes.push_back(kvStore->onClosed());
    closes.push_back(kvStoreCopy->onClosed());
    kvStore->close();
    kvStoreCopy->close();
    wait(waitForAll(closes));

    platform::eraseDirectoryRecursive(rocksDBTestDir);
    platform::eraseDirectoryRecursive(rocksDBRestoreDir);

    return Void();
}

TEST_CASE("noSim/fdbserver/KeyValueStoreRocksDB/RocksDBTypes") {
    // If the following assertion fails, update SstFileMetaData and LiveFileMetaData in RocksDBCheckpointUtils.actor.h
    // to be the same as rocksdb::SstFileMetaData and rocksdb::LiveFileMetaData.
    ASSERT_EQ(sizeof(rocksdb::LiveFileMetaData), 184);
    ASSERT_EQ(sizeof(rocksdb::ExportImportFilesMetaData), 32);
    return Void();
}

} // namespace

#endif // SSD_ROCKSDB_EXPERIMENTAL
fdbserver/RocksDBCheckpointUtils.actor.cpp (new file, 283 lines; truncated below)
@ -0,0 +1,283 @@
|
||||
/*
|
||||
*RocksDBCheckpointUtils.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/RocksDBCheckpointUtils.actor.h"
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/StorageCheckpoint.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/flow.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
namespace {
|
||||
|
||||
class RocksDBCheckpointReader : public ICheckpointReader {
|
||||
public:
|
||||
RocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID)
|
||||
: checkpoint_(checkpoint), id_(logID), file_(Reference<IAsyncFile>()), offset_(0) {}
|
||||
|
||||
Future<Void> init(StringRef token) override;
|
||||
|
||||
Future<RangeResult> nextKeyValues(const int rowLimit, const int byteLimit) override { throw not_implemented(); }
|
||||
|
||||
// Returns the next chunk of serialized checkpoint.
|
||||
Future<Standalone<StringRef>> nextChunk(const int byteLimit) override;
|
||||
|
||||
Future<Void> close() override;
|
||||
|
||||
private:
|
||||
ACTOR static Future<Void> doInit(RocksDBCheckpointReader* self) {
|
||||
ASSERT(self != nullptr);
|
||||
try {
|
||||
state Reference<IAsyncFile> _file = wait(IAsyncFileSystem::filesystem()->open(
|
||||
self->path_, IAsyncFile::OPEN_READONLY | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO, 0));
|
||||
self->file_ = _file;
|
||||
TraceEvent("RocksDBCheckpointReaderOpenFile").detail("File", self->path_);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarnAlways, "ServerGetCheckpointFileFailure")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("File", self->path_);
|
||||
throw e;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Standalone<StringRef>> getNextChunk(RocksDBCheckpointReader* self, int byteLimit) {
|
||||
int blockSize = std::min(64 * 1024, byteLimit); // Block size read from disk.
|
||||
state Standalone<StringRef> buf = makeAlignedString(_PAGE_SIZE, blockSize);
|
||||
int bytesRead = wait(self->file_->read(mutateString(buf), blockSize, self->offset_));
|
||||
if (bytesRead == 0) {
|
||||
throw end_of_stream();
|
||||
}
|
||||
|
||||
self->offset_ += bytesRead;
|
||||
return buf.substr(0, bytesRead);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> doClose(RocksDBCheckpointReader* self) {
|
||||
wait(delay(0, TaskPriority::FetchKeys));
|
||||
delete self;
|
||||
return Void();
|
||||
}
|
||||
|
||||
CheckpointMetaData checkpoint_;
|
||||
UID id_;
|
||||
Reference<IAsyncFile> file_;
|
||||
int offset_;
|
||||
std::string path_;
|
||||
};
|
||||
|
||||
Future<Void> RocksDBCheckpointReader::init(StringRef token) {
|
||||
ASSERT_EQ(this->checkpoint_.getFormat(), RocksDBColumnFamily);
|
||||
const std::string name = token.toString();
|
||||
this->offset_ = 0;
|
||||
this->path_.clear();
|
||||
const RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(this->checkpoint_);
|
||||
for (const auto& sstFile : rocksCF.sstFiles) {
|
||||
if (sstFile.name == name) {
|
||||
this->path_ = sstFile.db_path + sstFile.name;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (this->path_.empty()) {
|
||||
TraceEvent("RocksDBCheckpointReaderInitFileNotFound").detail("File", this->path_);
|
||||
return checkpoint_not_found();
|
||||
}
|
||||
|
||||
return doInit(this);
|
||||
}
|
||||
|
||||
Future<Standalone<StringRef>> RocksDBCheckpointReader::nextChunk(const int byteLimit) {
|
||||
return getNextChunk(this, byteLimit);
|
||||
}
|
||||
|
||||
Future<Void> RocksDBCheckpointReader::close() {
|
||||
return doClose(this);
|
||||
}
|
||||
|
||||
// Fetch a single SST file from a storage server. If the file is fetched successfully, it is recorded via cFun.
|
||||
ACTOR Future<Void> fetchCheckpointFile(Database cx,
|
||||
std::shared_ptr<CheckpointMetaData> metaData,
|
||||
int idx,
|
||||
std::string dir,
|
||||
std::function<Future<Void>(const CheckpointMetaData&)> cFun,
|
||||
int maxRetries = 3) {
|
||||
state RocksDBColumnFamilyCheckpoint rocksCF;
|
||||
ObjectReader reader(metaData->serializedCheckpoint.begin(), IncludeVersion());
|
||||
reader.deserialize(rocksCF);
|
||||
|
||||
// Skip fetched file.
|
||||
if (rocksCF.sstFiles[idx].fetched && rocksCF.sstFiles[idx].db_path == dir) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
state std::string remoteFile = rocksCF.sstFiles[idx].name;
|
||||
state std::string localFile = dir + rocksCF.sstFiles[idx].name;
|
||||
state UID ssID = metaData->ssID;
|
||||
|
||||
state Transaction tr(cx);
|
||||
state StorageServerInterface ssi;
|
||||
loop {
|
||||
try {
|
||||
Optional<Value> ss = wait(tr.get(serverListKeyFor(ssID)));
|
||||
if (!ss.present()) {
|
||||
throw checkpoint_not_found();
|
||||
}
|
||||
ssi = decodeServerListValue(ss.get());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
state int attempt = 0;
|
||||
loop {
|
||||
try {
|
||||
++attempt;
|
||||
TraceEvent("FetchCheckpointFileBegin")
|
||||
.detail("RemoteFile", remoteFile)
|
||||
.detail("TargetUID", ssID.toString())
|
||||
.detail("StorageServer", ssi.id().toString())
|
||||
.detail("LocalFile", localFile)
|
||||
.detail("Attempt", attempt);
|
||||
|
||||
wait(IAsyncFileSystem::filesystem()->deleteFile(localFile, true));
|
||||
const int64_t flags = IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE |
|
||||
IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_NO_AIO;
|
||||
state int64_t offset = 0;
|
||||
state Reference<IAsyncFile> asyncFile = wait(IAsyncFileSystem::filesystem()->open(localFile, flags, 0666));
|
||||
|
||||
state ReplyPromiseStream<FetchCheckpointReply> stream =
|
||||
ssi.fetchCheckpoint.getReplyStream(FetchCheckpointRequest(metaData->checkpointID, remoteFile));
|
||||
TraceEvent("FetchCheckpointFileReceivingData")
|
||||
.detail("RemoteFile", remoteFile)
|
||||
.detail("TargetUID", ssID.toString())
|
||||
.detail("StorageServer", ssi.id().toString())
|
||||
.detail("LocalFile", localFile)
|
||||
.detail("Attempt", attempt);
|
||||
loop {
|
||||
state FetchCheckpointReply rep = waitNext(stream.getFuture());
|
||||
wait(asyncFile->write(rep.data.begin(), rep.data.size(), offset));
|
||||
wait(asyncFile->flush());
|
||||
offset += rep.data.size();
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_end_of_stream) {
|
||||
TraceEvent("FetchCheckpointFileError")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("RemoteFile", remoteFile)
|
||||
.detail("StorageServer", ssi.toString())
|
||||
.detail("LocalFile", localFile)
|
||||
.detail("Attempt", attempt);
|
||||
if (attempt >= maxRetries) {
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
wait(asyncFile->sync());
|
||||
int64_t fileSize = wait(asyncFile->size());
|
||||
TraceEvent("FetchCheckpointFileEnd")
|
||||
.detail("RemoteFile", remoteFile)
|
||||
.detail("StorageServer", ssi.toString())
|
||||
.detail("LocalFile", localFile)
|
||||
.detail("Attempt", attempt)
|
||||
.detail("DataSize", offset)
|
||||
.detail("FileSize", fileSize);
|
||||
rocksCF.sstFiles[idx].db_path = dir;
|
||||
rocksCF.sstFiles[idx].fetched = true;
|
||||
metaData->serializedCheckpoint = ObjectWriter::toValue(rocksCF, IncludeVersion());
|
||||
if (cFun) {
|
||||
wait(cFun(*metaData));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
|
||||
CheckpointMetaData initialState,
|
||||
std::string dir,
|
||||
std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
|
||||
TraceEvent("FetchRocksCheckpointBegin")
|
||||
.detail("InitialState", initialState.toString())
|
||||
.detail("CheckpointDir", dir);
|
||||
|
||||
state std::shared_ptr<CheckpointMetaData> metaData = std::make_shared<CheckpointMetaData>(initialState);
|
||||
|
||||
if (metaData->format == RocksDBColumnFamily) {
|
||||
state RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(initialState);
|
||||
TraceEvent("RocksDBCheckpointMetaData").detail("RocksCF", rocksCF.toString());
|
||||
|
||||
state int i = 0;
|
||||
state std::vector<Future<Void>> fs;
|
||||
for (; i < rocksCF.sstFiles.size(); ++i) {
|
||||
fs.push_back(fetchCheckpointFile(cx, metaData, i, dir, cFun));
|
||||
TraceEvent("GetCheckpointFetchingFile")
|
||||
.detail("FileName", rocksCF.sstFiles[i].name)
|
||||
.detail("Server", metaData->ssID.toString());
|
||||
}
|
||||
wait(waitForAll(fs));
|
||||
} else {
|
||||
throw not_implemented();
|
||||
}
|
||||
|
||||
return *metaData;
|
||||
}
|
||||
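A hypothetical caller-side sketch (not part of this patch) of how the optional progress callback can be used with fetchRocksDBCheckpoint: the callback receives the updated CheckpointMetaData after each SST file is fetched (its fetched flag and db_path are rewritten above), so a caller that persists that metadata durably can resume an interrupted fetch by passing it back in as initialState. The actor name and the tracing-only callback below are illustrative.

    ACTOR Future<CheckpointMetaData> fetchCheckpointResumableExample(Database cx,
                                                                     CheckpointMetaData initialState,
                                                                     std::string dir) {
        std::function<Future<Void>(const CheckpointMetaData&)> progress =
            [](const CheckpointMetaData& updated) -> Future<Void> {
                // A real caller would durably persist `updated` here; tracing is enough for this sketch.
                TraceEvent("ExampleCheckpointFetchProgress").detail("Checkpoint", updated.toString());
                return Void();
            };
        CheckpointMetaData result = wait(fetchRocksDBCheckpoint(cx, initialState, dir, progress));
        return result;
    }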
|
||||
ACTOR Future<Void> deleteRocksCFCheckpoint(CheckpointMetaData checkpoint) {
|
||||
ASSERT_EQ(checkpoint.getFormat(), RocksDBColumnFamily);
|
||||
RocksDBColumnFamilyCheckpoint rocksCF = getRocksCF(checkpoint);
|
||||
TraceEvent("DeleteRocksColumnFamilyCheckpoint", checkpoint.checkpointID)
|
||||
.detail("CheckpointID", checkpoint.checkpointID)
|
||||
.detail("RocksCF", rocksCF.toString());
|
||||
|
||||
state std::unordered_set<std::string> dirs;
|
||||
for (const LiveFileMetaData& file : rocksCF.sstFiles) {
|
||||
dirs.insert(file.db_path);
|
||||
}
|
||||
|
||||
state std::unordered_set<std::string>::iterator it = dirs.begin();
|
||||
for (; it != dirs.end(); ++it) {
|
||||
const std::string dir = *it;
|
||||
platform::eraseDirectoryRecursive(dir);
|
||||
TraceEvent("DeleteCheckpointRemovedDir", checkpoint.checkpointID)
|
||||
.detail("CheckpointID", checkpoint.checkpointID)
|
||||
.detail("Dir", dir);
|
||||
wait(delay(0, TaskPriority::FetchKeys));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) {
|
||||
return new RocksDBCheckpointReader(checkpoint, logID);
|
||||
}
|
||||
|
||||
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint) {
|
||||
RocksDBColumnFamilyCheckpoint rocksCF;
|
||||
ObjectReader reader(checkpoint.serializedCheckpoint.begin(), IncludeVersion());
|
||||
reader.deserialize(rocksCF);
|
||||
return rocksCF;
|
||||
}
|
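For reference, a minimal round-trip sketch (not part of this patch) of how the column-family metadata travels inside CheckpointMetaData::serializedCheckpoint, using the same ObjectWriter/ObjectReader pairing as fetchCheckpointFile() and getRocksCF() above; the comparator name is an arbitrary placeholder.

    RocksDBColumnFamilyCheckpoint original;
    original.dbComparatorName = "leveldb.BytewiseComparator"; // placeholder value
    CheckpointMetaData md;
    md.serializedCheckpoint = ObjectWriter::toValue(original, IncludeVersion());
    RocksDBColumnFamilyCheckpoint restored = getRocksCF(md);
    ASSERT(restored.dbComparatorName == original.dbComparatorName);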
209  fdbserver/RocksDBCheckpointUtils.actor.h  Normal file
@ -0,0 +1,209 @@
/*
|
||||
*RocksDBCheckpointUtils.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_G_H)
|
||||
#define FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_G_H
|
||||
#include "fdbserver/RocksDBCheckpointUtils.actor.g.h"
|
||||
#elif !defined(FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_H)
|
||||
#define FDBSERVER_ROCKSDB_CHECKPOINT_UTILS_ACTOR_H
|
||||
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/ServerCheckpoint.actor.h"
|
||||
#include "flow/flow.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
// Copied from rocksdb/metadata.h, so that we can add a serializer.
|
||||
struct SstFileMetaData {
|
||||
constexpr static FileIdentifier file_identifier = 3804347;
|
||||
SstFileMetaData()
|
||||
: size(0), file_number(0), smallest_seqno(0), largest_seqno(0), num_reads_sampled(0), being_compacted(false),
|
||||
num_entries(0), num_deletions(0), temperature(0), oldest_blob_file_number(0), oldest_ancester_time(0),
|
||||
file_creation_time(0) {}
|
||||
|
||||
SstFileMetaData(const std::string& _file_name,
|
||||
uint64_t _file_number,
|
||||
const std::string& _path,
|
||||
size_t _size,
|
||||
uint64_t _smallest_seqno,
|
||||
uint64_t _largest_seqno,
|
||||
const std::string& _smallestkey,
|
||||
const std::string& _largestkey,
|
||||
uint64_t _num_reads_sampled,
|
||||
bool _being_compacted,
|
||||
int _temperature,
|
||||
uint64_t _oldest_blob_file_number,
|
||||
uint64_t _oldest_ancester_time,
|
||||
uint64_t _file_creation_time,
|
||||
std::string& _file_checksum,
|
||||
std::string& _file_checksum_func_name)
|
||||
: size(_size), name(_file_name), file_number(_file_number), db_path(_path), smallest_seqno(_smallest_seqno),
|
||||
largest_seqno(_largest_seqno), smallestkey(_smallestkey), largestkey(_largestkey),
|
||||
num_reads_sampled(_num_reads_sampled), being_compacted(_being_compacted), num_entries(0), num_deletions(0),
|
||||
temperature(_temperature), oldest_blob_file_number(_oldest_blob_file_number),
|
||||
oldest_ancester_time(_oldest_ancester_time), file_creation_time(_file_creation_time),
|
||||
file_checksum(_file_checksum), file_checksum_func_name(_file_checksum_func_name) {}
|
||||
|
||||
// File size in bytes.
|
||||
size_t size;
|
||||
// The name of the file.
|
||||
std::string name;
|
||||
// The id of the file.
|
||||
uint64_t file_number;
|
||||
// The full path where the file locates.
|
||||
std::string db_path;
|
||||
|
||||
uint64_t smallest_seqno; // Smallest sequence number in file.
|
||||
uint64_t largest_seqno; // Largest sequence number in file.
|
||||
std::string smallestkey; // Smallest user defined key in the file.
|
||||
std::string largestkey; // Largest user defined key in the file.
|
||||
uint64_t num_reads_sampled; // How many times the file is read.
|
||||
bool being_compacted; // true if the file is currently being compacted.
|
||||
|
||||
uint64_t num_entries;
|
||||
uint64_t num_deletions;
|
||||
|
||||
// This feature is experimental and subject to change.
|
||||
int temperature;
|
||||
|
||||
uint64_t oldest_blob_file_number; // The id of the oldest blob file
|
||||
// referenced by the file.
|
||||
// An SST file may be generated by compactions whose input files may
|
||||
// in turn be generated by earlier compactions. The creation time of the
|
||||
// oldest SST file that is the compaction ancestor of this file.
|
||||
// The timestamp is provided SystemClock::GetCurrentTime().
|
||||
// 0 if the information is not available.
|
||||
//
|
||||
// Note: for TTL blob files, it contains the start of the expiration range.
|
||||
uint64_t oldest_ancester_time;
|
||||
// Timestamp when the SST file is created, provided by
|
||||
// SystemClock::GetCurrentTime(). 0 if the information is not available.
|
||||
uint64_t file_creation_time;
|
||||
|
||||
// The checksum of a SST file, the value is decided by the file content and
|
||||
// the checksum algorithm used for this SST file. The checksum function is
|
||||
// identified by the file_checksum_func_name. If the checksum function is
|
||||
// not specified, file_checksum is "0" by default.
|
||||
std::string file_checksum;
|
||||
|
||||
// The name of the checksum function used to generate the file checksum
|
||||
// value. If file checksum is not enabled (e.g., sst_file_checksum_func is
|
||||
// null), file_checksum_func_name is UnknownFileChecksumFuncName, which is
|
||||
// "Unknown".
|
||||
std::string file_checksum_func_name;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar,
|
||||
size,
|
||||
name,
|
||||
file_number,
|
||||
db_path,
|
||||
smallest_seqno,
|
||||
largest_seqno,
|
||||
smallestkey,
|
||||
largestkey,
|
||||
num_reads_sampled,
|
||||
being_compacted,
|
||||
num_entries,
|
||||
num_deletions,
|
||||
temperature,
|
||||
oldest_blob_file_number,
|
||||
oldest_ancester_time,
|
||||
file_creation_time,
|
||||
file_checksum,
|
||||
file_checksum_func_name);
|
||||
}
|
||||
};
|
||||
|
||||
// Copied from rocksdb::LiveFileMetaData.
|
||||
struct LiveFileMetaData : public SstFileMetaData {
|
||||
constexpr static FileIdentifier file_identifier = 3804346;
|
||||
std::string column_family_name; // Name of the column family
|
||||
int level; // Level at which this file resides.
|
||||
bool fetched;
|
||||
LiveFileMetaData() : column_family_name(), level(0), fetched(false) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar,
|
||||
SstFileMetaData::size,
|
||||
SstFileMetaData::name,
|
||||
SstFileMetaData::file_number,
|
||||
SstFileMetaData::db_path,
|
||||
SstFileMetaData::smallest_seqno,
|
||||
SstFileMetaData::largest_seqno,
|
||||
SstFileMetaData::smallestkey,
|
||||
SstFileMetaData::largestkey,
|
||||
SstFileMetaData::num_reads_sampled,
|
||||
SstFileMetaData::being_compacted,
|
||||
SstFileMetaData::num_entries,
|
||||
SstFileMetaData::num_deletions,
|
||||
SstFileMetaData::temperature,
|
||||
SstFileMetaData::oldest_blob_file_number,
|
||||
SstFileMetaData::oldest_ancester_time,
|
||||
SstFileMetaData::file_creation_time,
|
||||
SstFileMetaData::file_checksum,
|
||||
SstFileMetaData::file_checksum_func_name,
|
||||
column_family_name,
|
||||
level,
|
||||
fetched);
|
||||
}
|
||||
};
|
||||
|
||||
// Checkpoint metadata associated with the RocksDBColumnFamily format.
|
||||
// Based on rocksdb::ExportImportFilesMetaData.
|
||||
struct RocksDBColumnFamilyCheckpoint {
|
||||
constexpr static FileIdentifier file_identifier = 13804346;
|
||||
std::string dbComparatorName;
|
||||
|
||||
std::vector<LiveFileMetaData> sstFiles;
|
||||
|
||||
CheckpointFormat format() const { return RocksDBColumnFamily; }
|
||||
|
||||
std::string toString() const {
|
||||
std::string res = "RocksDBColumnFamilyCheckpoint:\nSST Files:\n";
|
||||
for (const auto& file : sstFiles) {
|
||||
res += file.db_path + file.name + "\n";
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, dbComparatorName, sstFiles);
|
||||
}
|
||||
};
|
||||
|
||||
// Fetch the checkpoint file(s) to a local dir; the checkpoint is specified by initialState.
// If cFun is provided, the fetch progress can be checkpointed, so that the fetch can be
// resumed after a crash.
|
||||
ACTOR Future<CheckpointMetaData> fetchRocksDBCheckpoint(Database cx,
|
||||
CheckpointMetaData initialState,
|
||||
std::string dir,
|
||||
std::function<Future<Void>(const CheckpointMetaData&)> cFun);
|
||||
|
||||
ACTOR Future<Void> deleteRocksCFCheckpoint(CheckpointMetaData checkpoint);
|
||||
|
||||
ICheckpointReader* newRocksDBCheckpointReader(const CheckpointMetaData& checkpoint, UID logID);
|
||||
|
||||
RocksDBColumnFamilyCheckpoint getRocksCF(const CheckpointMetaData& checkpoint);
|
||||
#endif
|
67  fdbserver/ServerCheckpoint.actor.cpp  Normal file
@ -0,0 +1,67 @@
/*
|
||||
*ServerCheckpoint.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/

#include "fdbserver/ServerCheckpoint.actor.h"
#include "fdbserver/RocksDBCheckpointUtils.actor.h"

#include "flow/actorcompiler.h" // has to be last include

ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint, UID logID) {
    if (checkpoint.getFormat() == RocksDBColumnFamily) {
        return newRocksDBCheckpointReader(checkpoint, logID);
    } else if (checkpoint.getFormat() == RocksDB) {
        throw not_implemented();
    } else {
        ASSERT(false);
    }

    return nullptr;
}

ACTOR Future<Void> deleteCheckpoint(CheckpointMetaData checkpoint) {
    wait(delay(0, TaskPriority::FetchKeys));

    if (checkpoint.getFormat() == RocksDBColumnFamily) {
        wait(deleteRocksCFCheckpoint(checkpoint));
    } else if (checkpoint.getFormat() == RocksDB) {
        throw not_implemented();
    } else {
        ASSERT(false);
    }

    return Void();
}

ACTOR Future<CheckpointMetaData> fetchCheckpoint(Database cx,
                                                 CheckpointMetaData initialState,
                                                 std::string dir,
                                                 std::function<Future<Void>(const CheckpointMetaData&)> cFun) {
    state CheckpointMetaData result;
    if (initialState.getFormat() == RocksDBColumnFamily) {
        CheckpointMetaData _result = wait(fetchRocksDBCheckpoint(cx, initialState, dir, cFun));
        result = _result;
    } else if (initialState.getFormat() == RocksDB) {
        throw not_implemented();
    } else {
        ASSERT(false);
    }

    return result;
}
66  fdbserver/ServerCheckpoint.actor.h  Normal file
@ -0,0 +1,66 @@
/*
|
||||
*ServerCheckpoint.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/

#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_SERVER_CHECKPOINT_ACTOR_G_H)
#define FDBSERVER_SERVER_CHECKPOINT_ACTOR_G_H
#include "fdbserver/ServerCheckpoint.actor.g.h"
#elif !defined(FDBSERVER_SERVER_CHECKPOINT_ACTOR_H)
#define FDBSERVER_SERVER_CHECKPOINT_ACTOR_H

#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/StorageCheckpoint.h"
#include "flow/flow.h"

#include "flow/actorcompiler.h" // has to be last include

// An ICheckpointReader can read the contents of a checkpoint created from a KV store,
// i.e., by IKeyValueStore::checkpoint().
class ICheckpointReader {
public:
    // `token` is a serialized object defined by each derived ICheckpointReader class, to specify the
    // starting point for the underlying checkpoint.
    virtual Future<Void> init(StringRef token) = 0;

    // Scans the checkpoint, and returns the key-value pairs.
    virtual Future<RangeResult> nextKeyValues(const int rowLimit, const int byteLimit) = 0;

    // Returns the next chunk of the serialized checkpoint.
    virtual Future<Standalone<StringRef>> nextChunk(const int byteLimit) = 0;

    virtual Future<Void> close() = 0;

protected:
    virtual ~ICheckpointReader() {}
};

ICheckpointReader* newCheckpointReader(const CheckpointMetaData& checkpoint, UID logID);

// Delete a checkpoint.
ACTOR Future<Void> deleteCheckpoint(CheckpointMetaData checkpoint);

// Fetches a checkpoint to a local `dir`; `initialState` provides the checkpoint format, location, restart point, etc.
// If cFun is provided, the progress can be checkpointed.
// Returns a CheckpointMetaData, which could contain KVS-specific results, e.g., the list of fetched checkpoint files.
ACTOR Future<CheckpointMetaData> fetchCheckpoint(Database cx,
                                                 CheckpointMetaData initialState,
                                                 std::string dir,
                                                 std::function<Future<Void>(const CheckpointMetaData&)> cFun = nullptr);
#endif
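A hypothetical consumer sketch (not part of this patch) showing how an ICheckpointReader is typically driven when transferring raw checkpoint bytes, mirroring fetchCheckpointQ in storageserver.actor.cpp below: for the RocksDB column-family format the token is an SST file name, and nextChunk() throws end_of_stream() once the file is exhausted. The actor name and the byte counting are illustrative.

    ACTOR Future<int64_t> drainCheckpointPieceExample(CheckpointMetaData checkpoint, Standalone<StringRef> token) {
        state ICheckpointReader* reader = newCheckpointReader(checkpoint, deterministicRandom()->randomUniqueID());
        state int64_t totalBytes = 0;
        try {
            wait(reader->init(token));
            loop {
                Standalone<StringRef> chunk = wait(reader->nextChunk(CLIENT_KNOBS->REPLY_BYTE_LIMIT));
                totalBytes += chunk.size(); // A real consumer would append the bytes to a local file here.
            }
        } catch (Error& e) {
            if (e.code() != error_code_end_of_stream) {
                throw;
            }
        }
        wait(reader->close());
        return totalBytes;
    }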
@ -58,6 +58,7 @@
|
||||
#include "fdbserver/MutationTracking.h"
|
||||
#include "fdbserver/RecoveryState.h"
|
||||
#include "fdbserver/StorageMetrics.h"
|
||||
#include "fdbserver/ServerCheckpoint.actor.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/TLogInterface.h"
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
@ -104,6 +105,46 @@ bool canReplyWith(Error e) {
|
||||
}
|
||||
} // namespace
|
||||
|
||||
#define PERSIST_PREFIX "\xff\xff"
|
||||
|
||||
// Immutable
|
||||
static const KeyValueRef persistFormat(LiteralStringRef(PERSIST_PREFIX "Format"),
|
||||
LiteralStringRef("FoundationDB/StorageServer/1/4"));
|
||||
static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/StorageServer/1/2"),
|
||||
LiteralStringRef("FoundationDB/StorageServer/1/5"));
|
||||
static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID");
|
||||
static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID");
|
||||
static const KeyRef persistSSPairID = LiteralStringRef(PERSIST_PREFIX "ssWithTSSPairID");
|
||||
static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
|
||||
static const KeyRef persistClusterIdKey = LiteralStringRef(PERSIST_PREFIX "clusterId");
|
||||
|
||||
// (Potentially) change with the durable version or when fetchKeys completes
|
||||
static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
|
||||
static const KeyRangeRef persistShardAssignedKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAssigned/"), LiteralStringRef(PERSIST_PREFIX "ShardAssigned0"));
|
||||
static const KeyRangeRef persistShardAvailableKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAvailable/"), LiteralStringRef(PERSIST_PREFIX "ShardAvailable0"));
|
||||
static const KeyRangeRef persistByteSampleKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/"), LiteralStringRef(PERSIST_PREFIX "BS0"));
|
||||
static const KeyRangeRef persistByteSampleSampleKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/"),
|
||||
LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0"));
|
||||
static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
|
||||
static const KeyRef persistPrimaryLocality = LiteralStringRef(PERSIST_PREFIX "PrimaryLocality");
|
||||
static const KeyRangeRef persistChangeFeedKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "RF/"), LiteralStringRef(PERSIST_PREFIX "RF0"));
|
||||
static const KeyRangeRef persistTenantMapKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "TM/"), LiteralStringRef(PERSIST_PREFIX "TM0"));
|
||||
// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)
|
||||
|
||||
// Checkpoint related prefixes.
|
||||
static const KeyRangeRef persistCheckpointKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "Checkpoint/"), LiteralStringRef(PERSIST_PREFIX "Checkpoint0"));
|
||||
static const KeyRangeRef persistPendingCheckpointKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "PendingCheckpoint/"),
|
||||
LiteralStringRef(PERSIST_PREFIX "PendingCheckpoint0"));
|
||||
static const std::string rocksdbCheckpointDirPrefix = "/rockscheckpoints_";
|
||||
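An illustrative sketch (not part of this patch) of how these prefixes are used further down: a pending request and a completed checkpoint for the same checkpoint ID live under separate prefixes, which is what lets restoreDurableState() distinguish in-flight requests from finished checkpoints on restart.

    // Hypothetical snippet; checkpointID stands for the ID carried in the request.
    UID checkpointID = deterministicRandom()->randomUniqueID();
    Key pendingKey(persistPendingCheckpointKeys.begin.toString() + checkpointID.toString());
    Key completedKey(persistCheckpointKeys.begin.toString() + checkpointID.toString());
    // createCheckpoint() below clears pendingKey and writes completedKey once the checkpoint
    // has been created and its metadata made durable.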
|
||||
struct AddingShard : NonCopyable {
|
||||
KeyRange keys;
|
||||
Future<Void> fetchClient; // holds FetchKeys() actor
|
||||
@ -241,6 +282,14 @@ struct StorageServerDisk {
|
||||
return storage->readRange(keys, rowLimit, byteLimit, type);
|
||||
}
|
||||
|
||||
Future<CheckpointMetaData> checkpoint(const CheckpointRequest& request) { return storage->checkpoint(request); }
|
||||
|
||||
Future<Void> restore(const std::vector<CheckpointMetaData>& checkpoints) { return storage->restore(checkpoints); }
|
||||
|
||||
Future<Void> deleteCheckpoint(const CheckpointMetaData& checkpoint) {
|
||||
return storage->deleteCheckpoint(checkpoint);
|
||||
}
|
||||
|
||||
KeyValueStoreType getKeyValueStoreType() const { return storage->getType(); }
|
||||
StorageBytes getStorageBytes() const { return storage->getStorageBytes(); }
|
||||
std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
|
||||
@ -418,6 +467,8 @@ private:
|
||||
std::unordered_map<KeyRef, Reference<ServerWatchMetadata>> watchMap; // keep track of server watches
|
||||
|
||||
public:
|
||||
std::map<Version, std::vector<CheckpointMetaData>> pendingCheckpoints; // Pending checkpoint requests
|
||||
std::unordered_map<UID, CheckpointMetaData> checkpoints; // Existing and deleting checkpoints
|
||||
TenantMap tenantMap;
|
||||
TenantPrefixIndex tenantPrefixIndex;
|
||||
|
||||
@ -1727,6 +1778,116 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Finds a complete checkpoint whose range, version, and format match the request.
|
||||
ACTOR Future<Void> getCheckpointQ(StorageServer* self, GetCheckpointRequest req) {
|
||||
// Wait until the desired version is durable.
|
||||
wait(self->durableVersion.whenAtLeast(req.version + 1));
|
||||
|
||||
TraceEvent(SevDebug, "ServeGetCheckpointVersionSatisfied", self->thisServerID)
|
||||
.detail("Version", req.version)
|
||||
.detail("Range", req.range.toString())
|
||||
.detail("Format", static_cast<int>(req.format));
|
||||
|
||||
try {
|
||||
std::unordered_map<UID, CheckpointMetaData>::iterator it = self->checkpoints.begin();
|
||||
for (; it != self->checkpoints.end(); ++it) {
|
||||
const CheckpointMetaData& md = it->second;
|
||||
if (md.version == req.version && md.format == req.format && md.range.contains(req.range) &&
|
||||
md.getState() == CheckpointMetaData::Complete) {
|
||||
req.reply.send(md);
|
||||
TraceEvent(SevDebug, "ServeGetCheckpointEnd", self->thisServerID).detail("Checkpoint", md.toString());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (it == self->checkpoints.end()) {
|
||||
req.reply.sendError(checkpoint_not_found());
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (!canReplyWith(e)) {
|
||||
throw;
|
||||
}
|
||||
req.reply.sendError(e);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Deletes the checkpoint from disk, as well as all related persisted metadata.
|
||||
ACTOR Future<Void> deleteCheckpointQ(StorageServer* self, Version version, CheckpointMetaData checkpoint) {
|
||||
wait(self->durableVersion.whenAtLeast(version));
|
||||
|
||||
TraceEvent("DeleteCheckpointBegin", self->thisServerID).detail("Checkpoint", checkpoint.toString());
|
||||
|
||||
self->checkpoints.erase(checkpoint.checkpointID);
|
||||
|
||||
try {
|
||||
wait(deleteCheckpoint(checkpoint));
|
||||
} catch (Error& e) {
|
||||
// TODO: Handle errors more gracefully.
|
||||
throw;
|
||||
}
|
||||
|
||||
state Key persistCheckpointKey(persistCheckpointKeys.begin.toString() + checkpoint.checkpointID.toString());
|
||||
state Key pendingCheckpointKey(persistPendingCheckpointKeys.begin.toString() + checkpoint.checkpointID.toString());
|
||||
Version version = self->data().getLatestVersion();
|
||||
auto& mLV = self->addVersionToMutationLog(version);
|
||||
self->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, pendingCheckpointKey, keyAfter(pendingCheckpointKey)));
|
||||
self->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::ClearRange, persistCheckpointKey, keyAfter(persistCheckpointKey)));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Serves FetchCheckpointRequests.
|
||||
ACTOR Future<Void> fetchCheckpointQ(StorageServer* self, FetchCheckpointRequest req) {
|
||||
TraceEvent("ServeFetchCheckpointBegin", self->thisServerID)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("Token", req.token);
|
||||
|
||||
req.reply.setByteLimit(SERVER_KNOBS->CHECKPOINT_TRANSFER_BLOCK_BYTES);
|
||||
|
||||
// Returns an error if the checkpoint cannot be found.
|
||||
const auto it = self->checkpoints.find(req.checkpointID);
|
||||
if (it == self->checkpoints.end()) {
|
||||
req.reply.sendError(checkpoint_not_found());
|
||||
TraceEvent("ServeFetchCheckpointNotFound", self->thisServerID).detail("CheckpointID", req.checkpointID);
|
||||
return Void();
|
||||
}
|
||||
|
||||
try {
|
||||
state ICheckpointReader* reader = newCheckpointReader(it->second, deterministicRandom()->randomUniqueID());
|
||||
wait(reader->init(req.token));
|
||||
|
||||
loop {
|
||||
state Standalone<StringRef> data = wait(reader->nextChunk(CLIENT_KNOBS->REPLY_BYTE_LIMIT));
|
||||
wait(req.reply.onReady());
|
||||
FetchCheckpointReply reply(req.token);
|
||||
reply.data = data;
|
||||
req.reply.send(reply);
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_end_of_stream) {
|
||||
req.reply.sendError(end_of_stream());
|
||||
TraceEvent("ServeFetchCheckpointEnd", self->thisServerID)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("Token", req.token);
|
||||
} else {
|
||||
TraceEvent(SevWarnAlways, "ServerFetchCheckpointFailure")
|
||||
.errorUnsuppressed(e)
|
||||
.detail("CheckpointID", req.checkpointID)
|
||||
.detail("Token", req.token);
|
||||
if (!canReplyWith(e)) {
|
||||
throw e;
|
||||
}
|
||||
req.reply.sendError(e);
|
||||
}
|
||||
}
|
||||
|
||||
wait(reader->close());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) {
|
||||
wait(delay(0));
|
||||
wait(data->version.whenAtLeast(req.minVersion));
|
||||
@ -4165,38 +4326,6 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
|
||||
}
|
||||
}
|
||||
|
||||
#define PERSIST_PREFIX "\xff\xff"
|
||||
|
||||
// Immutable
|
||||
static const KeyValueRef persistFormat(LiteralStringRef(PERSIST_PREFIX "Format"),
|
||||
LiteralStringRef("FoundationDB/StorageServer/1/4"));
|
||||
static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("FoundationDB/StorageServer/1/2"),
|
||||
LiteralStringRef("FoundationDB/StorageServer/1/5"));
|
||||
static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID");
|
||||
static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID");
|
||||
static const KeyRef persistSSPairID = LiteralStringRef(PERSIST_PREFIX "ssWithTSSPairID");
|
||||
static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
|
||||
static const KeyRef persistClusterIdKey = LiteralStringRef(PERSIST_PREFIX "clusterId");
|
||||
|
||||
// (Potentially) change with the durable version or when fetchKeys completes
|
||||
static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
|
||||
static const KeyRangeRef persistShardAssignedKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAssigned/"), LiteralStringRef(PERSIST_PREFIX "ShardAssigned0"));
|
||||
static const KeyRangeRef persistShardAvailableKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "ShardAvailable/"), LiteralStringRef(PERSIST_PREFIX "ShardAvailable0"));
|
||||
static const KeyRangeRef persistByteSampleKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/"), LiteralStringRef(PERSIST_PREFIX "BS0"));
|
||||
static const KeyRangeRef persistByteSampleSampleKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS/"),
|
||||
LiteralStringRef(PERSIST_PREFIX "BS/" PERSIST_PREFIX "BS0"));
|
||||
static const KeyRef persistLogProtocol = LiteralStringRef(PERSIST_PREFIX "LogProtocol");
|
||||
static const KeyRef persistPrimaryLocality = LiteralStringRef(PERSIST_PREFIX "PrimaryLocality");
|
||||
static const KeyRangeRef persistChangeFeedKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "RF/"), LiteralStringRef(PERSIST_PREFIX "RF0"));
|
||||
static const KeyRangeRef persistTenantMapKeys =
|
||||
KeyRangeRef(LiteralStringRef(PERSIST_PREFIX "TM/"), LiteralStringRef(PERSIST_PREFIX "TM0"));
|
||||
// data keys are unmangled (but never start with PERSIST_PREFIX because they are always in allKeys)
|
||||
|
||||
ACTOR Future<Void> fetchChangeFeedApplier(StorageServer* data,
|
||||
Reference<ChangeFeedInfo> changeFeedInfo,
|
||||
Key rangeId,
|
||||
@ -5064,9 +5193,11 @@ public:
|
||||
}
|
||||
|
||||
if (m.param1.startsWith(systemKeys.end)) {
|
||||
if ((m.type == MutationRef::SetValue) && m.param1.substr(1).startsWith(storageCachePrefix))
|
||||
if ((m.type == MutationRef::SetValue) && m.param1.substr(1).startsWith(storageCachePrefix)) {
|
||||
applyPrivateCacheData(data, m);
|
||||
else {
|
||||
} else if ((m.type == MutationRef::SetValue) && m.param1.substr(1).startsWith(checkpointPrefix)) {
|
||||
registerPendingCheckpoint(data, m, ver);
|
||||
} else {
|
||||
applyPrivateData(data, m);
|
||||
}
|
||||
} else {
|
||||
@ -5353,6 +5484,24 @@ private:
|
||||
ASSERT(false); // Unknown private mutation
|
||||
}
|
||||
}
|
||||
|
||||
// Registers a pending checkpoint request; it will be fulfilled when the desired version is durable.
|
||||
void registerPendingCheckpoint(StorageServer* data, const MutationRef& m, Version ver) {
|
||||
CheckpointMetaData checkpoint = decodeCheckpointValue(m.param2);
|
||||
ASSERT(checkpoint.getState() == CheckpointMetaData::Pending);
|
||||
const UID checkpointID = decodeCheckpointKey(m.param1.substr(1));
|
||||
checkpoint.version = ver;
|
||||
data->pendingCheckpoints[ver].push_back(checkpoint);
|
||||
|
||||
auto& mLV = data->addVersionToMutationLog(ver);
|
||||
const Key pendingCheckpointKey(persistPendingCheckpointKeys.begin.toString() + checkpointID.toString());
|
||||
data->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::SetValue, pendingCheckpointKey, checkpointValue(checkpoint)));
|
||||
|
||||
TraceEvent("RegisterPendingCheckpoint", data->thisServerID)
|
||||
.detail("Key", pendingCheckpointKey)
|
||||
.detail("Checkpoint", checkpoint.toString());
|
||||
}
|
||||
};
|
||||
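An illustrative round trip (not part of this patch) of the codec used by registerPendingCheckpoint(): the private mutation's value carries checkpointValue(checkpoint), so the metadata and its Pending state can be recovered on the storage server. The field assignments below are placeholders; the helpers are assumed symmetric, as they are used above.

    CheckpointMetaData original;
    original.checkpointID = deterministicRandom()->randomUniqueID(); // placeholder ID
    original.setState(CheckpointMetaData::Pending);
    Value encoded = checkpointValue(original);
    CheckpointMetaData decoded = decodeCheckpointValue(encoded);
    ASSERT(decoded.checkpointID == original.checkpointID);
    ASSERT(decoded.getState() == CheckpointMetaData::Pending);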
|
||||
void StorageServer::insertTenant(TenantNameRef tenantName,
|
||||
@ -5413,8 +5562,8 @@ ACTOR Future<Void> tssDelayForever() {
|
||||
ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
state double start;
|
||||
try {
|
||||
// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
|
||||
// This is often referred to as the storage server e-brake (emergency brake)
|
||||
// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of
|
||||
// memory. This is often referred to as the storage server e-brake (emergency brake)
|
||||
|
||||
// We allow the storage server to make some progress between e-brake periods, referreed to as "overage", in
|
||||
// order to ensure that it advances desiredOldestVersion enough for updateStorage to make enough progress on
|
||||
@ -5452,8 +5601,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
data->tssFaultInjectTime.get() < now()) {
|
||||
if (deterministicRandom()->random01() < 0.01) {
|
||||
TraceEvent(SevWarnAlways, "TSSInjectDelayForever", data->thisServerID).log();
|
||||
// small random chance to just completely get stuck here, each tss should eventually hit this in this
|
||||
// mode
|
||||
// small random chance to just completely get stuck here, each tss should eventually hit this in
|
||||
// this mode
|
||||
wait(tssDelayForever());
|
||||
} else {
|
||||
// otherwise pause for part of a second
|
||||
@ -5550,14 +5699,14 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
}
|
||||
|
||||
// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
|
||||
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a version in
|
||||
// the middle of a rolled back version range.
|
||||
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a
|
||||
// version in the middle of a rolled back version range.
|
||||
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
|
||||
auto fk = data->readyFetchKeys.back();
|
||||
data->readyFetchKeys.pop_back();
|
||||
fk.send(&fii);
|
||||
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this actor
|
||||
// until it was completed.
|
||||
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this
|
||||
// actor until it was completed.
|
||||
}
|
||||
|
||||
for (auto& c : fii.changes)
|
||||
@ -5566,9 +5715,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
wait(doEagerReads(data, &eager));
|
||||
if (data->shardChangeCounter == changeCounter)
|
||||
break;
|
||||
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it again.
|
||||
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads only
|
||||
// selectively
|
||||
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
|
||||
// again.
|
||||
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads
|
||||
// only selectively
|
||||
eager = UpdateEagerReadInfo();
|
||||
}
|
||||
data->eagerReadsLatencyHistogram->sampleSeconds(now() - start);
|
||||
@ -5598,8 +5748,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
for (; mutationNum < pUpdate->mutations.size(); mutationNum++) {
|
||||
updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version, true);
|
||||
mutationBytes += pUpdate->mutations[mutationNum].totalSize();
|
||||
// data->counters.mutationBytes or data->counters.mutations should not be updated because they should
|
||||
// have counted when the mutations arrive from cursor initially.
|
||||
// data->counters.mutationBytes or data->counters.mutations should not be updated because they
|
||||
// should have counted when the mutations arrive from cursor initially.
|
||||
injectedChanges = true;
|
||||
if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
|
||||
mutationBytes = 0;
|
||||
@ -5830,6 +5980,49 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> createCheckpoint(StorageServer* data, CheckpointMetaData metaData) {
|
||||
ASSERT(metaData.ssID == data->thisServerID);
|
||||
const CheckpointRequest req(metaData.version,
|
||||
metaData.range,
|
||||
static_cast<CheckpointFormat>(metaData.format),
|
||||
metaData.checkpointID,
|
||||
data->folder + rocksdbCheckpointDirPrefix + metaData.checkpointID.toString());
|
||||
state CheckpointMetaData checkpointResult;
|
||||
try {
|
||||
state CheckpointMetaData res = wait(data->storage.checkpoint(req));
|
||||
checkpointResult = res;
|
||||
checkpointResult.ssID = data->thisServerID;
|
||||
ASSERT(checkpointResult.getState() == CheckpointMetaData::Complete);
|
||||
data->checkpoints[checkpointResult.checkpointID] = checkpointResult;
|
||||
TraceEvent("StorageCreatedCheckpoint", data->thisServerID).detail("Checkpoint", checkpointResult.toString());
|
||||
} catch (Error& e) {
|
||||
// If checkpoint creation fails, the failure is persisted.
|
||||
checkpointResult = metaData;
|
||||
checkpointResult.setState(CheckpointMetaData::Fail);
|
||||
TraceEvent("StorageCreatedCheckpointFailure", data->thisServerID)
|
||||
.detail("PendingCheckpoint", checkpointResult.toString());
|
||||
}
|
||||
|
||||
// Persist the checkpoint meta data.
|
||||
try {
|
||||
Key pendingCheckpointKey(persistPendingCheckpointKeys.begin.toString() +
|
||||
checkpointResult.checkpointID.toString());
|
||||
Key persistCheckpointKey(persistCheckpointKeys.begin.toString() + checkpointResult.checkpointID.toString());
|
||||
data->storage.clearRange(singleKeyRange(pendingCheckpointKey));
|
||||
data->storage.writeKeyValue(KeyValueRef(persistCheckpointKey, checkpointValue(checkpointResult)));
|
||||
wait(data->storage.commit());
|
||||
} catch (Error& e) {
|
||||
// If the checkpoint meta data is not persisted successfully, remove the checkpoint.
|
||||
TraceEvent("StorageCreateCheckpointPersistFailure", data->thisServerID)
|
||||
.errorUnsuppressed(e)
|
||||
.detail("Checkpoint", checkpointResult.toString());
|
||||
data->checkpoints.erase(checkpointResult.checkpointID);
|
||||
data->actors.add(deleteCheckpointQ(data, metaData.version, checkpointResult));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||
loop {
|
||||
ASSERT(data->durableVersion.get() == data->storageVersion());
|
||||
@ -5851,6 +6044,33 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||
state Version desiredVersion = data->desiredOldestVersion.get();
|
||||
state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
|
||||
|
||||
// Clean up stale checkpoint requests. This is not supposed to happen, since checkpoints are cleaned up on
// failure; it is kept here as a safeguard.
|
||||
while (!data->pendingCheckpoints.empty() && data->pendingCheckpoints.begin()->first <= startOldestVersion) {
|
||||
for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) {
|
||||
auto& metaData = data->pendingCheckpoints.begin()->second[idx];
|
||||
data->actors.add(deleteCheckpointQ(data, startOldestVersion, metaData));
|
||||
TraceEvent(SevWarnAlways, "StorageStaleCheckpointRequest", data->thisServerID)
|
||||
.detail("PendingCheckpoint", metaData.toString())
|
||||
.detail("DurableVersion", startOldestVersion);
|
||||
}
|
||||
data->pendingCheckpoints.erase(data->pendingCheckpoints.begin());
|
||||
}
|
||||
|
||||
// Create checkpoint if the pending request version is within (startOldestVersion, desiredVersion].
|
||||
// Versions newer than the checkpoint version won't be committed before the checkpoint is created.
|
||||
state bool requireCheckpoint = false;
|
||||
if (!data->pendingCheckpoints.empty()) {
|
||||
const Version cVer = data->pendingCheckpoints.begin()->first;
|
||||
if (cVer <= desiredVersion) {
|
||||
TraceEvent("CheckpointVersionSatisfied", data->thisServerID)
|
||||
.detail("DesiredVersion", desiredVersion)
|
||||
.detail("CheckPointVersion", cVer);
|
||||
desiredVersion = cVer;
|
||||
requireCheckpoint = true;
|
||||
}
|
||||
}
|
||||
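To make the clamping rule above concrete, a small self-contained sketch (not part of the patch; plain C++ with simplified types): if the oldest pending checkpoint is at version 100 and desiredOldestVersion has advanced to 150, the commit target becomes 100, so the checkpoint is taken exactly at its requested version before newer versions are made durable.

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <vector>

    using Version = int64_t;

    // Returns the version to commit up to, lowering it to the oldest pending checkpoint
    // version when one falls at or below the desired target.
    Version clampToPendingCheckpoint(const std::map<Version, std::vector<int>>& pending,
                                     Version desiredVersion,
                                     bool& requireCheckpoint) {
        requireCheckpoint = !pending.empty() && pending.begin()->first <= desiredVersion;
        return requireCheckpoint ? pending.begin()->first : desiredVersion;
    }

    int main() {
        bool requireCheckpoint = false;
        std::map<Version, std::vector<int>> pending{ { 100, { 0 } } };
        assert(clampToPendingCheckpoint(pending, 150, requireCheckpoint) == 100 && requireCheckpoint);
        assert(clampToPendingCheckpoint({}, 150, requireCheckpoint) == 150 && !requireCheckpoint);
        return 0;
    }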
|
||||
// Write mutations to storage until we reach the desiredVersion or have written too much (bytesleft)
|
||||
state double beforeStorageUpdates = now();
|
||||
loop {
|
||||
@ -5920,13 +6140,24 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||
|
||||
debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion);
|
||||
|
||||
if (requireCheckpoint) {
|
||||
ASSERT(newOldestVersion == data->pendingCheckpoints.begin()->first);
|
||||
std::vector<Future<Void>> createCheckpoints;
|
||||
for (int idx = 0; idx < data->pendingCheckpoints.begin()->second.size(); ++idx) {
|
||||
createCheckpoints.push_back(createCheckpoint(data, data->pendingCheckpoints.begin()->second[idx]));
|
||||
}
|
||||
wait(waitForAll(createCheckpoints));
|
||||
data->pendingCheckpoints.erase(data->pendingCheckpoints.begin());
|
||||
requireCheckpoint = false;
|
||||
}
|
||||
|
||||
if (newOldestVersion > data->rebootAfterDurableVersion) {
|
||||
TraceEvent("RebootWhenDurableTriggered", data->thisServerID)
|
||||
.detail("NewOldestVersion", newOldestVersion)
|
||||
.detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
|
||||
// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this process)
|
||||
// never sets durableInProgress, we should set durableInProgress before send the please_reboot() error.
|
||||
// Otherwise, in the race situation when storage server receives both reboot and
|
||||
// To avoid brokenPromise error, which is caused by the sender of the durableInProgress (i.e., this
|
||||
// process) never sets durableInProgress, we should set durableInProgress before send the
|
||||
// please_reboot() error. Otherwise, in the race situation when storage server receives both reboot and
|
||||
// brokenPromise of durableInProgress, the worker of the storage server will die.
|
||||
// We will eventually end up with no worker for storage server role.
|
||||
// The data distributor's buildTeam() will get stuck in building a team
|
||||
@ -5948,12 +6179,13 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||
}
|
||||
|
||||
durableInProgress.send(Void());
|
||||
wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgess could cause the storage server to shut
|
||||
// down, so delay to check for cancellation
|
||||
wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgess could cause the storage server to
|
||||
// shut down, so delay to check for cancellation
|
||||
|
||||
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was
|
||||
// effective and are applied after we change the durable version. Also ensure that we have to lock while calling
|
||||
// changeDurableVersion, because otherwise the latest version of mutableData might be partially loaded.
|
||||
// effective and are applied after we change the durable version. Also ensure that we have to lock while
|
||||
// calling changeDurableVersion, because otherwise the latest version of mutableData might be partially
|
||||
// loaded.
|
||||
state double beforeSSDurableVersionUpdate = now();
|
||||
wait(data->durableVersionLock.take());
|
||||
data->popVersion(data->durableVersion.get() + 1);
|
||||
@ -6029,6 +6261,23 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
|
||||
availableKeys.end,
|
||||
endAvailable ? LiteralStringRef("1") : LiteralStringRef("0")));
|
||||
}
|
||||
|
||||
// When a shard is moved out, delete all related checkpoints created for data move.
|
||||
if (!available) {
|
||||
for (auto& [id, checkpoint] : self->checkpoints) {
|
||||
if (checkpoint.range.intersects(keys)) {
|
||||
Key persistCheckpointKey(persistCheckpointKeys.begin.toString() + checkpoint.checkpointID.toString());
|
||||
checkpoint.setState(CheckpointMetaData::Deleting);
|
||||
self->addMutationToMutationLog(
|
||||
mLV, MutationRef(MutationRef::SetValue, persistCheckpointKey, checkpointValue(checkpoint)));
|
||||
}
|
||||
self->actors.add(deleteCheckpointQ(self, mLV.version + 1, checkpoint));
|
||||
TraceEvent("SSDeleteCheckpointScheduled", self->thisServerID)
|
||||
.detail("MovedOutRange", keys.toString())
|
||||
.detail("Checkpoint", checkpoint.toString())
|
||||
.detail("DeleteVersion", mLV.version + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) {
|
||||
@ -6297,6 +6546,8 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
||||
state Future<RangeResult> fShardAssigned = storage->readRange(persistShardAssignedKeys);
|
||||
state Future<RangeResult> fShardAvailable = storage->readRange(persistShardAvailableKeys);
|
||||
state Future<RangeResult> fChangeFeeds = storage->readRange(persistChangeFeedKeys);
|
||||
state Future<RangeResult> fPendingCheckpoints = storage->readRange(persistPendingCheckpointKeys);
|
||||
state Future<RangeResult> fCheckpoints = storage->readRange(persistCheckpointKeys);
|
||||
state Future<RangeResult> fTenantMap = storage->readRange(persistTenantMapKeys);
|
||||
|
||||
state Promise<Void> byteSampleSampleRecovered;
|
||||
@ -6307,7 +6558,8 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
||||
TraceEvent("ReadingDurableState", data->thisServerID).log();
|
||||
wait(waitForAll(std::vector{
|
||||
fFormat, fID, fClusterID, ftssPairID, fssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
|
||||
wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable, fChangeFeeds, fTenantMap }));
|
||||
wait(waitForAll(
|
||||
std::vector{ fShardAssigned, fShardAvailable, fChangeFeeds, fPendingCheckpoints, fCheckpoints, fTenantMap }));
|
||||
wait(byteSampleSampleRecovered.getFuture());
|
||||
TraceEvent("RestoringDurableState", data->thisServerID).log();
|
||||
|
||||
@ -6347,8 +6599,8 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
||||
}
|
||||
|
||||
// It's a bit sketchy to rely on an untrusted storage engine to persist its quarantine state when the quarantine
|
||||
// state means the storage engine already had a durability or correctness error, but it should get re-quarantined
|
||||
// very quickly because of a mismatch if it starts trying to do things again
|
||||
// state means the storage engine already had a durability or correctness error, but it should get
|
||||
// re-quarantined very quickly because of a mismatch if it starts trying to do things again
|
||||
if (fTssQuarantine.get().present()) {
|
||||
TEST(true); // TSS restarted while quarantined
|
||||
data->tssInQuarantine = true;
|
||||
@ -6373,6 +6625,25 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
||||
data->setInitialVersion(version);
|
||||
data->bytesRestored += fVersion.get().expectedSize();
|
||||
|
||||
state RangeResult pendingCheckpoints = fPendingCheckpoints.get();
|
||||
state int pCLoc;
|
||||
for (pCLoc = 0; pCLoc < pendingCheckpoints.size(); ++pCLoc) {
|
||||
CheckpointMetaData metaData = decodeCheckpointValue(pendingCheckpoints[pCLoc].value);
|
||||
data->pendingCheckpoints[metaData.version].push_back(metaData);
|
||||
wait(yield());
|
||||
}
|
||||
|
||||
state RangeResult checkpoints = fCheckpoints.get();
|
||||
state int cLoc;
|
||||
for (cLoc = 0; cLoc < checkpoints.size(); ++cLoc) {
|
||||
CheckpointMetaData metaData = decodeCheckpointValue(checkpoints[cLoc].value);
|
||||
data->checkpoints[metaData.checkpointID] = metaData;
|
||||
if (metaData.getState() == CheckpointMetaData::Deleting) {
|
||||
data->actors.add(deleteCheckpointQ(data, version, metaData));
|
||||
}
|
||||
wait(yield());
|
||||
}
|
||||
|
||||
state RangeResult available = fShardAvailable.get();
|
||||
data->bytesRestored += available.logicalSize();
|
||||
state int availableLoc;
|
||||
@ -6624,15 +6895,15 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
|
||||
|
||||
// SOMEDAY: validation! The changes here are possibly partial changes (we receive multiple
|
||||
// messages per
|
||||
// update to our requested range). This means that the validation would have to occur after all
|
||||
// the messages for one clear or set have been dispatched.
|
||||
// update to our requested range). This means that the validation would have to occur after
|
||||
// all the messages for one clear or set have been dispatched.
|
||||
|
||||
/*StorageMetrics m = getMetrics( data, req.keys );
|
||||
bool b = ( m.bytes != metrics.bytes || m.bytesPerKSecond != metrics.bytesPerKSecond ||
|
||||
m.iosPerKSecond != metrics.iosPerKSecond ); if (b) { printf("keys: '%s' - '%s' @%p\n",
|
||||
printable(req.keys.begin).c_str(), printable(req.keys.end).c_str(), this);
|
||||
printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n", b,
|
||||
m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond,
|
||||
printf("waitMetrics: desync %d (%lld %lld %lld) != (%lld %lld %lld); +(%lld %lld %lld)\n",
|
||||
b, m.bytes, m.bytesPerKSecond, m.iosPerKSecond, metrics.bytes, metrics.bytesPerKSecond,
|
||||
metrics.iosPerKSecond, c.bytes, c.bytesPerKSecond, c.iosPerKSecond);
|
||||
|
||||
}*/
|
||||
@ -6641,8 +6912,8 @@ ACTOR Future<Void> waitMetrics(StorageServerMetrics* self, WaitMetricsRequest re
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_actor_cancelled)
|
||||
throw; // This is only cancelled when the main loop had exited...no need in this case to clean up
|
||||
// self
|
||||
throw; // This is only cancelled when the main loop had exited...no need in this case to clean
|
||||
// up self
|
||||
error = e;
|
||||
break;
|
||||
}
|
||||
@ -6822,8 +7093,8 @@ ACTOR Future<Void> serveGetValueRequests(StorageServer* self, FutureStream<GetVa
|
||||
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetValue;
|
||||
loop {
|
||||
GetValueRequest req = waitNext(getValue);
|
||||
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
|
||||
// before doing real work
|
||||
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
|
||||
// downgrade before doing real work
|
||||
if (req.debugID.present())
|
||||
g_traceBatch.addEvent("GetValueDebug",
|
||||
req.debugID.get().first(),
|
||||
@ -6841,8 +7112,8 @@ ACTOR Future<Void> serveGetKeyValuesRequests(StorageServer* self, FutureStream<G
loop {
GetKeyValuesRequest req = waitNext(getKeyValues);

// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
// downgrade before doing real work
self->actors.add(self->readGuard(req, getKeyValuesQ));
}
}
@ -6864,8 +7135,8 @@ ACTOR Future<Void> serveGetKeyValuesStreamRequests(StorageServer* self,
FutureStream<GetKeyValuesStreamRequest> getKeyValuesStream) {
loop {
GetKeyValuesStreamRequest req = waitNext(getKeyValuesStream);
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
// downgrade before doing real work
// FIXME: add readGuard again
self->actors.add(getKeyValuesStreamQ(self, req));
}
@ -6875,8 +7146,8 @@ ACTOR Future<Void> serveGetKeyRequests(StorageServer* self, FutureStream<GetKeyR
getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKey;
loop {
GetKeyRequest req = waitNext(getKey);
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so
// downgrade before doing real work
self->actors.add(self->readGuard(req, getKeyQ));
}
}
@ -7067,8 +7338,8 @@ ACTOR Future<Void> reportStorageServerState(StorageServer* self) {

ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface ssi) {
state Future<Void> doUpdate = Void();
state bool updateReceived = false; // true iff the current update() actor assigned to doUpdate has already
// received an update from the tlog
state double lastLoopTopTime = now();
state Future<Void> dbInfoChange = Void();
state Future<Void> checkLastUpdate = Void();
@ -7134,8 +7405,8 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
self->popVersion(self->durableVersion.get() + 1, true);
}
// If update() is waiting for results from the tlog, it might never get them, so needs to be
// cancelled. But if it is waiting later, cancelling it could cause problems (e.g. fetchKeys
// that already committed to transitioning to waiting state)
if (!updateReceived) {
doUpdate = Void();
}
@ -7178,6 +7449,17 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
else
doUpdate = update(self, &updateReceived);
}
when(GetCheckpointRequest req = waitNext(ssi.checkpoint.getFuture())) {
if (!self->isReadable(req.range)) {
req.reply.sendError(wrong_shard_server());
continue;
} else {
self->actors.add(getCheckpointQ(self, req));
}
}
when(FetchCheckpointRequest req = waitNext(ssi.fetchCheckpoint.getFuture())) {
self->actors.add(fetchCheckpointQ(self, req));
}
when(wait(updateProcessStatsTimer)) {
updateProcessStats(self);
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
@ -7190,16 +7472,17 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData, Error const& e) {
self.shuttingDown = true;

// Clearing shards shuts down any fetchKeys actors; these may do things on cancellation that are best done with
// self still valid
self.shards.insert(allKeys, Reference<ShardInfo>());

// Dispose the IKVS (destroying its data permanently) only if this shutdown is definitely permanent. Otherwise
// just close it.
if (e.code() == error_code_please_reboot) {
// do nothing.
} else if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed) {
// SOMEDAY: could close instead of dispose if tss in quarantine gets removed so it could still be
// investigated?
persistentData->dispose();
} else {
persistentData->close();
234
fdbserver/workloads/PhysicalShardMove.actor.cpp
Normal file
@ -0,0 +1,234 @@
/*
* PhysicalShardMove.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/ServerCheckpoint.actor.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
#include <cstdint>
#include <limits>

#include "flow/actorcompiler.h" // This must be the last #include.

namespace {
std::string printValue(const ErrorOr<Optional<Value>>& value) {
if (value.isError()) {
return value.getError().name();
}
return value.get().present() ? value.get().get().toString() : "Value Not Found.";
}
} // namespace

struct SSCheckpointWorkload : TestWorkload {
const bool enabled;
bool pass;

SSCheckpointWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(!clientId), pass(true) {}

void validationFailed(ErrorOr<Optional<Value>> expectedValue, ErrorOr<Optional<Value>> actualValue) {
TraceEvent(SevError, "TestFailed")
.detail("ExpectedValue", printValue(expectedValue))
.detail("ActualValue", printValue(actualValue));
pass = false;
}

std::string description() const override { return "SSCheckpoint"; }

Future<Void> setup(Database const& cx) override { return Void(); }

Future<Void> start(Database const& cx) override {
if (!enabled) {
return Void();
}
return _start(this, cx);
}

ACTOR Future<Void> _start(SSCheckpointWorkload* self, Database cx) {
state Key key = "TestKey"_sr;
state Key endKey = "TestKey0"_sr;
state Value oldValue = "TestValue"_sr;

int ignore = wait(setDDMode(cx, 0));
state Version version = wait(self->writeAndVerify(self, cx, key, oldValue));

// Create checkpoint.
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state CheckpointFormat format = RocksDBColumnFamily;
loop {
try {
wait(createCheckpoint(&tr, KeyRangeRef(key, endKey), format));
wait(tr.commit());
version = tr.getCommittedVersion();
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}

TraceEvent("TestCheckpointCreated")
.detail("Range", KeyRangeRef(key, endKey).toString())
.detail("Version", version);

// Fetch checkpoint meta data.
loop {
try {
state std::vector<CheckpointMetaData> records =
wait(getCheckpointMetaData(cx, KeyRangeRef(key, endKey), version, format));
break;
} catch (Error& e) {
TraceEvent("TestFetchCheckpointMetadataError")
.errorUnsuppressed(e)
.detail("Range", KeyRangeRef(key, endKey).toString())
.detail("Version", version);

// The checkpoint was just created, we don't expect this error.
ASSERT(e.code() != error_code_checkpoint_not_found);
}
}

TraceEvent("TestCheckpointFetched")
.detail("Range", KeyRangeRef(key, endKey).toString())
.detail("Version", version)
.detail("Shards", records.size());

state std::string pwd = platform::getWorkingDirectory();
state std::string folder = pwd + "/checkpoints";
platform::eraseDirectoryRecursive(folder);
ASSERT(platform::createDirectory(folder));

// Fetch checkpoint.
state int i = 0;
for (; i < records.size(); ++i) {
loop {
TraceEvent("TestFetchingCheckpoint").detail("Checkpoint", records[i].toString());
try {
state CheckpointMetaData record = wait(fetchCheckpoint(cx, records[i], folder));
TraceEvent("TestCheckpointFetched").detail("Checkpoint", records[i].toString());
break;
} catch (Error& e) {
TraceEvent("TestFetchCheckpointError")
.errorUnsuppressed(e)
.detail("Checkpoint", records[i].toString());
wait(delay(1));
}
}
}

state std::string rocksDBTestDir = "rocksdb-kvstore-test-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);

// Restore KVS.
state IKeyValueStore* kvStore = keyValueStoreRocksDB(
rocksDBTestDir, deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
try {
wait(kvStore->restore(records));
} catch (Error& e) {
TraceEvent(SevError, "TestRestoreCheckpointError")
.errorUnsuppressed(e)
.detail("Checkpoint", describe(records));
}

// Compare the keyrange between the original database and the one restored from checkpoint.
// For now, it should have been a single key.
tr.reset();
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
loop {
try {
state RangeResult res = wait(tr.getRange(KeyRangeRef(key, endKey), CLIENT_KNOBS->TOO_MANY));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}

for (i = 0; i < res.size(); ++i) {
Optional<Value> value = wait(kvStore->readValue(res[i].key));
ASSERT(value.present());
ASSERT(value.get() == res[i].value);
}

int ignore = wait(setDDMode(cx, 1));
return Void();
}

ACTOR Future<Void> readAndVerify(SSCheckpointWorkload* self,
Database cx,
Key key,
ErrorOr<Optional<Value>> expectedValue) {
state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

loop {
try {
state Optional<Value> res = wait(timeoutError(tr.get(key), 30.0));
const bool equal = !expectedValue.isError() && res == expectedValue.get();
if (!equal) {
self->validationFailed(expectedValue, ErrorOr<Optional<Value>>(res));
}
break;
} catch (Error& e) {
if (expectedValue.isError() && expectedValue.getError().code() == e.code()) {
break;
}
wait(tr.onError(e));
}
}

return Void();
}

ACTOR Future<Version> writeAndVerify(SSCheckpointWorkload* self, Database cx, Key key, Optional<Value> value) {
state Transaction tr(cx);
state Version version;
loop {
try {
if (value.present()) {
tr.set(key, value.get());
} else {
tr.clear(key);
}
wait(timeoutError(tr.commit(), 30.0));
version = tr.getCommittedVersion();
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}

wait(self->readAndVerify(self, cx, key, value));

return version;
}

Future<bool> check(Database const& cx) override { return pass; }

void getMetrics(std::vector<PerfMetric>& m) override {}
};

WorkloadFactory<SSCheckpointWorkload> SSCheckpointWorkloadFactory("SSCheckpointWorkload");
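For orientation, the sketch below condenses the flow the workload above exercises: create a checkpoint through a system-keyspace transaction, look up its metadata, fetch the checkpoint files locally, and restore a fresh RocksDB key-value store from them. It only uses helpers that appear in this commit (createCheckpoint, getCheckpointMetaData, fetchCheckpoint, keyValueStoreRocksDB, IKeyValueStore::restore); retry loops, tracing, and validation are omitted, and the restore directory name is made up, so treat it as an illustration rather than code from this change.

// Condensed checkpoint round trip; helper signatures follow the workload above.
ACTOR Future<Void> checkpointRoundTrip(Database cx, KeyRange range, std::string folder) {
	// 1. Request checkpoint creation for the range (system-keyspace write).
	state Transaction tr(cx);
	tr.setOption(FDBTransactionOptions::LOCK_AWARE);
	tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	wait(createCheckpoint(&tr, range, RocksDBColumnFamily));
	wait(tr.commit());
	state Version version = tr.getCommittedVersion();

	// 2. Find the checkpoint metadata covering the range at that version.
	state std::vector<CheckpointMetaData> records =
	    wait(getCheckpointMetaData(cx, range, version, RocksDBColumnFamily));

	// 3. Fetch each checkpoint's files from the storage servers into a local folder.
	state int i = 0;
	for (; i < records.size(); ++i) {
		wait(success(fetchCheckpoint(cx, records[i], folder)));
	}

	// 4. Restore a fresh RocksDB store from the fetched checkpoints ("restored-kvs" is a made-up path).
	state IKeyValueStore* kvStore = keyValueStoreRocksDB(
	    "restored-kvs", deterministicRandom()->randomUniqueID(), KeyValueStoreType::SSD_ROCKSDB_V1);
	wait(kvStore->restore(records));
	return Void();
}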
@ -174,6 +174,7 @@ ERROR( blob_granule_no_ryw, 2036, "Blob Granule Read Transactions must be specif
ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read Transactions must be specified as ryw-disabled" )
ERROR( get_mapped_key_values_has_more, 2038, "getMappedRange does not support continuation for now" )
ERROR( get_mapped_range_reads_your_writes, 2039, "getMappedRange tries to read data that were previously written in the transaction" )
ERROR( checkpoint_not_found, 2040, "Checkpoint not found" )

ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )
@ -283,6 +284,7 @@ ERROR( snap_invalid_uid_string, 2509, "The given uid string is not a 32-length h
// 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error
ERROR( internal_error, 4100, "An internal error occurred" )
ERROR( not_implemented, 4200, "Not implemented yet" )
// clang-format on

#undef ERROR
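The new checkpoint_not_found code is the error the workload above guards against after getCheckpointMetaData, i.e. no storage server currently reports a matching checkpoint for the requested range, version, and format. A caller-side guard might look like the following sketch (not code from this commit; the function name and return shape are illustrative):

// Sketch: tolerate checkpoint_not_found when looking up checkpoint metadata.
ACTOR Future<Optional<std::vector<CheckpointMetaData>>> tryGetCheckpointMetaData(Database cx,
                                                                                 KeyRange range,
                                                                                 Version version) {
	try {
		std::vector<CheckpointMetaData> records =
		    wait(getCheckpointMetaData(cx, range, version, RocksDBColumnFamily));
		return Optional<std::vector<CheckpointMetaData>>(records);
	} catch (Error& e) {
		if (e.code() == error_code_checkpoint_not_found) {
			// No storage server holds a matching checkpoint; caller may retry or recreate it.
			return Optional<std::vector<CheckpointMetaData>>();
		}
		throw;
	}
}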
@ -138,6 +138,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/CycleAndLock.toml)
add_fdb_test(TEST_FILES fast/CycleTest.toml)
add_fdb_test(TEST_FILES fast/ChangeFeeds.toml)
add_fdb_test(TEST_FILES fast/PhysicalShardMove.toml)
add_fdb_test(TEST_FILES fast/DataLossRecovery.toml)
add_fdb_test(TEST_FILES fast/EncryptionOps.toml)
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
13
tests/fast/PhysicalShardMove.toml
Normal file
@ -0,0 +1,13 @@
[configuration]
config = 'triple'
storageEngineType = 4
processesPerMachine = 1
coordinators = 3
machineCount = 15

[[test]]
testTitle = 'PhysicalShardMove'
useDB = true

[[test.workload]]
testName = 'SSCheckpointWorkload'
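Assuming a locally built fdbserver binary, a single simulation run of this test can typically be started with the standard simulation role (exact binary and test-file paths depend on the build layout):

fdbserver -r simulation -f tests/fast/PhysicalShardMove.toml

In this test configuration, storageEngineType = 4 selects the RocksDB storage engine, which is the engine the checkpoint creation, fetch, and restore path in this commit targets.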