diff --git a/include/core_api.h b/include/core_api.h index 68bfdb72..96b844d8 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -178,8 +178,6 @@ void get_collections_for_auth(std::map& req_params, co std::vector& collections, std::vector& embedded_params_vec); -void log_running_queries(); - bool is_doc_import_route(uint64_t route_hash); bool is_coll_create_route(uint64_t route_hash); diff --git a/include/housekeeper.h b/include/housekeeper.h index 10b09546..3cbe9eb6 100644 --- a/include/housekeeper.h +++ b/include/housekeeper.h @@ -2,6 +2,7 @@ #include #include #include +#include "http_data.h" class HouseKeeper { private: @@ -9,14 +10,31 @@ private: std::condition_variable cv; std::atomic quit = false; - std::atomic hnsw_repair_interval_s = 1800; std::atomic remove_expired_keys_interval_s = 3600; + std::atomic memory_req_min_age_s = 10; + std::atomic memory_usage_interval_s = 3; + + // used to track in-flight queries so they can be logged during a crash / rapid memory growth + std::mutex ifq_mutex; + + struct req_metadata_t { + std::shared_ptr req; + uint64_t active_memory = 0; + bool already_logged = false; + + req_metadata_t(const std::shared_ptr& req, uint64_t active_memory): + req(req), active_memory(active_memory) { + + } + }; + + std::map in_flight_queries; + std::atomic active_memory_used = 0; HouseKeeper() {} ~HouseKeeper() {} - public: static HouseKeeper &get_instance() { @@ -28,7 +46,19 @@ public: void operator=(HouseKeeper const &) = delete; - void init(uint32_t interval_seconds); + void init(); + + uint64_t get_active_memory_used(); + + void add_req(const std::shared_ptr& req); + + void remove_req(uint64_t req_id); + + std::string get_query_log(const std::shared_ptr& req); + + void log_bad_queries(); + + void log_running_queries(); void run(); diff --git a/include/system_metrics.h b/include/system_metrics.h index 94363afa..a6e87b24 100644 --- a/include/system_metrics.h +++ b/include/system_metrics.h @@ -139,8 +139,6 @@ private: static uint64_t linux_get_mem_available_bytes(); - static uint64_t get_memory_active_bytes(); - static uint64_t get_memory_non_proc_bytes(); public: @@ -152,6 +150,8 @@ public: non_proc_mem_bytes = memory_used_bytes - get_memory_active_bytes(); } + static uint64_t get_memory_active_bytes(); + static void linux_get_network_data(const std::string & stat_path, uint64_t& received_bytes, uint64_t& sent_bytes); void get(const std::string & data_dir_path, nlohmann::json& result); diff --git a/src/core_api.cpp b/src/core_api.cpp index 962c4ecc..c55537d1 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "typesense_server_utils.h" #include "core_api.h" #include "string_utils.h" @@ -44,24 +45,17 @@ public: } }; -// used to log the queries that were in-flight during a crash -std::mutex ifq_mutex; -std::unordered_map> in_flight_queries; class in_flight_req_guard_t { uint64_t req_id; public: in_flight_req_guard_t(const std::shared_ptr& req) { - std::unique_lock ifq_lock(ifq_mutex); - in_flight_queries.emplace(req->start_ts, req); req_id = req->start_ts; - ifq_lock.unlock(); + HouseKeeper::get_instance().add_req(req); } ~in_flight_req_guard_t() { - std::unique_lock ifq_lock(ifq_mutex); - in_flight_queries.erase(req_id); - ifq_lock.unlock(); + HouseKeeper::get_instance().remove_req(req_id); } }; @@ -75,29 +69,6 @@ bool get_alter_in_progress(const std::string& collection) { return alters_in_progress.count(collection) != 0; } -void log_running_queries() { - std::unique_lock ifq_lock(ifq_mutex); - if(in_flight_queries.empty()) { - LOG(INFO) << "No in-flight search queries were found."; - return ; - } - - LOG(INFO) << "Dump of in-flight search queries:"; - - for(const auto& kv: in_flight_queries) { - std::string query_string = "?"; - std::string search_payload = kv.second->body; - StringUtils::erase_char(search_payload, '\n'); - for(const auto& param_kv: kv.second->params) { - if(param_kv.first != http_req::AUTH_HEADER && param_kv.first != http_req::USER_HEADER) { - query_string += param_kv.first + "=" + param_kv.second + "&"; - } - } - - LOG(INFO) << "id=" << kv.first << ", qs=" << query_string << ", body=" << search_payload; - } -} - bool handle_authentication(std::map& req_params, std::vector& embedded_params_vec, const std::string& body, @@ -418,7 +389,7 @@ bool get_debug(const std::shared_ptr& req, const std::shared_ptr #include +#include #include "housekeeper.h" void HouseKeeper::run() { - uint64_t prev_hnsw_repair_s = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count(); - uint64_t prev_remove_expired_keys_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); uint64_t prev_db_compaction_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); + uint64_t prev_memory_usage_s = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + while(!quit) { std::unique_lock lk(mutex); - cv.wait_for(lk, std::chrono::seconds(60), [&] { return quit.load(); }); + cv.wait_for(lk, std::chrono::milliseconds(3050), [&] { return quit.load(); }); if(quit) { lk.unlock(); @@ -23,6 +25,13 @@ void HouseKeeper::run() { auto now_ts_seconds = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); + // update system memory usage + if (now_ts_seconds - prev_memory_usage_s >= memory_usage_interval_s) { + active_memory_used = SystemMetrics::get_memory_active_bytes(); + prev_memory_usage_s = now_ts_seconds; + log_bad_queries(); + } + // perform compaction on underlying store if enabled if(Config::get_instance().get_db_compaction_interval() > 0) { if(now_ts_seconds - prev_db_compaction_s >= Config::get_instance().get_db_compaction_interval()) { @@ -34,27 +43,6 @@ void HouseKeeper::run() { } } - /*if(now_ts_seconds - prev_hnsw_repair_s >= hnsw_repair_interval_s) { - // iterate through all collections and repair all hnsw graphs (if any) - auto coll_names = CollectionManager::get_instance().get_collection_names(); - - for(auto& coll_name: coll_names) { - auto coll = CollectionManager::get_instance().get_collection(coll_name); - if(coll == nullptr) { - continue; - } - - coll->do_housekeeping(); - } - - if(!coll_names.empty()) { - LOG(INFO) << "Ran housekeeping for " << coll_names.size() << " collections."; - } - - prev_hnsw_repair_s = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count(); - }*/ - if (now_ts_seconds - prev_remove_expired_keys_s >= remove_expired_keys_interval_s) { // Do housekeeping for authmanager CollectionManager::get_instance().getAuthManager().do_housekeeping(); @@ -72,6 +60,78 @@ void HouseKeeper::stop() { cv.notify_all(); } -void HouseKeeper::init(uint32_t interval_seconds) { - this->hnsw_repair_interval_s = interval_seconds; +void HouseKeeper::init() { +} + +uint64_t HouseKeeper::get_active_memory_used() { + return active_memory_used; +} + +void HouseKeeper::add_req(const std::shared_ptr& req) { + std::unique_lock ifq_lock(ifq_mutex); + in_flight_queries.emplace(req->start_ts, req_metadata_t(req, get_active_memory_used())); +} + +void HouseKeeper::remove_req(uint64_t req_id) { + std::unique_lock ifq_lock(ifq_mutex); + in_flight_queries.erase(req_id); +} + +std::string HouseKeeper::get_query_log(const std::shared_ptr& req) { + std::string search_payload = req->body; + StringUtils::erase_char(search_payload, '\n'); + std::string query_string = "?"; + for(const auto& param_kv: req->params) { + if(param_kv.first != http_req::AUTH_HEADER && param_kv.first != http_req::USER_HEADER) { + query_string += param_kv.first + "=" + param_kv.second + "&"; + } + } + + return std::string("id=") + std::to_string(req->start_ts) + ", qs=" + query_string + ", body=" + search_payload; +} + +void HouseKeeper::log_running_queries() { + std::unique_lock ifq_lock(ifq_mutex); + if(in_flight_queries.empty()) { + LOG(INFO) << "No in-flight search queries were found."; + return ; + } + + LOG(INFO) << "Dump of in-flight search queries:"; + + for(const auto& kv: in_flight_queries) { + LOG(INFO) << get_query_log(kv.second.req); + } +} + +void HouseKeeper::log_bad_queries() { + std::unique_lock ifq_lock(ifq_mutex); + + auto now_ts_seconds = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + + for(auto& kv: in_flight_queries) { + auto req_ts = kv.first; + + if(now_ts_seconds - req_ts < memory_req_min_age_s) { + // since we use a map it's already ordered ascending on timestamp + break; + } + + if(kv.second.already_logged) { + continue; + } + + // query that's atleast 10 seconds old: check if memory difference exceeds 1 GB + int64_t memory_req_start = kv.second.active_memory; + int64_t curr_memory = active_memory_used; + int64_t memory_diff = curr_memory - memory_req_start; + const int64_t one_gb = 1073741824; + + if(memory_diff > one_gb) { + LOG(INFO) << "Detected bad query, start_ts: " << req_ts << ", memory_diff: " << memory_diff + << ", " << get_query_log(kv.second.req); + kv.second.already_logged = true; + } + } } diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 6e554a1f..c91b91de 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -1,3 +1,4 @@ +#include #include "typesense_server_utils.h" #include "core_api.h" #include "tsconfig.h" @@ -131,7 +132,7 @@ void crash_callback(int sig, backward::StackTrace& st) { } } - log_running_queries(); + HouseKeeper::get_instance().log_running_queries(); LOG(ERROR) << "Typesense " << TYPESENSE_VERSION << " is terminating abruptly."; } diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 4218029b..b6bb3207 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -516,7 +516,7 @@ int run_server(const Config & config, const std::string & version, void (*master ConversationManager::get_instance().run(); }); - HouseKeeper::get_instance().init(config.get_housekeeping_interval()); + HouseKeeper::get_instance().init(); std::thread housekeeping_thread([]() { HouseKeeper::get_instance().run(); });