Parameterize snapshot interval duration.

This commit is contained in:
kishorenc 2020-09-15 18:29:38 +05:30
parent 97394c146a
commit 27179b4f8b
3 changed files with 28 additions and 4 deletions

View File

@ -30,6 +30,7 @@ private:
bool enable_cors;
float max_memory_ratio;
int snapshot_interval_seconds;
std::string config_file;
int config_file_validity;
@ -42,6 +43,7 @@ public:
this->peering_port = 8107;
this->enable_cors = false;
this->max_memory_ratio = 1.0f;
this->snapshot_interval_seconds = 3600;
}
// setters
@ -150,6 +152,10 @@ public:
return this->max_memory_ratio;
}
int get_snapshot_interval_seconds() const {
return this->snapshot_interval_seconds;
}
// loaders
std::string get_env(const char *name) {
@ -205,6 +211,10 @@ public:
if(!get_env("TYPESENSE_MAX_MEMORY_RATIO").empty()) {
this->max_memory_ratio = std::stof(get_env("TYPESENSE_MAX_MEMORY_RATIO"));
}
if(!get_env("TYPESENSE_SNAPSHOT_INTERVAL_SECONDS").empty()) {
this->snapshot_interval_seconds = std::stoi(get_env("TYPESENSE_SNAPSHOT_INTERVAL_SECONDS"));
}
}
void load_config_file(cmdline::parser & options) {
@ -290,6 +300,10 @@ public:
if(reader.Exists("server", "max-memory-ratio")) {
this->max_memory_ratio = (float) reader.GetReal("server", "max-memory-ratio", 1.0f);
}
if(reader.Exists("server", "snapshot-interval-seconds")) {
this->snapshot_interval_seconds = (int) reader.GetInteger("server", "snapshot-interval-seconds", 3600);
}
}
void load_config_cmd_args(cmdline::parser & options) {
@ -357,6 +371,10 @@ public:
if(options.exist("max-memory-ratio")) {
this->max_memory_ratio = options.get<float>("max-memory-ratio");
}
if(options.exist("snapshot-interval-seconds")) {
this->snapshot_interval_seconds = options.get<int>("snapshot-interval-seconds");
}
}
// validation

View File

@ -133,7 +133,7 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con
// Handle no leader scenario
LOG(ERROR) << "Rejecting write: could not find a leader.";
if(request->_req->proceed_req) {
if(request->_req->proceed_req && response->proxied_stream) {
// streaming in progress: ensure graceful termination (cannot start response again)
LOG(ERROR) << "Terminating streaming request gracefully.";
request->await.notify();

View File

@ -87,6 +87,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.add("enable-cors", '\0', "Enable CORS requests.");
options.add<float>("max-memory-ratio", '\0', "Maximum fraction of system memory to be used.", false, 1.0f);
options.add<int>("snapshot-interval-seconds", '\0', "Frequency of replication log snapshots.", false, 3600);
options.add<std::string>("log-dir", '\0', "Path to the log directory.", false, "");
@ -178,7 +179,8 @@ Option<std::string> fetch_nodes_config(const std::string& path_to_nodes) {
}
int start_raft_server(ReplicationState& replication_state, const std::string& state_dir, const std::string& path_to_nodes,
const std::string& peering_address, uint32_t peering_port, uint32_t api_port) {
const std::string& peering_address, uint32_t peering_port, uint32_t api_port,
int snapshot_interval_seconds) {
if(path_to_nodes.empty()) {
LOG(INFO) << "Since no --nodes argument is provided, starting a single node Typesense cluster.";
@ -222,13 +224,14 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
// Reference: https://github.com/apache/incubator-brpc/blob/122770d/docs/en/client.md#timeout
size_t election_timeout_ms = 5000;
if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, 3600, state_dir,
if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, snapshot_interval_seconds, state_dir,
nodes_config_op.get()) != 0) {
LOG(ERROR) << "Failed to start peering state";
exit(-1);
}
LOG(INFO) << "Typesense peering service is running on " << raft_server.listen_address();
LOG(INFO) << "Snapshot interval configured as: " << snapshot_interval_seconds << "s";
// Wait until 'CTRL-C' is pressed. then Stop() and Join() the service
size_t raft_counter = 0;
@ -362,7 +365,10 @@ int run_server(const Config & config, const std::string & version, void (*master
std::thread raft_thread([&replication_state, &config, &state_dir]() {
std::string path_to_nodes = config.get_nodes();
start_raft_server(replication_state, state_dir, path_to_nodes,
config.get_peering_address(), config.get_peering_port(), config.get_api_port());
config.get_peering_address(),
config.get_peering_port(),
config.get_api_port(),
config.get_snapshot_interval_seconds());
});
LOG(INFO) << "Starting API service...";