mirror of
https://github.com/typesense/typesense.git
synced 2025-05-16 19:55:21 +08:00
Refactor query aggregation.
This commit is contained in:
parent
b7f29aeed1
commit
c349148e2d
@ -7,7 +7,7 @@
|
||||
#include <unordered_map>
|
||||
#include <shared_mutex>
|
||||
|
||||
class QuerySuggestions {
|
||||
class AnalyticsManager {
|
||||
private:
|
||||
mutable std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
@ -34,29 +34,28 @@ private:
|
||||
|
||||
Store* store = nullptr;
|
||||
|
||||
QuerySuggestions() {}
|
||||
AnalyticsManager() {}
|
||||
|
||||
~QuerySuggestions();
|
||||
~AnalyticsManager();
|
||||
|
||||
public:
|
||||
|
||||
static constexpr const char* EVENT_SINK_CONFIG_PREFIX = "$ES";
|
||||
static constexpr const char* ANALYTICS_CONFIG_PREFIX = "$AC";
|
||||
static constexpr const char* RESOURCE_TYPE = "popular_queries";
|
||||
|
||||
static constexpr const char* SINK_TYPE = "query_suggestions";
|
||||
|
||||
static QuerySuggestions& get_instance() {
|
||||
static QuerySuggestions instance;
|
||||
static AnalyticsManager& get_instance() {
|
||||
static AnalyticsManager instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
QuerySuggestions(QuerySuggestions const&) = delete;
|
||||
void operator=(QuerySuggestions const&) = delete;
|
||||
AnalyticsManager(AnalyticsManager const&) = delete;
|
||||
void operator=(AnalyticsManager const&) = delete;
|
||||
|
||||
void init(Store* store);
|
||||
|
||||
void run(ReplicationState* raft_server);
|
||||
|
||||
Option<nlohmann::json> create_index(const nlohmann::json& payload, bool write_to_disk = true);
|
||||
Option<bool> create_index(nlohmann::json& payload, bool write_to_disk = true);
|
||||
|
||||
Option<bool> remove_suggestion_index(const std::string& name);
|
||||
|
@ -145,9 +145,9 @@ bool del_exceed(const std::shared_ptr<http_req>& req, const std::shared_ptr<http
|
||||
|
||||
bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
|
||||
|
||||
bool post_create_event_sink(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
|
||||
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
|
||||
|
||||
bool del_event_sink(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
|
||||
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
|
||||
|
||||
// Misc helpers
|
||||
|
||||
|
@ -21,10 +21,6 @@ public:
|
||||
EventManager(EventManager const&) = delete;
|
||||
void operator=(EventManager const&) = delete;
|
||||
|
||||
Option<uint32_t> create_sink(const nlohmann::json& sink_config, bool write_to_disk = true);
|
||||
|
||||
Option<bool> remove_sink(const std::string& name);
|
||||
|
||||
bool add_event(const nlohmann::json& event);
|
||||
|
||||
};
|
@ -1,18 +1,17 @@
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include "query_suggestions.h"
|
||||
#include "analytics_manager.h"
|
||||
#include "tokenizer.h"
|
||||
#include "http_client.h"
|
||||
#include "collection_manager.h"
|
||||
|
||||
Option<nlohmann::json> QuerySuggestions::create_index(const nlohmann::json& payload, bool write_to_disk) {
|
||||
Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_to_disk) {
|
||||
/*
|
||||
Sample payload:
|
||||
|
||||
{
|
||||
"name": "top_queries",
|
||||
"type": "query_suggestions",
|
||||
"max_suggestions": 1000,
|
||||
"limit": 1000,
|
||||
"source": {
|
||||
"collections": ["brands", "products"]
|
||||
},
|
||||
@ -22,28 +21,37 @@ Option<nlohmann::json> QuerySuggestions::create_index(const nlohmann::json& payl
|
||||
}
|
||||
*/
|
||||
|
||||
// structural payload validation is done upstream, e.g. presence of source and destination is validated
|
||||
// specific validations will be done here
|
||||
if(!payload.contains("name") || !payload["name"].is_string()) {
|
||||
return Option<bool>(400, "Bad or missing name.");
|
||||
}
|
||||
|
||||
if(!payload.contains("source") || !payload["source"].is_object()) {
|
||||
return Option<bool>(400, "Bad or missing source.");
|
||||
}
|
||||
|
||||
if(!payload.contains("destination") || !payload["destination"].is_object()) {
|
||||
return Option<bool>(400, "Bad or missing destination.");
|
||||
}
|
||||
|
||||
const std::string& suggestion_config_name = payload["name"].get<std::string>();
|
||||
|
||||
size_t max_suggestions = 1000;
|
||||
|
||||
if(payload.contains("max_suggestions") && payload["max_suggestions"].is_number_integer()) {
|
||||
max_suggestions = payload["max_suggestions"].get<size_t>();
|
||||
if(payload.contains("limit") && payload["limit"].is_number_integer()) {
|
||||
max_suggestions = payload["limit"].get<size_t>();
|
||||
}
|
||||
|
||||
if(suggestion_configs.find(suggestion_config_name) != suggestion_configs.end()) {
|
||||
return Option<nlohmann::json>(400, "There's already another configuration with the name `" +
|
||||
return Option<bool>(400, "There's already another configuration with the name `" +
|
||||
suggestion_config_name + "`.");
|
||||
}
|
||||
|
||||
if(!payload["source"].contains("collections") || !payload["source"]["collections"].is_array()) {
|
||||
return Option<nlohmann::json>(400, "Must contain a valid list of source collections.");
|
||||
return Option<bool>(400, "Must contain a valid list of source collections.");
|
||||
}
|
||||
|
||||
if(!payload["destination"].contains("collection") || !payload["destination"]["collection"].is_string()) {
|
||||
return Option<nlohmann::json>(400, "Must contain a valid destination collection.");
|
||||
return Option<bool>(400, "Must contain a valid destination collection.");
|
||||
}
|
||||
|
||||
const std::string& suggestion_collection = payload["destination"]["collection"].get<std::string>();
|
||||
@ -54,7 +62,7 @@ Option<nlohmann::json> QuerySuggestions::create_index(const nlohmann::json& payl
|
||||
|
||||
for(const auto& coll: payload["source"]["collections"]) {
|
||||
if(!coll.is_string()) {
|
||||
return Option<nlohmann::json>(400, "Must contain a valid list of source collection names.");
|
||||
return Option<bool>(400, "Must contain a valid list of source collection names.");
|
||||
}
|
||||
|
||||
const std::string& src_collection = coll.get<std::string>();
|
||||
@ -73,17 +81,18 @@ Option<nlohmann::json> QuerySuggestions::create_index(const nlohmann::json& payl
|
||||
popular_queries.emplace(suggestion_collection, popularQueries);
|
||||
|
||||
if(write_to_disk) {
|
||||
auto suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + suggestion_config_name;
|
||||
payload["type"] = RESOURCE_TYPE;
|
||||
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + suggestion_config_name;
|
||||
bool inserted = store->insert(suggestion_key, payload.dump());
|
||||
if(!inserted) {
|
||||
return Option<nlohmann::json>(500, "Error while storing the suggestion config to disk.");
|
||||
return Option<bool>(500, "Error while storing the config to disk.");
|
||||
}
|
||||
}
|
||||
|
||||
return Option<nlohmann::json>(payload);
|
||||
return Option<bool>(true);
|
||||
}
|
||||
|
||||
QuerySuggestions::~QuerySuggestions() {
|
||||
AnalyticsManager::~AnalyticsManager() {
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
for(auto& kv: popular_queries) {
|
||||
@ -91,13 +100,13 @@ QuerySuggestions::~QuerySuggestions() {
|
||||
}
|
||||
}
|
||||
|
||||
Option<bool> QuerySuggestions::remove_suggestion_index(const std::string &name) {
|
||||
Option<bool> AnalyticsManager::remove_suggestion_index(const std::string &name) {
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
auto suggestion_configs_it = suggestion_configs.find(name);
|
||||
|
||||
if(suggestion_configs_it == suggestion_configs.end()) {
|
||||
return Option<bool>(404, "Sink not found.");
|
||||
return Option<bool>(404, "Index not found.");
|
||||
}
|
||||
|
||||
const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection;
|
||||
@ -113,16 +122,16 @@ Option<bool> QuerySuggestions::remove_suggestion_index(const std::string &name)
|
||||
|
||||
suggestion_configs.erase(name);
|
||||
|
||||
auto suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + name;
|
||||
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + name;
|
||||
bool erased = store->remove(suggestion_key);
|
||||
if(!erased) {
|
||||
return Option<bool>(500, "Error while removing the sink config from disk.");
|
||||
return Option<bool>(500, "Error while deleting from disk.");
|
||||
}
|
||||
|
||||
return Option<bool>(true);
|
||||
}
|
||||
|
||||
void QuerySuggestions::add_suggestion(const std::string &query_collection, std::string &query,
|
||||
void AnalyticsManager::add_suggestion(const std::string &query_collection, std::string &query,
|
||||
const bool live_query, const std::string& user_id) {
|
||||
// look up suggestion collections for the query collection
|
||||
std::unique_lock lock(mutex);
|
||||
@ -138,7 +147,7 @@ void QuerySuggestions::add_suggestion(const std::string &query_collection, std::
|
||||
}
|
||||
}
|
||||
|
||||
void QuerySuggestions::run(ReplicationState* raft_server) {
|
||||
void AnalyticsManager::run(ReplicationState* raft_server) {
|
||||
uint64_t prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
@ -225,12 +234,12 @@ void QuerySuggestions::run(ReplicationState* raft_server) {
|
||||
dispose();
|
||||
}
|
||||
|
||||
void QuerySuggestions::stop() {
|
||||
void AnalyticsManager::stop() {
|
||||
quit = true;
|
||||
cv.notify_all();
|
||||
}
|
||||
|
||||
void QuerySuggestions::dispose() {
|
||||
void AnalyticsManager::dispose() {
|
||||
for(auto& kv: popular_queries) {
|
||||
delete kv.second;
|
||||
}
|
||||
@ -238,6 +247,6 @@ void QuerySuggestions::dispose() {
|
||||
popular_queries.clear();
|
||||
}
|
||||
|
||||
void QuerySuggestions::init(Store* store) {
|
||||
void AnalyticsManager::init(Store* store) {
|
||||
this->store = store;
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
#include <vector>
|
||||
#include <json.hpp>
|
||||
#include <app_metrics.h>
|
||||
#include <query_suggestions.h>
|
||||
#include <analytics_manager.h>
|
||||
#include <event_manager.h>
|
||||
#include "collection_manager.h"
|
||||
#include "batched_indexer.h"
|
||||
@ -1066,7 +1066,7 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
|
||||
if(result.count("found") != 0 && result["found"].get<size_t>() != 0) {
|
||||
std::string processed_query = raw_query;
|
||||
Tokenizer::normalize_ascii(processed_query);
|
||||
QuerySuggestions::get_instance().add_suggestion(collection->get_name(), processed_query,
|
||||
AnalyticsManager::get_instance().add_suggestion(collection->get_name(), processed_query,
|
||||
true, req_params["x-typesense-user-id"]);
|
||||
}
|
||||
}
|
||||
@ -1299,14 +1299,16 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
|
||||
}
|
||||
|
||||
// restore query suggestions configs
|
||||
std::vector<std::string> sink_config_jsons;
|
||||
cm.store->scan_fill(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX,
|
||||
std::string(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX) + "`",
|
||||
sink_config_jsons);
|
||||
std::vector<std::string> analytics_config_jsons;
|
||||
cm.store->scan_fill(AnalyticsManager::ANALYTICS_CONFIG_PREFIX,
|
||||
std::string(AnalyticsManager::ANALYTICS_CONFIG_PREFIX) + "`",
|
||||
analytics_config_jsons);
|
||||
|
||||
for(const auto& sink_config_json: sink_config_jsons) {
|
||||
nlohmann::json sink_config = nlohmann::json::parse(sink_config_json);
|
||||
EventManager::get_instance().create_sink(sink_config, false);
|
||||
for(const auto& analytics_config_json: analytics_config_jsons) {
|
||||
nlohmann::json analytics_config = nlohmann::json::parse(analytics_config_json);
|
||||
if(analytics_config["type"] == AnalyticsManager::RESOURCE_TYPE) {
|
||||
AnalyticsManager::get_instance().create_index(analytics_config, false);
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch records from the store and re-create memory index
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <cstdlib>
|
||||
#include <app_metrics.h>
|
||||
#include <regex>
|
||||
#include <analytics_manager.h>
|
||||
#include "typesense_server_utils.h"
|
||||
#include "core_api.h"
|
||||
#include "string_utils.h"
|
||||
@ -2056,8 +2057,7 @@ bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_p
|
||||
return false;
|
||||
}
|
||||
|
||||
bool post_create_event_sink(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
|
||||
// connects an event to a sink, which for now, is another collection
|
||||
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
|
||||
nlohmann::json req_json;
|
||||
|
||||
try {
|
||||
@ -2068,7 +2068,7 @@ bool post_create_event_sink(const std::shared_ptr<http_req>& req, const std::sha
|
||||
return false;
|
||||
}
|
||||
|
||||
auto op = EventManager::get_instance().create_sink(req_json);
|
||||
auto op = AnalyticsManager::get_instance().create_index(req_json);
|
||||
|
||||
if(!op.ok()) {
|
||||
res->set(op.code(), op.error());
|
||||
@ -2079,8 +2079,8 @@ bool post_create_event_sink(const std::shared_ptr<http_req>& req, const std::sha
|
||||
return true;
|
||||
}
|
||||
|
||||
bool del_event_sink(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
|
||||
auto op = EventManager::get_instance().remove_sink(req->params["name"]);
|
||||
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
|
||||
auto op = AnalyticsManager::get_instance().remove_suggestion_index(req->params["name"]);
|
||||
if(!op.ok()) {
|
||||
res->set(op.code(), op.error());
|
||||
return false;
|
||||
@ -2088,4 +2088,4 @@ bool del_event_sink(const std::shared_ptr<http_req>& req, const std::shared_ptr<
|
||||
|
||||
res->set_200(R"({"ok": true)");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include <query_suggestions.h>
|
||||
#include <analytics_manager.h>
|
||||
#include "event_manager.h"
|
||||
|
||||
bool EventManager::add_event(const nlohmann::json& event) {
|
||||
@ -45,40 +45,10 @@ bool EventManager::add_event(const nlohmann::json& event) {
|
||||
}
|
||||
|
||||
std::string query = event_data_query_it.get<std::string>();
|
||||
QuerySuggestions::get_instance().add_suggestion(coll.get<std::string>(), query, false, "");
|
||||
AnalyticsManager::get_instance().add_suggestion(coll.get<std::string>(), query, false, "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
Option<uint32_t> EventManager::create_sink(const nlohmann::json& sink_config, bool write_to_disk) {
|
||||
if(!sink_config.contains("name") || !sink_config["name"].is_string()) {
|
||||
return Option<uint32_t>(400, "Request payload contains invalid name.");
|
||||
}
|
||||
|
||||
if(!sink_config.contains("type") || !sink_config["type"].is_string()) {
|
||||
return Option<uint32_t>(400, "Request payload contains invalid type.");
|
||||
}
|
||||
|
||||
if(!sink_config.contains("source") || !sink_config["source"].is_object()) {
|
||||
return Option<uint32_t>(400, "Request payload contains invalid source.");
|
||||
}
|
||||
|
||||
if(!sink_config.contains("destination") || !sink_config["destination"].is_object()) {
|
||||
return Option<uint32_t>(400, "Request payload contains invalid destination.");
|
||||
}
|
||||
|
||||
if(sink_config.contains("type") && sink_config["type"] == QuerySuggestions::SINK_TYPE) {
|
||||
QuerySuggestions::get_instance().create_index(sink_config, write_to_disk);
|
||||
} else {
|
||||
return Option<uint32_t>(400, ("Missing or invalid event sink type."));
|
||||
}
|
||||
|
||||
return Option<uint32_t>(200);
|
||||
}
|
||||
|
||||
Option<bool> EventManager::remove_sink(const std::string& name) {
|
||||
return QuerySuggestions::get_instance().remove_suggestion_index(name);
|
||||
}
|
||||
|
@ -68,10 +68,10 @@ void master_server_routes() {
|
||||
server->put("/presets/:name", put_upsert_preset);
|
||||
server->del("/presets/:name", del_preset);
|
||||
|
||||
// events
|
||||
server->post("/events", post_create_event);
|
||||
server->post("/events/sinks", post_create_event_sink);
|
||||
server->del("/events/sinks/:name", del_event_sink);
|
||||
// analytics
|
||||
server->post("/analytics/popular-queries", post_create_analytics_popular_queries);
|
||||
server->del("/analytics/popular-queries/:name", del_analytics_popular_queries);
|
||||
server->post("/analytics/events", post_create_event);
|
||||
|
||||
// meta
|
||||
server->get("/metrics.json", get_metrics_json);
|
||||
|
@ -13,7 +13,7 @@
|
||||
#include <arpa/inet.h>
|
||||
#include <sys/socket.h>
|
||||
#include <ifaddrs.h>
|
||||
#include <query_suggestions.h>
|
||||
#include <analytics_manager.h>
|
||||
|
||||
#include "core_api.h"
|
||||
#include "ratelimit_manager.h"
|
||||
@ -397,7 +397,7 @@ int run_server(const Config & config, const std::string & version, void (*master
|
||||
HttpClient & httpClient = HttpClient::get_instance();
|
||||
httpClient.init(config.get_api_key());
|
||||
|
||||
QuerySuggestions::get_instance().init(&store);
|
||||
AnalyticsManager::get_instance().init(&store);
|
||||
|
||||
server = new HttpServer(
|
||||
version,
|
||||
@ -456,7 +456,7 @@ int run_server(const Config & config, const std::string & version, void (*master
|
||||
});
|
||||
|
||||
std::thread event_sink_thread([&replication_state]() {
|
||||
QuerySuggestions::get_instance().run(&replication_state);
|
||||
AnalyticsManager::get_instance().run(&replication_state);
|
||||
});
|
||||
|
||||
std::string path_to_nodes = config.get_nodes();
|
||||
@ -476,7 +476,7 @@ int run_server(const Config & config, const std::string & version, void (*master
|
||||
batch_indexing_thread.join();
|
||||
|
||||
LOG(INFO) << "Shutting down event sink thread...";
|
||||
QuerySuggestions::get_instance().stop();
|
||||
AnalyticsManager::get_instance().stop();
|
||||
|
||||
LOG(INFO) << "Waiting for event sink thread to be done...";
|
||||
event_sink_thread.join();
|
||||
|
Loading…
x
Reference in New Issue
Block a user