diff --git a/include/raft_server.h b/include/raft_server.h
index 823e8df7..b78364aa 100644
--- a/include/raft_server.h
+++ b/include/raft_server.h
@@ -95,6 +95,8 @@ class ReplicationState : public braft::StateMachine {
 private:
     static constexpr const char* db_snapshot_name = "db_snapshot";
 
+    mutable std::shared_mutex mutex;
+
     braft::Node* volatile node;
     butil::atomic<int64_t> leader_term;
     std::set<braft::PeerId> peers;
@@ -168,6 +170,7 @@ public:
 
     // Blocking this thread until the node is eventually down.
     void join() {
+        std::unique_lock lock(mutex);
         if (node) {
             node->join();
             delete node;
diff --git a/src/raft_server.cpp b/src/raft_server.cpp
index 34ec01d8..91ce970e 100644
--- a/src/raft_server.cpp
+++ b/src/raft_server.cpp
@@ -115,6 +115,8 @@ std::string ReplicationState::to_nodes_config(const butil::EndPoint& peering_end
 }
 
 void ReplicationState::write(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) {
+    std::shared_lock lock(mutex);
+
     if(!node) {
         return ;
     }
@@ -146,6 +148,7 @@
 }
 
 void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) const {
+    // no lock on `node` needed as caller uses the lock
     if(!node || node->leader_id().is_empty()) {
         // Handle no leader scenario
         LOG(ERROR) << "Rejecting write: could not find a leader.";
@@ -277,6 +280,7 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
         if(request_generated->_req == nullptr && request_generated->body == "INIT_SNAPSHOT") {
             // We attempt to trigger a cold snapshot against an existing stand-alone DB for backward compatibility
             InitSnapshotClosure* init_snapshot_closure = new InitSnapshotClosure(this);
+            std::shared_lock lock(mutex);
             node->snapshot(init_snapshot_closure);
             continue ;
         }
@@ -385,7 +389,9 @@ int ReplicationState::init_db() {
 }
 
 int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
+    std::shared_lock lock(mutex);
     CHECK(!node || !node->is_leader()) << "Leader is not supposed to load snapshot";
+    lock.unlock();
 
     LOG(INFO) << "on_snapshot_load";
 
@@ -405,6 +411,8 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
 }
 
 void ReplicationState::refresh_nodes(const std::string & nodes) {
+    std::shared_lock lock(mutex);
+
     if(!node) {
         LOG(WARNING) << "Node state is not initialized: unable to refresh nodes.";
         return ;
@@ -452,10 +460,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
         return ;
     }
 
+    lock.unlock();
+
     // update catch up status
     thread_pool->enqueue([this]() {
         auto seq_num = this->store->get_latest_seq_number();
+        std::shared_lock lock(this->mutex);
         const std::string & leader_addr = node->leader_id().to_string();
+        lock.unlock();
 
         std::vector<std::string> addr_parts;
         StringUtils::split(leader_addr, addr_parts, ":");
@@ -513,6 +525,8 @@ ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_m
 }
 
 bool ReplicationState::is_alive() const {
+    std::shared_lock lock(mutex);
+
     if(node == nullptr || !is_ready()) {
         return false;
     }
@@ -522,6 +536,8 @@
 }
 
 uint64_t ReplicationState::node_state() const {
+    std::shared_lock lock(mutex);
+
     if(node == nullptr) {
         return 0;
     }
@@ -539,6 +555,7 @@ void ReplicationState::do_snapshot(const std::string& snapshot_path, const std::
     thread_pool->enqueue([&snapshot_path, req, res, this]() {
         OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, req, res);
         ext_snapshot_path = snapshot_path;
+        std::shared_lock lock(this->mutex);
         node->snapshot(snapshot_closure);
     });
 }
 
@@ -552,12 +569,16 @@ const std::string &ReplicationState::get_ext_snapshot_path() const {
 }
 
 void ReplicationState::do_dummy_write() {
+    std::shared_lock lock(mutex);
+
     if(node->leader_id().is_empty()) {
         LOG(ERROR) << "Could not do a dummy write, as node does not have a leader";
         return ;
     }
 
     const std::string & leader_addr = node->leader_id().to_string();
+    lock.unlock();
+
     const std::string protocol = api_uses_ssl ? "https" : "http";
     std::string url = get_leader_url_path(leader_addr, "/health", protocol);
 
@@ -569,6 +590,8 @@
 }
 
 bool ReplicationState::trigger_vote() {
+    std::shared_lock lock(mutex);
+
     if(node) {
         auto status = node->vote(election_timeout_interval_ms);
         LOG(INFO) << "Triggered vote. Ok? " << status.ok() << ", status: " << status;
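
Note: the pattern this patch applies is a reader/writer lock around a pointer that one thread may delete. Every path that dereferences `node` (write, on_apply, refresh_nodes, is_alive, node_state, do_snapshot, do_dummy_write, trigger_vote) takes a std::shared_lock, while join(), which calls `delete node`, takes a std::unique_lock; the shared locks are dropped (lock.unlock()) before slow work such as the HTTP call in do_dummy_write so readers do not serialize behind network I/O. Below is a minimal, self-contained sketch of the same pattern; Node and State are hypothetical stand-ins for illustration, not Typesense or braft code.

    #include <mutex>
    #include <shared_mutex>
    #include <string>

    struct Node {                          // stand-in for braft::Node
        std::string leader_id() const { return "127.0.0.1:8107"; }
        void join() {}
    };

    class State {
    public:
        // Reader: every dereference of `node` happens under a shared lock.
        std::string leader_addr() const {
            std::shared_lock lock(mutex);
            if(!node) {
                return "";
            }
            std::string addr = node->leader_id();
            lock.unlock();                 // drop the lock before slow follow-up work
            return addr;
        }

        // Writer: exclusive lock, because `node` is destroyed here.
        void shutdown() {
            std::unique_lock lock(mutex);
            if(node) {
                node->join();
                delete node;
                node = nullptr;
            }
        }

    private:
        mutable std::shared_mutex mutex;   // mutable: const readers still need to lock
        Node* node = new Node();
    };

One caveat: a shared lock only keeps `node` alive while it is held, so the early-unlock paths stay safe by copying what they need (e.g. the leader address) before unlocking; the "no lock on `node` needed as caller uses the lock" comment in write_to_leader likewise relies on the caller holding its shared lock for the whole call.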