Refresh catch up status more frequently.

This commit is contained in:
Kishore Nallan 2021-04-10 13:06:56 +05:30
parent 1da15d8ee5
commit 5b4b617b59
3 changed files with 35 additions and 8 deletions

View File

@ -144,6 +144,8 @@ public:
// updates cluster membership
void refresh_nodes(const std::string & nodes);
void refresh_catchup_status(bool log_msg);
bool trigger_vote();
bool has_leader_term() const {

View File

@ -181,7 +181,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
if (request->_req->proceed_req && response->proxied_stream) {
// indicates async request body of in-flight request
LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size();
//LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size();
request->notify();
return ;
}
@ -473,7 +473,6 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
if(node->is_leader()) {
RefreshNodesClosure* refresh_nodes_done = new RefreshNodesClosure;
node->change_peers(new_conf, refresh_nodes_done);
this->caught_up = true;
} else {
if(node->leader_id().is_empty()) {
// When node is not a leader, does not have a leader and is also a single-node cluster,
@ -492,6 +491,25 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
LOG(WARNING) << "Multi-node with no leader: refusing to reset peers.";
}
return ;
}
}
}
void ReplicationState::refresh_catchup_status(bool log_msg) {
std::shared_lock lock(mutex);
if (!node) {
LOG_IF(WARNING, log_msg) << "Node state is not initialized: unable to refresh nodes.";
return;
}
if(node->is_leader()) {
this->caught_up = true;
} else {
// follower does not have a leader!
if(node->leader_id().is_empty()) {
this->caught_up = false;
return ;
}
@ -499,7 +517,7 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
lock.unlock();
// update catch up status
thread_pool->enqueue([this]() {
thread_pool->enqueue([this, log_msg]() {
auto seq_num = this->store->get_latest_seq_number();
std::shared_lock lock(this->mutex);
const std::string & leader_addr = node->leader_id().to_string();
@ -517,7 +535,7 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
if(status != 500) {
if(!StringUtils::is_uint64_t(api_res)) {
LOG(ERROR) << "Invalid API response when fetching sequence number: " << api_res;
LOG_IF(ERROR, log_msg) << "Invalid API response when fetching sequence number: " << api_res;
this->caught_up = false;
return ;
}
@ -535,14 +553,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
// However, if the difference is large, then something could be wrong
if(leader_seq < seq_num) {
LOG(ERROR) << "Leader sequence " << leader_seq << " is less than local sequence " << seq_num
<< ", catchup_min_sequence_diff: " << catchup_min_sequence_diff;
LOG_IF(ERROR, log_msg) << "Leader sequence " << leader_seq << " is less than local sequence "
<< seq_num << ", catchup_min_sequence_diff: " << catchup_min_sequence_diff;
this->caught_up = false;
return ;
}
float seq_progress = (float(seq_num) / leader_seq) * 100;
LOG(INFO) << "Follower progress percentage: " << seq_progress;
LOG_IF(INFO, log_msg) << "Follower progress percentage: " << seq_progress;
this->caught_up = (seq_progress >= catch_up_threshold_percentage);
}

View File

@ -263,7 +263,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
size_t raft_counter = 0;
while (!brpc::IsAskedToQuit() && !quit_raft_service.load()) {
// post-increment to ensure that we refresh right away on a fresh boot
if(raft_counter++ % 10 == 0) {
if(raft_counter % 10 == 0) {
// reset peer configuration periodically to identify change in cluster membership
const Option<std::string> & refreshed_nodes_op = fetch_nodes_config(path_to_nodes);
if(!refreshed_nodes_op.ok()) {
@ -275,6 +275,13 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
replication_state.refresh_nodes(nodes_config);
}
if(raft_counter % 3 == 0) {
// update node catch up status periodically, take care of logging too verbosely
bool log_msg = (raft_counter % 9 == 0);
replication_state.refresh_catchup_status(log_msg);
}
raft_counter++;
sleep(1);
}