Rely on nodes configuration for peer discovery.

This commit is contained in:
Kishore Nallan 2022-06-18 07:22:18 +05:30
parent ec217bd7d7
commit e3d7a5932d
3 changed files with 23 additions and 15 deletions

View File

@ -229,7 +229,7 @@ public:
const std::string& get_ext_snapshot_path() const;
// for timed snapshots
void do_snapshot();
void do_snapshot(const std::string& nodes);
void persist_applying_index();

View File

@ -774,7 +774,7 @@ bool ReplicationState::is_leader() {
return node->is_leader();
}
void ReplicationState::do_snapshot() {
void ReplicationState::do_snapshot(const std::string& nodes) {
auto current_ts = std::time(nullptr);
if(current_ts - last_snapshot_ts < snapshot_interval_s) {
//LOG(INFO) << "Skipping snapshot: not enough time has elapsed.";
@ -786,31 +786,39 @@ void ReplicationState::do_snapshot() {
if(is_leader()) {
// run the snapshot only if there are no other recovering followers
std::vector<braft::PeerId> peers;
braft::Configuration peer_config;
peer_config.parse_from(nodes);
peer_config.list_peers(&peers);
std::shared_lock lock(node_mutex);
node->list_peers(&peers);
std::string leader_addr = node->leader_id().to_string();
std::string my_addr = node->node_id().peer_id.to_string();
lock.unlock();
//LOG(INFO) << "my_addr: " << my_addr;
bool all_peers_healthy = true;
// iterate peers and check health status
for(const auto& peer: peers) {
const std::string& node_addr = peer.to_string();
if(leader_addr == node_addr) {
const std::string& peer_addr = peer.to_string();
//LOG(INFO) << "do_snapshot, peer_addr: " << peer_addr;
if(my_addr == peer_addr) {
// skip self
//LOG(INFO) << "do_snapshot: skipping self, peer_addr: " << peer_addr;
continue;
}
const std::string protocol = api_uses_ssl ? "https" : "http";
std::string url = get_node_url_path(node_addr, "/health", protocol);
//LOG(INFO) << "Calling url: " << url;
std::string url = get_node_url_path(peer_addr, "/health", protocol);
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::get_response(url, api_res, res_headers);
bool peer_healthy = (status_code == 200);
//LOG(INFO) << "do_snapshot, status_code: " << status_code;
if(!peer_healthy) {
LOG(WARNING) << "Peer " << node_addr << " reported unhealthy during snapshot pre-check.";
LOG(WARNING) << "Peer " << peer_addr << " reported unhealthy during snapshot pre-check.";
}
all_peers_healthy = all_peers_healthy && peer_healthy;

View File

@ -305,7 +305,6 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
// Wait until 'CTRL-C' is pressed. then Stop() and Join() the service
size_t raft_counter = 0;
while (!brpc::IsAskedToQuit() && !quit_raft_service.load()) {
// post-increment to ensure that we refresh right away on a fresh boot
if(raft_counter % 10 == 0) {
// reset peer configuration periodically to identify change in cluster membership
const Option<std::string> & refreshed_nodes_op = fetch_nodes_config(path_to_nodes);
@ -313,9 +312,14 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
LOG(WARNING) << "Error while refreshing peer configuration: " << refreshed_nodes_op.error();
continue;
}
const std::string& nodes_config = ReplicationState::to_nodes_config(peering_endpoint, api_port,
refreshed_nodes_op.get());
refreshed_nodes_op.get());
replication_state.refresh_nodes(nodes_config);
if(raft_counter % 60 == 0) {
replication_state.do_snapshot(nodes_config);
}
}
if(raft_counter % 3 == 0) {
@ -324,10 +328,6 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
replication_state.refresh_catchup_status(log_msg);
}
if(raft_counter % 60 == 0) {
replication_state.do_snapshot();
}
raft_counter++;
sleep(1);
}