From 009e67815c7684a0b02a78f740ffcb13229c250b Mon Sep 17 00:00:00 2001 From: Jason Bosco Date: Fri, 29 Jan 2021 13:58:14 -0800 Subject: [PATCH] Make on-demand snapshot endpoint asynchronous. --- include/raft_server.h | 8 +++++--- src/http_client.cpp | 8 ++++++++ src/raft_server.cpp | 33 +++++++++++++++++++++++++-------- src/typesense_server_utils.cpp | 3 ++- 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/include/raft_server.h b/include/raft_server.h index 41bc8ec5..09eb5d21 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -76,12 +76,12 @@ public: class OnDemandSnapshotClosure : public braft::Closure { private: ReplicationState* replication_state; - http_req& req; - http_res& res; + http_req* req; + http_res* res; public: - OnDemandSnapshotClosure(ReplicationState* replication_state, http_req& req, http_res& res): + OnDemandSnapshotClosure(ReplicationState* replication_state, http_req* req, http_res* res): replication_state(replication_state), req(req), res(res) {} ~OnDemandSnapshotClosure() {} @@ -188,6 +188,8 @@ public: const std::string& get_ext_snapshot_path() const; + http_message_dispatcher* get_message_dispatcher() const; + static constexpr const char* REPLICATION_MSG = "raft_replication"; private: diff --git a/src/http_client.cpp b/src/http_client.cpp index b6579c81..b1962402 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -112,6 +112,7 @@ long HttpClient::perform_curl(CURL *curl, std::map& re if (res != CURLE_OK) { LOG(ERROR) << "CURL failed. Code: " << res << ", strerror: " << curl_easy_strerror(res); curl_easy_cleanup(curl); + curl_slist_free_all(chunk); return 500; } @@ -119,7 +120,9 @@ long HttpClient::perform_curl(CURL *curl, std::map& re curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code); extract_response_headers(curl, res_headers); + curl_easy_cleanup(curl); + curl_slist_free_all(chunk); return http_code == 0 ? 500 : http_code; } @@ -284,6 +287,11 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, HttpClient::curl_write_async_done); curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, req_res); + // This is okay as per [docs](http://curl.haxx.se/libcurl/c/curl_easy_setopt.html) + // Strings passed to libcurl as 'char *' arguments, are copied by the library; thus the string storage + // associated to the pointer argument may be overwritten after curl_easy_setopt() returns. + curl_slist_free_all(chunk); + return curl; } diff --git a/src/raft_server.cpp b/src/raft_server.cpp index cf43c49f..2d419fde 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -565,9 +565,12 @@ uint64_t ReplicationState::node_state() const { void ReplicationState::do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res) { LOG(INFO) << "Triggerring an on demand snapshot..."; - OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, req, res); - ext_snapshot_path = snapshot_path; - node->snapshot(snapshot_closure); + + thread_pool->enqueue([&snapshot_path, &req, &res, this]() { + OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, &req, &res); + ext_snapshot_path = snapshot_path; + node->snapshot(snapshot_closure); + }); } void ReplicationState::set_ext_snapshot_path(const std::string& snapshot_path) { @@ -605,6 +608,10 @@ bool ReplicationState::trigger_vote() { return false; } +http_message_dispatcher* ReplicationState::get_message_dispatcher() const { + return message_dispatcher; +} + void InitSnapshotClosure::Run() { // Auto delete this after Run() std::unique_ptr self_guard(this); @@ -624,8 +631,9 @@ void OnDemandSnapshotClosure::Run() { replication_state->set_ext_snapshot_path(""); - req.last_chunk_aggregate = true; - res.final = true; + req->last_chunk_aggregate = true; + res->final = true; + res->auto_dispose = false; nlohmann::json response; uint32_t status_code; @@ -641,8 +649,17 @@ void OnDemandSnapshotClosure::Run() { response["error"] = status().error_str(); } - res.status_code = status_code; - res.body = response.dump(); + res->status_code = status_code; + res->body = response.dump(); - HttpServer::stream_response(req, res); + deferred_req_res_t* req_res = new deferred_req_res_t{req, res, nullptr}; + std::unique_ptr req_res_guard(req_res); + + replication_state->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); + + // wait for response to be sent + req_res->res->await.wait(); + + delete req; + delete res; } diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index d0a39643..d24be610 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -191,8 +191,9 @@ const char* get_internal_ip() { if (ifa->ifa_addr && ifa->ifa_addr->sa_family==AF_INET) { auto sa = (struct sockaddr_in *) ifa->ifa_addr; if(is_private_ip(ntohl(sa->sin_addr.s_addr))) { + char *ip = inet_ntoa(sa->sin_addr); freeifaddrs(ifap); - return inet_ntoa(sa->sin_addr); + return ip; } } }