Prevent multiple alters from running in-parallel.

This commit is contained in:
Kishore Nallan 2023-09-16 20:45:09 +05:30
parent ab62726355
commit 675a2fc402
3 changed files with 32 additions and 1 deletions

View File

@ -9,6 +9,10 @@ bool handle_authentication(std::map<std::string, std::string>& req_params,
const std::string& body, const route_path& rpath,
const std::string& req_auth_key);
void set_alter_in_progress(bool in_progress);
bool get_alter_in_progress();
// Collections
bool get_collections(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);

View File

@ -1,6 +1,5 @@
#include <chrono>
#include <thread>
#include <cstdlib>
#include <app_metrics.h>
#include <regex>
#include <analytics_manager.h>
@ -23,10 +22,20 @@ using namespace std::chrono_literals;
std::shared_mutex mutex;
LRU::Cache<uint64_t, cached_res_t> res_cache;
std::atomic<bool> alter_in_progress = false;
void init_api(uint32_t cache_num_entries) {
res_cache.capacity(cache_num_entries);
}
void set_alter_in_progress(bool in_progress) {
alter_in_progress = in_progress;
}
bool get_alter_in_progress() {
return alter_in_progress;
}
bool handle_authentication(std::map<std::string, std::string>& req_params,
std::vector<nlohmann::json>& embedded_params_vec,
const std::string& body,
@ -218,6 +227,7 @@ bool patch_update_collection(const std::shared_ptr<http_req>& req, const std::sh
} catch(const std::exception& e) {
//LOG(ERROR) << "JSON error: " << e.what();
res->set_400("Bad JSON.");
alter_in_progress = false;
return false;
}
@ -226,15 +236,18 @@ bool patch_update_collection(const std::shared_ptr<http_req>& req, const std::sh
if(collection == nullptr) {
res->set_404();
alter_in_progress = false;
return false;
}
auto alter_op = collection->alter(req_json);
if(!alter_op.ok()) {
res->set(alter_op.code(), alter_op.error());
alter_in_progress = false;
return false;
}
alter_in_progress = false;
res->set_200(req_json.dump());
return true;
}

View File

@ -258,6 +258,20 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
return message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}
route_path* rpath = nullptr;
bool route_found = server->get_route(request->route_hash, &rpath);
if(route_found && rpath->handler == patch_update_collection) {
if(get_alter_in_progress()) {
response->set_422("Another collection update operation is in progress.");
response->final = true;
auto req_res = new async_req_res_t(request, response, true);
return message_dispatcher->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}
set_alter_in_progress(true);
}
std::shared_lock lock(node_mutex);
if(!node) {