Merge pull request #1408 from krunal1313/event_anaylytics

Events expiry
Kishore Nallan 2023-12-07 07:27:46 +05:30 committed by GitHub
commit c1ea41cab7
3 changed files with 359 additions and 11 deletions

View File

@@ -215,4 +215,10 @@ public:
uint64_t hits_count);
nlohmann::json get_query_hits_counts();
void checkEventsExpiry();       // drop click / query-hit events older than the TTL from the analytics store
uint64_t get_current_time_us(); // current wall-clock time in microseconds (pinned under TEST_BUILD)
void resetAnalyticsStore();     // delete all click / query-hit event keys (used by tests to start from a clean store)
};

View File

@@ -5,10 +5,12 @@
#include "http_client.h"
#include "collection_manager.h"
#include "lru/lru.hpp"
#include "string_utils.h"
LRU::Cache<std::string, event_cache_t> events_cache;
#define CLICK_EVENTS_RATE_LIMIT_SEC 60
#define CLICK_EVENTS_RATE_LIMIT_COUNT 5
#define EVENTS_TTL_INTERVAL_US 2592000000000 //30days
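// 2592000000000 µs = 30 days * 24 h * 3600 s * 1,000,000 µs per second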
Option<bool> AnalyticsManager::create_rule(nlohmann::json& payload, bool upsert, bool write_to_disk) {
/*
@@ -341,6 +343,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
continue;
}
checkEventsExpiry();
persist_query_events(raft_server, prev_persistence_s);
persist_query_hits_click_events(raft_server, prev_persistence_s);
@@ -441,9 +444,9 @@ void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_se
// lock is held by caller
nlohmann::json payload_json = nlohmann::json::array();
auto send_http_response = [&](const std::string& event_type) {
auto send_http_response = [&](const std::string& event_type)->bool {
if(payload_json.empty()) {
return;
return false;
}
const std::string import_payload = payload_json.dump();
@@ -461,10 +464,11 @@ void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_se
if (status_code != 200) {
LOG(ERROR) << "Error while sending " << event_type <<" to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
query_collection_click_events.clear();
return false;
}
return true;
}
return false;
};
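// returning a bool (instead of clearing query_collection_click_events inside the lambda, as before)
// lets each caller below clear only its own buffer, and only after a successful flush to the leader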
for (const auto &click_events_collection_it: query_collection_click_events) {
@@ -480,9 +484,13 @@ void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_se
}
}
send_http_response("click_events");
if(send_http_response("click_events")) {
query_collection_click_events.clear();
}
payload_json.clear();
for (const auto &query_collection_hits_count_it: query_collection_hits_count) {
auto collection_id = CollectionManager::get_instance().get_collection(
query_collection_hits_count_it.first)->get_collection_id();
@@ -495,7 +503,9 @@ void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_se
payload_json.push_back(query_hits_count_json);
}
}
send_http_response("query_hits_counts");
if(send_http_response("query_hits_counts")) {
query_collection_hits_count.clear();
}
}
void AnalyticsManager::stop() {
@@ -571,17 +581,16 @@ nlohmann::json AnalyticsManager::get_query_hits_counts() {
}
Option<bool> AnalyticsManager::write_events_to_store(nlohmann::json &event_jsons) {
//LOG(INFO) << "writing events to analytics db";
for(const auto& event_json : event_jsons) {
auto collection_id = event_json["collection_id"].get<std::string>();
auto timestamp = event_json["timestamp"].get<uint64_t>();
std::string key = "";
std::string key = "_" + StringUtils::serialize_uint64_t(timestamp) + "_" + collection_id;
if(event_json["event_type"] == "click_events") {
key = std::string(CLICK_EVENT) + "_" + collection_id + "_" +
std::to_string(timestamp);
key = std::string(CLICK_EVENT) + key;
} else if(event_json["event_type"] == "query_hits_counts") {
key = std::string(QUERY_HITS_COUNT) + "_" + collection_id + "_" +
std::to_string(timestamp);
key = std::string(QUERY_HITS_COUNT) + key;
}
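// net effect of the key change above: keys become "<event type prefix>_<serialized timestamp>_<collection id>",
// so keys within a prefix sort by time (assuming StringUtils::serialize_uint64_t is order-preserving)
// and checkEventsExpiry can expire old events with a range delete per prefix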
if(analytics_store) {
@@ -599,4 +608,70 @@ Option<bool> AnalyticsManager::write_events_to_store(nlohmann::json &event_jsons
void AnalyticsManager::resetRateLimit() {
events_cache.clear();
}
void AnalyticsManager::resetAnalyticsStore() {
const std::string click_events_prefix = std::string(CLICK_EVENT) + "_";
const std::string query_hits_prefix = std::string(QUERY_HITS_COUNT) + "_";
//delete click events
auto delete_prefix_begin = click_events_prefix;
auto delete_prefix_end = click_events_prefix + "`";
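// '`' (0x60) is the ASCII character immediately after '_' (0x5F), so prefix + "`" acts as an
// exclusive upper bound covering the whole prefix for the range delete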
analytics_store->delete_range(delete_prefix_begin, delete_prefix_end);
//delete query hits counts
delete_prefix_begin = query_hits_prefix;
delete_prefix_end = query_hits_prefix + "`";
analytics_store->delete_range(delete_prefix_begin, delete_prefix_end);
}
#ifdef TEST_BUILD
uint64_t AnalyticsManager::get_current_time_us() {
uint64_t now_ts_useconds = 1701851345000000 + EVENTS_TTL_INTERVAL_US;
return now_ts_useconds;
}
#else
uint64_t AnalyticsManager::get_current_time_us() {
uint64_t now_ts_useconds = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
return now_ts_useconds;
}
#endif
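// with the pinned TEST_BUILD clock above, the expiry cutoff (now - EVENTS_TTL_INTERVAL_US) works out to
// 1701851345000000 µs, i.e. 4 s after the base timestamp 1701851341000000 used in the tests below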
void AnalyticsManager::checkEventsExpiry() {
if (analytics_store) {
//LOG(INFO) << "checking for events expiry";
//events older than 30 days (EVENTS_TTL_INTERVAL_US) are removed from the db
auto ts_ttl_useconds = get_current_time_us() - EVENTS_TTL_INTERVAL_US;
const std::string click_events_prefix = std::string(CLICK_EVENT) + "_";
const std::string query_hits_prefix = std::string(QUERY_HITS_COUNT) + "_";
//first remove click events
auto delete_prefix_begin = click_events_prefix;
auto delete_prefix_end = delete_prefix_begin + StringUtils::serialize_uint64_t(ts_ttl_useconds);
auto iter = analytics_store->get_iterator();
iter->Seek(delete_prefix_end);
if (!iter->Valid()) { //no key at or after the cutoff bound exists, so every stored click event is older than the cutoff; widen the end bound to cover the whole prefix
delete_prefix_end = std::string(CLICK_EVENT) + "`";
}
analytics_store->delete_range(delete_prefix_begin, delete_prefix_end);
//now remove query hits counts
delete_prefix_begin = query_hits_prefix;
delete_prefix_end = std::string(QUERY_HITS_COUNT) + "_" +
StringUtils::serialize_uint64_t(ts_ttl_useconds);
iter->SeekToFirst();
iter->Seek(delete_prefix_end);
analytics_store->delete_range(delete_prefix_begin, delete_prefix_end);
}
}
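
Taken together, the change embeds each event's timestamp in its key right after the event-type prefix, so expiry becomes one range delete per prefix. The following is a minimal, self-contained sketch of that idea, not the actual implementation: std::map stands in for the analytics store, serialize_ts is a hypothetical stand-in for StringUtils::serialize_uint64_t (assumed to produce order-preserving big-endian bytes), and "click_event" is a placeholder for the CLICK_EVENT prefix, whose real value is not shown in this diff.

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

// hypothetical stand-in for StringUtils::serialize_uint64_t: big-endian bytes,
// so lexicographic key order matches numeric timestamp order
static std::string serialize_ts(uint64_t ts) {
    std::string out(8, '\0');
    for (int i = 7; i >= 0; i--) { out[i] = static_cast<char>(ts & 0xFF); ts >>= 8; }
    return out;
}

int main() {
    const uint64_t TTL_US  = 2592000000000ULL;              // 30 days, as in the diff
    const uint64_t now_us  = 1701851345000000ULL + TTL_US;  // pinned TEST_BUILD clock
    const uint64_t base_ts = 1701851341000000ULL;           // base timestamp used by the tests

    std::map<std::string, std::string> store;               // ordered map in place of the analytics store

    // key layout from write_events_to_store: <prefix>_<serialized ts>_<collection id>
    auto make_key = [](const std::string& prefix, uint64_t ts, const std::string& coll) {
        return prefix + "_" + serialize_ts(ts) + "_" + coll;
    };

    store[make_key("click_event", base_ts, "0")]           = "old click";   // 4 s before the cutoff
    store[make_key("click_event", base_ts + 6000000, "0")] = "fresh click"; // 2 s after the cutoff

    // checkEventsExpiry in miniature: erase the half-open range
    // [prefix + "_", prefix + "_" + serialize(now - TTL))
    const std::string range_begin = "click_event_";
    const std::string range_end   = "click_event_" + serialize_ts(now_us - TTL_US);
    store.erase(store.lower_bound(range_begin), store.lower_bound(range_end));

    std::printf("%zu key(s) left\n", store.size());          // prints: 1 key(s) left
    return 0;
}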

View File

@@ -446,6 +446,9 @@ TEST_F(AnalyticsManagerTest, NoresultsQueries) {
}
TEST_F(AnalyticsManagerTest, QueryHitsCount) {
//flush all events from analytics store
analyticsManager.resetAnalyticsStore();
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
@@ -500,4 +503,268 @@ TEST_F(AnalyticsManagerTest, QueryHitsCount) {
ASSERT_EQ("funky", result[1]["query"]);
ASSERT_EQ(1, result[1]["hits_count"]);
ASSERT_EQ(1625365616, result[1]["timestamp"]);
}
TEST_F(AnalyticsManagerTest, EventsExpiryBasic) {
//flush all events from analytics store
analyticsManager.resetAnalyticsStore();
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection *titles_coll = collectionManager.create_collection(titles_schema).get();
nlohmann::json events = nlohmann::json::array();
auto ts = 1701851341000000;
nlohmann::json event;
event["event_type"] = "click_events";
event["q"] = "technology";
event["collection_id"] = "0";
event["doc_id"] = "21";
event["position"] = 2;
event["user_id"] = 13;
event["timestamp"] = ts;
events.push_back(event);
event["doc_id"] = "12";
event["position"] = 3;
event["timestamp"] = ts+1000000; //1 sec later
events.push_back(event);
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
nlohmann::json resp = analyticsManager.get_click_events();
ASSERT_EQ(2, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("21", resp[0]["doc_id"]);
ASSERT_EQ(2, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ("technology", resp[1]["q"]);
ASSERT_EQ("12", resp[1]["doc_id"]);
ASSERT_EQ(3, resp[1]["position"]);
ASSERT_EQ(13, resp[1]["user_id"]);
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_click_events();
ASSERT_EQ(0, resp.size());
//add query hits events with click events
nlohmann::json event2;
event2["event_type"] = "query_hits_counts";
event2["q"] = "technology";
event2["collection_id"] = "0";
event2["user_id"] = 13;
event2["hits_count"] = 124;
event2["timestamp"] = ts + 10000000; //after 10s
events.push_back(event2);
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ(124, resp[0]["hits_count"]);
//the old click events are deleted on the expiry check, but the query hits event remains
//(the pinned TEST_BUILD cutoff falls at ts + 4s; the query hits event is at ts + 10s)
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_click_events();
ASSERT_EQ(0, resp.size());
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ(124, resp[0]["hits_count"]);
}
TEST_F(AnalyticsManagerTest, EventsExpiryAll) {
//flush all events from analytics store
analyticsManager.resetAnalyticsStore();
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection *titles_coll = collectionManager.create_collection(titles_schema).get();
nlohmann::json events = nlohmann::json::array();
auto ts = 1701851341000000;
nlohmann::json event;
event["event_type"] = "click_events";
event["q"] = "technology";
event["collection_id"] = "0";
event["doc_id"] = "21";
event["position"] = 2;
event["user_id"] = 13;
event["timestamp"] = ts;
events.push_back(event);
event["doc_id"] = "12";
event["position"] = 3;
event["timestamp"] = ts + 1000000; //after 1s
events.push_back(event);
nlohmann::json event2;
event2["event_type"] = "query_hits_counts";
event2["q"] = "technology";
event2["collection_id"] = "0";
event2["user_id"] = 13;
event2["hits_count"] = 124;
event2["timestamp"] = ts + 2000000; //after 2s
events.push_back(event2);
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
nlohmann::json resp = analyticsManager.get_click_events();
ASSERT_EQ(2, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("21", resp[0]["doc_id"]);
ASSERT_EQ(2, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ("technology", resp[1]["q"]);
ASSERT_EQ("12", resp[1]["doc_id"]);
ASSERT_EQ(3, resp[1]["position"]);
ASSERT_EQ(13, resp[1]["user_id"]);
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ(124, resp[0]["hits_count"]);
//check for events expiry
//(the pinned TEST_BUILD cutoff falls at ts + 4s, so every event above is past it)
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_click_events();
ASSERT_EQ(0, resp.size());
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(0, resp.size());
}
TEST_F(AnalyticsManagerTest, EventsExpiryPartial) {
//flush all events from analytics store
analyticsManager.resetAnalyticsStore();
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection *titles_coll = collectionManager.create_collection(titles_schema).get();
nlohmann::json events = nlohmann::json::array();
auto ts = 1701851341000000;
nlohmann::json event;
event["event_type"] = "click_events";
event["q"] = "technology";
event["collection_id"] = "0";
event["doc_id"] = "21";
event["position"] = 2;
event["user_id"] = 13;
event["timestamp"] = ts;
events.push_back(event);
event["doc_id"] = "12";
event["position"] = 3;
event["timestamp"] = ts + 1000000; //after 1s
events.push_back(event);
event["doc_id"] = "19";
event["position"] = 1;
event["timestamp"] = ts + 6000000; //after 6s
events.push_back(event);
nlohmann::json event2;
event2["event_type"] = "query_hits_counts";
event2["q"] = "technology";
event2["collection_id"] = "0";
event2["user_id"] = 13;
event2["hits_count"] = 124;
event2["timestamp"] = ts;
events.push_back(event2);
event2["q"] = "industry";
event2["user_id"] = 13;
event2["hits_count"] = 214;
event2["timestamp"] = ts + 2000000; //after 2s
events.push_back(event2);
event2["q"] = "management";
event2["user_id"] = 13;
event2["hits_count"] = 834;
event2["timestamp"] = ts + 8000000; //after 8s
events.push_back(event2);
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
nlohmann::json resp = analyticsManager.get_click_events();
ASSERT_EQ(3, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("21", resp[0]["doc_id"]);
ASSERT_EQ(2, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ("technology", resp[1]["q"]);
ASSERT_EQ("12", resp[1]["doc_id"]);
ASSERT_EQ(3, resp[1]["position"]);
ASSERT_EQ(13, resp[1]["user_id"]);
ASSERT_EQ("technology", resp[2]["q"]);
ASSERT_EQ("19", resp[2]["doc_id"]);
ASSERT_EQ(1, resp[2]["position"]);
ASSERT_EQ(13, resp[2]["user_id"]);
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(3, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ(124, resp[0]["hits_count"]);
ASSERT_EQ("industry", resp[1]["q"]);
ASSERT_EQ(13, resp[1]["user_id"]);
ASSERT_EQ(214, resp[1]["hits_count"]);
ASSERT_EQ("management", resp[2]["q"]);
ASSERT_EQ(13, resp[2]["user_id"]);
ASSERT_EQ(834, resp[2]["hits_count"]);
//check for events expiry
//(the pinned TEST_BUILD cutoff falls at ts + 4s; only events older than that are removed)
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_click_events();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("19", resp[0]["doc_id"]);
ASSERT_EQ(1, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("management", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ(834, resp[0]["hits_count"]);
}