Log long running queries with large memory usage.

This commit is contained in:
Kishore Nallan 2024-11-09 18:26:39 +05:30
parent ec1e53aac5
commit 7cd4f90453
7 changed files with 129 additions and 69 deletions

View File

@ -178,8 +178,6 @@ void get_collections_for_auth(std::map<std::string, std::string>& req_params, co
std::vector<collection_key_t>& collections,
std::vector<nlohmann::json>& embedded_params_vec);
void log_running_queries();
bool is_doc_import_route(uint64_t route_hash);
bool is_coll_create_route(uint64_t route_hash);

View File

@ -2,6 +2,7 @@
#include <mutex>
#include <atomic>
#include <condition_variable>
#include "http_data.h"
class HouseKeeper {
private:
@ -9,14 +10,31 @@ private:
std::condition_variable cv;
std::atomic<bool> quit = false;
std::atomic<uint32_t> hnsw_repair_interval_s = 1800;
std::atomic<uint32_t> remove_expired_keys_interval_s = 3600;
std::atomic<uint32_t> memory_req_min_age_s = 10;
std::atomic<uint32_t> memory_usage_interval_s = 3;
// used to track in-flight queries so they can be logged during a crash / rapid memory growth
std::mutex ifq_mutex;
struct req_metadata_t {
std::shared_ptr<http_req> req;
uint64_t active_memory = 0;
bool already_logged = false;
req_metadata_t(const std::shared_ptr<http_req>& req, uint64_t active_memory):
req(req), active_memory(active_memory) {
}
};
std::map<uint64_t, req_metadata_t> in_flight_queries;
std::atomic<uint64_t> active_memory_used = 0;
HouseKeeper() {}
~HouseKeeper() {}
public:
static HouseKeeper &get_instance() {
@ -28,7 +46,19 @@ public:
void operator=(HouseKeeper const &) = delete;
void init(uint32_t interval_seconds);
void init();
uint64_t get_active_memory_used();
void add_req(const std::shared_ptr<http_req>& req);
void remove_req(uint64_t req_id);
std::string get_query_log(const std::shared_ptr<http_req>& req);
void log_bad_queries();
void log_running_queries();
void run();

View File

@ -139,8 +139,6 @@ private:
static uint64_t linux_get_mem_available_bytes();
static uint64_t get_memory_active_bytes();
static uint64_t get_memory_non_proc_bytes();
public:
@ -152,6 +150,8 @@ public:
non_proc_mem_bytes = memory_used_bytes - get_memory_active_bytes();
}
static uint64_t get_memory_active_bytes();
static void linux_get_network_data(const std::string & stat_path, uint64_t& received_bytes, uint64_t& sent_bytes);
void get(const std::string & data_dir_path, nlohmann::json& result);

View File

@ -3,6 +3,7 @@
#include <app_metrics.h>
#include <regex>
#include <analytics_manager.h>
#include <housekeeper.h>
#include "typesense_server_utils.h"
#include "core_api.h"
#include "string_utils.h"
@ -44,24 +45,17 @@ public:
}
};
// used to log the queries that were in-flight during a crash
std::mutex ifq_mutex;
std::unordered_map<uint64_t, std::shared_ptr<http_req>> in_flight_queries;
class in_flight_req_guard_t {
uint64_t req_id;
public:
in_flight_req_guard_t(const std::shared_ptr<http_req>& req) {
std::unique_lock ifq_lock(ifq_mutex);
in_flight_queries.emplace(req->start_ts, req);
req_id = req->start_ts;
ifq_lock.unlock();
HouseKeeper::get_instance().add_req(req);
}
~in_flight_req_guard_t() {
std::unique_lock ifq_lock(ifq_mutex);
in_flight_queries.erase(req_id);
ifq_lock.unlock();
HouseKeeper::get_instance().remove_req(req_id);
}
};
@ -75,29 +69,6 @@ bool get_alter_in_progress(const std::string& collection) {
return alters_in_progress.count(collection) != 0;
}
void log_running_queries() {
std::unique_lock ifq_lock(ifq_mutex);
if(in_flight_queries.empty()) {
LOG(INFO) << "No in-flight search queries were found.";
return ;
}
LOG(INFO) << "Dump of in-flight search queries:";
for(const auto& kv: in_flight_queries) {
std::string query_string = "?";
std::string search_payload = kv.second->body;
StringUtils::erase_char(search_payload, '\n');
for(const auto& param_kv: kv.second->params) {
if(param_kv.first != http_req::AUTH_HEADER && param_kv.first != http_req::USER_HEADER) {
query_string += param_kv.first + "=" + param_kv.second + "&";
}
}
LOG(INFO) << "id=" << kv.first << ", qs=" << query_string << ", body=" << search_payload;
}
}
bool handle_authentication(std::map<std::string, std::string>& req_params,
std::vector<nlohmann::json>& embedded_params_vec,
const std::string& body,
@ -418,7 +389,7 @@ bool get_debug(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_
}
if(log_inflight_queries) {
log_running_queries();
HouseKeeper::get_instance().log_running_queries();
}
nlohmann::json result;

View File

@ -1,19 +1,21 @@
#include <map>
#include <collection_manager.h>
#include <system_metrics.h>
#include "housekeeper.h"
void HouseKeeper::run() {
uint64_t prev_hnsw_repair_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t prev_remove_expired_keys_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t prev_db_compaction_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t prev_memory_usage_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
while(!quit) {
std::unique_lock lk(mutex);
cv.wait_for(lk, std::chrono::seconds(60), [&] { return quit.load(); });
cv.wait_for(lk, std::chrono::milliseconds(3050), [&] { return quit.load(); });
if(quit) {
lk.unlock();
@ -23,6 +25,13 @@ void HouseKeeper::run() {
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
// update system memory usage
if (now_ts_seconds - prev_memory_usage_s >= memory_usage_interval_s) {
active_memory_used = SystemMetrics::get_memory_active_bytes();
prev_memory_usage_s = now_ts_seconds;
log_bad_queries();
}
// perform compaction on underlying store if enabled
if(Config::get_instance().get_db_compaction_interval() > 0) {
if(now_ts_seconds - prev_db_compaction_s >= Config::get_instance().get_db_compaction_interval()) {
@ -34,27 +43,6 @@ void HouseKeeper::run() {
}
}
/*if(now_ts_seconds - prev_hnsw_repair_s >= hnsw_repair_interval_s) {
// iterate through all collections and repair all hnsw graphs (if any)
auto coll_names = CollectionManager::get_instance().get_collection_names();
for(auto& coll_name: coll_names) {
auto coll = CollectionManager::get_instance().get_collection(coll_name);
if(coll == nullptr) {
continue;
}
coll->do_housekeeping();
}
if(!coll_names.empty()) {
LOG(INFO) << "Ran housekeeping for " << coll_names.size() << " collections.";
}
prev_hnsw_repair_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}*/
if (now_ts_seconds - prev_remove_expired_keys_s >= remove_expired_keys_interval_s) {
// Do housekeeping for authmanager
CollectionManager::get_instance().getAuthManager().do_housekeeping();
@ -72,6 +60,78 @@ void HouseKeeper::stop() {
cv.notify_all();
}
void HouseKeeper::init(uint32_t interval_seconds) {
this->hnsw_repair_interval_s = interval_seconds;
void HouseKeeper::init() {
}
uint64_t HouseKeeper::get_active_memory_used() {
return active_memory_used;
}
void HouseKeeper::add_req(const std::shared_ptr<http_req>& req) {
std::unique_lock ifq_lock(ifq_mutex);
in_flight_queries.emplace(req->start_ts, req_metadata_t(req, get_active_memory_used()));
}
void HouseKeeper::remove_req(uint64_t req_id) {
std::unique_lock ifq_lock(ifq_mutex);
in_flight_queries.erase(req_id);
}
std::string HouseKeeper::get_query_log(const std::shared_ptr<http_req>& req) {
std::string search_payload = req->body;
StringUtils::erase_char(search_payload, '\n');
std::string query_string = "?";
for(const auto& param_kv: req->params) {
if(param_kv.first != http_req::AUTH_HEADER && param_kv.first != http_req::USER_HEADER) {
query_string += param_kv.first + "=" + param_kv.second + "&";
}
}
return std::string("id=") + std::to_string(req->start_ts) + ", qs=" + query_string + ", body=" + search_payload;
}
void HouseKeeper::log_running_queries() {
std::unique_lock ifq_lock(ifq_mutex);
if(in_flight_queries.empty()) {
LOG(INFO) << "No in-flight search queries were found.";
return ;
}
LOG(INFO) << "Dump of in-flight search queries:";
for(const auto& kv: in_flight_queries) {
LOG(INFO) << get_query_log(kv.second.req);
}
}
void HouseKeeper::log_bad_queries() {
std::unique_lock ifq_lock(ifq_mutex);
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
for(auto& kv: in_flight_queries) {
auto req_ts = kv.first;
if(now_ts_seconds - req_ts < memory_req_min_age_s) {
// since we use a map it's already ordered ascending on timestamp
break;
}
if(kv.second.already_logged) {
continue;
}
// query that's atleast 10 seconds old: check if memory difference exceeds 1 GB
int64_t memory_req_start = kv.second.active_memory;
int64_t curr_memory = active_memory_used;
int64_t memory_diff = curr_memory - memory_req_start;
const int64_t one_gb = 1073741824;
if(memory_diff > one_gb) {
LOG(INFO) << "Detected bad query, start_ts: " << req_ts << ", memory_diff: " << memory_diff
<< ", " << get_query_log(kv.second.req);
kv.second.already_logged = true;
}
}
}

View File

@ -1,3 +1,4 @@
#include <housekeeper.h>
#include "typesense_server_utils.h"
#include "core_api.h"
#include "tsconfig.h"
@ -131,7 +132,7 @@ void crash_callback(int sig, backward::StackTrace& st) {
}
}
log_running_queries();
HouseKeeper::get_instance().log_running_queries();
LOG(ERROR) << "Typesense " << TYPESENSE_VERSION << " is terminating abruptly.";
}

View File

@ -516,7 +516,7 @@ int run_server(const Config & config, const std::string & version, void (*master
ConversationManager::get_instance().run();
});
HouseKeeper::get_instance().init(config.get_housekeeping_interval());
HouseKeeper::get_instance().init();
std::thread housekeeping_thread([]() {
HouseKeeper::get_instance().run();
});