#pragma once #include "popular_queries.h" #include "option.h" #include "raft_server.h" #include #include #include #include class AnalyticsManager { private: mutable std::mutex mutex; std::condition_variable cv; std::atomic quit = false; const size_t QUERY_COMPACTION_INTERVAL_S = 60; struct suggestion_config_t { std::string name; std::string suggestion_collection; std::vector query_collections; size_t limit; void to_json(nlohmann::json& obj) const { obj["name"] = name; obj["type"] = POPULAR_QUERIES_TYPE; obj["params"] = nlohmann::json::object(); obj["params"]["limit"] = limit; obj["params"]["source"]["collections"] = query_collections; obj["params"]["destination"]["collection"] = suggestion_collection; } }; // config name => config std::unordered_map suggestion_configs; // query collection => suggestion collections std::unordered_map> query_collection_mapping; // suggestion collection => popular queries std::unordered_map popular_queries; Store* store = nullptr; AnalyticsManager() {} ~AnalyticsManager(); Option remove_popular_queries_index(const std::string& name); Option create_popular_queries_index(nlohmann::json &payload, bool upsert, bool write_to_disk); public: static constexpr const char* ANALYTICS_RULE_PREFIX = "$AR"; static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries"; static AnalyticsManager& get_instance() { static AnalyticsManager instance; return instance; } AnalyticsManager(AnalyticsManager const&) = delete; void operator=(AnalyticsManager const&) = delete; void init(Store* store); void run(ReplicationState* raft_server); Option list_rules(); Option get_rule(const std::string& name); Option create_rule(nlohmann::json& payload, bool upsert, bool write_to_disk); Option remove_rule(const std::string& name); void add_suggestion(const std::string& query_collection, std::string& query, bool live_query, const std::string& user_id); void stop(); void dispose(); void persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s); std::unordered_map get_popular_queries(); };