Update caught up only when leader status is available.

This commit is contained in:
Kishore Nallan 2023-01-21 18:17:50 +05:30
parent d25a021ea9
commit e9ffdaf4ed

View File

@ -660,29 +660,24 @@ void ReplicationState::refresh_catchup_status(bool log_msg) {
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::get_response(url, api_res, res_headers);
if(status_code == 404) {
// earlier versions don't have this end-point, so we just ignore
return ;
}
if(status_code != 200) {
this->read_caught_up = false;
return ;
}
// compare leader's applied log with local applied to see if we are lagging
nlohmann::json leader_status = nlohmann::json::parse(api_res);
if(leader_status.contains("committed_index")) {
int64_t leader_committed_index = leader_status["committed_index"].get<int64_t>();
if(leader_committed_index <= n_status.committed_index) {
// this can happen due to network latency in making the /status call
if(status_code == 200) {
// compare leader's applied log with local applied to see if we are lagging
nlohmann::json leader_status = nlohmann::json::parse(api_res);
if(leader_status.contains("committed_index")) {
int64_t leader_committed_index = leader_status["committed_index"].get<int64_t>();
if(leader_committed_index <= n_status.committed_index) {
// this can happen due to network latency in making the /status call
// we will refrain from changing current status
return ;
}
this->read_caught_up = ((leader_committed_index - n_status.committed_index) < healthy_read_lag);
} else {
// we will refrain from changing current status
return ;
LOG(ERROR) << "Error, `committed_index` key not found in /status response from leader.";
}
this->read_caught_up = ((leader_committed_index - n_status.committed_index) < healthy_read_lag);
} else {
// we will refrain from changing current status
LOG(ERROR) << "Error, `committed_index` key not found in /status response from leader.";
// we will again refrain from changing current status
LOG(ERROR) << "Error, /status end-point returned bad status code " << status_code;
}
}
@ -697,6 +692,7 @@ ReplicationState::ReplicationState(HttpServer* server, BatchedIndexer* batched_i
config(config),
num_collections_parallel_load(num_collections_parallel_load),
num_documents_parallel_load(num_documents_parallel_load),
read_caught_up(false), write_caught_up(false),
ready(false), shutting_down(false), pending_writes(0),
last_snapshot_ts(std::time(nullptr)), snapshot_interval_s(config->get_snapshot_interval_seconds()) {
@ -743,7 +739,7 @@ const std::string &ReplicationState::get_ext_snapshot_path() const {
void ReplicationState::do_dummy_write() {
std::shared_lock lock(node_mutex);
if(node->leader_id().is_empty()) {
if(!node || node->leader_id().is_empty()) {
LOG(ERROR) << "Could not do a dummy write, as node does not have a leader";
return ;
}