From a8a8c60e0f1975b7061183a00779e466be4cc998 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 12 Oct 2021 20:44:54 +0530 Subject: [PATCH] Wrap last chunk aggregate flag with atomic. --- include/http_data.h | 4 ++-- src/http_server.cpp | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/include/http_data.h b/include/http_data.h index ff575bb6..7190c8f1 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -196,7 +196,7 @@ struct http_req { std::map params; bool first_chunk_aggregate; - bool last_chunk_aggregate; + std::atomic last_chunk_aggregate; size_t chunk_len; std::string body; @@ -311,7 +311,7 @@ struct http_req { content["route_hash"] = route_hash; content["params"] = params; content["first_chunk_aggregate"] = first_chunk_aggregate; - content["last_chunk_aggregate"] = last_chunk_aggregate; + content["last_chunk_aggregate"] = last_chunk_aggregate.load(); content["body"] = body; content["metadata"] = metadata; content["start_ts"] = start_ts; diff --git a/src/http_server.cpp b/src/http_server.cpp index 5105091e..b7c3a6d7 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -875,6 +875,8 @@ bool HttpServer::on_stream_response_message(void *data) { bool HttpServer::on_request_proceed_message(void *data) { //LOG(INFO) << "on_request_proceed_message"; + // This callback will run concurrently to batch indexer's run() so care must be taken to protect access + // to variables that are written to by the batch indexer, which for now is only: last_chunk_aggregate (atomic) deferred_req_res_t* req_res = static_cast(data); auto stream_state = (req_res->req->last_chunk_aggregate) ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;