From ec217bd7d715b900dfb99338ceed1375b9edf916 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 17 Jun 2022 15:24:26 +0530 Subject: [PATCH] Expose snapshot_max_byte_count_per_rpc param. --- include/config.h | 18 ++++++++++++++++++ include/raft_server.h | 2 +- src/raft_server.cpp | 4 ++-- src/typesense_server_utils.cpp | 9 ++++++--- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/include/config.h b/include/config.h index ed99c52c..495aeb8d 100644 --- a/include/config.h +++ b/include/config.h @@ -36,6 +36,7 @@ private: float max_memory_ratio; int snapshot_interval_seconds; + int snapshot_max_byte_count_per_rpc; std::atomic healthy_read_lag; std::atomic healthy_write_lag; @@ -63,6 +64,7 @@ protected: this->enable_cors = true; this->max_memory_ratio = 1.0f; this->snapshot_interval_seconds = 3600; + this->snapshot_max_byte_count_per_rpc = 4194304; this->healthy_read_lag = 1000; this->healthy_write_lag = 500; this->log_slow_requests_time_ms = -1; @@ -217,6 +219,10 @@ public: return this->snapshot_interval_seconds; } + int get_snapshot_max_byte_count_per_rpc() const { + return this->snapshot_max_byte_count_per_rpc; + } + size_t get_healthy_read_lag() const { return this->healthy_read_lag; } @@ -356,6 +362,10 @@ public: this->ssl_refresh_interval_seconds = std::stoi(get_env("TYPESENSE_SSL_REFRESH_INTERVAL_SECONDS")); } + if(!get_env("TYPESENSE_SNAPSHOT_MAX_BYTE_COUNT_PER_RPC").empty()) { + this->snapshot_max_byte_count_per_rpc = std::stoi(get_env("TYPESENSE_SNAPSHOT_MAX_BYTE_COUNT_PER_RPC")); + } + this->enable_access_logging = ("TRUE" == get_env("TYPESENSE_ENABLE_ACCESS_LOGGING")); if(!get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE").empty()) { @@ -463,6 +473,10 @@ public: this->snapshot_interval_seconds = (int) reader.GetInteger("server", "snapshot-interval-seconds", 3600); } + if(reader.Exists("server", "snapshot-max-byte-count-per-rpc")) { + this->snapshot_max_byte_count_per_rpc = (int) reader.GetInteger("server", "snapshot-max-byte-count-per-rpc", 4194304); + } + if(reader.Exists("server", "healthy-read-lag")) { this->healthy_read_lag = (size_t) reader.GetInteger("server", "healthy-read-lag", 1000); } @@ -580,6 +594,10 @@ public: this->snapshot_interval_seconds = options.get("snapshot-interval-seconds"); } + if(options.exist("snapshot-max-byte-count-per-rpc")) { + this->snapshot_max_byte_count_per_rpc = options.get("snapshot-max-byte-count-per-rpc"); + } + if(options.exist("healthy-read-lag")) { this->healthy_read_lag = options.get("healthy-read-lag"); } diff --git a/include/raft_server.h b/include/raft_server.h index 9d1003d1..0d5e3579 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -178,7 +178,7 @@ public: // Starts this node int start(const butil::EndPoint & peering_endpoint, int api_port, - int election_timeout_ms, int snapshot_interval_s, + int election_timeout_ms, int snapshot_max_byte_count_per_rpc, const std::string & raft_dir, const std::string & nodes, const std::atomic& quit_abruptly); diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 4990d413..36d248f4 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -28,7 +28,7 @@ void ReplicationClosure::Run() { // State machine implementation int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int api_port, - int election_timeout_ms, int snapshot_interval_s, + int election_timeout_ms, int snapshot_max_byte_count_per_rpc, const std::string & raft_dir, const std::string & nodes, const std::atomic& quit_abruptly) { @@ -78,7 +78,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int braft::FLAGS_raft_max_append_entries_cache_size = 8; // flag controls snapshot download size of each RPC - braft::FLAGS_raft_max_byte_count_per_rpc = 4 * 1024 * 1024; // 4 MB + braft::FLAGS_raft_max_byte_count_per_rpc = snapshot_max_byte_count_per_rpc; // automatic snapshot is disabled since it caused issues during slow follower catch-ups node_options.snapshot_interval_s = -1; diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 01276b42..b232d1bf 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -85,6 +85,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.add("max-memory-ratio", '\0', "Maximum fraction of system memory to be used.", false, 1.0f); options.add("snapshot-interval-seconds", '\0', "Frequency of replication log snapshots.", false, 3600); + options.add("snapshot-max-byte-count-per-rpc", '\0', "Maximum snapshot file size in bytes transferred for each RPC.", false, 4194304); options.add("healthy-read-lag", '\0', "Reads are rejected if the updates lag behind this threshold.", false, 1000); options.add("healthy-write-lag", '\0', "Writes are rejected if the updates lag behind this threshold.", false, 500); options.add("log-slow-requests-time-ms", '\0', "When > 0, requests that take longer than this duration are logged.", false, -1); @@ -243,7 +244,7 @@ const char* get_internal_ip(const std::string& subnet_cidr) { int start_raft_server(ReplicationState& replication_state, const std::string& state_dir, const std::string& path_to_nodes, const std::string& peering_address, uint32_t peering_port, const std::string& peering_subnet, - uint32_t api_port, int snapshot_interval_seconds) { + uint32_t api_port, int snapshot_interval_seconds, int snapshot_max_byte_count_per_rpc) { if(path_to_nodes.empty()) { LOG(INFO) << "Since no --nodes argument is provided, starting a single node Typesense cluster."; @@ -291,7 +292,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st // Reference: https://github.com/apache/incubator-brpc/blob/122770d/docs/en/client.md#timeout size_t election_timeout_ms = 5000; - if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, snapshot_interval_seconds, state_dir, + if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, snapshot_max_byte_count_per_rpc, state_dir, nodes_config_op.get(), quit_raft_service) != 0) { LOG(ERROR) << "Failed to start peering state"; exit(-1); @@ -299,6 +300,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st LOG(INFO) << "Typesense peering service is running on " << raft_server.listen_address(); LOG(INFO) << "Snapshot interval configured as: " << snapshot_interval_seconds << "s"; + LOG(INFO) << "Snapshot max byte count configured as: " << snapshot_max_byte_count_per_rpc; // Wait until 'CTRL-C' is pressed. then Stop() and Join() the service size_t raft_counter = 0; @@ -461,7 +463,8 @@ int run_server(const Config & config, const std::string & version, void (*master config.get_peering_port(), config.get_peering_subnet(), config.get_api_port(), - config.get_snapshot_interval_seconds()); + config.get_snapshot_interval_seconds(), + config.get_snapshot_max_byte_count_per_rpc()); LOG(INFO) << "Shutting down batch indexer..."; batch_indexer->stop();