From 27179b4f8bdeee2de2b35e8ab7db4479bf2bcf64 Mon Sep 17 00:00:00 2001 From: kishorenc Date: Tue, 15 Sep 2020 18:29:38 +0530 Subject: [PATCH] Parameterize snapshot interval duration. --- include/config.h | 18 ++++++++++++++++++ src/raft_server.cpp | 2 +- src/typesense_server_utils.cpp | 12 +++++++++--- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/include/config.h b/include/config.h index b58e2748..d601f350 100644 --- a/include/config.h +++ b/include/config.h @@ -30,6 +30,7 @@ private: bool enable_cors; float max_memory_ratio; + int snapshot_interval_seconds; std::string config_file; int config_file_validity; @@ -42,6 +43,7 @@ public: this->peering_port = 8107; this->enable_cors = false; this->max_memory_ratio = 1.0f; + this->snapshot_interval_seconds = 3600; } // setters @@ -150,6 +152,10 @@ public: return this->max_memory_ratio; } + int get_snapshot_interval_seconds() const { + return this->snapshot_interval_seconds; + } + // loaders std::string get_env(const char *name) { @@ -205,6 +211,10 @@ public: if(!get_env("TYPESENSE_MAX_MEMORY_RATIO").empty()) { this->max_memory_ratio = std::stof(get_env("TYPESENSE_MAX_MEMORY_RATIO")); } + + if(!get_env("TYPESENSE_SNAPSHOT_INTERVAL_SECONDS").empty()) { + this->snapshot_interval_seconds = std::stoi(get_env("TYPESENSE_SNAPSHOT_INTERVAL_SECONDS")); + } } void load_config_file(cmdline::parser & options) { @@ -290,6 +300,10 @@ public: if(reader.Exists("server", "max-memory-ratio")) { this->max_memory_ratio = (float) reader.GetReal("server", "max-memory-ratio", 1.0f); } + + if(reader.Exists("server", "snapshot-interval-seconds")) { + this->snapshot_interval_seconds = (int) reader.GetInteger("server", "snapshot-interval-seconds", 3600); + } } void load_config_cmd_args(cmdline::parser & options) { @@ -357,6 +371,10 @@ public: if(options.exist("max-memory-ratio")) { this->max_memory_ratio = options.get("max-memory-ratio"); } + + if(options.exist("snapshot-interval-seconds")) { + this->snapshot_interval_seconds = options.get("snapshot-interval-seconds"); + } } // validation diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 8ee297a6..b8118306 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -133,7 +133,7 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con // Handle no leader scenario LOG(ERROR) << "Rejecting write: could not find a leader."; - if(request->_req->proceed_req) { + if(request->_req->proceed_req && response->proxied_stream) { // streaming in progress: ensure graceful termination (cannot start response again) LOG(ERROR) << "Terminating streaming request gracefully."; request->await.notify(); diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 640e5864..7228ae53 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -87,6 +87,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.add("enable-cors", '\0', "Enable CORS requests."); options.add("max-memory-ratio", '\0', "Maximum fraction of system memory to be used.", false, 1.0f); + options.add("snapshot-interval-seconds", '\0', "Frequency of replication log snapshots.", false, 3600); options.add("log-dir", '\0', "Path to the log directory.", false, ""); @@ -178,7 +179,8 @@ Option fetch_nodes_config(const std::string& path_to_nodes) { } int start_raft_server(ReplicationState& replication_state, const std::string& state_dir, const std::string& path_to_nodes, - const std::string& peering_address, uint32_t peering_port, uint32_t api_port) { + const std::string& peering_address, uint32_t peering_port, uint32_t api_port, + int snapshot_interval_seconds) { if(path_to_nodes.empty()) { LOG(INFO) << "Since no --nodes argument is provided, starting a single node Typesense cluster."; @@ -222,13 +224,14 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st // Reference: https://github.com/apache/incubator-brpc/blob/122770d/docs/en/client.md#timeout size_t election_timeout_ms = 5000; - if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, 3600, state_dir, + if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, snapshot_interval_seconds, state_dir, nodes_config_op.get()) != 0) { LOG(ERROR) << "Failed to start peering state"; exit(-1); } LOG(INFO) << "Typesense peering service is running on " << raft_server.listen_address(); + LOG(INFO) << "Snapshot interval configured as: " << snapshot_interval_seconds << "s"; // Wait until 'CTRL-C' is pressed. then Stop() and Join() the service size_t raft_counter = 0; @@ -362,7 +365,10 @@ int run_server(const Config & config, const std::string & version, void (*master std::thread raft_thread([&replication_state, &config, &state_dir]() { std::string path_to_nodes = config.get_nodes(); start_raft_server(replication_state, state_dir, path_to_nodes, - config.get_peering_address(), config.get_peering_port(), config.get_api_port()); + config.get_peering_address(), + config.get_peering_port(), + config.get_api_port(), + config.get_snapshot_interval_seconds()); }); LOG(INFO) << "Starting API service...";