Switch to managed snapshotting.

Ensures that a snapshot does not run while any peer is unhealthy (possibly still recovering). A condensed sketch of the new gating follows the changed-files summary below.
Kishore Nallan 2022-06-16 17:05:17 +05:30
parent 06ce311987
commit f3af2319a3
4 changed files with 108 additions and 15 deletions
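
In short, the do_snapshot() added to raft_server.cpp below only proceeds when two conditions hold: enough time has passed since the last snapshot, and every peer currently reports healthy. A minimal standalone sketch of that gate; should_snapshot and its parameters are illustrative names, not part of the commit:

#include <ctime>
#include <iostream>

// Illustrative gate combining the two checks done by do_snapshot():
// enough elapsed time AND all peers healthy.
bool should_snapshot(std::time_t now, std::time_t last_snapshot_ts,
                     long snapshot_interval_s, bool all_peers_healthy) {
    if(now - last_snapshot_ts < snapshot_interval_s) {
        return false;              // same early return as do_snapshot()
    }
    return all_peers_healthy;      // skip while any follower is unhealthy / recovering
}

int main() {
    std::time_t last = std::time(nullptr) - 7200;  // pretend the last snapshot ran 2h ago
    std::cout << should_snapshot(std::time(nullptr), last, 3600, true) << "\n";   // 1: snapshot runs
    std::cout << should_snapshot(std::time(nullptr), last, 3600, false) << "\n";  // 0: a peer is unhealthy
    return 0;
}
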

View File

@@ -82,6 +82,19 @@ public:
void Run();
};
class TimedSnapshotClosure : public braft::Closure {
private:
ReplicationState* replication_state;
public:
TimedSnapshotClosure(ReplicationState *replication_state) : replication_state(replication_state){}
~TimedSnapshotClosure() {}
void Run();
};
struct cached_disk_stat_t {
const static size_t REFRESH_INTERVAL_SECS = 30;
uint64_t disk_total_bytes = 0;
@@ -149,6 +162,9 @@ private:
cached_disk_stat_t cached_disk_stat;
const uint64_t snapshot_interval_s; // frequency of actual snapshotting
uint64_t last_snapshot_ts; // when last snapshot ran
public:
static constexpr const char* log_dir_name = "log";
@@ -202,6 +218,7 @@ public:
Store* get_store();
// for manual / external snapshots
void do_snapshot(const std::string& snapshot_path, const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
static std::string to_nodes_config(const butil::EndPoint &peering_endpoint, const int api_port,
@@ -211,6 +228,9 @@ public:
const std::string& get_ext_snapshot_path() const;
// for timed snapshots
void do_snapshot();
void persist_applying_index();
http_message_dispatcher* get_message_dispatcher() const;
@@ -290,6 +310,6 @@ private:
void do_dummy_write();
std::string get_leader_url_path(const std::string& leader_addr, const std::string& path,
const std::string& protocol) const;
std::string get_node_url_path(const std::string& node_addr, const std::string& path,
const std::string& protocol) const;
};
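
The new TimedSnapshotClosure follows braft's closure convention: it is allocated with new, handed to node->snapshot(), and Run() is invoked once when the snapshot attempt finishes, at which point the closure deletes itself. A self-contained sketch of that lifecycle; Closure, TimedSnapshotClosureSketch and fake_async_snapshot below are local stand-ins, not braft's real types:

#include <iostream>
#include <memory>

// Local stand-in for braft::Closure; only the lifecycle matters here.
struct Closure {
    virtual ~Closure() = default;
    virtual void Run() = 0;
};

struct TimedSnapshotClosureSketch : public Closure {
    void Run() override {
        // Auto delete this after Run(), exactly like the closures in the diff.
        std::unique_ptr<TimedSnapshotClosureSketch> self_guard(this);
        std::cout << "Timed snapshot finished" << std::endl;
    } // self_guard frees the closure here
};

// Stand-in for node->snapshot(done): a real call is asynchronous and invokes
// done->Run() when the snapshot attempt completes; called inline for brevity.
void fake_async_snapshot(Closure* done) {
    done->Run();
}

int main() {
    fake_async_snapshot(new TimedSnapshotClosureSketch()); // ownership passes to Run()
    return 0;
}
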

View File

@@ -15,10 +15,10 @@ HttpServer::HttpServer(const std::string & version, const std::string & listen_a
uint32_t listen_port, const std::string & ssl_cert_path, const std::string & ssl_cert_key_path,
const uint64_t ssl_refresh_interval_ms, bool cors_enabled,
const std::set<std::string>& cors_domains, ThreadPool* thread_pool):
SSL_REFRESH_INTERVAL_MS(ssl_refresh_interval_ms),
exit_loop(false), version(version), listen_address(listen_address), listen_port(listen_port),
ssl_cert_path(ssl_cert_path), ssl_cert_key_path(ssl_cert_key_path),
cors_enabled(cors_enabled), cors_domains(cors_domains), thread_pool(thread_pool) {
SSL_REFRESH_INTERVAL_MS(ssl_refresh_interval_ms),
exit_loop(false), version(version), listen_address(listen_address), listen_port(listen_port),
ssl_cert_path(ssl_cert_path), ssl_cert_key_path(ssl_cert_key_path),
cors_enabled(cors_enabled), cors_domains(cors_domains), thread_pool(thread_pool) {
accept_ctx = new h2o_accept_ctx_t();
h2o_config_init(&config);
hostconf = h2o_config_register_host(&config, h2o_iovec_init(H2O_STRLIT("default")), 65535);
@@ -34,8 +34,9 @@ HttpServer::HttpServer(const std::string & version, const std::string & listen_a
message_dispatcher = new http_message_dispatcher;
message_dispatcher->init(ctx.loop);
ssl_refresh_timer.timer.expire_at = 0; // used during destructor
metrics_refresh_timer.timer.expire_at = 0; // used during destructor
// used during destructor
ssl_refresh_timer.timer.expire_at = 0;
metrics_refresh_timer.timer.expire_at = 0;
accept_ctx->ssl_ctx = nullptr;
}

View File

@@ -80,11 +80,13 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
// flag controls snapshot download size of each RPC
braft::FLAGS_raft_max_byte_count_per_rpc = 4 * 1024 * 1024; // 4 MB
// automatic snapshot is disabled since it caused issues during slow follower catch-ups
node_options.snapshot_interval_s = -1;
node_options.catchup_margin = config->get_healthy_read_lag();
node_options.election_timeout_ms = election_timeout_ms;
node_options.fsm = this;
node_options.node_owns_fsm = false;
node_options.snapshot_interval_s = snapshot_interval_s;
node_options.filter_before_copy_remote = true;
std::string prefix = "local://" + raft_dir;
node_options.log_uri = prefix + "/" + log_dir_name;
@@ -262,7 +264,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
auto raw_req = request->_req;
const std::string& path = std::string(raw_req->path.base, raw_req->path.len);
const std::string& scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len);
const std::string url = get_leader_url_path(leader_addr, path, scheme);
const std::string url = get_node_url_path(leader_addr, path, scheme);
thread_pool->enqueue([request, response, server, path, url, this]() {
pending_writes++;
@@ -318,10 +320,10 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
});
}
std::string ReplicationState::get_leader_url_path(const std::string& leader_addr, const std::string& path,
const std::string& protocol) const {
std::string ReplicationState::get_node_url_path(const std::string& node_addr, const std::string& path,
const std::string& protocol) const {
std::vector<std::string> addr_parts;
StringUtils::split(leader_addr, addr_parts, ":");
StringUtils::split(node_addr, addr_parts, ":");
std::string leader_host_port = addr_parts[0] + ":" + addr_parts[2];
std::string url = protocol + "://" + leader_host_port + path;
return url;
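
For context on the addr_parts[0] / addr_parts[2] indexing above: a Typesense node address is made of three colon-separated fields (peering address, peering port, API port), and the URL keeps the address plus the API port. A minimal sketch assuming that format; split() and node_url_path() are local stand-ins for StringUtils::split() and get_node_url_path():

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Local helper standing in for StringUtils::split().
static std::vector<std::string> split(const std::string& s, char delim) {
    std::vector<std::string> parts;
    std::stringstream ss(s);
    std::string part;
    while(std::getline(ss, part, delim)) {
        parts.push_back(part);
    }
    return parts;
}

// Stand-in for get_node_url_path(): keep the address and the API port.
std::string node_url_path(const std::string& node_addr, const std::string& path,
                          const std::string& protocol) {
    std::vector<std::string> addr_parts = split(node_addr, ':');  // {address, peering_port, api_port}
    std::string host_port = addr_parts[0] + ":" + addr_parts[2];
    return protocol + "://" + host_port + path;
}

int main() {
    // prints: http://192.168.1.12:8108/health
    std::cout << node_url_path("192.168.1.12:8107:8108", "/health", "http") << std::endl;
    return 0;
}
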
@@ -636,7 +638,8 @@ ReplicationState::ReplicationState(HttpServer* server, BatchedIndexer* batched_i
config(config),
num_collections_parallel_load(num_collections_parallel_load),
num_documents_parallel_load(num_documents_parallel_load),
ready(false), shutting_down(false), pending_writes(0) {
ready(false), shutting_down(false), pending_writes(0),
last_snapshot_ts(std::time(nullptr)), snapshot_interval_s(config->get_snapshot_interval_seconds()) {
}
@@ -690,7 +693,7 @@ void ReplicationState::do_dummy_write() {
lock.unlock();
const std::string protocol = api_uses_ssl ? "https" : "http";
std::string url = get_leader_url_path(leader_addr, "/health", protocol);
std::string url = get_node_url_path(leader_addr, "/health", protocol);
std::string api_res;
std::map<std::string, std::string> res_headers;
@@ -771,6 +774,71 @@ bool ReplicationState::is_leader() {
return node->is_leader();
}
void ReplicationState::do_snapshot() {
auto current_ts = std::time(nullptr);
if(current_ts - last_snapshot_ts < snapshot_interval_s) {
//LOG(INFO) << "Skipping snapshot: not enough time has elapsed.";
return;
}
LOG(INFO) << "Snapshot timer is active, current_ts: " << current_ts << ", last_snapshot_ts: " << last_snapshot_ts;
if(is_leader()) {
// run the snapshot only if there are no other recovering followers
std::vector<braft::PeerId> peers;
std::shared_lock lock(node_mutex);
node->list_peers(&peers);
std::string leader_addr = node->leader_id().to_string();
lock.unlock();
bool all_peers_healthy = true;
// iterate peers and check health status
for(const auto& peer: peers) {
const std::string& node_addr = peer.to_string();
if(leader_addr == node_addr) {
// skip self
continue;
}
const std::string protocol = api_uses_ssl ? "https" : "http";
std::string url = get_node_url_path(node_addr, "/health", protocol);
//LOG(INFO) << "Calling url: " << url;
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::get_response(url, api_res, res_headers);
bool peer_healthy = (status_code == 200);
if(!peer_healthy) {
LOG(WARNING) << "Peer " << node_addr << " reported unhealthy during snapshot pre-check.";
}
all_peers_healthy = all_peers_healthy && peer_healthy;
}
if(!all_peers_healthy) {
LOG(WARNING) << "Unable to trigger snapshot as one or more of the peers reported unhealthy.";
return ;
}
}
TimedSnapshotClosure* snapshot_closure = new TimedSnapshotClosure(this);
std::shared_lock lock(node_mutex);
node->snapshot(snapshot_closure);
last_snapshot_ts = current_ts;
}
void TimedSnapshotClosure::Run() {
// Auto delete this after Done()
std::unique_ptr<TimedSnapshotClosure> self_guard(this);
if(status().ok()) {
LOG(INFO) << "Timed snapshot succeeded!";
} else {
LOG(ERROR) << "Timed snapshot failed, error: " << status().error_str() << ", code: " << status().error_code();
}
}
void OnDemandSnapshotClosure::Run() {
// Auto delete this after Done()
std::unique_ptr<OnDemandSnapshotClosure> self_guard(this);
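
The pre-check inside do_snapshot() above boils down to: every peer except the leader itself must answer /health with HTTP 200. A standalone sketch of that check with the HTTP call injected so it builds without HttpClient; all_followers_healthy and the sample addresses are illustrative, not part of the commit:

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Illustrative version of the pre-check: every peer other than the leader
// itself must return HTTP 200 from /health.
bool all_followers_healthy(const std::vector<std::string>& peers,
                           const std::string& leader_addr,
                           const std::function<long(const std::string&)>& get_health_status) {
    bool all_healthy = true;
    for(const std::string& peer : peers) {
        if(peer == leader_addr) {
            continue;                                // skip self, as in the diff
        }
        long status_code = get_health_status(peer);  // e.g. GET <peer api url>/health
        if(status_code != 200) {
            std::cerr << "Peer " << peer << " reported unhealthy" << std::endl;
            all_healthy = false;                     // keep checking the rest, like the diff does
        }
    }
    return all_healthy;
}

int main() {
    std::vector<std::string> peers = {"10.0.0.1:8107:8108", "10.0.0.2:8107:8108", "10.0.0.3:8107:8108"};
    auto fake_health = [](const std::string& peer) -> long {
        return peer == "10.0.0.3:8107:8108" ? 503 : 200;  // pretend one follower is still recovering
    };
    bool ok = all_followers_healthy(peers, "10.0.0.1:8107:8108", fake_health);
    std::cout << (ok ? "snapshot allowed" : "snapshot skipped") << std::endl;  // snapshot skipped
    return 0;
}
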

View File

@@ -322,6 +322,10 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
replication_state.refresh_catchup_status(log_msg);
}
if(raft_counter % 60 == 0) {
replication_state.do_snapshot();
}
raft_counter++;
sleep(1);
}
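
Putting the two timers together: the loop above wakes every second and calls do_snapshot() on every 60th iteration, while do_snapshot() itself only snapshots once snapshot_interval_s has elapsed since last_snapshot_ts. A dry-run sketch of that cadence with a simulated clock (raft_counter and the 1-second sleep are collapsed into one counter; values are illustrative):

#include <cstdint>
#include <iostream>

int main() {
    const uint64_t snapshot_interval_s = 3600;  // e.g. config->get_snapshot_interval_seconds()
    uint64_t last_snapshot_ts = 0;              // the constructor seeds this with the current time
    for(uint64_t now = 0; now <= 2 * 3600; now++) {              // simulated seconds since start
        if(now % 60 == 0) {                                      // the raft loop calls do_snapshot() every 60 ticks
            if(now - last_snapshot_ts >= snapshot_interval_s) {  // the early-return check inside do_snapshot()
                std::cout << "snapshot at t=" << now << "s" << std::endl;  // prints t=3600s and t=7200s
                last_snapshot_ts = now;
            }
        }
    }
    return 0;
}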