Parameterize catchup_min_sequence_diff.

This commit is contained in:
kishorenc 2021-03-04 11:43:50 +05:30
parent 6fa6fd9258
commit 8d4defa93e
4 changed files with 29 additions and 8 deletions

View File

@ -32,6 +32,7 @@ private:
float max_memory_ratio;
int snapshot_interval_seconds;
size_t catch_up_min_sequence_diff;
size_t catch_up_threshold_percentage;
std::string config_file;
@ -48,6 +49,7 @@ protected:
this->enable_cors = false;
this->max_memory_ratio = 1.0f;
this->snapshot_interval_seconds = 3600;
this->catch_up_min_sequence_diff = 3000;
this->catch_up_threshold_percentage = 95;
this->log_slow_requests_time_ms = -1;
}
@ -179,6 +181,10 @@ public:
return this->snapshot_interval_seconds;
}
int get_catch_up_min_sequence_diff() const {
return this->catch_up_min_sequence_diff;
}
int get_catch_up_threshold_percentage() const {
return this->catch_up_threshold_percentage;
}
@ -247,6 +253,10 @@ public:
this->snapshot_interval_seconds = std::stoi(get_env("TYPESENSE_SNAPSHOT_INTERVAL_SECONDS"));
}
if(!get_env("TYPESENSE_CATCH_UP_MIN_SEQUENCE_DIFF").empty()) {
this->catch_up_min_sequence_diff = std::stoi(get_env("TYPESENSE_CATCH_UP_MIN_SEQUENCE_DIFF"));
}
if(!get_env("TYPESENSE_CATCH_UP_THRESHOLD_PERCENTAGE").empty()) {
this->catch_up_threshold_percentage = std::stoi(get_env("TYPESENSE_CATCH_UP_THRESHOLD_PERCENTAGE"));
}
@ -344,6 +354,10 @@ public:
this->snapshot_interval_seconds = (int) reader.GetInteger("server", "snapshot-interval-seconds", 3600);
}
if(reader.Exists("server", "catch-up-min-sequence-diff")) {
this->catch_up_min_sequence_diff = (int) reader.GetInteger("server", "catch-up-min-sequence-diff", 3000);
}
if(reader.Exists("server", "catch-up-threshold-percentage")) {
this->catch_up_threshold_percentage = (int) reader.GetInteger("server", "catch-up-threshold-percentage", 95);
}
@ -423,6 +437,10 @@ public:
this->snapshot_interval_seconds = options.get<int>("snapshot-interval-seconds");
}
if(options.exist("catch-up-min-sequence-diff")) {
this->catch_up_min_sequence_diff = options.get<int>("catch-up-min-sequence-diff");
}
if(options.exist("catch-up-threshold-percentage")) {
this->catch_up_threshold_percentage = options.get<int>("catch-up-threshold-percentage");
}

View File

@ -94,7 +94,6 @@ public:
class ReplicationState : public braft::StateMachine {
private:
static constexpr const char* db_snapshot_name = "db_snapshot";
static const size_t CATCHUP_MIN_SEQUENCE_DIFF = 3000;
braft::Node* volatile node;
butil::atomic<int64_t> leader_term;
@ -104,6 +103,7 @@ private:
ThreadPool* thread_pool;
http_message_dispatcher* message_dispatcher;
const size_t catchup_min_sequence_diff;
const size_t catch_up_threshold_percentage;
std::atomic<bool> caught_up;
@ -126,7 +126,7 @@ public:
static constexpr const char* snapshot_dir_name = "snapshot";
ReplicationState(Store* store, ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher,
bool api_uses_ssl, size_t catch_up_threshold_percentage,
bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage,
bool create_init_db_snapshot, std::atomic<bool>& quit_service);
// Starts this node

View File

@ -478,14 +478,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
// Since leader waits for writes on followers to finish, follower's storage offset could be
// momentarily ahead of the leader's. So we will use std::abs() for checking the difference.
const int64_t seq_diff = std::abs(int64_t(leader_seq) - int64_t(seq_num));
const uint64_t seq_diff = std::abs(int64_t(leader_seq) - int64_t(seq_num));
if(seq_diff < CATCHUP_MIN_SEQUENCE_DIFF) {
if(seq_diff < catchup_min_sequence_diff) {
this->caught_up = true;
return ;
}
// However, if the difference is large, then something is wrong
// However, if the difference is large, then something could be wrong
if(leader_seq < seq_num) {
LOG(ERROR) << "Leader sequence " << leader_seq << " is less than local sequence " << seq_num;
this->caught_up = false;
@ -502,10 +502,11 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
}
ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_message_dispatcher *message_dispatcher,
bool api_uses_ssl, size_t catch_up_threshold_percentage,
bool api_uses_ssl, size_t catchup_min_sequence_diff, size_t catch_up_threshold_percentage,
bool create_init_db_snapshot, std::atomic<bool>& quit_service):
node(nullptr), leader_term(-1), store(store), thread_pool(thread_pool),
message_dispatcher(message_dispatcher), catch_up_threshold_percentage(catch_up_threshold_percentage),
message_dispatcher(message_dispatcher),
catchup_min_sequence_diff(catchup_min_sequence_diff), catch_up_threshold_percentage(catch_up_threshold_percentage),
api_uses_ssl(api_uses_ssl), create_init_db_snapshot(create_init_db_snapshot), shut_down(quit_service) {
}

View File

@ -78,6 +78,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.add<float>("max-memory-ratio", '\0', "Maximum fraction of system memory to be used.", false, 1.0f);
options.add<int>("snapshot-interval-seconds", '\0', "Frequency of replication log snapshots.", false, 3600);
options.add<int>("catch-up-min-sequence-diff", '\0', "The absolute storage sequence difference within which a follower is deemed to have caught up with leader.", false, 3000);
options.add<int>("catch-up-threshold-percentage", '\0', "The threshold at which a follower is deemed to have caught up with leader.", false, 95);
options.add<int>("log-slow-requests-time-ms", '\0', "When > 0, requests that take longer than this duration are logged.", false, -1);
@ -392,7 +393,8 @@ int run_server(const Config & config, const std::string & version, void (*master
// first we start the peering service
ReplicationState replication_state(&store, &thread_pool, server->get_message_dispatcher(),
ssl_enabled, config.get_catch_up_threshold_percentage(),
ssl_enabled, config.get_catch_up_min_sequence_diff(),
config.get_catch_up_threshold_percentage(),
create_init_db_snapshot, quit_raft_service);
std::thread raft_thread([&replication_state, &config, &state_dir]() {