From e3dee78a1b6337ece8087d605d12de1c6545264f Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 10 Dec 2022 19:21:54 +0530 Subject: [PATCH] Add resource check to batch indexer as well. --- include/batched_indexer.h | 5 ++++- include/cached_resource_stat.h | 14 +++++++++++++- include/raft_server.h | 2 -- src/batched_indexer.cpp | 18 +++++++++++++++--- src/cached_resource_stat.cpp | 4 ++-- src/raft_server.cpp | 7 ++++--- src/typesense_server_utils.cpp | 2 +- 7 files changed, 39 insertions(+), 13 deletions(-) diff --git a/include/batched_indexer.h b/include/batched_indexer.h index c9af312f..95073747 100644 --- a/include/batched_indexer.h +++ b/include/batched_indexer.h @@ -6,6 +6,7 @@ #include "http_data.h" #include "threadpool.h" #include "http_server.h" +#include "config.h" class BatchedIndexer { private: @@ -73,6 +74,8 @@ private: // When set, all writes (both live and log serialized) are skipped with 422 response const std::atomic& skip_writes; + const Config& config; + static const size_t GC_INTERVAL_SECONDS = 60; static const size_t GC_PRUNE_MAX_SECONDS = 3600; @@ -85,7 +88,7 @@ public: static const constexpr char* RAFT_REQ_LOG_PREFIX = "$RL_"; BatchedIndexer(HttpServer* server, Store* store, Store* meta_store, size_t num_threads, - const std::atomic& skip_writes); + const Config& config, const std::atomic& skip_writes); ~BatchedIndexer(); diff --git a/include/cached_resource_stat.h b/include/cached_resource_stat.h index 6c620ee4..2c409dd0 100644 --- a/include/cached_resource_stat.h +++ b/include/cached_resource_stat.h @@ -4,7 +4,8 @@ #include #include -struct cached_resource_stat_t { +class cached_resource_stat_t { +private: const static size_t REFRESH_INTERVAL_SECS = 5; uint64_t disk_total_bytes = 0; uint64_t disk_used_bytes = 0; @@ -17,6 +18,17 @@ struct cached_resource_stat_t { uint64_t last_checked_ts = 0; + cached_resource_stat_t() = default; + + ~cached_resource_stat_t() = default; + +public: + + static cached_resource_stat_t& get_instance() { + static cached_resource_stat_t instance; + return instance; + } + enum resource_check_t { OK, OUT_OF_DISK, diff --git a/include/raft_server.h b/include/raft_server.h index d788dada..5ae7cf27 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -136,8 +136,6 @@ private: std::atomic shutting_down; std::atomic pending_writes; - cached_resource_stat_t cached_disk_stat; - const uint64_t snapshot_interval_s; // frequency of actual snapshotting uint64_t last_snapshot_ts; // when last snapshot ran diff --git a/src/batched_indexer.cpp b/src/batched_indexer.cpp index ba82b1a9..9d8bea08 100644 --- a/src/batched_indexer.cpp +++ b/src/batched_indexer.cpp @@ -1,12 +1,13 @@ #include "batched_indexer.h" #include "core_api.h" #include "thread_local_vars.h" +#include "cached_resource_stat.h" BatchedIndexer::BatchedIndexer(HttpServer* server, Store* store, Store* meta_store, const size_t num_threads, - const std::atomic& skip_writes): + const Config& config, const std::atomic& skip_writes): server(server), store(store), meta_store(meta_store), num_threads(num_threads), last_gc_run(std::chrono::high_resolution_clock::now()), quit(false), - skip_writes(skip_writes) { + config(config), skip_writes(skip_writes) { queues.resize(num_threads); qmutuxes = new await_t[num_threads]; skip_index_iter_upper_bound = new rocksdb::Slice(skip_index_upper_bound_key); @@ -194,8 +195,19 @@ void BatchedIndexer::run() { else { //LOG(INFO) << "index req " << req_id << ", chunk index: " << orig_req_res.next_chunk_index; + auto resource_check = cached_resource_stat_t::get_instance() + .has_enough_resources(config.get_data_dir(), + config.get_disk_used_max_percentage(), + config.get_memory_used_max_percentage()); - if(route_found) { + if (resource_check != cached_resource_stat_t::OK && orig_req->http_method != "DELETE") { + orig_res->set_422("Rejecting write: running out of resource type: " + + std::string(magic_enum::enum_name(resource_check))); + orig_res->final = true; + async_res = false; + } + + else if(route_found) { if(skip_writes && found_rpath->handler != post_config) { orig_res->set(422, "Skipping write."); async_req_res_t* async_req_res = new async_req_res_t(orig_req, orig_res, true); diff --git a/src/cached_resource_stat.cpp b/src/cached_resource_stat.cpp index 1a7eb5bf..852e1aea 100644 --- a/src/cached_resource_stat.cpp +++ b/src/cached_resource_stat.cpp @@ -71,7 +71,7 @@ cached_resource_stat_t::has_enough_resources(const std::string& data_dir_path, return cached_resource_stat_t::OK; } - LOG(INFO) << "memory_total_bytes: " << memory_total_bytes << ", memory_available_bytes: " << memory_available_bytes; + //LOG(INFO) << "memory_total_bytes: " << memory_total_bytes << ", memory_available_bytes: " << memory_available_bytes; // Calculate sum of RAM + SWAP used as all_memory_used uint64_t all_memory_used = (memory_total_bytes - memory_available_bytes) + (swap_total_bytes - swap_free_bytes); @@ -85,7 +85,7 @@ cached_resource_stat_t::has_enough_resources(const std::string& data_dir_path, ((100ULL - memory_used_max_percentage) * memory_total_bytes) / 100); uint64_t free_mem = (memory_total_bytes - all_memory_used); - LOG(INFO) << "free_mem: " << free_mem << ", memory_free_min_bytes: " << memory_free_min_bytes; + //LOG(INFO) << "free_mem: " << free_mem << ", memory_free_min_bytes: " << memory_free_min_bytes; if(free_mem < memory_free_min_bytes) { return cached_resource_stat_t::OUT_OF_MEMORY; diff --git a/src/raft_server.cpp b/src/raft_server.cpp index e63af5a2..0f332bd6 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -186,9 +186,10 @@ void ReplicationState::write(const std::shared_ptr& request, const std } // reject write if disk space is running out - auto resource_check = cached_disk_stat.has_enough_resources(raft_dir_path, config->get_disk_used_max_percentage(), - config->get_memory_used_max_percentage()); - if (resource_check != cached_resource_stat_t::OK) { + auto resource_check = cached_resource_stat_t::get_instance().has_enough_resources(raft_dir_path, + config->get_disk_used_max_percentage(), config->get_memory_used_max_percentage()); + + if (resource_check != cached_resource_stat_t::OK && request->http_method != "DELETE") { response->set_422("Rejecting write: running out of resource type: " + std::string(magic_enum::enum_name(resource_check))); response->final = true; diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 8e6ceeca..4ad8d321 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -439,7 +439,7 @@ int run_server(const Config & config, const std::string & version, void (*master bool ssl_enabled = (!config.get_ssl_cert().empty() && !config.get_ssl_cert_key().empty()); BatchedIndexer* batch_indexer = new BatchedIndexer(server, &store, &meta_store, num_threads, - config.get_skip_writes()); + config, config.get_skip_writes()); CollectionManager & collectionManager = CollectionManager::get_instance(); collectionManager.init(&store, &app_thread_pool, config.get_max_memory_ratio(),