Ensure only one snapshot can happen at a time.

This commit is contained in:
Kishore Nallan 2024-03-08 12:18:50 +05:30
parent 72d1e62e20
commit 093dae1c9e
2 changed files with 15 additions and 2 deletions

View File

@ -139,6 +139,8 @@ private:
std::atomic<bool> shutting_down;
std::atomic<size_t> pending_writes;
std::atomic<size_t> snapshot_in_progress;
const uint64_t snapshot_interval_s; // frequency of actual snapshotting
uint64_t last_snapshot_ts; // when last snapshot ran

View File

@ -488,6 +488,7 @@ void* ReplicationState::save_snapshot(void* arg) {
std::string file_name = std::string(db_snapshot_name) + "/" + file.BaseName().value();
if (sa->writer->add_file(file_name) != 0) {
sa->done->status().set_error(EIO, "Fail to add file to writer.");
sa->replication_state->snapshot_in_progress = false;
return nullptr;
}
}
@ -500,6 +501,7 @@ void* ReplicationState::save_snapshot(void* arg) {
auto file_name = std::string(analytics_db_snapshot_name) + "/" + file.BaseName().value();
if (sa->writer->add_file(file_name) != 0) {
sa->done->status().set_error(EIO, "Fail to add analytics file to writer.");
sa->replication_state->snapshot_in_progress = false;
return nullptr;
}
}
@ -539,6 +541,7 @@ void* ReplicationState::save_snapshot(void* arg) {
// NOTE: *must* do a dummy write here since snapshots cannot be triggered if no write has happened since the
// last snapshot. By doing a dummy write right after a snapshot, we ensure that this can never be the case.
sa->replication_state->do_dummy_write();
sa->replication_state->snapshot_in_progress = false;
LOG(INFO) << "save_snapshot done";
@ -549,6 +552,7 @@ void* ReplicationState::save_snapshot(void* arg) {
void ReplicationState::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {
LOG(INFO) << "on_snapshot_save";
snapshot_in_progress = true;
std::string db_snapshot_path = writer->get_path() + "/" + db_snapshot_name;
std::string analytics_db_snapshot_path = writer->get_path() + "/" + analytics_db_snapshot_name;
@ -828,7 +832,7 @@ ReplicationState::ReplicationState(HttpServer* server, BatchedIndexer* batched_i
num_collections_parallel_load(num_collections_parallel_load),
num_documents_parallel_load(num_documents_parallel_load),
read_caught_up(false), write_caught_up(false),
ready(false), shutting_down(false), pending_writes(0),
ready(false), shutting_down(false), pending_writes(0), snapshot_in_progress(false),
last_snapshot_ts(std::time(nullptr)), snapshot_interval_s(config->get_snapshot_interval_seconds()) {
}
@ -860,7 +864,14 @@ void ReplicationState::do_snapshot(const std::string& snapshot_path, const std::
return ;
}
LOG(INFO) << "Triggerring an on demand snapshot...";
if(snapshot_in_progress) {
res->set_409("Another snapshot is in progress.");
auto req_res = new async_req_res_t(req, res, true);
get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
return ;
}
LOG(INFO) << "Triggering an on demand snapshot...";
thread_pool->enqueue([&snapshot_path, req, res, this]() {
OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, req, res);