Expose snapshot_max_byte_count_per_rpc param.

This commit is contained in:
Kishore Nallan 2022-06-17 15:24:26 +05:30
parent 19ac76b681
commit ec217bd7d7
4 changed files with 27 additions and 6 deletions

View File

@ -36,6 +36,7 @@ private:
float max_memory_ratio;
int snapshot_interval_seconds;
int snapshot_max_byte_count_per_rpc;
std::atomic<size_t> healthy_read_lag;
std::atomic<size_t> healthy_write_lag;
@ -63,6 +64,7 @@ protected:
this->enable_cors = true;
this->max_memory_ratio = 1.0f;
this->snapshot_interval_seconds = 3600;
this->snapshot_max_byte_count_per_rpc = 4194304;
this->healthy_read_lag = 1000;
this->healthy_write_lag = 500;
this->log_slow_requests_time_ms = -1;
@ -217,6 +219,10 @@ public:
return this->snapshot_interval_seconds;
}
int get_snapshot_max_byte_count_per_rpc() const {
return this->snapshot_max_byte_count_per_rpc;
}
size_t get_healthy_read_lag() const {
return this->healthy_read_lag;
}
@ -356,6 +362,10 @@ public:
this->ssl_refresh_interval_seconds = std::stoi(get_env("TYPESENSE_SSL_REFRESH_INTERVAL_SECONDS"));
}
if(!get_env("TYPESENSE_SNAPSHOT_MAX_BYTE_COUNT_PER_RPC").empty()) {
this->snapshot_max_byte_count_per_rpc = std::stoi(get_env("TYPESENSE_SNAPSHOT_MAX_BYTE_COUNT_PER_RPC"));
}
this->enable_access_logging = ("TRUE" == get_env("TYPESENSE_ENABLE_ACCESS_LOGGING"));
if(!get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE").empty()) {
@ -463,6 +473,10 @@ public:
this->snapshot_interval_seconds = (int) reader.GetInteger("server", "snapshot-interval-seconds", 3600);
}
if(reader.Exists("server", "snapshot-max-byte-count-per-rpc")) {
this->snapshot_max_byte_count_per_rpc = (int) reader.GetInteger("server", "snapshot-max-byte-count-per-rpc", 4194304);
}
if(reader.Exists("server", "healthy-read-lag")) {
this->healthy_read_lag = (size_t) reader.GetInteger("server", "healthy-read-lag", 1000);
}
@ -580,6 +594,10 @@ public:
this->snapshot_interval_seconds = options.get<int>("snapshot-interval-seconds");
}
if(options.exist("snapshot-max-byte-count-per-rpc")) {
this->snapshot_max_byte_count_per_rpc = options.get<int>("snapshot-max-byte-count-per-rpc");
}
if(options.exist("healthy-read-lag")) {
this->healthy_read_lag = options.get<size_t>("healthy-read-lag");
}

View File

@ -178,7 +178,7 @@ public:
// Starts this node
int start(const butil::EndPoint & peering_endpoint, int api_port,
int election_timeout_ms, int snapshot_interval_s,
int election_timeout_ms, int snapshot_max_byte_count_per_rpc,
const std::string & raft_dir, const std::string & nodes,
const std::atomic<bool>& quit_abruptly);

View File

@ -28,7 +28,7 @@ void ReplicationClosure::Run() {
// State machine implementation
int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int api_port,
int election_timeout_ms, int snapshot_interval_s,
int election_timeout_ms, int snapshot_max_byte_count_per_rpc,
const std::string & raft_dir, const std::string & nodes,
const std::atomic<bool>& quit_abruptly) {
@ -78,7 +78,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
braft::FLAGS_raft_max_append_entries_cache_size = 8;
// flag controls snapshot download size of each RPC
braft::FLAGS_raft_max_byte_count_per_rpc = 4 * 1024 * 1024; // 4 MB
braft::FLAGS_raft_max_byte_count_per_rpc = snapshot_max_byte_count_per_rpc;
// automatic snapshot is disabled since it caused issues during slow follower catch-ups
node_options.snapshot_interval_s = -1;

View File

@ -85,6 +85,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.add<float>("max-memory-ratio", '\0', "Maximum fraction of system memory to be used.", false, 1.0f);
options.add<int>("snapshot-interval-seconds", '\0', "Frequency of replication log snapshots.", false, 3600);
options.add<int>("snapshot-max-byte-count-per-rpc", '\0', "Maximum snapshot file size in bytes transferred for each RPC.", false, 4194304);
options.add<size_t>("healthy-read-lag", '\0', "Reads are rejected if the updates lag behind this threshold.", false, 1000);
options.add<size_t>("healthy-write-lag", '\0', "Writes are rejected if the updates lag behind this threshold.", false, 500);
options.add<int>("log-slow-requests-time-ms", '\0', "When > 0, requests that take longer than this duration are logged.", false, -1);
@ -243,7 +244,7 @@ const char* get_internal_ip(const std::string& subnet_cidr) {
int start_raft_server(ReplicationState& replication_state, const std::string& state_dir, const std::string& path_to_nodes,
const std::string& peering_address, uint32_t peering_port, const std::string& peering_subnet,
uint32_t api_port, int snapshot_interval_seconds) {
uint32_t api_port, int snapshot_interval_seconds, int snapshot_max_byte_count_per_rpc) {
if(path_to_nodes.empty()) {
LOG(INFO) << "Since no --nodes argument is provided, starting a single node Typesense cluster.";
@ -291,7 +292,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
// Reference: https://github.com/apache/incubator-brpc/blob/122770d/docs/en/client.md#timeout
size_t election_timeout_ms = 5000;
if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, snapshot_interval_seconds, state_dir,
if (replication_state.start(peering_endpoint, api_port, election_timeout_ms, snapshot_max_byte_count_per_rpc, state_dir,
nodes_config_op.get(), quit_raft_service) != 0) {
LOG(ERROR) << "Failed to start peering state";
exit(-1);
@ -299,6 +300,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
LOG(INFO) << "Typesense peering service is running on " << raft_server.listen_address();
LOG(INFO) << "Snapshot interval configured as: " << snapshot_interval_seconds << "s";
LOG(INFO) << "Snapshot max byte count configured as: " << snapshot_max_byte_count_per_rpc;
// Wait until 'CTRL-C' is pressed. then Stop() and Join() the service
size_t raft_counter = 0;
@ -461,7 +463,8 @@ int run_server(const Config & config, const std::string & version, void (*master
config.get_peering_port(),
config.get_peering_subnet(),
config.get_api_port(),
config.get_snapshot_interval_seconds());
config.get_snapshot_interval_seconds(),
config.get_snapshot_max_byte_count_per_rpc());
LOG(INFO) << "Shutting down batch indexer...";
batch_indexer->stop();