Handle disconnects during import.

This commit is contained in:
kishorenc 2020-09-04 07:21:40 +05:30
parent 6d7c6b2ea0
commit 58b52687ea
8 changed files with 145 additions and 45 deletions

View File

@@ -20,13 +20,15 @@ private:
static size_t curl_write(char *contents, size_t size, size_t nmemb, std::string *s);
static size_t curl_header(char *buffer, size_t size, size_t nmemb, void* context);
static size_t curl_write_async(char *buffer, size_t size, size_t nmemb, void* context);
static size_t curl_write_async_done(void* context, curl_socket_t item);
static CURL* init_curl(const std::string& url, std::string& response);
static CURL* init_curl_async(const std::string& url, request_response* req_res);
static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res);
static size_t curl_req_send_callback(char* buffer, size_t size, size_t nitems, void *userdata);
@@ -52,7 +54,8 @@ public:
static long post_response(const std::string & url, const std::string & body, std::string & response,
std::map<std::string, std::string>& res_headers, long timeout_ms=4000);
static long post_response_async(const std::string &url, http_req* request, http_res* response);
static long post_response_async(const std::string &url, http_req* request, http_res* response,
HttpServer* server);
static long put_response(const std::string & url, const std::string & body, std::string & response,
std::map<std::string, std::string>& res_headers, long timeout_ms=4000);

View File

@@ -36,8 +36,8 @@ struct http_res {
h2o_generator_t* generator = nullptr;
// for async requests, automatically progresses request body on response proceed
bool proceed_req_after_write = true;
// indicates whether follower is proxying this response stream from leader
bool proxied_stream = false;
http_res(): status_code(0), content_type_header("application/json; charset=utf-8"), final(true) {
@@ -321,8 +321,11 @@ struct http_message_dispatcher {
h2o_multithread_message_t *message = H2O_STRUCT_FROM_MEMBER(h2o_multithread_message_t, link, messages->next);
h2o_custom_res_message_t *custom_message = reinterpret_cast<h2o_custom_res_message_t*>(message);
if(custom_message->message_handlers->count(custom_message->type) != 0) {
auto handler = custom_message->message_handlers->at(custom_message->type);
const std::map<std::string, bool (*)(void*)>::const_iterator handler_itr =
custom_message->message_handlers->find(custom_message->type);
if(handler_itr != custom_message->message_handlers->end()) {
auto handler = handler_itr->second;
(handler)(custom_message->data);
}

View File

@@ -26,7 +26,7 @@ struct h2o_custom_req_handler_t {
struct h2o_custom_generator_t {
h2o_generator_t super;
h2o_custom_req_handler_t* h2o_handler;
route_path *rpath;
route_path* rpath;
http_req* request;
http_res* response;
};
@@ -57,8 +57,11 @@ private:
bool exit_loop = false;
private:
std::string version;
// must be a vector since order of routes matter
std::vector<std::pair<uint64_t, route_path>> routes;
const std::string listen_address;
@@ -144,6 +147,8 @@ public:
void stop();
bool has_exited() const;
void clear_timeouts(const std::vector<h2o_timer_t*> & timers, bool trigger_callback = true);
static bool on_stop_server(void *data);

View File

@@ -90,6 +90,8 @@ private:
bool create_init_db_snapshot;
std::atomic<bool>& shut_down;
public:
static constexpr const char* log_dir_name = "log";
@@ -97,7 +99,7 @@ public:
static constexpr const char* snapshot_dir_name = "snapshot";
ReplicationState(Store* store, ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher,
bool create_init_db_snapshot);
bool create_init_db_snapshot, std::atomic<bool>& quit_service);
~ReplicationState() {
delete node;
@@ -131,6 +133,8 @@ public:
// Shut this node down.
void shutdown() {
LOG(INFO) << "Replication state shutdown.";
shut_down = true;
if (node) {
node->shutdown(nullptr);
}
@@ -140,6 +144,7 @@ public:
// Blocks until the raft node has fully stopped; call shutdown() first.
// Safe to call when no node exists (e.g. peering never started).
void join() {
if (node) {
node->join();
// NOTE(review): node is only nulled here, not deleted — the destructor's
// `delete node` then sees nullptr. Confirm the braft Node is reclaimed
// elsewhere, otherwise this leaks the node object.
node = nullptr;
}
}

View File

@@ -18,9 +18,9 @@ long HttpClient::post_response(const std::string &url, const std::string &body,
return perform_curl(curl, res_headers);
}
long HttpClient::post_response_async(const std::string &url, http_req* request, http_res* response) {
request_response* req_res = new request_response{request, response};
std::unique_ptr<request_response> req_res_guard(req_res);
long HttpClient::post_response_async(const std::string &url, http_req* request, http_res* response, HttpServer* server) {
deferred_req_res_t* req_res = new deferred_req_res_t{request, response, server};
std::unique_ptr<deferred_req_res_t> req_res_guard(req_res);
CURL *curl = init_curl_async(url, req_res);
if(curl == nullptr) {
@@ -120,7 +120,13 @@ void HttpClient::extract_response_headers(CURL* curl, std::map<std::string, std:
size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nitems, void* userdata) {
// callback for request body to be sent to remote host
request_response* req_res = static_cast<request_response *>(userdata);
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(userdata);
if(req_res->req->_req == nullptr) {
// underlying client request is dead, don't proxy anymore data to upstream (leader)
return 0;
}
size_t max_req_bytes = (size * nitems);
const char* total_body_buf = req_res->req->body.c_str();
@@ -143,8 +149,7 @@ size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nite
req_res->req->body_index = 0;
req_res->req->body = "";
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(req_res->res->generator);
HttpServer *server = custom_generator->h2o_handler->http_server;
HttpServer *server = req_res->server;
if(req_res->req->last_chunk_aggregate) {
LOG(INFO) << "Request forwarding done.";
@@ -169,17 +174,54 @@ size_t HttpClient::curl_req_send_callback(char* buffer, size_t size, size_t nite
return bytes_to_read;
}
// curl header callback (CURLOPT_HEADERFUNCTION shape): parses one upstream
// (leader) response header line and mirrors the status code / content-type
// onto the deferred client response, so the follower relays them verbatim.
// Must return the number of bytes consumed (header_size) or curl aborts
// the transfer.
// NOTE(review): init_curl_async only registers this callback in
// commented-out form in this commit — confirm it is actually wired up.
size_t HttpClient::curl_header(char *buffer, size_t size, size_t nmemb, void *context) {
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
// curl hands the header as a raw (size * nmemb) byte run, not NUL-terminated
size_t header_size = size * nmemb;
std::string header(buffer, header_size);
if(header.rfind("HTTP", 0) == 0) {
// status field, e.g. "HTTP/1.1 404 Not Found"
std::vector<std::string> parts;
StringUtils::split(header, parts, " ");
if(parts.size() >= 2 && StringUtils::is_uint32_t(parts[1])) {
req_res->res->status_code = std::stoi(parts[1]);
} else {
// malformed status line: surface a generic server error
req_res->res->status_code = 500;
}
} else if(header.rfind("content-type", 0) == 0) {
// e.g. "content-type: application/json; charset=utf-8"
// NOTE(review): prefix match is case-sensitive — assumes upstream always
// sends lowercase "content-type"; a "Content-Type" header would be
// silently ignored. Verify against the leader's responses.
std::vector<std::string> parts;
StringUtils::split(header, parts, ":");
if(parts.size() == 2) {
// parts[1] may carry a leading space from the ": " separator
req_res->res->content_type_header = parts[1];
} else {
// value itself contained ':' (or none) — fall back to the default type
req_res->res->content_type_header = "application/json; charset=utf-8";
}
}
LOG(INFO) << "header:|" << header << "|";
return header_size;
}
size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, void *context) {
// callback for response body to be sent back to client
LOG(INFO) << "curl_write_async";
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
if(req_res->req->_req == nullptr) {
// underlying client request is dead, don't try to send anymore data
return 0;
}
request_response* req_res = static_cast<request_response *>(context);
size_t res_size = size * nmemb;
// FIXME: use header from remote response
// we've got response from remote host: write to client and ask for more request body
req_res->res->content_type_header = "text/plain; charset=utf8";
req_res->res->status_code = 200;
req_res->res->body = std::string(buffer, res_size);
req_res->res->final = false;
@@ -189,9 +231,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi
LOG(INFO) << "curl_write_async response, res body size: " << req_res->res->body.size();
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(req_res->res->generator);
HttpServer *server = custom_generator->h2o_handler->http_server;
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
// wait until response is sent
LOG(INFO) << "Waiting for response to be sent";
@@ -206,18 +246,16 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi
size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) {
LOG(INFO) << "curl_write_async_done";
request_response* req_res = static_cast<request_response *>(context);
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
req_res->res->body = "";
req_res->res->final = true;
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(req_res->res->generator);
HttpServer *server = custom_generator->h2o_handler->http_server;
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
req_res->server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
return 0;
}
CURL *HttpClient::init_curl_async(const std::string& url, request_response* req_res) {
CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* req_res) {
CURL *curl = curl_easy_init();
if(curl == nullptr) {
@@ -250,6 +288,9 @@ CURL *HttpClient::init_curl_async(const std::string& url, request_response* req_
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
//curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, curl_header);
//curl_easy_setopt(curl, CURLOPT_HEADERDATA, req_res);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HttpClient::curl_write_async);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, req_res);
@@ -292,4 +333,3 @@ size_t HttpClient::curl_write(char *contents, size_t size, size_t nmemb, std::st
s->append(contents, size*nmemb);
return size*nmemb;
}

View File

@@ -525,6 +525,8 @@ void HttpServer::defer_processing(http_req& req, http_res& res, size_t timeout_m
h2o_timer_unlink(&req.defer_timer.timer);
h2o_timer_link(ctx.loop, timeout_ms, &req.defer_timer.timer);
//LOG(INFO) << "defer_processing, exit_loop: " << exit_loop << ", res.promise: " << res.promise;
if(exit_loop && res.promise) {
// otherwise, replication thread could be stuck waiting on a future
res.promise->set_value(true);
@@ -569,12 +571,21 @@ void HttpServer::response_abort(h2o_generator_t *generator, h2o_req_t *req) {
LOG(INFO) << "response_abort called";
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t*>(generator);
custom_generator->request->_req = nullptr;
custom_generator->response->final = true;
if(custom_generator->response->promise) {
// returns control back to caller (raft replication or follower forward)
LOG(INFO) << "response_abort: fulfilling promise.";
LOG(INFO) << "response: fulfilling promise.";
custom_generator->response->promise->set_value(true);
custom_generator->response->promise = nullptr;
}
if(custom_generator->request->promise) {
LOG(INFO) << "request: fulfilling promise.";
custom_generator->request->promise->set_value(true);
custom_generator->request->promise = nullptr;
}
}
void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
@@ -588,9 +599,9 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
custom_generator->response->promise = nullptr;
}
LOG(INFO) << "proceed_req_after_write: " << custom_generator->response->proceed_req_after_write;
if(!custom_generator->response->proceed_req_after_write) {
// request progression is not tied to response generation
LOG(INFO) << "proxied_stream: " << custom_generator->response->proxied_stream;
if(custom_generator->response->proxied_stream) {
// request progression should not be tied to response generation
LOG(INFO) << "Ignoring request proceed";
return ;
}
@@ -617,7 +628,7 @@ void HttpServer::response_proceed(h2o_generator_t *generator, h2o_req_t *req) {
void HttpServer::stream_response(http_req& request, http_res& response) {
LOG(INFO) << "stream_response called";
if(request._req == nullptr) {
// raft log replay
// raft log replay or when underlying request is aborted
LOG(INFO) << "request._req == nullptr";
if(response.promise) {
@@ -667,8 +678,18 @@ void HttpServer::destroy_request_response(http_req* request, http_res* response)
delete deferred_req_res;
}
delete request;
delete response;
response->final = true;
request->_req = nullptr;
if(response->proxied_stream) {
// lifecycle of proxied resources are managed by curl client proxying the transfer
// we will just nullify _req to indicate that original request is dead
LOG(INFO) << "Ignoring request/response cleanup since response is proxied.";
} else {
LOG(INFO) << "destroy_request_response: deleting req/res";
delete request;
delete response;
}
}
void HttpServer::set_auth_handler(bool (*handler)(std::map<std::string, std::string>& params, const route_path& rpath,
@@ -717,6 +738,7 @@ HttpServer::~HttpServer() {
}
h2o_timerwheel_run(ctx.loop->_timeouts, 9999999999999);
h2o_timerwheel_destroy(ctx.loop->_timeouts);
h2o_context_dispose(&ctx);
@@ -772,9 +794,13 @@ bool HttpServer::on_request_proceed_message(void *data) {
size_t written = req_res->req->chunk_len;
req_res->req->chunk_len = 0;
if(req_res->req->_req->proceed_req) {
if(req_res->req->_req && req_res->req->_req->proceed_req) {
req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
}
return true;
}
// Reports whether the server's event loop has been flagged to terminate
// (exit_loop is presumably set during stop()/shutdown — setter is outside
// this view; confirm).
bool HttpServer::has_exited() const {
return exit_loop;
}

View File

@@ -92,6 +92,10 @@ std::string ReplicationState::to_nodes_config(const butil::EndPoint& peering_end
}
void ReplicationState::write(http_req* request, http_res* response) {
if(!node) {
return ;
}
if (!node->is_leader()) {
return follower_write(request, response);
}
@@ -125,11 +129,6 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con
LOG(INFO) << "follower_write_count: " << follower_write_count;
if(follower_write_count == 1) {
//LOG(INFO) << "follower_write, will sleep for 10 seconds...";
//std::this_thread::sleep_for(std::chrono::seconds(10));
}
if(node->leader_id().is_empty()) {
// Handle no leader scenario
LOG(ERROR) << "Rejecting write: could not find a leader.";
@@ -159,7 +158,10 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con
const std::string & leader_addr = node->leader_id().to_string();
LOG(INFO) << "Redirecting write to leader at: " << leader_addr;
thread_pool->enqueue([leader_addr, request, response, this]() {
h2o_custom_generator_t* custom_generator = reinterpret_cast<h2o_custom_generator_t *>(response->generator);
HttpServer* server = custom_generator->h2o_handler->http_server;
thread_pool->enqueue([leader_addr, request, response, server, this]() {
auto raw_req = request->_req;
std::string scheme = std::string(raw_req->scheme->name.base, raw_req->scheme->name.len);
std::vector<std::string> addr_parts;
@@ -176,9 +178,15 @@ void ReplicationState::follower_write(http_req *request, http_res *response) con
if(path_parts.back().rfind("import", 0) == 0) {
// imports are handled asynchronously
response->proceed_req_after_write = false;
response->proxied_stream = true;
long status = HttpClient::post_response_async(url, request, response, server);
// must manage life cycle for forwarded requests
delete request;
delete response;
LOG(INFO) << "Import call done.";
long status = HttpClient::post_response_async(url, request, response);
if(status == 500) {
response->content_type_header = res_headers["content-type"];
response->set_500("");
@@ -259,10 +267,17 @@ void ReplicationState::on_apply(braft::Iterator& iter) {
std::promise<bool> promise;
std::future<bool> future = promise.get_future();
auto replication_arg = new AsyncIndexArg{request, response, &promise};
message_dispatcher->send_message(REPLICATION_MSG, replication_arg);
LOG(INFO) << "Raft write waiting for future";
future.get();
LOG(INFO) << "Raft write got the future";
if(shut_down) {
iter.set_error_and_rollback();
return;
}
}
}
@@ -410,10 +425,10 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
}
ReplicationState::ReplicationState(Store *store, ThreadPool* thread_pool, http_message_dispatcher *message_dispatcher,
bool create_init_db_snapshot):
bool create_init_db_snapshot, std::atomic<bool>& quit_service):
node(nullptr), leader_term(-1), store(store), thread_pool(thread_pool),
message_dispatcher(message_dispatcher), init_readiness_count(0),
create_init_db_snapshot(create_init_db_snapshot) {
create_init_db_snapshot(create_init_db_snapshot), shut_down(quit_service) {
}

View File

@@ -37,7 +37,6 @@ void catch_interrupt(int sig) {
LOG(INFO) << "Stopping Typesense server...";
signal(sig, SIG_IGN); // ignore for now as we want to shut down elegantly
quit_raft_service = true;
server->stop();
}
void catch_crash(int sig) {
@@ -261,6 +260,9 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
raft_server.Join();
LOG(INFO) << "Typesense peering service has quit.";
server->stop();
return 0;
}
@@ -354,7 +356,8 @@ int run_server(const Config & config, const std::string & version, void (*master
// first we start the peering service
ThreadPool thread_pool(32);
ReplicationState replication_state(&store, &thread_pool, server->get_message_dispatcher(), create_init_db_snapshot);
ReplicationState replication_state(&store, &thread_pool, server->get_message_dispatcher(),
create_init_db_snapshot, quit_raft_service);
std::thread raft_thread([&replication_state, &config, &state_dir]() {
std::string path_to_nodes = config.get_nodes();
@@ -369,7 +372,7 @@ int run_server(const Config & config, const std::string & version, void (*master
// we are out of the event loop here
LOG(INFO) << "Typesense API service has quit. Stopping peering service...";
LOG(INFO) << "Typesense API service has quit.";
quit_raft_service = true;
raft_thread.join();