mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-03 03:41:53 +08:00
Add support for modifying a data cluster that is being restored so that we can manage conflicts
This commit is contained in:
parent
2d59c5681d
commit
f9a68056ac
@ -151,13 +151,37 @@ ACTOR Future<bool> metaclusterRemoveCommand(Reference<IDatabase> db, std::vector
|
||||
fmt::print("Removes the specified data cluster from a metacluster.\n");
|
||||
fmt::print("If FORCE is specified, then the cluster will be detached even if it has\n"
|
||||
"tenants assigned to it.\n");
|
||||
fmt::print("If run on a data cluster, the data cluster will remove its association\n"
|
||||
"with the metacluster without modifying the management cluster. Doing so\n"
|
||||
"requires the FORCE option to be set. Use of this mode is required to\n"
|
||||
"repopulate a management cluster from a data cluster using the\n"
|
||||
"`metacluster restore' command.\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
state ClusterNameRef clusterName = tokens[tokens.size() - 1];
|
||||
wait(MetaclusterAPI::removeCluster(db, clusterName, tokens.size() == 4));
|
||||
state bool force = tokens.size() == 4;
|
||||
|
||||
fmt::print("The cluster `{}' has been removed\n", printable(clusterName).c_str());
|
||||
state ClusterType clusterType = wait(runTransaction(db, [](Reference<ITransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
return TenantAPI::getClusterType(tr);
|
||||
}));
|
||||
|
||||
if (clusterType == ClusterType::METACLUSTER_DATA && !force) {
|
||||
fmt::print("ERROR: cannot remove a data cluster directly. To remove a data cluster,\n"
|
||||
"use the `remove' command on the management cluster. To force a data cluster\n"
|
||||
"to forget its metacluster association without fully removing it, use FORCE.\n");
|
||||
}
|
||||
|
||||
wait(MetaclusterAPI::removeCluster(db, clusterName, clusterType, force));
|
||||
|
||||
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
fmt::print("The cluster `{}' has been removed\n", printable(clusterName).c_str());
|
||||
} else {
|
||||
fmt::print("The cluster `{}' has removed its association with its metacluster.\n"
|
||||
"The metacluster has not been modified.\n",
|
||||
printable(clusterName).c_str());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -166,17 +190,23 @@ ACTOR Future<bool> metaclusterRestoreCommand(Reference<IDatabase> db, std::vecto
|
||||
if (tokens.size() != 5) {
|
||||
fmt::print("Usage: metacluster restore <NAME> connection_string=<CONNECTION_STRING>\n"
|
||||
"<restore_known_data_cluster|repopulate_from_data_cluster>\n\n");
|
||||
fmt::print("Add a restored data cluster back to a metacluster.\n");
|
||||
|
||||
fmt::print("Add a restored data cluster back to a metacluster.\n\n");
|
||||
|
||||
fmt::print("Use `restore_known_data_cluster' to add back a restored copy of a data cluster\n");
|
||||
fmt::print("that the metacluster is already tracking. This mode should be used if only data\n");
|
||||
fmt::print("clusters are being restored, and any discrepancies between the management and\n");
|
||||
fmt::print("data clusters will be resolved using the management cluster metadata.\n");
|
||||
fmt::print("data clusters will be resolved using the management cluster metadata.\n\n");
|
||||
|
||||
fmt::print("Use `repopulate_from_data_cluster' to rebuild a lost management cluster from the\n");
|
||||
fmt::print("data clusters in a metacluster. This mode should be used if the management\n");
|
||||
fmt::print("cluster is being restored. If any data clusters are also being restored, the\n");
|
||||
fmt::print("oldest data clusters should be added first before any non-recovered data\n");
|
||||
fmt::print("clusters. Any discrepancies arising between the data clusters will be resolved\n");
|
||||
fmt::print("using the data cluster that was added last.");
|
||||
fmt::print("clusters. Any conflicts arising between the added data cluster and existing data\n");
|
||||
fmt::print("will cause the restore to fail. Before repopulating a metacluster from a data\n");
|
||||
fmt::print("cluster, that data cluster needs to be detached from its prior metacluster using\n");
|
||||
fmt::print("the `metacluster remove' command.\n");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -604,7 +604,8 @@ void updateClusterMetadata(Transaction tr,
|
||||
if (previousMetadata.entry.clusterState == DataClusterState::REMOVING) {
|
||||
throw cluster_removed();
|
||||
} else if (!isRestoring && previousMetadata.entry.clusterState == DataClusterState::RESTORING &&
|
||||
(!updatedEntry.present() || updatedEntry.get().clusterState != DataClusterState::READY)) {
|
||||
(!updatedEntry.present() || (updatedEntry.get().clusterState != DataClusterState::READY &&
|
||||
updatedEntry.get().clusterState != DataClusterState::REMOVING))) {
|
||||
throw cluster_restoring();
|
||||
} else if (isRestoring) {
|
||||
ASSERT(previousMetadata.entry.clusterState == DataClusterState::RESTORING ||
|
||||
@ -627,6 +628,7 @@ static Future<Void> registerInManagementCluster(Transaction tr,
|
||||
state Optional<DataClusterMetadata> dataClusterMetadata = wait(tryGetClusterTransaction(tr, clusterName));
|
||||
if (dataClusterMetadata.present() &&
|
||||
!dataClusterMetadata.get().matchesConfiguration(DataClusterMetadata(clusterEntry, connectionString))) {
|
||||
TraceEvent("RegisterClusterAlreadyExists").detail("ClusterName", clusterName);
|
||||
throw cluster_already_exists();
|
||||
} else if (!dataClusterMetadata.present()) {
|
||||
clusterEntry.allocated = ClusterUsage();
|
||||
@ -765,14 +767,16 @@ struct RemoveClusterImpl {
|
||||
MetaclusterOperationContext<DB> ctx;
|
||||
|
||||
// Initialization parameters
|
||||
Reference<DB> db;
|
||||
ClusterType clusterType;
|
||||
bool forceRemove;
|
||||
|
||||
// Parameters set in markClusterRemoving
|
||||
Optional<int64_t> lastTenantId;
|
||||
|
||||
RemoveClusterImpl(Reference<DB> managementDb, ClusterName clusterName, bool forceRemove)
|
||||
: ctx(managementDb, clusterName, { DataClusterState::REMOVING, DataClusterState::RESTORING }),
|
||||
forceRemove(forceRemove) {}
|
||||
RemoveClusterImpl(Reference<DB> db, ClusterName clusterName, ClusterType clusterType, bool forceRemove)
|
||||
: ctx(db, clusterName, { DataClusterState::REMOVING, DataClusterState::RESTORING }), db(db),
|
||||
clusterType(clusterType), forceRemove(forceRemove) {}
|
||||
|
||||
// Returns false if the cluster is no longer present, or true if it is present and the removal should proceed.
|
||||
ACTOR static Future<bool> markClusterRemoving(RemoveClusterImpl* self, Reference<typename DB::TransactionT> tr) {
|
||||
@ -811,7 +815,8 @@ struct RemoveClusterImpl {
|
||||
}
|
||||
|
||||
// Delete metacluster metadata from the data cluster
|
||||
ACTOR static Future<Void> updateDataCluster(RemoveClusterImpl* self, Reference<ITransaction> tr) {
|
||||
ACTOR template <class Transaction>
|
||||
static Future<Void> updateDataCluster(RemoveClusterImpl* self, Reference<Transaction> tr) {
|
||||
// Delete metacluster related metadata
|
||||
MetaclusterMetadata::metaclusterRegistration().clear(tr);
|
||||
TenantMetadata::tenantTombstones().clear(tr);
|
||||
@ -828,9 +833,7 @@ struct RemoveClusterImpl {
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("ReconfiguredDataCluster")
|
||||
.detail("Name", self->ctx.clusterName.get())
|
||||
.detail("Version", tr->getCommittedVersion());
|
||||
TraceEvent("ReconfiguredDataCluster").detail("Name", self->ctx.clusterName.get());
|
||||
|
||||
return Void();
|
||||
}
|
||||
@ -965,7 +968,51 @@ struct RemoveClusterImpl {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Makes a data cluster drop its own metacluster registration, operating only on the
// transaction `tr` that is passed in.
//
// Steps performed against `tr`:
//   1. Read the cluster's metacluster registration entry.
//   2. If no registration is present, return without making any changes.
//   3. If the registration is not of type METACLUSTER_DATA, or its name differs from
//      self->ctx.clusterName, log a SevWarn trace event and throw
//      invalid_metacluster_operation.
//   4. Otherwise delegate to updateDataCluster(self, tr) to clear the metacluster
//      related metadata.
//
// This function does not commit `tr` itself.
// NOTE(review): presumably the caller's runTransaction wrapper commits -- confirm.
ACTOR static Future<Void> dataClusterForgetMetacluster(RemoveClusterImpl* self,
|
||||
Reference<typename DB::TransactionT> tr) {
|
||||
// NOTE(review): presumably required because the registration metadata lives in the
// system keyspace and is modified below -- confirm.
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
// Declared `state` so the value remains available after the wait() calls in this actor.
state Optional<MetaclusterRegistrationEntry> metaclusterRegistrationEntry =
|
||||
wait(MetaclusterMetadata::metaclusterRegistration().get(tr));
|
||||
|
||||
// No registration entry: there is nothing to forget, so this is a successful no-op.
if (!metaclusterRegistrationEntry.present()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Only a registration of type METACLUSTER_DATA may be removed through this path.
if (metaclusterRegistrationEntry.get().clusterType != ClusterType::METACLUSTER_DATA) {
|
||||
TraceEvent(SevWarn, "CannotRemoveNonDataCluster")
|
||||
.detail("ClusterName", self->ctx.clusterName.get())
|
||||
.detail("MetaclusterRegistration",
|
||||
metaclusterRegistrationEntry.map(&MetaclusterRegistrationEntry::toString));
|
||||
throw invalid_metacluster_operation();
|
||||
}
|
||||
|
||||
// The registered name must match the name the caller asked to remove.
if (metaclusterRegistrationEntry.get().name != self->ctx.clusterName.get()) {
|
||||
TraceEvent(SevWarn, "CannotRemoveDataClusterWithNameMismatch")
|
||||
.detail("ExpectedName", self->ctx.clusterName.get())
|
||||
.detail("MetaclusterRegistration",
|
||||
metaclusterRegistrationEntry.map(&MetaclusterRegistrationEntry::toString));
|
||||
throw invalid_metacluster_operation();
|
||||
}
|
||||
|
||||
// All checks passed: clear the metacluster metadata on this transaction.
wait(updateDataCluster(self, tr));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> run(RemoveClusterImpl* self) {
|
||||
// On data clusters, we forget the metacluster information without updating the management cluster
|
||||
if (self->clusterType == ClusterType::METACLUSTER_DATA) {
|
||||
if (!self->forceRemove) {
|
||||
throw invalid_metacluster_operation();
|
||||
}
|
||||
|
||||
wait(runTransaction(self->db, [self = self](Reference<typename DB::TransactionT> tr) {
|
||||
return dataClusterForgetMetacluster(self, tr);
|
||||
}));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
state bool clusterIsPresent;
|
||||
try {
|
||||
wait(store(clusterIsPresent,
|
||||
@ -1003,15 +1050,14 @@ struct RemoveClusterImpl {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
Future<Void> run() { return run(this); }
|
||||
};
|
||||
|
||||
ACTOR template <class DB>
|
||||
Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove) {
|
||||
state RemoveClusterImpl<DB> impl(db, name, forceRemove);
|
||||
Future<Void> removeCluster(Reference<DB> db, ClusterName name, ClusterType clusterType, bool forceRemove) {
|
||||
state RemoveClusterImpl<DB> impl(db, name, clusterType, forceRemove);
|
||||
wait(impl.run());
|
||||
return Void();
|
||||
}
|
||||
@ -1174,11 +1220,12 @@ struct RestoreClusterImpl {
|
||||
messages(messages) {}
|
||||
|
||||
// If restoring a data cluster, verify that it has a matching registration entry
|
||||
// If adding a data cluster to a restored management cluster, update the data cluster registration entry
|
||||
// If adding a data cluster to a restored management cluster, create a new data cluster registration entry
|
||||
// with the new management cluster name/ID
|
||||
ACTOR static Future<Void> processMetaclusterRegistration(RestoreClusterImpl* self) {
|
||||
state Reference<IDatabase> db = wait(openDatabase(self->connectionString));
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
state UID dataClusterId = deterministicRandom()->randomUniqueID();
|
||||
|
||||
loop {
|
||||
try {
|
||||
@ -1186,24 +1233,38 @@ struct RestoreClusterImpl {
|
||||
state Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
|
||||
wait(MetaclusterMetadata::metaclusterRegistration().get(tr));
|
||||
|
||||
if (!metaclusterRegistration.present()) {
|
||||
throw invalid_data_cluster();
|
||||
}
|
||||
|
||||
if (!metaclusterRegistration.get().matches(self->ctx.metaclusterRegistration.get())) {
|
||||
if (self->applyManagementClusterUpdates) {
|
||||
if (self->applyManagementClusterUpdates) {
|
||||
if (!metaclusterRegistration.present() && self->applyManagementClusterUpdates) {
|
||||
throw invalid_data_cluster();
|
||||
} else if (!metaclusterRegistration.get().matches(self->ctx.metaclusterRegistration.get()) ||
|
||||
metaclusterRegistration.get().name != self->clusterName) {
|
||||
TraceEvent(SevWarn, "MetaclusterRestoreClusterMismatch")
|
||||
.detail("ExistingRegistration", metaclusterRegistration.get())
|
||||
.detail("ManagementClusterRegistration", self->ctx.metaclusterRegistration.get());
|
||||
throw cluster_already_exists();
|
||||
} else {
|
||||
MetaclusterMetadata::metaclusterRegistration().set(
|
||||
tr,
|
||||
self->ctx.metaclusterRegistration.get().toDataClusterRegistration(
|
||||
metaclusterRegistration.get().name, metaclusterRegistration.get().id));
|
||||
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
}
|
||||
|
||||
self->dataClusterId = metaclusterRegistration.get().id;
|
||||
} else {
|
||||
MetaclusterRegistrationEntry dataClusterEntry =
|
||||
self->ctx.metaclusterRegistration.get().toDataClusterRegistration(self->clusterName,
|
||||
dataClusterId);
|
||||
self->dataClusterId = dataClusterEntry.id;
|
||||
|
||||
if (metaclusterRegistration.present()) {
|
||||
if (dataClusterEntry.matches(metaclusterRegistration.get())) {
|
||||
break;
|
||||
}
|
||||
|
||||
TraceEvent(SevWarn, "MetaclusterRestoreClusterAlreadyRegistered")
|
||||
.detail("ExistingRegistration", metaclusterRegistration.get());
|
||||
throw cluster_already_registered();
|
||||
}
|
||||
|
||||
MetaclusterMetadata::metaclusterRegistration().set(tr, dataClusterEntry);
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
}
|
||||
|
||||
self->dataClusterId = metaclusterRegistration.get().id;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
|
@ -214,8 +214,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
||||
try {
|
||||
loop {
|
||||
// TODO: check force removal
|
||||
Future<Void> removeFuture =
|
||||
MetaclusterAPI::removeCluster(self->managementDb, clusterName, detachCluster);
|
||||
Future<Void> removeFuture = MetaclusterAPI::removeCluster(
|
||||
self->managementDb, clusterName, ClusterType::METACLUSTER_MANAGEMENT, detachCluster);
|
||||
try {
|
||||
Optional<Void> result = wait(timeout(removeFuture, deterministicRandom()->randomInt(1, 30)));
|
||||
if (result.present()) {
|
||||
@ -956,8 +956,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
||||
|
||||
std::vector<Future<Void>> removeClusterFutures;
|
||||
for (auto [clusterName, clusterMetadata] : dataClusters) {
|
||||
removeClusterFutures.push_back(
|
||||
MetaclusterAPI::removeCluster(self->managementDb, clusterName, !deleteTenants));
|
||||
removeClusterFutures.push_back(MetaclusterAPI::removeCluster(
|
||||
self->managementDb, clusterName, ClusterType::METACLUSTER_MANAGEMENT, !deleteTenants));
|
||||
}
|
||||
|
||||
wait(waitForAll(removeClusterFutures));
|
||||
|
@ -307,8 +307,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
||||
// We don't expect the data cluster tenant, so delete it
|
||||
else {
|
||||
removeTrackedTenant(t.second.first);
|
||||
deleteFutures.push_back(TenantAPI::deleteTenant(
|
||||
dataDb.getReference(), t.first, t.second.first, ClusterType::METACLUSTER_DATA));
|
||||
deleteFutures.push_back(TenantAPI::deleteTenant(dataDb.getReference(), t.first, t.second.first));
|
||||
}
|
||||
}
|
||||
|
||||
@ -379,8 +378,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
||||
std::vector<Future<Void>> groupDeletions;
|
||||
for (auto const& t : tenantsInGroup) {
|
||||
self->removeTrackedTenant(t);
|
||||
groupDeletions.push_back(
|
||||
TenantAPI::deleteTenantTransaction(tr, t, ClusterType::METACLUSTER_DATA));
|
||||
groupDeletions.push_back(TenantAPI::deleteTenantTransaction(tr, t));
|
||||
}
|
||||
return waitForAll(groupDeletions);
|
||||
}));
|
||||
@ -403,10 +401,16 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> restoreManagementCluster(MetaclusterRestoreWorkload* self) {
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoringManagementCluster");
|
||||
wait(success(MetaclusterAPI::createMetacluster(self->managementDb, "management_cluster"_sr)));
|
||||
state std::map<ClusterName, DataClusterData>::iterator clusterItr;
|
||||
for (clusterItr = self->dataDbs.begin(); clusterItr != self->dataDbs.end(); ++clusterItr) {
|
||||
TraceEvent("MetaclusterRestoreWorkloadRecoverManagementCluster").detail("FromCluster", clusterItr->first);
|
||||
TraceEvent("MetaclusterRestoreWorkloadProcessDataCluster").detail("FromCluster", clusterItr->first);
|
||||
|
||||
wait(MetaclusterAPI::removeCluster(
|
||||
clusterItr->second.db.getReference(), clusterItr->first, ClusterType::METACLUSTER_DATA, true));
|
||||
TraceEvent("MetaclusterRestoreWorkloadForgotMetacluster").detail("ClusterName", clusterItr->first);
|
||||
|
||||
state KeyBackedRangeResult<std::pair<TenantName, int64_t>> managementTenantList;
|
||||
state KeyBackedRangeResult<std::pair<TenantGroupName, TenantGroupEntry>> managementGroupList;
|
||||
|
||||
@ -467,7 +471,7 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
||||
wait(getDataClusterTenants(clusterItr->second.db));
|
||||
|
||||
try {
|
||||
TraceEvent("MetaclusterRestoreWorkloadRecoverManagementCluster")
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoreManagementCluster")
|
||||
.detail("FromCluster", clusterItr->first)
|
||||
.detail("TenantCollisions", tenantCollisions.size());
|
||||
|
||||
@ -487,6 +491,17 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
||||
if (!failedDueToCollision) {
|
||||
throw;
|
||||
}
|
||||
|
||||
try {
|
||||
wait(MetaclusterAPI::removeCluster(
|
||||
self->managementDb, clusterItr->first, ClusterType::METACLUSTER_MANAGEMENT, true));
|
||||
TraceEvent("MetaclusterRestoreWorkloadRemoveFailedCluster")
|
||||
.detail("ClusterName", clusterItr->first);
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_cluster_not_found) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::pair<int64_t, TenantMapEntry>> dataTenantsAfterRestore =
|
||||
@ -515,7 +530,8 @@ struct MetaclusterRestoreWorkload : TestWorkload {
|
||||
}
|
||||
}
|
||||
}
|
||||
TraceEvent("MetaclusterRestoreWorkloadRecoveredManagementCluster").detail("FromCluster", clusterItr->first);
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoredDataClusterToManagementCluster")
|
||||
.detail("FromCluster", clusterItr->first);
|
||||
}
|
||||
|
||||
TraceEvent("MetaclusterRestoreWorkloadRestoredManagementCluster");
|
||||
|
Loading…
x
Reference in New Issue
Block a user