From 0bc95741126c07aa9d66915d994142aca0db8875 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 8 Oct 2021 18:50:06 +0530 Subject: [PATCH] Further refactor indexing/response workflow. --- include/http_server.h | 57 ++++++++++++++++++++++++++----------------- src/http_server.cpp | 27 ++++++++++---------- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/include/http_server.h b/include/http_server.h index 16683121..41554942 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -41,19 +41,39 @@ struct h2o_custom_generator_t { }; struct stream_response_state_t { +private: + h2o_req_t* req = nullptr; +public: + bool is_req_early_exit = false; bool is_req_http1 = true; bool is_res_start = true; - bool is_res_final = true; - - uint32_t res_status_code = 0; - std::string res_content_type; + h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS; h2o_iovec_t res_body{}; h2o_generator_t* generator = nullptr; + + stream_response_state_t(h2o_req_t* _req): req(_req), is_res_start(req->res.status == 0) { + + } + + void set_response(uint32_t status_code, const std::string& content_type, const std::string& body) { + res_body = h2o_strdup(&req->pool, body.c_str(), SIZE_MAX); + + if(is_res_start) { + req->res.status = status_code; + req->res.reason = http_res::get_status_reason(status_code); + h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, + content_type.c_str(), content_type.size()); + } + } + + h2o_req_t* get_req() { + return req; + } }; struct deferred_req_res_t { @@ -82,35 +102,28 @@ public: // used to manage lifecycle of async actions const bool destroy_after_use; + // stores http lib related datastructures to avoid race conditions between indexing and http write threads stream_response_state_t res_state; async_req_res_t(const std::shared_ptr& h_req, const std::shared_ptr& h_res, - const bool destroy_after_use) : req(h_req), res(h_res), destroy_after_use(destroy_after_use) { + const bool destroy_after_use) : + req(h_req), res(h_res), destroy_after_use(destroy_after_use), res_state(h_req->_req) { - if(!res->is_alive) { + if(!res->is_alive || req->_req == nullptr || res->generator == nullptr) { return; } - h2o_custom_generator_t* res_generator = (res->generator == nullptr) ? nullptr : - static_cast(res->generator.load()); + // ***IMPORTANT*** + // We limit writing to fields of `res_state.req` to prevent race conditions with http thread + // Check `HttpServer::stream_response()` for overlapping writes. - res_state.is_req_early_exit = (res_generator == nullptr) ? false : - (res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate); + h2o_custom_generator_t* res_generator = static_cast(res->generator.load()); + res_state.is_req_early_exit = (res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate); res_state.is_req_http1 = req->is_http_v1; - - res_state.req = req->_req; - res_state.is_res_start = (req->_req == nullptr) ? true : (req->_req->res.status == 0); - res_state.is_res_final = res->final; - - res_state.res_status_code = res->status_code; - res_state.res_content_type = res->content_type_header; - - if(req->_req != nullptr) { - res_state.res_body = h2o_strdup(&req->_req->pool, res->body.c_str(), SIZE_MAX); - } - + res_state.send_state = res->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS; res_state.generator = (res_generator == nullptr) ? nullptr : &res_generator->h2o_generator; + res_state.set_response(res->status_code, res->content_type_header, res->body); } bool is_alive() { diff --git a/src/http_server.cpp b/src/http_server.cpp index b186e599..5105091e 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -708,21 +708,27 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) { void HttpServer::stream_response(stream_response_state_t& state) { // LOG(INFO) << "stream_response called"; - // std::this_thread::sleep_for(std::chrono::milliseconds (5000)); + //std::this_thread::sleep_for(std::chrono::milliseconds (5000)); + + // ***IMPORTANT*** + // We must ensure that fields of `state.req` are not written to for preventing race conditions with indexing thread + // Check `async_req_res_t` constructor for overlapping writes. + + h2o_req_t* req = state.get_req(); if(state.is_req_early_exit) { // premature termination of async request: handle this explicitly as otherwise, request is not being closed LOG(INFO) << "Premature termination of async request."; - if (state.req->_generator == nullptr) { - h2o_start_response(state.req, reinterpret_cast(state.generator)); + if (req->_generator == nullptr) { + h2o_start_response(req, state.generator); } if(state.is_req_http1) { - h2o_send(state.req, &state.res_body, 1, H2O_SEND_STATE_FINAL); - h2o_dispose_request(state.req); + h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL); + h2o_dispose_request(req); } else { - h2o_send(state.req, &state.res_body, 1, H2O_SEND_STATE_ERROR); + h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_ERROR); } return ; @@ -731,15 +737,10 @@ void HttpServer::stream_response(stream_response_state_t& state) { if (state.is_res_start) { /*LOG(INFO) << "h2o_start_response, content_type=" << state.res_content_type << ",response.status_code=" << state.res_status_code;*/ - state.req->res.status = state.res_status_code; - state.req->res.reason = http_res::get_status_reason(state.res_status_code); - h2o_add_header(&state.req->pool, &state.req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, - state.res_content_type.c_str(), state.res_content_type.size()); - h2o_start_response(state.req, reinterpret_cast(state.generator)); + h2o_start_response(req, state.generator); } - const h2o_send_state_t send_state = state.is_res_final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS; - h2o_send(state.req, &state.res_body, 1, send_state); + h2o_send(req, &state.res_body, 1, state.send_state); //LOG(INFO) << "stream_response after send"; }