From 10fd97c32dc6c66b7b311216b9e130d958f01602 Mon Sep 17 00:00:00 2001 From: kishorenc Date: Thu, 24 Dec 2020 19:32:17 +0530 Subject: [PATCH] Node vote API for triggering leader rotation. --- include/core_api.h | 6 +++++- include/http_server.h | 2 ++ include/raft_server.h | 6 +++++- src/core_api.cpp | 11 +++++++++++ src/http_server.cpp | 4 ++++ src/main/typesense_server.cpp | 1 + src/raft_server.cpp | 11 +++++++++++ 7 files changed, 39 insertions(+), 2 deletions(-) diff --git a/include/core_api.h b/include/core_api.h index ee0bbb25..9c35f2a1 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -73,7 +73,7 @@ bool get_key(http_req& req, http_res& res); bool del_key(http_req& req, http_res& res); -// Metrics +// Health + Metrics bool get_debug(http_req& req, http_res& res); @@ -85,8 +85,12 @@ bool get_metrics_json(http_req& req, http_res& res); bool get_log_sequence(http_req& req, http_res& res); +// operations + bool post_snapshot(http_req& req, http_res& res); +bool post_vote(http_req& req, http_res& res); + // Misc helpers bool raft_write_send_response(void *data); diff --git a/include/http_server.h b/include/http_server.h index 51f8bade..3a0acb5b 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -173,4 +173,6 @@ public: void defer_processing(http_req& req, http_res& res, size_t timeout_ms); void do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res); + + bool trigger_vote(); }; diff --git a/include/raft_server.h b/include/raft_server.h index b5e11693..248d528a 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -94,7 +94,7 @@ public: class ReplicationState : public braft::StateMachine { private: static constexpr const char* db_snapshot_name = "db_snapshot"; - static const size_t CATCHUP_MIN_SEQUENCE_DIFF = 3000; // ~ actual 1K documents + static const size_t CATCHUP_MIN_SEQUENCE_DIFF = 3000; braft::Node* volatile node; butil::atomic leader_term; @@ -117,6 +117,8 @@ private: std::string ext_snapshot_path; + int election_timeout_interval_ms; + public: static constexpr const char* log_dir_name = "log"; @@ -141,6 +143,8 @@ public: // updates cluster membership void refresh_nodes(const std::string & nodes); + bool trigger_vote(); + bool has_leader_term() const { return leader_term.load(butil::memory_order_acquire) > 0; } diff --git a/src/core_api.cpp b/src/core_api.cpp index a415096e..f8e4d8f9 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -1340,6 +1340,17 @@ bool post_snapshot(http_req& req, http_res& res) { return true; } +bool post_vote(http_req& req, http_res& res) { + res.status_code = 200; + res.content_type_header = "application/json"; + + nlohmann::json response; + response["success"] = server->trigger_vote(); + res.body = response.dump(); + + return true; +} + bool get_synonyms(http_req &req, http_res &res) { CollectionManager & collectionManager = CollectionManager::get_instance(); Collection *collection = collectionManager.get_collection(req.params["collection"]); diff --git a/src/http_server.cpp b/src/http_server.cpp index 25741a39..f808587e 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -809,3 +809,7 @@ bool HttpServer::has_exited() const { void HttpServer::do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res) { return replication_state->do_snapshot(snapshot_path, req, res); } + +bool HttpServer::trigger_vote() { + return replication_state->trigger_vote(); +} diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 38b396c1..a31b4fde 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -62,6 +62,7 @@ void master_server_routes() { server->post("/health", post_health); server->get("/sequence", get_log_sequence); server->post("/operations/snapshot", post_snapshot, false, true); + server->post("/operations/vote", post_vote, false, false); } int main(int argc, char **argv) { diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 25fad5e0..2ff060b6 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -25,6 +25,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int int election_timeout_ms, int snapshot_interval_s, const std::string & raft_dir, const std::string & nodes) { + this->election_timeout_interval_ms = election_timeout_ms; this->raft_dir_path = raft_dir; braft::NodeOptions node_options; @@ -576,6 +577,16 @@ void ReplicationState::do_dummy_write() { LOG(INFO) << "Dummy write to " << url << ", status = " << status_code << ", response = " << api_res; } +bool ReplicationState::trigger_vote() { + if(node) { + auto status = node->vote(election_timeout_interval_ms); + LOG(INFO) << "Triggered vote. Ok? " << status.ok() << ", status: " << status; + return status.ok(); + } + + return false; +} + void InitSnapshotClosure::Run() { // Auto delete this after Run() std::unique_ptr self_guard(this);