From 2a89acd84ead61ae32f3736d8c5ebe488752a5a6 Mon Sep 17 00:00:00 2001
From: Kishore Nallan
Date: Sun, 25 Apr 2021 19:11:01 +0530
Subject: [PATCH] Return 503 for both read and write lag.

---
 include/config.h               | 40 ++++++++++++-------------
 include/raft_server.h          |  8 +++--
 src/http_server.cpp            | 12 ++++----
 src/raft_server.cpp            | 53 +++++++++++++---------------
 src/typesense_server_utils.cpp |  8 ++---
 5 files changed, 56 insertions(+), 65 deletions(-)

diff --git a/include/config.h b/include/config.h
index 852acbb6..5de3fea7 100644
--- a/include/config.h
+++ b/include/config.h
@@ -33,8 +33,8 @@ private:
     float max_memory_ratio;
     int snapshot_interval_seconds;
 
-    size_t read_max_lag;
-    size_t write_max_lag;
+    size_t healthy_read_lag;
+    size_t healthy_write_lag;
 
     std::string config_file;
     int config_file_validity;
@@ -55,8 +55,8 @@ protected:
         this->enable_cors = false;
         this->max_memory_ratio = 1.0f;
         this->snapshot_interval_seconds = 3600;
-        this->read_max_lag = 1000;
-        this->write_max_lag = 100;
+        this->healthy_read_lag = 1000;
+        this->healthy_write_lag = 500;
         this->log_slow_requests_time_ms = -1;
         this->num_collections_parallel_load = 0; // will be set dynamically if not overridden
         this->num_documents_parallel_load = 1000;
@@ -191,12 +191,12 @@ public:
         return this->snapshot_interval_seconds;
     }
 
-    int get_read_max_lag() const {
-        return this->read_max_lag;
+    int get_healthy_read_lag() const {
+        return this->healthy_read_lag;
     }
 
-    int get_write_max_lag() const {
-        return this->write_max_lag;
+    int get_healthy_write_lag() const {
+        return this->healthy_write_lag;
     }
 
     int get_log_slow_requests_time_ms() const {
@@ -279,12 +279,12 @@ public:
             this->snapshot_interval_seconds = std::stoi(get_env("TYPESENSE_SNAPSHOT_INTERVAL_SECONDS"));
        }
 
-        if(!get_env("TYPESENSE_READ_MAX_LAG").empty()) {
-            this->read_max_lag = std::stoi(get_env("TYPESENSE_READ_MAX_LAG"));
+        if(!get_env("TYPESENSE_HEALTHY_READ_LAG").empty()) {
+            this->healthy_read_lag = std::stoi(get_env("TYPESENSE_HEALTHY_READ_LAG"));
        }
 
-        if(!get_env("TYPESENSE_WRITE_MAX_LAG").empty()) {
-            this->write_max_lag = std::stoi(get_env("TYPESENSE_WRITE_MAX_LAG"));
+        if(!get_env("TYPESENSE_HEALTHY_WRITE_LAG").empty()) {
+            this->healthy_write_lag = std::stoi(get_env("TYPESENSE_HEALTHY_WRITE_LAG"));
        }
 
        if(!get_env("TYPESENSE_LOG_SLOW_REQUESTS_TIME_MS").empty()) {
@@ -396,12 +396,12 @@ public:
            this->snapshot_interval_seconds = (int) reader.GetInteger("server", "snapshot-interval-seconds", 3600);
        }
 
-        if(reader.Exists("server", "read-max-lag")) {
-            this->read_max_lag = (int) reader.GetInteger("server", "read-max-lag", 1000);
+        if(reader.Exists("server", "healthy-read-lag")) {
+            this->healthy_read_lag = (int) reader.GetInteger("server", "healthy-read-lag", 1000);
        }
 
-        if(reader.Exists("server", "write-max-lag")) {
-            this->write_max_lag = (int) reader.GetInteger("server", "write-max-lag", 100);
+        if(reader.Exists("server", "healthy-write-lag")) {
+            this->healthy_write_lag = (int) reader.GetInteger("server", "healthy-write-lag", 100);
        }
 
        if(reader.Exists("server", "log-slow-requests-time-ms")) {
@@ -495,12 +495,12 @@ public:
            this->snapshot_interval_seconds = options.get("snapshot-interval-seconds");
        }
 
-        if(options.exist("read-max-lag")) {
-            this->read_max_lag = options.get("read-max-lag");
+        if(options.exist("healthy-read-lag")) {
+            this->healthy_read_lag = options.get("healthy-read-lag");
        }
 
-        if(options.exist("write-max-lag")) {
-            this->write_max_lag = options.get("write-max-lag");
+        if(options.exist("healthy-write-lag")) {
+            this->healthy_write_lag = options.get("healthy-write-lag");
        }
 
        if(options.exist("log-slow-requests-time-ms")) {
diff --git a/include/raft_server.h b/include/raft_server.h
index e84434ac..306f0c4b 100644
--- a/include/raft_server.h
+++ b/include/raft_server.h
@@ -108,8 +108,8 @@ private:
 
     const bool api_uses_ssl;
 
-    const size_t read_max_lag;
-    const size_t write_max_lag;
+    const size_t healthy_read_lag;
+    const size_t healthy_write_lag;
 
     const size_t num_collections_parallel_load;
     const size_t num_documents_parallel_load;
@@ -138,7 +138,7 @@ public:
 
     ReplicationState(HttpServer* server, Store* store, Store* meta_store, ThreadPool* thread_pool,
                      http_message_dispatcher* message_dispatcher,
-                     bool api_uses_ssl, size_t read_max_lag, size_t write_max_lag,
+                     bool api_uses_ssl, size_t healthy_read_lag, size_t healthy_write_lag,
                      size_t num_collections_parallel_load, size_t num_documents_parallel_load);
 
     // Starts this node
@@ -231,6 +231,7 @@ private:
 
     void on_leader_start(int64_t term) {
         leader_term.store(term, butil::memory_order_release);
+        refresh_catchup_status(true);
         LOG(INFO) << "Node becomes leader, term: " << term;
     }
 
@@ -253,6 +254,7 @@ private:
     }
 
     void on_start_following(const ::braft::LeaderChangeContext& ctx) {
+        refresh_catchup_status(true);
         LOG(INFO) << "Node starts following " << ctx;
     }
 
diff --git a/src/http_server.cpp b/src/http_server.cpp
index bf5b4540..b78e7e46 100644
--- a/src/http_server.cpp
+++ b/src/http_server.cpp
@@ -322,7 +322,9 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
     // Except for health check, wait for replicating state to be ready before allowing requests
     // Follower or leader must have started AND data must also have been loaded
     bool needs_readiness_check = !(
-        path_without_query == "/health" || path_without_query == "/debug" || path_without_query == "/sequence"
+        path_without_query == "/health" || path_without_query == "/debug" ||
+        path_without_query == "/stats.json" || path_without_query == "/metrics.json" ||
+        path_without_query == "/sequence"
     );
 
     if(needs_readiness_check) {
@@ -332,14 +334,14 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
 
         bool write_op = !is_read_op;
 
+        std::string message = "{ \"message\": \"Not Ready or Lagging\"}";
+
         if(is_read_op && !h2o_handler->http_server->get_replication_state()->is_read_caught_up()) {
-            std::string message = "{ \"message\": \"Not Ready\"}";
             return send_response(req, 503, message);
         }
 
-        if(write_op && !h2o_handler->http_server->get_replication_state()->is_write_caught_up()) {
-            std::string message = "{ \"message\": \"Too Many Writes\"}";
-            return send_response(req, 429, message);
+        else if(write_op && !h2o_handler->http_server->get_replication_state()->is_write_caught_up()) {
+            return send_response(req, 503, message);
         }
     }
 
diff --git a/src/raft_server.cpp b/src/raft_server.cpp
index afe23552..2f2c5beb 100644
--- a/src/raft_server.cpp
+++ b/src/raft_server.cpp
@@ -55,7 +55,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
     // flag controls snapshot download size of each RPC
     braft::FLAGS_raft_max_byte_count_per_rpc = 4 * 1024 * 1024; // 4 MB
 
-    node_options.catchup_margin = read_max_lag;
+    node_options.catchup_margin = healthy_read_lag;
     node_options.election_timeout_ms = election_timeout_ms;
     node_options.fsm = this;
     node_options.node_owns_fsm = false;
@@ -484,7 +484,6 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
 
     bool init_db_status = init_db();
-    read_caught_up = write_caught_up = (init_db_status == 0);
 
     return init_db_status;
 }
 
@@ -540,16 +539,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
 
 void ReplicationState::refresh_catchup_status(bool log_msg) {
     std::shared_lock lock(node_mutex);
-
-    if (!node) {
-        LOG_IF(WARNING, log_msg) << "Node state is not initialized: unable to refresh nodes.";
-        return;
+    if(node == nullptr ) {
+        read_caught_up = write_caught_up = false;
+        return ;
     }
 
-    if(!node->is_leader() && node->leader_id().is_empty()) {
-        // follower does not have a leader!
-        this->read_caught_up = false;
-        this->write_caught_up = false;
+    bool leader_or_follower = (node->is_leader() || !node->leader_id().is_empty());
+    if(!leader_or_follower) {
+        read_caught_up = write_caught_up = false;
         return ;
     }
 
@@ -557,33 +554,34 @@ void ReplicationState::refresh_catchup_status(bool log_msg) {
     node->get_status(&n_status);
     lock.unlock();
 
-    if (n_status.applying_index == 0) {
-        this->read_caught_up = true;
-        this->write_caught_up = true;
-        return ;
-    }
-
     size_t apply_lag = size_t(n_status.last_index - n_status.known_applied_index);
 
-    if (apply_lag > read_max_lag) {
-        LOG(ERROR) << apply_lag << " lagging entries > read max lag of " + std::to_string(read_max_lag);
+    //LOG(INFO) << "last_index: " << n_status.applying_index << ", known_applied_index: " << n_status.known_applied_index;
+    //LOG(INFO) << "apply_lag: " << apply_lag;
+
+    if (apply_lag > healthy_read_lag) {
+        LOG_IF(ERROR, log_msg) << apply_lag << " lagging entries > read max lag of " + std::to_string(healthy_read_lag);
         this->read_caught_up = false;
+    } else {
+        this->read_caught_up = true;
     }
 
-    if (apply_lag > write_max_lag) {
-        LOG(ERROR) << apply_lag << " lagging entries > write max lag of " + std::to_string(write_max_lag);
+    if (apply_lag > healthy_write_lag) {
+        LOG_IF(ERROR, log_msg) << apply_lag << " lagging entries > write max lag of " + std::to_string(healthy_write_lag);
         this->write_caught_up = false;
+    } else {
+        this->write_caught_up = true;
     }
 }
 
 ReplicationState::ReplicationState(HttpServer* server, Store *store, Store* meta_store,
                                    ThreadPool* thread_pool,
                                    http_message_dispatcher *message_dispatcher, bool api_uses_ssl,
-                                   size_t read_max_lag, size_t write_max_lag,
+                                   size_t healthy_read_lag, size_t healthy_write_lag,
                                    size_t num_collections_parallel_load, size_t num_documents_parallel_load):
     node(nullptr), leader_term(-1), server(server), store(store), meta_store(meta_store),
     thread_pool(thread_pool), message_dispatcher(message_dispatcher), api_uses_ssl(api_uses_ssl),
-    read_max_lag(read_max_lag), write_max_lag(write_max_lag),
+    healthy_read_lag(healthy_read_lag), healthy_write_lag(healthy_write_lag),
     num_collections_parallel_load(num_collections_parallel_load),
     num_documents_parallel_load(num_documents_parallel_load),
     ready(false), shutting_down(false), pending_writes(0) {
@@ -591,17 +589,6 @@ ReplicationState::ReplicationState(HttpServer* server, Store *store, Store* meta_store,
 }
 
 bool ReplicationState::is_alive() const {
-    std::shared_lock lock(node_mutex);
-
-    if(node == nullptr ) {
-        return false;
-    }
-
-    bool leader_or_follower = (node->is_leader() || !node->leader_id().is_empty());
-    if(!leader_or_follower) {
-        return false;
-    }
-
     // for general health check we will only care about the `read_caught_up` threshold
     return read_caught_up;
 }
diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp
index cb668ca1..93b84eef 100644
--- a/src/typesense_server_utils.cpp
+++ b/src/typesense_server_utils.cpp
@@ -79,8 +79,8 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
 
     options.add("max-memory-ratio", '\0', "Maximum fraction of system memory to be used.", false, 1.0f);
     options.add("snapshot-interval-seconds", '\0', "Frequency of replication log snapshots.", false, 3600);
-    options.add("read-max-lag", '\0', "Reads are rejected if the updates lag behind this threshold.", false, 1000);
-    options.add("write-max-lag", '\0', "Writes are rejected if the updates lag behind this threshold.", false, 100);
+    options.add("healthy-read-lag", '\0', "Reads are rejected if the updates lag behind this threshold.", false, 1000);
+    options.add("healthy-write-lag", '\0', "Writes are rejected if the updates lag behind this threshold.", false, 500);
     options.add("log-slow-requests-time-ms", '\0', "When > 0, requests that take longer than this duration are logged.", false, -1);
 
     options.add("num-collections-parallel-load", '\0', "Number of collections that are loaded in parallel during start up.", false, 4);
@@ -395,8 +395,8 @@ int run_server(const Config & config, const std::string & version, void (*master
 
     ReplicationState replication_state(server, &store, &meta_store, &app_thread_pool,
                                        server->get_message_dispatcher(), ssl_enabled,
-                                       config.get_read_max_lag(),
-                                       config.get_write_max_lag(),
+                                       config.get_healthy_read_lag(),
+                                       config.get_healthy_write_lag(),
                                        num_collections_parallel_load,
                                        config.get_num_documents_parallel_load());