diff --git a/include/batched_indexer.h b/include/batched_indexer.h
index feddcd55..32b90196 100644
--- a/include/batched_indexer.h
+++ b/include/batched_indexer.h
@@ -49,6 +49,8 @@ private:
 
     static std::string get_req_prefix_key(uint64_t req_id);
 
+    std::atomic<int64_t> queued_writes = 0;
+
 public:
 
     BatchedIndexer(HttpServer* server, Store* store, size_t num_threads);
@@ -57,6 +59,8 @@ public:
 
     void enqueue(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
 
+    int64_t get_queued_writes();
+
     void run();
 
     void stop();
diff --git a/src/batched_indexer.cpp b/src/batched_indexer.cpp
index a6d6cc74..20bf3b9b 100644
--- a/src/batched_indexer.cpp
+++ b/src/batched_indexer.cpp
@@ -26,6 +26,7 @@ void BatchedIndexer::enqueue(const std::shared_ptr<http_req>& req, const std::sh
     //LOG(INFO) << "req_id: " << req->start_ts << ", chunk_sequence: " << chunk_sequence;
 
     store->insert(request_chunk_key, req->serialize());
+    queued_writes++;
     req->body = "";
 
     {
@@ -116,6 +117,7 @@ void BatchedIndexer::run() {
                                               deferred_req_res);
                 }
 
+                queued_writes--;
                 iter->Next();
             }
 
@@ -190,3 +192,7 @@ BatchedIndexer::~BatchedIndexer() {
 void BatchedIndexer::stop() {
     exit = true;
 }
+
+int64_t BatchedIndexer::get_queued_writes() {
+    return queued_writes;
+}
diff --git a/src/raft_server.cpp b/src/raft_server.cpp
index 110ce5a1..ff3cf605 100644
--- a/src/raft_server.cpp
+++ b/src/raft_server.cpp
@@ -543,8 +543,7 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
                   << ", committed_index: " << nodeStatus.committed_index
                   << ", known_applied_index: " << nodeStatus.known_applied_index
                   << ", applying_index: " << nodeStatus.applying_index
-                  << ", pending_index: " << nodeStatus.pending_index
-                  << ", disk_index: " << nodeStatus.disk_index
+                  << ", queued_writes: " << batched_indexer->get_queued_writes()
                   << ", pending_queue_size: " << nodeStatus.pending_queue_size
                   << ", local_sequence: " << store->get_latest_seq_number();
 
@@ -602,6 +601,9 @@ void ReplicationState::refresh_catchup_status(bool log_msg) {
 
     int64_t current_index = (n_status.applying_index == 0) ? n_status.known_applied_index : n_status.applying_index;
    int64_t apply_lag = n_status.last_index - current_index;
+    // in addition to raft level lag, we should also account for internal batched write queue
+    int64_t num_queued_writes = batched_indexer->get_queued_writes();
+
     //LOG(INFO) << "last_index: " << n_status.applying_index << ", known_applied_index: " << n_status.known_applied_index;
     //LOG(INFO) << "apply_lag: " << apply_lag;
 
@@ -609,14 +611,24 @@
         LOG_IF(ERROR, log_msg) << apply_lag << " lagging entries > healthy read lag of " << healthy_read_lag;
         this->read_caught_up = false;
     } else {
-        this->read_caught_up = true;
+        if(num_queued_writes > healthy_read_lag) {
+            LOG_IF(ERROR, log_msg) << num_queued_writes << " queued writes > healthy read lag of " << healthy_read_lag;
+            this->read_caught_up = false;
+        } else {
+            this->read_caught_up = true;
+        }
     }
 
     if (apply_lag > healthy_write_lag) {
         LOG_IF(ERROR, log_msg) << apply_lag << " lagging entries > healthy write lag of " << healthy_write_lag;
         this->write_caught_up = false;
     } else {
-        this->write_caught_up = true;
+        if(num_queued_writes > healthy_write_lag) {
+            LOG_IF(ERROR, log_msg) << num_queued_writes << " queued writes > healthy write lag of " << healthy_write_lag;
+            this->write_caught_up = false;
+        } else {
+            this->write_caught_up = true;
+        }
     }
 
 }