Protect access to node object.

kishorenc 2021-03-11 18:49:37 +05:30
parent f1d7d323b8
commit 0c6a6d483a
2 changed files with 26 additions and 0 deletions
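The change follows the standard C++17 reader/writer idiom: a mutable std::shared_mutex guards the braft::Node pointer, read-only paths take a std::shared_lock, and the teardown path in join() takes a std::unique_lock before deleting the node. Below is a minimal sketch of that idiom, using an illustrative Node stand-in rather than the actual braft/Typesense classes:

#include <mutex>
#include <shared_mutex>

// Illustrative stand-in for braft::Node; not part of the actual commit.
struct Node {
    bool is_leader() const { return false; }
    void join() {}
};

class NodeHolder {
public:
    // Read path: many threads may inspect the pointer concurrently.
    bool alive() const {
        std::shared_lock lock(mutex);      // shared (reader) lock
        return node != nullptr && node->is_leader();
    }

    // Teardown path: exclusive lock while the pointer is destroyed.
    void shutdown() {
        std::unique_lock lock(mutex);      // exclusive (writer) lock
        if(node) {
            node->join();
            delete node;
            node = nullptr;
        }
    }

private:
    mutable std::shared_mutex mutex;       // mutable so const readers can lock it
    Node* node = nullptr;
};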

View File

@@ -95,6 +95,8 @@ class ReplicationState : public braft::StateMachine {
private:
static constexpr const char* db_snapshot_name = "db_snapshot";
mutable std::shared_mutex mutex;
braft::Node* volatile node;
butil::atomic<int64_t> leader_term;
std::set<braft::PeerId> peers;
@@ -168,6 +170,7 @@ public:
// Blocking this thread until the node is eventually down.
void join() {
std::unique_lock lock(mutex);
if (node) {
node->join();
delete node;

View File
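In the hunks of this second file, the read paths take the same shared lock, and wherever a blocking call follows (resolving the leader address for catch-up checks, posting the dummy write to the leader), the code copies what it needs and calls lock.unlock() before the slow work, so the mutex is never held across an HTTP request. A hedged sketch of that copy-then-unlock idiom, written as a hypothetical free function rather than the actual ReplicationState method:

#include <shared_mutex>
#include <string>

// Hypothetical helper (not from the commit): read the leader address under a
// shared lock, release the lock, and let the caller do the slow network work.
template <typename Node>
std::string read_leader_addr(std::shared_mutex& mutex, Node* node) {
    std::shared_lock lock(mutex);
    if(node == nullptr || node->leader_id().is_empty()) {
        return "";                         // caller treats empty as "no leader yet"
    }
    std::string leader_addr = node->leader_id().to_string();
    lock.unlock();                         // unlock before any HTTP call is made
    return leader_addr;
}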

@@ -115,6 +115,8 @@ std::string ReplicationState::to_nodes_config(const butil::EndPoint& peering_end
}
void ReplicationState::write(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) {
std::shared_lock lock(mutex);
if(!node) {
return ;
}
@@ -146,6 +148,7 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
}
void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) const {
// no lock on `node` needed as caller uses the lock
if(!node || node->leader_id().is_empty()) {
// Handle no leader scenario
LOG(ERROR) << "Rejecting write: could not find a leader.";
@@ -277,6 +280,7 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
if(request_generated->_req == nullptr && request_generated->body == "INIT_SNAPSHOT") {
// We attempt to trigger a cold snapshot against an existing stand-alone DB for backward compatibility
InitSnapshotClosure* init_snapshot_closure = new InitSnapshotClosure(this);
std::shared_lock lock(mutex);
node->snapshot(init_snapshot_closure);
continue ;
}
@@ -385,7 +389,9 @@ int ReplicationState::init_db() {
}
int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
std::shared_lock lock(mutex);
CHECK(!node || !node->is_leader()) << "Leader is not supposed to load snapshot";
lock.unlock();
LOG(INFO) << "on_snapshot_load";
@@ -405,6 +411,8 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
}
void ReplicationState::refresh_nodes(const std::string & nodes) {
std::shared_lock lock(mutex);
if(!node) {
LOG(WARNING) << "Node state is not initialized: unable to refresh nodes.";
return ;
@@ -452,10 +460,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
return ;
}
lock.unlock();
// update catch up status
thread_pool->enqueue([this]() {
auto seq_num = this->store->get_latest_seq_number();
std::shared_lock lock(this->mutex);
const std::string & leader_addr = node->leader_id().to_string();
lock.unlock();
std::vector<std::string> addr_parts;
StringUtils::split(leader_addr, addr_parts, ":");
@@ -513,6 +525,8 @@ ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_m
}
bool ReplicationState::is_alive() const {
std::shared_lock lock(mutex);
if(node == nullptr || !is_ready()) {
return false;
}
@@ -522,6 +536,8 @@ bool ReplicationState::is_alive() const {
}
uint64_t ReplicationState::node_state() const {
std::shared_lock lock(mutex);
if(node == nullptr) {
return 0;
}
@@ -539,6 +555,7 @@ void ReplicationState::do_snapshot(const std::string& snapshot_path, const std::
thread_pool->enqueue([&snapshot_path, req, res, this]() {
OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, req, res);
ext_snapshot_path = snapshot_path;
std::shared_lock lock(this->mutex);
node->snapshot(snapshot_closure);
});
}
@@ -552,12 +569,16 @@ const std::string &ReplicationState::get_ext_snapshot_path() const {
}
void ReplicationState::do_dummy_write() {
std::shared_lock lock(mutex);
if(node->leader_id().is_empty()) {
LOG(ERROR) << "Could not do a dummy write, as node does not have a leader";
return ;
}
const std::string & leader_addr = node->leader_id().to_string();
lock.unlock();
const std::string protocol = api_uses_ssl ? "https" : "http";
std::string url = get_leader_url_path(leader_addr, "/health", protocol);
@@ -569,6 +590,8 @@ void ReplicationState::do_dummy_write() {
}
bool ReplicationState::trigger_vote() {
std::shared_lock lock(mutex);
if(node) {
auto status = node->vote(election_timeout_interval_ms);
LOG(INFO) << "Triggered vote. Ok? " << status.ok() << ", status: " << status;