diff --git a/include/http_client.h b/include/http_client.h
index e5c5ff5d..cda44f6c 100644
--- a/include/http_client.h
+++ b/include/http_client.h
@@ -3,6 +3,8 @@
 #include
 #include
 #include
+#include "http_data.h"
+#include "http_server.h"
 
 /*
  NOTE: This is a really primitive blocking client meant only for specific Typesense use cases.
@@ -16,10 +18,16 @@ private:
 
     ~HttpClient() = default;
 
-    static size_t curl_write (void *contents, size_t size, size_t nmemb, std::string *s);
+    static size_t curl_write(char *contents, size_t size, size_t nmemb, std::string *s);
+
+    static size_t curl_write_async(char *buffer, size_t size, size_t nmemb, void* context);
 
     static CURL* init_curl(const std::string& url, std::string& response);
 
+    static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res);
+
+    static size_t curl_req_send_callback(char* buffer, size_t size, size_t nitems, void *userdata);
+
     static long perform_curl(CURL *curl, std::map<std::string, std::string>& res_headers);
 
 public:
@@ -42,6 +50,8 @@ public:
     static long post_response(const std::string & url, const std::string & body, std::string & response,
                               std::map<std::string, std::string>& res_headers, long timeout_ms=4000);
 
+    static long post_response_async(const std::string &url, http_req* request, http_res* response);
+
     static long put_response(const std::string & url, const std::string & body, std::string & response,
                              std::map<std::string, std::string>& res_headers, long timeout_ms=4000);
diff --git a/include/http_data.h b/include/http_data.h
index be380a12..446e94a3 100644
--- a/include/http_data.h
+++ b/include/http_data.h
@@ -31,11 +31,14 @@ struct http_res {
     std::string body;
     bool final;
 
-    // fulfilled by an async response handler to pass control back to raft replica apply thread
+    // fulfilled by an async response handler to pass control back for further writes
    std::promise<bool>* promise = nullptr;
 
     h2o_generator_t* generator = nullptr;
 
+    // for async requests, automatically progresses request body on response proceed
+    bool async_request_proceed = true;
+
     http_res(): status_code(501), content_type_header("application/json; charset=utf-8"), final(true) {
 
     }
@@ -143,12 +146,15 @@ struct http_req {
 
     void* data;
 
+    // used during forwarding of requests from follower to leader
+    std::atomic<int> proxy_status;
+
     // for deferred processing of async handlers
     h2o_custom_timer_t defer_timer;
 
     http_req(): _req(nullptr), route_hash(1),
                 first_chunk_aggregate(true), last_chunk_aggregate(false),
-                chunk_len(0), body_index(0), data(nullptr) {
+                chunk_len(0), body_index(0), data(nullptr), proxy_status(0) {
 
     }
 
@@ -156,7 +162,7 @@ struct http_req {
              const std::map<std::string, std::string> & params, const std::string& body):
             _req(_req), http_method(http_method), route_hash(route_hash), params(params),
             first_chunk_aggregate(true), last_chunk_aggregate(false),
-            chunk_len(0), body(body), body_index(0), data(nullptr) {
+            chunk_len(0), body(body), body_index(0), data(nullptr), proxy_status(0) {
 
     }
diff --git a/include/http_server.h b/include/http_server.h
index 51e0e6f0..7622df15 100644
--- a/include/http_server.h
+++ b/include/http_server.h
@@ -146,10 +146,13 @@ public:
 
     static bool on_stop_server(void *data);
 
+    static bool on_stream_response_message(void *data);
+
     std::string get_version();
 
     static constexpr const char* AUTH_HEADER = "x-typesense-api-key";
     static constexpr const char* STOP_SERVER_MESSAGE = "STOP_SERVER";
+    static constexpr const char* STREAM_RESPONSE_MESSAGE = "STREAM_RESPONSE";
 
     static int process_request(http_req* request, http_res* response, route_path *rpath,
                                const h2o_custom_req_handler_t *req_handler);
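Aside (not part of the patch): the forwarding code below is built on libcurl's read/write callback contract, where CURLOPT_READFUNCTION is polled for request-body chunks until it returns 0 and CURLOPT_WRITEFUNCTION receives each response-body chunk. The standalone sketch that follows shows that contract in isolation; the URL, payload and struct name are made up for illustration and this is not how Typesense itself drives the transfer.

// Sketch: stream an in-memory buffer as a POST body through libcurl's read callback.
#include <curl/curl.h>
#include <cstring>
#include <string>

struct upload_ctx {
    std::string body;      // full request body to forward
    size_t offset = 0;     // how much has been handed to libcurl so far
};

// called by libcurl whenever it needs more request-body bytes; returning 0 signals end-of-body
static size_t read_cb(char* buffer, size_t size, size_t nitems, void* userdata) {
    auto* ctx = static_cast<upload_ctx*>(userdata);
    size_t max_bytes = size * nitems;
    size_t remaining = ctx->body.size() - ctx->offset;
    size_t to_copy = remaining < max_bytes ? remaining : max_bytes;
    memcpy(buffer, ctx->body.data() + ctx->offset, to_copy);
    ctx->offset += to_copy;
    return to_copy;
}

// called by libcurl for every chunk of the response body
static size_t write_cb(char* contents, size_t size, size_t nmemb, void* userdata) {
    static_cast<std::string*>(userdata)->append(contents, size * nmemb);
    return size * nmemb;
}

int main() {
    upload_ctx ctx{std::string(1 << 20, 'x')};   // 1 MB dummy payload
    std::string response;

    CURL* curl = curl_easy_init();
    if(curl == nullptr) {
        return 1;
    }

    curl_easy_setopt(curl, CURLOPT_URL, "https://example.org/import");    // hypothetical endpoint
    curl_easy_setopt(curl, CURLOPT_POST, 1L);
    curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long) ctx.body.size()); // size known up-front here
    curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_cb);
    curl_easy_setopt(curl, CURLOPT_READDATA, &ctx);
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_cb);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);

    CURLcode rc = curl_easy_perform(curl);   // blocks until the whole transfer finishes
    curl_easy_cleanup(curl);
    return rc == CURLE_OK ? 0 : 1;
}

Note that curl_easy_perform() is blocking, which is why post_response_async below still occupies its calling thread for the duration of the forwarded import.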
diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp
index f4810be7..035b82f4 100644
--- a/src/collection_manager.cpp
+++ b/src/collection_manager.cpp
@@ -90,6 +90,8 @@ Option CollectionManager::load(const size_t init_batch_size) {
     std::vector<std::string> collection_meta_jsons;
     store->scan_fill(Collection::COLLECTION_META_PREFIX, collection_meta_jsons);
 
+    LOG(INFO) << "Found " << collection_meta_jsons.size() << " collection(s) on disk.";
+
     for(auto & collection_meta_json: collection_meta_jsons) {
         nlohmann::json collection_meta;
 
diff --git a/src/http_client.cpp b/src/http_client.cpp
index 8da5bf02..7dd54965 100644
--- a/src/http_client.cpp
+++ b/src/http_client.cpp
@@ -18,6 +18,22 @@ long HttpClient::post_response(const std::string &url, const std::string &body,
     return perform_curl(curl, res_headers);
 }
 
+long HttpClient::post_response_async(const std::string &url, http_req* request, http_res* response) {
+    deferred_req_res_t* req_res = new deferred_req_res_t{request, response, nullptr};
+    std::unique_ptr<deferred_req_res_t> index_arg_guard(req_res);
+
+    CURL *curl = init_curl_async(url, req_res);
+    if(curl == nullptr) {
+        return 500;
+    }
+
+    curl_easy_setopt(curl, CURLOPT_POST, 1L);
+    curl_easy_perform(curl);
+    curl_easy_cleanup(curl);
+
+    return 0;
+}
+
 long HttpClient::put_response(const std::string &url, const std::string &body, std::string &response,
                               std::map<std::string, std::string>& res_headers, long timeout_ms) {
     CURL *curl = init_curl(url, response);
@@ -102,6 +118,125 @@ void HttpClient::extract_response_headers(CURL* curl, std::map<std::string, std::string>& res_headers) {
+size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nitems, void *userdata) {
+    deferred_req_res_t* req_res = static_cast<deferred_req_res_t*>(userdata);
+    size_t max_req_bytes = (size * nitems);
+
+    const char* total_body_buf = req_res->req->body.c_str();
+    size_t available_body_bytes = (req_res->req->body.size() - req_res->req->body_index);
+
+    // copy data into `buffer` not exceeding max_req_bytes
+    size_t bytes_to_read = std::min(max_req_bytes, available_body_bytes);
+
+    memcpy(buffer, total_body_buf + req_res->req->body_index, bytes_to_read);
+
+    req_res->req->body_index += bytes_to_read;
+
+    LOG(INFO) << "Wrote " << bytes_to_read << " bytes to request body (max_buffer_bytes=" << max_req_bytes << ")";
+    LOG(INFO) << "req_res->req->body_index: " << req_res->req->body_index;
+    LOG(INFO) << "req_res->req->body.size(): " << req_res->req->body.size();
+
+    if(req_res->req->body_index == req_res->req->body.size()) {
+        LOG(INFO) << "Current body buffer has been consumed fully.";
+        size_t written = req_res->req->chunk_len;
+
+        req_res->req->chunk_len = 0;
+        req_res->req->body_index = 0;
+        req_res->req->body = "";
+
+        req_res->res->final = req_res->req->last_chunk_aggregate;
+        req_res->req->proxy_status = -1;
+        auto stream_state = (req_res->req->last_chunk_aggregate) ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
+
+        if(req_res->req->last_chunk_aggregate) {
+            LOG(INFO) << "Request forwarding done.";
+            if(req_res->req->_req->proceed_req) {
+                req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
+            }
+        } else {
+            LOG(INFO) << "Pausing forwarding and requesting more input.";
+            //curl_easy_pause(req_res->req->data, CURL_READFUNC_PAUSE);
+            req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
+
+            while(req_res->req->proxy_status != 0 && req_res->req->body.empty()) {
+                LOG(INFO) << "Sleeping for 1 second...";
+                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+            }
+
+            req_res->req->proxy_status = 1;
+            LOG(INFO) << "Buffer refilled, unpausing request forwarding, body_size=" << req_res->req->body.size();
+            //curl_easy_pause(req_res->req->data, CURLPAUSE_RECV_CONT);
+        }
+    }
+
+    return bytes_to_read;
+}
+
+size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, void *context) {
+    // callback for response body to be sent back to client
+    LOG(INFO) << "curl_write_async";
+
+    deferred_req_res_t* req_res = static_cast<deferred_req_res_t*>(context);
+    size_t res_size = size * nmemb;
+
+    // FIXME: use header from remote response
+    // we've got response from remote host: write to client and ask for more request body
+    req_res->res->content_type_header = "text/plain; charset=utf8";
+    req_res->res->status_code = 200;
+
+    // FIXME: cannot mutate body since it will be used by h2o to write
+    req_res->res->body = std::string(buffer, res_size);
+
+    LOG(INFO) << "curl_write_async response, res body size: " << req_res->res->body.size();
+
+    // this needs to be sent from http thread
+    //h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(req_res->res->generator);
+    //HttpServer *server = custom_generator->h2o_handler->http_server;
+    //server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
+
+    return res_size;
+}
+
+CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res) {
+    CURL *curl = curl_easy_init();
+
+    if(curl == nullptr) {
+        return nullptr;
+    }
+
+    req_res->req->data = curl;
+
+    struct curl_slist *chunk = nullptr;
+    std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
+    chunk = curl_slist_append(chunk, api_key_header.c_str());
+    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
+
+    // callback called every time request body is needed
+    curl_easy_setopt(curl, CURLOPT_READFUNCTION, HttpClient::curl_req_send_callback);
+
+    // context to callback
+    curl_easy_setopt(curl, CURLOPT_READDATA, (void *)req_res);
+
+    if(!ca_cert_path.empty()) {
+        curl_easy_setopt(curl, CURLOPT_CAINFO, ca_cert_path.c_str());
+    } else {
+        LOG(WARNING) << "Unable to locate system SSL certificates.";
+    }
+
+    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+    curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 4000);
+
+    // to allow self-signed certs
+    curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
+    curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
+
+    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write_async);
+    curl_easy_setopt(curl, CURLOPT_WRITEDATA, req_res);
+
+    return curl;
+}
+
 CURL *HttpClient::init_curl(const std::string& url, std::string& response) {
     CURL *curl = curl_easy_init();
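Aside (not part of the patch): the FIXMEs in curl_write_async above (hard-coded 200 status and text/plain content type) could plausibly be resolved by querying the live easy handle from inside the write callback, since the status line and headers have already been parsed by the time body bytes are delivered. A hedged sketch follows; transfer_ctx and the URL are hypothetical names, not code from this patch.

#include <curl/curl.h>
#include <string>

struct transfer_ctx {
    CURL* handle = nullptr;     // the handle the transfer runs on (the patch stashes it in req->data)
    long status_code = 0;
    std::string content_type;
    std::string body;
};

static size_t write_cb(char* buffer, size_t size, size_t nmemb, void* userdata) {
    auto* ctx = static_cast<transfer_ctx*>(userdata);

    // by the time body bytes arrive, libcurl already knows the remote status and headers
    curl_easy_getinfo(ctx->handle, CURLINFO_RESPONSE_CODE, &ctx->status_code);
    char* ct = nullptr;
    if(curl_easy_getinfo(ctx->handle, CURLINFO_CONTENT_TYPE, &ct) == CURLE_OK && ct != nullptr) {
        ctx->content_type = ct;   // string is owned by the handle, so copy it out
    }

    ctx->body.append(buffer, size * nmemb);
    return size * nmemb;
}

int main() {
    transfer_ctx ctx;
    ctx.handle = curl_easy_init();
    if(ctx.handle == nullptr) {
        return 1;
    }
    curl_easy_setopt(ctx.handle, CURLOPT_URL, "https://example.org/");   // hypothetical URL
    curl_easy_setopt(ctx.handle, CURLOPT_WRITEFUNCTION, write_cb);
    curl_easy_setopt(ctx.handle, CURLOPT_WRITEDATA, &ctx);
    CURLcode rc = curl_easy_perform(ctx.handle);
    curl_easy_cleanup(ctx.handle);
    return rc == CURLE_OK ? 0 : 1;
}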
@@ -131,7 +266,7 @@ CURL *HttpClient::init_curl(const std::string& url, std::string& response) {
     return curl;
 }
 
-size_t HttpClient::curl_write(void *contents, size_t size, size_t nmemb, std::string *s) {
-    s->append((char*)contents, size*nmemb);
+size_t HttpClient::curl_write(char *contents, size_t size, size_t nmemb, std::string *s) {
+    s->append(contents, size*nmemb);
     return size*nmemb;
 }
diff --git a/src/http_server.cpp b/src/http_server.cpp
index 5036d6c8..4569a0f6 100644
--- a/src/http_server.cpp
+++ b/src/http_server.cpp
@@ -484,6 +484,8 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
 
 int HttpServer::process_request(http_req* request, http_res* response, route_path *rpath,
                                 const h2o_custom_req_handler_t *handler) {
+    //LOG(INFO) << "process_request called";
+
     // for writes, we delegate to replication_state to handle response
     if(rpath->http_method == "POST" || rpath->http_method == "PUT" || rpath->http_method == "DELETE") {
         handler->http_server->get_replication_state()->write(request, response);
@@ -564,6 +566,12 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
     // if the request itself is async, we will proceed the request to fetch input content
     // otherwise, call the handler since it will be the handler that will be producing content
 
+    if(!custom_generator->response->async_request_proceed) {
+        // request progression is not tied to response generation
+        //LOG(INFO) << "Ignoring request proceed";
+        return ;
+    }
+
     if (custom_generator->rpath->async_req &&
         custom_generator->request->_req &&
         custom_generator->request->_req->proceed_req) {
@@ -595,6 +603,7 @@ void HttpServer::stream_response(http_req& request, http_res& response) {
     h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(response.generator);
 
     if (req->res.status == 0) {
+        //LOG(INFO) << "h2o_start_response";
         response.status_code = (response.status_code == 0) ? 503 : response.status_code; // just to be sure
         req->res.status = response.status_code;
         req->res.reason = http_res::get_status_reason(response.status_code);
@@ -604,10 +613,14 @@ void HttpServer::stream_response(http_req& request, http_res& response) {
         h2o_start_response(req, &custom_generator->super);
     }
 
+    //LOG(INFO) << "stream_response, body_size: " << response.body.size();
+
     h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX);
     response.body = "";
 
+    // FIXME: should this be moved outside?
     custom_generator->response->final = request.last_chunk_aggregate;
+
     const h2o_send_state_t state = custom_generator->response->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
 
     h2o_send(req, &body, 1, state);
@@ -713,3 +726,9 @@ uint64_t HttpServer::node_state() const {
     return replication_state->node_state();
 }
 
+bool HttpServer::on_stream_response_message(void *data) {
+    //LOG(INFO) << "on_stream_response_message";
+    deferred_req_res_t* req_res = static_cast<deferred_req_res_t*>(data);
+    stream_response(*req_res->req, *req_res->res);
+    return true;
+}
diff --git a/src/index.cpp b/src/index.cpp
index fc7d1156..41f82f7e 100644
--- a/src/index.cpp
+++ b/src/index.cpp
@@ -316,6 +316,11 @@ size_t Index::batch_memory_index(Index *index, std::vector & iter_
     size_t num_indexed = 0;
 
     for(auto & index_rec: iter_batch) {
+        if(!index_rec.indexed.ok()) {
+            // some records could have been invalidated upstream
+            continue;
+        }
+
         Option validation_op = validate_index_in_memory(index_rec.document, index_rec.seq_id,
                                                         default_sorting_field, search_schema, facet_schema);
diff --git a/src/raft_server.cpp b/src/raft_server.cpp
index 8ab9fe39..d1a24c29 100644
--- a/src/raft_server.cpp
+++ b/src/raft_server.cpp
@@ -96,6 +96,7 @@ void ReplicationState::write(http_req* request, http_res* response) {
     if (!is_leader()) {
         if(node->leader_id().is_empty()) {
             // Handle no leader scenario
+            // FIXME: could happen in the middle of streaming, so a h2o_start_response can bomb.
             LOG(ERROR) << "Rejecting write: could not find a leader.";
             response->set_500("Could not find a leader.");
             auto replication_arg = new AsyncIndexArg{request, response, nullptr};
@@ -103,8 +104,15 @@ void ReplicationState::write(http_req* request, http_res* response) {
             return message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
         }
 
+        if (request->proxy_status == -1) {
+            // indicates async request body of in-flight request
+            //LOG(INFO) << "Inflight request, updating proxy status, body size: " << request->body.size();
+            request->proxy_status = 0;
+            return ;
+        }
+
         const std::string & leader_addr = node->leader_id().to_string();
-        //LOG(INFO) << "Redirecting write to leader at: " << leader_addr;
+        LOG(INFO) << "Redirecting write to leader at: " << leader_addr;
 
         thread_pool->enqueue([leader_addr, request, response, this]() {
             auto raw_req = request->_req;
@@ -118,10 +126,27 @@ void ReplicationState::write(http_req* request, http_res* response) {
             std::map<std::string, std::string> res_headers;
 
             if(request->http_method == "POST") {
-                std::string api_res;
-                long status = HttpClient::post_response(url, request->body, api_res, res_headers);
-                response->content_type_header = res_headers["content-type"];
-                response->set_body(status, api_res);
+                std::vector<std::string> path_parts;
+                StringUtils::split(path, path_parts, "/");
+
+                if(path_parts.back().rfind("import", 0) == 0) {
+                    // imports are handled asynchronously
+                    response->async_request_proceed = false;
+                    request->proxy_status = 1;
+
+                    long status = HttpClient::post_response_async(url, request, response);
+                    if(status == 500) {
+                        response->content_type_header = res_headers["content-type"];
+                        response->set_500("");
+                    } else {
+                        return ;
+                    }
+                } else {
+                    std::string api_res;
+                    long status = HttpClient::post_response(url, request->body, api_res, res_headers);
+                    response->content_type_header = res_headers["content-type"];
+                    response->set_body(status, api_res);
+                }
             } else if(request->http_method == "PUT") {
                 std::string api_res;
                 long status = HttpClient::put_response(url, request->body, api_res, res_headers);
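Aside (not part of the patch): the -1/0/1 protocol on proxy_status coordinates the curl thread running curl_req_send_callback with the h2o-side write path in ReplicationState::write, both shown above: 1 means a chunk is being forwarded, -1 means the forwarder has drained the buffer and is waiting, and 0 means write() has refilled the buffer so forwarding may resume. The stripped-down model below mimics that handshake with invented names and a shorter poll interval; a condition variable would avoid the polling loop that the patch implements with a one-second sleep.

#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// shared state standing in for http_req::body, http_req::proxy_status and last_chunk_aggregate
std::mutex body_mutex;
std::string body;
std::atomic<int>  proxy_status{1};   //  1: forwarding, -1: drained/waiting, 0: refilled
std::atomic<bool> last_chunk{false};

int main() {
    // producer: stands in for ReplicationState::write() receiving further request chunks
    std::thread producer([] {
        for(int i = 0; i < 3; i++) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            {
                std::lock_guard<std::mutex> lock(body_mutex);
                body += "chunk-" + std::to_string(i) + " ";
            }
            if(proxy_status == -1) {
                proxy_status = 0;     // buffer refilled: the forwarder may resume
            }
        }
        last_chunk = true;
    });

    // consumer: stands in for curl_req_send_callback draining the buffer
    while(true) {
        std::string chunk;
        {
            std::lock_guard<std::mutex> lock(body_mutex);
            chunk.swap(body);
        }
        if(!chunk.empty()) {
            std::cout << "forwarded: " << chunk << "\n";
        }
        if(last_chunk) {
            std::lock_guard<std::mutex> lock(body_mutex);
            if(body.empty()) {
                break;                // everything has been forwarded
            }
        }
        proxy_status = -1;            // drained: wait until the producer refills the buffer
        while(proxy_status != 0 && !last_chunk) {
            // the patch polls with a 1 second sleep here; a condition_variable would avoid this
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        proxy_status = 1;             // resume forwarding
    }

    producer.join();
    return 0;
}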
diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp
index 076ad979..e09a12c2 100644
--- a/src/typesense_server_utils.cpp
+++ b/src/typesense_server_utils.cpp
@@ -348,6 +348,7 @@ int run_server(const Config & config, const std::string & version, void (*master
     server->on(SEND_RESPONSE_MSG, on_send_response);
     server->on(ReplicationState::REPLICATION_MSG, raft_write_send_response);
+    server->on(HttpServer::STREAM_RESPONSE_MESSAGE, HttpServer::on_stream_response_message);
 
     // first we start the peering service
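Aside (not part of the patch): run_server registers on_stream_response_message for the STREAM_RESPONSE message so that, once wired up, response chunks produced on the curl thread can be streamed from the server's own thread (the commented-out send_message call in curl_write_async points at the same constraint). The dispatcher itself is not part of this diff; the sketch below is a minimal single-threaded stand-in for that register-and-dispatch pattern, not Typesense's actual implementation, which also marshals handlers onto the event loop thread.

#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <vector>

class MessageDispatcher {
public:
    using handler_t = std::function<bool(void*)>;

    // mirrors server->on(MESSAGE, handler)
    void on(const std::string& message, handler_t handler) {
        handlers[message].push_back(std::move(handler));
    }

    // mirrors get_message_dispatcher()->send_message(MESSAGE, data)
    void send_message(const std::string& message, void* data) {
        for(auto& handler : handlers[message]) {
            handler(data);
        }
    }

private:
    std::map<std::string, std::vector<handler_t>> handlers;
};

int main() {
    MessageDispatcher dispatcher;
    dispatcher.on("STREAM_RESPONSE", [](void* data) {
        std::cout << "streaming response chunk: " << *static_cast<std::string*>(data) << "\n";
        return true;
    });

    std::string chunk = "{\"success\": true}";
    dispatcher.send_message("STREAM_RESPONSE", &chunk);
    return 0;
}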