Reject writes to a follower when there is no leader.

This commit is contained in:
kishorenc 2020-03-23 21:20:37 +05:30
parent b16614eb20
commit 3cb4c3a3b6
6 changed files with 38 additions and 15 deletions

View File

@ -32,7 +32,8 @@ public:
void init(const std::string & api_key);
static long get_response(const std::string & url, std::string & response);
static long get_response(const std::string & url, std::string & response, long timeout_ms=4000);
static long post_response(const std::string & url, const std::string & body, std::string & response);
static long post_response(const std::string & url, const std::string & body, std::string & response,
long timeout_ms=4000);
};

View File

@ -28,7 +28,9 @@ struct http_res {
case 201: return "Created";
case 400: return "Bad Request";
case 401: return "Unauthorized";
case 403: return "Forbidden";
case 404: return "Not Found";
case 405: return "Not Allowed";
case 409: return "Conflict";
case 422: return "Unprocessable Entity";
case 500: return "Internal Server Error";
@ -81,9 +83,9 @@ struct http_res {
body = "{\"message\": \"" + message + "\"}";
}
void send_500(const std::string & res_body) {
void send_500(const std::string & message) {
status_code = 500;
body = res_body;
body = "{\"message\": \"" + message + "\"}";
}
void send(uint32_t code, const std::string & message) {

View File

@ -6,14 +6,16 @@
std::string HttpClient::api_key = "";
std::string HttpClient::ca_cert_path = "";
long HttpClient::post_response(const std::string &url, const std::string &body, std::string &response) {
long HttpClient::post_response(const std::string &url, const std::string &body, std::string &response, long timeout_ms) {
CURL *curl = init_curl(url, response);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
return perform_curl(curl);
}
long HttpClient::get_response(const std::string &url, std::string &response) {
long HttpClient::get_response(const std::string &url, std::string &response, long timeout_ms) {
CURL *curl = init_curl(url, response);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
if(curl == nullptr) {
return 0;
@ -68,6 +70,8 @@ CURL *HttpClient::init_curl(const std::string &url, std::string &buffer) {
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 300);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); // to allow self-signed certs
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer);

View File

@ -91,26 +91,41 @@ int ReplicationState::start(const int api_port, int raft_port, int election_time
void ReplicationState::write(http_req* request, http_res* response) {
if (!is_leader()) {
LOG(INFO) << "Redirecting write to leader.";
if(node->leader_id().is_empty()) {
// Handle no leader scenario
LOG(ERROR) << "Rejecting write: could not find a leader.";
response->send_500("Could not find a leader.");
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
replication_arg->req->route_index = static_cast<int>(ROUTE_CODES::RETURN_EARLY);
return message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
}
thread_pool->enqueue([](http_req* request, http_res* response,
http_message_dispatcher* message_dispatcher, braft::Node* node) {
const std::string & leader_addr = node->leader_id().to_string();
LOG(INFO) << "Redirecting write to leader at: " << leader_addr;
thread_pool->enqueue([leader_addr, request, response, this]() {
auto raw_req = request->_req;
std::string scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len);
std::vector<std::string> addr_parts;
StringUtils::split(node->leader_id().to_string(), addr_parts, ":");
StringUtils::split(leader_addr, addr_parts, ":");
std::string leader_host_port = addr_parts[0] + ":" + addr_parts[2];
const std::string & path = std::string(raw_req->path.base, raw_req->path.len);
std::string url = scheme + "://" + leader_host_port + path;
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res);
response->send(status, api_res);
if(request->http_method == "POST") {
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res);
response->send(status, api_res);
} else {
const std::string& err = "Forwarding for http method not implemented: " + request->http_method;
LOG(ERROR) << err;
response->send_500(err);
}
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
replication_arg->req->route_index = static_cast<int>(ROUTE_CODES::RETURN_EARLY);
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
}, request, response, message_dispatcher, node);
});
return ;
}

View File

@ -93,7 +93,7 @@ void Replicator::start(http_message_dispatcher* message_dispatcher, const std::s
std::string url = master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1);
std::string json_response;
long status_code = HttpClient::get_response(url, json_response);
long status_code = HttpClient::get_response(url, json_response, 15000);
if(status_code == 200) {
nlohmann::json json_content = nlohmann::json::parse(json_response);

View File

@ -318,6 +318,7 @@ int run_server(const Config & config, const std::string & version,
});
// wait for raft service to be ready before starting http
// TODO: should not return until either follower or leader has started
ready_future.get();
if(config.get_master().empty()) {