diff --git a/include/cached_resource_stat.h b/include/cached_resource_stat.h new file mode 100644 index 00000000..6c620ee4 --- /dev/null +++ b/include/cached_resource_stat.h @@ -0,0 +1,30 @@ +#pragma once +#include +#include +#include +#include + /* Cached disk, memory and swap figures plus the time of the last refresh; has_enough_resources() turns them into a resource_check_t verdict. */ +struct cached_resource_stat_t { + const static size_t REFRESH_INTERVAL_SECS = 5; /* refresh threshold compared against (now - last_checked_ts); the replaced cached_disk_stat_t used 30 */ + uint64_t disk_total_bytes = 0; + uint64_t disk_used_bytes = 0; + + uint64_t memory_total_bytes = 0; + uint64_t memory_available_bytes = 0; + + uint64_t swap_total_bytes = 0; + uint64_t swap_free_bytes = 0; + + uint64_t last_checked_ts = 0; + + enum resource_check_t { + OK, + OUT_OF_DISK, + OUT_OF_MEMORY + }; + + // On Mac, we will only check for disk usage /* NOTE(review): no platform switch is visible in this patch; the memory check is skipped only because memory_total_bytes stays 0 when /proc/meminfo yields nothing */ + resource_check_t has_enough_resources(const std::string& data_dir_path, + const int disk_used_max_percentage, + const int memory_used_max_percentage); +}; diff --git a/include/config.h b/include/config.h index aa4a4d30..305c7f81 100644 --- a/include/config.h +++ b/include/config.h @@ -55,6 +55,7 @@ private: bool enable_access_logging; int disk_used_max_percentage; + int memory_used_max_percentage; std::atomic skip_writes; @@ -77,6 +78,7 @@ protected: this->ssl_refresh_interval_seconds = 8 * 60 * 60; this->enable_access_logging = false; this->disk_used_max_percentage = 100; + this->memory_used_max_percentage = 100; this->skip_writes = false; } @@ -267,6 +269,10 @@ public: return this->disk_used_max_percentage; } + int get_memory_used_max_percentage() const { + return this->memory_used_max_percentage; + } + std::string get_access_log_path() const { if(this->log_dir.empty()) { return ""; @@ -384,6 +390,10 @@ public: this->disk_used_max_percentage = std::stoi(get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE")); } + if(!get_env("TYPESENSE_MEMORY_USED_MAX_PERCENTAGE").empty()) { + this->memory_used_max_percentage = std::stoi(get_env("TYPESENSE_MEMORY_USED_MAX_PERCENTAGE")); /* NOTE(review): std::stoi throws on non-numeric or out-of-range text, and the parsed value is not range-checked here */ + } + this->skip_writes = ("TRUE" == get_env("TYPESENSE_SKIP_WRITES")); } @@ -528,6 +538,10 @@ public: 
this->disk_used_max_percentage = (int) reader.GetInteger("server", "disk-used-max-percentage", 100); } + if(reader.Exists("server", "memory-used-max-percentage")) { + this->memory_used_max_percentage = (int) reader.GetInteger("server", "memory-used-max-percentage", 100); /* NOTE(review): stored as-is; nothing in these hunks limits it to 0..100 */ + } + if(reader.Exists("server", "skip-writes")) { auto skip_writes_str = reader.Get("server", "skip-writes", "false"); this->skip_writes = (skip_writes_str == "true"); @@ -653,6 +667,10 @@ public: this->disk_used_max_percentage = options.get("disk-used-max-percentage"); } + if(options.exist("memory-used-max-percentage")) { + this->memory_used_max_percentage = options.get("memory-used-max-percentage"); + } + if(options.exist("skip-writes")) { this->skip_writes = options.get("skip-writes"); } diff --git a/include/raft_server.h b/include/raft_server.h index c55ebf0c..d788dada 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -8,12 +8,12 @@ #include // braft::ProtoBufFile #include #include -#include #include "http_data.h" #include "threadpool.h" #include "http_server.h" #include "batched_indexer.h" +#include "cached_resource_stat.h" class Store; class ReplicationState; @@ -95,30 +95,6 @@ public: void Run(); }; -struct cached_disk_stat_t { - const static size_t REFRESH_INTERVAL_SECS = 30; - uint64_t disk_total_bytes = 0; - uint64_t disk_used_bytes = 0; - uint64_t last_checked_ts = 0; - - bool has_enough_space(const std::string& data_dir_path, const int disk_used_max_percentage) { - uint64_t now = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count(); - - if((now - last_checked_ts) > REFRESH_INTERVAL_SECS) { - struct statvfs st{}; - statvfs(data_dir_path.c_str(), &st); - disk_total_bytes = st.f_blocks * st.f_frsize; - disk_used_bytes = (st.f_blocks - st.f_bavail) * st.f_frsize; - last_checked_ts = now; - } - - int disk_used_percentage = (double(disk_used_bytes)/double(disk_total_bytes)) * 100; - return disk_used_percentage <= disk_used_max_percentage; 
- } -}; - - // Implements braft::StateMachine. class ReplicationState : public braft::StateMachine { private: @@ -160,7 +136,7 @@ private: std::atomic shutting_down; std::atomic pending_writes; - cached_disk_stat_t cached_disk_stat; + cached_resource_stat_t cached_disk_stat; /* NOTE(review): the member keeps its old name although the struct now also tracks memory and swap */ const uint64_t snapshot_interval_s; // frequency of actual snapshotting uint64_t last_snapshot_ts; // when last snapshot ran diff --git a/src/cached_resource_stat.cpp b/src/cached_resource_stat.cpp new file mode 100644 index 00000000..1a7eb5bf --- /dev/null +++ b/src/cached_resource_stat.cpp @@ -0,0 +1,95 @@ +#include "cached_resource_stat.h" +#include +#include "logger.h" + /* Returns OK at once when both limits are 100. Otherwise re-reads disk usage (statvfs on data_dir_path) and memory and swap usage (/proc/meminfo) if the cached values are older than REFRESH_INTERVAL_SECS, then checks disk first and memory second. */ +cached_resource_stat_t::resource_check_t +cached_resource_stat_t::has_enough_resources(const std::string& data_dir_path, + const int disk_used_max_percentage, + const int memory_used_max_percentage) { + + if(disk_used_max_percentage == 100 && memory_used_max_percentage == 100) { + return cached_resource_stat_t::OK; + } + + uint64_t now = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + + if((now - last_checked_ts) > REFRESH_INTERVAL_SECS) { + // get disk usage + struct statvfs st{}; + statvfs(data_dir_path.c_str(), &st); /* NOTE(review): return value ignored; on failure st presumably stays zeroed, which leaves disk_total_bytes at 0 */ + disk_total_bytes = st.f_blocks * st.f_frsize; + disk_used_bytes = (st.f_blocks - st.f_bavail) * st.f_frsize; + + // get memory and swap usage + std::string token; + std::ifstream file("/proc/meminfo"); /* if the file cannot be opened the loop body never runs and the memory fields keep their previous values */ + + while(file >> token) { + if(token == "MemTotal:") { + uint64_t value_kb; + if(file >> value_kb) { + memory_total_bytes = value_kb * 1024; + } + } + + else if(token == "MemAvailable:") { + uint64_t value_kb; + if(file >> value_kb) { + memory_available_bytes = value_kb * 1024; + } + } + + else if(token == "SwapTotal:") { + uint64_t value_kb; + if(file >> value_kb) { + swap_total_bytes = value_kb * 1024; + } + } + + else if(token == "SwapFree:") { + uint64_t value_kb; + if(file >> value_kb) { + swap_free_bytes = value_kb * 1024; + } + + // since "SwapFree" 
appears last in the file + break; /* stop scanning after the SwapFree entry */ + } + } + + last_checked_ts = now; + } + + int disk_used_percentage = (double(disk_used_bytes)/double(disk_total_bytes)) * 100; /* NOTE(review): when disk_total_bytes is 0 the division is by zero in double arithmetic and the non-finite result is converted to int, which is undefined behaviour */ + if(disk_used_percentage > disk_used_max_percentage) { + return cached_resource_stat_t::OUT_OF_DISK; + } + + if(memory_total_bytes == 0) { + // if there is an error in fetching the stat, we will return `OK` + return cached_resource_stat_t::OK; + } + /* NOTE(review): the two INFO-level log statements below are emitted on every call that gets this far */ + LOG(INFO) << "memory_total_bytes: " << memory_total_bytes << ", memory_available_bytes: " << memory_available_bytes; + + // Calculate sum of RAM + SWAP used as all_memory_used + uint64_t all_memory_used = (memory_total_bytes - memory_available_bytes) + (swap_total_bytes - swap_free_bytes); + + if(all_memory_used >= memory_total_bytes) { /* NOTE(review): also reached when memory_used_max_percentage is 100 but the disk limit is not, so writes can be rejected for memory although 100 is documented as never reject */ + return cached_resource_stat_t::OUT_OF_MEMORY; + } + + // compare with 500M or `100 - memory_used_max_percentage` of total memory, whichever is lower + uint64_t memory_free_min_bytes = std::min(500ULL * 1024 * 1024, + ((100ULL - memory_used_max_percentage) * memory_total_bytes) / 100); /* NOTE(review): the int percentage is converted to unsigned here, so a value above 100 makes this subtraction wrap to a very large number */ + uint64_t free_mem = (memory_total_bytes - all_memory_used); + + LOG(INFO) << "free_mem: " << free_mem << ", memory_free_min_bytes: " << memory_free_min_bytes; + + if(free_mem < memory_free_min_bytes) { + return cached_resource_stat_t::OUT_OF_MEMORY; + } + + return cached_resource_stat_t::OK; +} diff --git a/src/raft_server.cpp b/src/raft_server.cpp index a4e5398e..e63af5a2 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -186,8 +186,12 @@ void ReplicationState::write(const std::shared_ptr& request, const std } // reject write if disk space is running out - if(!cached_disk_stat.has_enough_space(raft_dir_path, config->get_disk_used_max_percentage())) { - response->set_500("Rejecting write: running out of disk space!"); + auto resource_check = cached_disk_stat.has_enough_resources(raft_dir_path, config->get_disk_used_max_percentage(), + config->get_memory_used_max_percentage()); + if (resource_check != cached_resource_stat_t::OK) { 
+ response->set_422("Rejecting write: running out of resource type: " + + std::string(magic_enum::enum_name(resource_check))); /* set_500 is replaced by set_422; enum_name presumably yields the enumerator identifier, e.g. OUT_OF_DISK - confirm against magic_enum, and confirm its header is included since no include is added in the visible hunks */ + response->final = true; auto req_res = new async_req_res_t(request, response, true); return message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); } diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 5332caec..8e6ceeca 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -102,6 +102,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.add("enable-access-logging", '\0', "Enable access logging.", false, false); options.add("disk-used-max-percentage", '\0', "Reject writes when used disk space exceeds this percentage. Default: 100 (never reject).", false, 100); + options.add("memory-used-max-percentage", '\0', "Reject writes when memory usage exceeds this percentage. Default: 100 (never reject).", false, 100); /* NOTE(review): help text says usage exceeds this percentage, whereas has_enough_resources rejects when free memory drops below min(500 MiB, (100 - percentage)% of total) or when RAM plus swap in use reaches total RAM */ options.add("skip-writes", '\0', "Skip all writes except config changes. Default: false.", false, false); // DEPRECATED