diff --git a/include/http_client.h b/include/http_client.h index 4fa9b685..8305d1b5 100644 --- a/include/http_client.h +++ b/include/http_client.h @@ -28,11 +28,14 @@ private: static CURL* init_curl(const std::string& url, std::string& response); - static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk); + static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk, + bool send_ts_api_header); static size_t curl_req_send_callback(char* buffer, size_t size, size_t nitems, void *userdata); - static long perform_curl(CURL *curl, std::map& res_headers, struct curl_slist *chunk = nullptr); + static long perform_curl(CURL *curl, std::map& res_headers, + struct curl_slist *chunk = nullptr, + bool send_ts_api_header = false); public: static HttpClient & get_instance() { @@ -48,23 +51,33 @@ public: static long download_file(const std::string& url, const std::string& file_path); static long get_response(const std::string& url, std::string& response, - std::map& res_headers, const std::unordered_map& headers = {}, long timeout_ms=4000); + std::map& res_headers, + const std::unordered_map& headers = {}, + long timeout_ms=4000, + bool send_ts_api_header = false); static long delete_response(const std::string& url, std::string& response, - std::map& res_headers, long timeout_ms=120000); + std::map& res_headers, long timeout_ms=120000, + bool send_ts_api_header = false); static long post_response(const std::string & url, const std::string & body, std::string & response, - std::map& res_headers, const std::unordered_map& headers = {}, long timeout_ms=4000); + std::map& res_headers, + const std::unordered_map& headers = {}, + long timeout_ms=4000, + bool send_ts_api_header = false); static long post_response_async(const std::string &url, const std::shared_ptr request, const std::shared_ptr response, - HttpServer* server); + HttpServer* server, + bool send_ts_api_header = false); static long put_response(const std::string & url, const std::string & body, std::string & response, - std::map& res_headers, long timeout_ms=4000); + std::map& res_headers, long timeout_ms=4000, + bool send_ts_api_header = false); static long patch_response(const std::string & url, const std::string & body, std::string & response, - std::map& res_headers, long timeout_ms=4000); + std::map& res_headers, long timeout_ms=4000, + bool send_ts_api_header = false); static void extract_response_headers(CURL* curl, std::map &res_headers); }; diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp index 1721fcae..4851dd46 100644 --- a/src/analytics_manager.cpp +++ b/src/analytics_manager.cpp @@ -244,34 +244,46 @@ void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64 // we will persist aggregation every hour continue; } - // send http request - std::string leader_url = raft_server->get_leader_url(); - if(!leader_url.empty()) { - const std::string& resource_url = leader_url + "collections/" + suggestion_coll + - "/documents/import?action=emplace"; - std::string res; - std::map res_headers; - std::unordered_map headers; - long status_code = HttpClient::post_response(resource_url, import_payload, - res, res_headers, headers, 10*1000); - if(status_code != 200) { - LOG(ERROR) << "Error while sending query suggestions events to leader. " - << "Status code: " << status_code << ", response: " << res; - } else { - LOG(INFO) << "Sent query suggestions to leader for aggregation."; - popularQueries->reset_local_counts(); + prev_persistence_s = now_ts_seconds; - if(raft_server->is_leader()) { - // try to run top-K compaction of suggestion collection - auto coll = CollectionManager::get_instance().get_collection(suggestion_coll); - if (coll == nullptr) { - LOG(ERROR) << "No collection found for suggestions aggregation: " + suggestion_coll; - continue; - } + std::string import_payload; + popularQueries->serialize_as_docs(import_payload); - coll->truncate_after_top_k("count", popularQueries->get_k()); + if(import_payload.empty()) { + continue; + } + // send http request + std::string leader_url = raft_server->get_leader_url(); + if(!leader_url.empty()) { + const std::string& base_url = leader_url + "collections/" + suggestion_coll; + std::string res; + + const std::string& update_url = base_url + "/documents/import?action=emplace"; + std::map res_headers; + long status_code = HttpClient::post_response(update_url, import_payload, + res, res_headers, {}, 10*1000, true); + + if(status_code != 200) { + LOG(ERROR) << "Error while sending query suggestions events to leader. " + << "Status code: " << status_code << ", response: " << res; + } else { + LOG(INFO) << "Query aggregation for collection: " + suggestion_coll; + popularQueries->reset_local_counts(); + + if(raft_server->is_leader()) { + // try to run top-K compaction of suggestion collection + const std::string top_k_param = "count:" + std::to_string(popularQueries->get_k()); + const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param; + res.clear(); + res_headers.clear(); + status_code = HttpClient::delete_response(truncate_topk_url, res, res_headers, 10*1000, true); + if(status_code != 200) { + LOG(ERROR) << "Error while running top K for query suggestions collection. " + << "Status code: " << status_code << ", response: " << res; + } else { + LOG(INFO) << "Top K aggregation for collection: " + suggestion_coll; } } } diff --git a/src/http_client.cpp b/src/http_client.cpp index 0829400f..95f0c923 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -16,7 +16,9 @@ struct client_state_t: public req_state_t { }; long HttpClient::post_response(const std::string &url, const std::string &body, std::string &response, - std::map& res_headers, const std::unordered_map& headers, long timeout_ms) { + std::map& res_headers, + const std::unordered_map& headers, long timeout_ms, + bool send_ts_api_header) { CURL *curl = init_curl(url, response); if(curl == nullptr) { return 500; @@ -35,12 +37,13 @@ long HttpClient::post_response(const std::string &url, const std::string &body, long HttpClient::post_response_async(const std::string &url, const std::shared_ptr request, - const std::shared_ptr response, HttpServer* server) { + const std::shared_ptr response, HttpServer* server, + bool send_ts_api_header) { deferred_req_res_t* req_res = new deferred_req_res_t(request, response, server, false); std::unique_ptr req_res_guard(req_res); struct curl_slist* chunk = nullptr; - CURL *curl = init_curl_async(url, req_res, chunk); + CURL *curl = init_curl_async(url, req_res, chunk, send_ts_api_header); if(curl == nullptr) { return 500; } @@ -55,7 +58,8 @@ long HttpClient::post_response_async(const std::string &url, const std::shared_p } long HttpClient::put_response(const std::string &url, const std::string &body, std::string &response, - std::map& res_headers, long timeout_ms) { + std::map& res_headers, long timeout_ms, + bool send_ts_api_header) { CURL *curl = init_curl(url, response); if(curl == nullptr) { return 500; @@ -67,7 +71,8 @@ long HttpClient::put_response(const std::string &url, const std::string &body, s } long HttpClient::patch_response(const std::string &url, const std::string &body, std::string &response, - std::map& res_headers, long timeout_ms) { + std::map& res_headers, long timeout_ms, + bool send_ts_api_header) { CURL *curl = init_curl(url, response); if(curl == nullptr) { return 500; @@ -79,18 +84,21 @@ long HttpClient::patch_response(const std::string &url, const std::string &body, } long HttpClient::delete_response(const std::string &url, std::string &response, - std::map& res_headers, long timeout_ms) { + std::map& res_headers, long timeout_ms, + bool send_ts_api_header) { CURL *curl = init_curl(url, response); if(curl == nullptr) { return 500; } curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); - return perform_curl(curl, res_headers); + return perform_curl(curl, res_headers, nullptr, send_ts_api_header); } long HttpClient::get_response(const std::string &url, std::string &response, - std::map& res_headers, const std::unordered_map& headers, long timeout_ms) { + std::map& res_headers, + const std::unordered_map& headers, + long timeout_ms, bool send_ts_api_header) { CURL *curl = init_curl(url, response); if(curl == nullptr) { return 500; @@ -131,9 +139,14 @@ void HttpClient::init(const std::string &api_key) { } } -long HttpClient::perform_curl(CURL *curl, std::map& res_headers, struct curl_slist *chunk) { - std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key; - chunk = curl_slist_append(chunk, api_key_header.c_str()); +long HttpClient::perform_curl(CURL *curl, std::map& res_headers, struct curl_slist *chunk, + bool send_ts_api_header) { + + if(send_ts_api_header) { + std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key; + chunk = curl_slist_append(chunk, api_key_header.c_str()); + } + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); CURLcode res = curl_easy_perform(curl); @@ -284,7 +297,8 @@ size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) { return 0; } -CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk) { +CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk, + bool send_ts_api_header) { CURL *curl = curl_easy_init(); if(curl == nullptr) { @@ -293,8 +307,10 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re req_res->req->data = new client_state_t(curl); // destruction of data is managed by req destructor - std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key; - chunk = curl_slist_append(chunk, api_key_header.c_str()); + if(send_ts_api_header) { + std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key; + chunk = curl_slist_append(chunk, api_key_header.c_str()); + } // set content length std::string content_length_header = std::string("content-length: ") + std::to_string(req_res->req->_req->content_length); diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 14c7323f..920b6b34 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -9,6 +9,7 @@ #include #include "rocksdb/utilities/checkpoint.h" #include "thread_local_vars.h" +#include "core_api.h" namespace braft { DECLARE_int32(raft_do_snapshot_min_index_gap); @@ -295,7 +296,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, if(path_parts.back().rfind("import", 0) == 0) { // imports are handled asynchronously response->proxied_stream = true; - long status = HttpClient::post_response_async(url, request, response, server); + long status = HttpClient::post_response_async(url, request, response, server, true); if(status == 500) { response->content_type_header = res_headers["content-type"]; @@ -306,23 +307,31 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, } } else { std::string api_res; - long status = HttpClient::post_response(url, request->body, api_res, res_headers); + long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 4000, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } } else if(request->http_method == "PUT") { std::string api_res; - long status = HttpClient::put_response(url, request->body, api_res, res_headers); + long status = HttpClient::put_response(url, request->body, api_res, res_headers, 4000, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else if(request->http_method == "DELETE") { std::string api_res; - long status = HttpClient::delete_response(url, api_res, res_headers); + long status = HttpClient::delete_response(url, api_res, res_headers, 120000, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else if(request->http_method == "PATCH") { std::string api_res; - long status = HttpClient::patch_response(url, request->body, api_res, res_headers); + route_path* rpath = nullptr; + bool route_found = server->get_route(request->route_hash, &rpath); + + long timeout_ms = 4 * 1000; + if(route_found && rpath->handler == patch_update_collection) { + timeout_ms = 300 * 1000; // 5 minutes for patching a collection which can take some time + } + + long status = HttpClient::patch_response(url, request->body, api_res, res_headers, timeout_ms, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else { @@ -667,7 +676,7 @@ void ReplicationState::refresh_catchup_status(bool log_msg) { std::string api_res; std::map res_headers; - long status_code = HttpClient::get_response(url, api_res, res_headers); + long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true); if(status_code == 200) { // compare leader's applied log with local applied to see if we are lagging nlohmann::json leader_status = nlohmann::json::parse(api_res); @@ -760,7 +769,7 @@ void ReplicationState::do_dummy_write() { std::string api_res; std::map res_headers; - long status_code = HttpClient::post_response(url, "", api_res, res_headers); + long status_code = HttpClient::post_response(url, "", api_res, res_headers, {}, 4000, true); LOG(INFO) << "Dummy write to " << url << ", status = " << status_code << ", response = " << api_res; } @@ -927,7 +936,7 @@ void ReplicationState::do_snapshot(const std::string& nodes) { std::string url = get_node_url_path(peer_addr, "/health", protocol); std::string api_res; std::map res_headers; - long status_code = HttpClient::get_response(url, api_res, res_headers); + long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true); bool peer_healthy = (status_code == 200); //LOG(INFO) << "do_snapshot, status_code: " << status_code;