
Cleanup stale disk files for double recruitment of storage server ()

Hui Liu 2023-04-06 12:13:59 -07:00 committed by GitHub
parent ecc6d5a712
commit 396f89a3f4
4 changed files with 141 additions and 25 deletions
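In short: the commit adds two server knobs that bound a disk-cleanup retry loop, gives KeyValueStoreRedwood an undelayed getErrorNoDelay() so its shutdown path can inspect error state directly, and teaches workerServer to track the disk files of storage servers that stop on a still-live worker so they can be deleted once the cluster confirms the server was removed.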

@@ -935,6 +935,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( PEER_DEGRADATION_CONNECTION_FAILURE_COUNT, 5 );
init( WORKER_HEALTH_REPORT_RECENT_DESTROYED_PEER, true );
init( STORAGE_SERVER_REBOOT_ON_IO_TIMEOUT, false ); if ( randomize && BUGGIFY ) STORAGE_SERVER_REBOOT_ON_IO_TIMEOUT = true;
+ init( STORAGE_DISK_CLEANUP_MAX_RETRIES, 10 );
+ init( STORAGE_DISK_CLEANUP_RETRY_INTERVAL, isSimulated ? 2 : 30 );
// Test harness
init( WORKER_POLL_DELAY, 1.0 );

@@ -903,6 +903,8 @@ public:
// Enabling this can reduce toil of manually restarting the SS.
// Enable with caution: If io_timeout is caused by disk failure, we won't
// want to restart the SS, which increases risk of data corruption.
+ int STORAGE_DISK_CLEANUP_MAX_RETRIES; // Max retries to clean up left-over disk files from the last storage server
+ int STORAGE_DISK_CLEANUP_RETRY_INTERVAL; // Sleep interval between cleanup retries
// Test harness
double WORKER_POLL_DELAY;
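With the defaults above, a cleaner polls for roughly STORAGE_DISK_CLEANUP_MAX_RETRIES * STORAGE_DISK_CLEANUP_RETRY_INTERVAL = 10 * 30 = 300 seconds (about 20 seconds in simulation) before giving up on a storage server that still appears alive.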

@@ -1887,7 +1887,6 @@ Future<T> forwardError(Future<T> f, Promise<Void> target) {
if (e.code() != error_code_actor_cancelled && target.canBeSet()) {
target.sendError(e);
}
- throw;
}
}
@@ -8055,7 +8054,7 @@ public:
// Only proceed if the last commit is a success, but don't throw if it's not because shutdown
// should not throw.
wait(ready(self->m_lastCommit));
- if (!self->m_lastCommit.isError()) {
+ if (!self->getErrorNoDelay().isReady()) {
// Run the destructive sanity check, but don't throw.
ErrorOr<Void> err = wait(errorOr(self->m_tree->clearAllAndCheckSanity()));
// If the test threw an error, it must be an injected fault or something has gone wrong.
@@ -8100,7 +8099,9 @@ public:
StorageBytes getStorageBytes() const override { return m_tree->getStorageBytes(); }
- Future<Void> getError() const override { return delayed(m_errorPromise.getFuture() || m_tree->getError()); };
+ Future<Void> getError() const override { return delayed(getErrorNoDelay()); }
+ Future<Void> getErrorNoDelay() const { return m_errorPromise.getFuture() || m_tree->getError(); };
void clear(KeyRangeRef range, const Arena* arena = 0) override {
debug_printf("CLEAR %s\n", printable(range).c_str());
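The point of the split above: getError() wraps the combined error future in delayed(), which forces a trip through the run loop, so the shutdown path now checks getErrorNoDelay().isReady() instead of only m_lastCommit.isError(). That both avoids the delay and widens the check from "the last commit failed" to "any error has already fired" before running the destructive sanity check.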

@@ -21,10 +21,12 @@
#include <cstdlib>
#include <tuple>
#include <boost/lexical_cast.hpp>
#include <unordered_map>
#include "fdbclient/FDBTypes.h"
#include "fdbserver/BlobMigratorInterface.h"
#include "flow/ApiVersion.h"
#include "flow/CodeProbe.h"
#include "flow/IAsyncFile.h"
#include "fdbrpc/Locality.h"
#include "fdbclient/GetEncryptCipherKeys_impl.actor.h"
@@ -1330,20 +1332,55 @@ ACTOR Future<Void> monitorHighMemory(int64_t threshold) {
return Void();
}
+ struct StorageDiskCleaner {
+ KeyValueStoreType storeType;
+ LocalityData locality;
+ std::string filename;
+ Future<Void> future;
+ };
struct TrackRunningStorage {
UID self;
KeyValueStoreType storeType;
+ LocalityData locality;
+ std::string filename;
std::set<std::pair<UID, KeyValueStoreType>>* runningStorages;
+ std::unordered_map<UID, StorageDiskCleaner>* storageCleaners;
TrackRunningStorage(UID self,
KeyValueStoreType storeType,
- std::set<std::pair<UID, KeyValueStoreType>>* runningStorages)
- : self(self), storeType(storeType), runningStorages(runningStorages) {
+ LocalityData locality,
+ const std::string& filename,
+ std::set<std::pair<UID, KeyValueStoreType>>* runningStorages,
+ std::unordered_map<UID, StorageDiskCleaner>* storageCleaners)
+ : self(self), storeType(storeType), locality(locality), filename(filename), runningStorages(runningStorages),
+ storageCleaners(storageCleaners) {
runningStorages->emplace(self, storeType);
}
- ~TrackRunningStorage() { runningStorages->erase(std::make_pair(self, storeType)); };
+ ~TrackRunningStorage() {
+ runningStorages->erase(std::make_pair(self, storeType));
+ // Start a disk cleaner except for tss data store
+ try {
+ if (basename(filename).find(testingStoragePrefix.toString()) != 0) {
+ if (!storageCleaners->contains(self)) {
+ StorageDiskCleaner cleaner;
+ cleaner.storeType = storeType;
+ cleaner.locality = locality;
+ cleaner.filename = filename;
+ cleaner.future = Void(); // cleaner task will start later
+ storageCleaners->insert({ self, cleaner });
+ TraceEvent("AddStorageCleaner", self).detail("Size", storageCleaners->size());
+ }
+ }
+ } catch (Error& e) {
+ TraceEvent("SkipStorageCleaner", self).error(e).detail("File", filename);
+ }
+ };
};
ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValueStoreType>>* runningStorages,
+ std::unordered_map<UID, StorageDiskCleaner>* storageCleaners,
Future<Void> prevStorageServer,
KeyValueStoreType storeType,
std::string filename,
@@ -1357,7 +1394,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
IKeyValueStore* store,
bool validateDataFiles,
Promise<Void>* rebootKVStore) {
- state TrackRunningStorage _(id, storeType, runningStorages);
+ state TrackRunningStorage _(id, storeType, locality, filename, runningStorages, storageCleaners);
loop {
ErrorOr<Void> e = wait(errorOr(prevStorageServer));
if (!e.isError())
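The pattern above is RAII bookkeeping: TrackRunningStorage registers the store on construction and, on destruction (however the storage server actor exits), records enough metadata for a later cleanup pass. A minimal standalone sketch of that shape, in plain C++ with illustrative names (UID, TraceEvent and the flow machinery are elided):

#include <map>
#include <string>
#include <utility>

struct CleanerEntry {
    std::string filename; // disk file to delete once the server is confirmed gone
};

// RAII guard in the spirit of TrackRunningStorage: the map outlives the guard
// (it is owned by the long-running worker), so the destructor can safely
// register a deferred-cleanup entry no matter how the storage actor exited.
struct RunningStoreGuard {
    int id;
    std::string filename;
    std::map<int, CleanerEntry>* cleaners;

    RunningStoreGuard(int id, std::string filename, std::map<int, CleanerEntry>* cleaners)
      : id(id), filename(std::move(filename)), cleaners(cleaners) {}

    ~RunningStoreGuard() {
        // Only record the store; the cleaner task itself starts lazily,
        // when the next storage server is recruited on this worker.
        cleaners->emplace(id, CleanerEntry{ filename });
    }
};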
@@ -1741,6 +1778,65 @@ ACTOR Future<Void> updateClusterId(UID ccClusterId, Reference<AsyncVar<Optional<
return Void();
}
+ ACTOR Future<Void> cleanupStaleStorageDisk(Reference<AsyncVar<ServerDBInfo>> dbInfo,
+ std::unordered_map<UID, StorageDiskCleaner>* cleaners,
+ UID storeID,
+ StorageDiskCleaner cleaner,
+ int64_t memoryLimit) {
+ state int retries = 0;
+ loop {
+ try {
+ if (retries > SERVER_KNOBS->STORAGE_DISK_CLEANUP_MAX_RETRIES) {
+ TraceEvent("SkipDiskCleanup").detail("Filename", cleaner.filename).detail("StoreID", storeID);
+ return Void();
+ }
+ TraceEvent("StorageServerLivenessCheck").detail("StoreID", storeID).detail("Retry", retries);
+ Reference<CommitProxyInfo> commitProxies(new CommitProxyInfo(dbInfo->get().client.commitProxies));
+ if (commitProxies->size() == 0) {
+ TraceEvent("SkipDiskCleanup").log();
+ return Void();
+ }
+ GetStorageServerRejoinInfoRequest request(storeID, cleaner.locality.dcId());
+ GetStorageServerRejoinInfoReply _rep =
+ wait(basicLoadBalance(commitProxies, &CommitProxyInterface::getStorageServerRejoinInfo, request));
+ // a successful response means the storage server is still alive
+ retries++;
+ } catch (Error& e) {
+ // error worker_removed indicates the storage server has been removed, so it's safe to delete its data
+ if (e.code() == error_code_worker_removed) {
+ // delete the files on disk
+ if (fileExists(cleaner.filename)) {
+ state IKeyValueStore* kvs = openKVStore(
+ cleaner.storeType, cleaner.filename, storeID, memoryLimit, false, false, false, dbInfo, {});
+ wait(ready(kvs->init()));
+ kvs->dispose();
+ CODE_PROBE(true, "Removed stale disk file");
+ TraceEvent("RemoveStorageDisk").detail("Filename", cleaner.filename).detail("StoreID", storeID);
+ }
+ // remove the cleaner
+ cleaners->erase(storeID);
+ return Void();
+ }
+ }
+ wait(delay(SERVER_KNOBS->STORAGE_DISK_CLEANUP_RETRY_INTERVAL));
+ }
+ }
+ // Delete a storage server's data files if the server is not alive anymore
+ void cleanupStorageDisks(Reference<AsyncVar<ServerDBInfo>> dbInfo,
+ std::unordered_map<UID, StorageDiskCleaner>& storageCleaners,
+ int64_t memoryLimit) {
+ for (auto& cleaner : storageCleaners) {
+ if (cleaner.second.future.isReady()) {
+ CODE_PROBE(true, "Cleanup stale disk stores for double recruitment");
+ cleaner.second.future =
+ cleanupStaleStorageDisk(dbInfo, &storageCleaners, cleaner.first, cleaner.second, memoryLimit);
+ }
+ }
+ }
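The core of cleanupStaleStorageDisk is a bounded poll-until-removed loop: a successful rejoin-info probe means the old storage server is still registered, while a worker_removed error is the definitive signal that its files may be deleted. A standalone sketch of that control flow in plain C++ (the probe and dispose functions are hypothetical stand-ins for the proxy request and kvs->dispose()):

#include <chrono>
#include <string>
#include <thread>

// Hypothetical stand-ins: in the commit, the probe is a
// GetStorageServerRejoinInfoRequest load-balanced over the commit proxies,
// and deletion is openKVStore(...) followed by kvs->dispose().
bool storageServerStillRegistered(int storeID);
void disposeStoreFiles(const std::string& filename);

void cleanupSketch(int storeID, const std::string& filename,
                   int maxRetries, std::chrono::seconds interval) {
    for (int retries = 0; retries <= maxRetries; ++retries) {
        if (!storageServerStillRegistered(storeID)) {
            disposeStoreFiles(filename); // safe: the cluster confirmed removal
            return;
        }
        std::this_thread::sleep_for(interval); // still alive; poll again later
    }
    // Retry budget exhausted: leave the files alone rather than risk
    // deleting a store that is still in use (traced as SkipDiskCleanup).
}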
ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
LocalityData locality,
@@ -1802,7 +1898,14 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state std::string coordFolder = abspath(_coordFolder);
state WorkerInterface interf(locality);
state std::set<std::pair<UID, KeyValueStoreType>> runningStorages;
+ // storageCleaners manages cleanup actors after a storage server is terminated. It cleans up
+ // stale disk files when a storage server is terminated due to io_timeout or io_error but the
+ // worker process is still alive. If the worker process is alive, it may be recruited as a new
+ // storage server and leave the stale disk files unattended.
+ state std::unordered_map<UID, StorageDiskCleaner> storageCleaners;
interf.initEndpoints();
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
@@ -1910,8 +2013,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
// std::string doesn't have startsWith
std::string tssPrefix = testingStoragePrefix.toString();
- // TODO might be more efficient to mark a boolean on DiskStore in getDiskStores, but that kind of breaks
- // the abstraction since DiskStore also applies to storage cache + tlog
+ // TODO might be more efficient to mark a boolean on DiskStore in getDiskStores, but that kind of
+ // breaks the abstraction since DiskStore also applies to storage cache + tlog
bool isTss = s.filename.find(tssPrefix) != std::string::npos;
Role ssRole = isTss ? Role::TESTING_STORAGE_SERVER : Role::STORAGE_SERVER;
@@ -1920,8 +2023,8 @@
recruited.locality = locality;
recruited.tssPairID =
isTss ? Optional<UID>(UID())
- : Optional<UID>(); // presence of optional is used as source of truth for tss vs not. Value
- // gets overridden later in restoreDurableState
+ : Optional<UID>(); // presence of optional is used as source of truth for tss vs not.
+ // Value gets overridden later in restoreDurableState
recruited.initEndpoints();
std::map<std::string, std::string> details;
@@ -1954,6 +2057,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
recoveries.push_back(recovery.getFuture());
f = handleIOErrors(f, kv, s.storeID, kvClosed);
f = storageServerRollbackRebooter(&runningStorages,
+ &storageCleaners,
f,
s.storeType,
s.filename,
@@ -2128,13 +2232,13 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
// to make sure:
// (1) the worker can start serving requests once it is recruited as storage or TLog server, and
// (2) a slow recovering worker server wouldn't be recruited as TLog and make recovery slow.
- // However, the worker server can still serve stateless roles, and if encryption is on, it is crucial to have
- // some worker available to serve the EncryptKeyProxy role, before opening encrypted storage files.
+ // However, the worker server can still serve stateless roles, and if encryption is on, it is crucial to
+ // have some worker available to serve the EncryptKeyProxy role, before opening encrypted storage files.
//
// To achieve it, registrationClient allows a worker to first register with the cluster controller to be
- // recruited only as a stateless process i.e. it can't be recruited as a SS or TLog process; once the local disk
- // recovery is complete (if applicable), the process re-registers with cluster controller as a stateful process
- // role.
+ // recruited only as a stateless process i.e. it can't be recruited as a SS or TLog process; once the local
+ // disk recovery is complete (if applicable), the process re-registers with cluster controller as a stateful
+ // process role.
Promise<Void> recoveredDiskFiles;
Future<Void> recoverDiskFiles = trigger(
[=]() {
@@ -2395,17 +2499,16 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
CODE_PROBE(true, "Recruited while already a blob manager.");
} else if (lastBMRecruitRequestId == req.reqId && !bmEpochAndInterf->get().present()) {
// The previous blob manager WAS present, like the above case, but it died before the CC got the
- // response to the recruitment request, so the CC retried to recruit the same blob manager id/epoch
- // from the same reqId. To keep epoch safety between different managers, instead of restarting the
- // same manager id at the same epoch, we should just tell it the original request succeeded, and let
- // it realize this manager died via failure detection and start a new one.
+ // response to the recruitment request, so the CC retried to recruit the same blob manager
+ // id/epoch from the same reqId. To keep epoch safety between different managers, instead of
+ // restarting the same manager id at the same epoch, we should just tell it the original request
+ // succeeded, and let it realize this manager died via failure detection and start a new one.
CODE_PROBE(true, "Recruited while formerly the same blob manager.", probe::decoration::rare);
} else {
- // TODO: it'd be more optimal to halt the last manager if present here, but it will figure it out
- // via the epoch check
- // Also, not halting lets us handle the case here where the last BM had a higher
- // epoch and somehow the epochs got out of order by a delayed initialize request. The one we start
- // here will just halt on the lock check.
+ // TODO: it'd be more optimal to halt the last manager if present here, but it will figure it
+ // out via the epoch check Also, not halting lets us handle the case here where the last BM had
+ // a higher epoch and somehow the epochs got out of order by a delayed initialize request. The
+ // one we start here will just halt on the lock check.
startRole(Role::BLOB_MANAGER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.haltBlobManager);
@@ -2609,6 +2712,13 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ASSERT(req.initialClusterVersion >= 0);
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
+ // When a new storage server is recruited, we need to check if any other storage
+ // server has run on this worker process (a.k.a. double recruitment). The previous storage
+ // server may have leftover disk files if it stopped with io_error or io_timeout. By now DD
+ // has already repaired the team, so it's time to start the cleanup
+ cleanupStorageDisks(dbInfo, storageCleaners, memoryLimit);
bool isTss = req.tssPairIDAndVersion.present();
StorageServerInterface recruited(req.interfaceId);
recruited.locality = locality;
@@ -2679,6 +2789,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
s = handleIOErrors(s, data, recruited.id(), kvClosed);
s = storageCache.removeOnReady(req.reqId, s);
s = storageServerRollbackRebooter(&runningStorages,
+ &storageCleaners,
s,
req.storeType,
filename,