diff --git a/include/http_client.h b/include/http_client.h index a777bfec..b57a2916 100644 --- a/include/http_client.h +++ b/include/http_client.h @@ -20,13 +20,15 @@ private: static size_t curl_write(char *contents, size_t size, size_t nmemb, std::string *s); + static size_t curl_header(char *buffer, size_t size, size_t nmemb, void* context); + static size_t curl_write_async(char *buffer, size_t size, size_t nmemb, void* context); static size_t curl_write_async_done(void* context, curl_socket_t item); static CURL* init_curl(const std::string& url, std::string& response); - static CURL* init_curl_async(const std::string& url, request_response* req_res); + static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res); static size_t curl_req_send_callback(char* buffer, size_t size, size_t nitems, void *userdata); @@ -52,7 +54,8 @@ public: static long post_response(const std::string & url, const std::string & body, std::string & response, std::map& res_headers, long timeout_ms=4000); - static long post_response_async(const std::string &url, http_req* request, http_res* response); + static long post_response_async(const std::string &url, http_req* request, http_res* response, + HttpServer* server); static long put_response(const std::string & url, const std::string & body, std::string & response, std::map& res_headers, long timeout_ms=4000); diff --git a/include/http_data.h b/include/http_data.h index cead00a8..cdd60ada 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -36,8 +36,8 @@ struct http_res { h2o_generator_t* generator = nullptr; - // for async requests, automatically progresses request body on response proceed - bool proceed_req_after_write = true; + // indicates whether follower is proxying this response stream from leader + bool proxied_stream = false; http_res(): status_code(0), content_type_header("application/json; charset=utf-8"), final(true) { @@ -321,8 +321,11 @@ struct http_message_dispatcher { h2o_multithread_message_t *message = H2O_STRUCT_FROM_MEMBER(h2o_multithread_message_t, link, messages->next); h2o_custom_res_message_t *custom_message = reinterpret_cast(message); - if(custom_message->message_handlers->count(custom_message->type) != 0) { - auto handler = custom_message->message_handlers->at(custom_message->type); + const std::map::const_iterator handler_itr = + custom_message->message_handlers->find(custom_message->type); + + if(handler_itr != custom_message->message_handlers->end()) { + auto handler = handler_itr->second; (handler)(custom_message->data); } diff --git a/include/http_server.h b/include/http_server.h index 471ccb05..58180d45 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -26,7 +26,7 @@ struct h2o_custom_req_handler_t { struct h2o_custom_generator_t { h2o_generator_t super; h2o_custom_req_handler_t* h2o_handler; - route_path *rpath; + route_path* rpath; http_req* request; http_res* response; }; @@ -57,8 +57,11 @@ private: bool exit_loop = false; +private: + std::string version; + // must be a vector since order of routes matter std::vector> routes; const std::string listen_address; @@ -144,6 +147,8 @@ public: void stop(); + bool has_exited() const; + void clear_timeouts(const std::vector & timers, bool trigger_callback = true); static bool on_stop_server(void *data); diff --git a/include/raft_server.h b/include/raft_server.h index e2a8be01..3eda8425 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -90,6 +90,8 @@ private: bool create_init_db_snapshot; + std::atomic& shut_down; + public: static constexpr const char* log_dir_name = "log"; @@ -97,7 +99,7 @@ public: static constexpr const char* snapshot_dir_name = "snapshot"; ReplicationState(Store* store, ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher, - bool create_init_db_snapshot); + bool create_init_db_snapshot, std::atomic& quit_service); ~ReplicationState() { delete node; @@ -131,6 +133,8 @@ public: // Shut this node down. void shutdown() { + LOG(INFO) << "Replication state shutdown."; + shut_down = true; if (node) { node->shutdown(nullptr); } @@ -140,6 +144,7 @@ public: void join() { if (node) { node->join(); + node = nullptr; } } diff --git a/src/http_client.cpp b/src/http_client.cpp index 6eb8cabc..8788c11a 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -18,9 +18,9 @@ long HttpClient::post_response(const std::string &url, const std::string &body, return perform_curl(curl, res_headers); } -long HttpClient::post_response_async(const std::string &url, http_req* request, http_res* response) { - request_response* req_res = new request_response{request, response}; - std::unique_ptr req_res_guard(req_res); +long HttpClient::post_response_async(const std::string &url, http_req* request, http_res* response, HttpServer* server) { + deferred_req_res_t* req_res = new deferred_req_res_t{request, response, server}; + std::unique_ptr req_res_guard(req_res); CURL *curl = init_curl_async(url, req_res); if(curl == nullptr) { @@ -120,7 +120,13 @@ void HttpClient::extract_response_headers(CURL* curl, std::map(userdata); + deferred_req_res_t* req_res = static_cast(userdata); + + if(req_res->req->_req == nullptr) { + // underlying client request is dead, don't proxy anymore data to upstream (leader) + return 0; + } + size_t max_req_bytes = (size * nitems); const char* total_body_buf = req_res->req->body.c_str(); @@ -143,8 +149,7 @@ size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nite req_res->req->body_index = 0; req_res->req->body = ""; - h2o_custom_generator_t* custom_generator = reinterpret_cast(req_res->res->generator); - HttpServer *server = custom_generator->h2o_handler->http_server; + HttpServer *server = req_res->server; if(req_res->req->last_chunk_aggregate) { LOG(INFO) << "Request forwarding done."; @@ -169,17 +174,54 @@ size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nite return bytes_to_read; } +size_t HttpClient::curl_header(char *buffer, size_t size, size_t nmemb, void *context) { + deferred_req_res_t* req_res = static_cast(context); + size_t header_size = size * nmemb; + + std::string header(buffer, header_size); + + if(header.rfind("HTTP", 0) == 0) { + // status field, e.g. "HTTP/1.1 404 Not Found" + std::vector parts; + StringUtils::split(header, parts, " "); + if(parts.size() >= 2 && StringUtils::is_uint32_t(parts[1])) { + req_res->res->status_code = std::stoi(parts[1]); + } else { + req_res->res->status_code = 500; + } + } else if(header.rfind("content-type", 0) == 0) { + // e.g. "content-type: application/json; charset=utf-8" + std::vector parts; + StringUtils::split(header, parts, ":"); + if(parts.size() == 2) { + req_res->res->content_type_header = parts[1]; + } else { + req_res->res->content_type_header = "application/json; charset=utf-8"; + } + } + + LOG(INFO) << "header:|" << header << "|"; + + return header_size; +} + size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, void *context) { // callback for response body to be sent back to client LOG(INFO) << "curl_write_async"; + deferred_req_res_t* req_res = static_cast(context); + + if(req_res->req->_req == nullptr) { + // underlying client request is dead, don't try to send anymore data + return 0; + } - request_response* req_res = static_cast(context); size_t res_size = size * nmemb; // FIXME: use header from remote response // we've got response from remote host: write to client and ask for more request body req_res->res->content_type_header = "text/plain; charset=utf8"; req_res->res->status_code = 200; + req_res->res->body = std::string(buffer, res_size); req_res->res->final = false; @@ -189,9 +231,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi LOG(INFO) << "curl_write_async response, res body size: " << req_res->res->body.size(); - h2o_custom_generator_t* custom_generator = reinterpret_cast(req_res->res->generator); - HttpServer *server = custom_generator->h2o_handler->http_server; - server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); + req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); // wait until response is sent LOG(INFO) << "Waiting for response to be sent"; @@ -206,18 +246,16 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) { LOG(INFO) << "curl_write_async_done"; - request_response* req_res = static_cast(context); + deferred_req_res_t* req_res = static_cast(context); req_res->res->body = ""; req_res->res->final = true; - h2o_custom_generator_t* custom_generator = reinterpret_cast(req_res->res->generator); - HttpServer *server = custom_generator->h2o_handler->http_server; - server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); + req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); return 0; } -CURL *HttpClient::init_curl_async(const std::string& url, request_response* req_res) { +CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res) { CURL *curl = curl_easy_init(); if(curl == nullptr) { @@ -250,6 +288,9 @@ CURL *HttpClient::init_curl_async(const std::string& url, request_response* req_ curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L); + //curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, curl_header); + //curl_easy_setopt(curl, CURLOPT_HEADERDATA, req_res); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write_async); curl_easy_setopt(curl, CURLOPT_WRITEDATA, req_res); @@ -292,4 +333,3 @@ size_t HttpClient::curl_write(char *contents, size_t size, size_t nmemb, std::st s->append(contents, size*nmemb); return size*nmemb; } - diff --git a/src/http_server.cpp b/src/http_server.cpp index 91ffc2b7..5096368f 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -525,6 +525,8 @@ void HttpServer::defer_processing(http_req& req, http_res& res, size_t timeout_m h2o_timer_unlink(&req.defer_timer.timer); h2o_timer_link(ctx.loop, timeout_ms, &req.defer_timer.timer); + //LOG(INFO) << "defer_processing, exit_loop: " << exit_loop << ", res.promise: " << res.promise; + if(exit_loop && res.promise) { // otherwise, replication thread could be stuck waiting on a future res.promise->set_value(true); @@ -569,12 +571,21 @@ void HttpServer::response_abort(h2o_generator_t *generator, h2o_req_t *req) { LOG(INFO) << "response_abort called"; h2o_custom_generator_t* custom_generator = reinterpret_cast(generator); + custom_generator->request->_req = nullptr; + custom_generator->response->final = true; + if(custom_generator->response->promise) { // returns control back to caller (raft replication or follower forward) - LOG(INFO) << "response_abort: fulfilling promise."; + LOG(INFO) << "response: fulfilling promise."; custom_generator->response->promise->set_value(true); custom_generator->response->promise = nullptr; } + + if(custom_generator->request->promise) { + LOG(INFO) << "request: fulfilling promise."; + custom_generator->request->promise->set_value(true); + custom_generator->request->promise = nullptr; + } } void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { @@ -588,9 +599,9 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { custom_generator->response->promise = nullptr; } - LOG(INFO) << "proceed_req_after_write: " << custom_generator->response->proceed_req_after_write; - if(!custom_generator->response->proceed_req_after_write) { - // request progression is not tied to response generation + LOG(INFO) << "proxied_stream: " << custom_generator->response->proxied_stream; + if(custom_generator->response->proxied_stream) { + // request progression should not be tied to response generation LOG(INFO) << "Ignoring request proceed"; return ; } @@ -617,7 +628,7 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { void HttpServer::stream_response(http_req& request, http_res& response) { LOG(INFO) << "stream_response called"; if(request._req == nullptr) { - // raft log replay + // raft log replay or when underlying request is aborted LOG(INFO) << "request._req == nullptr"; if(response.promise) { @@ -667,8 +678,18 @@ void HttpServer::destroy_request_response(http_req* request, http_res* response) delete deferred_req_res; } - delete request; - delete response; + response->final = true; + request->_req = nullptr; + + if(response->proxied_stream) { + // lifecycle of proxied resources are managed by curl client proxying the transfer + // we will just nullify _req to indicate that original request is dead + LOG(INFO) << "Ignoring request/response cleanup since response is proxied."; + } else { + LOG(INFO) << "destroy_request_response: deleting req/res"; + delete request; + delete response; + } } void HttpServer::set_auth_handler(bool (*handler)(std::map& params, const route_path& rpath, @@ -717,6 +738,7 @@ HttpServer::~HttpServer() { } h2o_timerwheel_run(ctx.loop->_timeouts, 9999999999999); + h2o_timerwheel_destroy(ctx.loop->_timeouts); h2o_context_dispose(&ctx); @@ -772,9 +794,13 @@ bool HttpServer::on_request_proceed_message(void *data) { size_t written = req_res->req->chunk_len; req_res->req->chunk_len = 0; - if(req_res->req->_req->proceed_req) { + if(req_res->req->_req && req_res->req->_req->proceed_req) { req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state); } return true; } + +bool HttpServer::has_exited() const { + return exit_loop; +} diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 237efa16..a840a3a1 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -92,6 +92,10 @@ std::string ReplicationState::to_nodes_config(const butil::EndPoint& peering_end } void ReplicationState::write(http_req* request, http_res* response) { + if(!node) { + return ; + } + if (!node->is_leader()) { return follower_write(request, response); } @@ -125,11 +129,6 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con LOG(INFO) << "follower_write_count: " << follower_write_count; - if(follower_write_count == 1) { - //LOG(INFO) << "follower_write, will sleep for 10 seconds..."; - //std::this_thread::sleep_for(std::chrono::seconds(10)); - } - if(node->leader_id().is_empty()) { // Handle no leader scenario LOG(ERROR) << "Rejecting write: could not find a leader."; @@ -159,7 +158,10 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con const std::string & leader_addr = node->leader_id().to_string(); LOG(INFO) << "Redirecting write to leader at: " << leader_addr; - thread_pool->enqueue([leader_addr, request, response, this]() { + h2o_custom_generator_t* custom_generator = reinterpret_cast(response->generator); + HttpServer* server = custom_generator->h2o_handler->http_server; + + thread_pool->enqueue([leader_addr, request, response, server, this]() { auto raw_req = request->_req; std::string scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len); std::vector addr_parts; @@ -176,9 +178,15 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con if(path_parts.back().rfind("import", 0) == 0) { // imports are handled asynchronously - response->proceed_req_after_write = false; + response->proxied_stream = true; + long status = HttpClient::post_response_async(url, request, response, server); + + // must manage life cycle for forwarded requests + delete request; + delete response; + + LOG(INFO) << "Import call done."; - long status = HttpClient::post_response_async(url, request, response); if(status == 500) { response->content_type_header = res_headers["content-type"]; response->set_500(""); @@ -259,10 +267,17 @@ void ReplicationState::on_apply(braft::Iterator& iter) { std::promise promise; std::future future = promise.get_future(); auto replication_arg = new AsyncIndexArg{request, response, &promise}; + message_dispatcher->send_message(REPLICATION_MSG, replication_arg); + LOG(INFO) << "Raft write waiting for future"; future.get(); LOG(INFO) << "Raft write got the future"; + + if(shut_down) { + iter.set_error_and_rollback(); + return; + } } } @@ -410,10 +425,10 @@ void ReplicationState::refresh_nodes(const std::string & nodes) { } ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_message_dispatcher *message_dispatcher, - bool create_init_db_snapshot): + bool create_init_db_snapshot, std::atomic& quit_service): node(nullptr), leader_term(-1), store(store), thread_pool(thread_pool), message_dispatcher(message_dispatcher), init_readiness_count(0), - create_init_db_snapshot(create_init_db_snapshot) { + create_init_db_snapshot(create_init_db_snapshot), shut_down(quit_service) { } diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 4350d340..e484fdb9 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -37,7 +37,6 @@ void catch_interrupt(int sig) { LOG(INFO) << "Stopping Typesense server..."; signal(sig, SIG_IGN); // ignore for now as we want to shut down elegantly quit_raft_service = true; - server->stop(); } void catch_crash(int sig) { @@ -261,6 +260,9 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st raft_server.Join(); LOG(INFO) << "Typesense peering service has quit."; + + server->stop(); + return 0; } @@ -354,7 +356,8 @@ int run_server(const Config & config, const std::string & version, void (*master // first we start the peering service ThreadPool thread_pool(32); - ReplicationState replication_state(&store, &thread_pool, server->get_message_dispatcher(), create_init_db_snapshot); + ReplicationState replication_state(&store, &thread_pool, server->get_message_dispatcher(), + create_init_db_snapshot, quit_raft_service); std::thread raft_thread([&replication_state, &config, &state_dir]() { std::string path_to_nodes = config.get_nodes(); @@ -369,7 +372,7 @@ int run_server(const Config & config, const std::string & version, void (*master // we are out of the event loop here - LOG(INFO) << "Typesense API service has quit. Stopping peering service..."; + LOG(INFO) << "Typesense API service has quit."; quit_raft_service = true; raft_thread.join();