diff --git a/include/raft_server.h b/include/raft_server.h index 0d5e3579..4463ddb3 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -229,7 +229,7 @@ public: const std::string& get_ext_snapshot_path() const; // for timed snapshots - void do_snapshot(); + void do_snapshot(const std::string& nodes); void persist_applying_index(); diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 36d248f4..fcf830d8 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -774,7 +774,7 @@ bool ReplicationState::is_leader() { return node->is_leader(); } -void ReplicationState::do_snapshot() { +void ReplicationState::do_snapshot(const std::string& nodes) { auto current_ts = std::time(nullptr); if(current_ts - last_snapshot_ts < snapshot_interval_s) { //LOG(INFO) << "Skipping snapshot: not enough time has elapsed."; @@ -786,31 +786,39 @@ void ReplicationState::do_snapshot() { if(is_leader()) { // run the snapshot only if there are no other recovering followers std::vector peers; + braft::Configuration peer_config; + peer_config.parse_from(nodes); + peer_config.list_peers(&peers); + std::shared_lock lock(node_mutex); - node->list_peers(&peers); - std::string leader_addr = node->leader_id().to_string(); + std::string my_addr = node->node_id().peer_id.to_string(); lock.unlock(); + //LOG(INFO) << "my_addr: " << my_addr; bool all_peers_healthy = true; // iterate peers and check health status for(const auto& peer: peers) { - const std::string& node_addr = peer.to_string(); - if(leader_addr == node_addr) { + const std::string& peer_addr = peer.to_string(); + //LOG(INFO) << "do_snapshot, peer_addr: " << peer_addr; + + if(my_addr == peer_addr) { // skip self + //LOG(INFO) << "do_snapshot: skipping self, peer_addr: " << peer_addr; continue; } const std::string protocol = api_uses_ssl ? "https" : "http"; - std::string url = get_node_url_path(node_addr, "/health", protocol); - //LOG(INFO) << "Calling url: " << url; + std::string url = get_node_url_path(peer_addr, "/health", protocol); std::string api_res; std::map res_headers; long status_code = HttpClient::get_response(url, api_res, res_headers); bool peer_healthy = (status_code == 200); + //LOG(INFO) << "do_snapshot, status_code: " << status_code; + if(!peer_healthy) { - LOG(WARNING) << "Peer " << node_addr << " reported unhealthy during snapshot pre-check."; + LOG(WARNING) << "Peer " << peer_addr << " reported unhealthy during snapshot pre-check."; } all_peers_healthy = all_peers_healthy && peer_healthy; diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index b232d1bf..85961a3a 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -305,7 +305,6 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st // Wait until 'CTRL-C' is pressed. then Stop() and Join() the service size_t raft_counter = 0; while (!brpc::IsAskedToQuit() && !quit_raft_service.load()) { - // post-increment to ensure that we refresh right away on a fresh boot if(raft_counter % 10 == 0) { // reset peer configuration periodically to identify change in cluster membership const Option & refreshed_nodes_op = fetch_nodes_config(path_to_nodes); @@ -313,9 +312,14 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st LOG(WARNING) << "Error while refreshing peer configuration: " << refreshed_nodes_op.error(); continue; } + const std::string& nodes_config = ReplicationState::to_nodes_config(peering_endpoint, api_port, - refreshed_nodes_op.get()); + refreshed_nodes_op.get()); replication_state.refresh_nodes(nodes_config); + + if(raft_counter % 60 == 0) { + replication_state.do_snapshot(nodes_config); + } } if(raft_counter % 3 == 0) { @@ -324,10 +328,6 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st replication_state.refresh_catchup_status(log_msg); } - if(raft_counter % 60 == 0) { - replication_state.do_snapshot(); - } - raft_counter++; sleep(1); }