merging click events and query hits code

This commit is contained in:
krunal 2023-11-27 15:47:05 +05:30
parent a6b404e7ff
commit 15f1f7ecbc
6 changed files with 63 additions and 115 deletions

View File

@ -198,11 +198,11 @@ public:
Option<bool> add_click_event(const std::string& query_collection, const std::string& query, const std::string& user_id,
std::string doc_id, uint64_t position, const std::string& client_ip);
void persist_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
void persist_query_hits_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
nlohmann::json get_click_events();
Option<bool> write_click_event_to_store(nlohmann::json& click_event_json);
Option<bool> write_events_to_store(nlohmann::json& event_jsons);
void add_nohits_query(const std::string& query_collection,
const std::string& query, bool live_query, const std::string& user_id);
@ -214,9 +214,5 @@ public:
void add_query_hits_count(const std::string& query_collection, const std::string& query, const std::string& user_id,
uint64_t hits_count);
void persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s);
nlohmann::json get_query_hits_counts();
Option<bool> write_query_hits_counts_to_store(nlohmann::json& query_hits_counts_json);
};

View File

@ -171,12 +171,10 @@ bool del_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared
bool get_click_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_replicate_click_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_replicate_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool get_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_replicate_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Misc helpers
void get_collections_for_auth(std::map<std::string, std::string>& req_params, const std::string& body,

View File

@ -342,8 +342,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
}
persist_query_events(raft_server, prev_persistence_s);
persist_click_events(raft_server, prev_persistence_s);
persist_query_hits_counts(raft_server, prev_persistence_s);
persist_query_hits_click_events(raft_server, prev_persistence_s);
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -438,10 +437,36 @@ void AnalyticsManager::persist_query_events(ReplicationState *raft_server, uint6
}
}
void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s) {
void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s) {
// lock is held by caller
nlohmann::json payload_json = nlohmann::json::array();
auto send_http_response = [&](const std::string& event_type) {
if(payload_json.empty()) {
return;
}
const std::string import_payload = payload_json.dump();
std::string leader_url = raft_server->get_leader_url();
if (!leader_url.empty()) {
const std::string &base_url = leader_url + "analytics";
std::string res;
const std::string &update_url = base_url + "/" + event_type +"/replicate";
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(update_url, import_payload,
res, res_headers, {}, 10 * 1000, true);
if (status_code != 200) {
LOG(ERROR) << "Error while sending click events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
query_collection_click_events.clear();
}
}
};
for (const auto &click_events_collection_it: query_collection_click_events) {
auto collection_id = CollectionManager::get_instance().get_collection(
click_events_collection_it.first)->get_collection_id();
@ -450,37 +475,13 @@ void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint6
nlohmann::json click_event_json;
click_event.to_json(click_event_json);
click_event_json["collection_id"] = std::to_string(collection_id);
click_event_json["event_type"] = "click_events";
payload_json.push_back(click_event_json);
}
}
if(payload_json.empty()) {
return;
}
send_http_response("click_events");
const std::string import_payload = payload_json.dump();
std::string leader_url = raft_server->get_leader_url();
if (!leader_url.empty()) {
const std::string &base_url = leader_url + "analytics";
std::string res;
const std::string &update_url = base_url + "/click_events/replicate";
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(update_url, import_payload,
res, res_headers, {}, 10 * 1000, true);
if (status_code != 200) {
LOG(ERROR) << "Error while sending click events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
query_collection_click_events.clear();
}
}
}
void AnalyticsManager::persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s) {
nlohmann::json payload_json = nlohmann::json::array();
for (const auto &query_collection_hits_count_it: query_collection_hits_count) {
auto collection_id = CollectionManager::get_instance().get_collection(
@ -490,33 +491,11 @@ void AnalyticsManager::persist_query_hits_counts(ReplicationState *raft_server,
nlohmann::json query_hits_count_json;
query_hits_count.to_json(query_hits_count_json);
query_hits_count_json["collection_id"] = std::to_string(collection_id);
query_hits_count_json["event_type"] = "query_hits_counts";
payload_json.push_back(query_hits_count_json);
}
}
if (payload_json.empty()) {
return;
}
const std::string import_payload = payload_json.dump();
std::string leader_url = raft_server->get_leader_url();
if (!leader_url.empty()) {
const std::string &base_url = leader_url + "analytics";
std::string res;
const std::string &update_url = base_url + "/query_hits_counts/replicate";
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(update_url, import_payload,
res, res_headers, {}, 10 * 1000, true);
if (status_code != 200) {
LOG(ERROR) << "Error while sending click events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
query_collection_hits_count.clear();
}
}
send_http_response("query_hits_counts");
}
void AnalyticsManager::stop() {
@ -591,34 +570,25 @@ nlohmann::json AnalyticsManager::get_query_hits_counts() {
return result_json;
}
Option<bool> AnalyticsManager::write_click_event_to_store(nlohmann::json &click_event_jsons) {
for(const auto& click_event_json : click_event_jsons) {
auto collection_id = click_event_json["collection_id"].get<std::string>();
auto timestamp = click_event_json["timestamp"].get<uint64_t>();
const std::string key = std::string(CLICK_EVENT) + "_" + collection_id + "_" +
std::to_string(timestamp);
if(analytics_store) {
bool inserted = analytics_store->insert(key, click_event_json.dump());
if (!inserted) {
return Option<bool>(500, "Unable to insert clickevent into store.");
}
} else {
return Option<bool>(500, "Analytics DB not initialized.");
}
}
return Option<bool>(true);
}
Option<bool> AnalyticsManager::write_events_to_store(nlohmann::json &event_jsons) {
for(const auto& event_json : event_jsons) {
auto collection_id = event_json["collection_id"].get<std::string>();
auto timestamp = event_json["timestamp"].get<uint64_t>();
std::string key = "";
if(event_json["event_type"] == "click_events") {
key = std::string(CLICK_EVENT) + "_" + collection_id + "_" +
std::to_string(timestamp);
} else if(event_json["event_type"] == "query_hits_counts") {
key = std::string(QUERY_HITS_COUNT) + "_" + collection_id + "_" +
std::to_string(timestamp);
}
Option<bool> AnalyticsManager::write_query_hits_counts_to_store(nlohmann::json &query_hits_counts_json) {
for(const auto& query_hits_count : query_hits_counts_json) {
auto collection_id = query_hits_count["collection_id"].get<std::string>();
auto timestamp = query_hits_count["timestamp"].get<uint64_t>();
const std::string key = std::string(QUERY_HITS_COUNT) + "_" + collection_id + "_" +
std::to_string(timestamp);
if(analytics_store) {
bool inserted = analytics_store->insert(key, query_hits_count.dump());
bool inserted = analytics_store->insert(key, event_json.dump());
if (!inserted) {
return Option<bool>(500, "Unable to insert query hits count into store.");
std::string error = "Unable to insert " + std::string(event_json["event_type"]) + " to store";
return Option<bool>(500, error);
}
} else {
return Option<bool>(500, "Analytics DB not initialized.");

View File

@ -2806,7 +2806,7 @@ bool get_click_events(const std::shared_ptr<http_req>& req, const std::shared_pt
return true;
}
bool post_replicate_click_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
bool post_replicate_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;
try {
@ -2817,13 +2817,13 @@ bool post_replicate_click_event(const std::shared_ptr<http_req>& req, const std:
return false;
}
auto op = AnalyticsManager::get_instance().write_click_event_to_store(req_json);
auto op = AnalyticsManager::get_instance().write_events_to_store(req_json);
if(!op.ok()) {
res->set_body(op.code(), op.error());
return false;
}
res->set_200("click_event_t wrote to DB.");
res->set_200("event wrote to DB.");
return true;
}
@ -2832,25 +2832,4 @@ bool get_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shar
res->set_200(query_hits_counts.dump());
return true;
}
bool post_replicate_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;
try {
req_json = nlohmann::json::parse(req->body);
} catch(const std::exception& e) {
LOG(ERROR) << "JSON error: " << e.what();
res->set_400("Bad JSON.");
return false;
}
auto op = AnalyticsManager::get_instance().write_query_hits_counts_to_store(req_json);
if(!op.ok()) {
res->set_body(op.code(), op.error());
return false;
}
res->set_200("query hits counts wrote to DB.");
return true;
}

View File

@ -84,9 +84,9 @@ void master_server_routes() {
//collection based query click events
server->get("/analytics/click_events", get_click_events);
server->post("/analytics/click_events", post_create_event);
server->post("/analytics/click_events/replicate", post_replicate_click_event);
server->post("/analytics/click_events/replicate", post_replicate_events);
server->get("/analytics/query_hits_counts", get_query_hits_counts);
server->post("/analytics/query_hits_counts/replicate", post_replicate_query_hits_counts);
server->post("/analytics/query_hits_counts/replicate", post_replicate_events);
// meta
server->get("/metrics.json", get_metrics_json);

View File

@ -324,14 +324,17 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) {
event1["collection_id"] = "0";
event1["timestamp"] = 1521512521;
event1["event_type"] = "click_events";
event2["collection_id"] = "0";
event2["timestamp"] = 1521514354;
event2["event_type"] = "click_events";
nlohmann::json click_events = nlohmann::json::array();
click_events.push_back(event1);
click_events.push_back(event2);
req->body = click_events.dump();
ASSERT_TRUE(post_replicate_click_event(req, res));
ASSERT_TRUE(post_replicate_events(req, res));
auto result = analyticsManager.get_click_events();
@ -469,6 +472,8 @@ TEST_F(AnalyticsManagerTest, QueryHitsCount) {
nlohmann::json obj;
obj["collection_id"] = "0";
obj["event_type"] = "query_hits_counts";
obj["query"] = "cool";
obj["timestamp"] = 1625365612;
obj["user_id"] = "1";
@ -482,7 +487,7 @@ TEST_F(AnalyticsManagerTest, QueryHitsCount) {
query_hits_array.push_back(obj);
auto op = analyticsManager.write_query_hits_counts_to_store(query_hits_array);
auto op = analyticsManager.write_events_to_store(query_hits_array);
ASSERT_TRUE(op.ok());
auto result = analyticsManager.get_query_hits_counts();