Add resource check to batch indexer as well.

This commit is contained in:
Kishore Nallan 2022-12-10 19:21:54 +05:30
parent 8be1f33f38
commit e3dee78a1b
7 changed files with 39 additions and 13 deletions

View File

@ -6,6 +6,7 @@
#include "http_data.h"
#include "threadpool.h"
#include "http_server.h"
#include "config.h"
class BatchedIndexer {
private:
@ -73,6 +74,8 @@ private:
// When set, all writes (both live and log serialized) are skipped with 422 response
const std::atomic<bool>& skip_writes;
const Config& config;
static const size_t GC_INTERVAL_SECONDS = 60;
static const size_t GC_PRUNE_MAX_SECONDS = 3600;
@ -85,7 +88,7 @@ public:
static const constexpr char* RAFT_REQ_LOG_PREFIX = "$RL_";
BatchedIndexer(HttpServer* server, Store* store, Store* meta_store, size_t num_threads,
const std::atomic<bool>& skip_writes);
const Config& config, const std::atomic<bool>& skip_writes);
~BatchedIndexer();

View File

@ -4,7 +4,8 @@
#include <string>
#include <sys/statvfs.h>
struct cached_resource_stat_t {
class cached_resource_stat_t {
private:
const static size_t REFRESH_INTERVAL_SECS = 5;
uint64_t disk_total_bytes = 0;
uint64_t disk_used_bytes = 0;
@ -17,6 +18,17 @@ struct cached_resource_stat_t {
uint64_t last_checked_ts = 0;
cached_resource_stat_t() = default;
~cached_resource_stat_t() = default;
public:
static cached_resource_stat_t& get_instance() {
static cached_resource_stat_t instance;
return instance;
}
enum resource_check_t {
OK,
OUT_OF_DISK,

View File

@ -136,8 +136,6 @@ private:
std::atomic<bool> shutting_down;
std::atomic<size_t> pending_writes;
cached_resource_stat_t cached_disk_stat;
const uint64_t snapshot_interval_s; // frequency of actual snapshotting
uint64_t last_snapshot_ts; // when last snapshot ran

View File

@ -1,12 +1,13 @@
#include "batched_indexer.h"
#include "core_api.h"
#include "thread_local_vars.h"
#include "cached_resource_stat.h"
BatchedIndexer::BatchedIndexer(HttpServer* server, Store* store, Store* meta_store, const size_t num_threads,
const std::atomic<bool>& skip_writes):
const Config& config, const std::atomic<bool>& skip_writes):
server(server), store(store), meta_store(meta_store), num_threads(num_threads),
last_gc_run(std::chrono::high_resolution_clock::now()), quit(false),
skip_writes(skip_writes) {
config(config), skip_writes(skip_writes) {
queues.resize(num_threads);
qmutuxes = new await_t[num_threads];
skip_index_iter_upper_bound = new rocksdb::Slice(skip_index_upper_bound_key);
@ -194,8 +195,19 @@ void BatchedIndexer::run() {
else {
//LOG(INFO) << "index req " << req_id << ", chunk index: " << orig_req_res.next_chunk_index;
auto resource_check = cached_resource_stat_t::get_instance()
.has_enough_resources(config.get_data_dir(),
config.get_disk_used_max_percentage(),
config.get_memory_used_max_percentage());
if(route_found) {
if (resource_check != cached_resource_stat_t::OK && orig_req->http_method != "DELETE") {
orig_res->set_422("Rejecting write: running out of resource type: " +
std::string(magic_enum::enum_name(resource_check)));
orig_res->final = true;
async_res = false;
}
else if(route_found) {
if(skip_writes && found_rpath->handler != post_config) {
orig_res->set(422, "Skipping write.");
async_req_res_t* async_req_res = new async_req_res_t(orig_req, orig_res, true);

View File

@ -71,7 +71,7 @@ cached_resource_stat_t::has_enough_resources(const std::string& data_dir_path,
return cached_resource_stat_t::OK;
}
LOG(INFO) << "memory_total_bytes: " << memory_total_bytes << ", memory_available_bytes: " << memory_available_bytes;
//LOG(INFO) << "memory_total_bytes: " << memory_total_bytes << ", memory_available_bytes: " << memory_available_bytes;
// Calculate sum of RAM + SWAP used as all_memory_used
uint64_t all_memory_used = (memory_total_bytes - memory_available_bytes) + (swap_total_bytes - swap_free_bytes);
@ -85,7 +85,7 @@ cached_resource_stat_t::has_enough_resources(const std::string& data_dir_path,
((100ULL - memory_used_max_percentage) * memory_total_bytes) / 100);
uint64_t free_mem = (memory_total_bytes - all_memory_used);
LOG(INFO) << "free_mem: " << free_mem << ", memory_free_min_bytes: " << memory_free_min_bytes;
//LOG(INFO) << "free_mem: " << free_mem << ", memory_free_min_bytes: " << memory_free_min_bytes;
if(free_mem < memory_free_min_bytes) {
return cached_resource_stat_t::OUT_OF_MEMORY;

View File

@ -186,9 +186,10 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
}
// reject write if disk space is running out
auto resource_check = cached_disk_stat.has_enough_resources(raft_dir_path, config->get_disk_used_max_percentage(),
config->get_memory_used_max_percentage());
if (resource_check != cached_resource_stat_t::OK) {
auto resource_check = cached_resource_stat_t::get_instance().has_enough_resources(raft_dir_path,
config->get_disk_used_max_percentage(), config->get_memory_used_max_percentage());
if (resource_check != cached_resource_stat_t::OK && request->http_method != "DELETE") {
response->set_422("Rejecting write: running out of resource type: " +
std::string(magic_enum::enum_name(resource_check)));
response->final = true;

View File

@ -439,7 +439,7 @@ int run_server(const Config & config, const std::string & version, void (*master
bool ssl_enabled = (!config.get_ssl_cert().empty() && !config.get_ssl_cert_key().empty());
BatchedIndexer* batch_indexer = new BatchedIndexer(server, &store, &meta_store, num_threads,
config.get_skip_writes());
config, config.get_skip_writes());
CollectionManager & collectionManager = CollectionManager::get_instance();
collectionManager.init(&store, &app_thread_pool, config.get_max_memory_ratio(),