Fix import follower hang.

This commit is contained in:
kishorenc 2020-09-09 07:34:06 +05:30
parent 1b01de0c0c
commit 3189c1c4cc
5 changed files with 103 additions and 96 deletions

View File

@ -8,6 +8,7 @@
#include <chrono>
#include "json.hpp"
#include "string_utils.h"
#include "logger.h"
#define H2O_USE_LIBUV 0
extern "C" {
@ -25,6 +26,37 @@ struct h2o_custom_timer_t {
}
};
// Sentinel dispatch results for route resolution.
// NOTE(review): these look like internal handler-dispatch outcomes rather
// than HTTP status codes — confirm against the route lookup call sites.
enum class ROUTE_CODES {
NOT_FOUND = 1,
ALREADY_HANDLED = 2,
};
// A reusable one-shot rendezvous between two threads: a waiter blocks in
// wait() until some other thread calls notify(). The `ready` flag guards
// against spurious wakeups and handles notify-before-wait; it is re-armed
// (reset to false) on each successful wait() so the object can be reused
// for the next round trip.
class await_t {
private:
    std::mutex mcv;
    std::condition_variable cv;
    bool ready;

public:
    await_t(): ready(false) {}

    // Signals the waiter; if nobody is waiting yet, the next wait()
    // returns immediately. The flag is flipped under the lock, but the
    // notification itself happens after the lock is released.
    void notify() {
        {
            std::lock_guard<std::mutex> lk(mcv);
            ready = true;
        }
        cv.notify_all();
    }

    // Blocks until notify() has been called, then re-arms for reuse.
    void wait() {
        std::unique_lock<std::mutex> lk(mcv);
        while(!ready) {
            cv.wait(lk);
        }
        ready = false;
    }
};
struct http_res {
uint32_t status_code;
std::string content_type_header;
@ -32,13 +64,17 @@ struct http_res {
bool final;
// fulfilled by an async response handler to pass control back for further writes
std::promise<bool>* promise = nullptr;
// use `notify` and `wait` on this instead of accessing the flag directly
await_t await;
h2o_generator_t* generator = nullptr;
// indicates whether follower is proxying this response stream from leader
bool proxied_stream = false;
// indicates whether this object is eligible for disposal at the end of req/res cycle
bool auto_dispose = true;
http_res(): status_code(0), content_type_header("application/json; charset=utf-8"), final(true) {
}
@ -125,11 +161,6 @@ struct http_res {
}
};
enum class ROUTE_CODES {
NOT_FOUND = 1,
ALREADY_HANDLED = 2,
};
struct http_req {
h2o_req_t* _req;
std::string http_method;
@ -147,14 +178,15 @@ struct http_req {
void* data;
// used during forwarding of requests from follower to leader
std::promise<bool>* promise = nullptr;
// use `notify` and `wait` on this instead of accessing the flag directly
await_t await;
// for deferred processing of async handlers
h2o_custom_timer_t defer_timer;
http_req(): _req(nullptr), route_hash(1),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body_index(0), data(nullptr), promise(nullptr) {
chunk_len(0), body_index(0), data(nullptr) {
}
@ -162,7 +194,7 @@ struct http_req {
const std::map<std::string, std::string> & params, const std::string& body):
_req(_req), http_method(http_method), route_hash(route_hash), params(params),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body(body), body_index(0), data(nullptr), promise(nullptr) {
chunk_len(0), body(body), body_index(0), data(nullptr) {
}
@ -347,5 +379,5 @@ struct http_message_dispatcher {
// Message payload handed from the raft replication thread to the HTTP
// server thread for asynchronous index (write) operations.
struct AsyncIndexArg {
// Request being replayed/applied; ownership is managed by the receiver.
http_req* req;
// Response to populate; the replication thread may block on res->await.
http_res* res;
// Optional rendezvous for handing control back to the sender. May be
// nullptr — callers that pass nullptr wait on res->await instead.
await_t* await;
};

View File

@ -1074,7 +1074,7 @@ bool del_key(http_req &req, http_res &res) {
}
bool raft_write_send_response(void *data) {
//LOG(INFO) << "raft_write_send_response called";
LOG(INFO) << "raft_write_send_response called";
AsyncIndexArg* index_arg = static_cast<AsyncIndexArg*>(data);
std::unique_ptr<AsyncIndexArg> index_arg_guard(index_arg);
@ -1087,27 +1087,20 @@ bool raft_write_send_response(void *data) {
route_path* found_rpath = nullptr;
bool route_found = server->get_route(index_arg->req->route_hash, &found_rpath);
if(route_found) {
// for an async response handler, we need to assign the promise
async_res = found_rpath->async_res;
if(async_res) {
index_arg->res->promise = index_arg->promise;
}
// now we can call the request handler
found_rpath->handler(*index_arg->req, *index_arg->res);
} else {
index_arg->res->set_404();
}
}
if(!async_res) {
// only handle synchronous responses as async ones are handled by their handlers
server->send_response(index_arg->req, index_arg->res);
LOG(INFO) << "raft_write_send_response, async_res=" << async_res;
if(index_arg->promise != nullptr) {
index_arg->promise->set_value(true); // returns control back to raft replication thread
index_arg->promise = nullptr;
}
// only handle synchronous responses as async ones are handled by their handlers
if(!async_res) {
// send response and return control back to raft replication thread
LOG(INFO) << "raft_write_send_response: sending response";
server->send_response(index_arg->req, index_arg->res);
}
return true;

View File

@ -119,11 +119,13 @@ void HttpClient::extract_response_headers(CURL* curl, std::map<std::string, std:
}
size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nitems, void* userdata) {
//LOG(INFO) << "curl_req_send_callback";
// callback for request body to be sent to remote host
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(userdata);
if(req_res->req->_req == nullptr) {
// underlying client request is dead, don't proxy anymore data to upstream (leader)
//LOG(INFO) << "req_res->req->_req is: null";
return 0;
}
@ -156,17 +158,11 @@ size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nite
server->get_message_dispatcher()->send_message(HttpServer::REQUEST_PROCEED_MESSAGE, req_res);
} else {
LOG(INFO) << "Pausing forwarding and requesting more input.";
std::promise<bool> promise;
std::future<bool> future = promise.get_future();
req_res->req->promise = &promise;
server->get_message_dispatcher()->send_message(HttpServer::REQUEST_PROCEED_MESSAGE, req_res);
LOG(INFO) << "Waiting for request body to be ready";
future.get();
req_res->req->await.wait();
LOG(INFO) << "Request body is ready";
req_res->req->promise = nullptr;
LOG(INFO) << "Buffer refilled, unpausing request forwarding, body_size=" << req_res->req->body.size();
}
}
@ -204,21 +200,15 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi
req_res->res->body = std::string(buffer, res_size);
req_res->res->final = false;
std::promise<bool> promise;
std::future<bool> future = promise.get_future();
req_res->res->promise = &promise;
LOG(INFO) << "curl_write_async response, res body size: " << req_res->res->body.size();
req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
// wait until response is sent
LOG(INFO) << "Waiting for response to be sent";
future.get();
req_res->res->await.wait();
LOG(INFO) << "Response sent";
req_res->res->promise = nullptr;
return res_size;
}
@ -231,6 +221,9 @@ size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) {
req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
// wait until final response is flushed or response object will be destroyed by caller
req_res->res->await.wait();
return 0;
}
@ -246,7 +239,13 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re
struct curl_slist *chunk = nullptr;
std::string api_key_header = std::string("x-typesense-api-key: ") + HttpClient::api_key;
chunk = curl_slist_append(chunk, api_key_header.c_str());
// set content length
std::string content_length_header = std::string("content-length: ") + std::to_string(req_res->req->_req->content_length);
chunk = curl_slist_append(chunk, content_length_header.c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, chunk);
//curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE);
// callback called every time request body is needed
curl_easy_setopt(curl, CURLOPT_READFUNCTION, HttpClient::curl_req_send_callback);

View File

@ -400,6 +400,7 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
if(req->proceed_req == nullptr) {
// Full request body is already available, so we don't care if handler is async or not
//LOG(INFO) << "Full request body is already available: " << req->entity.len;
request->last_chunk_aggregate = true;
return process_request(request, response, rpath, h2o_handler);
} else {
@ -529,12 +530,11 @@ void HttpServer::defer_processing(http_req& req, http_res& res, size_t timeout_m
h2o_timer_unlink(&req.defer_timer.timer);
h2o_timer_link(ctx.loop, timeout_ms, &req.defer_timer.timer);
//LOG(INFO) << "defer_processing, exit_loop: " << exit_loop << ", res.promise: " << res.promise;
//LOG(INFO) << "defer_processing, exit_loop: " << exit_loop << ", res.await: " << res.await;
if(exit_loop && res.promise) {
if(exit_loop) {
// otherwise, the replication thread could be stuck waiting on the await
res.promise->set_value(true);
res.promise = nullptr;
res.await.notify();
}
}
@ -554,6 +554,8 @@ int HttpServer::send_response(h2o_req_t *req, int status_code, const std::string
}
void HttpServer::send_response(http_req* request, http_res* response) {
LOG(INFO) << "send_response, request->_req=" << request->_req;
if(request->_req == nullptr) {
// indicates serialized request and response -- lifecycle must be managed here
return destroy_request_response(request, response);
@ -578,32 +580,21 @@ void HttpServer::response_abort(h2o_generator_t *generator, h2o_req_t *req) {
custom_generator->request->_req = nullptr;
custom_generator->response->final = true;
if(custom_generator->response->promise) {
// returns control back to caller (raft replication or follower forward)
LOG(INFO) << "response: fulfilling promise.";
custom_generator->response->promise->set_value(true);
custom_generator->response->promise = nullptr;
}
if(custom_generator->request->promise) {
LOG(INFO) << "request: fulfilling promise.";
custom_generator->request->promise->set_value(true);
custom_generator->request->promise = nullptr;
}
// returns control back to caller (raft replication or follower forward)
LOG(INFO) << "response_abort: fulfilling req & res proceed.";
custom_generator->response->await.notify();
custom_generator->request->await.notify();
}
void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
LOG(INFO) << "response_proceed called";
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(generator);
if(custom_generator->response->promise) {
// returns control back to caller (raft replication or follower forward)
LOG(INFO) << "response_proceed: fulfilling promise.";
custom_generator->response->promise->set_value(true);
custom_generator->response->promise = nullptr;
}
custom_generator->response->await.notify();
LOG(INFO) << "proxied_stream: " << custom_generator->response->proxied_stream;
LOG(INFO) << "response.final: " << custom_generator->response->final;
if(custom_generator->response->proxied_stream) {
// request progression should not be tied to response generation
LOG(INFO) << "Ignoring request proceed";
@ -634,13 +625,6 @@ void HttpServer::stream_response(http_req& request, http_res& response) {
if(request._req == nullptr) {
// raft log replay or when underlying request is aborted
LOG(INFO) << "request._req == nullptr";
if(response.promise) {
// returns control back to raft replication thread
response.promise->set_value(true);
response.promise = nullptr;
}
destroy_request_response(&request, &response);
return;
}
@ -649,7 +633,8 @@ void HttpServer::stream_response(http_req& request, http_res& response) {
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response.generator);
if (req->res.status == 0) {
LOG(INFO) << "h2o_start_response, content_type=" << response.content_type_header;
LOG(INFO) << "h2o_start_response, content_type=" << response.content_type_header
<< ",response.status_code=" << response.status_code;
response.status_code = (response.status_code == 0) ? 503 : response.status_code; // just to be sure
req->res.status = response.status_code;
req->res.reason = http_res::get_status_reason(response.status_code);
@ -667,32 +652,29 @@ void HttpServer::stream_response(http_req& request, http_res& response) {
const h2o_send_state_t state = custom_generator->response->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
h2o_send(req, &body, 1, state);
// for intermediate responses, promise fulfillment will be handled by `response_proceed`
if(custom_generator->response->final && response.promise) {
// returns control back to raft replication thread
response.promise->set_value(true);
response.promise = nullptr;
}
}
void HttpServer::destroy_request_response(http_req* request, http_res* response) {
if(request->defer_timer.data != nullptr) {
deferred_req_res_t* deferred_req_res = static_cast<deferred_req_res_t*>(request->defer_timer.data);
h2o_timer_unlink(&request->defer_timer.timer);
delete deferred_req_res;
}
response->final = true;
request->_req = nullptr;
LOG(INFO) << "destroy_request_response, response->proxied_stream=" << response->proxied_stream
<< ", request->_req=" << request->_req << ", response->await=" << &response->await;
if(response->proxied_stream) {
// lifecycle of proxied resources are managed by curl client proxying the transfer
// we will just nullify _req to indicate that original request is dead
LOG(INFO) << "Ignoring request/response cleanup since response is proxied.";
} else {
if(response->auto_dispose) {
LOG(INFO) << "destroy_request_response: deleting req/res";
delete request;
delete response;
} else {
// lifecycle of proxied/replicated resources are managed externally
// we will just nullify _req to indicate that original request is dead
LOG(INFO) << "Ignoring request/response cleanup since auto_dispose is false.";
response->final = true;
request->_req = nullptr;
response->await.notify();
}
}

View File

@ -127,11 +127,10 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con
// Handle no leader scenario
LOG(ERROR) << "Rejecting write: could not find a leader.";
if(request->_req->proceed_req && request->promise) {
if(request->_req->proceed_req) {
// streaming in progress: ensure graceful termination (cannot start response again)
LOG(ERROR) << "Terminating streaming request gracefully.";
request->promise->set_value(true);
request->promise = nullptr;
request->await.notify();
return ;
}
@ -141,11 +140,10 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con
return message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
}
if (request->_req->proceed_req && request->promise) {
if (request->_req->proceed_req && response->proxied_stream) {
// indicates async request body of in-flight request
LOG(INFO) << "Inflight proxied request, returning control to caller, body_size=" << request->body.size();
request->promise->set_value(true);
request->promise = nullptr;
request->await.notify();
return ;
}
@ -173,6 +171,7 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con
if(path_parts.back().rfind("import", 0) == 0) {
// imports are handled asynchronously
response->proxied_stream = true;
response->auto_dispose = false;
long status = HttpClient::post_response_async(url, request, response, server);
// must manage life cycle for forwarded requests
@ -257,16 +256,18 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
// Now that the log has been parsed, perform the actual operation
// Call http server thread for write and response back to client (if `response` is NOT null)
// We use a future to block current thread until the async flow finishes
std::promise<bool> promise;
std::future<bool> future = promise.get_future();
auto replication_arg = new AsyncIndexArg{request, response, &promise};
response->auto_dispose = false;
auto replication_arg = new AsyncIndexArg{request, response, nullptr};
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
LOG(INFO) << "Raft write waiting for future";
future.get();
LOG(INFO) << "Raft write got the future";
LOG(INFO) << "Raft write waiting to proceed";
response->await.wait();
LOG(INFO) << "Raft write ready to proceed, response->final=" << response->final;
if(response->final) {
delete request;
delete response;
}
if(shut_down) {
iter.set_error_and_rollback();