From 4c32ead921b89434b5a84fcb0718dc4e1eaa9f19 Mon Sep 17 00:00:00 2001
From: Kishore Nallan
Date: Tue, 30 May 2023 11:55:20 +0530
Subject: [PATCH] Handle aliasing for analytics query.

persist_suggestions() takes prev_persistence_s by reference: the flush
loop used to live inside run() and assigned the timestamp in place, so
the extracted helper must write through to the caller's variable or the
flush-interval check keeps comparing against a stale value.

---
Review notes (ignored by `git am`; not part of the commit message):

- This patch text was recovered from a copy in which line breaks and
  indentation had been collapsed. Line structure was rebuilt from the hunk
  line counts; indentation is reconstructed assuming 4 spaces per level.
  TODO: confirm against the tree before applying.
- Text inside angle brackets appears to have been stripped from that copy
  (bare `#include`, `std::map res_headers`, `std::vector parts`,
  `duration_cast(now)`, `Option(404, ...)`, `.get()`, the author address).
  Those tokens are left exactly as found. TODO: restore from upstream.
- The `index` blob ids are those of the original commit and do not account
  for the by-reference change described above.
- NOTE(review): prev_persistence_s is assigned inside the per-rule loop, so
  once one rule passes the interval check, later rules in the same pass see
  a difference of ~0 and `continue`. Looks unintended; confirm and, if so,
  hoist the interval check/update out of the loop.

 include/analytics_manager.h     |   4 +
 src/analytics_manager.cpp       | 160 ++++++++++++++++++--------------
 src/collection_manager.cpp      |   8 +-
 src/core_api.cpp                |  31 +++++++
 test/analytics_manager_test.cpp |  91 ++++++++++++++++++
 5 files changed, 219 insertions(+), 75 deletions(-)
 create mode 100644 test/analytics_manager_test.cpp

diff --git a/include/analytics_manager.h b/include/analytics_manager.h
index 9920cfcb..353085b6 100644
--- a/include/analytics_manager.h
+++ b/include/analytics_manager.h
@@ -79,4 +79,8 @@ public:
     void stop();
 
     void dispose();
+
+    void persist_suggestions(ReplicationState *raft_server, uint64_t& prev_persistence_s);
+
+    std::unordered_map get_popular_queries();
 };
diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp
index 225101cc..26d58ac6 100644
--- a/src/analytics_manager.cpp
+++ b/src/analytics_manager.cpp
@@ -12,12 +12,14 @@ Option AnalyticsManager::create_rule(nlohmann::json& payload, bool write_t
         {
             "name": "top_search_queries",
             "type": "popular_queries",
-            "limit": 1000,
-            "source": {
-                "collections": ["brands", "products"]
-            },
-            "destination": {
-                "collection": "top_search_queries"
+            "params": {
+                "limit": 1000,
+                "source": {
+                    "collections": ["brands", "products"]
+                },
+                "destination": {
+                    "collection": "top_search_queries"
+                }
             }
         }
     */
@@ -207,71 +209,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
             break;
         }
 
-        for(const auto& suggestion_config: suggestion_configs) {
-            const std::string& sink_name = suggestion_config.first;
-            const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
-
-            auto popular_queries_it = popular_queries.find(suggestion_coll);
-            if(popular_queries_it == popular_queries.end()) {
-                continue;
-            }
-
-            // need to prepare the counts as JSON docs for import into the suggestion collection
-            // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
-
-            PopularQueries* popularQueries = popular_queries_it->second;
-
-            // aggregate prefix queries to their final form
-            auto now = std::chrono::system_clock::now().time_since_epoch();
-            auto now_ts_us = std::chrono::duration_cast(now).count();
-            popularQueries->compact_user_queries(now_ts_us);
-
-            auto now_ts_seconds = std::chrono::duration_cast(now).count();
-
-            if(now_ts_seconds - prev_persistence_s < Config::get_instance().get_analytics_flush_interval()) {
-                // we will persist aggregation every hour
-                continue;
-            }
-
-            prev_persistence_s = now_ts_seconds;
-
-            std::string import_payload;
-            popularQueries->serialize_as_docs(import_payload);
-
-            if(import_payload.empty()) {
-                continue;
-            }
-
-            // send http request
-            std::string leader_url = raft_server->get_leader_url();
-            if(!leader_url.empty()) {
-                const std::string& resource_url = leader_url + "collections/" + suggestion_coll +
-                                                  "/documents/import?action=emplace";
-                std::string res;
-                std::map res_headers;
-                long status_code = HttpClient::post_response(resource_url, import_payload,
-                                                             res, res_headers, 10*1000);
-
-                if(status_code != 200) {
-                    LOG(ERROR) << "Error while sending query suggestions events to leader. "
-                               << "Status code: " << status_code << ", response: " << res;
-                } else {
-                    LOG(INFO) << "Sent query suggestions to leader for aggregation.";
-                    popularQueries->reset_local_counts();
-
-                    if(raft_server->is_leader()) {
-                        // try to run top-K compaction of suggestion collection
-                        auto coll = CollectionManager::get_instance().get_collection(suggestion_coll);
-                        if (coll == nullptr) {
-                            LOG(ERROR) << "No collection found for suggestions aggregation: " + suggestion_coll;
-                            continue;
-                        }
-
-                        coll->truncate_after_top_k("count", popularQueries->get_k());
-                    }
-                }
-            }
-        }
+        persist_suggestions(raft_server, prev_persistence_s);
 
         lk.unlock();
     }
@@ -279,12 +217,88 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
     dispose();
 }
 
+void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64_t& prev_persistence_s) {
+    // lock is held by caller
+    for(const auto& suggestion_config: suggestion_configs) {
+        const std::string& sink_name = suggestion_config.first;
+        const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
+
+        auto popular_queries_it = popular_queries.find(suggestion_coll);
+        if(popular_queries_it == popular_queries.end()) {
+            continue;
+        }
+
+        // need to prepare the counts as JSON docs for import into the suggestion collection
+        // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
+
+        PopularQueries* popularQueries = popular_queries_it->second;
+
+        // aggregate prefix queries to their final form
+        auto now = std::chrono::system_clock::now().time_since_epoch();
+        auto now_ts_us = std::chrono::duration_cast(now).count();
+        popularQueries->compact_user_queries(now_ts_us);
+
+        auto now_ts_seconds = std::chrono::duration_cast(now).count();
+
+        if(now_ts_seconds - prev_persistence_s < Config::get_instance().get_analytics_flush_interval()) {
+            // we will persist aggregation every hour
+            continue;
+        }
+
+        prev_persistence_s = now_ts_seconds;
+
+        std::string import_payload;
+        popularQueries->serialize_as_docs(import_payload);
+
+        if(import_payload.empty()) {
+            continue;
+        }
+
+        // send http request
+        std::string leader_url = raft_server->get_leader_url();
+        if(!leader_url.empty()) {
+            const std::string& base_url = leader_url + "collections/" + suggestion_coll;
+            std::string res;
+
+            const std::string& update_url = base_url + "/documents/import?action=emplace";
+            std::map res_headers;
+            long status_code = HttpClient::post_response(update_url, import_payload,
+                                                         res, res_headers, 10*1000);
+
+            if(status_code != 200) {
+                LOG(ERROR) << "Error while sending query suggestions events to leader. "
+                           << "Status code: " << status_code << ", response: " << res;
+            } else {
+                LOG(INFO) << "Query aggregation for collection: " + suggestion_coll;
+                popularQueries->reset_local_counts();
+
+                if(raft_server->is_leader()) {
+                    // try to run top-K compaction of suggestion collection
+                    const std::string top_k_param = "count:" + std::to_string(popularQueries->get_k());
+                    const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param;
+                    res.clear();
+                    res_headers.clear();
+                    status_code = HttpClient::delete_response(truncate_topk_url, res, res_headers, 10*1000);
+                    if(status_code != 200) {
+                        LOG(ERROR) << "Error while running top K for query suggestions collection. "
+                                   << "Status code: " << status_code << ", response: " << res;
+                    } else {
+                        LOG(INFO) << "Top K aggregation for collection: " + suggestion_coll;
+                    }
+                }
+            }
+        }
+    }
+}
+
 void AnalyticsManager::stop() {
     quit = true;
     cv.notify_all();
 }
 
 void AnalyticsManager::dispose() {
+    std::unique_lock lk(mutex);
+
     for(auto& kv: popular_queries) {
         delete kv.second;
     }
@@ -296,3 +310,7 @@ void AnalyticsManager::init(Store* store) {
     this->store = store;
 }
 
+std::unordered_map AnalyticsManager::get_popular_queries() {
+    std::unique_lock lk(mutex);
+    return popular_queries;
+}
diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp
index b69795d9..6c9398e3 100644
--- a/src/collection_manager.cpp
+++ b/src/collection_manager.cpp
@@ -734,7 +734,8 @@ Option CollectionManager::do_search(std::map& re
     }
 
     CollectionManager & collectionManager = CollectionManager::get_instance();
-    auto collection = collectionManager.get_collection(req_params["collection"]);
+    const std::string& orig_coll_name = req_params["collection"];
+    auto collection = collectionManager.get_collection(orig_coll_name);
 
     if(collection == nullptr) {
         return Option(404, "Not found.");
@@ -1064,9 +1065,8 @@ Option CollectionManager::do_search(std::map& re
 
     if(Config::get_instance().get_enable_search_analytics()) {
         if(result.count("found") != 0 && result["found"].get() != 0) {
-            std::string processed_query = raw_query;
-            Tokenizer::normalize_ascii(processed_query);
-            AnalyticsManager::get_instance().add_suggestion(collection->get_name(), processed_query,
+            std::string analytics_query = raw_query;
+            AnalyticsManager::get_instance().add_suggestion(orig_coll_name, analytics_query,
                                                             true, req_params["x-typesense-user-id"]);
         }
     }
diff --git a/src/core_api.cpp b/src/core_api.cpp
index 0973c6d9..61fd894d 100644
--- a/src/core_api.cpp
+++ b/src/core_api.cpp
@@ -1114,6 +1114,37 @@ bool del_remove_documents(const std::shared_ptr& req, const std::share
 
     const char *BATCH_SIZE = "batch_size";
     const char *FILTER_BY = "filter_by";
+    const char *TOP_K_BY = "top_k_by";
+
+    if(req->params.count(TOP_K_BY) != 0) {
+        std::vector parts;
+        StringUtils::split(req->params[TOP_K_BY], parts, ":");
+
+        if(parts.size() != 2 || !StringUtils::is_uint32_t(parts[1])) {
+            req->last_chunk_aggregate = true;
+            res->final = true;
+            res->set_400("The `top_k_by` parameter is not valid.");
+            stream_response(req, res);
+            return false;
+        }
+
+        const std::string& field_name = parts[0];
+        const size_t k = std::stoull(parts[1]);
+        auto op = collection->truncate_after_top_k(field_name, k);
+
+        req->last_chunk_aggregate = true;
+        res->final = true;
+
+        if(!op.ok()) {
+            res->set_500(op.error());
+            stream_response(req, res);
+            return false;
+        }
+
+        res->set_200(R"({"ok": true})");
+        stream_response(req, res);
+        return true;
+    }
 
     if(req->params.count(BATCH_SIZE) == 0) {
         req->params[BATCH_SIZE] = "1000000000"; // 1 Billion
diff --git a/test/analytics_manager_test.cpp b/test/analytics_manager_test.cpp
new file mode 100644
index 00000000..0030f221
--- /dev/null
+++ b/test/analytics_manager_test.cpp
@@ -0,0 +1,91 @@
+#include
+#include
+#include
+#include
+#include
+#include "collection.h"
+
+class AnalyticsManagerTest : public ::testing::Test {
+protected:
+    Store *store;
+    CollectionManager& collectionManager = CollectionManager::get_instance();
+    std::atomic quit = false;
+
+    std::vector query_fields;
+    std::vector sort_fields;
+
+    AnalyticsManager& analyticsManager = AnalyticsManager::get_instance();
+
+    void setupCollection() {
+        std::string state_dir_path = "/tmp/typesense_test/analytics_manager_test";
+        LOG(INFO) << "Truncating and creating: " << state_dir_path;
+        system(("rm -rf "+state_dir_path+" && mkdir -p "+state_dir_path).c_str());
+        system("mkdir -p /tmp/typesense_test/models");
+
+        store = new Store(state_dir_path);
+        collectionManager.init(store, 1.0, "auth_key", quit);
+        collectionManager.load(8, 1000);
+
+        analyticsManager.init(store);
+    }
+
+    virtual void SetUp() {
+        setupCollection();
+    }
+
+    virtual void TearDown() {
+        collectionManager.dispose();
+        delete store;
+    }
+};
+
+TEST_F(AnalyticsManagerTest, AddSuggestion) {
+    nlohmann::json titles_schema = R"({
+            "name": "titles",
+            "fields": [
+                {"name": "title", "type": "string"}
+            ]
+        })"_json;
+
+    Collection* titles_coll = collectionManager.create_collection(titles_schema).get();
+
+    nlohmann::json doc;
+    doc["title"] = "Cool trousers";
+    ASSERT_TRUE(titles_coll->add(doc.dump()).ok());
+
+    // create a collection to store suggestions
+    nlohmann::json suggestions_schema = R"({
+        "name": "top_queries",
+        "fields": [
+          {"name": "q", "type": "string" },
+          {"name": "count", "type": "int32" }
+        ]
+      })"_json;
+
+    Collection* suggestions_coll = collectionManager.create_collection(suggestions_schema).get();
+
+    nlohmann::json analytics_rule = R"({
+        "name": "top_search_queries",
+        "type": "popular_queries",
+        "params": {
+            "limit": 100,
+            "source": {
+                "collections": ["titles"]
+            },
+            "destination": {
+                "collection": "top_queries"
+            }
+        }
+    })"_json;
+
+    auto create_op = analyticsManager.create_rule(analytics_rule);
+    ASSERT_TRUE(create_op.ok());
+
+    std::string q = "foobar";
+    analyticsManager.add_suggestion("titles", q, true, "1");
+
+    auto popularQueries = analyticsManager.get_popular_queries();
+    auto userQueries = popularQueries["top_queries"]->get_user_prefix_queries()["1"];
+    ASSERT_EQ(1, userQueries.size());
+    ASSERT_EQ("foobar", userQueries[0].query);
+}