mirror of https://github.com/typesense/typesense.git
synced 2025-05-21 06:02:26 +08:00
Account for queued writes when computing read+write lags.
This commit is contained in:
parent 26351a6984
commit a890d036ed
@@ -49,6 +49,8 @@ private:

    static std::string get_req_prefix_key(uint64_t req_id);

    std::atomic<int64_t> queued_writes = 0;

public:

    BatchedIndexer(HttpServer* server, Store* store, size_t num_threads);
@@ -57,6 +59,8 @@ public:

    void enqueue(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);

    int64_t get_queued_writes();

    void run();

    void stop();
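Side note (not part of the commit): queued_writes is declared as std::atomic<int64_t> presumably because enqueue() runs on the HTTP handler threads while run() drains the queue on a separate indexing thread, so the counter is updated concurrently. A minimal, self-contained sketch of that pattern, with hypothetical producer/consumer names:

#include <atomic>
#include <cstdint>
#include <thread>

std::atomic<int64_t> queued_writes{0};   // shared gauge, no mutex needed

void producer() {                        // stands in for enqueue()
    for(int i = 0; i < 1000; i++) {
        queued_writes++;                 // atomic fetch-add
    }
}

void consumer() {                        // stands in for run()
    for(int i = 0; i < 1000; i++) {
        queued_writes--;                 // atomic fetch-sub
    }
}

int main() {
    std::thread p(producer), c(consumer);
    p.join();
    c.join();
    return queued_writes.load() == 0 ? 0 : 1;   // increments and decrements balance out
}

The default (sequentially consistent) memory ordering is stricter than needed; the value is only used as a monitoring gauge.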
@@ -26,6 +26,7 @@ void BatchedIndexer::enqueue(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {

    //LOG(INFO) << "req_id: " << req->start_ts << ", chunk_sequence: " << chunk_sequence;

    store->insert(request_chunk_key, req->serialize());
    queued_writes++;
    req->body = "";

    {
@@ -116,6 +117,7 @@ void BatchedIndexer::run() {

                                 deferred_req_res);
            }

            queued_writes--;
            iter->Next();
        }
@@ -190,3 +192,7 @@ BatchedIndexer::~BatchedIndexer() {

void BatchedIndexer::stop() {
    exit = true;
}

int64_t BatchedIndexer::get_queued_writes() {
    return queued_writes;
}
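Taken together, the changes above maintain a simple backlog gauge: enqueue() bumps the counter as soon as the request chunk is persisted, the run() loop decrements it after the queued write has been replayed, and get_queued_writes() exposes the current value to callers such as ReplicationState below. A compressed sketch of that lifecycle (hypothetical names; a std::deque stands in for the on-disk store the real class iterates over):

#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>

class MiniBatchedIndexer {
public:
    void enqueue(std::string serialized_req) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push_back(std::move(serialized_req));
        }
        queued_writes++;                  // backlog grows when the write is queued
    }

    void run_once() {                     // one iteration of the drain loop
        std::string req;
        {
            std::lock_guard<std::mutex> lock(mutex);
            if(queue.empty()) {
                return;
            }
            req = std::move(queue.front());
            queue.pop_front();
        }
        apply(req);
        queued_writes--;                  // backlog shrinks only after the write is applied
    }

    int64_t get_queued_writes() const {
        return queued_writes;             // cheap to read from any thread
    }

private:
    void apply(const std::string&) { /* index the documents */ }

    std::mutex mutex;
    std::deque<std::string> queue;
    std::atomic<int64_t> queued_writes{0};
};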
@@ -543,8 +543,7 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {

              << ", committed_index: " << nodeStatus.committed_index
              << ", known_applied_index: " << nodeStatus.known_applied_index
              << ", applying_index: " << nodeStatus.applying_index
              << ", pending_index: " << nodeStatus.pending_index
              << ", disk_index: " << nodeStatus.disk_index
              << ", queued_writes: " << batched_indexer->get_queued_writes()
              << ", pending_queue_size: " << nodeStatus.pending_queue_size
              << ", local_sequence: " << store->get_latest_seq_number();
@@ -602,6 +601,9 @@ void ReplicationState::refresh_catchup_status(bool log_msg) {

    int64_t current_index = (n_status.applying_index == 0) ? n_status.known_applied_index : n_status.applying_index;
    int64_t apply_lag = n_status.last_index - current_index;

    // in addition to raft level lag, we should also account for internal batched write queue
    int64_t num_queued_writes = batched_indexer->get_queued_writes();

    //LOG(INFO) << "last_index: " << n_status.applying_index << ", known_applied_index: " << n_status.known_applied_index;
    //LOG(INFO) << "apply_lag: " << apply_lag;
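To make the lag arithmetic concrete, here is the same computation with made-up numbers (illustrative only, not values from the commit):

// Hypothetical snapshot of a follower's raft status.
int64_t last_index          = 1200;   // newest entry in the local raft log
int64_t applying_index      = 0;      // nothing is mid-apply right now
int64_t known_applied_index = 1150;   // last entry fully applied to the state machine

int64_t current_index = (applying_index == 0) ? known_applied_index : applying_index;  // 1150
int64_t apply_lag     = last_index - current_index;                                    // 1200 - 1150 = 50

With applying_index == 0 the code falls back to known_applied_index, so this node is considered 50 entries behind at the raft level; num_queued_writes is then checked separately against the same thresholds in the block below.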
@@ -609,14 +611,24 @@ void ReplicationState::refresh_catchup_status(bool log_msg) {

        LOG_IF(ERROR, log_msg) << apply_lag << " lagging entries > healthy read lag of " << healthy_read_lag;
        this->read_caught_up = false;
    } else {
        this->read_caught_up = true;
        if(num_queued_writes > healthy_read_lag) {
            LOG_IF(ERROR, log_msg) << num_queued_writes << " queued writes > healthy read lag of " << healthy_read_lag;
            this->read_caught_up = false;
        } else {
            this->read_caught_up = true;
        }
    }

    if (apply_lag > healthy_write_lag) {
        LOG_IF(ERROR, log_msg) << apply_lag << " lagging entries > healthy write lag of " << healthy_write_lag;
        this->write_caught_up = false;
    } else {
        this->write_caught_up = true;
        if(num_queued_writes > healthy_write_lag) {
            LOG_IF(ERROR, log_msg) << num_queued_writes << " queued writes > healthy write lag of " << healthy_write_lag;
            this->write_caught_up = false;
        } else {
            this->write_caught_up = true;
        }
    }
}
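The nested branches above reduce to a single rule: a node is caught up only when both the raft apply lag and the internal write queue are within the corresponding threshold. An equivalent condensed form (a sketch, not the shipped code), with made-up numbers in the comment:

bool read_ok  = (apply_lag <= healthy_read_lag)  && (num_queued_writes <= healthy_read_lag);
bool write_ok = (apply_lag <= healthy_write_lag) && (num_queued_writes <= healthy_write_lag);

// Example: apply_lag = 50, num_queued_writes = 5000, healthy_read_lag = 1000,
// healthy_write_lag = 500. Raft alone looks healthy, but the queued writes push
// the node out of both read and write catch-up, which is exactly the case this
// commit is meant to catch.

This is also why the queued-writes check sits inside the else branch: it only needs to veto the caught-up verdict when the raft-level lag by itself looks healthy.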