From c349148e2de835888de0af2e625f3ccd6378da34 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 23 May 2023 14:26:46 +0530 Subject: [PATCH] Refactor query aggregation. --- ...uery_suggestions.h => analytics_manager.h} | 21 ++++--- include/core_api.h | 4 +- include/event_manager.h | 4 -- ..._suggestions.cpp => analytics_manager.cpp} | 59 +++++++++++-------- src/collection_manager.cpp | 20 ++++--- src/core_api.cpp | 12 ++-- src/event_manager.cpp | 34 +---------- src/main/typesense_server.cpp | 8 +-- src/typesense_server_utils.cpp | 8 +-- 9 files changed, 73 insertions(+), 97 deletions(-) rename include/{query_suggestions.h => analytics_manager.h} (72%) rename src/{query_suggestions.cpp => analytics_manager.cpp} (80%) diff --git a/include/query_suggestions.h b/include/analytics_manager.h similarity index 72% rename from include/query_suggestions.h rename to include/analytics_manager.h index e42b4c4f..239319fe 100644 --- a/include/query_suggestions.h +++ b/include/analytics_manager.h @@ -7,7 +7,7 @@ #include #include -class QuerySuggestions { +class AnalyticsManager { private: mutable std::mutex mutex; std::condition_variable cv; @@ -34,29 +34,28 @@ private: Store* store = nullptr; - QuerySuggestions() {} + AnalyticsManager() {} - ~QuerySuggestions(); + ~AnalyticsManager(); public: - static constexpr const char* EVENT_SINK_CONFIG_PREFIX = "$ES"; + static constexpr const char* ANALYTICS_CONFIG_PREFIX = "$AC"; + static constexpr const char* RESOURCE_TYPE = "popular_queries"; - static constexpr const char* SINK_TYPE = "query_suggestions"; - - static QuerySuggestions& get_instance() { - static QuerySuggestions instance; + static AnalyticsManager& get_instance() { + static AnalyticsManager instance; return instance; } - QuerySuggestions(QuerySuggestions const&) = delete; - void operator=(QuerySuggestions const&) = delete; + AnalyticsManager(AnalyticsManager const&) = delete; + void operator=(AnalyticsManager const&) = delete; void init(Store* store); void run(ReplicationState* raft_server); - Option create_index(const nlohmann::json& payload, bool write_to_disk = true); + Option create_index(nlohmann::json& payload, bool write_to_disk = true); Option remove_suggestion_index(const std::string& name); diff --git a/include/core_api.h b/include/core_api.h index bc8d297c..4a18a26f 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -145,9 +145,9 @@ bool del_exceed(const std::shared_ptr& req, const std::shared_ptr& req, const std::shared_ptr& res); -bool post_create_event_sink(const std::shared_ptr& req, const std::shared_ptr& res); +bool post_create_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res); -bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr& res); +bool del_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res); // Misc helpers diff --git a/include/event_manager.h b/include/event_manager.h index e012ee8d..51fe6391 100644 --- a/include/event_manager.h +++ b/include/event_manager.h @@ -21,10 +21,6 @@ public: EventManager(EventManager const&) = delete; void operator=(EventManager const&) = delete; - Option create_sink(const nlohmann::json& sink_config, bool write_to_disk = true); - - Option remove_sink(const std::string& name); - bool add_event(const nlohmann::json& event); }; \ No newline at end of file diff --git a/src/query_suggestions.cpp b/src/analytics_manager.cpp similarity index 80% rename from src/query_suggestions.cpp rename to src/analytics_manager.cpp index a8bafc00..2d51d215 100644 --- a/src/query_suggestions.cpp +++ b/src/analytics_manager.cpp @@ -1,18 +1,17 @@ #include #include -#include "query_suggestions.h" +#include "analytics_manager.h" #include "tokenizer.h" #include "http_client.h" #include "collection_manager.h" -Option QuerySuggestions::create_index(const nlohmann::json& payload, bool write_to_disk) { +Option AnalyticsManager::create_index(nlohmann::json& payload, bool write_to_disk) { /* Sample payload: { "name": "top_queries", - "type": "query_suggestions", - "max_suggestions": 1000, + "limit": 1000, "source": { "collections": ["brands", "products"] }, @@ -22,28 +21,37 @@ Option QuerySuggestions::create_index(const nlohmann::json& payl } */ - // structural payload validation is done upstream, e.g. presence of source and destination is validated - // specific validations will be done here + if(!payload.contains("name") || !payload["name"].is_string()) { + return Option(400, "Bad or missing name."); + } + + if(!payload.contains("source") || !payload["source"].is_object()) { + return Option(400, "Bad or missing source."); + } + + if(!payload.contains("destination") || !payload["destination"].is_object()) { + return Option(400, "Bad or missing destination."); + } const std::string& suggestion_config_name = payload["name"].get(); size_t max_suggestions = 1000; - if(payload.contains("max_suggestions") && payload["max_suggestions"].is_number_integer()) { - max_suggestions = payload["max_suggestions"].get(); + if(payload.contains("limit") && payload["limit"].is_number_integer()) { + max_suggestions = payload["limit"].get(); } if(suggestion_configs.find(suggestion_config_name) != suggestion_configs.end()) { - return Option(400, "There's already another configuration with the name `" + + return Option(400, "There's already another configuration with the name `" + suggestion_config_name + "`."); } if(!payload["source"].contains("collections") || !payload["source"]["collections"].is_array()) { - return Option(400, "Must contain a valid list of source collections."); + return Option(400, "Must contain a valid list of source collections."); } if(!payload["destination"].contains("collection") || !payload["destination"]["collection"].is_string()) { - return Option(400, "Must contain a valid destination collection."); + return Option(400, "Must contain a valid destination collection."); } const std::string& suggestion_collection = payload["destination"]["collection"].get(); @@ -54,7 +62,7 @@ Option QuerySuggestions::create_index(const nlohmann::json& payl for(const auto& coll: payload["source"]["collections"]) { if(!coll.is_string()) { - return Option(400, "Must contain a valid list of source collection names."); + return Option(400, "Must contain a valid list of source collection names."); } const std::string& src_collection = coll.get(); @@ -73,17 +81,18 @@ Option QuerySuggestions::create_index(const nlohmann::json& payl popular_queries.emplace(suggestion_collection, popularQueries); if(write_to_disk) { - auto suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + suggestion_config_name; + payload["type"] = RESOURCE_TYPE; + auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + suggestion_config_name; bool inserted = store->insert(suggestion_key, payload.dump()); if(!inserted) { - return Option(500, "Error while storing the suggestion config to disk."); + return Option(500, "Error while storing the config to disk."); } } - return Option(payload); + return Option(true); } -QuerySuggestions::~QuerySuggestions() { +AnalyticsManager::~AnalyticsManager() { std::unique_lock lock(mutex); for(auto& kv: popular_queries) { @@ -91,13 +100,13 @@ QuerySuggestions::~QuerySuggestions() { } } -Option QuerySuggestions::remove_suggestion_index(const std::string &name) { +Option AnalyticsManager::remove_suggestion_index(const std::string &name) { std::unique_lock lock(mutex); auto suggestion_configs_it = suggestion_configs.find(name); if(suggestion_configs_it == suggestion_configs.end()) { - return Option(404, "Sink not found."); + return Option(404, "Index not found."); } const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection; @@ -113,16 +122,16 @@ Option QuerySuggestions::remove_suggestion_index(const std::string &name) suggestion_configs.erase(name); - auto suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + name; + auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + name; bool erased = store->remove(suggestion_key); if(!erased) { - return Option(500, "Error while removing the sink config from disk."); + return Option(500, "Error while deleting from disk."); } return Option(true); } -void QuerySuggestions::add_suggestion(const std::string &query_collection, std::string &query, +void AnalyticsManager::add_suggestion(const std::string &query_collection, std::string &query, const bool live_query, const std::string& user_id) { // look up suggestion collections for the query collection std::unique_lock lock(mutex); @@ -138,7 +147,7 @@ void QuerySuggestions::add_suggestion(const std::string &query_collection, std:: } } -void QuerySuggestions::run(ReplicationState* raft_server) { +void AnalyticsManager::run(ReplicationState* raft_server) { uint64_t prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -225,12 +234,12 @@ void QuerySuggestions::run(ReplicationState* raft_server) { dispose(); } -void QuerySuggestions::stop() { +void AnalyticsManager::stop() { quit = true; cv.notify_all(); } -void QuerySuggestions::dispose() { +void AnalyticsManager::dispose() { for(auto& kv: popular_queries) { delete kv.second; } @@ -238,6 +247,6 @@ void QuerySuggestions::dispose() { popular_queries.clear(); } -void QuerySuggestions::init(Store* store) { +void AnalyticsManager::init(Store* store) { this->store = store; } diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 8d45cdc8..b1ba49a8 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include "collection_manager.h" #include "batched_indexer.h" @@ -1066,7 +1066,7 @@ Option CollectionManager::do_search(std::map& re if(result.count("found") != 0 && result["found"].get() != 0) { std::string processed_query = raw_query; Tokenizer::normalize_ascii(processed_query); - QuerySuggestions::get_instance().add_suggestion(collection->get_name(), processed_query, + AnalyticsManager::get_instance().add_suggestion(collection->get_name(), processed_query, true, req_params["x-typesense-user-id"]); } } @@ -1299,14 +1299,16 @@ Option CollectionManager::load_collection(const nlohmann::json &collection } // restore query suggestions configs - std::vector sink_config_jsons; - cm.store->scan_fill(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX, - std::string(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX) + "`", - sink_config_jsons); + std::vector analytics_config_jsons; + cm.store->scan_fill(AnalyticsManager::ANALYTICS_CONFIG_PREFIX, + std::string(AnalyticsManager::ANALYTICS_CONFIG_PREFIX) + "`", + analytics_config_jsons); - for(const auto& sink_config_json: sink_config_jsons) { - nlohmann::json sink_config = nlohmann::json::parse(sink_config_json); - EventManager::get_instance().create_sink(sink_config, false); + for(const auto& analytics_config_json: analytics_config_jsons) { + nlohmann::json analytics_config = nlohmann::json::parse(analytics_config_json); + if(analytics_config["type"] == AnalyticsManager::RESOURCE_TYPE) { + AnalyticsManager::get_instance().create_index(analytics_config, false); + } } // Fetch records from the store and re-create memory index diff --git a/src/core_api.cpp b/src/core_api.cpp index 3ace3fa8..3d33ff86 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "typesense_server_utils.h" #include "core_api.h" #include "string_utils.h" @@ -2056,8 +2057,7 @@ bool post_create_event(const std::shared_ptr& req, const std::shared_p return false; } -bool post_create_event_sink(const std::shared_ptr& req, const std::shared_ptr& res) { - // connects an event to a sink, which for now, is another collection +bool post_create_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res) { nlohmann::json req_json; try { @@ -2068,7 +2068,7 @@ bool post_create_event_sink(const std::shared_ptr& req, const std::sha return false; } - auto op = EventManager::get_instance().create_sink(req_json); + auto op = AnalyticsManager::get_instance().create_index(req_json); if(!op.ok()) { res->set(op.code(), op.error()); @@ -2079,8 +2079,8 @@ bool post_create_event_sink(const std::shared_ptr& req, const std::sha return true; } -bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr& res) { - auto op = EventManager::get_instance().remove_sink(req->params["name"]); +bool del_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res) { + auto op = AnalyticsManager::get_instance().remove_suggestion_index(req->params["name"]); if(!op.ok()) { res->set(op.code(), op.error()); return false; @@ -2088,4 +2088,4 @@ bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr< res->set_200(R"({"ok": true)"); return true; -} \ No newline at end of file +} diff --git a/src/event_manager.cpp b/src/event_manager.cpp index 49aa8185..e0537d85 100644 --- a/src/event_manager.cpp +++ b/src/event_manager.cpp @@ -1,4 +1,4 @@ -#include +#include #include "event_manager.h" bool EventManager::add_event(const nlohmann::json& event) { @@ -45,40 +45,10 @@ bool EventManager::add_event(const nlohmann::json& event) { } std::string query = event_data_query_it.get(); - QuerySuggestions::get_instance().add_suggestion(coll.get(), query, false, ""); + AnalyticsManager::get_instance().add_suggestion(coll.get(), query, false, ""); } } } return true; } - -Option EventManager::create_sink(const nlohmann::json& sink_config, bool write_to_disk) { - if(!sink_config.contains("name") || !sink_config["name"].is_string()) { - return Option(400, "Request payload contains invalid name."); - } - - if(!sink_config.contains("type") || !sink_config["type"].is_string()) { - return Option(400, "Request payload contains invalid type."); - } - - if(!sink_config.contains("source") || !sink_config["source"].is_object()) { - return Option(400, "Request payload contains invalid source."); - } - - if(!sink_config.contains("destination") || !sink_config["destination"].is_object()) { - return Option(400, "Request payload contains invalid destination."); - } - - if(sink_config.contains("type") && sink_config["type"] == QuerySuggestions::SINK_TYPE) { - QuerySuggestions::get_instance().create_index(sink_config, write_to_disk); - } else { - return Option(400, ("Missing or invalid event sink type.")); - } - - return Option(200); -} - -Option EventManager::remove_sink(const std::string& name) { - return QuerySuggestions::get_instance().remove_suggestion_index(name); -} diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 915b5a91..c1734747 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -68,10 +68,10 @@ void master_server_routes() { server->put("/presets/:name", put_upsert_preset); server->del("/presets/:name", del_preset); - // events - server->post("/events", post_create_event); - server->post("/events/sinks", post_create_event_sink); - server->del("/events/sinks/:name", del_event_sink); + // analytics + server->post("/analytics/popular-queries", post_create_analytics_popular_queries); + server->del("/analytics/popular-queries/:name", del_analytics_popular_queries); + server->post("/analytics/events", post_create_event); // meta server->get("/metrics.json", get_metrics_json); diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 73a15e2b..63e49114 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include "core_api.h" #include "ratelimit_manager.h" @@ -397,7 +397,7 @@ int run_server(const Config & config, const std::string & version, void (*master HttpClient & httpClient = HttpClient::get_instance(); httpClient.init(config.get_api_key()); - QuerySuggestions::get_instance().init(&store); + AnalyticsManager::get_instance().init(&store); server = new HttpServer( version, @@ -456,7 +456,7 @@ int run_server(const Config & config, const std::string & version, void (*master }); std::thread event_sink_thread([&replication_state]() { - QuerySuggestions::get_instance().run(&replication_state); + AnalyticsManager::get_instance().run(&replication_state); }); std::string path_to_nodes = config.get_nodes(); @@ -476,7 +476,7 @@ int run_server(const Config & config, const std::string & version, void (*master batch_indexing_thread.join(); LOG(INFO) << "Shutting down event sink thread..."; - QuerySuggestions::get_instance().stop(); + AnalyticsManager::get_instance().stop(); LOG(INFO) << "Waiting for event sink thread to be done..."; event_sink_thread.join();