diff --git a/include/config.h b/include/config.h index 515b8a0c..b9464060 100644 --- a/include/config.h +++ b/include/config.h @@ -32,6 +32,7 @@ private: float max_memory_ratio; int snapshot_interval_seconds; + size_t catch_up_min_sequence_diff; size_t catch_up_threshold_percentage; std::string config_file; @@ -48,6 +49,7 @@ protected: this->enable_cors = false; this->max_memory_ratio = 1.0f; this->snapshot_interval_seconds = 3600; + this->catch_up_min_sequence_diff = 3000; this->catch_up_threshold_percentage = 95; this->log_slow_requests_time_ms = -1; } @@ -179,6 +181,10 @@ public: return this->snapshot_interval_seconds; } + int get_catch_up_min_sequence_diff() const { + return this->catch_up_min_sequence_diff; + } + int get_catch_up_threshold_percentage() const { return this->catch_up_threshold_percentage; } @@ -247,6 +253,10 @@ public: this->snapshot_interval_seconds = std::stoi(get_env("TYPESENSE_SNAPSHOT_INTERVAL_SECONDS")); } + if(!get_env("TYPESENSE_CATCH_UP_MIN_SEQUENCE_DIFF").empty()) { + this->catch_up_min_sequence_diff = std::stoi(get_env("TYPESENSE_CATCH_UP_MIN_SEQUENCE_DIFF")); + } + if(!get_env("TYPESENSE_CATCH_UP_THRESHOLD_PERCENTAGE").empty()) { this->catch_up_threshold_percentage = std::stoi(get_env("TYPESENSE_CATCH_UP_THRESHOLD_PERCENTAGE")); } @@ -344,6 +354,10 @@ public: this->snapshot_interval_seconds = (int) reader.GetInteger("server", "snapshot-interval-seconds", 3600); } + if(reader.Exists("server", "catch-up-min-sequence-diff")) { + this->catch_up_min_sequence_diff = (int) reader.GetInteger("server", "catch-up-min-sequence-diff", 3000); + } + if(reader.Exists("server", "catch-up-threshold-percentage")) { this->catch_up_threshold_percentage = (int) reader.GetInteger("server", "catch-up-threshold-percentage", 95); } @@ -423,6 +437,10 @@ public: this->snapshot_interval_seconds = options.get("snapshot-interval-seconds"); } + if(options.exist("catch-up-min-sequence-diff")) { + this->catch_up_min_sequence_diff = options.get("catch-up-min-sequence-diff"); + } + if(options.exist("catch-up-threshold-percentage")) { this->catch_up_threshold_percentage = options.get("catch-up-threshold-percentage"); } diff --git a/include/raft_server.h b/include/raft_server.h index 41460a80..823e8df7 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -94,7 +94,6 @@ public: class ReplicationState : public braft::StateMachine { private: static constexpr const char* db_snapshot_name = "db_snapshot"; - static const size_t CATCHUP_MIN_SEQUENCE_DIFF = 3000; braft::Node* volatile node; butil::atomic leader_term; @@ -104,6 +103,7 @@ private: ThreadPool* thread_pool; http_message_dispatcher* message_dispatcher; + const size_t catchup_min_sequence_diff; const size_t catch_up_threshold_percentage; std::atomic caught_up; @@ -126,7 +126,7 @@ public: static constexpr const char* snapshot_dir_name = "snapshot"; ReplicationState(Store* store, ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher, - bool api_uses_ssl, size_t catch_up_threshold_percentage, + bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage, bool create_init_db_snapshot, std::atomic& quit_service); // Starts this node diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 121c3a12..c6e1084b 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -478,14 +478,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { // Since leader waits for writes on followers to finish, follower's storage offset could be // momentarily ahead of the leader's. So we will use std::abs() for checking the difference. - const int64_t seq_diff = std::abs(int64_t(leader_seq) - int64_t(seq_num)); + const uint64_t seq_diff = std::abs(int64_t(leader_seq) - int64_t(seq_num)); - if(seq_diff < CATCHUP_MIN_SEQUENCE_DIFF) { + if(seq_diff < catchup_min_sequence_diff) { this->caught_up = true; return ; } - // However, if the difference is large, then something is wrong + // However, if the difference is large, then something could be wrong if(leader_seq < seq_num) { LOG(ERROR) << "Leader sequence " << leader_seq << " is less than local sequence " << seq_num; this->caught_up = false; @@ -502,10 +502,11 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { } ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_message_dispatcher *message_dispatcher, - bool api_uses_ssl, size_t catch_up_threshold_percentage, + bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage, bool create_init_db_snapshot, std::atomic& quit_service): node(nullptr), leader_term(-1), store(store), thread_pool(thread_pool), - message_dispatcher(message_dispatcher), catch_up_threshold_percentage(catch_up_threshold_percentage), + message_dispatcher(message_dispatcher), + catchup_min_sequence_diff(catchup_min_sequence_diff), catch_up_threshold_percentage(catch_up_threshold_percentage), api_uses_ssl(api_uses_ssl), create_init_db_snapshot(create_init_db_snapshot), shut_down(quit_service) { } diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 68fedc6f..fb5ee8a4 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -78,6 +78,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.add("max-memory-ratio", '\0', "Maximum fraction of system memory to be used.", false, 1.0f); options.add("snapshot-interval-seconds", '\0', "Frequency of replication log snapshots.", false, 3600); + options.add("catch-up-min-sequence-diff", '\0', "The absolute storage sequence difference within which a follower is deemed to have caught up with leader.", false, 3000); options.add("catch-up-threshold-percentage", '\0', "The threshold at which a follower is deemed to have caught up with leader.", false, 95); options.add("log-slow-requests-time-ms", '\0', "When > 0, requests that take longer than this duration are logged.", false, -1); @@ -392,7 +393,8 @@ int run_server(const Config & config, const std::string & version, void (*master // first we start the peering service ReplicationState replication_state(&store, &thread_pool, server->get_message_dispatcher(), - ssl_enabled, config.get_catch_up_threshold_percentage(), + ssl_enabled, config.get_catch_up_min_sequence_diff(), + config.get_catch_up_threshold_percentage(), create_init_db_snapshot, quit_raft_service); std::thread raft_thread([&replication_state, &config, &state_dir]() {