diff --git a/include/raft_server.h b/include/raft_server.h
index 6e3ecc41..9d1003d1 100644
--- a/include/raft_server.h
+++ b/include/raft_server.h
@@ -82,6 +82,19 @@ public:
     void Run();
 };
 
+class TimedSnapshotClosure : public braft::Closure {
+private:
+    ReplicationState* replication_state;
+
+public:
+
+    TimedSnapshotClosure(ReplicationState* replication_state) : replication_state(replication_state) {}
+
+    ~TimedSnapshotClosure() {}
+
+    void Run();
+};
+
 struct cached_disk_stat_t {
     const static size_t REFRESH_INTERVAL_SECS = 30;
     uint64_t disk_total_bytes = 0;
@@ -149,6 +162,9 @@ private:
 
     cached_disk_stat_t cached_disk_stat;
 
+    const uint64_t snapshot_interval_s;    // frequency of actual snapshotting
+    uint64_t last_snapshot_ts;             // when the last snapshot ran
+
 public:
 
     static constexpr const char* log_dir_name = "log";
@@ -202,6 +218,7 @@ public:
 
     Store* get_store();
 
+    // for manual / external snapshots
     void do_snapshot(const std::string& snapshot_path, const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
 
     static std::string to_nodes_config(const butil::EndPoint &peering_endpoint, const int api_port,
@@ -211,6 +228,9 @@ public:
 
     const std::string& get_ext_snapshot_path() const;
 
+    // for timed snapshots
+    void do_snapshot();
+
     void persist_applying_index();
 
     http_message_dispatcher* get_message_dispatcher() const;
@@ -290,6 +310,6 @@ private:
 
     void do_dummy_write();
 
-    std::string get_leader_url_path(const std::string& leader_addr, const std::string& path,
-                                    const std::string& protocol) const;
+    std::string get_node_url_path(const std::string& node_addr, const std::string& path,
+                                  const std::string& protocol) const;
 };
diff --git a/src/http_server.cpp b/src/http_server.cpp
index 7794d016..59cbaa4f 100644
--- a/src/http_server.cpp
+++ b/src/http_server.cpp
@@ -15,10 +15,10 @@ HttpServer::HttpServer(const std::string & version, const std::string & listen_a
                        uint32_t listen_port, const std::string & ssl_cert_path,
                        const std::string & ssl_cert_key_path, const uint64_t ssl_refresh_interval_ms,
                        bool cors_enabled, const std::set<std::string>& cors_domains, ThreadPool* thread_pool):
-        SSL_REFRESH_INTERVAL_MS(ssl_refresh_interval_ms),
-        exit_loop(false), version(version), listen_address(listen_address), listen_port(listen_port),
-        ssl_cert_path(ssl_cert_path), ssl_cert_key_path(ssl_cert_key_path),
-        cors_enabled(cors_enabled), cors_domains(cors_domains), thread_pool(thread_pool) {
+        SSL_REFRESH_INTERVAL_MS(ssl_refresh_interval_ms),
+        exit_loop(false), version(version), listen_address(listen_address), listen_port(listen_port),
+        ssl_cert_path(ssl_cert_path), ssl_cert_key_path(ssl_cert_key_path),
+        cors_enabled(cors_enabled), cors_domains(cors_domains), thread_pool(thread_pool) {
     accept_ctx = new h2o_accept_ctx_t();
     h2o_config_init(&config);
     hostconf = h2o_config_register_host(&config, h2o_iovec_init(H2O_STRLIT("default")), 65535);
@@ -34,8 +34,9 @@ HttpServer::HttpServer(const std::string & version, const std::string & listen_a
     message_dispatcher = new http_message_dispatcher;
     message_dispatcher->init(ctx.loop);
 
-    ssl_refresh_timer.timer.expire_at = 0;      // used during destructor
-    metrics_refresh_timer.timer.expire_at = 0;  // used during destructor
+    // used during destructor
+    ssl_refresh_timer.timer.expire_at = 0;
+    metrics_refresh_timer.timer.expire_at = 0;
 
     accept_ctx->ssl_ctx = nullptr;
 }
diff --git a/src/raft_server.cpp b/src/raft_server.cpp
index 63906142..4990d413 100644
--- a/src/raft_server.cpp
+++ b/src/raft_server.cpp
@@ -80,11 +80,13 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
     // flag controls snapshot download size of each RPC
     braft::FLAGS_raft_max_byte_count_per_rpc = 4 * 1024 * 1024; // 4 MB
 
+    // automatic snapshot is disabled since it caused issues during slow follower catch-ups
+    node_options.snapshot_interval_s = -1;
+
     node_options.catchup_margin = config->get_healthy_read_lag();
     node_options.election_timeout_ms = election_timeout_ms;
     node_options.fsm = this;
     node_options.node_owns_fsm = false;
-    node_options.snapshot_interval_s = snapshot_interval_s;
     node_options.filter_before_copy_remote = true;
     std::string prefix = "local://" + raft_dir;
     node_options.log_uri = prefix + "/" + log_dir_name;
@@ -262,7 +264,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
     auto raw_req = request->_req;
     const std::string& path = std::string(raw_req->path.base, raw_req->path.len);
     const std::string& scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len);
-    const std::string url = get_leader_url_path(leader_addr, path, scheme);
+    const std::string url = get_node_url_path(leader_addr, path, scheme);
 
     thread_pool->enqueue([request, response, server, path, url, this]() {
         pending_writes++;
@@ -318,10 +320,10 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
     });
 }
 
-std::string ReplicationState::get_leader_url_path(const std::string& leader_addr, const std::string& path,
-                                                  const std::string& protocol) const {
+std::string ReplicationState::get_node_url_path(const std::string& node_addr, const std::string& path,
+                                                const std::string& protocol) const {
     std::vector<std::string> addr_parts;
-    StringUtils::split(leader_addr, addr_parts, ":");
+    StringUtils::split(node_addr, addr_parts, ":");
     std::string leader_host_port = addr_parts[0] + ":" + addr_parts[2];
     std::string url = protocol + "://" + leader_host_port + path;
     return url;
@@ -636,7 +638,8 @@ ReplicationState::ReplicationState(HttpServer* server, BatchedIndexer* batched_i
     config(config),
     num_collections_parallel_load(num_collections_parallel_load),
     num_documents_parallel_load(num_documents_parallel_load),
-    ready(false), shutting_down(false), pending_writes(0) {
+    ready(false), shutting_down(false), pending_writes(0),
+    last_snapshot_ts(std::time(nullptr)), snapshot_interval_s(config->get_snapshot_interval_seconds()) {
 
 }
 
@@ -690,7 +693,7 @@ void ReplicationState::do_dummy_write() {
     lock.unlock();
 
     const std::string protocol = api_uses_ssl ? "https" : "http";
"https" : "http"; - std::string url = get_leader_url_path(leader_addr, "/health", protocol); + std::string url = get_node_url_path(leader_addr, "/health", protocol); std::string api_res; std::map res_headers; @@ -771,6 +774,71 @@ bool ReplicationState::is_leader() { return node->is_leader(); } +void ReplicationState::do_snapshot() { + auto current_ts = std::time(nullptr); + if(current_ts - last_snapshot_ts < snapshot_interval_s) { + //LOG(INFO) << "Skipping snapshot: not enough time has elapsed."; + return; + } + + LOG(INFO) << "Snapshot timer is active, current_ts: " << current_ts << ", last_snapshot_ts: " << last_snapshot_ts; + + if(is_leader()) { + // run the snapshot only if there are no other recovering followers + std::vector peers; + std::shared_lock lock(node_mutex); + node->list_peers(&peers); + std::string leader_addr = node->leader_id().to_string(); + lock.unlock(); + + bool all_peers_healthy = true; + + // iterate peers and check health status + for(const auto& peer: peers) { + const std::string& node_addr = peer.to_string(); + if(leader_addr == node_addr) { + // skip self + continue; + } + + const std::string protocol = api_uses_ssl ? "https" : "http"; + std::string url = get_node_url_path(node_addr, "/health", protocol); + //LOG(INFO) << "Calling url: " << url; + std::string api_res; + std::map res_headers; + long status_code = HttpClient::get_response(url, api_res, res_headers); + bool peer_healthy = (status_code == 200); + + if(!peer_healthy) { + LOG(WARNING) << "Peer " << node_addr << " reported unhealthy during snapshot pre-check."; + } + + all_peers_healthy = all_peers_healthy && peer_healthy; + } + + if(!all_peers_healthy) { + LOG(WARNING) << "Unable to trigger snapshot as one or more of the peers reported unhealthy."; + return ; + } + } + + TimedSnapshotClosure* snapshot_closure = new TimedSnapshotClosure(this); + std::shared_lock lock(node_mutex); + node->snapshot(snapshot_closure); + last_snapshot_ts = current_ts; +} + +void TimedSnapshotClosure::Run() { + // Auto delete this after Done() + std::unique_ptr self_guard(this); + + if(status().ok()) { + LOG(INFO) << "Timed snapshot succeeded!"; + } else { + LOG(ERROR) << "Timed snapshot failed, error: " << status().error_str() << ", code: " << status().error_code(); + } +} + void OnDemandSnapshotClosure::Run() { // Auto delete this after Done() std::unique_ptr self_guard(this); diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index ac1800ba..01276b42 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -322,6 +322,10 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st replication_state.refresh_catchup_status(log_msg); } + if(raft_counter % 60 == 0) { + replication_state.do_snapshot(); + } + raft_counter++; sleep(1); }