diff --git a/include/http_client.h b/include/http_client.h index d074a764..79287105 100644 --- a/include/http_client.h +++ b/include/http_client.h @@ -32,7 +32,8 @@ public: void init(const std::string & api_key); - static long get_response(const std::string & url, std::string & response); + static long get_response(const std::string & url, std::string & response, long timeout_ms=4000); - static long post_response(const std::string & url, const std::string & body, std::string & response); + static long post_response(const std::string & url, const std::string & body, std::string & response, + long timeout_ms=4000); }; diff --git a/include/http_data.h b/include/http_data.h index 0b03a68e..695fbfff 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -28,7 +28,9 @@ struct http_res { case 201: return "Created"; case 400: return "Bad Request"; case 401: return "Unauthorized"; + case 403: return "Forbidden"; case 404: return "Not Found"; + case 405: return "Not Allowed"; case 409: return "Conflict"; case 422: return "Unprocessable Entity"; case 500: return "Internal Server Error"; @@ -81,9 +83,9 @@ struct http_res { body = "{\"message\": \"" + message + "\"}"; } - void send_500(const std::string & res_body) { + void send_500(const std::string & message) { status_code = 500; - body = res_body; + body = "{\"message\": \"" + message + "\"}"; } void send(uint32_t code, const std::string & message) { diff --git a/src/http_client.cpp b/src/http_client.cpp index 134484eb..46c42657 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -6,14 +6,16 @@ std::string HttpClient::api_key = ""; std::string HttpClient::ca_cert_path = ""; -long HttpClient::post_response(const std::string &url, const std::string &body, std::string &response) { +long HttpClient::post_response(const std::string &url, const std::string &body, std::string &response, long timeout_ms) { CURL *curl = init_curl(url, response); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms); return perform_curl(curl); } -long HttpClient::get_response(const std::string &url, std::string &response) { +long HttpClient::get_response(const std::string &url, std::string &response, long timeout_ms) { CURL *curl = init_curl(url, response); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms); if(curl == nullptr) { return 0; @@ -68,6 +70,8 @@ CURL *HttpClient::init_curl(const std::string &url, std::string &buffer) { } curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 300); + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); // to allow self-signed certs curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buffer); diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 0631f3aa..7fe56d7a 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -91,26 +91,41 @@ int ReplicationState::start(const int api_port, int raft_port, int election_time void ReplicationState::write(http_req* request, http_res* response) { if (!is_leader()) { - LOG(INFO) << "Redirecting write to leader."; + if(node->leader_id().is_empty()) { + // Handle no leader scenario + LOG(ERROR) << "Rejecting write: could not find a leader."; + response->send_500("Could not find a leader."); + auto replication_arg = new AsyncIndexArg{request, response, nullptr}; + replication_arg->req->route_index = static_cast(ROUTE_CODES::RETURN_EARLY); + return message_dispatcher->send_message(REPLICATION_MSG, replication_arg); + } - thread_pool->enqueue([](http_req* request, http_res* response, - http_message_dispatcher* message_dispatcher, braft::Node* node) { + const std::string & leader_addr = node->leader_id().to_string(); + LOG(INFO) << "Redirecting write to leader at: " << leader_addr; + + thread_pool->enqueue([leader_addr, request, response, this]() { auto raw_req = request->_req; std::string scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len); std::vector addr_parts; - StringUtils::split(node->leader_id().to_string(), addr_parts, ":"); + StringUtils::split(leader_addr, addr_parts, ":"); std::string leader_host_port = addr_parts[0] + ":" + addr_parts[2]; const std::string & path = std::string(raw_req->path.base, raw_req->path.len); std::string url = scheme + "://" + leader_host_port + path; - std::string api_res; - long status = HttpClient::post_response(url, request->body, api_res); - response->send(status, api_res); + if(request->http_method == "POST") { + std::string api_res; + long status = HttpClient::post_response(url, request->body, api_res); + response->send(status, api_res); + } else { + const std::string& err = "Forwarding for http method not implemented: " + request->http_method; + LOG(ERROR) << err; + response->send_500(err); + } auto replication_arg = new AsyncIndexArg{request, response, nullptr}; replication_arg->req->route_index = static_cast(ROUTE_CODES::RETURN_EARLY); message_dispatcher->send_message(REPLICATION_MSG, replication_arg); - }, request, response, message_dispatcher, node); + }); return ; } diff --git a/src/replicator.cpp b/src/replicator.cpp index 4329d10c..1cc8d05d 100644 --- a/src/replicator.cpp +++ b/src/replicator.cpp @@ -93,7 +93,7 @@ void Replicator::start(http_message_dispatcher* message_dispatcher, const std::s std::string url = master_host_port+"/replication/updates?seq_number="+std::to_string(latest_seq_num+1); std::string json_response; - long status_code = HttpClient::get_response(url, json_response); + long status_code = HttpClient::get_response(url, json_response, 15000); if(status_code == 200) { nlohmann::json json_content = nlohmann::json::parse(json_response); diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 80f6da57..990d58b3 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -318,6 +318,7 @@ int run_server(const Config & config, const std::string & version, }); // wait for raft service to be ready before starting http + // TODO: should not return until either follower or leader has started ready_future.get(); if(config.get_master().empty()) {