Collect and send replication updates in an async fashion.

This ensures that the query performance is not affected by replication running constantly in the background.
This commit is contained in:
Kishore Nallan 2017-09-02 15:56:07 +05:30
parent 6a465a0289
commit ea246f1c45
5 changed files with 78 additions and 26 deletions

View File

@ -67,8 +67,9 @@ struct http_req {
struct route_path {
std::string http_method;
std::vector<std::string> path_parts;
void (*handler)(http_req & req, http_res &);
void (*handler)(http_req &, http_res &);
bool authenticated;
bool async;
inline bool operator< (const route_path& rhs) const {
return true;

View File

@ -20,6 +20,9 @@ private:
h2o_globalconf_t config;
h2o_context_t ctx;
h2o_accept_ctx_t* accept_ctx;
h2o_hostconf_t *hostconf;
h2o_multithread_queue_t* message_queue;
h2o_multithread_receiver_t* message_receiver;
std::vector<route_path> routes;
@ -27,8 +30,6 @@ private:
const uint32_t listen_port;
h2o_hostconf_t *hostconf;
static void on_accept(h2o_socket_t *listener, const char *err);
int create_listener();
@ -42,20 +43,24 @@ private:
static int catch_all_handler(h2o_handler_t *self, h2o_req_t *req);
static void on_message(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages);
static int send_401_unauthorized(h2o_req_t *req);
void send_response(h2o_req_t* req, h2o_generator_t & generator, const http_res & response);
public:
HttpServer(std::string listen_address, uint32_t listen_port);
~HttpServer();
void get(const std::string & path, void (*handler)(http_req & req, http_res &), bool authenticated);
void get(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
void post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated);
void post(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
void put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated);
void put(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
void del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated);
void del(const std::string & path, void (*handler)(http_req & req, http_res & res), bool authenticated, bool async = false);
int run();

View File

@ -334,7 +334,7 @@ void del_remove_document(http_req & req, http_res & res) {
}
}
void get_replication_updates(http_req &req, http_res &res) {
void get_replication_updates(http_req & req, http_res & res) {
if(!StringUtils::is_uint64_t(req.params["seq_number"])) {
return res.send_400("The value of the parameter `seq_number` must be an unsigned integer.");
}

View File

@ -2,6 +2,7 @@
#include "http_server.h"
#include "string_utils.h"
#include <regex>
#include <thread>
#include <signal.h>
#include <h2o.h>
@ -10,6 +11,13 @@ struct h2o_custom_req_handler_t {
HttpServer* http_server;
};
struct h2o_custom_res_message_t {
h2o_multithread_message_t super;
h2o_req_t *req;
http_res* response;
HttpServer* http_server;
};
HttpServer::HttpServer(std::string listen_address, uint32_t listen_port):
listen_address(listen_address), listen_port(listen_port) {
accept_ctx = new h2o_accept_ctx_t();
@ -64,6 +72,10 @@ int HttpServer::run() {
signal(SIGPIPE, SIG_IGN);
h2o_context_init(&ctx, h2o_evloop_create(), &config);
message_queue = h2o_multithread_create_queue(ctx.loop);
message_receiver = new h2o_multithread_receiver_t();
h2o_multithread_register_receiver(message_queue, message_receiver, on_message);
if (create_listener() != 0) {
std::cerr << "Failed to listen on " << listen_address << ":" << listen_port << std::endl
<< "Error: " << strerror(errno) << std::endl;
@ -75,6 +87,18 @@ int HttpServer::run() {
return 0;
}
void HttpServer::on_message(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages) {
while (!h2o_linklist_is_empty(messages)) {
h2o_generator_t generator = {NULL, NULL};
h2o_multithread_message_t *message = H2O_STRUCT_FROM_MEMBER(h2o_multithread_message_t, link, messages->next);
h2o_custom_res_message_t *custom_message = reinterpret_cast<h2o_custom_res_message_t*>(message);
custom_message->http_server->send_response(custom_message->req, generator, *custom_message->response);
h2o_linklist_unlink(&message->link);
delete custom_message->response;
delete custom_message;
}
}
h2o_pathconf_t* HttpServer::register_handler(h2o_hostconf_t *hostconf, const char *path,
int (*on_req)(h2o_handler_t *, h2o_req_t *)) {
// See: https://github.com/h2o/h2o/issues/181#issuecomment-75393049
@ -127,7 +151,6 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) {
const std::string & http_method = std::string(req->method.base, req->method.len);
const std::string & path = std::string(req->path.base, req->path.len);
h2o_generator_t generator = {NULL, NULL};
std::vector<std::string> path_with_query_parts;
StringUtils::split(path, path_with_query_parts, "?");
@ -190,21 +213,34 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) {
}
}
if(rpath.async) {
/*
Must call h2o_start_response and h2o_send functions within the same thread that
called the on_req callback.
*/
std::thread response_thread([=]() {
http_req request = {query_map, req_body};
http_res* response = new http_res();
(rpath.handler)(request, *response);
h2o_custom_res_message_t* message = new h2o_custom_res_message_t{{{NULL}}, req, response,
self->http_server};
h2o_multithread_send_message(self->http_server->message_receiver, &message->super);
});
response_thread.detach();
return 0;
}
http_req request = {query_map, req_body};
http_res response;
h2o_generator_t generator = {NULL, NULL};
(rpath.handler)(request, response);
h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX);
req->res.status = response.status_code;
req->res.reason = get_status_reason(response.status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, H2O_STRLIT("application/json; charset=utf-8"));
h2o_start_response(req, &generator);
h2o_send(req, &body, 1, H2O_SEND_STATE_FINAL);
self->http_server->send_response(req, generator, response);
return 0;
}
}
h2o_generator_t generator = {NULL, NULL};
h2o_iovec_t res_body = h2o_strdup(&req->pool, "{ \"message\": \"Not Found\"}", SIZE_MAX);
req->res.status = 404;
req->res.reason = get_status_reason(404);
@ -215,6 +251,15 @@ int HttpServer::catch_all_handler(h2o_handler_t *_self, h2o_req_t *req) {
return 0;
}
void HttpServer::send_response(h2o_req_t* req, h2o_generator_t & generator, const http_res & response) {
h2o_iovec_t body = h2o_strdup(&req->pool, response.body.c_str(), SIZE_MAX);
req->res.status = response.status_code;
req->res.reason = get_status_reason(response.status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, H2O_STRLIT("application/json; charset=utf-8"));
h2o_start_response(req, &generator);
h2o_send(req, &body, 1, H2O_SEND_STATE_FINAL);
}
int HttpServer::send_401_unauthorized(h2o_req_t *req) {
h2o_generator_t generator = {NULL, NULL};
std::string res_body = std::string("{\"message\": \"Forbidden - ") + AUTH_HEADER + " header is invalid or not present.\"}";
@ -227,34 +272,35 @@ int HttpServer::send_401_unauthorized(h2o_req_t *req) {
return 0;
}
void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) {
void HttpServer::get(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"GET", path_parts, handler, authenticated};
route_path rpath = {"GET", path_parts, handler, authenticated, async};
routes.push_back(rpath);
}
void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) {
void HttpServer::post(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"POST", path_parts, handler, authenticated};
route_path rpath = {"POST", path_parts, handler, authenticated, async};
routes.push_back(rpath);
}
void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) {
void HttpServer::put(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"PUT", path_parts, handler, authenticated};
route_path rpath = {"PUT", path_parts, handler, authenticated, async};
routes.push_back(rpath);
}
void HttpServer::del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated) {
void HttpServer::del(const std::string & path, void (*handler)(http_req &, http_res &), bool authenticated, bool async) {
std::vector<std::string> path_parts;
StringUtils::split(path, path_parts, "/");
route_path rpath = {"DELETE", path_parts, handler, authenticated};
route_path rpath = {"DELETE", path_parts, handler, authenticated, async};
routes.push_back(rpath);
}
HttpServer::~HttpServer() {
delete accept_ctx;
delete message_receiver;
}

View File

@ -40,7 +40,7 @@ int main(int argc, char **argv) {
server.del("/collections/:collection/:id", del_remove_document, true);
// replication
server.get("/replication/updates", get_replication_updates, true);
server.get("/replication/updates", get_replication_updates, true, true);
server.run();
return 0;