Mirror of https://github.com/apple/foundationdb.git (synced 2025-06-03 03:41:53 +08:00)
Disallow repopulating a management cluster from a data cluster with a matching tenant ID prefix unless forced. Remember the largest tenant ID used on the data cluster and use it to update the management cluster's tenant ID counter when force-repopulating with the same prefix.
This commit is contained in:
parent 90048d1e92
commit cbc330697c
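For orientation, the restore command's surface after this change, as a hypothetical fdbcli session (the cluster name and connection string are illustrative placeholders, and the force flags may be combined in any order):

    fdb> metacluster restore data1 dryrun connection_string=meta:cluster@10.0.0.1:4500 repopulate_from_data_cluster
    fdb> metacluster restore data1 connection_string=meta:cluster@10.0.0.1:4500 repopulate_from_data_cluster force_join force_reuse_tenant_id_prefix

Without force_reuse_tenant_id_prefix, repopulating a management cluster whose tenant ID prefix matches the data cluster's now fails with invalid_metacluster_configuration.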
@@ -209,8 +209,9 @@ ACTOR Future<bool> metaclusterRemoveCommand(Reference<IDatabase> db, std::vector
 }
 
 void printRestoreUsage() {
-    fmt::print("Usage: metacluster restore <NAME> [dryrun] connection_string=<CONNECTION_STRING>\n"
-               "<restore_known_data_cluster|repopulate_from_data_cluster> [force_join]\n\n");
+    fmt::print(
+        "Usage: metacluster restore <NAME> [dryrun] connection_string=<CONNECTION_STRING>\n"
+        "<restore_known_data_cluster|repopulate_from_data_cluster> [force_join|force_reuse_tenant_id_prefix] ...\n\n");
 
     fmt::print("Add a restored data cluster back to a metacluster.\n\n");
 
@@ -233,30 +234,49 @@ void printRestoreUsage() {
     fmt::print("clusters. Any conflicts arising between the added data cluster and existing data\n");
     fmt::print("will cause the restore to fail. Before repopulating a metacluster from a data\n");
     fmt::print("cluster, that data cluster needs to be detached from its prior metacluster using\n");
-    fmt::print("the `metacluster remove' command.\n");
+    fmt::print("the `metacluster remove' command.\n\n");
+
+    fmt::print("When repopulating a management cluster, it is expected that the new metacluster\n");
+    fmt::print("will be configured with a different tenant ID prefix. By default, reusing the same\n");
+    fmt::print("prefix will result in an error during the restore process. To override this behavior,\n");
+    fmt::print("use `force_reuse_tenant_id_prefix'.\n");
 }
 
 // metacluster restore command
 ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
-    if (tokens.size() < 5 || tokens.size() > 7) {
+    if (tokens.size() < 5 || tokens.size() > 8) {
         printRestoreUsage();
         return false;
     }
 
     state bool dryRun = tokens[3] == "dryrun"_sr;
-    state bool forceJoin = tokens[tokens.size() - 1] == "force_join"_sr;
+    state bool forceJoin = false;
+    state bool forceReuseTenantIdPrefix = false;
 
-    if (tokens.size() < 5 + (int)dryRun + (int)forceJoin) {
+    loop {
+        if (tokens.back() == "force_join"_sr) {
+            forceJoin = true;
+        } else if (tokens.back() == "force_reuse_tenant_id_prefix"_sr) {
+            forceReuseTenantIdPrefix = true;
+        } else {
+            break;
+        }
+
+        tokens.pop_back();
+    }
+
+    int expectedTokens = 5 + (dryRun ? 1 : 0);
+    if (tokens.size() != expectedTokens) {
         printRestoreUsage();
         return false;
     }
 
     state ClusterName clusterName = tokens[2];
-    state StringRef restoreType = tokens[tokens.size() - 1 - (int)forceJoin];
+    state StringRef restoreType = tokens.back();
 
     // connection string
     DataClusterEntry defaultEntry;
-    auto config = parseClusterConfiguration(tokens, defaultEntry, 3 + (int)dryRun, 3 + (int)dryRun + 1);
+    auto config = parseClusterConfiguration(tokens, defaultEntry, expectedTokens - 2, expectedTokens - 1);
     if (!config.present()) {
         return false;
     } else if (!config.get().first.present()) {
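The rewritten option handling above pops recognized force flags off the tail of the token list, so force_join and force_reuse_tenant_id_prefix may appear in any order. A minimal standalone sketch of the same pattern, using std::string tokens instead of the CLI's StringRef values (names here are illustrative):

    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::string> tokens = { "metacluster", "restore", "data1",
                                            "connection_string=<CONNECTION_STRING>",
                                            "repopulate_from_data_cluster",
                                            "force_reuse_tenant_id_prefix", "force_join" };
        bool forceJoin = false;
        bool forceReuseTenantIdPrefix = false;

        // Consume recognized flags from the end of the token list, in any order.
        while (!tokens.empty()) {
            if (tokens.back() == "force_join") {
                forceJoin = true;
            } else if (tokens.back() == "force_reuse_tenant_id_prefix") {
                forceReuseTenantIdPrefix = true;
            } else {
                break;
            }
            tokens.pop_back();
        }

        // What remains is the fixed-position part of the command.
        std::cout << "forceJoin=" << forceJoin
                  << " forceReuseTenantIdPrefix=" << forceReuseTenantIdPrefix
                  << " positional tokens=" << tokens.size() << "\n";
    }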
@@ -275,6 +295,7 @@ ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vecto
                                             ApplyManagementClusterUpdates::True,
                                             RestoreDryRun(dryRun),
                                             ForceJoin(forceJoin),
+                                            ForceReuseTenantIdPrefix(forceReuseTenantIdPrefix),
                                             &messages));
     } else if (restoreType == "repopulate_from_data_cluster"_sr) {
         wait(MetaclusterAPI::restoreCluster(db,
@@ -283,6 +304,7 @@ ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vecto
                                             ApplyManagementClusterUpdates::False,
                                             RestoreDryRun(dryRun),
                                             ForceJoin(forceJoin),
+                                            ForceReuseTenantIdPrefix(forceReuseTenantIdPrefix),
                                             &messages));
     } else {
         fmt::print(stderr, "ERROR: unrecognized restore mode `{}'\n", printable(restoreType));
@@ -581,15 +603,15 @@ void metaclusterGenerator(const char* text,
         const char* opts[] = { "dryrun", "connection_string=", nullptr };
         arrayGenerator(text, line, opts, lc);
     } else {
-        bool dryrun = tokens[3] == "dryrun"_sr;
-        if (tokens.size() == 3 + (int)dryrun) {
+        int dryrun = tokens[3] == "dryrun"_sr ? 1 : 0;
+        if (tokens.size() == 3 + dryrun) {
             const char* opts[] = { "connection_string=", nullptr };
             arrayGenerator(text, line, opts, lc);
-        } else if (tokens.size() == 4 + (int)dryrun) {
+        } else if (tokens.size() == 4 + dryrun) {
             const char* opts[] = { "restore_known_data_cluster", "repopulate_from_data_cluster", nullptr };
             arrayGenerator(text, line, opts, lc);
-        } else if (tokens.size() == 5 + (int)dryrun) {
-            const char* opts[] = { "force_join", nullptr };
+        } else if (tokens.size() >= 5 + dryrun && tokens.size() < 7 + dryrun) {
+            const char* opts[] = { "force_join", "force_reuse_tenant_id_prefix", nullptr };
             arrayGenerator(text, line, opts, lc);
         }
     }
@@ -624,11 +646,11 @@ std::vector<const char*> metaclusterHintGenerator(std::vector<StringRef> const&
                                      "[dryrun]",
                                      "connection_string=<CONNECTION_STRING>",
                                      "<restore_known_data_cluster|repopulate_from_data_cluster>",
-                                     "[force_join]" };
-    if (tokens.size() < 4 || (tokens[3].size() <= 6 && "dryrun"_sr.startsWith(tokens[3]))) {
-        return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
-    } else if (tokens.size() < 6) {
-        return std::vector<const char*>(opts.begin() + tokens.size() - 1, opts.end());
+                                     "[force_join|force_reuse_tenant_id_prefix]" };
+    if (tokens.size() < 4 || (tokens[3].size() <= 7 && "dryrun"_sr.startsWith(tokens[3]))) {
+        return std::vector<const char*>(opts.begin() + std::min(tokens.size() - 2, opts.size() - 1), opts.end());
+    } else if (tokens.size() < 7) {
+        return std::vector<const char*>(opts.begin() + std::min(tokens.size() - 1, opts.size() - 1), opts.end());
     } else {
        return {};
     }
@@ -32,6 +32,7 @@ FDB_DEFINE_BOOLEAN_PARAM(RunOnDisconnectedCluster);
 FDB_DEFINE_BOOLEAN_PARAM(RunOnMismatchedCluster);
 FDB_DEFINE_BOOLEAN_PARAM(RestoreDryRun);
 FDB_DEFINE_BOOLEAN_PARAM(ForceJoin);
+FDB_DEFINE_BOOLEAN_PARAM(ForceReuseTenantIdPrefix);
 FDB_DEFINE_BOOLEAN_PARAM(ForceRemove);
 FDB_DEFINE_BOOLEAN_PARAM(IgnoreCapacityLimit);
@@ -101,6 +101,7 @@ FDB_DECLARE_BOOLEAN_PARAM(RunOnDisconnectedCluster);
 FDB_DECLARE_BOOLEAN_PARAM(RunOnMismatchedCluster);
 FDB_DECLARE_BOOLEAN_PARAM(RestoreDryRun);
 FDB_DECLARE_BOOLEAN_PARAM(ForceJoin);
+FDB_DECLARE_BOOLEAN_PARAM(ForceReuseTenantIdPrefix);
 FDB_DECLARE_BOOLEAN_PARAM(ForceRemove);
 FDB_DECLARE_BOOLEAN_PARAM(IgnoreCapacityLimit);
@@ -677,11 +678,11 @@ void updateClusterMetadata(Transaction tr,
     } else if (previousMetadata.entry.clusterState == DataClusterState::REMOVING) {
         throw cluster_removed();
     } else if (!isRestoring && previousMetadata.entry.clusterState == DataClusterState::RESTORING &&
-               (!updatedEntry.present() || (updatedEntry.get().clusterState != DataClusterState::READY &&
-                                            updatedEntry.get().clusterState != DataClusterState::REMOVING))) {
+               (updatedEntry.get().clusterState != DataClusterState::READY &&
+                updatedEntry.get().clusterState != DataClusterState::REMOVING)) {
         throw cluster_restoring();
     } else if (isRestoring) {
-        ASSERT(previousMetadata.entry.clusterState == DataClusterState::RESTORING ||
+        ASSERT(previousMetadata.entry.clusterState == DataClusterState::RESTORING &&
                updatedEntry.get().clusterState == DataClusterState::RESTORING);
     }
     ManagementClusterMetadata::dataClusters().set(tr, name, updatedEntry.get());
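The tightened checks above encode a small state-transition rule: a cluster in the REMOVING state accepts no updates; once a cluster is RESTORING, an ordinary update may only move it to READY or REMOVING, and a restore-driven update must find and keep it RESTORING. A self-contained sketch of that rule (plain enum and free function, not the actual FoundationDB types):

    #include <cassert>

    enum class ClusterState { READY, REMOVING, RESTORING };

    // Mirrors the checks in updateClusterMetadata: returns true if the update may proceed.
    bool transitionAllowed(ClusterState previous, ClusterState next, bool isRestoring) {
        if (previous == ClusterState::REMOVING) {
            return false; // updateClusterMetadata throws cluster_removed()
        }
        if (isRestoring) {
            // A restore-driven update must find and leave the cluster RESTORING.
            return previous == ClusterState::RESTORING && next == ClusterState::RESTORING;
        }
        if (previous == ClusterState::RESTORING) {
            // An ordinary update may only finish or abandon the restore.
            return next == ClusterState::READY || next == ClusterState::REMOVING;
        }
        return true;
    }

    int main() {
        assert(transitionAllowed(ClusterState::RESTORING, ClusterState::READY, false));
        assert(!transitionAllowed(ClusterState::RESTORING, ClusterState::RESTORING, false));
        assert(transitionAllowed(ClusterState::RESTORING, ClusterState::RESTORING, true));
    }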
@@ -786,13 +787,15 @@ struct RemoveClusterImpl {
         // cluster.
         if (self->lastTenantId.present()) {
             Optional<int64_t> lastId = wait(TenantMetadata::lastTenantId().get(tr));
-            if (!lastId.present() || lastId.get() < self->lastTenantId.get()) {
+            if (!lastId.present() || (TenantAPI::getTenantIdPrefix(lastId.get()) ==
+                                          TenantAPI::getTenantIdPrefix(self->lastTenantId.get()) &&
+                                      lastId.get() < self->lastTenantId.get())) {
                 TenantMetadata::lastTenantId().set(tr, self->lastTenantId.get());
             }
         }
     }
 
-    // Insert a tombstone marking this tenant removed even if we aren't registered
+    // Insert a tombstone marking this cluster removed even if we aren't registered
     MetaclusterMetadata::registrationTombstones().insert(tr, clusterId);
 
     TraceEvent("RemovedMetaclusterRegistrationOnDataCluster")
@@ -1151,6 +1154,10 @@ struct RegisterClusterImpl {
             self->ctx.metaclusterRegistration.get().toDataClusterRegistration(self->clusterName,
                                                                               self->clusterEntry.id));
 
+        // The data cluster will track the last ID it allocated in this metacluster, so erase any prior tenant
+        // ID state
+        TenantMetadata::lastTenantId().clear(tr);
+
         // If we happen to have any orphaned restore IDs from a previous time this cluster was in a metacluster,
         // erase them now.
         MetaclusterMetadata::activeRestoreIds().clear(tr);
@@ -1392,13 +1399,19 @@ struct RestoreClusterImpl {
     ApplyManagementClusterUpdates applyManagementClusterUpdates;
     RestoreDryRun restoreDryRun;
     ForceJoin forceJoin;
+    ForceReuseTenantIdPrefix forceReuseTenantIdPrefix;
     std::vector<std::string>& messages;
 
     // Unique ID generated for this restore. Used to avoid concurrent restores
     UID restoreId = deterministicRandom()->randomUniqueID();
 
+    // Loaded from the management cluster
+    Optional<int64_t> lastManagementClusterTenantId;
+
     // Loaded from the data cluster
     UID dataClusterId;
+    Optional<int64_t> lastDataClusterTenantId;
+    Optional<int64_t> newLastDataClusterTenantId;
 
     // Tenant list from data and management clusters
     std::unordered_map<int64_t, TenantMapEntry> dataClusterTenantMap;
@@ -1412,10 +1425,12 @@ struct RestoreClusterImpl {
                        ApplyManagementClusterUpdates applyManagementClusterUpdates,
                        RestoreDryRun restoreDryRun,
                        ForceJoin forceJoin,
+                       ForceReuseTenantIdPrefix forceReuseTenantIdPrefix,
                        std::vector<std::string>& messages)
       : ctx(managementDb, {}, { DataClusterState::RESTORING }), clusterName(clusterName),
         connectionString(connectionString), applyManagementClusterUpdates(applyManagementClusterUpdates),
-        restoreDryRun(restoreDryRun), forceJoin(forceJoin), messages(messages) {}
+        restoreDryRun(restoreDryRun), forceJoin(forceJoin), forceReuseTenantIdPrefix(forceReuseTenantIdPrefix),
+        messages(messages) {}
 
     ACTOR template <class Transaction>
     static Future<Void> checkRestoreId(RestoreClusterImpl* self, Transaction tr) {
@@ -1429,11 +1444,12 @@ struct RestoreClusterImpl {
         return Void();
     }
 
+    // Returns true if the restore ID was erased
     ACTOR template <class Transaction>
-    static Future<Void> eraseRestoreId(RestoreClusterImpl* self, Transaction tr) {
+    static Future<bool> eraseRestoreId(RestoreClusterImpl* self, Transaction tr) {
         Optional<UID> transactionId = wait(MetaclusterMetadata::activeRestoreIds().get(tr, self->clusterName));
         if (!transactionId.present()) {
-            return Void();
+            return false;
         } else if (transactionId.get() != self->restoreId) {
             throw conflicting_restore();
         } else {
@@ -1441,7 +1457,7 @@ struct RestoreClusterImpl {
             MetaclusterMetadata::activeRestoreIds().erase(tr, self->clusterName);
         }
 
-        return Void();
+        return true;
     }
 
     template <class Function>
@@ -1561,6 +1577,9 @@ struct RestoreClusterImpl {
         state Future<bool> tombstoneFuture =
             MetaclusterMetadata::registrationTombstones().exists(tr, self->dataClusterId);
 
+        state Future<Void> lastTenantIdFuture =
+            store(self->lastDataClusterTenantId, TenantMetadata::lastTenantId().get(tr));
+
         state Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
             wait(MetaclusterMetadata::metaclusterRegistration().get(tr));
 
@@ -1570,6 +1589,8 @@ struct RestoreClusterImpl {
             throw cluster_removed();
         }
 
+        wait(lastTenantIdFuture);
+
         MetaclusterRegistrationEntry dataClusterEntry =
             self->ctx.metaclusterRegistration.get().toDataClusterRegistration(self->clusterName,
                                                                               self->dataClusterId);
@@ -1601,23 +1622,28 @@ struct RestoreClusterImpl {
         return Void();
     }
 
-    void markClusterRestoring(Reference<typename DB::TransactionT> tr) {
-        MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, clusterName);
-        MetaclusterMetadata::activeRestoreIds().set(tr, clusterName, restoreId);
-        if (ctx.dataClusterMetadata.get().entry.clusterState != DataClusterState::RESTORING) {
-            DataClusterEntry updatedEntry = ctx.dataClusterMetadata.get().entry;
+    ACTOR static Future<Void> markClusterRestoring(RestoreClusterImpl* self, Reference<typename DB::TransactionT> tr) {
+        MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, self->clusterName);
+        MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
+        if (self->ctx.dataClusterMetadata.get().entry.clusterState != DataClusterState::RESTORING) {
+            DataClusterEntry updatedEntry = self->ctx.dataClusterMetadata.get().entry;
             updatedEntry.clusterState = DataClusterState::RESTORING;
 
-            updateClusterMetadata(tr, clusterName, ctx.dataClusterMetadata.get(), connectionString, updatedEntry);
+            updateClusterMetadata(
+                tr, self->clusterName, self->ctx.dataClusterMetadata.get(), self->connectionString, updatedEntry);
             // Remove this cluster from the cluster capacity index, but leave its configured capacity intact in the
             // cluster entry. This allows us to retain the configured capacity while preventing the cluster from
             // being used to allocate new tenant groups.
             DataClusterEntry noCapacityEntry = updatedEntry;
             noCapacityEntry.capacity.numTenantGroups = 0;
-            updateClusterCapacityIndex(tr, clusterName, updatedEntry, noCapacityEntry);
+            updateClusterCapacityIndex(tr, self->clusterName, updatedEntry, noCapacityEntry);
         }
 
-        TraceEvent("MarkedDataClusterRestoring").detail("Name", clusterName);
+        wait(store(self->lastManagementClusterTenantId,
+                   ManagementClusterMetadata::tenantMetadata().lastTenantId.get(tr)));
+
+        TraceEvent("MarkedDataClusterRestoring").detail("Name", self->clusterName);
+        return Void();
     }
 
     Future<Void> markClusterAsReady(Reference<typename DB::TransactionT> tr) {
@@ -1632,7 +1658,7 @@ struct RestoreClusterImpl {
             noCapacityEntry.capacity.numTenantGroups = 0;
             updateClusterCapacityIndex(tr, clusterName, noCapacityEntry, updatedEntry);
 
-            return eraseRestoreId(this, tr);
+            return success(eraseRestoreId(this, tr));
         }
 
         return Void();
@@ -2011,6 +2037,13 @@ struct RestoreClusterImpl {
                 ManagementClusterMetadata::tenantMetadata().tenantGroupMap.get(tr, tenantEntry.tenantGroup.get());
         }
 
+        if (self->lastDataClusterTenantId.present() &&
+            TenantAPI::getTenantIdPrefix(tenantEntry.id) ==
+                TenantAPI::getTenantIdPrefix(self->lastDataClusterTenantId.get()) &&
+            !self->restoreDryRun) {
+            ASSERT_LE(tenantEntry.id, self->lastDataClusterTenantId.get());
+        }
+
         Optional<MetaclusterTenantMapEntry> existingEntry = wait(tryGetTenantTransaction(tr, tenantEntry.tenantName));
         if (existingEntry.present()) {
             if (existingEntry.get().assignedCluster == self->clusterName) {
@@ -2075,15 +2108,14 @@ struct RestoreClusterImpl {
 
     ACTOR static Future<Void> addTenantBatchToManagementCluster(RestoreClusterImpl* self,
                                                                 Reference<typename DB::TransactionT> tr,
-                                                                std::vector<TenantMapEntry> tenants) {
-        Optional<int64_t> tenantIdPrefix = wait(TenantMetadata::tenantIdPrefix().get(tr));
-        ASSERT(tenantIdPrefix.present());
-
+                                                                std::vector<TenantMapEntry> tenants,
+                                                                int64_t tenantIdPrefix) {
         state std::vector<Future<bool>> futures;
-        state int64_t maxId = tenantIdPrefix.get() << 48;
+        state int64_t maxId = -1;
         for (auto const& t : tenants) {
-            if (TenantAPI::getTenantIdPrefix(t.id) == tenantIdPrefix.get()) {
+            if (TenantAPI::getTenantIdPrefix(t.id) == tenantIdPrefix) {
                 maxId = std::max(maxId, t.id);
+                self->newLastDataClusterTenantId = std::max(t.id, self->newLastDataClusterTenantId.orDefault(0));
             }
             futures.push_back(addTenantToManagementCluster(self, tr, t));
         }
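The prefix arithmetic above relies on tenant IDs embedding the metacluster's tenant ID prefix in their upper bits: the old code seeded maxId with tenantIdPrefix << 48, and TenantAPI::getTenantIdPrefix recovers the prefix from an ID. A minimal sketch of that layout, assuming the prefix occupies the top 16 bits of a 64-bit ID (consistent with the << 48 shift, though the helper names here are illustrative, not the real TenantAPI):

    #include <cassert>
    #include <cstdint>

    // Assumed layout: [16-bit prefix][48-bit counter].
    int64_t makeTenantId(int64_t prefix, int64_t counter) {
        return (prefix << 48) | counter;
    }

    int64_t tenantIdPrefixOf(int64_t id) {
        return id >> 48;
    }

    int main() {
        int64_t id = makeTenantId(0x1234, 42);
        assert(tenantIdPrefixOf(id) == 0x1234);
        // IDs allocated under a different management cluster carry a different prefix,
        // which is how the restore path detects a prefix collision.
        assert(tenantIdPrefixOf(makeTenantId(0x1235, 0)) != tenantIdPrefixOf(id));
    }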
@@ -2106,33 +2138,68 @@ struct RestoreClusterImpl {
 
         if (!self->restoreDryRun) {
             if (numGroupsCreated > 0) {
-                state DataClusterMetadata clusterMetadata = wait(getClusterTransaction(tr, self->clusterName));
-
-                DataClusterEntry updatedEntry = clusterMetadata.entry;
+                DataClusterEntry updatedEntry = self->ctx.dataClusterMetadata.get().entry;
+                if (updatedEntry.clusterState != DataClusterState::RESTORING) {
+                    throw conflicting_restore();
+                }
                 updatedEntry.allocated.numTenantGroups += numGroupsCreated;
                 updateClusterMetadata(tr,
                                       self->clusterName,
-                                      clusterMetadata,
+                                      self->ctx.dataClusterMetadata.get(),
                                       Optional<ClusterConnectionString>(),
-                                      updatedEntry);
+                                      updatedEntry,
+                                      IsRestoring::True);
             }
 
             int64_t lastTenantId =
-                wait(ManagementClusterMetadata::tenantMetadata().lastTenantId.getD(tr, Snapshot::False, 0));
+                wait(ManagementClusterMetadata::tenantMetadata().lastTenantId.getD(tr, Snapshot::False, -1));
 
-            if (maxId > lastTenantId) {
-                ManagementClusterMetadata::tenantMetadata().lastTenantId.set(tr, maxId);
-            }
-
+            ManagementClusterMetadata::tenantMetadata().lastTenantId.set(tr, std::max(lastTenantId, maxId));
             ManagementClusterMetadata::tenantMetadata().lastTenantModification.setVersionstamp(tr, Versionstamp(), 0);
         }
 
         return Void();
     }
 
+    ACTOR static Future<int64_t> updateLastTenantId(RestoreClusterImpl* self, Reference<typename DB::TransactionT> tr) {
+        state Optional<int64_t> lastTenantId = wait(ManagementClusterMetadata::tenantMetadata().lastTenantId.get(tr));
+        state int64_t tenantIdPrefix;
+        if (!lastTenantId.present()) {
+            Optional<int64_t> prefix = wait(TenantMetadata::tenantIdPrefix().get(tr));
+            ASSERT(prefix.present());
+            tenantIdPrefix = prefix.get();
+        } else {
+            tenantIdPrefix = TenantAPI::getTenantIdPrefix(lastTenantId.get());
+        }
+
+        if (self->lastDataClusterTenantId.present() &&
+            tenantIdPrefix == TenantAPI::getTenantIdPrefix(self->lastDataClusterTenantId.get())) {
+            if (!self->forceReuseTenantIdPrefix) {
+                self->messages.push_back(fmt::format(
+                    "The data cluster being added is using the same tenant ID prefix {} as the management cluster.",
+                    tenantIdPrefix));
+                throw invalid_metacluster_configuration();
+            } else if (!self->restoreDryRun && self->lastDataClusterTenantId.get() > lastTenantId.orDefault(-1)) {
+                ManagementClusterMetadata::tenantMetadata().lastTenantId.set(tr, self->lastDataClusterTenantId.get());
+            }
+
+            self->newLastDataClusterTenantId = self->lastDataClusterTenantId;
+        }
+
+        return tenantIdPrefix;
+    }
+
     ACTOR static Future<Void> addTenantsToManagementCluster(RestoreClusterImpl* self) {
         state std::unordered_map<int64_t, TenantMapEntry>::iterator itr;
         state std::vector<TenantMapEntry> tenantBatch;
         state int64_t tenantsToAdd = 0;
 
+        state int64_t tenantIdPrefix = wait(self->runRestoreManagementTransaction(
+            [self = self](Reference<typename DB::TransactionT> tr) { return updateLastTenantId(self, tr); }));
+
         for (itr = self->dataClusterTenantMap.begin(); itr != self->dataClusterTenantMap.end(); ++itr) {
             state std::unordered_map<int64_t, MetaclusterTenantMapEntry>::iterator managementEntry =
                 self->mgmtClusterTenantMap.find(itr->second.id);
@@ -2153,9 +2220,10 @@ struct RestoreClusterImpl {
 
             if (tenantBatch.size() == CLIENT_KNOBS->METACLUSTER_RESTORE_BATCH_SIZE) {
                 wait(self->runRestoreManagementTransaction(
-                    [self = self, tenantBatch = tenantBatch](Reference<typename DB::TransactionT> tr) {
+                    [self = self, tenantBatch = tenantBatch, tenantIdPrefix = tenantIdPrefix](
+                        Reference<typename DB::TransactionT> tr) {
                         tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
-                        return addTenantBatchToManagementCluster(self, tr, tenantBatch);
+                        return addTenantBatchToManagementCluster(self, tr, tenantBatch, tenantIdPrefix);
                     }));
                 tenantBatch.clear();
             }
@@ -2163,9 +2231,10 @@ struct RestoreClusterImpl {
 
         if (!tenantBatch.empty()) {
             wait(self->runRestoreManagementTransaction(
-                [self = self, tenantBatch = tenantBatch](Reference<typename DB::TransactionT> tr) {
+                [self = self, tenantBatch = tenantBatch, tenantIdPrefix = tenantIdPrefix](
+                    Reference<typename DB::TransactionT> tr) {
                     tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
-                    return addTenantBatchToManagementCluster(self, tr, tenantBatch);
+                    return addTenantBatchToManagementCluster(self, tr, tenantBatch, tenantIdPrefix);
                 }));
         }
 
@@ -2179,6 +2248,19 @@ struct RestoreClusterImpl {
         return Void();
     }
 
+    ACTOR static Future<Void> finalizeDataClusterAfterRepopulate(RestoreClusterImpl* self, Reference<ITransaction> tr) {
+        bool erased = wait(eraseRestoreId(self, tr));
+        if (erased) {
+            if (self->newLastDataClusterTenantId.present()) {
+                TenantMetadata::lastTenantId().set(tr, self->newLastDataClusterTenantId.get());
+            } else {
+                TenantMetadata::lastTenantId().clear(tr);
+            }
+        }
+
+        return Void();
+    }
+
     ACTOR static Future<Void> runDataClusterRestore(RestoreClusterImpl* self) {
         // Run a management transaction to populate the data cluster metadata
         wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
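finalizeDataClusterAfterRepopulate writes back the largest tenant ID observed during the repopulate (newLastDataClusterTenantId), or clears the counter when no restored tenant carried the management prefix, so the data cluster's allocator cannot hand out an ID that collides with a restored tenant. A toy sketch of that accumulate-then-write-back bookkeeping, with std::optional standing in for FoundationDB's Optional:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <vector>

    int main() {
        // Tenant IDs restored from the data cluster that matched the management prefix.
        std::vector<int64_t> restoredIds = { 7, 3, 12 };

        // Accumulate the largest restored ID, mirroring newLastDataClusterTenantId.
        std::optional<int64_t> newLastTenantId;
        for (int64_t id : restoredIds) {
            newLastTenantId = std::max(id, newLastTenantId.value_or(0));
        }

        // Write-back step: set the counter if anything was restored, otherwise clear it.
        if (newLastTenantId) {
            std::cout << "set lastTenantId = " << *newLastTenantId << "\n";
        } else {
            std::cout << "clear lastTenantId\n";
        }
    }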
@@ -2192,10 +2274,8 @@ struct RestoreClusterImpl {
         // set state to restoring
         if (!self->restoreDryRun) {
             try {
-                wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
-                    self->markClusterRestoring(tr);
-                    return Future<Void>(Void());
-                }));
+                wait(self->ctx.runManagementTransaction(
+                    [self = self](Reference<typename DB::TransactionT> tr) { return markClusterRestoring(self, tr); }));
             } catch (Error& e) {
                 // If the transaction retries after success or if we are trying a second time to restore the cluster, it
                 // will throw an error indicating that the restore has already started
@@ -2205,11 +2285,16 @@ struct RestoreClusterImpl {
             }
         }
 
-        // Set the restore ID in the data cluster
+        // Set the restore ID in the data cluster and update the last tenant ID to match the management cluster
         if (!self->restoreDryRun) {
             wait(self->ctx.runDataClusterTransaction([self = self](Reference<ITransaction> tr) {
                 MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, self->clusterName);
                 MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
+                if (self->lastManagementClusterTenantId.present()) {
+                    TenantMetadata::lastTenantId().set(tr, self->lastManagementClusterTenantId.get());
+                } else {
+                    TenantMetadata::lastTenantId().clear(tr);
+                }
                 return Future<Void>(Void());
             }));
         }
@@ -2232,7 +2317,7 @@ struct RestoreClusterImpl {
         if (!self->restoreDryRun) {
             // Remove the active restore ID from the data cluster
             wait(self->ctx.runDataClusterTransaction(
-                [self = self](Reference<ITransaction> tr) { return eraseRestoreId(self, tr); }));
+                [self = self](Reference<ITransaction> tr) { return success(eraseRestoreId(self, tr)); }));
 
             // set restored cluster to ready state
             wait(self->ctx.runManagementTransaction(
@@ -2276,7 +2361,7 @@ struct RestoreClusterImpl {
         if (!self->restoreDryRun) {
             // Remove the active restore ID from the data cluster
             wait(self->ctx.runDataClusterTransaction(
-                [self = self](Reference<ITransaction> tr) { return eraseRestoreId(self, tr); }));
+                [self = self](Reference<ITransaction> tr) { return finalizeDataClusterAfterRepopulate(self, tr); }));
 
             // set restored cluster to ready state
             wait(self->ctx.runManagementTransaction(
@@ -2303,9 +2388,16 @@ Future<Void> restoreCluster(Reference<DB> db,
                             ApplyManagementClusterUpdates applyManagementClusterUpdates,
                             RestoreDryRun restoreDryRun,
                             ForceJoin forceJoin,
+                            ForceReuseTenantIdPrefix forceReuseTenantIdPrefix,
                             std::vector<std::string>* messages) {
-    state RestoreClusterImpl<DB> impl(
-        db, name, connectionString, applyManagementClusterUpdates, restoreDryRun, forceJoin, *messages);
+    state RestoreClusterImpl<DB> impl(db,
+                                      name,
+                                      connectionString,
+                                      applyManagementClusterUpdates,
+                                      restoreDryRun,
+                                      forceJoin,
+                                      forceReuseTenantIdPrefix,
+                                      *messages);
     wait(impl.run());
     return Void();
 }
@@ -2550,9 +2642,15 @@ struct CreateTenantImpl {
     }
 
     ACTOR static Future<Void> storeTenantInDataCluster(CreateTenantImpl* self, Reference<ITransaction> tr) {
-        std::pair<Optional<TenantMapEntry>, bool> dataClusterTenant = wait(TenantAPI::createTenantTransaction(
+        state Future<int64_t> lastTenantIdFuture = TenantMetadata::lastTenantId().getD(tr, Snapshot::False, -1);
+        state std::pair<Optional<TenantMapEntry>, bool> dataClusterTenant = wait(TenantAPI::createTenantTransaction(
             tr, self->tenantEntry.toTenantMapEntry(), ClusterType::METACLUSTER_DATA));
 
+        int64_t lastTenantId = wait(lastTenantIdFuture);
+        if (lastTenantId < self->tenantEntry.id) {
+            TenantMetadata::lastTenantId().set(tr, self->tenantEntry.id);
+        }
+
         // If the tenant map entry is empty, then we encountered a tombstone indicating that the tenant was
         // simultaneously removed.
         if (!dataClusterTenant.first.present()) {
@@ -195,6 +195,15 @@ private:
         ASSERT(data.metaclusterRegistration.get().name == clusterName);
         ASSERT(data.metaclusterRegistration.get().id == clusterMetadata.entry.id);
 
+        if (data.tenantData.lastTenantId >= 0) {
+            ASSERT_EQ(TenantAPI::getTenantIdPrefix(data.tenantData.lastTenantId), managementData.tenantIdPrefix);
+            ASSERT_LE(data.tenantData.lastTenantId, managementData.tenantData.lastTenantId);
+        } else {
+            for (auto const& [id, tenant] : data.tenantData.tenantMap) {
+                ASSERT_NE(TenantAPI::getTenantIdPrefix(id), managementData.tenantIdPrefix);
+            }
+        }
+
         std::set<int64_t> expectedTenants;
         auto clusterTenantMapItr = managementData.clusterTenantMap.find(clusterName);
         if (clusterTenantMapItr != managementData.clusterTenantMap.end()) {
@@ -62,11 +62,8 @@ private:
         ASSERT_EQ(tenantId, tenantMapEntry.id);
         ASSERT_EQ(tenantData.tenantNameIndex[tenantMapEntry.tenantName], tenantId);
 
-        // Data clusters do not keep their last tenant ID up to date while part of a metacluster
-        if (tenantData.clusterType != ClusterType::METACLUSTER_DATA) {
-            if (TenantAPI::getTenantIdPrefix(tenantId) == TenantAPI::getTenantIdPrefix(tenantData.lastTenantId)) {
-                ASSERT_LE(tenantId, tenantData.lastTenantId);
-            }
+        if (TenantAPI::getTenantIdPrefix(tenantId) == TenantAPI::getTenantIdPrefix(tenantData.lastTenantId)) {
+            ASSERT_LE(tenantId, tenantData.lastTenantId);
         }
 
         if (tenantMapEntry.tenantGroup.present()) {
@@ -356,6 +356,7 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
                                                 applyManagementClusterUpdates,
                                                 RestoreDryRun::True,
                                                 forceJoin,
+                                                ForceReuseTenantIdPrefix::True,
                                                 &messages));
 
             TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreDryRunDone", debugId)
@@ -371,6 +372,7 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
                                                 applyManagementClusterUpdates,
                                                 RestoreDryRun::False,
                                                 forceJoin,
+                                                ForceReuseTenantIdPrefix::True,
                                                 &messages));
 
             TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreComplete", debugId)
@@ -380,6 +380,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
         state Reference<DataClusterData> dataDb = self->dataDbs[clusterName];
         state bool dryRun = deterministicRandom()->coinflip();
         state bool forceJoin = deterministicRandom()->coinflip();
+        state bool forceReuseTenantIdPrefix = deterministicRandom()->coinflip();
 
         state std::vector<std::string> messages;
         state bool retried = false;
@@ -403,6 +404,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
                                                ApplyManagementClusterUpdates(!dataDb->detached),
                                                RestoreDryRun(dryRun),
                                                ForceJoin(forceJoin),
+                                               ForceReuseTenantIdPrefix(forceReuseTenantIdPrefix),
                                                &messages);
                 Optional<Void> result = wait(timeout(restoreFuture, deterministicRandom()->randomInt(1, 30)));
                 if (!result.present()) {
@@ -411,6 +413,12 @@ struct MetaclusterManagementWorkload : TestWorkload {
                 }
 
                 ASSERT(dataDb->registered || dataDb->detached);
+
+                // Since we are not creating a new management cluster with updated tenant ID prefix, it will
+                // fail to repopulate a management cluster unless we force reuse of the tenant ID prefix or in
+                // some cases if the cluster is empty
+                ASSERT(forceReuseTenantIdPrefix || !dataDb->detached || dataDb->tenants.empty());
+
                 if (dataDb->detached && !dryRun) {
                     dataDb->detached = false;
                     dataDb->registered = true;
@@ -445,7 +453,13 @@ struct MetaclusterManagementWorkload : TestWorkload {
                     wait(removeFailedRestoredCluster(self, clusterName));
                     wait(resolveCollisions(self, clusterName, dataDb));
                     continue;
+                } else if (error.code() == error_code_invalid_metacluster_configuration) {
+                    ASSERT(!forceReuseTenantIdPrefix && dataDb->detached);
+                    wait(removeFailedRestoredCluster(self, clusterName));
+                    forceReuseTenantIdPrefix = true;
+                    continue;
                 }
 
                 TraceEvent(SevError, "RestoreClusterFailure").error(error).detail("ClusterName", clusterName);
                 ASSERT(false);
             }
@@ -95,6 +95,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
     bool recoverManagementCluster;
     bool recoverDataClusters;
 
+    int initialTenantIdPrefix;
     bool backupComplete = false;
     double endTime = std::numeric_limits<double>::max();
 
@@ -107,6 +108,9 @@ struct MetaclusterRestoreWorkload : TestWorkload {
         int mode = deterministicRandom()->randomInt(0, 3);
         recoverManagementCluster = (mode != 2);
         recoverDataClusters = (mode != 1);
+
+        initialTenantIdPrefix = deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
+                                                                 TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1);
     }
 
     ClusterName chooseClusterName() { return dataDbIndex[deterministicRandom()->randomInt(0, dataDbIndex.size())]; }
@@ -179,11 +183,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
         MultiVersionApi::api->selectApiVersion(cx->apiVersion.version());
         self->managementDb = MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeHandle);
         wait(success(MetaclusterAPI::createMetacluster(
-            self->managementDb,
-            "management_cluster"_sr,
-            deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
-                                             TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1),
-            false)));
+            self->managementDb, "management_cluster"_sr, self->initialTenantIdPrefix, false)));
 
         ASSERT(g_simulator->extraDatabases.size() > 0);
         state std::vector<std::string>::iterator extraDatabasesItr;
@@ -248,6 +248,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
                                                MetaclusterRestoreWorkload* self) {
         state FileBackupAgent backupAgent;
         state Standalone<VectorRef<KeyRangeRef>> backupRanges;
+        state ForceReuseTenantIdPrefix forceReuseTenantIdPrefix(deterministicRandom()->coinflip());
         addDefaultBackupRanges(backupRanges);
 
         TraceEvent("MetaclusterRestoreWorkloadClearDatabase").detail("ClusterName", clusterName);
@@ -279,6 +280,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
                                                 ApplyManagementClusterUpdates::True,
                                                 RestoreDryRun::True,
                                                 forceJoin,
+                                                forceReuseTenantIdPrefix,
                                                 &messages));
 
             state MetaclusterData<IDatabase> postDryRunMetaclusterData(self->managementDb);
@@ -309,6 +311,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
                                                     ApplyManagementClusterUpdates::True,
                                                     RestoreDryRun::False,
                                                     forceJoin,
+                                                    forceReuseTenantIdPrefix,
                                                     &messages)));
             wait(delay(deterministicRandom()->random01() * 5));
         }
@@ -535,12 +538,17 @@ struct MetaclusterRestoreWorkload : TestWorkload {
 
     ACTOR static Future<Void> restoreManagementCluster(MetaclusterRestoreWorkload* self) {
         TraceEvent("MetaclusterRestoreWorkloadRestoringManagementCluster");
-        wait(success(MetaclusterAPI::createMetacluster(
-            self->managementDb,
-            "management_cluster"_sr,
-            deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
-                                             TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1),
-            false)));
+
+        state int newTenantIdPrefix = self->initialTenantIdPrefix;
+        if (deterministicRandom()->coinflip()) {
+            while (newTenantIdPrefix == self->initialTenantIdPrefix) {
+                newTenantIdPrefix = deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
+                                                                     TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1);
+            }
+        }
+
+        wait(success(
+            MetaclusterAPI::createMetacluster(self->managementDb, "management_cluster"_sr, newTenantIdPrefix, false)));
         state std::map<ClusterName, DataClusterData>::iterator clusterItr;
         for (clusterItr = self->dataDbs.begin(); clusterItr != self->dataDbs.end(); ++clusterItr) {
             TraceEvent("MetaclusterRestoreWorkloadProcessDataCluster").detail("FromCluster", clusterItr->first);
@@ -587,6 +595,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
                                                 ApplyManagementClusterUpdates::False,
                                                 RestoreDryRun::True,
                                                 ForceJoin(deterministicRandom()->coinflip()),
+                                                ForceReuseTenantIdPrefix(newTenantIdPrefix == self->initialTenantIdPrefix),
                                                 &messages));
 
             state MetaclusterData<IDatabase> postDryRunMetaclusterData(self->managementDb);
@@ -621,6 +630,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
                                                     ApplyManagementClusterUpdates::False,
                                                     RestoreDryRun::False,
                                                     ForceJoin(deterministicRandom()->coinflip()),
+                                                    ForceReuseTenantIdPrefix(newTenantIdPrefix == self->initialTenantIdPrefix),
                                                     &messagesList[restoreFutures.size()])));
             wait(delay(deterministicRandom()->random01() * 5));
         }
@@ -278,6 +278,7 @@ ERROR( cluster_restoring, 2170, "The cluster is being restored to the metacluste
 ERROR( invalid_data_cluster, 2171, "The data cluster being restored has no record of its metacluster" )
 ERROR( metacluster_mismatch, 2172, "The cluster does not have the expected name or is associated with a different metacluster" )
 ERROR( conflicting_restore, 2173, "Another restore is running for the same data cluster" )
+ERROR( invalid_metacluster_configuration, 2174, "Metacluster configuration is invalid" )
 
 // 2200 - errors from bindings and official APIs
 ERROR( api_version_unset, 2200, "API version is not set" )