Do explicit compaction instead of automatic periodic compaction.

This is required until RocksDB fixes the behavior of periodic compaction seconds: https://github.com/facebook/rocksdb/issues/12165
This commit is contained in:
Kishore Nallan 2023-12-27 17:26:30 +05:30
parent 94f5e4f8f5
commit b1805978b6
5 changed files with 35 additions and 25 deletions

View File

@@ -9,7 +9,7 @@ private:
std::condition_variable cv;
std::atomic<bool> quit = false;
std::atomic<uint32_t> interval_seconds = 1800;
std::atomic<uint32_t> hnsw_repair_interval_s = 1800;
HouseKeeper() {}

View File

@@ -84,8 +84,7 @@ public:
Store(const std::string & state_dir_path,
const size_t wal_ttl_secs = 24*60*60,
const size_t wal_size_mb = 1024, bool disable_wal = true,
const size_t db_compaction_interval = 604800): state_dir_path(state_dir_path) {
const size_t wal_size_mb = 1024, bool disable_wal = true): state_dir_path(state_dir_path) {
// Optimize RocksDB
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
@@ -95,7 +94,6 @@ public:
options.max_write_buffer_number = 2;
options.merge_operator.reset(new UInt64AddOperator);
options.compression = rocksdb::CompressionType::kSnappyCompression;
options.periodic_compaction_seconds = db_compaction_interval;
options.max_log_file_size = 4*1048576;
options.keep_log_file_num = 5;

View File

@@ -102,7 +102,7 @@ protected:
this->enable_search_analytics = false;
this->analytics_flush_interval = 3600; // in seconds
this->housekeeping_interval = 1800; // in seconds
this->db_compaction_interval = 604800; // in seconds
this->db_compaction_interval = 0; // in seconds, disabled
}
Config(Config const&) {
@@ -632,7 +632,7 @@ public:
}
if(reader.Exists("server", "db-compaction-interval")) {
this->db_compaction_interval = (int) reader.GetInteger("server", "db-compaction-interval", 1800);
this->db_compaction_interval = (int) reader.GetInteger("server", "db-compaction-interval", 0);
}
if(reader.Exists("server", "thread-pool-size")) {

View File

@@ -2,7 +2,10 @@
#include "housekeeper.h"
void HouseKeeper::run() {
uint64_t prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
uint64_t prev_hnsw_repair_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t prev_db_compaction_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
while(!quit) {
@@ -17,29 +20,38 @@ void HouseKeeper::run() {
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
if(now_ts_seconds - prev_persistence_s < interval_seconds) {
continue;
// perform compaction on underlying store if enabled
if(Config::get_instance().get_db_compaction_interval() > 0) {
if(now_ts_seconds - prev_db_compaction_s >= Config::get_instance().get_db_compaction_interval()) {
LOG(INFO) << "Starting DB compaction.";
CollectionManager::get_instance().get_store()->compact_all();
LOG(INFO) << "Finished DB compaction.";
prev_db_compaction_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
}
// iterate through all collections and repair all hnsw graphs
auto coll_names = CollectionManager::get_instance().get_collection_names();
if(now_ts_seconds - prev_hnsw_repair_s >= hnsw_repair_interval_s) {
// iterate through all collections and repair all hnsw graphs (if any)
auto coll_names = CollectionManager::get_instance().get_collection_names();
for(auto& coll_name: coll_names) {
auto coll = CollectionManager::get_instance().get_collection(coll_name);
if(coll == nullptr) {
continue;
for(auto& coll_name: coll_names) {
auto coll = CollectionManager::get_instance().get_collection(coll_name);
if(coll == nullptr) {
continue;
}
coll->do_housekeeping();
}
coll->do_housekeeping();
}
if(!coll_names.empty()) {
LOG(INFO) << "Ran housekeeping for " << coll_names.size() << " collections.";
}
if(!coll_names.empty()) {
LOG(INFO) << "Ran housekeeping for " << coll_names.size() << " collections.";
prev_hnsw_repair_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
lk.unlock();
}
}
@@ -50,5 +62,5 @@ void HouseKeeper::stop() {
}
void HouseKeeper::init(uint32_t interval_seconds) {
this->interval_seconds = interval_seconds;
this->hnsw_repair_interval_s = interval_seconds;
}

View File

@@ -395,7 +395,7 @@ int run_server(const Config & config, const std::string & version, void (*master
ThreadPool replication_thread_pool(num_threads);
// primary DB used for storing the documents: we will not use WAL since Raft provides that
Store store(db_dir, 24*60*60, 1024, true, config.get_db_compaction_interval());
Store store(db_dir, 24*60*60, 1024, true);
// meta DB for storing house keeping things
Store meta_store(meta_dir, 24*60*60, 1024, false);
@@ -403,7 +403,7 @@ int run_server(const Config & config, const std::string & version, void (*master
//analytics DB for storing query click events
std::unique_ptr<Store> analytics_store = nullptr;
if(!analytics_dir.empty()) {
analytics_store.reset(new Store(analytics_dir, 24 * 60 * 60, 1024, true, config.get_db_compaction_interval()));
analytics_store.reset(new Store(analytics_dir, 24 * 60 * 60, 1024, true));
}
curl_global_init(CURL_GLOBAL_SSL);