diff --git a/include/analytics_manager.h b/include/analytics_manager.h index 9d9e9fd3..281e03c3 100644 --- a/include/analytics_manager.h +++ b/include/analytics_manager.h @@ -46,6 +46,11 @@ struct click_event_t { } }; +struct popular_clicks_t { + std::string counter_field; + std::map docid_counts; +}; + struct query_hits_count_t { std::string query; uint64_t timestamp; @@ -137,6 +142,9 @@ private: // suggestion collection => nohits queries std::unordered_map nohits_queries; + // collection => popular clicks + std::unordered_map popular_clicks; + //query collection => click events std::unordered_map> query_collection_click_events; @@ -163,6 +171,7 @@ public: static constexpr const char* QUERY_HITS_COUNT = "$QH"; static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries"; static constexpr const char* NOHITS_QUERIES_TYPE = "nohits_queries"; + static constexpr const char* POPULAR_CLICKS_TYPE = "popular_clicks"; static AnalyticsManager& get_instance() { static AnalyticsManager instance; @@ -200,8 +209,12 @@ public: void persist_query_hits_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s); + void persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s); + nlohmann::json get_click_events(); + std::unordered_map get_popular_clicks(); + Option write_events_to_store(nlohmann::json& event_jsons); void add_nohits_query(const std::string& query_collection, diff --git a/include/collection.h b/include/collection.h index bc4b87ad..64f6dd30 100644 --- a/include/collection.h +++ b/include/collection.h @@ -399,6 +399,8 @@ public: std::vector get_fields(); + bool contains_field(const std::string&); + std::unordered_map get_dynamic_fields(); tsl::htrie_map get_schema(); diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp index 74c3b1df..ad6003ac 100644 --- a/src/analytics_manager.cpp +++ b/src/analytics_manager.cpp @@ -43,7 +43,8 @@ Option AnalyticsManager::create_rule(nlohmann::json& payload, bool upsert, return Option(400, "Bad or missing params."); } - if(payload["type"] == POPULAR_QUERIES_TYPE || payload["type"] == NOHITS_QUERIES_TYPE) { + if(payload["type"] == POPULAR_QUERIES_TYPE || payload["type"] == NOHITS_QUERIES_TYPE + || payload["type"] == POPULAR_CLICKS_TYPE) { return create_queries_index(payload, upsert, write_to_disk); } @@ -84,6 +85,15 @@ Option AnalyticsManager::create_queries_index(nlohmann::json &payload, boo return Option(400, "Must contain a valid destination collection."); } + std::string counter_field; + + if(params["destination"].contains("counter_field")) { + if (!params["destination"]["counter_field"].is_string()) { + return Option(400, "Must contain a valid counter_field."); + } + counter_field = params["destination"]["counter_field"].get(); + } + const std::string& suggestion_collection = params["destination"]["collection"].get(); suggestion_config_t suggestion_config; suggestion_config.name = suggestion_config_name; @@ -98,6 +108,19 @@ Option AnalyticsManager::create_queries_index(nlohmann::json &payload, boo if (!upsert && nohits_queries.count(suggestion_collection) != 0) { return Option(400, "There's already another configuration for this destination collection."); } + } else if(payload["type"] == POPULAR_CLICKS_TYPE) { + if (!upsert && popular_clicks.count(suggestion_collection) != 0) { + return Option(400, "There's already another configuration for this destination collection."); + } + + auto coll = CollectionManager::get_instance().get_collection(suggestion_collection).get(); + if(coll != nullptr) { + if (!coll->contains_field(counter_field)) { + return Option(404, "counter_field `" + counter_field + "` not found in destination collection."); + } + } else { + return Option(404, "Collection `" + suggestion_collection + "` not found."); + } } for(const auto& coll: params["source"]["collections"]) { @@ -131,6 +154,8 @@ Option AnalyticsManager::create_queries_index(nlohmann::json &payload, boo } else if(payload["type"] == NOHITS_QUERIES_TYPE) { QueryAnalytics *noresultsQueries = new QueryAnalytics(limit); nohits_queries.emplace(suggestion_collection, noresultsQueries); + } else if(payload["type"] == POPULAR_CLICKS_TYPE) { + popular_clicks.emplace(suggestion_collection, popular_clicks_t{counter_field, {}}); } if(write_to_disk) { @@ -278,6 +303,13 @@ Option AnalyticsManager::add_click_event(const std::string &query_collecti click_event_t click_event(query, now_ts_useconds, user_id, doc_id, position); click_events_vec.emplace_back(click_event); + auto popular_clicks_it = popular_clicks.find(query_collection); + if(popular_clicks_it != popular_clicks.end()) { + popular_clicks_it->second.docid_counts[doc_id]++; + } else { + LOG(ERROR) << "collection " << query_collection << " not found in analytics rule."; + } + return Option(true); } @@ -346,6 +378,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) { checkEventsExpiry(); persist_query_events(raft_server, prev_persistence_s); persist_query_hits_click_events(raft_server, prev_persistence_s); + persist_popular_clicks(raft_server, prev_persistence_s); prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -508,6 +541,37 @@ void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_se } } +void AnalyticsManager::persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s) { + auto send_http_response = [&](const std::string& import_payload, const std::string& collection) { + std::string leader_url = raft_server->get_leader_url(); + if (!leader_url.empty()) { + const std::string &base_url = leader_url + "collections/" + collection; + std::string res; + + const std::string &update_url = base_url + "/documents/import?action=update"; + std::map res_headers; + long status_code = HttpClient::post_response(update_url, import_payload, + res, res_headers, {}, 10 * 1000, true); + + if (status_code != 200) { + LOG(ERROR) << "Error while sending popular_clicks events to leader. " + << "Status code: " << status_code << ", response: " << res; + } + } + }; + + for(const auto& popular_clicks_it : popular_clicks) { + auto coll = popular_clicks_it.first; + nlohmann::json doc; + auto counter_field = popular_clicks_it.second.counter_field; + for(const auto& popular_click : popular_clicks_it.second.docid_counts) { + doc["id"] = popular_click.first; + doc[counter_field] = popular_click.second; + send_http_response(doc.dump(), coll); + } + } +} + void AnalyticsManager::stop() { quit = true; cv.notify_all(); @@ -544,6 +608,11 @@ std::unordered_map AnalyticsManager::get_nohits_qu return nohits_queries; } +std::unordered_map AnalyticsManager::get_popular_clicks() { + std::unique_lock lk(mutex); + return popular_clicks; +} + nlohmann::json AnalyticsManager::get_click_events() { std::unique_lock lk(mutex); std::vector click_event_jsons; diff --git a/src/collection.cpp b/src/collection.cpp index f3ba4796..8d3fd554 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -4361,6 +4361,11 @@ std::vector Collection::get_fields() { return fields; } +bool Collection::contains_field(const std::string &field) { + std::shared_lock lock(mutex); + return search_schema.find(field) != search_schema.end(); +} + std::unordered_map Collection::get_dynamic_fields() { std::shared_lock lock(mutex); return dynamic_fields; diff --git a/test/analytics_manager_test.cpp b/test/analytics_manager_test.cpp index 641da345..750cbbf8 100644 --- a/test/analytics_manager_test.cpp +++ b/test/analytics_manager_test.cpp @@ -767,4 +767,178 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) { ASSERT_EQ("management", resp[0]["q"]); ASSERT_EQ(13, resp[0]["user_id"]); ASSERT_EQ(834, resp[0]["hits_count"]); +} + +TEST_F(AnalyticsManagerTest, PopularityScore) { + //reset click event rate limit + analyticsManager.resetRateLimit(); + + nlohmann::json products_schema = R"({ + "name": "products", + "fields": [ + {"name": "title", "type": "string"}, + {"name": "popularity", "type": "int32"} + ] + })"_json; + + Collection* products_coll = collectionManager.create_collection(products_schema).get(); + + nlohmann::json doc; + doc["popularity"] = 0; + + doc["id"] = "0"; + doc["title"] = "Cool trousers"; + ASSERT_TRUE(products_coll->add(doc.dump()).ok()); + + doc["id"] = "1"; + doc["title"] = "Funky trousers"; + ASSERT_TRUE(products_coll->add(doc.dump()).ok()); + + doc["id"] = "2"; + doc["title"] = "Casual shorts"; + ASSERT_TRUE(products_coll->add(doc.dump()).ok()); + + doc["id"] = "3"; + doc["title"] = "Trendy shorts"; + ASSERT_TRUE(products_coll->add(doc.dump()).ok()); + + doc["id"] = "4"; + doc["title"] = "Formal pants"; + ASSERT_TRUE(products_coll->add(doc.dump()).ok()); + + nlohmann::json analytics_rule = R"({ + "name": "product_popularity", + "type": "popular_clicks", + "params": { + "source": { + "collections": ["products"] + }, + "destination": { + "collection": "products", + "counter_field": "popularity" + } + } + })"_json; + + auto create_op = analyticsManager.create_rule(analytics_rule, false, true); + ASSERT_TRUE(create_op.ok()); + + std::shared_ptr req = std::make_shared(); + std::shared_ptr res = std::make_shared(nullptr); + + nlohmann::json event1 = R"({ + "type": "query_click", + "data": { + "q": "trousers", + "collection": "products", + "doc_id": "1", + "position": 2, + "user_id": "13" + } + })"_json; + + req->body = event1.dump(); + ASSERT_TRUE(post_create_event(req, res)); + + nlohmann::json event2 = R"({ + "type": "query_click", + "data": { + "q": "shorts", + "collection": "products", + "doc_id": "3", + "position": 4, + "user_id": "11" + } + })"_json; + + req->body = event2.dump(); + ASSERT_TRUE(post_create_event(req, res)); + + ASSERT_TRUE(post_create_event(req, res)); + + auto popular_clicks = analyticsManager.get_popular_clicks(); + ASSERT_EQ(1, popular_clicks.size()); + ASSERT_EQ("popularity", popular_clicks["products"].counter_field); + ASSERT_EQ(2, popular_clicks["products"].docid_counts.size()); + ASSERT_EQ(1, popular_clicks["products"].docid_counts["1"]); + ASSERT_EQ(2, popular_clicks["products"].docid_counts["3"]); + + //trigger persistance event + for(const auto& popular_clicks_it : popular_clicks) { + auto coll = popular_clicks_it.first; + nlohmann::json doc; + auto counter_field = popular_clicks_it.second.counter_field; + req->params["collection"] = "products"; + req->params["action"] = "update"; + for(const auto& popular_click : popular_clicks_it.second.docid_counts) { + doc["id"] = popular_click.first; + doc[counter_field] = popular_click.second; + req->body = doc.dump(); + post_import_documents(req, res); + } + } + + sort_fields = {sort_by("popularity", "DESC")}; + auto results = products_coll->search("*", {}, "", {}, + sort_fields, {0}, 10, 1, FREQUENCY,{false}, + Index::DROP_TOKENS_THRESHOLD,spp::sparse_hash_set(), + spp::sparse_hash_set()).get(); + + ASSERT_EQ(5, results["hits"].size()); + + ASSERT_EQ("3", results["hits"][0]["document"]["id"]); + ASSERT_EQ(2, results["hits"][0]["document"]["popularity"]); + ASSERT_EQ("Trendy shorts", results["hits"][0]["document"]["title"]); + + ASSERT_EQ("1", results["hits"][1]["document"]["id"]); + ASSERT_EQ(1, results["hits"][1]["document"]["popularity"]); + ASSERT_EQ("Funky trousers", results["hits"][1]["document"]["title"]); +} + +TEST_F(AnalyticsManagerTest, PopularityScoreValidation) { + nlohmann::json products_schema = R"({ + "name": "books", + "fields": [ + {"name": "title", "type": "string"}, + {"name": "popularity", "type": "int32"} + ] + })"_json; + + Collection* products_coll = collectionManager.create_collection(products_schema).get(); + + nlohmann::json analytics_rule = R"({ + "name": "books_popularity", + "type": "popular_clicks", + "params": { + "source": { + "collections": ["books"] + }, + "destination": { + "collection": "popular_books", + "counter_field": "popularity" + } + } + })"_json; + + auto create_op = analyticsManager.create_rule(analytics_rule, false, true); + ASSERT_FALSE(create_op.ok()); + ASSERT_EQ("Collection `popular_books` not found.", create_op.error()); + + analytics_rule = R"({ + "name": "books_popularity", + "type": "popular_clicks", + "params": { + "source": { + "collections": ["books"] + }, + "destination": { + "collection": "books", + "counter_field": "popularity_score" + } + } + })"_json; + + create_op = analyticsManager.create_rule(analytics_rule, false, true); + ASSERT_FALSE(create_op.ok()); + ASSERT_EQ("counter_field `popularity_score` not found in destination collection.", create_op.error()); } \ No newline at end of file