From 6335f7b87fd05c438d2df41e2147359d3b185824 Mon Sep 17 00:00:00 2001 From: krunal Date: Wed, 22 Nov 2023 15:06:34 +0530 Subject: [PATCH] refactor anlaytics queries --- include/analytics_manager.h | 15 +- include/noresults_queries.h | 51 ------ .../{popular_queries.h => query_analytics.h} | 4 +- src/analytics_manager.cpp | 150 ++++++------------ src/noresults_queries.cpp | 117 -------------- ...opular_queries.cpp => query_analytics.cpp} | 18 +-- test/popular_queries_test.cpp | 16 +- 7 files changed, 76 insertions(+), 295 deletions(-) delete mode 100644 include/noresults_queries.h rename include/{popular_queries.h => query_analytics.h} (95%) delete mode 100644 src/noresults_queries.cpp rename src/{popular_queries.cpp => query_analytics.cpp} (84%) diff --git a/include/analytics_manager.h b/include/analytics_manager.h index 2aaad8f2..47688780 100644 --- a/include/analytics_manager.h +++ b/include/analytics_manager.h @@ -1,12 +1,11 @@ #pragma once -#include "popular_queries.h" +#include "query_analytics.h" #include "option.h" #include "raft_server.h" #include #include #include #include -#include "noresults_queries.h" struct ClickEvent { std::string query; @@ -92,10 +91,10 @@ private: std::unordered_map> query_collection_mapping; // suggestion collection => popular queries - std::unordered_map popular_queries; + std::unordered_map popular_queries; // suggestion collection => noresults queries - std::unordered_map noresults_queries; + std::unordered_map noresults_queries; //query collection => click events std::unordered_map> query_collection_click_events; @@ -147,9 +146,9 @@ public: void dispose(); - void persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s); + void persist_query_events(ReplicationState *raft_server, uint64_t prev_persistence_s); - std::unordered_map get_popular_queries(); + std::unordered_map get_popular_queries(); Option add_click_event(const std::string& query_collection, const std::string& query, const std::string& user_id, std::string doc_id, uint64_t position, const std::string& client_ip); @@ -163,9 +162,7 @@ public: void add_noresults_query(const std::string& query_collection, const std::string& query, bool live_query, const std::string& user_id); - void persist_noresults_queries(ReplicationState *raft_server, uint64_t prev_persistence_s); - - std::unordered_map get_noresults_queries(); + std::unordered_map get_noresults_queries(); void resetRateLimit(); }; diff --git a/include/noresults_queries.h b/include/noresults_queries.h deleted file mode 100644 index d0ed902d..00000000 --- a/include/noresults_queries.h +++ /dev/null @@ -1,51 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -class NoresultsQueries { -public: - struct QWithTimestamp { - std::string query; - uint64_t timestamp; - - QWithTimestamp(const std::string& query, uint64_t timestamp) : query(query), timestamp(timestamp) {} - }; - - static const size_t QUERY_FINALIZATION_INTERVAL_MICROS = 4 * 1000 * 1000; - -private: - - size_t k; - const size_t max_size; - - // counts aggregated within the current node - tsl::htrie_map local_counts; - std::shared_mutex lmutex; - - std::unordered_map> user_prefix_queries; - std::shared_mutex umutex; - -public: - - NoresultsQueries(size_t k); - - void add(const std::string& value, const bool live_query, const std::string& user_id, - uint64_t now_ts_us = 0); - - void compact_user_queries(uint64_t now_ts_us); - - void serialize_as_docs(std::string& docs); - - void reset_local_counts(); - - size_t get_k(); - - std::unordered_map> get_user_prefix_queries(); - - tsl::htrie_map get_local_counts(); -}; diff --git a/include/popular_queries.h b/include/query_analytics.h similarity index 95% rename from include/popular_queries.h rename to include/query_analytics.h index 95493f27..c6aca8c3 100644 --- a/include/popular_queries.h +++ b/include/query_analytics.h @@ -7,7 +7,7 @@ #include #include -class PopularQueries { +class QueryAnalytics { public: struct QWithTimestamp { std::string query; @@ -32,7 +32,7 @@ private: public: - PopularQueries(size_t k); + QueryAnalytics(size_t k); void add(const std::string& value, const bool live_query, const std::string& user_id, uint64_t now_ts_us = 0); diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp index 804e7c79..c88f8cb3 100644 --- a/src/analytics_manager.cpp +++ b/src/analytics_manager.cpp @@ -124,10 +124,10 @@ Option AnalyticsManager::create_queries_index(nlohmann::json &payload, boo } if(payload["type"] == POPULAR_QUERIES_TYPE) { - PopularQueries *popularQueries = new PopularQueries(limit); + QueryAnalytics *popularQueries = new QueryAnalytics(limit); popular_queries.emplace(suggestion_collection, popularQueries); } else if(payload["type"] == NORESULTS_QUERIES_TYPE) { - NoresultsQueries *noresultsQueries = new NoresultsQueries(limit); + QueryAnalytics *noresultsQueries = new QueryAnalytics(limit); noresults_queries.emplace(suggestion_collection, noresultsQueries); } @@ -322,9 +322,8 @@ void AnalyticsManager::run(ReplicationState* raft_server) { continue; } - persist_suggestions(raft_server, prev_persistence_s); + persist_query_events(raft_server, prev_persistence_s); persist_click_events(raft_server, prev_persistence_s); - persist_noresults_queries(raft_server, prev_persistence_s); prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -334,34 +333,11 @@ void AnalyticsManager::run(ReplicationState* raft_server) { dispose(); } -void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s) { +void AnalyticsManager::persist_query_events(ReplicationState *raft_server, uint64_t prev_persistence_s) { // lock is held by caller - for(const auto& suggestion_config: suggestion_configs) { - const std::string& sink_name = suggestion_config.first; - const std::string& suggestion_coll = suggestion_config.second.suggestion_collection; - - auto popular_queries_it = popular_queries.find(suggestion_coll); - if(popular_queries_it == popular_queries.end()) { - continue; - } - - // need to prepare the counts as JSON docs for import into the suggestion collection - // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}} - - PopularQueries* popularQueries = popular_queries_it->second; - - // aggregate prefix queries to their final form - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ts_us = std::chrono::duration_cast(now).count(); - popularQueries->compact_user_queries(now_ts_us); - - std::string import_payload; - popularQueries->serialize_as_docs(import_payload); - - if(import_payload.empty()) { - continue; - } + auto send_http_response = [&](QueryAnalytics* queryAnalyticsPtr, + const std::string& import_payload, const std::string& suggestion_coll) { // send http request std::string leader_url = raft_server->get_leader_url(); if(!leader_url.empty()) { @@ -378,11 +354,11 @@ void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64 << "Status code: " << status_code << ", response: " << res; } else { LOG(INFO) << "Query aggregation for collection: " + suggestion_coll; - popularQueries->reset_local_counts(); + queryAnalyticsPtr->reset_local_counts(); if(raft_server->is_leader()) { // try to run top-K compaction of suggestion collection - const std::string top_k_param = "count:" + std::to_string(popularQueries->get_k()); + const std::string top_k_param = "count:" + std::to_string(queryAnalyticsPtr->get_k()); const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param; res.clear(); res_headers.clear(); @@ -396,6 +372,47 @@ void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64 } } } + }; + + + for(const auto& suggestion_config: suggestion_configs) { + const std::string& sink_name = suggestion_config.first; + const std::string& suggestion_coll = suggestion_config.second.suggestion_collection; + + auto popular_queries_it = popular_queries.find(suggestion_coll); + auto noresults_queries_it = noresults_queries.find(suggestion_coll); + + // need to prepare the counts as JSON docs for import into the suggestion collection + // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}} + std::string import_payload; + + if(popular_queries_it != popular_queries.end()) { + import_payload.clear(); + QueryAnalytics *popularQueries = popular_queries_it->second; + + // aggregate prefix queries to their final form + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ts_us = std::chrono::duration_cast(now).count(); + popularQueries->compact_user_queries(now_ts_us); + + popularQueries->serialize_as_docs(import_payload); + send_http_response(popularQueries, import_payload, suggestion_coll); + } + + if(noresults_queries_it != noresults_queries.end()) { + import_payload.clear(); + QueryAnalytics *noresultsQueries = noresults_queries_it->second; + // aggregate prefix queries to their final form + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ts_us = std::chrono::duration_cast(now).count(); + noresultsQueries->compact_user_queries(now_ts_us); + + noresultsQueries->serialize_as_docs(import_payload); + } + + if(import_payload.empty()) { + continue; + } } } @@ -440,71 +457,6 @@ void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint6 } } -void AnalyticsManager::persist_noresults_queries(ReplicationState *raft_server, uint64_t prev_persistence_s) { - // lock is held by caller - for(const auto& suggestion_config: suggestion_configs) { - const std::string& sink_name = suggestion_config.first; - const std::string& suggestion_coll = suggestion_config.second.suggestion_collection; - - auto noresults_queries_it = noresults_queries.find(suggestion_coll); - if(noresults_queries_it == noresults_queries.end()) { - continue; - } - - // need to prepare the counts as JSON docs for import into the suggestion collection - // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}} - - NoresultsQueries* noresultsQueries = noresults_queries_it->second; - - // aggregate prefix queries to their final form - auto now = std::chrono::system_clock::now().time_since_epoch(); - auto now_ts_us = std::chrono::duration_cast(now).count(); - noresultsQueries->compact_user_queries(now_ts_us); - - std::string import_payload; - noresultsQueries->serialize_as_docs(import_payload); - - if(import_payload.empty()) { - continue; - } - - // send http request - std::string leader_url = raft_server->get_leader_url(); - if(!leader_url.empty()) { - const std::string& base_url = leader_url + "collections/" + suggestion_coll; - std::string res; - - const std::string& update_url = base_url + "/documents/import?action=emplace"; - std::map res_headers; - long status_code = HttpClient::post_response(update_url, import_payload, - res, res_headers, {}, 10*1000, true); - - if(status_code != 200) { - LOG(ERROR) << "Error while sending query suggestions events to leader. " - << "Status code: " << status_code << ", response: " << res; - } else { - LOG(INFO) << "Query aggregation for collection: " + suggestion_coll; - noresultsQueries->reset_local_counts(); - - if(raft_server->is_leader()) { - // try to run top-K compaction of suggestion collection - const std::string top_k_param = "count:" + std::to_string(noresultsQueries->get_k()); - const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param; - res.clear(); - res_headers.clear(); - status_code = HttpClient::delete_response(truncate_topk_url, res, res_headers, 10*1000, true); - if(status_code != 200) { - LOG(ERROR) << "Error while running top K for query suggestions collection. " - << "Status code: " << status_code << ", response: " << res; - } else { - LOG(INFO) << "Top K aggregation for collection: " + suggestion_coll; - } - } - } - } - } -} - void AnalyticsManager::stop() { quit = true; cv.notify_all(); @@ -531,12 +483,12 @@ void AnalyticsManager::init(Store* store, Store* analytics_store) { this->analytics_store = analytics_store; } -std::unordered_map AnalyticsManager::get_popular_queries() { +std::unordered_map AnalyticsManager::get_popular_queries() { std::unique_lock lk(mutex); return popular_queries; } -std::unordered_map AnalyticsManager::get_noresults_queries() { +std::unordered_map AnalyticsManager::get_noresults_queries() { std::unique_lock lk(mutex); return noresults_queries; } diff --git a/src/noresults_queries.cpp b/src/noresults_queries.cpp deleted file mode 100644 index e337c17c..00000000 --- a/src/noresults_queries.cpp +++ /dev/null @@ -1,117 +0,0 @@ -#include "noresults_queries.h" -#include "logger.h" -#include -#include -#include "string_utils.h" - -NoresultsQueries::NoresultsQueries(size_t k) : k(k), max_size(k * 2) { -} - -void NoresultsQueries::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) { - if(live_query) { - // live query must be aggregated first to their final form as they could be prefix queries - if(now_ts_us == 0) { - now_ts_us = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count(); - } - - if(!umutex.try_lock()) { - // instead of locking we just skip incrementing keys during consolidation time - return ; - } - - auto& queries = user_prefix_queries[user_id]; - if(queries.size() < 100) { - queries.emplace_back(key, now_ts_us); - } - - umutex.unlock(); - - } else { - if(!lmutex.try_lock()) { - // instead of locking we just skip incrementing keys during consolidation time - return ; - } - - auto it = local_counts.find(key); - - if(it != local_counts.end()) { - it.value()++; - } else if(local_counts.size() < max_size) { - // skip count when map has become too large (to prevent abuse) - local_counts.emplace(key, 1); - } - - lmutex.unlock(); - } -} - -void NoresultsQueries::serialize_as_docs(std::string& docs) { - std::shared_lock lk(lmutex); - - std::string key_buffer; - for(auto it = local_counts.begin(); it != local_counts.end(); ++it) { - it.key(key_buffer); - nlohmann::json doc; - doc["id"] = std::to_string(StringUtils::hash_wy(key_buffer.c_str(), key_buffer.size())); - doc["q"] = key_buffer; - doc["$operations"]["increment"]["count"] = it.value(); - docs += doc.dump() + "\n"; - } - - if(!docs.empty()) { - docs.pop_back(); - } -} - -void NoresultsQueries::reset_local_counts() { - std::unique_lock lk(lmutex); - local_counts.clear(); -} - -size_t NoresultsQueries::get_k() { - return k; -} - -void NoresultsQueries::compact_user_queries(uint64_t now_ts_us) { - std::unique_lock lk(umutex); - std::vector keys_to_delete; - - for(auto& kv: user_prefix_queries) { - auto& queries = kv.second; - int64_t last_consolidated_index = -1; - for(uint32_t i = 0; i < queries.size(); i++) { - if(now_ts_us - queries[i].timestamp < QUERY_FINALIZATION_INTERVAL_MICROS) { - break; - } - - uint64_t diff_micros = (i == queries.size()-1) ? (now_ts_us - queries[i].timestamp) : - (queries[i + 1].timestamp - queries[i].timestamp); - - if(diff_micros > QUERY_FINALIZATION_INTERVAL_MICROS) { - add(queries[i].query, false, ""); - last_consolidated_index = i; - } - } - - queries.erase(queries.begin(), queries.begin() + last_consolidated_index+1); - - if(queries.empty()) { - keys_to_delete.push_back(kv.first); - } - } - - for(auto& key: keys_to_delete) { - user_prefix_queries.erase(key); - } -} - -std::unordered_map> NoresultsQueries::get_user_prefix_queries() { - std::unique_lock lk(umutex); - return user_prefix_queries; -} - -tsl::htrie_map NoresultsQueries::get_local_counts() { - std::unique_lock lk(lmutex); - return local_counts; -} diff --git a/src/popular_queries.cpp b/src/query_analytics.cpp similarity index 84% rename from src/popular_queries.cpp rename to src/query_analytics.cpp index ba46ff72..9bf56a94 100644 --- a/src/popular_queries.cpp +++ b/src/query_analytics.cpp @@ -1,14 +1,14 @@ -#include "popular_queries.h" +#include "query_analytics.h" #include "logger.h" #include #include #include "string_utils.h" -PopularQueries::PopularQueries(size_t k) : k(k), max_size(k * 2) { +QueryAnalytics::QueryAnalytics(size_t k) : k(k), max_size(k * 2) { } -void PopularQueries::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) { +void QueryAnalytics::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) { if(live_query) { // live query must be aggregated first to their final form as they could be prefix queries if(now_ts_us == 0) { @@ -47,7 +47,7 @@ void PopularQueries::add(const std::string& key, const bool live_query, const st } } -void PopularQueries::serialize_as_docs(std::string& docs) { +void QueryAnalytics::serialize_as_docs(std::string& docs) { std::shared_lock lk(lmutex); std::string key_buffer; @@ -65,16 +65,16 @@ void PopularQueries::serialize_as_docs(std::string& docs) { } } -void PopularQueries::reset_local_counts() { +void QueryAnalytics::reset_local_counts() { std::unique_lock lk(lmutex); local_counts.clear(); } -size_t PopularQueries::get_k() { +size_t QueryAnalytics::get_k() { return k; } -void PopularQueries::compact_user_queries(uint64_t now_ts_us) { +void QueryAnalytics::compact_user_queries(uint64_t now_ts_us) { std::unique_lock lk(umutex); std::vector keys_to_delete; @@ -107,12 +107,12 @@ void PopularQueries::compact_user_queries(uint64_t now_ts_us) { } } -std::unordered_map> PopularQueries::get_user_prefix_queries() { +std::unordered_map> QueryAnalytics::get_user_prefix_queries() { std::unique_lock lk(umutex); return user_prefix_queries; } -tsl::htrie_map PopularQueries::get_local_counts() { +tsl::htrie_map QueryAnalytics::get_local_counts() { std::unique_lock lk(lmutex); return local_counts; } diff --git a/test/popular_queries_test.cpp b/test/popular_queries_test.cpp index 5877243d..32462b52 100644 --- a/test/popular_queries_test.cpp +++ b/test/popular_queries_test.cpp @@ -1,5 +1,5 @@ #include -#include "popular_queries.h" +#include "query_analytics.h" #include "logger.h" class PopularQueriesTest : public ::testing::Test { @@ -14,7 +14,7 @@ protected: }; TEST_F(PopularQueriesTest, PrefixQueryCompaction) { - PopularQueries pq(10); + QueryAnalytics pq(10); auto now_ts_us = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -36,7 +36,7 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) { ASSERT_EQ(0, pq.get_local_counts().size()); // compaction interval has happened - pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100); + pq.compact_user_queries(now_ts_us + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100); queries = pq.get_user_prefix_queries(); ASSERT_EQ(0, queries.size()); auto local_counts = pq.get_local_counts(); @@ -49,7 +49,7 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) { pq.add("f", true, "0", now_ts_us+1); pq.add("fo", true, "0", now_ts_us+2); pq.add("foo", true, "0", now_ts_us+3); - pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100); + pq.compact_user_queries(now_ts_us + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100); queries = pq.get_user_prefix_queries(); ASSERT_EQ(0, queries.size()); local_counts = pq.get_local_counts(); @@ -62,8 +62,8 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) { pq.add("f", true, "0", now_ts_us+1); pq.add("fo", true, "0", now_ts_us+2); pq.add("foo", true, "0", now_ts_us+3); - pq.add("b", true, "0", now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100); - pq.compact_user_queries(now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1); + pq.add("b", true, "0", now_ts_us + 3 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100); + pq.compact_user_queries(now_ts_us + 3 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100 + 1); queries = pq.get_user_prefix_queries(); ASSERT_EQ(1, queries.size()); ASSERT_EQ(1, queries["0"].size()); @@ -74,10 +74,10 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) { ASSERT_EQ(1, local_counts["foo"]); // continue with that query - auto prev_ts = now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1; + auto prev_ts = now_ts_us + 3 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100 + 1; pq.add("ba", true, "0", prev_ts+1); pq.add("bar", true, "0", prev_ts+2); - pq.compact_user_queries(prev_ts+2+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+1); + pq.compact_user_queries(prev_ts + 2 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 1); queries = pq.get_user_prefix_queries(); ASSERT_EQ(0, queries.size()); local_counts = pq.get_local_counts();