diff --git a/include/raft_server.h b/include/raft_server.h index 5220c86d..52252894 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -144,6 +144,8 @@ public: // updates cluster membership void refresh_nodes(const std::string & nodes); + void refresh_catchup_status(bool log_msg); + bool trigger_vote(); bool has_leader_term() const { diff --git a/src/raft_server.cpp b/src/raft_server.cpp index e2a68172..b851d18d 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -181,7 +181,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, if (request->_req->proceed_req && response->proxied_stream) { // indicates async request body of in-flight request - LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size(); + //LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size(); request->notify(); return ; } @@ -473,7 +473,6 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { if(node->is_leader()) { RefreshNodesClosure* refresh_nodes_done = new RefreshNodesClosure; node->change_peers(new_conf, refresh_nodes_done); - this->caught_up = true; } else { if(node->leader_id().is_empty()) { // When node is not a leader, does not have a leader and is also a single-node cluster, @@ -492,6 +491,25 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { LOG(WARNING) << "Multi-node with no leader: refusing to reset peers."; } + return ; + } + } +} + +void ReplicationState::refresh_catchup_status(bool log_msg) { + std::shared_lock lock(mutex); + + if (!node) { + LOG_IF(WARNING, log_msg) << "Node state is not initialized: unable to refresh nodes."; + return; + } + + if(node->is_leader()) { + this->caught_up = true; + } else { + + // follower does not have a leader! + if(node->leader_id().is_empty()) { this->caught_up = false; return ; } @@ -499,7 +517,7 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { lock.unlock(); // update catch up status - thread_pool->enqueue([this]() { + thread_pool->enqueue([this, log_msg]() { auto seq_num = this->store->get_latest_seq_number(); std::shared_lock lock(this->mutex); const std::string & leader_addr = node->leader_id().to_string(); @@ -517,7 +535,7 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { if(status != 500) { if(!StringUtils::is_uint64_t(api_res)) { - LOG(ERROR) << "Invalid API response when fetching sequence number: " << api_res; + LOG_IF(ERROR, log_msg) << "Invalid API response when fetching sequence number: " << api_res; this->caught_up = false; return ; } @@ -535,14 +553,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { // However, if the difference is large, then something could be wrong if(leader_seq < seq_num) { - LOG(ERROR) << "Leader sequence " << leader_seq << " is less than local sequence " << seq_num - << ", catchup_min_sequence_diff: " << catchup_min_sequence_diff; + LOG_IF(ERROR, log_msg) << "Leader sequence " << leader_seq << " is less than local sequence " + << seq_num << ", catchup_min_sequence_diff: " << catchup_min_sequence_diff; this->caught_up = false; return ; } float seq_progress = (float(seq_num) / leader_seq) * 100; - LOG(INFO) << "Follower progress percentage: " << seq_progress; + LOG_IF(INFO, log_msg) << "Follower progress percentage: " << seq_progress; this->caught_up = (seq_progress >= catch_up_threshold_percentage); } diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 6a1167eb..d94afa86 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -263,7 +263,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st size_t raft_counter = 0; while (!brpc::IsAskedToQuit() && !quit_raft_service.load()) { // post-increment to ensure that we refresh right away on a fresh boot - if(raft_counter++ % 10 == 0) { + if(raft_counter % 10 == 0) { // reset peer configuration periodically to identify change in cluster membership const Option & refreshed_nodes_op = fetch_nodes_config(path_to_nodes); if(!refreshed_nodes_op.ok()) { @@ -275,6 +275,13 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st replication_state.refresh_nodes(nodes_config); } + if(raft_counter % 3 == 0) { + // update node catch up status periodically, take care of logging too verbosely + bool log_msg = (raft_counter % 9 == 0); + replication_state.refresh_catchup_status(log_msg); + } + + raft_counter++; sleep(1); }