diff --git a/include/raft_server.h b/include/raft_server.h index 6dbd2eb8..61e2b761 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -139,6 +139,8 @@ private: std::atomic shutting_down; std::atomic pending_writes; + std::atomic snapshot_in_progress; + const uint64_t snapshot_interval_s; // frequency of actual snapshotting uint64_t last_snapshot_ts; // when last snapshot ran diff --git a/src/raft_server.cpp b/src/raft_server.cpp index ecc41f46..7307b54f 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -488,6 +488,7 @@ void* ReplicationState::save_snapshot(void* arg) { std::string file_name = std::string(db_snapshot_name) + "/" + file.BaseName().value(); if (sa->writer->add_file(file_name) != 0) { sa->done->status().set_error(EIO, "Fail to add file to writer."); + sa->replication_state->snapshot_in_progress = false; return nullptr; } } @@ -500,6 +501,7 @@ void* ReplicationState::save_snapshot(void* arg) { auto file_name = std::string(analytics_db_snapshot_name) + "/" + file.BaseName().value(); if (sa->writer->add_file(file_name) != 0) { sa->done->status().set_error(EIO, "Fail to add analytics file to writer."); + sa->replication_state->snapshot_in_progress = false; return nullptr; } } @@ -539,6 +541,7 @@ void* ReplicationState::save_snapshot(void* arg) { // NOTE: *must* do a dummy write here since snapshots cannot be triggered if no write has happened since the // last snapshot. By doing a dummy write right after a snapshot, we ensure that this can never be the case. sa->replication_state->do_dummy_write(); + sa->replication_state->snapshot_in_progress = false; LOG(INFO) << "save_snapshot done"; @@ -549,6 +552,7 @@ void* ReplicationState::save_snapshot(void* arg) { void ReplicationState::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) { LOG(INFO) << "on_snapshot_save"; + snapshot_in_progress = true; std::string db_snapshot_path = writer->get_path() + "/" + db_snapshot_name; std::string analytics_db_snapshot_path = writer->get_path() + "/" + analytics_db_snapshot_name; @@ -828,7 +832,7 @@ ReplicationState::ReplicationState(HttpServer* server, BatchedIndexer* batched_i num_collections_parallel_load(num_collections_parallel_load), num_documents_parallel_load(num_documents_parallel_load), read_caught_up(false), write_caught_up(false), - ready(false), shutting_down(false), pending_writes(0), + ready(false), shutting_down(false), pending_writes(0), snapshot_in_progress(false), last_snapshot_ts(std::time(nullptr)), snapshot_interval_s(config->get_snapshot_interval_seconds()) { } @@ -860,7 +864,14 @@ void ReplicationState::do_snapshot(const std::string& snapshot_path, const std:: return ; } - LOG(INFO) << "Triggerring an on demand snapshot..."; + if(snapshot_in_progress) { + res->set_409("Another snapshot is in progress."); + auto req_res = new async_req_res_t(req, res, true); + get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); + return ; + } + + LOG(INFO) << "Triggering an on demand snapshot..."; thread_pool->enqueue([&snapshot_path, req, res, this]() { OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, req, res);