Address concurrency edge case in import streaming.

Manage concurrent access by the req/res variables between batch indexer and http response threads.
This commit is contained in:
Kishore Nallan 2021-10-07 19:38:54 +05:30
parent 8a751daa89
commit 87f18b03f3
10 changed files with 195 additions and 182 deletions

View File

@ -238,7 +238,7 @@ private:
Index* init_index();
static std::vector<char> to_char_array(std::vector<std::string> strs);
static std::vector<char> to_char_array(const std::vector<std::string>& strs);
public:

View File

@ -39,7 +39,8 @@ struct http_res {
std::string body;
std::atomic<bool> final;
void* generator = nullptr;
std::atomic<bool> is_alive;
std::atomic<void*> generator = nullptr;
// indicates whether follower is proxying this response stream from leader
bool proxied_stream = false;
@ -48,18 +49,20 @@ struct http_res {
std::condition_variable cv;
bool ready;
http_res(): status_code(0), content_type_header("application/json; charset=utf-8"), final(true), ready(false) {
http_res(void* generator): status_code(0), content_type_header("application/json; charset=utf-8"), final(true),
is_alive(generator != nullptr), generator(generator), ready(false) {
}
~http_res() {
//LOG(INFO) << "http_res " << this;
//LOG(INFO) << "~http_res " << this;
}
void load(uint32_t status_code, const std::string& content_type_header, const std::string& body) {
void set_content(uint32_t status_code, const std::string& content_type_header, const std::string& body, const bool final) {
this->status_code = status_code;
this->content_type_header = content_type_header;
this->body = body;
this->final = final;
}
void wait() {
@ -206,7 +209,6 @@ struct http_req {
h2o_custom_timer_t defer_timer;
uint64_t start_ts;
bool deserialized_request;
std::mutex mcv;
std::condition_variable cv;
@ -214,9 +216,11 @@ struct http_req {
int64_t log_index;
std::atomic<bool> is_http_v1;
http_req(): _req(nullptr), route_hash(1),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body_index(0), data(nullptr), deserialized_request(true), ready(false), log_index(0) {
chunk_len(0), body_index(0), data(nullptr), ready(false), log_index(0), is_http_v1(true) {
start_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -227,17 +231,21 @@ struct http_req {
const std::map<std::string, std::string> & params, const std::string& body):
_req(_req), http_method(http_method), path_without_query(path_without_query), route_hash(route_hash),
params(params), first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body(body), body_index(0), data(nullptr), deserialized_request(false), ready(false),
chunk_len(0), body(body), body_index(0), data(nullptr), ready(false),
log_index(0) {
start_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
if(_req != nullptr) {
is_http_v1 = (_req->version < 0x200);
}
}
~http_req() {
//LOG(INFO) << "~http_req " << this;
if(!deserialized_request) {
if(_req != nullptr) {
uint64_t now = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t ms_since_start = (now - start_ts) / 1000;
@ -282,7 +290,7 @@ struct http_req {
// NOTE: we don't ser/de all fields, only ones needed for write forwarding
// Take care to check for existence of key to ensure backward compatibility during upgrade
void load_from_json(const std::string& serialized_content, const bool is_deserialized) {
void load_from_json(const std::string& serialized_content) {
nlohmann::json content = nlohmann::json::parse(serialized_content);
route_hash = content["route_hash"];
body += content["body"];
@ -296,9 +304,6 @@ struct http_req {
last_chunk_aggregate = content.count("last_chunk_aggregate") != 0 ? content["last_chunk_aggregate"].get<bool>() : false;
start_ts = content.count("start_ts") != 0 ? content["start_ts"].get<uint64_t>() : 0;
log_index = content.count("log_index") != 0 ? content["log_index"].get<int64_t>() : 0;
_req = nullptr;
deserialized_request = is_deserialized;
}
std::string to_json() const {
@ -314,10 +319,6 @@ struct http_req {
return content.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
}
bool is_http_v1() {
return (_req->version < 0x200);
}
};
struct route_path {

View File

@ -25,7 +25,7 @@ struct h2o_custom_req_handler_t {
};
struct h2o_custom_generator_t {
h2o_generator_t super;
h2o_generator_t h2o_generator;
h2o_custom_req_handler_t* h2o_handler;
route_path* rpath;
std::shared_ptr<http_req> request;
@ -40,20 +40,92 @@ struct h2o_custom_generator_t {
}
};
struct stream_response_state_t {
h2o_req_t* req = nullptr;
bool is_req_early_exit = false;
bool is_req_http1 = true;
bool is_res_start = true;
bool is_res_final = true;
uint32_t res_status_code = 0;
std::string res_content_type;
h2o_iovec_t res_body{};
h2o_generator_t* generator = nullptr;
};
struct deferred_req_res_t {
const std::shared_ptr<http_req> req;
const std::shared_ptr<http_res> res;
HttpServer* server;
// used to manage lifecycle of non-async responses
bool destroy_after_stream_response;
// used to manage lifecycle of async actions
bool destroy_after_use;
deferred_req_res_t(const std::shared_ptr<http_req> &req, const std::shared_ptr<http_res> &res,
HttpServer *server, bool destroy_after_stream_response = false) :
req(req), res(res), server(server), destroy_after_stream_response(destroy_after_stream_response) {}
HttpServer *server, bool destroy_after_use) :
req(req), res(res), server(server), destroy_after_use(destroy_after_use) {}
};
struct async_req_res_t {
// NOTE: care must be taken to ensure that concurrent writes are protected as some fields are also used by http lib
private:
// not exposed or accessed, here only for reference counting
const std::shared_ptr<http_req> req;
const std::shared_ptr<http_res> res;
public:
// used to manage lifecycle of async actions
const bool destroy_after_use;
stream_response_state_t res_state;
async_req_res_t(const std::shared_ptr<http_req>& h_req, const std::shared_ptr<http_res>& h_res,
const bool destroy_after_use) : req(h_req), res(h_res), destroy_after_use(destroy_after_use) {
if(!res->is_alive) {
return;
}
h2o_custom_generator_t* res_generator = (res->generator == nullptr) ? nullptr :
static_cast<h2o_custom_generator_t*>(res->generator.load());
res_state.is_req_early_exit = (res_generator == nullptr) ? false :
(res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate);
res_state.is_req_http1 = req->is_http_v1;
res_state.req = req->_req;
res_state.is_res_start = (req->_req == nullptr) ? true : (req->_req->res.status == 0);
res_state.is_res_final = res->final;
res_state.res_status_code = res->status_code;
res_state.res_content_type = res->content_type_header;
if(req->_req != nullptr) {
res_state.res_body = h2o_strdup(&req->_req->pool, res->body.c_str(), SIZE_MAX);
}
res_state.generator = (res_generator == nullptr) ? nullptr : &res_generator->h2o_generator;
}
bool is_alive() {
return res->is_alive;
}
void req_notify() {
return req->notify();
}
void res_notify() {
return res->notify();
}
};
struct defer_processing_t {
const std::shared_ptr<http_req> req;
const std::shared_ptr<http_res> res;
@ -180,12 +252,7 @@ public:
void send_message(const std::string & type, void* data);
void send_response(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response);
static void destroy_request_response(const std::shared_ptr<http_req>& request,
const std::shared_ptr<http_res>& response);
static void stream_response(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response);
static void stream_response(stream_response_state_t& state);
uint64_t find_route(const std::vector<std::string> & path_parts, const std::string & http_method,
route_path** found_rpath);

View File

@ -11,6 +11,10 @@ BatchedIndexer::BatchedIndexer(HttpServer* server, Store* store, const size_t nu
}
void BatchedIndexer::enqueue(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
// Called by the raft write thread: goal is to quickly send the request to a queue and move on
// NOTE: it's ok to access `req` and `res` in this function without synchronization
// because the read thread for *this* request is paused now and resumes only messaged at the end
std::string& coll_name = req->params["collection"];
if(coll_name.empty()) {
@ -73,6 +77,7 @@ void BatchedIndexer::enqueue(const std::shared_ptr<http_req>& req, const std::sh
}
if(req->_req != nullptr && req->_req->proceed_req) {
// Tell the http library to read more input data
deferred_req_res_t* req_res = new deferred_req_res_t(req, res, server, true);
server->get_message_dispatcher()->send_message(HttpServer::REQUEST_PROCEED_MESSAGE, req_res);
}
@ -97,26 +102,27 @@ void BatchedIndexer::run() {
qlk.unlock();
std::unique_lock mlk(mutex);
req_res_t orig_req_res = req_res_map[req_id];
req_res_t& orig_req_res = req_res_map[req_id];
mlk.unlock();
// scan db for all logs associated with request
const std::string& req_key_prefix = get_req_prefix_key(req_id);
rocksdb::Iterator* iter = store->scan(req_key_prefix);
std::string prev_body = ""; // used to handle partial JSON documents caused by chunking
std::string prev_body; // used to handle partial JSON documents caused by chunking
const std::shared_ptr<http_res>& orig_res = orig_req_res.res;
const std::shared_ptr<http_req>& orig_req = orig_req_res.req;
bool is_live_req = orig_res->is_alive;
while(iter->Valid() && iter->key().starts_with(req_key_prefix)) {
std::shared_ptr<http_req>& orig_req = orig_req_res.req;
auto _req = orig_req->_req;
orig_req->body = prev_body;
orig_req->load_from_json(iter->value().ToString(), _req == nullptr);
orig_req->_req = _req;
orig_req->load_from_json(iter->value().ToString());
// update thread local for reference during a crash
write_log_index = orig_req->log_index;
//LOG(INFO) << "original request: " << orig_req_res.req << ", _req: " << orig_req_res.req->_req;
//LOG(INFO) << "original request: " << orig_req_res.req << ", req: " << orig_req_res.req->req;
route_path* found_rpath = nullptr;
bool route_found = server->get_route(orig_req->route_hash, &found_rpath);
@ -124,19 +130,20 @@ void BatchedIndexer::run() {
if(route_found) {
async_res = found_rpath->async_res;
found_rpath->handler(orig_req, orig_req_res.res);
found_rpath->handler(orig_req, orig_res);
prev_body = orig_req->body;
} else {
orig_req_res.res->set_404();
prev_body = "";
orig_res->set_404();
}
if(!async_res && orig_req_res.req->_req != nullptr) {
deferred_req_res_t* deferred_req_res = new deferred_req_res_t(orig_req_res.req,
orig_req_res.res,
server, true);
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE,
deferred_req_res);
if(is_live_req && (!route_found ||!async_res)) {
// sync request get a response immediately
async_req_res_t* async_req_res = new async_req_res_t(orig_req, orig_res, true);
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, async_req_res);
}
if(!route_found) {
break;
}
queued_writes--;

View File

@ -2389,7 +2389,7 @@ DIRTY_VALUES Collection::parse_dirty_values_option(std::string& dirty_values) co
return dirty_values_action;
}
std::vector<char> Collection::to_char_array(std::vector<std::string> strings) {
std::vector<char> Collection::to_char_array(const std::vector<std::string>& strings) {
std::vector<char> vec;
for(const auto& s: strings) {
if(s.length() == 1) {

View File

@ -33,11 +33,11 @@ bool handle_authentication(std::map<std::string, std::string>& req_params, const
}
void stream_response(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
if(req->_req == nullptr) {
if(!res->is_alive) {
return ;
}
auto req_res = new deferred_req_res_t(req, res, server, true);
auto req_res = new async_req_res_t(req, res, true);
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}
@ -240,7 +240,7 @@ bool get_search(const std::shared_ptr<http_req>& req, const std::shared_ptr<http
if(hit_it != res_cache.end()) {
//LOG(INFO) << "Result found in cache.";
const auto& cached_value = hit_it.value();
res->load(cached_value.status_code, cached_value.content_type_header, cached_value.body);
res->set_content(cached_value.status_code, cached_value.content_type_header, cached_value.body, true);
return true;
}
}
@ -286,7 +286,7 @@ bool post_multi_search(const std::shared_ptr<http_req>& req, const std::shared_p
if(hit_it != res_cache.end()) {
//LOG(INFO) << "Result found in cache.";
const auto& cached_value = hit_it.value();
res->load(cached_value.status_code, cached_value.content_type_header, cached_value.body);
res->set_content(cached_value.status_code, cached_value.content_type_header, cached_value.body, true);
return true;
}
}
@ -616,7 +616,7 @@ bool post_import_documents(const std::shared_ptr<http_req>& req, const std::shar
}
//LOG(INFO) << "json_lines.size after: " << json_lines.size() << ", stream_proceed: " << stream_proceed;
//LOG(INFO) << "json_lines.size: " << json_lines.size() << ", req->stream_state: " << req->stream_state;
//LOG(INFO) << "json_lines.size: " << json_lines.size() << ", req->res_state: " << req->res_state;
// When only one partial record arrives as a chunk, an empty body is pushed to response stream
bool single_partial_record_body = (json_lines.empty() && !req->body.empty());
@ -636,7 +636,9 @@ bool post_import_documents(const std::shared_ptr<http_req>& req, const std::shar
//response_stream << import_summary_json << "\n";
for (size_t i = 0; i < json_lines.size(); i++) {
if(i == json_lines.size()-1 && req->body_index == req->body.size() && req->last_chunk_aggregate) {
bool res_final = req->last_chunk_aggregate && (i == json_lines.size()-1);
if(res_final) {
// indicates last record of last batch
response_stream << json_lines[i];
} else {
@ -649,7 +651,7 @@ bool post_import_documents(const std::shared_ptr<http_req>& req, const std::shar
res->status_code = 200;
res->body = response_stream.str();
res->final = req->last_chunk_aggregate;
res->final.store(req->last_chunk_aggregate);
stream_response(req, res);
return true;

View File

@ -147,9 +147,9 @@ size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nite
// callback for request body to be sent to remote host
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(userdata);
if(req_res->req->_req == nullptr) {
if(!req_res->res->is_alive) {
// underlying client request is dead, don't proxy anymore data to upstream (leader)
//LOG(INFO) << "req_res->req->_req is: null";
//LOG(INFO) << "req_res->req->req is: null";
return 0;
}
@ -195,7 +195,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi
//LOG(INFO) << "curl_write_async";
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
if(req_res->req->_req == nullptr) {
if(!req_res->res->is_alive) {
// underlying client request is dead, don't try to send anymore data
return 0;
}
@ -239,7 +239,7 @@ size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) {
//LOG(INFO) << "curl_write_async_done";
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
if(req_res->req->_req == nullptr) {
if(!req_res->res->is_alive) {
// underlying client request is dead, don't try to send anymore data
return 0;
}

View File

@ -260,16 +260,14 @@ uint64_t HttpServer::find_route(const std::vector<std::string> & path_parts, con
void HttpServer::on_res_generator_dispose(void *self) {
//LOG(INFO) << "on_res_generator_dispose fires";
h2o_custom_generator_t* custom_generator = *static_cast<h2o_custom_generator_t**>(self);
//LOG(INFO) << "on_res_generator_dispose fires, req use count " << custom_generator->req().use_count();
destroy_request_response(custom_generator->req(), custom_generator->res());
/*LOG(INFO) << "Deleting custom_generator, res: " << custom_generator->res();
<< ", refcount: " << custom_generator->res().use_count();*/
custom_generator->res()->final = true;
custom_generator->res()->generator = nullptr;
delete custom_generator;
custom_generator->res()->is_alive = false;
custom_generator->req()->notify();
custom_generator->res()->notify();
//LOG(INFO) << "Deleted custom_generator";
delete custom_generator;
}
int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
@ -398,16 +396,16 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
std::shared_ptr<http_req> request = std::make_shared<http_req>(req, rpath->http_method, path_without_query,
route_hash, query_map, body);
std::shared_ptr<http_res> response = std::make_shared<http_res>();
// add custom generator with a dispose function for cleaning up resources
h2o_custom_generator_t* custom_gen = new h2o_custom_generator_t;
custom_gen->super = h2o_generator_t {response_proceed, response_abort};
std::shared_ptr<http_res> response = std::make_shared<http_res>(custom_gen);
custom_gen->h2o_generator = h2o_generator_t {response_proceed, response_abort};
custom_gen->request = request;
custom_gen->response = response;
custom_gen->rpath = rpath;
custom_gen->h2o_handler = h2o_handler;
response->generator = &custom_gen->super;
h2o_custom_generator_t** allocated_generator = static_cast<h2o_custom_generator_t**>(
h2o_mem_alloc_shared(&req->pool, sizeof(*allocated_generator), on_res_generator_dispose)
@ -468,8 +466,8 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
/*
LOG(INFO) << "async_req_cb, chunk.len=" << chunk.len
<< ", request->_req->entity.len=" << request->_req->entity.len
<< ", content_len: " << request->_req->content_length
<< ", request->req->entity.len=" << request->req->entity.len
<< ", content_len: " << request->req->content_length
<< ", is_end_stream=" << is_end_stream;
*/
@ -506,13 +504,13 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
request->body += chunk_str;
request->chunk_len += chunk.len;
/*LOG(INFO) << "entity: " << std::string(request->_req->entity.base, std::min<size_t>(40, request->_req->entity.len))
/*LOG(INFO) << "entity: " << std::string(request->req->entity.base, std::min<size_t>(40, request->req->entity.len))
<< ", chunk len: " << std::string(chunk.base, std::min<size_t>(40, chunk.len));*/
//std::this_thread::sleep_for(std::chrono::seconds(30));
//LOG(INFO) << "request->body.size(): " << request->body.size() << ", request->chunk_len=" << request->chunk_len;
// LOG(INFO) << "req->entity.len: " << request->_req->entity.len << ", request->chunk_len=" << request->chunk_len;
// LOG(INFO) << "req->entity.len: " << request->req->entity.len << ", request->chunk_len=" << request->chunk_len;
bool async_req = custom_generator->rpath->async_req;
@ -527,7 +525,7 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
bool exceeds_chunk_limit;
if(!request->is_http_v1() && request->first_chunk_aggregate) {
if(!request->is_http_v1 && request->first_chunk_aggregate) {
exceeds_chunk_limit = ((request->chunk_len + request->_req->entity.len) >= ACTIVE_STREAM_WINDOW_SIZE);
} else {
exceeds_chunk_limit = (request->chunk_len >= ACTIVE_STREAM_WINDOW_SIZE);
@ -557,7 +555,7 @@ int HttpServer::async_req_cb(void *ctx, h2o_iovec_t chunk, int is_end_stream) {
// we are not ready to fire the request handler, so that means we need to buffer the request further
// this could be because we are a) dealing with a HTTP v1 request or b) a synchronous request
if(request->is_http_v1()) {
if(request->is_http_v1) {
// http v1 callbacks fire on small chunk sizes, so fetch more to match window size of http v2 buffer
size_t written = chunk.len;
request->_req->proceed_req(request->_req, written, H2O_SEND_STATE_IN_PROGRESS);
@ -590,15 +588,14 @@ int HttpServer::process_request(const std::shared_ptr<http_req>& request, const
auto message_dispatcher = handler->http_server->get_message_dispatcher();
// LOG(INFO) << "Before enqueue res: " << response
handler->http_server->get_thread_pool()->enqueue([http_server, rpath, message_dispatcher,
request, response]() {
handler->http_server->get_thread_pool()->enqueue([rpath, message_dispatcher, request, response]() {
// call the API handler
//LOG(INFO) << "Wait for response " << response.get() << ", action: " << rpath->_get_action();
(rpath->handler)(request, response);
if(!rpath->async_res) {
// lifecycle of non async res will be owned by stream responder
auto req_res = new deferred_req_res_t(request, response, http_server, true);
auto req_res = new async_req_res_t(request, response, true);
message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}
//LOG(INFO) << "Response done " << response.get();
@ -641,7 +638,7 @@ void HttpServer::defer_processing(const std::shared_ptr<http_req>& req, const st
if(req->defer_timer.data == nullptr) {
//LOG(INFO) << "req->defer_timer.data is null";
auto deferred_req_res = new deferred_req_res_t(req, res, this);
auto deferred_req_res = new deferred_req_res_t(req, res, this, false);
//LOG(INFO) << "req use count " << req.use_count();
req->defer_timer.data = deferred_req_res;
h2o_timer_init(&req->defer_timer.timer, on_deferred_process_request);
@ -655,7 +652,7 @@ void HttpServer::defer_processing(const std::shared_ptr<http_req>& req, const st
if(exit_loop) {
// otherwise, replication thread could be stuck waiting on a future
req->_req = nullptr;
res->is_alive = false;
req->notify();
res->notify();
}
@ -676,32 +673,12 @@ int HttpServer::send_response(h2o_req_t *req, int status_code, const std::string
return 0;
}
void HttpServer::send_response(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) {
//LOG(INFO) << "send_response, request->_req=" << request->_req;
if(request->_req == nullptr) {
// indicates serialized request and response
return ;
}
h2o_req_t* req = request->_req;
h2o_generator_t* generator = static_cast<h2o_generator_t*>(response->generator);
h2o_iovec_t body = h2o_strdup(&req->pool, response->body.c_str(), SIZE_MAX);
req->res.status = response->status_code;
req->res.reason = http_res::get_status_reason(response->status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE,
nullptr, response->content_type_header.c_str(), response->content_type_header.size());
h2o_start_response(req, generator);
h2o_send(req, &body, 1, H2O_SEND_STATE_FINAL);
}
void HttpServer::response_abort(h2o_generator_t *generator, h2o_req_t *req) {
LOG(INFO) << "response_abort called";
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(generator);
custom_generator->req()->_req = nullptr;
custom_generator->res()->final = true;
custom_generator->res()->is_alive = false;
//LOG(INFO) << "response_abort: fulfilling req & res proceed.";
}
@ -729,91 +706,42 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
}
}
void HttpServer::stream_response(const std::shared_ptr<http_req>& request, const std::shared_ptr<http_res>& response) {
//LOG(INFO) << "stream_response called " << response.get();
void HttpServer::stream_response(stream_response_state_t& state) {
// LOG(INFO) << "stream_response called";
// std::this_thread::sleep_for(std::chrono::milliseconds (5000));
if(response->generator == nullptr) {
// generator has been disposed, underlying request is probably dead
//LOG(INFO) << "response->generator == nullptr";
request->_req = nullptr;
response->notify();
return;
}
if(request->_req == nullptr) {
// underlying request has been cancelled
//LOG(INFO) << "stream_response, request._req == nullptr";
response->notify();
return;
}
h2o_req_t* req = request->_req;
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response->generator);
response->status_code = (response->status_code == 0) ? 503 : response->status_code; // just to be sure
if(custom_generator->rpath->async_req && custom_generator->res()->final &&
!custom_generator->req()->last_chunk_aggregate) {
if(state.is_req_early_exit) {
// premature termination of async request: handle this explicitly as otherwise, request is not being closed
LOG(INFO) << "Premature termination of async request.";
req->res.status = response->status_code;
req->res.reason = http_res::get_status_reason(response->status_code);
h2o_iovec_t body = h2o_strdup(&req->pool, response->body.c_str(), SIZE_MAX);
if (req->_generator == nullptr) {
h2o_start_response(req, &custom_generator->super);
if (state.req->_generator == nullptr) {
h2o_start_response(state.req, reinterpret_cast<h2o_generator_t*>(state.generator));
}
if(request->is_http_v1()) {
h2o_send(req, &body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(req);
if(state.is_req_http1) {
h2o_send(state.req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(state.req);
} else {
h2o_send(req, &body, 1, H2O_SEND_STATE_ERROR);
h2o_send(state.req, &state.res_body, 1, H2O_SEND_STATE_ERROR);
}
return ;
}
if (req->res.status == 0) {
//LOG(INFO) << "h2o_start_response, content_type=" << response.content_type_header
// << ",response.status_code=" << response.status_code;
req->res.status = response->status_code;
req->res.reason = http_res::get_status_reason(response->status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
response->content_type_header.c_str(),
response->content_type_header.size());
h2o_start_response(req, &custom_generator->super);
if (state.is_res_start) {
/*LOG(INFO) << "h2o_start_response, content_type=" << state.res_content_type
<< ",response.status_code=" << state.res_status_code;*/
state.req->res.status = state.res_status_code;
state.req->res.reason = http_res::get_status_reason(state.res_status_code);
h2o_add_header(&state.req->pool, &state.req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
state.res_content_type.c_str(), state.res_content_type.size());
h2o_start_response(state.req, reinterpret_cast<h2o_generator_t*>(state.generator));
}
/*LOG(INFO) << "stream_response, body_size: " << response->body.size() << ", response_final="
<< custom_generator->response->final;*/
const h2o_send_state_t send_state = state.is_res_final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(state.req, &state.res_body, 1, send_state);
h2o_iovec_t body = h2o_strdup(&req->pool, response->body.c_str(), SIZE_MAX);
response->body = "";
const h2o_send_state_t state = custom_generator->res()->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(req, &body, 1, state);
// LOG(INFO) << "stream_response after send";
}
void HttpServer::destroy_request_response(const std::shared_ptr<http_req>& request,
const std::shared_ptr<http_res>& response) {
//LOG(INFO) << "destroy_request_response, req use count: " << request.use_count() << " req " << request.get();
/*LOG(INFO) << "destroy_request_response, response->proxied_stream=" << response->proxied_stream
<< ", request->_req=" << request->_req << ", response->await=" << &response->await;*/
//LOG(INFO) << "destroy_request_response, response: " << response << ", response->auto_dispose: " << response->auto_dispose;
//LOG(INFO) << "after destroy_request_response, req use count: " << request.use_count() << " req " << request.get();
request->_req = nullptr;
response->final = true;
request->notify();
response->notify();
//LOG(INFO) << "stream_response after send";
}
void HttpServer::set_auth_handler(bool (*handler)(std::map<std::string, std::string>& params, const std::string& body,
@ -925,11 +853,19 @@ uint64_t HttpServer::node_state() const {
bool HttpServer::on_stream_response_message(void *data) {
//LOG(INFO) << "on_stream_response_message";
auto req_res = static_cast<deferred_req_res_t *>(data);
stream_response(req_res->req, req_res->res);
auto req_res = static_cast<async_req_res_t *>(data);
if(req_res->destroy_after_stream_response) {
//LOG(INFO) << "delete req_res";
// NOTE: access to `req` and `res` objects must be synchronized and wrapped by `req_res`
if(req_res->is_alive()) {
stream_response(req_res->res_state);
} else {
// serialized request or generator has been disposed (underlying request is probably dead)
req_res->req_notify();
req_res->res_notify();
}
if(req_res->destroy_after_use) {
delete req_res;
}
@ -948,7 +884,7 @@ bool HttpServer::on_request_proceed_message(void *data) {
req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
}
if(req_res->destroy_after_stream_response) {
if(req_res->destroy_after_use) {
delete req_res;
}

View File

@ -193,7 +193,7 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
//LOG(INFO) << "write(), force shutdown";
response->set_503("Shutting down.");
response->final = true;
request->_req = nullptr;
response->is_alive = false;
request->notify();
return ;
}
@ -245,13 +245,13 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
if(request->_req->proceed_req && response->proxied_stream) {
// streaming in progress: ensure graceful termination (cannot start response again)
LOG(ERROR) << "Terminating streaming request gracefully.";
request->_req = nullptr;
response->is_alive = false;
request->notify();
return ;
}
response->set_500("Could not find a leader.");
auto req_res = new deferred_req_res_t(request, response, server, true);
auto req_res = new async_req_res_t(request, response, true);
return message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}
@ -265,7 +265,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
const std::string & leader_addr = node->leader_id().to_string();
//LOG(INFO) << "Redirecting write to leader at: " << leader_addr;
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response->generator);
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response->generator.load());
HttpServer* server = custom_generator->h2o_handler->http_server;
auto raw_req = request->_req;
@ -321,7 +321,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
response->set_500(err);
}
auto req_res = new deferred_req_res_t(request, response, server, true);
auto req_res = new async_req_res_t(request, response, true);
message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
pending_writes--;
});
@ -360,11 +360,11 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
//LOG(INFO) << "Post assignment " << request_generated.get() << ", use count: " << request_generated.use_count();
const std::shared_ptr<http_res>& response_generated = iter.done() ?
dynamic_cast<ReplicationClosure*>(iter.done())->get_response() : std::make_shared<http_res>();
dynamic_cast<ReplicationClosure*>(iter.done())->get_response() : std::make_shared<http_res>(nullptr);
if(!iter.done()) {
// indicates log serialized request
request_generated->load_from_json(iter.data().to_string(), true);
request_generated->load_from_json(iter.data().to_string());
}
request_generated->log_index = iter.index();
@ -816,7 +816,7 @@ void OnDemandSnapshotClosure::Run() {
res->status_code = status_code;
res->body = response.dump();
auto req_res = new deferred_req_res_t(req, res, nullptr, true);
auto req_res = new async_req_res_t(req, res, true);
replication_state->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
// wait for response to be sent

View File

@ -179,7 +179,7 @@ TEST_F(CoreAPIUtilsTest, StatefulRemoveDocs) {
TEST_F(CoreAPIUtilsTest, MultiSearchEmbeddedKeys) {
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
req->params["filter_by"] = "user_id: 100";
nlohmann::json body;