From 0f36b15d5f1fbb30f306800aefc733fd79e32f99 Mon Sep 17 00:00:00 2001
From: Kishore Nallan
Date: Mon, 18 Oct 2021 12:08:37 +0530
Subject: [PATCH] Avoid reading request object after enqueuing.

---
 src/batched_indexer.cpp | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/src/batched_indexer.cpp b/src/batched_indexer.cpp
index b50a9bb3..1483fe4c 100644
--- a/src/batched_indexer.cpp
+++ b/src/batched_indexer.cpp
@@ -62,21 +62,26 @@ void BatchedIndexer::enqueue(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res
         }
     }
 
+    bool is_old_serialized_request = (req->start_ts == 0);
+    bool read_more_input = (req->_req != nullptr && req->_req->proceed_req);
+
     if(req->last_chunk_aggregate) {
         //LOG(INFO) << "Last chunk for req_id: " << req->start_ts << ", queue_id: " << queue_id;
         queued_writes += (chunk_sequence + 1);
 
+        std::unique_lock lk(mutex);
+        request_to_chunk.erase(req->start_ts);
+        lk.unlock();
+
         {
             std::unique_lock lk(qmutuxes[queue_id]);
             queues[queue_id].emplace_back(req->start_ts);
         }
 
-        std::unique_lock lk(mutex);
-        request_to_chunk.erase(req->start_ts);
-        lk.unlock();
+        // IMPORTANT: must not read `req` variables (except _req) henceforth to prevent data races with indexing thread
 
-        if(req->start_ts == 0) {
+        if(is_old_serialized_request) {
             // Indicates a serialized request from a version that did not support batching (v0.21 and below).
             // We can only do serial writes as we cannot reliably distinguish one streaming request from another.
             // So, wait for `req_res_map` to be empty before proceeding
@@ -92,7 +97,7 @@
         }
     }
 
-    if(req->_req != nullptr && req->_req->proceed_req) {
+    if(read_more_input) {
         // Tell the http library to read more input data
         deferred_req_res_t* req_res = new deferred_req_res_t(req, res, server, true);
         server->get_message_dispatcher()->send_message(HttpServer::REQUEST_PROCEED_MESSAGE, req_res);