Further refactor indexing/response workflow.

This commit is contained in:
Kishore Nallan 2021-10-08 18:50:06 +05:30
parent 7276b7317b
commit 0bc9574112
2 changed files with 49 additions and 35 deletions

View File

@ -41,19 +41,39 @@ struct h2o_custom_generator_t {
};
struct stream_response_state_t {
private:
h2o_req_t* req = nullptr;
public:
bool is_req_early_exit = false;
bool is_req_http1 = true;
bool is_res_start = true;
bool is_res_final = true;
uint32_t res_status_code = 0;
std::string res_content_type;
h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS;
h2o_iovec_t res_body{};
h2o_generator_t* generator = nullptr;
stream_response_state_t(h2o_req_t* _req): req(_req), is_res_start(req->res.status == 0) {
}
void set_response(uint32_t status_code, const std::string& content_type, const std::string& body) {
res_body = h2o_strdup(&req->pool, body.c_str(), SIZE_MAX);
if(is_res_start) {
req->res.status = status_code;
req->res.reason = http_res::get_status_reason(status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
content_type.c_str(), content_type.size());
}
}
h2o_req_t* get_req() {
return req;
}
};
struct deferred_req_res_t {
@ -82,35 +102,28 @@ public:
// used to manage lifecycle of async actions
const bool destroy_after_use;
// stores http lib related datastructures to avoid race conditions between indexing and http write threads
stream_response_state_t res_state;
async_req_res_t(const std::shared_ptr<http_req>& h_req, const std::shared_ptr<http_res>& h_res,
const bool destroy_after_use) : req(h_req), res(h_res), destroy_after_use(destroy_after_use) {
const bool destroy_after_use) :
req(h_req), res(h_res), destroy_after_use(destroy_after_use), res_state(h_req->_req) {
if(!res->is_alive) {
if(!res->is_alive || req->_req == nullptr || res->generator == nullptr) {
return;
}
h2o_custom_generator_t* res_generator = (res->generator == nullptr) ? nullptr :
static_cast<h2o_custom_generator_t*>(res->generator.load());
// ***IMPORTANT***
// We limit writing to fields of `res_state.req` to prevent race conditions with http thread
// Check `HttpServer::stream_response()` for overlapping writes.
res_state.is_req_early_exit = (res_generator == nullptr) ? false :
(res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate);
h2o_custom_generator_t* res_generator = static_cast<h2o_custom_generator_t*>(res->generator.load());
res_state.is_req_early_exit = (res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate);
res_state.is_req_http1 = req->is_http_v1;
res_state.req = req->_req;
res_state.is_res_start = (req->_req == nullptr) ? true : (req->_req->res.status == 0);
res_state.is_res_final = res->final;
res_state.res_status_code = res->status_code;
res_state.res_content_type = res->content_type_header;
if(req->_req != nullptr) {
res_state.res_body = h2o_strdup(&req->_req->pool, res->body.c_str(), SIZE_MAX);
}
res_state.send_state = res->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
res_state.generator = (res_generator == nullptr) ? nullptr : &res_generator->h2o_generator;
res_state.set_response(res->status_code, res->content_type_header, res->body);
}
bool is_alive() {

View File

@ -708,21 +708,27 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
void HttpServer::stream_response(stream_response_state_t& state) {
// LOG(INFO) << "stream_response called";
// std::this_thread::sleep_for(std::chrono::milliseconds (5000));
//std::this_thread::sleep_for(std::chrono::milliseconds (5000));
// ***IMPORTANT***
// We must ensure that fields of `state.req` are not written to for preventing race conditions with indexing thread
// Check `async_req_res_t` constructor for overlapping writes.
h2o_req_t* req = state.get_req();
if(state.is_req_early_exit) {
// premature termination of async request: handle this explicitly as otherwise, request is not being closed
LOG(INFO) << "Premature termination of async request.";
if (state.req->_generator == nullptr) {
h2o_start_response(state.req, reinterpret_cast<h2o_generator_t*>(state.generator));
if (req->_generator == nullptr) {
h2o_start_response(req, state.generator);
}
if(state.is_req_http1) {
h2o_send(state.req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(state.req);
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(req);
} else {
h2o_send(state.req, &state.res_body, 1, H2O_SEND_STATE_ERROR);
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_ERROR);
}
return ;
@ -731,15 +737,10 @@ void HttpServer::stream_response(stream_response_state_t& state) {
if (state.is_res_start) {
/*LOG(INFO) << "h2o_start_response, content_type=" << state.res_content_type
<< ",response.status_code=" << state.res_status_code;*/
state.req->res.status = state.res_status_code;
state.req->res.reason = http_res::get_status_reason(state.res_status_code);
h2o_add_header(&state.req->pool, &state.req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
state.res_content_type.c_str(), state.res_content_type.size());
h2o_start_response(state.req, reinterpret_cast<h2o_generator_t*>(state.generator));
h2o_start_response(req, state.generator);
}
const h2o_send_state_t send_state = state.is_res_final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(state.req, &state.res_body, 1, send_state);
h2o_send(req, &state.res_body, 1, state.send_state);
//LOG(INFO) << "stream_response after send";
}