From ea246f1c45b0b2cae5c83f919b9de9c872839ef2 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 2 Sep 2017 15:56:07 +0530 Subject: [PATCH] Collect and send replication updates in an async fashion. This ensures that the query performance is not affected by replication running constantly in the background. --- include/http_data.h | 3 +- include/http_server.h | 17 +++++--- src/api.cpp | 2 +- src/http_server.cpp | 80 +++++++++++++++++++++++++++-------- src/main/typesense_server.cpp | 2 +- 5 files changed, 78 insertions(+), 26 deletions(-) diff --git a/include/http_data.h b/include/http_data.h index ba0ace4c..cabd0bc0 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -67,8 +67,9 @@ struct http_req { struct route_path { std::string http_method; std::vector path_parts; - void (*handler)(http_req & req, http_res &); + void (*handler)(http_req &, http_res &); bool authenticated; + bool async; inline bool operator< (const route_path& rhs) const { return true; diff --git a/include/http_server.h b/include/http_server.h index 76a4d8b1..c7fcde65 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -20,6 +20,9 @@ private: h2o_globalconf_t config; h2o_context_t ctx; h2o_accept_ctx_t* accept_ctx; + h2o_hostconf_t *hostconf; + h2o_multithread_queue_t* message_queue; + h2o_multithread_receiver_t* message_receiver; std::vector routes; @@ -27,8 +30,6 @@ private: const uint32_t listen_port; - h2o_hostconf_t *hostconf; - static void on_accept(h2o_socket_t *listener, const char *err); int create_listener(); @@ -42,20 +43,24 @@ private: static int catch_all_handler(h2o_handler_t *self, h2o_req_t *req); + static void on_message(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages); + static int send_401_unauthorized(h2o_req_t *req); + void send_response(h2o_req_t* req, h2o_generator_t & generator, const http_res & response); + public: HttpServer(std::string listen_address, uint32_t listen_port); ~HttpServer(); - void get(const std::string & path, void (*handler)(http_req & req, http_res &), bool authenticated); + void get(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); - void post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated); + void post(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); - void put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated); + void put(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); - void del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated); + void del(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false); int run(); diff --git a/src/api.cpp b/src/api.cpp index fa36faa4..cbcb164a 100644 --- a/src/api.cpp +++ b/src/api.cpp @@ -334,7 +334,7 @@ void del_remove_document(http_req & req, http_res & res) { } } -void get_replication_updates(http_req &req, http_res &res) { +void get_replication_updates(http_req & req, http_res & res) { if(!StringUtils::is_uint64_t(req.params["seq_number"])) { return res.send_400("The value of the parameter `seq_number` must be an unsigned integer."); } diff --git a/src/http_server.cpp b/src/http_server.cpp index f6c15c46..8f9afeb5 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -2,6 +2,7 @@ #include "http_server.h" #include "string_utils.h" #include +#include #include #include @@ -10,6 +11,13 @@ struct h2o_custom_req_handler_t { HttpServer* http_server; }; +struct h2o_custom_res_message_t { + h2o_multithread_message_t super; + h2o_req_t *req; + http_res* response; + HttpServer* http_server; +}; + HttpServer::HttpServer(std::string listen_address, uint32_t listen_port): listen_address(listen_address), listen_port(listen_port) { accept_ctx = new h2o_accept_ctx_t(); @@ -64,6 +72,10 @@ int HttpServer::run() { signal(SIGPIPE, SIG_IGN); h2o_context_init(&ctx, h2o_evloop_create(), &config); + message_queue = h2o_multithread_create_queue(ctx.loop); + message_receiver = new h2o_multithread_receiver_t(); + h2o_multithread_register_receiver(message_queue, message_receiver, on_message); + if (create_listener() != 0) { std::cerr << "Failed to listen on " << listen_address << ":" << listen_port << std::endl << "Error: " << strerror(errno) << std::endl; @@ -75,6 +87,18 @@ int HttpServer::run() { return 0; } +void HttpServer::on_message(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages) { + while (!h2o_linklist_is_empty(messages)) { + h2o_generator_t generator = {NULL, NULL}; + h2o_multithread_message_t *message = H2O_STRUCT_FROM_MEMBER(h2o_multithread_message_t, link, messages->next); + h2o_custom_res_message_t *custom_message = reinterpret_cast(message); + custom_message->http_server->send_response(custom_message->req, generator, *custom_message->response); + h2o_linklist_unlink(&message->link); + delete custom_message->response; + delete custom_message; + } +} + h2o_pathconf_t* HttpServer::register_handler(h2o_hostconf_t *hostconf, const char *path, int (*on_req)(h2o_handler_t *, h2o_req_t *)) { // See: https://github.com/h2o/h2o/issues/181#issuecomment-75393049 @@ -127,7 +151,6 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) { const std::string & http_method = std::string(req->method.base, req->method.len); const std::string & path = std::string(req->path.base, req->path.len); - h2o_generator_t generator = {NULL, NULL}; std::vector path_with_query_parts; StringUtils::split(path, path_with_query_parts, "?"); @@ -190,21 +213,34 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) { } } + if(rpath.async) { + /* + Must call h2o_start_response and h2o_send functions within the same thread that + called the on_req callback. + */ + std::thread response_thread([=]() { + http_req request = {query_map, req_body}; + http_res* response = new http_res(); + (rpath.handler)(request, *response); + h2o_custom_res_message_t* message = new h2o_custom_res_message_t{{{NULL}}, req, response, + self->http_server}; + h2o_multithread_send_message(self->http_server->message_receiver, &message->super); + }); + + response_thread.detach(); + return 0; + } + http_req request = {query_map, req_body}; http_res response; + h2o_generator_t generator = {NULL, NULL}; (rpath.handler)(request, response); - - h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX); - req->res.status = response.status_code; - req->res.reason = get_status_reason(response.status_code); - h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, H2O_STRLIT("application/json; charset=utf-8")); - h2o_start_response(req, &generator); - h2o_send(req, &body, 1, H2O_SEND_STATE_FINAL); - + self->http_server->send_response(req, generator, response); return 0; } } + h2o_generator_t generator = {NULL, NULL}; h2o_iovec_t res_body = h2o_strdup(&req->pool, "{ \"message\": \"Not Found\"}", SIZE_MAX); req->res.status = 404; req->res.reason = get_status_reason(404); @@ -215,6 +251,15 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) { return 0; } +void HttpServer::send_response(h2o_req_t* req, h2o_generator_t & generator, const http_res & response) { + h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX); + req->res.status = response.status_code; + req->res.reason = get_status_reason(response.status_code); + h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, H2O_STRLIT("application/json; charset=utf-8")); + h2o_start_response(req, &generator); + h2o_send(req, &body, 1, H2O_SEND_STATE_FINAL); +} + int HttpServer::send_401_unauthorized(h2o_req_t *req) { h2o_generator_t generator = {NULL, NULL}; std::string res_body = std::string("{\"message\": \"Forbidden - ") + AUTH_HEADER + " header is invalid or not present.\"}"; @@ -227,34 +272,35 @@ int HttpServer::send_401_unauthorized(h2o_req_t *req) { return 0; } -void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) { +void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"GET", path_parts, handler, authenticated}; + route_path rpath = {"GET", path_parts, handler, authenticated, async}; routes.push_back(rpath); } -void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) { +void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"POST", path_parts, handler, authenticated}; + route_path rpath = {"POST", path_parts, handler, authenticated, async}; routes.push_back(rpath); } -void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) { +void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"PUT", path_parts, handler, authenticated}; + route_path rpath = {"PUT", path_parts, handler, authenticated, async}; routes.push_back(rpath); } -void HttpServer::del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) { +void HttpServer::del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) { std::vector path_parts; StringUtils::split(path, path_parts, "/"); - route_path rpath = {"DELETE", path_parts, handler, authenticated}; + route_path rpath = {"DELETE", path_parts, handler, authenticated, async}; routes.push_back(rpath); } HttpServer::~HttpServer() { delete accept_ctx; + delete message_receiver; } \ No newline at end of file diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 502d010b..7f67f891 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -40,7 +40,7 @@ int main(int argc, char **argv) { server.del("/collections/:collection/:id", del_remove_document, true); // replication - server.get("/replication/updates", get_replication_updates, true); + server.get("/replication/updates", get_replication_updates, true, true); server.run(); return 0;