From a6b404e7ffac166bc068c2099115ec29a79ba92b Mon Sep 17 00:00:00 2001 From: krunal Date: Fri, 24 Nov 2023 15:27:15 +0530 Subject: [PATCH] query hits count analytics --- include/analytics_manager.h | 66 ++++++++++++++++++++-- include/core_api.h | 4 ++ src/analytics_manager.cpp | 99 ++++++++++++++++++++++++++++++++- src/collection_manager.cpp | 3 + src/core_api.cpp | 30 +++++++++- src/main/typesense_server.cpp | 2 + test/analytics_manager_test.cpp | 55 ++++++++++++++++++ 7 files changed, 251 insertions(+), 8 deletions(-) diff --git a/include/analytics_manager.h b/include/analytics_manager.h index 02a94bdd..bd9b9b74 100644 --- a/include/analytics_manager.h +++ b/include/analytics_manager.h @@ -7,18 +7,18 @@ #include #include -struct ClickEvent { +struct click_event_t { std::string query; uint64_t timestamp; std::string user_id; std::string doc_id; uint64_t position; - ClickEvent() = delete; + click_event_t() = delete; - ~ClickEvent() = default; + ~click_event_t() = default; - ClickEvent(std::string q, uint64_t ts, std::string uid, std::string id, uint64_t pos) { + click_event_t(std::string q, uint64_t ts, std::string uid, std::string id, uint64_t pos) { query = q; timestamp = ts; user_id = uid; @@ -26,7 +26,7 @@ struct ClickEvent { position = pos; } - ClickEvent& operator=(ClickEvent& other) { + click_event_t& operator=(click_event_t& other) { if (this != &other) { query = other.query; timestamp = other.timestamp; @@ -46,6 +46,47 @@ struct ClickEvent { } }; +struct query_hits_count_t { + std::string query; + uint64_t timestamp; + std::string user_id; + uint64_t hits_count; + + query_hits_count_t() = delete; + + ~query_hits_count_t() = default; + + query_hits_count_t(std::string q, uint64_t ts, std::string uid, uint64_t count) { + query = q; + timestamp = ts; + user_id = uid; + hits_count = count; + } + + query_hits_count_t &operator=(query_hits_count_t &other) { + if (this != &other) { + query = other.query; + timestamp = other.timestamp; + user_id = other.user_id; + hits_count = other.hits_count; + return *this; + } + } + + void to_json(nlohmann::json &obj) const { + obj["query"] = query; + obj["timestamp"] = timestamp; + obj["user_id"] = user_id; + obj["hits_count"] = hits_count; + } +}; + +struct query_hits_count_comp { + bool operator()(const query_hits_count_t& a, const query_hits_count_t& b) const { + return a.query < b.query; + } +}; + struct event_cache_t { uint64_t last_update_time; uint64_t count; @@ -97,7 +138,10 @@ private: std::unordered_map nohits_queries; //query collection => click events - std::unordered_map> query_collection_click_events; + std::unordered_map> query_collection_click_events; + + //query collection => query hits count + std::unordered_map> query_collection_hits_count; Store* store = nullptr; Store* analytics_store = nullptr; @@ -116,6 +160,7 @@ public: static constexpr const char* ANALYTICS_RULE_PREFIX = "$AR"; static constexpr const char* CLICK_EVENT = "$CE"; + static constexpr const char* QUERY_HITS_COUNT = "$QH"; static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries"; static constexpr const char* NOHITS_QUERIES_TYPE = "nohits_queries"; @@ -165,4 +210,13 @@ public: std::unordered_map get_nohits_queries(); void resetRateLimit(); + + void add_query_hits_count(const std::string& query_collection, const std::string& query, const std::string& user_id, + uint64_t hits_count); + + void persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s); + + nlohmann::json get_query_hits_counts(); + + Option write_query_hits_counts_to_store(nlohmann::json& query_hits_counts_json); }; diff --git a/include/core_api.h b/include/core_api.h index 00f633ba..55186153 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -173,6 +173,10 @@ bool get_click_events(const std::shared_ptr& req, const std::shared_pt bool post_replicate_click_event(const std::shared_ptr& req, const std::shared_ptr& res); +bool get_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res); + +bool post_replicate_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res); + // Misc helpers void get_collections_for_auth(std::map& req_params, const std::string& body, diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp index 3b88e1be..351a89ca 100644 --- a/src/analytics_manager.cpp +++ b/src/analytics_manager.cpp @@ -273,7 +273,7 @@ Option AnalyticsManager::add_click_event(const std::string &query_collecti auto now_ts_useconds = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); - ClickEvent click_event(query, now_ts_useconds, user_id, doc_id, position); + click_event_t click_event(query, now_ts_useconds, user_id, doc_id, position); click_events_vec.emplace_back(click_event); return Option(true); @@ -298,6 +298,25 @@ void AnalyticsManager::add_nohits_query(const std::string &query_collection, con } } +void AnalyticsManager::add_query_hits_count(const std::string &query_collection, const std::string &query, + const std::string &user_id, uint64_t hits_count) { + std::unique_lock lock(mutex); + if(analytics_store) { + auto now_ts_useconds = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + + auto &query_hits_count_set = query_collection_hits_count[query_collection]; + query_hits_count_t queryHitsCount(query, now_ts_useconds, user_id, hits_count); + auto query_hits_count_set_it = query_hits_count_set.find(queryHitsCount); + + if(query_hits_count_set_it != query_hits_count_set.end()) { + query_hits_count_set.erase(query_hits_count_set_it); + } + + query_hits_count_set.emplace(queryHitsCount); + } +} + void AnalyticsManager::run(ReplicationState* raft_server) { uint64_t prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -324,6 +343,8 @@ void AnalyticsManager::run(ReplicationState* raft_server) { persist_query_events(raft_server, prev_persistence_s); persist_click_events(raft_server, prev_persistence_s); + persist_query_hits_counts(raft_server, prev_persistence_s); + prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -458,6 +479,46 @@ void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint6 } } +void AnalyticsManager::persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s) { + nlohmann::json payload_json = nlohmann::json::array(); + + for (const auto &query_collection_hits_count_it: query_collection_hits_count) { + auto collection_id = CollectionManager::get_instance().get_collection( + query_collection_hits_count_it.first)->get_collection_id(); + for (const auto &query_hits_count: query_collection_hits_count_it.second) { + // send http request + nlohmann::json query_hits_count_json; + query_hits_count.to_json(query_hits_count_json); + query_hits_count_json["collection_id"] = std::to_string(collection_id); + payload_json.push_back(query_hits_count_json); + } + } + + if (payload_json.empty()) { + return; + } + + const std::string import_payload = payload_json.dump(); + + std::string leader_url = raft_server->get_leader_url(); + if (!leader_url.empty()) { + const std::string &base_url = leader_url + "analytics"; + std::string res; + + const std::string &update_url = base_url + "/query_hits_counts/replicate"; + std::map res_headers; + long status_code = HttpClient::post_response(update_url, import_payload, + res, res_headers, {}, 10 * 1000, true); + + if (status_code != 200) { + LOG(ERROR) << "Error while sending click events to leader. " + << "Status code: " << status_code << ", response: " << res; + } else { + query_collection_hits_count.clear(); + } + } +} + void AnalyticsManager::stop() { quit = true; cv.notify_all(); @@ -512,6 +573,24 @@ nlohmann::json AnalyticsManager::get_click_events() { return result_json; } +nlohmann::json AnalyticsManager::get_query_hits_counts() { + std::unique_lock lk(mutex); + std::vector query_hits_counts_jsons; + nlohmann::json result_json = nlohmann::json::array(); + + if (analytics_store) { + analytics_store->scan_fill(std::string(QUERY_HITS_COUNT) + "_", std::string(QUERY_HITS_COUNT) + "`", + query_hits_counts_jsons); + + for (const auto &query_hits_count_json: query_hits_counts_jsons) { + nlohmann::json query_hits_count = nlohmann::json::parse(query_hits_count_json); + result_json.push_back(query_hits_count); + } + } + + return result_json; +} + Option AnalyticsManager::write_click_event_to_store(nlohmann::json &click_event_jsons) { for(const auto& click_event_json : click_event_jsons) { auto collection_id = click_event_json["collection_id"].get(); @@ -530,6 +609,24 @@ Option AnalyticsManager::write_click_event_to_store(nlohmann::json &click_ return Option(true); } +Option AnalyticsManager::write_query_hits_counts_to_store(nlohmann::json &query_hits_counts_json) { + for(const auto& query_hits_count : query_hits_counts_json) { + auto collection_id = query_hits_count["collection_id"].get(); + auto timestamp = query_hits_count["timestamp"].get(); + const std::string key = std::string(QUERY_HITS_COUNT) + "_" + collection_id + "_" + + std::to_string(timestamp); + if(analytics_store) { + bool inserted = analytics_store->insert(key, query_hits_count.dump()); + if (!inserted) { + return Option(500, "Unable to insert query hits count into store."); + } + } else { + return Option(500, "Analytics DB not initialized."); + } + } + return Option(true); +} + void AnalyticsManager::resetRateLimit() { events_cache.clear(); } \ No newline at end of file diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index ca6fbeb0..0fcb0f52 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -1534,6 +1534,9 @@ Option CollectionManager::do_search(std::map& re std::string analytics_query = Tokenizer::normalize_ascii_no_spaces(raw_query); AnalyticsManager::get_instance().add_suggestion(orig_coll_name, analytics_query, true, req_params["x-typesense-user-id"]); + AnalyticsManager::get_instance().add_query_hits_count(orig_coll_name, analytics_query, + req_params["x-typesense-user-id"], + result["found"].get()); } else if(result.contains("found") == 0 && result["found"].get() == 0) { std::string analytics_query = Tokenizer::normalize_ascii_no_spaces(raw_query); AnalyticsManager::get_instance().add_nohits_query(orig_coll_name, analytics_query, diff --git a/src/core_api.cpp b/src/core_api.cpp index e3afb1d2..8ca74f10 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -2823,6 +2823,34 @@ bool post_replicate_click_event(const std::shared_ptr& req, const std: return false; } - res->set_200("ClickEvent wrote to DB."); + res->set_200("click_event_t wrote to DB."); + return true; +} + +bool get_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res) { + auto query_hits_counts = AnalyticsManager::get_instance().get_query_hits_counts(); + + res->set_200(query_hits_counts.dump()); + return true; +} + +bool post_replicate_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res) { + nlohmann::json req_json; + + try { + req_json = nlohmann::json::parse(req->body); + } catch(const std::exception& e) { + LOG(ERROR) << "JSON error: " << e.what(); + res->set_400("Bad JSON."); + return false; + } + + auto op = AnalyticsManager::get_instance().write_query_hits_counts_to_store(req_json); + if(!op.ok()) { + res->set_body(op.code(), op.error()); + return false; + } + + res->set_200("query hits counts wrote to DB."); return true; } \ No newline at end of file diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index f9a1c3af..fc475d71 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -85,6 +85,8 @@ void master_server_routes() { server->get("/analytics/click_events", get_click_events); server->post("/analytics/click_events", post_create_event); server->post("/analytics/click_events/replicate", post_replicate_click_event); + server->get("/analytics/query_hits_counts", get_query_hits_counts); + server->post("/analytics/query_hits_counts/replicate", post_replicate_query_hits_counts); // meta server->get("/metrics.json", get_metrics_json); diff --git a/test/analytics_manager_test.cpp b/test/analytics_manager_test.cpp index 4eea0a4f..e0181d7f 100644 --- a/test/analytics_manager_test.cpp +++ b/test/analytics_manager_test.cpp @@ -440,4 +440,59 @@ TEST_F(AnalyticsManagerTest, NoresultsQueries) { noresults_queries = analyticsManager.get_nohits_queries(); ASSERT_EQ(0, noresults_queries.size()); +} + +TEST_F(AnalyticsManagerTest, QueryHitsCount) { + nlohmann::json titles_schema = R"({ + "name": "titles", + "fields": [ + {"name": "title", "type": "string"} + ] + })"_json; + + Collection *titles_coll = collectionManager.create_collection(titles_schema).get(); + + nlohmann::json doc; + doc["title"] = "Cool trousers"; + ASSERT_TRUE(titles_coll->add(doc.dump()).ok()); + + doc["title"] = "Cool pants"; + ASSERT_TRUE(titles_coll->add(doc.dump()).ok()); + + doc["title"] = "Trendy sneakers"; + ASSERT_TRUE(titles_coll->add(doc.dump()).ok()); + + doc["title"] = "Funky shorts"; + ASSERT_TRUE(titles_coll->add(doc.dump()).ok()); + + nlohmann::json query_hits_array = nlohmann::json::array(); + nlohmann::json obj; + + obj["collection_id"] = "0"; + obj["query"] = "cool"; + obj["timestamp"] = 1625365612; + obj["user_id"] = "1"; + obj["hits_count"] = 2; + query_hits_array.push_back(obj); + + obj["query"] = "funky"; + obj["timestamp"] = 1625365616; + obj["user_id"] = "1"; + obj["hits_count"] = 1; + query_hits_array.push_back(obj); + + + auto op = analyticsManager.write_query_hits_counts_to_store(query_hits_array); + ASSERT_TRUE(op.ok()); + + auto result = analyticsManager.get_query_hits_counts(); + + ASSERT_EQ(2, result.size()); + ASSERT_EQ("cool", result[0]["query"]); + ASSERT_EQ(2, result[0]["hits_count"]); + ASSERT_EQ(1625365612, result[0]["timestamp"]); + + ASSERT_EQ("funky", result[1]["query"]); + ASSERT_EQ(1, result[1]["hits_count"]); + ASSERT_EQ(1625365616, result[1]["timestamp"]); } \ No newline at end of file