diff --git a/include/http_server.h b/include/http_server.h index b83ae775..6708bef1 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -290,4 +290,6 @@ public: void persist_applying_index(); int64_t get_num_queued_writes(); + + void decr_pending_writes(); }; diff --git a/include/raft_server.h b/include/raft_server.h index 0a9c8368..98f77d4d 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -239,6 +239,8 @@ public: std::string get_leader_url() const; + void decr_pending_writes(); + private: friend class ReplicationClosure; diff --git a/src/http_client.cpp b/src/http_client.cpp index 79a62a4a..cef788c6 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -295,6 +295,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) { //LOG(INFO) << "curl_write_async_done"; deferred_req_res_t* req_res = static_cast(context); + req_res->server->decr_pending_writes(); if(!req_res->res->is_alive) { // underlying client request is dead, don't try to send anymore data diff --git a/src/http_server.cpp b/src/http_server.cpp index 50391ed8..5a962610 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -1104,3 +1104,7 @@ bool HttpServer::is_leader() const { ThreadPool* HttpServer::get_meta_thread_pool() const { return meta_thread_pool; } + +void HttpServer::decr_pending_writes() { + return replication_state->decr_pending_writes(); +} diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 70ebde09..51baad90 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -303,18 +303,17 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, response->content_type_header = res_headers["content-type"]; response->set_500(""); } else { - pending_writes--; return ; } } else { std::string api_res; - long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 10*1000, true); + long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 0, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } } else if(request->http_method == "PUT") { std::string api_res; - long status = HttpClient::put_response(url, request->body, api_res, res_headers, 10*1000, true); + long status = HttpClient::put_response(url, request->body, api_res, res_headers, 0, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else if(request->http_method == "DELETE") { @@ -327,13 +326,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, std::string api_res; route_path* rpath = nullptr; bool route_found = server->get_route(request->route_hash, &rpath); - - long timeout_ms = 10 * 1000; - if(route_found && rpath->handler == patch_update_collection) { - timeout_ms = 0; // patching a collection can take a long time - } - - long status = HttpClient::patch_response(url, request->body, api_res, res_headers, timeout_ms, true); + long status = HttpClient::patch_response(url, request->body, api_res, res_headers, 0, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else { @@ -562,14 +555,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes, const size_t raf node->get_status(&nodeStatus); LOG(INFO) << "Term: " << nodeStatus.term - << ", last_index index: " << nodeStatus.last_index - << ", committed_index: " << nodeStatus.committed_index - << ", known_applied_index: " << nodeStatus.known_applied_index - << ", applying_index: " << nodeStatus.applying_index - << ", queued_writes: " << batched_indexer->get_queued_writes() - << ", pending_writes: " << pending_writes - << ", pending_queue_size: " << nodeStatus.pending_queue_size - << ", local_sequence: " << store->get_latest_seq_number(); + << ", pending_queue: " << nodeStatus.pending_queue_size + << ", last_index: " << nodeStatus.last_index + << ", committed: " << nodeStatus.committed_index + << ", known_applied: " << nodeStatus.known_applied_index + << ", applying: " << nodeStatus.applying_index + << ", pending_writes: " << pending_writes + << ", queued_writes: " << batched_indexer->get_queued_writes() + << ", local_sequence: " << store->get_latest_seq_number(); if(node->is_leader()) { RefreshNodesClosure* refresh_nodes_done = new RefreshNodesClosure; @@ -987,6 +980,10 @@ std::string ReplicationState::get_leader_url() const { return get_node_url_path(leader_addr, "/", protocol); } +void ReplicationState::decr_pending_writes() { + pending_writes--; +} + void TimedSnapshotClosure::Run() { // Auto delete this after Done() std::unique_ptr self_guard(this);