generify counter events with different events types

This commit is contained in:
krunal 2024-01-03 16:29:54 +05:30
parent 5115419e1c
commit e379b72813
3 changed files with 137 additions and 39 deletions

View File

@ -50,9 +50,10 @@ struct event_t {
}
};
struct popular_clicks_t {
struct counter_event_t {
std::string counter_field;
std::map<std::string, uint64_t> docid_counts;
std::map<std::string, uint16_t> event_weight_map;
};
struct query_hits_count_t {
@ -153,7 +154,7 @@ private:
std::unordered_map<std::string, QueryAnalytics*> nohits_queries;
// collection => popular clicks
std::unordered_map<std::string, popular_clicks_t> popular_clicks;
std::unordered_map<std::string, counter_event_t> counter_events;
//query collection => events
std::unordered_map<std::string, std::vector<event_t>> query_collection_events;
@ -170,9 +171,9 @@ private:
Option<bool> remove_queries_index(const std::string& name);
Option<bool> create_queries_index(nlohmann::json &payload,
bool upsert,
bool write_to_disk);
Option<bool> create_index(nlohmann::json &payload,
bool upsert,
bool write_to_disk);
public:
@ -182,7 +183,7 @@ public:
static constexpr const char* PURCHASE_EVENT = "$PE";
static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries";
static constexpr const char* NOHITS_QUERIES_TYPE = "nohits_queries";
static constexpr const char* POPULAR_CLICKS_TYPE = "popular_clicks";
static constexpr const char* COUNTER_TYPE = "counter";
static AnalyticsManager& get_instance() {
static AnalyticsManager instance;
@ -227,7 +228,7 @@ public:
nlohmann::json get_events(const std::string& event_type);
std::unordered_map<std::string, popular_clicks_t> get_popular_clicks();
std::unordered_map<std::string, counter_event_t> get_popular_clicks();
Option<bool> write_events_to_store(nlohmann::json& event_jsons);

View File

@ -44,14 +44,14 @@ Option<bool> AnalyticsManager::create_rule(nlohmann::json& payload, bool upsert,
}
if(payload["type"] == POPULAR_QUERIES_TYPE || payload["type"] == NOHITS_QUERIES_TYPE
|| payload["type"] == POPULAR_CLICKS_TYPE) {
return create_queries_index(payload, upsert, write_to_disk);
|| payload["type"] == COUNTER_TYPE) {
return create_index(payload, upsert, write_to_disk);
}
return Option<bool>(400, "Invalid type.");
}
Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, bool upsert, bool write_to_disk) {
Option<bool> AnalyticsManager::create_index(nlohmann::json &payload, bool upsert, bool write_to_disk) {
// params and name are validated upstream
const std::string& suggestion_config_name = payload["name"].get<std::string>();
bool already_exists = suggestion_configs.find(suggestion_config_name) != suggestion_configs.end();
@ -71,6 +71,15 @@ Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, boo
return Option<bool>(400, "Bad or missing destination.");
}
if(payload["type"] == COUNTER_TYPE) {
if (!params["source"].contains("events") || (params["source"].contains("events") &&
(params["source"]["events"].empty()
|| !params["source"]["events"].is_array()
|| !params["source"]["events"][0].is_object()))) {
return Option<bool>(400, "Bad or missing events.");
}
}
size_t limit = 1000;
bool expand_query = false;
@ -115,8 +124,8 @@ Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, boo
if (!upsert && nohits_queries.count(suggestion_collection) != 0) {
return Option<bool>(400, "There's already another configuration for this destination collection.");
}
} else if(payload["type"] == POPULAR_CLICKS_TYPE) {
if (!upsert && popular_clicks.count(suggestion_collection) != 0) {
} else if(payload["type"] == COUNTER_TYPE) {
if (!upsert && counter_events.count(suggestion_collection) != 0) {
return Option<bool>(400, "There's already another configuration for this destination collection.");
}
@ -162,8 +171,12 @@ Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, boo
} else if(payload["type"] == NOHITS_QUERIES_TYPE) {
QueryAnalytics *noresultsQueries = new QueryAnalytics(limit);
nohits_queries.emplace(suggestion_collection, noresultsQueries);
} else if(payload["type"] == POPULAR_CLICKS_TYPE) {
popular_clicks.emplace(suggestion_collection, popular_clicks_t{counter_field, {}});
} else if(payload["type"] == COUNTER_TYPE) {
std::map<std::string, uint16_t> event_weight_map;
for(const auto& event : params["source"]["events"]){
event_weight_map[event["type"]] = event["weight"];
}
counter_events.emplace(suggestion_collection, counter_event_t{counter_field, {}, event_weight_map});
}
if(write_to_disk) {
@ -312,9 +325,15 @@ Option<bool> AnalyticsManager::add_event(const std::string& event_type, const st
event_t event(query, event_type, now_ts_useconds, user_id, doc_id, position);
events_vec.emplace_back(event);
auto popular_clicks_it = popular_clicks.find(query_collection);
if(popular_clicks_it != popular_clicks.end()) {
popular_clicks_it->second.docid_counts[doc_id]++;
auto counter_events_it = counter_events.find(query_collection);
if(counter_events_it != counter_events.end()) {
auto event_weight_map_it = counter_events_it->second.event_weight_map.find(event_type);
if(event_weight_map_it != counter_events_it->second.event_weight_map.end()) {
auto inc_val = event_weight_map_it->second;
counter_events_it->second.docid_counts[doc_id]+= inc_val;
} else {
LOG(ERROR) << "event_type " << event_type << " not defined in analytic rule for counter events.";
}
} else {
LOG(ERROR) << "collection " << query_collection << " not found in analytics rule.";
}
@ -570,13 +589,13 @@ void AnalyticsManager::persist_popular_clicks(ReplicationState *raft_server, uin
}
};
for(const auto& popular_clicks_it : popular_clicks) {
auto coll = popular_clicks_it.first;
for(const auto& counter_event_it : counter_events) {
auto coll = counter_event_it.first;
nlohmann::json doc;
auto counter_field = popular_clicks_it.second.counter_field;
for(const auto& popular_click : popular_clicks_it.second.docid_counts) {
doc["id"] = popular_click.first;
doc[counter_field] = popular_click.second;
auto counter_field = counter_event_it.second.counter_field;
for(const auto& counter_event : counter_event_it.second.docid_counts) {
doc["id"] = counter_event.first;
doc[counter_field] = counter_event.second;
send_http_response(doc.dump(), coll);
}
}
@ -622,9 +641,9 @@ std::unordered_map<std::string, QueryAnalytics*> AnalyticsManager::get_nohits_qu
return nohits_queries;
}
std::unordered_map<std::string, popular_clicks_t> AnalyticsManager::get_popular_clicks() {
std::unordered_map<std::string, counter_event_t> AnalyticsManager::get_popular_clicks() {
std::unique_lock lk(mutex);
return popular_clicks;
return counter_events;
}
nlohmann::json AnalyticsManager::get_events(const std::string& event_type) {

View File

@ -933,10 +933,11 @@ TEST_F(AnalyticsManagerTest, PopularityScore) {
nlohmann::json analytics_rule = R"({
"name": "product_popularity",
"type": "popular_clicks",
"type": "counter",
"params": {
"source": {
"collections": ["products"]
"collections": ["products"],
"events": [{"type": "query_click", "weight": 1}, {"type": "query_purchase", "weight": 5} ]
},
"destination": {
"collection": "products",
@ -952,7 +953,7 @@ TEST_F(AnalyticsManagerTest, PopularityScore) {
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
nlohmann::json event1 = R"({
"type": "query_click",
"type": "query_purchase",
"data": {
"q": "trousers",
"collection": "products",
@ -985,7 +986,7 @@ TEST_F(AnalyticsManagerTest, PopularityScore) {
ASSERT_EQ(1, popular_clicks.size());
ASSERT_EQ("popularity", popular_clicks["products"].counter_field);
ASSERT_EQ(2, popular_clicks["products"].docid_counts.size());
ASSERT_EQ(1, popular_clicks["products"].docid_counts["1"]);
ASSERT_EQ(5, popular_clicks["products"].docid_counts["1"]);
ASSERT_EQ(2, popular_clicks["products"].docid_counts["3"]);
//trigger persistance event
@ -1011,13 +1012,13 @@ TEST_F(AnalyticsManagerTest, PopularityScore) {
ASSERT_EQ(5, results["hits"].size());
ASSERT_EQ("3", results["hits"][0]["document"]["id"]);
ASSERT_EQ(2, results["hits"][0]["document"]["popularity"]);
ASSERT_EQ("Trendy shorts", results["hits"][0]["document"]["title"]);
ASSERT_EQ("1", results["hits"][0]["document"]["id"]);
ASSERT_EQ(5, results["hits"][0]["document"]["popularity"]);
ASSERT_EQ("Funky trousers", results["hits"][0]["document"]["title"]);
ASSERT_EQ("1", results["hits"][1]["document"]["id"]);
ASSERT_EQ(1, results["hits"][1]["document"]["popularity"]);
ASSERT_EQ("Funky trousers", results["hits"][1]["document"]["title"]);
ASSERT_EQ("3", results["hits"][1]["document"]["id"]);
ASSERT_EQ(2, results["hits"][1]["document"]["popularity"]);
ASSERT_EQ("Trendy shorts", results["hits"][1]["document"]["title"]);
}
TEST_F(AnalyticsManagerTest, PopularityScoreValidation) {
@ -1033,10 +1034,11 @@ TEST_F(AnalyticsManagerTest, PopularityScoreValidation) {
nlohmann::json analytics_rule = R"({
"name": "books_popularity",
"type": "popular_clicks",
"type": "counter",
"params": {
"source": {
"collections": ["books"]
"collections": ["books"],
"events": [{"type": "query_click", "weight": 1}, {"type": "query_purchase", "weight": 5} ]
},
"destination": {
"collection": "popular_books",
@ -1051,7 +1053,45 @@ TEST_F(AnalyticsManagerTest, PopularityScoreValidation) {
analytics_rule = R"({
"name": "books_popularity",
"type": "popular_clicks",
"type": "counter",
"params": {
"source": {
"collections": ["books"],
"events": [{"type": "query_click", "weight": 1}, {"type": "query_purchase", "weight": 5} ]
},
"destination": {
"collection": "books",
"counter_field": "popularity_score"
}
}
})"_json;
create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("counter_field `popularity_score` not found in destination collection.", create_op.error());
analytics_rule = R"({
"name": "books_popularity",
"type": "popular_click",
"params": {
"source": {
"collections": ["books"],
"events": [{"type": "query_click", "weight": 1}, {"type": "query_purchase", "weight": 5} ]
},
"destination": {
"collection": "books",
"counter_field": "popularity_score"
}
}
})"_json;
create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("Invalid type.", create_op.error());
analytics_rule = R"({
"name": "books_popularity",
"type": "counter",
"params": {
"source": {
"collections": ["books"]
@ -1065,7 +1105,45 @@ TEST_F(AnalyticsManagerTest, PopularityScoreValidation) {
create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("counter_field `popularity_score` not found in destination collection.", create_op.error());
ASSERT_EQ("Bad or missing events.", create_op.error());
analytics_rule = R"({
"name": "books_popularity",
"type": "counter",
"params": {
"source": {
"collections": ["books"],
"events": []
},
"destination": {
"collection": "books",
"counter_field": "popularity_score"
}
}
})"_json;
create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("Bad or missing events.", create_op.error());
analytics_rule = R"({
"name": "books_popularity",
"type": "counter",
"params": {
"source": {
"collections": ["books"],
"events": "query_click"
},
"destination": {
"collection": "books",
"counter_field": "popularity_score"
}
}
})"_json;
create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("Bad or missing events.", create_op.error());
}
TEST_F(AnalyticsManagerTest, PurchaseEventsStoreRetrieval) {