Merge pull request #1425 from krunal1313/event_anaylytics

adding popularity score
This commit is contained in:
Kishore Nallan 2023-12-13 09:35:52 +05:30 committed by GitHub
commit d0018a1746
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 264 additions and 1 deletions

View File

@ -46,6 +46,11 @@ struct click_event_t {
}
};
/**
 * Per-collection click tally kept for a `popular_clicks` analytics rule.
 *
 * Member order matters: the struct is brace-initialised positionally as
 * `{counter_field, docid_counts}`.
 */
struct popular_clicks_t {
    // Name of the document field that receives the click count.
    std::string counter_field;

    // Document id => number of clicks recorded for that document.
    std::map<std::string, uint64_t> docid_counts;
};
struct query_hits_count_t {
std::string query;
uint64_t timestamp;
@ -137,6 +142,9 @@ private:
// suggestion collection => nohits queries
std::unordered_map<std::string, QueryAnalytics*> nohits_queries;
// collection => popular clicks
std::unordered_map<std::string, popular_clicks_t> popular_clicks;
//query collection => click events
std::unordered_map<std::string, std::vector<click_event_t>> query_collection_click_events;
@ -163,6 +171,7 @@ public:
static constexpr const char* QUERY_HITS_COUNT = "$QH";
static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries";
static constexpr const char* NOHITS_QUERIES_TYPE = "nohits_queries";
static constexpr const char* POPULAR_CLICKS_TYPE = "popular_clicks";
static AnalyticsManager& get_instance() {
static AnalyticsManager instance;
@ -200,8 +209,12 @@ public:
void persist_query_hits_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
void persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s);
nlohmann::json get_click_events();
std::unordered_map<std::string, popular_clicks_t> get_popular_clicks();
Option<bool> write_events_to_store(nlohmann::json& event_jsons);
void add_nohits_query(const std::string& query_collection,

View File

@ -399,6 +399,8 @@ public:
std::vector<field> get_fields();
bool contains_field(const std::string&);
std::unordered_map<std::string, field> get_dynamic_fields();
tsl::htrie_map<char, field> get_schema();

View File

@ -43,7 +43,8 @@ Option<bool> AnalyticsManager::create_rule(nlohmann::json& payload, bool upsert,
return Option<bool>(400, "Bad or missing params.");
}
if(payload["type"] == POPULAR_QUERIES_TYPE || payload["type"] == NOHITS_QUERIES_TYPE) {
if(payload["type"] == POPULAR_QUERIES_TYPE || payload["type"] == NOHITS_QUERIES_TYPE
|| payload["type"] == POPULAR_CLICKS_TYPE) {
return create_queries_index(payload, upsert, write_to_disk);
}
@ -84,6 +85,15 @@ Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, boo
return Option<bool>(400, "Must contain a valid destination collection.");
}
std::string counter_field;
if(params["destination"].contains("counter_field")) {
if (!params["destination"]["counter_field"].is_string()) {
return Option<bool>(400, "Must contain a valid counter_field.");
}
counter_field = params["destination"]["counter_field"].get<std::string>();
}
const std::string& suggestion_collection = params["destination"]["collection"].get<std::string>();
suggestion_config_t suggestion_config;
suggestion_config.name = suggestion_config_name;
@ -98,6 +108,19 @@ Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, boo
if (!upsert && nohits_queries.count(suggestion_collection) != 0) {
return Option<bool>(400, "There's already another configuration for this destination collection.");
}
} else if(payload["type"] == POPULAR_CLICKS_TYPE) {
if (!upsert && popular_clicks.count(suggestion_collection) != 0) {
return Option<bool>(400, "There's already another configuration for this destination collection.");
}
auto coll = CollectionManager::get_instance().get_collection(suggestion_collection).get();
if(coll != nullptr) {
if (!coll->contains_field(counter_field)) {
return Option<bool>(404, "counter_field `" + counter_field + "` not found in destination collection.");
}
} else {
return Option<bool>(404, "Collection `" + suggestion_collection + "` not found.");
}
}
for(const auto& coll: params["source"]["collections"]) {
@ -131,6 +154,8 @@ Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, boo
} else if(payload["type"] == NOHITS_QUERIES_TYPE) {
QueryAnalytics *noresultsQueries = new QueryAnalytics(limit);
nohits_queries.emplace(suggestion_collection, noresultsQueries);
} else if(payload["type"] == POPULAR_CLICKS_TYPE) {
popular_clicks.emplace(suggestion_collection, popular_clicks_t{counter_field, {}});
}
if(write_to_disk) {
@ -278,6 +303,13 @@ Option<bool> AnalyticsManager::add_click_event(const std::string &query_collecti
click_event_t click_event(query, now_ts_useconds, user_id, doc_id, position);
click_events_vec.emplace_back(click_event);
auto popular_clicks_it = popular_clicks.find(query_collection);
if(popular_clicks_it != popular_clicks.end()) {
popular_clicks_it->second.docid_counts[doc_id]++;
} else {
LOG(ERROR) << "collection " << query_collection << " not found in analytics rule.";
}
return Option<bool>(true);
}
@ -346,6 +378,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
checkEventsExpiry();
persist_query_events(raft_server, prev_persistence_s);
persist_query_hits_click_events(raft_server, prev_persistence_s);
persist_popular_clicks(raft_server, prev_persistence_s);
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -508,6 +541,37 @@ void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_se
}
}
/**
 * Pushes the in-memory click tallies of every `popular_clicks` rule to the
 * cluster leader, writing each document's count into the rule's counter field.
 *
 * For every destination collection a single newline-delimited JSON payload
 * (one `{"id": ..., "<counter_field>": ...}` object per line) is POSTed to
 * `<leader>collections/<collection>/documents/import?action=update`, instead
 * of issuing one HTTP request per document.
 *
 * The tallies are not cleared here, so each run writes the running total that
 * is currently held in memory.
 *
 * NOTE(review): this function does not lock `mutex` itself; presumably the
 * caller (`run`) already holds it while invoking the persist_* helpers —
 * confirm before adding or removing locking here.
 *
 * @param raft_server         Used only to resolve the current leader URL.
 * @param prev_persistence_s  Currently unused; kept for signature parity with
 *                            the other persist_* helpers.
 */
void AnalyticsManager::persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s) {
    // Resolve the leader once per run; without a leader there is nowhere to send the counts.
    const std::string leader_url = raft_server->get_leader_url();
    if(leader_url.empty()) {
        return;
    }

    for(const auto& popular_clicks_it : popular_clicks) {
        const std::string& collection = popular_clicks_it.first;
        const std::string& counter_field = popular_clicks_it.second.counter_field;
        const auto& docid_counts = popular_clicks_it.second.docid_counts;

        if(docid_counts.empty()) {
            // Nothing was clicked for this rule yet: skip the round trip entirely.
            continue;
        }

        // Build one JSONL body for the whole collection (no trailing newline).
        std::string import_payload;
        for(const auto& popular_click : docid_counts) {
            nlohmann::json doc;
            doc["id"] = popular_click.first;
            doc[counter_field] = popular_click.second;

            if(!import_payload.empty()) {
                import_payload += '\n';
            }
            import_payload += doc.dump();
        }

        const std::string update_url = leader_url + "collections/" + collection +
                                       "/documents/import?action=update";

        std::string res;
        std::map<std::string, std::string> res_headers;
        long status_code = HttpClient::post_response(update_url, import_payload,
                                                     res, res_headers, {}, 10 * 1000, true);

        if(status_code != 200) {
            LOG(ERROR) << "Error while sending popular_clicks events to leader. "
                       << "Status code: " << status_code << ", response: " << res;
        }
    }
}
void AnalyticsManager::stop() {
quit = true;
cv.notify_all();
@ -544,6 +608,11 @@ std::unordered_map<std::string, QueryAnalytics*> AnalyticsManager::get_nohits_qu
return nohits_queries;
}
/**
 * Returns a copy of the collection => popular-clicks map.
 * The copy is taken while `mutex` is held, so callers get a consistent
 * snapshot that they can inspect without further locking.
 */
std::unordered_map<std::string, popular_clicks_t> AnalyticsManager::get_popular_clicks() {
    std::unique_lock guard(mutex);
    auto snapshot = popular_clicks;
    return snapshot;
}
nlohmann::json AnalyticsManager::get_click_events() {
std::unique_lock lk(mutex);
std::vector<std::string> click_event_jsons;

View File

@ -4361,6 +4361,11 @@ std::vector<field> Collection::get_fields() {
return fields;
}
/**
 * Tells whether `field` is present in this collection's search schema.
 * The lookup runs under a shared lock on the collection mutex.
 *
 * @param field  Exact field name to look up.
 * @return true when `search_schema` has an entry for `field`.
 */
bool Collection::contains_field(const std::string &field) {
    std::shared_lock lock(mutex);
    const bool is_known = search_schema.find(field) != search_schema.end();
    return is_known;
}
std::unordered_map<std::string, field> Collection::get_dynamic_fields() {
std::shared_lock lock(mutex);
return dynamic_fields;

View File

@ -767,4 +767,178 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) {
ASSERT_EQ("management", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ(834, resp[0]["hits_count"]);
}
// End-to-end check of a `popular_clicks` rule: click events are tallied per
// document, the tallies are written into the `popularity` counter field, and
// sorting on that field then ranks the most-clicked documents first.
TEST_F(AnalyticsManagerTest, PopularityScore) {
// Reset the click event rate limit
// (presumably so events posted by earlier tests don't cause these to be rejected — confirm).
analyticsManager.resetRateLimit();
// Collection acting as both the source and the destination of the rule.
nlohmann::json products_schema = R"({
"name": "products",
"fields": [
{"name": "title", "type": "string"},
{"name": "popularity", "type": "int32"}
]
})"_json;
Collection* products_coll = collectionManager.create_collection(products_schema).get();
// Seed five documents (ids "0".."4"), all starting with popularity 0.
nlohmann::json doc;
doc["popularity"] = 0;
doc["id"] = "0";
doc["title"] = "Cool trousers";
ASSERT_TRUE(products_coll->add(doc.dump()).ok());
doc["id"] = "1";
doc["title"] = "Funky trousers";
ASSERT_TRUE(products_coll->add(doc.dump()).ok());
doc["id"] = "2";
doc["title"] = "Casual shorts";
ASSERT_TRUE(products_coll->add(doc.dump()).ok());
doc["id"] = "3";
doc["title"] = "Trendy shorts";
ASSERT_TRUE(products_coll->add(doc.dump()).ok());
doc["id"] = "4";
doc["title"] = "Formal pants";
ASSERT_TRUE(products_coll->add(doc.dump()).ok());
// Rule that counts clicks on `products` into its own `popularity` field.
nlohmann::json analytics_rule = R"({
"name": "product_popularity",
"type": "popular_clicks",
"params": {
"source": {
"collections": ["products"]
},
"destination": {
"collection": "products",
"counter_field": "popularity"
}
}
})"_json;
auto create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_TRUE(create_op.ok());
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
// One click on document "1".
nlohmann::json event1 = R"({
"type": "query_click",
"data": {
"q": "trousers",
"collection": "products",
"doc_id": "1",
"position": 2,
"user_id": "13"
}
})"_json;
req->body = event1.dump();
ASSERT_TRUE(post_create_event(req, res));
// Click on document "3"; the same event is posted twice below, so its count becomes 2.
nlohmann::json event2 = R"({
"type": "query_click",
"data": {
"q": "shorts",
"collection": "products",
"doc_id": "3",
"position": 4,
"user_id": "11"
}
})"_json;
req->body = event2.dump();
ASSERT_TRUE(post_create_event(req, res));
ASSERT_TRUE(post_create_event(req, res));
// The in-memory tally should hold one rule with counts {"1": 1, "3": 2}.
auto popular_clicks = analyticsManager.get_popular_clicks();
ASSERT_EQ(1, popular_clicks.size());
ASSERT_EQ("popularity", popular_clicks["products"].counter_field);
ASSERT_EQ(2, popular_clicks["products"].docid_counts.size());
ASSERT_EQ(1, popular_clicks["products"].docid_counts["1"]);
ASSERT_EQ(2, popular_clicks["products"].docid_counts["3"]);
// Trigger the persistence step by hand: build the same {id, counter_field}
// update documents from the tally and feed them to the import handler directly,
// rather than calling persist_popular_clicks().
for(const auto& popular_clicks_it : popular_clicks) {
auto coll = popular_clicks_it.first;
nlohmann::json doc;
auto counter_field = popular_clicks_it.second.counter_field;
req->params["collection"] = "products";
req->params["action"] = "update";
for(const auto& popular_click : popular_clicks_it.second.docid_counts) {
doc["id"] = popular_click.first;
doc[counter_field] = popular_click.second;
req->body = doc.dump();
post_import_documents(req, res);
}
}
// Sorting by popularity (descending) must surface doc "3" (2 clicks) and then
// doc "1" (1 click) ahead of the never-clicked documents.
// NOTE(review): `sort_fields` is not declared in this test — presumably a fixture member; confirm.
sort_fields = {sort_by("popularity", "DESC")};
auto results = products_coll->search("*", {}, "", {},
sort_fields, {0}, 10, 1, FREQUENCY,{false},
Index::DROP_TOKENS_THRESHOLD,spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>()).get();
ASSERT_EQ(5, results["hits"].size());
ASSERT_EQ("3", results["hits"][0]["document"]["id"]);
ASSERT_EQ(2, results["hits"][0]["document"]["popularity"]);
ASSERT_EQ("Trendy shorts", results["hits"][0]["document"]["title"]);
ASSERT_EQ("1", results["hits"][1]["document"]["id"]);
ASSERT_EQ(1, results["hits"][1]["document"]["popularity"]);
ASSERT_EQ("Funky trousers", results["hits"][1]["document"]["title"]);
}
// Checks the validation errors returned when creating a `popular_clicks` rule:
// the destination collection must exist and must contain the counter field.
TEST_F(AnalyticsManagerTest, PopularityScoreValidation) {
// Only the `books` collection is created; it has a `popularity` field.
nlohmann::json products_schema = R"({
"name": "books",
"fields": [
{"name": "title", "type": "string"},
{"name": "popularity", "type": "int32"}
]
})"_json;
// NOTE(review): `products_coll` is not used again in this test.
Collection* products_coll = collectionManager.create_collection(products_schema).get();
// Case 1: the destination collection `popular_books` was never created.
nlohmann::json analytics_rule = R"({
"name": "books_popularity",
"type": "popular_clicks",
"params": {
"source": {
"collections": ["books"]
},
"destination": {
"collection": "popular_books",
"counter_field": "popularity"
}
}
})"_json;
auto create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("Collection `popular_books` not found.", create_op.error());
// Case 2: the destination exists, but `popularity_score` is not one of its fields.
analytics_rule = R"({
"name": "books_popularity",
"type": "popular_clicks",
"params": {
"source": {
"collections": ["books"]
},
"destination": {
"collection": "books",
"counter_field": "popularity_score"
}
}
})"_json;
create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("counter_field `popularity_score` not found in destination collection.", create_op.error());
}