From 3189c1c4ccdc5e8199a6323437e3e94da3f95b70 Mon Sep 17 00:00:00 2001 From: kishorenc Date: Wed, 9 Sep 2020 07:34:06 +0530 Subject: [PATCH] Fix import follower hang. --- include/http_data.h | 52 ++++++++++++++++++++++++++------- src/core_api.cpp | 21 +++++--------- src/http_client.cpp | 27 +++++++++-------- src/http_server.cpp | 70 +++++++++++++++++---------------------------- src/raft_server.cpp | 29 ++++++++++--------- 5 files changed, 103 insertions(+), 96 deletions(-) diff --git a/include/http_data.h b/include/http_data.h index cdd60ada..f883ecff 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -8,6 +8,7 @@ #include #include "json.hpp" #include "string_utils.h" +#include "logger.h" #define H2O_USE_LIBUV 0 extern "C" { @@ -25,6 +26,37 @@ struct h2o_custom_timer_t { } }; +enum class ROUTE_CODES { + NOT_FOUND = 1, + ALREADY_HANDLED = 2, +}; + +class await_t { +private: + + std::mutex mcv; + std::condition_variable cv; + bool ready; + +public: + + await_t(): ready(false) {} + + void notify() { + { + std::lock_guard lk(mcv); + ready = true; + } + cv.notify_all(); + } + + void wait() { + auto lk = std::unique_lock(mcv); + cv.wait(lk, [&] { return ready; }); + ready = false; + } +}; + struct http_res { uint32_t status_code; std::string content_type_header; @@ -32,13 +64,17 @@ struct http_res { bool final; // fulfilled by an async response handler to pass control back for further writes - std::promise* promise = nullptr; + // use `mark_proceed` and `wait_proceed` instead of accessing this directly + await_t await; h2o_generator_t* generator = nullptr; // indicates whether follower is proxying this response stream from leader bool proxied_stream = false; + // indicates whether this object is eligible for disposal at the end of req/res cycle + bool auto_dispose = true; + http_res(): status_code(0), content_type_header("application/json; charset=utf-8"), final(true) { } @@ -125,11 +161,6 @@ struct http_res { } }; -enum class ROUTE_CODES { - NOT_FOUND = 1, - ALREADY_HANDLED = 2, -}; - struct http_req { h2o_req_t* _req; std::string http_method; @@ -147,14 +178,15 @@ struct http_req { void* data; // used during forwarding of requests from follower to leader - std::promise* promise = nullptr; + // use `mark_proceed` and `wait_proceed` instead of accessing this directly + await_t await; // for deffered processing of async handlers h2o_custom_timer_t defer_timer; http_req(): _req(nullptr), route_hash(1), first_chunk_aggregate(true), last_chunk_aggregate(false), - chunk_len(0), body_index(0), data(nullptr), promise(nullptr) { + chunk_len(0), body_index(0), data(nullptr) { } @@ -162,7 +194,7 @@ struct http_req { const std::map & params, const std::string& body): _req(_req), http_method(http_method), route_hash(route_hash), params(params), first_chunk_aggregate(true), last_chunk_aggregate(false), - chunk_len(0), body(body), body_index(0), data(nullptr), promise(nullptr) { + chunk_len(0), body(body), body_index(0), data(nullptr) { } @@ -347,5 +379,5 @@ struct http_message_dispatcher { struct AsyncIndexArg { http_req* req; http_res* res; - std::promise* promise; + await_t* await; }; diff --git a/src/core_api.cpp b/src/core_api.cpp index 5cdc6269..905d48db 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -1074,7 +1074,7 @@ bool del_key(http_req &req, http_res &res) { } bool raft_write_send_response(void *data) { - //LOG(INFO) << "raft_write_send_response called"; + LOG(INFO) << "raft_write_send_response called"; AsyncIndexArg* index_arg = static_cast(data); std::unique_ptr index_arg_guard(index_arg); @@ -1087,27 +1087,20 @@ bool raft_write_send_response(void *data) { route_path* found_rpath = nullptr; bool route_found = server->get_route(index_arg->req->route_hash, &found_rpath); if(route_found) { - // for an async response handler, we need to assign the promise async_res = found_rpath->async_res; - if(async_res) { - index_arg->res->promise = index_arg->promise; - } - - // now we can call the request handler found_rpath->handler(*index_arg->req, *index_arg->res); } else { index_arg->res->set_404(); } } - if(!async_res) { - // only handle synchronous responses as async ones are handled by their handlers - server->send_response(index_arg->req, index_arg->res); + LOG(INFO) << "raft_write_send_response, async_res=" << async_res; - if(index_arg->promise != nullptr) { - index_arg->promise->set_value(true); // returns control back to raft replication thread - index_arg->promise = nullptr; - } + // only handle synchronous responses as async ones are handled by their handlers + if(!async_res) { + // send response and return control back to raft replication thread + LOG(INFO) << "raft_write_send_response: sending response"; + server->send_response(index_arg->req, index_arg->res); } return true; diff --git a/src/http_client.cpp b/src/http_client.cpp index e7486a87..52203dbb 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -119,11 +119,13 @@ void HttpClient::extract_response_headers(CURL* curl, std::map(userdata); if(req_res->req->_req == nullptr) { // underlying client request is dead, don't proxy anymore data to upstream (leader) + //LOG(INFO) << "req_res->req->_req is: null"; return 0; } @@ -156,17 +158,11 @@ size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nite server->get_message_dispatcher()->send_message(HttpServer::REQUEST_PROCEED_MESSAGE, req_res); } else { LOG(INFO) << "Pausing forwarding and requesting more input."; - std::promise promise; - std::future future = promise.get_future(); - req_res->req->promise = &promise; - server->get_message_dispatcher()->send_message(HttpServer::REQUEST_PROCEED_MESSAGE, req_res); LOG(INFO) << "Waiting for request body to be ready"; - future.get(); + req_res->req->await.wait(); LOG(INFO) << "Request body is ready"; - - req_res->req->promise = nullptr; LOG(INFO) << "Buffer refilled, unpausing request forwarding, body_size=" << req_res->req->body.size(); } } @@ -204,21 +200,15 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi req_res->res->body = std::string(buffer, res_size); req_res->res->final = false; - std::promise promise; - std::future future = promise.get_future(); - req_res->res->promise = &promise; - LOG(INFO) << "curl_write_async response, res body size: " << req_res->res->body.size(); req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); // wait until response is sent LOG(INFO) << "Waiting for response to be sent"; - future.get(); + req_res->res->await.wait(); LOG(INFO) << "Response sent"; - req_res->res->promise = nullptr; - return res_size; } @@ -231,6 +221,9 @@ size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) { req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); + // wait until final response is flushed or response object will be destroyed by caller + req_res->res->await.wait(); + return 0; } @@ -246,7 +239,13 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re struct curl_slist *chunk = nullptr; std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key; chunk = curl_slist_append(chunk, api_key_header.c_str()); + + // set content length + std::string content_length_header = std::string("content-length: ") + std::to_string(req_res->req->_req->content_length); + chunk = curl_slist_append(chunk, content_length_header.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk); + //curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE); // callback called every time request body is needed curl_easy_setopt(curl, CURLOPT_READFUNCTION, HttpClient::curl_req_send_callback); diff --git a/src/http_server.cpp b/src/http_server.cpp index a9dc31da..24964a26 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -400,6 +400,7 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { if(req->proceed_req == nullptr) { // Full request body is already available, so we don't care if handler is async or not + //LOG(INFO) << "Full request body is already available: " << req->entity.len; request->last_chunk_aggregate = true; return process_request(request, response, rpath, h2o_handler); } else { @@ -529,12 +530,11 @@ void HttpServer::defer_processing(http_req& req, http_res& res, size_t timeout_m h2o_timer_unlink(&req.defer_timer.timer); h2o_timer_link(ctx.loop, timeout_ms, &req.defer_timer.timer); - //LOG(INFO) << "defer_processing, exit_loop: " << exit_loop << ", res.promise: " << res.promise; + //LOG(INFO) << "defer_processing, exit_loop: " << exit_loop << ", res.await: " << res.await; - if(exit_loop && res.promise) { + if(exit_loop) { // otherwise, replication thread could be stuck waiting on a future - res.promise->set_value(true); - res.promise = nullptr; + res.await.notify(); } } @@ -554,6 +554,8 @@ int HttpServer::send_response(h2o_req_t *req, int status_code, const std::string } void HttpServer::send_response(http_req* request, http_res* response) { + LOG(INFO) << "send_response, request->_req=" << request->_req; + if(request->_req == nullptr) { // indicates serialized request and response -- lifecycle must be managed here return destroy_request_response(request, response); @@ -578,32 +580,21 @@ void HttpServer::response_abort(h2o_generator_t *generator, h2o_req_t *req) { custom_generator->request->_req = nullptr; custom_generator->response->final = true; - if(custom_generator->response->promise) { - // returns control back to caller (raft replication or follower forward) - LOG(INFO) << "response: fulfilling promise."; - custom_generator->response->promise->set_value(true); - custom_generator->response->promise = nullptr; - } - - if(custom_generator->request->promise) { - LOG(INFO) << "request: fulfilling promise."; - custom_generator->request->promise->set_value(true); - custom_generator->request->promise = nullptr; - } + // returns control back to caller (raft replication or follower forward) + LOG(INFO) << "response_abort: fulfilling req & res proceed."; + custom_generator->response->await.notify(); + custom_generator->request->await.notify(); } void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { LOG(INFO) << "response_proceed called"; h2o_custom_generator_t* custom_generator = reinterpret_cast(generator); - if(custom_generator->response->promise) { - // returns control back to caller (raft replication or follower forward) - LOG(INFO) << "response_proceed: fulfilling promise."; - custom_generator->response->promise->set_value(true); - custom_generator->response->promise = nullptr; - } + custom_generator->response->await.notify(); LOG(INFO) << "proxied_stream: " << custom_generator->response->proxied_stream; + LOG(INFO) << "response.final: " << custom_generator->response->final; + if(custom_generator->response->proxied_stream) { // request progression should not be tied to response generation LOG(INFO) << "Ignoring request proceed"; @@ -634,13 +625,6 @@ void HttpServer::stream_response(http_req& request, http_res& response) { if(request._req == nullptr) { // raft log replay or when underlying request is aborted LOG(INFO) << "request._req == nullptr"; - - if(response.promise) { - // returns control back to raft replication thread - response.promise->set_value(true); - response.promise = nullptr; - } - destroy_request_response(&request, &response); return; } @@ -649,7 +633,8 @@ void HttpServer::stream_response(http_req& request, http_res& response) { h2o_custom_generator_t* custom_generator = reinterpret_cast(response.generator); if (req->res.status == 0) { - LOG(INFO) << "h2o_start_response, content_type=" << response.content_type_header; + LOG(INFO) << "h2o_start_response, content_type=" << response.content_type_header + << ",response.status_code=" << response.status_code; response.status_code = (response.status_code == 0) ? 503 : response.status_code; // just to be sure req->res.status = response.status_code; req->res.reason = http_res::get_status_reason(response.status_code); @@ -667,32 +652,29 @@ void HttpServer::stream_response(http_req& request, http_res& response) { const h2o_send_state_t state = custom_generator->response->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS; h2o_send(req, &body, 1, state); - - // for intermediate responses, promise fulfillment will be handled by `response_proceed` - if(custom_generator->response->final && response.promise) { - // returns control back to raft replication thread - response.promise->set_value(true); - response.promise = nullptr; - } } void HttpServer::destroy_request_response(http_req* request, http_res* response) { if(request->defer_timer.data != nullptr) { deferred_req_res_t* deferred_req_res = static_cast(request->defer_timer.data); + h2o_timer_unlink(&request->defer_timer.timer); delete deferred_req_res; } - response->final = true; - request->_req = nullptr; + LOG(INFO) << "destroy_request_response, response->proxied_stream=" << response->proxied_stream + << ", request->_req=" << request->_req << ", response->await=" << &response->await; - if(response->proxied_stream) { - // lifecycle of proxied resources are managed by curl client proxying the transfer - // we will just nullify _req to indicate that original request is dead - LOG(INFO) << "Ignoring request/response cleanup since response is proxied."; - } else { + if(response->auto_dispose) { LOG(INFO) << "destroy_request_response: deleting req/res"; delete request; delete response; + } else { + // lifecycle of proxied/replicated resources are managed externally + // we will just nullify _req to indicate that original request is dead + LOG(INFO) << "Ignoring request/response cleanup since auto_dispose is false."; + response->final = true; + request->_req = nullptr; + response->await.notify(); } } diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 945c2609..36194b16 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -127,11 +127,10 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con // Handle no leader scenario LOG(ERROR) << "Rejecting write: could not find a leader."; - if(request->_req->proceed_req && request->promise) { + if(request->_req->proceed_req) { // streaming in progress: ensure graceful termination (cannot start response again) LOG(ERROR) << "Terminating streaming request gracefully."; - request->promise->set_value(true); - request->promise = nullptr; + request->await.notify(); return ; } @@ -141,11 +140,10 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con return message_dispatcher->send_message(REPLICATION_MSG, replication_arg); } - if (request->_req->proceed_req && request->promise) { + if (request->_req->proceed_req && response->proxied_stream) { // indicates async request body of in-flight request LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size(); - request->promise->set_value(true); - request->promise = nullptr; + request->await.notify(); return ; } @@ -173,6 +171,7 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con if(path_parts.back().rfind("import", 0) == 0) { // imports are handled asynchronously response->proxied_stream = true; + response->auto_dispose = false; long status = HttpClient::post_response_async(url, request, response, server); // must manage life cycle for forwarded requests @@ -257,16 +256,18 @@ void ReplicationState::on_apply(braft::Iterator& iter) { // Now that the log has been parsed, perform the actual operation // Call http server thread for write and response back to client (if `response` is NOT null) // We use a future to block current thread until the async flow finishes - - std::promise promise; - std::future future = promise.get_future(); - auto replication_arg = new AsyncIndexArg{request, response, &promise}; - + response->auto_dispose = false; + auto replication_arg = new AsyncIndexArg{request, response, nullptr}; message_dispatcher->send_message(REPLICATION_MSG, replication_arg); - LOG(INFO) << "Raft write waiting for future"; - future.get(); - LOG(INFO) << "Raft write got the future"; + LOG(INFO) << "Raft write waiting to proceed"; + response->await.wait(); + LOG(INFO) << "Raft write ready to proceed, response->final=" << response->final; + + if(response->final) { + delete request; + delete response; + } if(shut_down) { iter.set_error_and_rollback();