Node vote API for triggering leader rotation.

This commit is contained in:
kishorenc 2020-12-24 19:32:17 +05:30
parent 66a44a5afc
commit 10fd97c32d
7 changed files with 39 additions and 2 deletions

View File

@ -73,7 +73,7 @@ bool get_key(http_req& req, http_res& res);
bool del_key(http_req& req, http_res& res);
// Metrics
// Health + Metrics
bool get_debug(http_req& req, http_res& res);
@ -85,8 +85,12 @@ bool get_metrics_json(http_req& req, http_res& res);
bool get_log_sequence(http_req& req, http_res& res);
// operations
bool post_snapshot(http_req& req, http_res& res);
bool post_vote(http_req& req, http_res& res);
// Misc helpers
bool raft_write_send_response(void *data);

View File

@ -173,4 +173,6 @@ public:
void defer_processing(http_req& req, http_res& res, size_t timeout_ms);
void do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res);
bool trigger_vote();
};

View File

@ -94,7 +94,7 @@ public:
class ReplicationState : public braft::StateMachine {
private:
static constexpr const char* db_snapshot_name = "db_snapshot";
static const size_t CATCHUP_MIN_SEQUENCE_DIFF = 3000; // ~ actual 1K documents
static const size_t CATCHUP_MIN_SEQUENCE_DIFF = 3000;
braft::Node* volatile node;
butil::atomic<int64_t> leader_term;
@ -117,6 +117,8 @@ private:
std::string ext_snapshot_path;
int election_timeout_interval_ms;
public:
static constexpr const char* log_dir_name = "log";
@ -141,6 +143,8 @@ public:
// updates cluster membership
void refresh_nodes(const std::string & nodes);
bool trigger_vote();
bool has_leader_term() const {
return leader_term.load(butil::memory_order_acquire) > 0;
}

View File

@ -1340,6 +1340,17 @@ bool post_snapshot(http_req& req, http_res& res) {
return true;
}
bool post_vote(http_req& req, http_res& res) {
res.status_code = 200;
res.content_type_header = "application/json";
nlohmann::json response;
response["success"] = server->trigger_vote();
res.body = response.dump();
return true;
}
bool get_synonyms(http_req &req, http_res &res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection *collection = collectionManager.get_collection(req.params["collection"]);

View File

@ -809,3 +809,7 @@ bool HttpServer::has_exited() const {
void HttpServer::do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res) {
return replication_state->do_snapshot(snapshot_path, req, res);
}
bool HttpServer::trigger_vote() {
return replication_state->trigger_vote();
}

View File

@ -62,6 +62,7 @@ void master_server_routes() {
server->post("/health", post_health);
server->get("/sequence", get_log_sequence);
server->post("/operations/snapshot", post_snapshot, false, true);
server->post("/operations/vote", post_vote, false, false);
}
int main(int argc, char **argv) {

View File

@ -25,6 +25,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
int election_timeout_ms, int snapshot_interval_s,
const std::string & raft_dir, const std::string & nodes) {
this->election_timeout_interval_ms = election_timeout_ms;
this->raft_dir_path = raft_dir;
braft::NodeOptions node_options;
@ -576,6 +577,16 @@ void ReplicationState::do_dummy_write() {
LOG(INFO) << "Dummy write to " << url << ", status = " << status_code << ", response = " << api_res;
}
bool ReplicationState::trigger_vote() {
if(node) {
auto status = node->vote(election_timeout_interval_ms);
LOG(INFO) << "Triggered vote. Ok? " << status.ok() << ", status: " << status;
return status.ok();
}
return false;
}
void InitSnapshotClosure::Run() {
// Auto delete this after Run()
std::unique_ptr<InitSnapshotClosure> self_guard(this);