Make on-demand snapshot endpoint asynchronous.

This commit is contained in:
Jason Bosco 2021-01-29 13:58:14 -08:00
parent 0ad8c48115
commit 009e67815c
4 changed files with 40 additions and 12 deletions

View File

@ -76,12 +76,12 @@ public:
class OnDemandSnapshotClosure : public braft::Closure {
private:
ReplicationState* replication_state;
http_req& req;
http_res& res;
http_req* req;
http_res* res;
public:
OnDemandSnapshotClosure(ReplicationState* replication_state, http_req& req, http_res& res):
OnDemandSnapshotClosure(ReplicationState* replication_state, http_req* req, http_res* res):
replication_state(replication_state), req(req), res(res) {}
~OnDemandSnapshotClosure() {}
@ -188,6 +188,8 @@ public:
const std::string& get_ext_snapshot_path() const;
http_message_dispatcher* get_message_dispatcher() const;
static constexpr const char* REPLICATION_MSG = "raft_replication";
private:

View File

@ -112,6 +112,7 @@ long HttpClient::perform_curl(CURL *curl, std::map<std::string, std::string>& re
if (res != CURLE_OK) {
LOG(ERROR) << "CURL failed. Code: " << res << ", strerror: " << curl_easy_strerror(res);
curl_easy_cleanup(curl);
curl_slist_free_all(chunk);
return 500;
}
@ -119,7 +120,9 @@ long HttpClient::perform_curl(CURL *curl, std::map<std::string, std::string>& re
curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &http_code);
extract_response_headers(curl, res_headers);
curl_easy_cleanup(curl);
curl_slist_free_all(chunk);
return http_code == 0 ? 500 : http_code;
}
@ -284,6 +287,11 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re
curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, HttpClient::curl_write_async_done);
curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, req_res);
// This is okay as per [docs](http://curl.haxx.se/libcurl/c/curl_easy_setopt.html)
// Strings passed to libcurl as 'char *' arguments, are copied by the library; thus the string storage
// associated to the pointer argument may be overwritten after curl_easy_setopt() returns.
curl_slist_free_all(chunk);
return curl;
}

View File

@ -565,9 +565,12 @@ uint64_t ReplicationState::node_state() const {
void ReplicationState::do_snapshot(const std::string& snapshot_path, http_req& req, http_res& res) {
LOG(INFO) << "Triggerring an on demand snapshot...";
OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, req, res);
ext_snapshot_path = snapshot_path;
node->snapshot(snapshot_closure);
thread_pool->enqueue([&snapshot_path, &req, &res, this]() {
OnDemandSnapshotClosure* snapshot_closure = new OnDemandSnapshotClosure(this, &req, &res);
ext_snapshot_path = snapshot_path;
node->snapshot(snapshot_closure);
});
}
void ReplicationState::set_ext_snapshot_path(const std::string& snapshot_path) {
@ -605,6 +608,10 @@ bool ReplicationState::trigger_vote() {
return false;
}
http_message_dispatcher* ReplicationState::get_message_dispatcher() const {
return message_dispatcher;
}
void InitSnapshotClosure::Run() {
// Auto delete this after Run()
std::unique_ptr<InitSnapshotClosure> self_guard(this);
@ -624,8 +631,9 @@ void OnDemandSnapshotClosure::Run() {
replication_state->set_ext_snapshot_path("");
req.last_chunk_aggregate = true;
res.final = true;
req->last_chunk_aggregate = true;
res->final = true;
res->auto_dispose = false;
nlohmann::json response;
uint32_t status_code;
@ -641,8 +649,17 @@ void OnDemandSnapshotClosure::Run() {
response["error"] = status().error_str();
}
res.status_code = status_code;
res.body = response.dump();
res->status_code = status_code;
res->body = response.dump();
HttpServer::stream_response(req, res);
deferred_req_res_t* req_res = new deferred_req_res_t{req, res, nullptr};
std::unique_ptr<deferred_req_res_t> req_res_guard(req_res);
replication_state->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
// wait for response to be sent
req_res->res->await.wait();
delete req;
delete res;
}

View File

@ -191,8 +191,9 @@ const char* get_internal_ip() {
if (ifa->ifa_addr && ifa->ifa_addr->sa_family==AF_INET) {
auto sa = (struct sockaddr_in *) ifa->ifa_addr;
if(is_private_ip(ntohl(sa->sin_addr.s_addr))) {
char *ip = inet_ntoa(sa->sin_addr);
freeifaddrs(ifap);
return inet_ntoa(sa->sin_addr);
return ip;
}
}
}