Don't send ts api key in curl client requests by default.

This commit is contained in:
Kishore Nallan 2023-05-30 16:28:36 +05:30
parent ab20edf8a0
commit d8fc509fe9
4 changed files with 104 additions and 54 deletions

View File

@ -28,11 +28,14 @@ private:
static CURL* init_curl(const std::string& url, std::string& response);
static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk);
static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk,
bool send_ts_api_header);
static size_t curl_req_send_callback(char* buffer, size_t size, size_t nitems, void *userdata);
static long perform_curl(CURL *curl, std::map<std::string, std::string>& res_headers, struct curl_slist *chunk = nullptr);
static long perform_curl(CURL *curl, std::map<std::string, std::string>& res_headers,
struct curl_slist *chunk = nullptr,
bool send_ts_api_header = false);
public:
static HttpClient & get_instance() {
@ -48,23 +51,33 @@ public:
static long download_file(const std::string& url, const std::string& file_path);
static long get_response(const std::string& url, std::string& response,
std::map<std::string, std::string>& res_headers, const std::unordered_map<std::string, std::string>& headers = {}, long timeout_ms=4000);
std::map<std::string, std::string>& res_headers,
const std::unordered_map<std::string, std::string>& headers = {},
long timeout_ms=4000,
bool send_ts_api_header = false);
static long delete_response(const std::string& url, std::string& response,
std::map<std::string, std::string>& res_headers, long timeout_ms=120000);
std::map<std::string, std::string>& res_headers, long timeout_ms=120000,
bool send_ts_api_header = false);
static long post_response(const std::string & url, const std::string & body, std::string & response,
std::map<std::string, std::string>& res_headers, const std::unordered_map<std::string, std::string>& headers = {}, long timeout_ms=4000);
std::map<std::string, std::string>& res_headers,
const std::unordered_map<std::string, std::string>& headers = {},
long timeout_ms=4000,
bool send_ts_api_header = false);
static long post_response_async(const std::string &url, const std::shared_ptr<http_req> request,
const std::shared_ptr<http_res> response,
HttpServer* server);
HttpServer* server,
bool send_ts_api_header = false);
static long put_response(const std::string & url, const std::string & body, std::string & response,
std::map<std::string, std::string>& res_headers, long timeout_ms=4000);
std::map<std::string, std::string>& res_headers, long timeout_ms=4000,
bool send_ts_api_header = false);
static long patch_response(const std::string & url, const std::string & body, std::string & response,
std::map<std::string, std::string>& res_headers, long timeout_ms=4000);
std::map<std::string, std::string>& res_headers, long timeout_ms=4000,
bool send_ts_api_header = false);
static void extract_response_headers(CURL* curl, std::map<std::string, std::string> &res_headers);
};

View File

@ -244,34 +244,46 @@ void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64
// we will persist aggregation every hour
continue;
}
// send http request
std::string leader_url = raft_server->get_leader_url();
if(!leader_url.empty()) {
const std::string& resource_url = leader_url + "collections/" + suggestion_coll +
"/documents/import?action=emplace";
std::string res;
std::map<std::string, std::string> res_headers;
std::unordered_map<std::string, std::string> headers;
long status_code = HttpClient::post_response(resource_url, import_payload,
res, res_headers, headers, 10*1000);
if(status_code != 200) {
LOG(ERROR) << "Error while sending query suggestions events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Sent query suggestions to leader for aggregation.";
popularQueries->reset_local_counts();
prev_persistence_s = now_ts_seconds;
if(raft_server->is_leader()) {
// try to run top-K compaction of suggestion collection
auto coll = CollectionManager::get_instance().get_collection(suggestion_coll);
if (coll == nullptr) {
LOG(ERROR) << "No collection found for suggestions aggregation: " + suggestion_coll;
continue;
}
std::string import_payload;
popularQueries->serialize_as_docs(import_payload);
coll->truncate_after_top_k("count", popularQueries->get_k());
if(import_payload.empty()) {
continue;
}
// send http request
std::string leader_url = raft_server->get_leader_url();
if(!leader_url.empty()) {
const std::string& base_url = leader_url + "collections/" + suggestion_coll;
std::string res;
const std::string& update_url = base_url + "/documents/import?action=emplace";
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(update_url, import_payload,
res, res_headers, {}, 10*1000, true);
if(status_code != 200) {
LOG(ERROR) << "Error while sending query suggestions events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Query aggregation for collection: " + suggestion_coll;
popularQueries->reset_local_counts();
if(raft_server->is_leader()) {
// try to run top-K compaction of suggestion collection
const std::string top_k_param = "count:" + std::to_string(popularQueries->get_k());
const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param;
res.clear();
res_headers.clear();
status_code = HttpClient::delete_response(truncate_topk_url, res, res_headers, 10*1000, true);
if(status_code != 200) {
LOG(ERROR) << "Error while running top K for query suggestions collection. "
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Top K aggregation for collection: " + suggestion_coll;
}
}
}

View File

@ -16,7 +16,9 @@ struct client_state_t: public req_state_t {
};
long HttpClient::post_response(const std::string &url, const std::string &body, std::string &response,
std::map<std::string, std::string>& res_headers, const std::unordered_map<std::string, std::string>& headers, long timeout_ms) {
std::map<std::string, std::string>& res_headers,
const std::unordered_map<std::string, std::string>& headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
if(curl == nullptr) {
return 500;
@ -35,12 +37,13 @@ long HttpClient::post_response(const std::string &url, const std::string &body,
long HttpClient::post_response_async(const std::string &url, const std::shared_ptr<http_req> request,
const std::shared_ptr<http_res> response, HttpServer* server) {
const std::shared_ptr<http_res> response, HttpServer* server,
bool send_ts_api_header) {
deferred_req_res_t* req_res = new deferred_req_res_t(request, response, server, false);
std::unique_ptr<deferred_req_res_t> req_res_guard(req_res);
struct curl_slist* chunk = nullptr;
CURL *curl = init_curl_async(url, req_res, chunk);
CURL *curl = init_curl_async(url, req_res, chunk, send_ts_api_header);
if(curl == nullptr) {
return 500;
}
@ -55,7 +58,8 @@ long HttpClient::post_response_async(const std::string &url, const std::shared_p
}
long HttpClient::put_response(const std::string &url, const std::string &body, std::string &response,
std::map<std::string, std::string>& res_headers, long timeout_ms) {
std::map<std::string, std::string>& res_headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
if(curl == nullptr) {
return 500;
@ -67,7 +71,8 @@ long HttpClient::put_response(const std::string &url, const std::string &body, s
}
long HttpClient::patch_response(const std::string &url, const std::string &body, std::string &response,
std::map<std::string, std::string>& res_headers, long timeout_ms) {
std::map<std::string, std::string>& res_headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
if(curl == nullptr) {
return 500;
@ -79,18 +84,21 @@ long HttpClient::patch_response(const std::string &url, const std::string &body,
}
long HttpClient::delete_response(const std::string &url, std::string &response,
std::map<std::string, std::string>& res_headers, long timeout_ms) {
std::map<std::string, std::string>& res_headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
if(curl == nullptr) {
return 500;
}
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
return perform_curl(curl, res_headers);
return perform_curl(curl, res_headers, nullptr, send_ts_api_header);
}
long HttpClient::get_response(const std::string &url, std::string &response,
std::map<std::string, std::string>& res_headers, const std::unordered_map<std::string, std::string>& headers, long timeout_ms) {
std::map<std::string, std::string>& res_headers,
const std::unordered_map<std::string, std::string>& headers,
long timeout_ms, bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
if(curl == nullptr) {
return 500;
@ -131,9 +139,14 @@ void HttpClient::init(const std::string &api_key) {
}
}
long HttpClient::perform_curl(CURL *curl, std::map<std::string, std::string>& res_headers, struct curl_slist *chunk) {
std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
long HttpClient::perform_curl(CURL *curl, std::map<std::string, std::string>& res_headers, struct curl_slist *chunk,
bool send_ts_api_header) {
if(send_ts_api_header) {
std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
}
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
CURLcode res = curl_easy_perform(curl);
@ -284,7 +297,8 @@ size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) {
return 0;
}
CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk) {
CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk,
bool send_ts_api_header) {
CURL *curl = curl_easy_init();
if(curl == nullptr) {
@ -293,8 +307,10 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re
req_res->req->data = new client_state_t(curl); // destruction of data is managed by req destructor
std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
if(send_ts_api_header) {
std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
}
// set content length
std::string content_length_header = std::string("content-length: ") + std::to_string(req_res->req->_req->content_length);

View File

@ -9,6 +9,7 @@
#include <http_client.h>
#include "rocksdb/utilities/checkpoint.h"
#include "thread_local_vars.h"
#include "core_api.h"
namespace braft {
DECLARE_int32(raft_do_snapshot_min_index_gap);
@ -295,7 +296,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
if(path_parts.back().rfind("import", 0) == 0) {
// imports are handled asynchronously
response->proxied_stream = true;
long status = HttpClient::post_response_async(url, request, response, server);
long status = HttpClient::post_response_async(url, request, response, server, true);
if(status == 500) {
response->content_type_header = res_headers["content-type"];
@ -306,23 +307,31 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
}
} else {
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res, res_headers);
long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 4000, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
}
} else if(request->http_method == "PUT") {
std::string api_res;
long status = HttpClient::put_response(url, request->body, api_res, res_headers);
long status = HttpClient::put_response(url, request->body, api_res, res_headers, 4000, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
} else if(request->http_method == "DELETE") {
std::string api_res;
long status = HttpClient::delete_response(url, api_res, res_headers);
long status = HttpClient::delete_response(url, api_res, res_headers, 120000, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
} else if(request->http_method == "PATCH") {
std::string api_res;
long status = HttpClient::patch_response(url, request->body, api_res, res_headers);
route_path* rpath = nullptr;
bool route_found = server->get_route(request->route_hash, &rpath);
long timeout_ms = 4 * 1000;
if(route_found && rpath->handler == patch_update_collection) {
timeout_ms = 300 * 1000; // 5 minutes for patching a collection which can take some time
}
long status = HttpClient::patch_response(url, request->body, api_res, res_headers, timeout_ms, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
} else {
@ -667,7 +676,7 @@ void ReplicationState::refresh_catchup_status(bool log_msg) {
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::get_response(url, api_res, res_headers);
long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true);
if(status_code == 200) {
// compare leader's applied log with local applied to see if we are lagging
nlohmann::json leader_status = nlohmann::json::parse(api_res);
@ -760,7 +769,7 @@ void ReplicationState::do_dummy_write() {
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(url, "", api_res, res_headers);
long status_code = HttpClient::post_response(url, "", api_res, res_headers, {}, 4000, true);
LOG(INFO) << "Dummy write to " << url << ", status = " << status_code << ", response = " << api_res;
}
@ -927,7 +936,7 @@ void ReplicationState::do_snapshot(const std::string& nodes) {
std::string url = get_node_url_path(peer_addr, "/health", protocol);
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::get_response(url, api_res, res_headers);
long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true);
bool peer_healthy = (status_code == 200);
//LOG(INFO) << "do_snapshot, status_code: " << status_code;