Guard for preventing writes when memory/disk thresholds are breached.

This commit is contained in:
Kishore Nallan 2022-11-15 20:25:45 +05:30
parent 64def55527
commit 83495c817b
6 changed files with 152 additions and 28 deletions

View File

@ -0,0 +1,30 @@
#pragma once
#include <cstdint>
#include <chrono>
#include <string>
#include <sys/statvfs.h>
/**
 * Cached snapshot of disk, memory and swap usage.
 *
 * The figures are sampled by `has_enough_resources()` and reused until they are
 * older than `REFRESH_INTERVAL_SECS`, so that callers can invoke the check
 * frequently without hitting the operating system every time.
 */
struct cached_resource_stat_t {
    // Age (in seconds) the cached figures must exceed before they are sampled again.
    static const size_t REFRESH_INTERVAL_SECS = 5;

    // Disk figures for the file system that holds the data directory, in bytes.
    uint64_t disk_total_bytes = 0;
    uint64_t disk_used_bytes = 0;

    // RAM figures, in bytes. A zero total means the figures could not be fetched.
    uint64_t memory_total_bytes = 0;
    uint64_t memory_available_bytes = 0;

    // Swap figures, in bytes.
    uint64_t swap_total_bytes = 0;
    uint64_t swap_free_bytes = 0;

    // When the figures above were last sampled (seconds since the epoch).
    uint64_t last_checked_ts = 0;

    // Result of a resource check: either fine, or the resource that ran out.
    enum resource_check_t { OK, OUT_OF_DISK, OUT_OF_MEMORY };

    /**
     * Checks disk and memory usage against the given limits.
     * On Mac, we will only check for disk usage.
     *
     * @param data_dir_path               Path on the file system whose disk usage is checked.
     * @param disk_used_max_percentage    Highest tolerated disk usage, as a percentage.
     * @param memory_used_max_percentage  Highest tolerated memory usage, as a percentage.
     * @return `OK`, or the resource type that breached its limit.
     */
    resource_check_t has_enough_resources(const std::string& data_dir_path,
                                          const int disk_used_max_percentage,
                                          const int memory_used_max_percentage);
};

View File

@ -55,6 +55,7 @@ private:
bool enable_access_logging;
int disk_used_max_percentage;
int memory_used_max_percentage;
std::atomic<bool> skip_writes;
@ -77,6 +78,7 @@ protected:
this->ssl_refresh_interval_seconds = 8 * 60 * 60;
this->enable_access_logging = false;
this->disk_used_max_percentage = 100;
this->memory_used_max_percentage = 100;
this->skip_writes = false;
}
@ -267,6 +269,10 @@ public:
return this->disk_used_max_percentage;
}
int get_memory_used_max_percentage() const {
return this->memory_used_max_percentage;
}
std::string get_access_log_path() const {
if(this->log_dir.empty()) {
return "";
@ -384,6 +390,10 @@ public:
this->disk_used_max_percentage = std::stoi(get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE"));
}
if(!get_env("TYPESENSE_MEMORY_USED_MAX_PERCENTAGE").empty()) {
this->memory_used_max_percentage = std::stoi(get_env("TYPESENSE_MEMORY_USED_MAX_PERCENTAGE"));
}
this->skip_writes = ("TRUE" == get_env("TYPESENSE_SKIP_WRITES"));
}
@ -528,6 +538,10 @@ public:
this->disk_used_max_percentage = (int) reader.GetInteger("server", "disk-used-max-percentage", 100);
}
if(reader.Exists("server", "memory-used-max-percentage")) {
this->memory_used_max_percentage = (int) reader.GetInteger("server", "memory-used-max-percentage", 100);
}
if(reader.Exists("server", "skip-writes")) {
auto skip_writes_str = reader.Get("server", "skip-writes", "false");
this->skip_writes = (skip_writes_str == "true");
@ -653,6 +667,10 @@ public:
this->disk_used_max_percentage = options.get<int>("disk-used-max-percentage");
}
if(options.exist("memory-used-max-percentage")) {
this->memory_used_max_percentage = options.get<int>("memory-used-max-percentage");
}
if(options.exist("skip-writes")) {
this->skip_writes = options.get<bool>("skip-writes");
}

View File

@ -8,12 +8,12 @@
#include <braft/protobuf_file.h> // braft::ProtoBufFile
#include <rocksdb/db.h>
#include <future>
#include <sys/statvfs.h>
#include "http_data.h"
#include "threadpool.h"
#include "http_server.h"
#include "batched_indexer.h"
#include "cached_resource_stat.h"
class Store;
class ReplicationState;
@ -95,30 +95,6 @@ public:
void Run();
};
/**
 * Cached disk usage of the file system that holds the data directory.
 *
 * The figures are re-sampled via `statvfs()` only when they are older than
 * `REFRESH_INTERVAL_SECS`, so the check is cheap to call on every write.
 */
struct cached_disk_stat_t {
    // Age (in seconds) the cached figures must exceed before they are sampled again.
    const static size_t REFRESH_INTERVAL_SECS = 30;

    uint64_t disk_total_bytes = 0;   // zero when the figures are unknown
    uint64_t disk_used_bytes = 0;
    uint64_t last_checked_ts = 0;    // seconds since the epoch

    /**
     * Tells whether disk usage is within the allowed limit.
     *
     * @param data_dir_path             Path on the file system to inspect.
     * @param disk_used_max_percentage  Highest tolerated disk usage, as a percentage.
     * @return `true` when the used percentage (truncated to an integer) does not exceed
     *         the limit, or when the disk figures could not be obtained.
     */
    bool has_enough_space(const std::string& data_dir_path, const int disk_used_max_percentage) {
        const uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(
                std::chrono::system_clock::now().time_since_epoch()).count();

        if((now - last_checked_ts) > REFRESH_INTERVAL_SECS) {
            struct statvfs st{};
            if(statvfs(data_dir_path.c_str(), &st) == 0) {
                // widen before multiplying: the statvfs field types may be narrower than 64 bits
                disk_total_bytes = static_cast<uint64_t>(st.f_blocks) * st.f_frsize;
                disk_used_bytes = static_cast<uint64_t>(st.f_blocks - st.f_bavail) * st.f_frsize;
            } else {
                // the call failed: mark the figures as unknown instead of trusting `st`
                disk_total_bytes = 0;
                disk_used_bytes = 0;
            }

            last_checked_ts = now;
        }

        if(disk_total_bytes == 0) {
            // Unknown figures: do not block the caller. This also avoids computing 0/0,
            // whose NaN result cannot be converted to an integer in a defined way.
            return true;
        }

        const int disk_used_percentage =
                static_cast<int>((double(disk_used_bytes) / double(disk_total_bytes)) * 100);
        return disk_used_percentage <= disk_used_max_percentage;
    }
};
// Implements braft::StateMachine.
class ReplicationState : public braft::StateMachine {
private:
@ -160,7 +136,7 @@ private:
std::atomic<bool> shutting_down;
std::atomic<size_t> pending_writes;
cached_disk_stat_t cached_disk_stat;
cached_resource_stat_t cached_disk_stat;
const uint64_t snapshot_interval_s; // frequency of actual snapshotting
uint64_t last_snapshot_ts; // when last snapshot ran

View File

@ -0,0 +1,95 @@
#include "cached_resource_stat.h"
#include <fstream>
#include "logger.h"
/**
 * Checks whether disk and memory usage are within the configured limits.
 *
 * Disk usage comes from `statvfs()` on `data_dir_path`; memory and swap usage come from
 * `/proc/meminfo`. The figures are cached in this object and re-sampled only when they
 * are older than `REFRESH_INTERVAL_SECS`.
 *
 * Whenever a figure cannot be obtained, the corresponding check is skipped rather than
 * failed, so that a missing stat never blocks the caller.
 *
 * @param data_dir_path               Path on the file system whose disk usage is checked.
 * @param disk_used_max_percentage    Highest tolerated disk usage, as a percentage.
 * @param memory_used_max_percentage  Highest tolerated memory usage, as a percentage.
 *                                    Values outside 0..100 are clamped into that range.
 * @return `OK`, `OUT_OF_DISK` or `OUT_OF_MEMORY`. Disk is evaluated before memory.
 */
cached_resource_stat_t::resource_check_t
cached_resource_stat_t::has_enough_resources(const std::string& data_dir_path,
                                             const int disk_used_max_percentage,
                                             const int memory_used_max_percentage) {

    // both limits at 100 means that neither check is requested: skip the sampling entirely
    if(disk_used_max_percentage == 100 && memory_used_max_percentage == 100) {
        return cached_resource_stat_t::OK;
    }

    const uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(
            std::chrono::system_clock::now().time_since_epoch()).count();

    if((now - last_checked_ts) > REFRESH_INTERVAL_SECS) {
        // get disk usage
        struct statvfs st{};
        if(statvfs(data_dir_path.c_str(), &st) == 0) {
            // widen before multiplying: the statvfs field types may be narrower than 64 bits
            disk_total_bytes = static_cast<uint64_t>(st.f_blocks) * st.f_frsize;
            disk_used_bytes = static_cast<uint64_t>(st.f_blocks - st.f_bavail) * st.f_frsize;
        } else {
            // the call failed: mark the figures as unknown, which skips the disk check below
            disk_total_bytes = 0;
            disk_used_bytes = 0;
        }

        // get memory and swap usage (values in /proc/meminfo are expressed in kB)
        uint64_t mem_total_kb = 0;
        uint64_t mem_available_kb = 0;
        uint64_t swap_total_kb = 0;
        uint64_t swap_free_kb = 0;
        bool seen_mem_total = false;
        bool seen_mem_available = false;

        std::string token;
        std::ifstream file("/proc/meminfo");

        while(file >> token) {
            uint64_t value_kb = 0;

            if(token == "MemTotal:") {
                if(file >> value_kb) {
                    mem_total_kb = value_kb;
                    seen_mem_total = true;
                }
            }

            else if(token == "MemAvailable:") {
                if(file >> value_kb) {
                    mem_available_kb = value_kb;
                    seen_mem_available = true;
                }
            }

            else if(token == "SwapTotal:") {
                if(file >> value_kb) {
                    swap_total_kb = value_kb;
                }
            }

            else if(token == "SwapFree:") {
                if(file >> value_kb) {
                    swap_free_kb = value_kb;
                }

                // "SwapFree" is the last of the fields we need, so stop reading here
                break;
            }
        }

        if(seen_mem_total && seen_mem_available) {
            memory_total_bytes = mem_total_kb * 1024;
            memory_available_bytes = mem_available_kb * 1024;
            swap_total_bytes = swap_total_kb * 1024;
            swap_free_bytes = swap_free_kb * 1024;
        } else {
            // Without both figures the used memory cannot be computed: treating a missing
            // "MemAvailable" as zero would report the machine as permanently out of memory.
            // A zero total marks the memory figures as unknown.
            memory_total_bytes = 0;
            memory_available_bytes = 0;
            swap_total_bytes = 0;
            swap_free_bytes = 0;
        }

        last_checked_ts = now;
    }

    // A zero total means the disk figures are unknown; skipping the check also avoids
    // computing 0/0, whose NaN result cannot be converted to an integer in a defined way.
    if(disk_total_bytes != 0) {
        const int disk_used_percentage =
                static_cast<int>((double(disk_used_bytes) / double(disk_total_bytes)) * 100);
        if(disk_used_percentage > disk_used_max_percentage) {
            return cached_resource_stat_t::OUT_OF_DISK;
        }
    }

    if(memory_total_bytes == 0) {
        // if there is an error in fetching the stat, we will return `OK`
        return cached_resource_stat_t::OK;
    }

    // NOTE(review): this is logged on every call, not only when the figures are refreshed
    LOG(INFO) << "memory_total_bytes: " << memory_total_bytes << ", memory_available_bytes: " << memory_available_bytes;

    // Calculate sum of RAM + SWAP used as all_memory_used.
    // Each difference is guarded so that an inconsistent reading cannot wrap around.
    const uint64_t ram_used = (memory_total_bytes > memory_available_bytes) ?
                              (memory_total_bytes - memory_available_bytes) : 0;
    const uint64_t swap_used = (swap_total_bytes > swap_free_bytes) ?
                               (swap_total_bytes - swap_free_bytes) : 0;
    const uint64_t all_memory_used = ram_used + swap_used;

    if(all_memory_used >= memory_total_bytes) {
        return cached_resource_stat_t::OUT_OF_MEMORY;
    }

    // clamp the limit so that the unsigned subtraction below cannot wrap around
    int capped_max_percentage = memory_used_max_percentage;
    if(capped_max_percentage < 0) {
        capped_max_percentage = 0;
    } else if(capped_max_percentage > 100) {
        capped_max_percentage = 100;
    }

    // compare with 500M or `100 - memory_used_max_percentage` of total memory, whichever is lower
    const uint64_t reserved_percentage = 100ULL - static_cast<uint64_t>(capped_max_percentage);
    const uint64_t memory_free_min_bytes = std::min<uint64_t>(500ULL * 1024 * 1024,
                                                              (reserved_percentage * memory_total_bytes) / 100);
    const uint64_t free_mem = (memory_total_bytes - all_memory_used);

    LOG(INFO) << "free_mem: " << free_mem << ", memory_free_min_bytes: " << memory_free_min_bytes;

    if(free_mem < memory_free_min_bytes) {
        return cached_resource_stat_t::OUT_OF_MEMORY;
    }

    return cached_resource_stat_t::OK;
}

View File

@ -186,8 +186,12 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
}
// reject write if disk space is running out
if(!cached_disk_stat.has_enough_space(raft_dir_path, config->get_disk_used_max_percentage())) {
response->set_500("Rejecting write: running out of disk space!");
auto resource_check = cached_disk_stat.has_enough_resources(raft_dir_path, config->get_disk_used_max_percentage(),
config->get_memory_used_max_percentage());
if (resource_check != cached_resource_stat_t::OK) {
response->set_422("Rejecting write: running out of resource type: " +
std::string(magic_enum::enum_name(resource_check)));
response->final = true;
auto req_res = new async_req_res_t(request, response, true);
return message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}

View File

@ -102,6 +102,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.add<bool>("enable-access-logging", '\0', "Enable access logging.", false, false);
options.add<int>("disk-used-max-percentage", '\0', "Reject writes when used disk space exceeds this percentage. Default: 100 (never reject).", false, 100);
options.add<int>("memory-used-max-percentage", '\0', "Reject writes when memory usage exceeds this percentage. Default: 100 (never reject).", false, 100);
options.add<bool>("skip-writes", '\0', "Skip all writes except config changes. Default: false.", false, false);
// DEPRECATED