mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 18:32:18 +08:00
Add a backup start key
If the backup key is not set, do not recruit backup workers for old epoches.
This commit is contained in:
parent
e14246ac16
commit
c08a192c75
@ -494,6 +494,7 @@ ProcessData decodeWorkerListValue( ValueRef const& value ) {
|
|||||||
const KeyRangeRef backupProgressKeys(LiteralStringRef("\xff/backupProgress/"),
|
const KeyRangeRef backupProgressKeys(LiteralStringRef("\xff/backupProgress/"),
|
||||||
LiteralStringRef("\xff/backupProgress0"));
|
LiteralStringRef("\xff/backupProgress0"));
|
||||||
const KeyRef backupProgressPrefix = backupProgressKeys.begin;
|
const KeyRef backupProgressPrefix = backupProgressKeys.begin;
|
||||||
|
const KeyRef backupStartedKey = LiteralStringRef("\xff/backupStarted");
|
||||||
|
|
||||||
const Key backupProgressKeyFor(UID workerID) {
|
const Key backupProgressKeyFor(UID workerID) {
|
||||||
BinaryWriter wr(Unversioned());
|
BinaryWriter wr(Unversioned());
|
||||||
|
@ -182,6 +182,9 @@ const Value backupProgressValue(const WorkerBackupStatus& status);
|
|||||||
UID decodeBackupProgressKey(const KeyRef& key);
|
UID decodeBackupProgressKey(const KeyRef& key);
|
||||||
WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value);
|
WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value);
|
||||||
|
|
||||||
|
// "\xff/backupStarted"
|
||||||
|
extern const KeyRef backupStartedKey;
|
||||||
|
|
||||||
extern const KeyRef coordinatorsKey;
|
extern const KeyRef coordinatorsKey;
|
||||||
extern const KeyRef logsKey;
|
extern const KeyRef logsKey;
|
||||||
extern const KeyRef minRequiredCommitVersionKey;
|
extern const KeyRef minRequiredCommitVersionKey;
|
||||||
|
@ -20,6 +20,8 @@ void BackupProgress::addBackupStatus(const WorkerBackupStatus& status) {
|
|||||||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> BackupProgress::getUnfinishedBackup() {
|
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> BackupProgress::getUnfinishedBackup() {
|
||||||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit;
|
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit;
|
||||||
|
|
||||||
|
if (!backupStartedValue.present()) return toRecruit; // No active backups
|
||||||
|
|
||||||
for (const auto& [epoch, info] : epochInfos) {
|
for (const auto& [epoch, info] : epochInfos) {
|
||||||
std::set<Tag> tags = enumerateLogRouterTags(info.logRouterTags);
|
std::set<Tag> tags = enumerateLogRouterTags(info.logRouterTags);
|
||||||
std::map<Tag, Version> tagVersions;
|
std::map<Tag, Version> tagVersions;
|
||||||
@ -60,9 +62,11 @@ ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupPro
|
|||||||
try {
|
try {
|
||||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||||
|
state Optional<Value> value = wait(tr.get(backupStartedKey));
|
||||||
Standalone<RangeResultRef> results = wait(tr.getRange(backupProgressKeys, CLIENT_KNOBS->TOO_MANY));
|
Standalone<RangeResultRef> results = wait(tr.getRange(backupProgressKeys, CLIENT_KNOBS->TOO_MANY));
|
||||||
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
|
||||||
|
|
||||||
|
bStatus->setBackupStartedValue(value);
|
||||||
for (auto& it : results) {
|
for (auto& it : results) {
|
||||||
const UID workerID = decodeBackupProgressKey(it.key);
|
const UID workerID = decodeBackupProgressKey(it.key);
|
||||||
const WorkerBackupStatus status = decodeBackupProgressValue(it.value);
|
const WorkerBackupStatus status = decodeBackupProgressValue(it.value);
|
||||||
@ -87,6 +91,7 @@ TEST_CASE("/BackupProgress/Unfinished") {
|
|||||||
const Tag tag1(tagLocalityLogRouter, 0);
|
const Tag tag1(tagLocalityLogRouter, 0);
|
||||||
epochInfos.insert({ epoch1, ILogSystem::EpochTagsVersionsInfo(1, begin1, end1) });
|
epochInfos.insert({ epoch1, ILogSystem::EpochTagsVersionsInfo(1, begin1, end1) });
|
||||||
BackupProgress progress(UID(0, 0), epochInfos);
|
BackupProgress progress(UID(0, 0), epochInfos);
|
||||||
|
progress.setBackupStartedValue(Optional<Value>(LiteralStringRef("1")));
|
||||||
|
|
||||||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> unfinished = progress.getUnfinishedBackup();
|
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> unfinished = progress.getUnfinishedBackup();
|
||||||
|
|
||||||
|
@ -51,6 +51,11 @@ public:
|
|||||||
// backup [savedVersion + 1, endVersion)
|
// backup [savedVersion + 1, endVersion)
|
||||||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> getUnfinishedBackup();
|
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> getUnfinishedBackup();
|
||||||
|
|
||||||
|
// Set the value for "backupStartedKey"
|
||||||
|
void setBackupStartedValue(Optional<Value> value) {
|
||||||
|
backupStartedValue = value;
|
||||||
|
}
|
||||||
|
|
||||||
void addref() { ReferenceCounted<BackupProgress>::addref(); }
|
void addref() { ReferenceCounted<BackupProgress>::addref(); }
|
||||||
|
|
||||||
void delref() { ReferenceCounted<BackupProgress>::delref(); }
|
void delref() { ReferenceCounted<BackupProgress>::delref(); }
|
||||||
@ -73,6 +78,9 @@ private:
|
|||||||
// progress status for a tag in an epoch due to later epoch trying to fill
|
// progress status for a tag in an epoch due to later epoch trying to fill
|
||||||
// the gap. "progress" MUST be iterated in ascending order.
|
// the gap. "progress" MUST be iterated in ascending order.
|
||||||
std::map<LogEpoch, std::map<Tag, Version>> progress;
|
std::map<LogEpoch, std::map<Tag, Version>> progress;
|
||||||
|
|
||||||
|
// Value of the "backupStartedKey".
|
||||||
|
Optional<Value> backupStartedValue;
|
||||||
};
|
};
|
||||||
|
|
||||||
ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupProgress> bStatus);
|
ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupProgress> bStatus);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user