Request-side import streaming via follower is working.

kishorenc 2020-09-01 20:12:08 +05:30
parent 1f726d747a
commit 10957633fb
9 changed files with 217 additions and 11 deletions

View File

@ -3,6 +3,8 @@
#include <string>
#include <map>
#include <curl/curl.h>
#include "http_data.h"
#include "http_server.h"
/*
NOTE: This is a really primitive blocking client meant only for specific Typesense use cases.
@ -16,10 +18,16 @@ private:
~HttpClient() = default;
static size_t curl_write (void *contents, size_t size, size_t nmemb, std::string *s);
static size_t curl_write(char *contents, size_t size, size_t nmemb, std::string *s);
static size_t curl_write_async(char *buffer, size_t size, size_t nmemb, void* context);
static CURL* init_curl(const std::string& url, std::string& response);
static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res);
static size_t curl_req_send_callback(char* buffer, size_t size, size_t nitems, void *userdata);
static long perform_curl(CURL *curl, std::map<std::string, std::string>& res_headers);
public:
@ -42,6 +50,8 @@ public:
static long post_response(const std::string & url, const std::string & body, std::string & response,
std::map<std::string, std::string>& res_headers, long timeout_ms=4000);
static long post_response_async(const std::string &url, http_req* request, http_res* response);
static long put_response(const std::string & url, const std::string & body, std::string & response,
std::map<std::string, std::string>& res_headers, long timeout_ms=4000);
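
For reference, the follower-side call site added later in this commit drives the new async variant roughly like this (condensed from ReplicationState::write further below; construction of the leader url is omitted):

response->async_request_proceed = false;   // decouple request progression from response generation
request->proxy_status = 1;                 // mark the forwarded body as in flight
long status = HttpClient::post_response_async(url, request, response);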

View File

@ -31,11 +31,14 @@ struct http_res {
std::string body;
bool final;
// fulfilled by an async response handler to pass control back to raft replica apply thread
// fulfilled by an async response handler to pass control back for further writes
std::promise<bool>* promise = nullptr;
h2o_generator_t* generator = nullptr;
// for async requests, automatically progresses request body on response proceed
bool async_request_proceed = true;
http_res(): status_code(501), content_type_header("application/json; charset=utf-8"), final(true) {
}
@ -143,12 +146,15 @@ struct http_req {
void* data;
// used during forwarding of requests from follower to leader
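// rough meaning of the values in this commit: 1 = follower is forwarding the body to the leader,
// -1 = the curl read callback has consumed the current chunk and is waiting for more input,
// 0 = the next chunk has arrived and the callback may resume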
std::atomic<int> proxy_status;
// for deferred processing of async handlers
h2o_custom_timer_t defer_timer;
http_req(): _req(nullptr), route_hash(1),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body_index(0), data(nullptr) {
chunk_len(0), body_index(0), data(nullptr), proxy_status(0) {
}
@ -156,7 +162,7 @@ struct http_req {
const std::map<std::string, std::string> & params, const std::string& body):
_req(_req), http_method(http_method), route_hash(route_hash), params(params),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body(body), body_index(0), data(nullptr) {
chunk_len(0), body(body), body_index(0), data(nullptr), proxy_status(0) {
}

View File

@ -146,10 +146,13 @@ public:
static bool on_stop_server(void *data);
static bool on_stream_response_message(void *data);
std::string get_version();
static constexpr const char* AUTH_HEADER = "x-typesense-api-key";
static constexpr const char* STOP_SERVER_MESSAGE = "STOP_SERVER";
static constexpr const char* STREAM_RESPONSE_MESSAGE = "STREAM_RESPONSE";
static int process_request(http_req* request, http_res* response, route_path *rpath,
const h2o_custom_req_handler_t *req_handler);

View File

@ -90,6 +90,8 @@ Option<bool> CollectionManager::load(const size_t init_batch_size) {
std::vector<std::string> collection_meta_jsons;
store->scan_fill(Collection::COLLECTION_META_PREFIX, collection_meta_jsons);
LOG(INFO) << "Found " << collection_meta_jsons.size() << " collection(s) on disk.";
for(auto & collection_meta_json: collection_meta_jsons) {
nlohmann::json collection_meta;

View File

@ -18,6 +18,22 @@ long HttpClient::post_response(const std::string &url, const std::string &body,
return perform_curl(curl, res_headers);
}
long HttpClient::post_response_async(const std::string &url, http_req* request, http_res* response) {
deferred_req_res_t* req_res = new deferred_req_res_t{request, response, nullptr};
std::unique_ptr<deferred_req_res_t> index_arg_guard(req_res);
CURL *curl = init_curl_async(url, req_res);
if(curl == nullptr) {
return 500;
}
curl_easy_setopt(curl, CURLOPT_POST, 1L);
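// curl_easy_perform() blocks this thread-pool thread until the forwarded request and its response
// complete; body chunks are pulled on demand via curl_req_send_callback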
curl_easy_perform(curl);
curl_easy_cleanup(curl);
return 0;
}
long HttpClient::put_response(const std::string &url, const std::string &body, std::string &response,
std::map<std::string, std::string>& res_headers, long timeout_ms) {
CURL *curl = init_curl(url, response);
@ -102,6 +118,125 @@ void HttpClient::extract_response_headers(CURL* curl, std::map<std::string, std:
res_headers.emplace("content-type", content_type);
}
size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nitems, void* userdata) {
// callback for request body to be sent to remote host
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(userdata);
size_t max_req_bytes = (size * nitems);
const char* total_body_buf = req_res->req->body.c_str();
size_t available_body_bytes = (req_res->req->body.size() - req_res->req->body_index);
// copy data into `buffer` not exceeding max_req_bytes
size_t bytes_to_read = std::min(max_req_bytes, available_body_bytes);
memcpy(buffer, total_body_buf + req_res->req->body_index, bytes_to_read);
req_res->req->body_index += bytes_to_read;
LOG(INFO) << "Wrote " << bytes_to_read << " bytes to request body (max_buffer_bytes=" << max_req_bytes << ")";
LOG(INFO) << "req_res->req->body_index: " << req_res->req->body_index;
LOG(INFO) << "req_res->req->body.size(): " << req_res->req->body.size();
if(req_res->req->body_index == req_res->req->body.size()) {
LOG(INFO) << "Current body buffer has been consumed fully.";
size_t written = req_res->req->chunk_len;
req_res->req->chunk_len = 0;
req_res->req->body_index = 0;
req_res->req->body = "";
req_res->res->final = req_res->req->last_chunk_aggregate;
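// mark the current chunk as consumed: the next write() call for this request will see -1,
// refill the body and reset the status to 0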
req_res->req->proxy_status = -1;
auto stream_state = (req_res->req->last_chunk_aggregate) ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
if(req_res->req->last_chunk_aggregate) {
LOG(INFO) << "Request forwarding done.";
if(req_res->req->_req->proceed_req) {
req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
}
} else {
LOG(INFO) << "Pausing forwarding and requesting more input.";
//curl_easy_pause(req_res->req->data, CURL_READFUNC_PAUSE);
req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
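// block this thread-pool thread until write() signals (proxy_status == 0) that the h2o thread
// has copied the next chunk into req->body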
while(req_res->req->proxy_status != 0 && req_res->req->body.empty()) {
LOG(INFO) << "Sleeping for 1 second...";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
req_res->req->proxy_status = 1;
LOG(INFO) << "Buffer refilled, unpausing request forwarding, body_size=" << req_res->req->body.size();
//curl_easy_pause(req_res->req->data, CURLPAUSE_RECV_CONT);
}
}
return bytes_to_read;
}
size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, void *context) {
// callback for response body to be sent back to client
LOG(INFO) << "curl_write_async";
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
size_t res_size = size * nmemb;
// FIXME: use header from remote response
// we've got response from remote host: write to client and ask for more request body
req_res->res->content_type_header = "text/plain; charset=utf8";
req_res->res->status_code = 200;
// FIXME: cannot mutate body since it will be used by h2o to write
req_res->res->body = std::string(buffer, res_size);
LOG(INFO) << "curl_write_async response, res body size: " << req_res->res->body.size();
// this needs to be sent from http thread
//h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(req_res->res->generator);
//HttpServer *server = custom_generator->h2o_handler->http_server;
//server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
return res_size;
}
CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res) {
CURL *curl = curl_easy_init();
if(curl == nullptr) {
return nullptr;
}
req_res->req->data = curl;
struct curl_slist *chunk = nullptr;
std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
// callback called every time request body is needed
curl_easy_setopt(curl, CURLOPT_READFUNCTION, HttpClient::curl_req_send_callback);
// context to callback
curl_easy_setopt(curl, CURLOPT_READDATA, (void *)req_res);
if(!ca_cert_path.empty()) {
curl_easy_setopt(curl, CURLOPT_CAINFO, ca_cert_path.c_str());
} else {
LOG(WARNING) << "Unable to locate system SSL certificates.";
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 4000);
// to allow self-signed certs
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write_async);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, req_res);
return curl;
}
CURL *HttpClient::init_curl(const std::string& url, std::string& response) {
CURL *curl = curl_easy_init();
@ -131,7 +266,7 @@ CURL *HttpClient::init_curl(const std::string& url, std::string& response) {
return curl;
}
size_t HttpClient::curl_write(void *contents, size_t size, size_t nmemb, std::string *s) {
s->append((char*)contents, size*nmemb);
size_t HttpClient::curl_write(char *contents, size_t size, size_t nmemb, std::string *s) {
s->append(contents, size*nmemb);
return size*nmemb;
}
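
The async forwarding above hinges on libcurl pulling the request body on demand through CURLOPT_READFUNCTION. Below is a minimal standalone sketch of that technique; it is not Typesense code, the URL and payload are placeholders, and it streams a fixed in-memory buffer instead of waiting for h2o chunks:

#include <curl/curl.h>
#include <algorithm>
#include <cstring>
#include <string>

struct upload_state {
    std::string body;   // data still to be sent
    size_t offset = 0;  // how many bytes have been handed to curl so far
};

// invoked by curl whenever it needs more request-body bytes
static size_t read_cb(char* buffer, size_t size, size_t nitems, void* userdata) {
    auto* st = static_cast<upload_state*>(userdata);
    size_t capacity = size * nitems;
    size_t remaining = st->body.size() - st->offset;
    size_t n = std::min(capacity, remaining);
    memcpy(buffer, st->body.data() + st->offset, n);
    st->offset += n;
    return n;  // returning 0 tells curl the request body is finished
}

int main() {
    upload_state st;
    st.body = std::string(1 << 20, 'x');  // 1 MB dummy payload

    CURL* curl = curl_easy_init();
    if(curl == nullptr) {
        return 1;
    }

    // the body size is not known upfront, so send it with chunked transfer encoding
    struct curl_slist* headers = curl_slist_append(nullptr, "Transfer-Encoding: chunked");

    curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:8108/collections/books/documents/import");
    curl_easy_setopt(curl, CURLOPT_POST, 1L);
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
    curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_cb);
    curl_easy_setopt(curl, CURLOPT_READDATA, &st);

    // blocks until the upload (and the server's response) completes
    CURLcode rc = curl_easy_perform(curl);

    curl_slist_free_all(headers);
    curl_easy_cleanup(curl);
    return rc == CURLE_OK ? 0 : 1;
}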

View File

@ -484,6 +484,8 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
int HttpServer::process_request(http_req* request, http_res* response, route_path *rpath,
const h2o_custom_req_handler_t *handler) {
//LOG(INFO) << "process_request called";
// for writes, we delegate to replication_state to handle response
if(rpath->http_method == "POST" || rpath->http_method == "PUT" || rpath->http_method == "DELETE") {
handler->http_server->get_replication_state()->write(request, response);
@ -564,6 +566,12 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
// if the request itself is async, we will proceed the request to fetch input content
// otherwise, call the handler since it will be the handler that will be producing content
if(!custom_generator->response->async_request_proceed) {
// request progression is not tied to response generation
//LOG(INFO) << "Ignoring request proceed";
return ;
}
if (custom_generator->rpath->async_req &&
custom_generator->request->_req && custom_generator->request->_req->proceed_req) {
@ -595,6 +603,7 @@ void HttpServer::stream_response(http_req& request, http_res& response) {
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response.generator);
if (req->res.status == 0) {
//LOG(INFO) << "h2o_start_response";
response.status_code = (response.status_code == 0) ? 503 : response.status_code; // just to be sure
req->res.status = response.status_code;
req->res.reason = http_res::get_status_reason(response.status_code);
@ -604,10 +613,14 @@ void HttpServer::stream_response(http_req& request, http_res& response) {
h2o_start_response(req, &custom_generator->super);
}
//LOG(INFO) << "stream_response, body_size: " << response.body.size();
h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX);
response.body = "";
// FIXME: should this be moved outside?
custom_generator->response->final = request.last_chunk_aggregate;
const h2o_send_state_t state = custom_generator->response->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(req, &body, 1, state);
@ -713,3 +726,9 @@ uint64_t HttpServer::node_state() const {
return replication_state->node_state();
}
bool HttpServer::on_stream_response_message(void *data) {
//LOG(INFO) << "on_stream_response_message";
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(data);
stream_response(*req_res->req, *req_res->res);
return true;
}

View File

@ -316,6 +316,11 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
size_t num_indexed = 0;
for(auto & index_rec: iter_batch) {
if(!index_rec.indexed.ok()) {
// some records could have been invalidated upstream
continue;
}
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.document, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema);

View File

@ -96,6 +96,7 @@ void ReplicationState::write(http_req* request, http_res* response) {
if (!is_leader()) {
if(node->leader_id().is_empty()) {
// Handle no leader scenario
// FIXME: could happen in the middle of streaming, so a h2o_start_response can bomb.
LOG(ERROR) << "Rejecting write: could not find a leader.";
response->set_500("Could not find a leader.");
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
@ -103,8 +104,15 @@ void ReplicationState::write(http_req* request, http_res* response) {
return message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
}
if (request->proxy_status == -1) {
// an in-flight proxied request has received its next body chunk: just signal the waiting curl read callback
//LOG(INFO) << "Inflight request, updating proxy status, body size: " << request->body.size();
request->proxy_status = 0;
return ;
}
const std::string & leader_addr = node->leader_id().to_string();
//LOG(INFO) << "Redirecting write to leader at: " << leader_addr;
LOG(INFO) << "Redirecting write to leader at: " << leader_addr;
thread_pool->enqueue([leader_addr, request, response, this]() {
auto raw_req = request->_req;
@ -118,10 +126,27 @@ void ReplicationState::write(http_req* request, http_res* response) {
std::map<std::string, std::string> res_headers;
if(request->http_method == "POST") {
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res, res_headers);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
if(path_parts.back().rfind("import", 0) == 0) {
// imports are handled asynchronously
response->async_request_proceed = false;
request->proxy_status = 1;
long status = HttpClient::post_response_async(url, request, response);
if(status == 500) {
response->content_type_header = res_headers["content-type"];
response->set_500("");
} else {
return ;
}
} else {
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res, res_headers);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
}
} else if(request->http_method == "PUT") {
std::string api_res;
long status = HttpClient::put_response(url, request->body, api_res, res_headers);
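
The hand-off between the h2o side (write() is invoked once per incoming import chunk) and the thread-pool thread blocked inside curl_req_send_callback reduces to a polling handshake on proxy_status. A small standalone illustration of that pattern follows, using the same status values; it is a simplification for clarity, not the actual Typesense code:

#include <atomic>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>

std::atomic<int> proxy_status{1};   // 1: forwarding, -1: waiting for the next chunk, 0: chunk arrived
std::string body;                   // request-body buffer shared between the two threads

// stands in for curl_req_send_callback on the thread-pool thread
void forwarder() {
    for(int i = 0; i < 3; i++) {
        proxy_status = -1;                      // current buffer consumed, ask for more input
        while(proxy_status != 0) {              // poll until the receiver has refilled the buffer
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        std::cout << "forwarding " << body << " to leader\n";
        body.clear();
    }
}

// stands in for ReplicationState::write() being invoked once per incoming chunk on the h2o side
void receiver() {
    for(int i = 0; i < 3; i++) {
        while(proxy_status != -1) {             // wait until the forwarder has drained the buffer
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        body = "chunk-" + std::to_string(i);
        proxy_status = 0;                       // signal: the next chunk has arrived
    }
}

int main() {
    std::thread f(forwarder), r(receiver);
    r.join();
    f.join();
    return 0;
}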

View File

@ -348,6 +348,7 @@ int run_server(const Config & config, const std::string & version, void (*master
server->on(SEND_RESPONSE_MSG, on_send_response);
server->on(ReplicationState::REPLICATION_MSG, raft_write_send_response);
server->on(HttpServer::STREAM_RESPONSE_MESSAGE, HttpServer::on_stream_response_message);
// first we start the peering service