Handle aliasing for analytics query.

This commit is contained in:
Kishore Nallan 2023-05-30 11:55:20 +05:30
parent 9e505a5322
commit 4c32ead921
5 changed files with 219 additions and 75 deletions

View File

@ -79,4 +79,8 @@ public:
void stop();
void dispose();
void persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s);
std::unordered_map<std::string, PopularQueries*> get_popular_queries();
};

View File

@ -12,12 +12,14 @@ Option<bool> AnalyticsManager::create_rule(nlohmann::json& payload, bool write_t
{
"name": "top_search_queries",
"type": "popular_queries",
"limit": 1000,
"source": {
"collections": ["brands", "products"]
},
"destination": {
"collection": "top_search_queries"
"params": {
"limit": 1000,
"source": {
"collections": ["brands", "products"]
},
"destination": {
"collection": "top_search_queries"
}
}
}
*/
@ -207,71 +209,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
break;
}
for(const auto& suggestion_config: suggestion_configs) {
const std::string& sink_name = suggestion_config.first;
const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
auto popular_queries_it = popular_queries.find(suggestion_coll);
if(popular_queries_it == popular_queries.end()) {
continue;
}
// need to prepare the counts as JSON docs for import into the suggestion collection
// {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
PopularQueries* popularQueries = popular_queries_it->second;
// aggregate prefix queries to their final form
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
popularQueries->compact_user_queries(now_ts_us);
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(now).count();
if(now_ts_seconds - prev_persistence_s < Config::get_instance().get_analytics_flush_interval()) {
// we will persist aggregation every hour
continue;
}
prev_persistence_s = now_ts_seconds;
std::string import_payload;
popularQueries->serialize_as_docs(import_payload);
if(import_payload.empty()) {
continue;
}
// send http request
std::string leader_url = raft_server->get_leader_url();
if(!leader_url.empty()) {
const std::string& resource_url = leader_url + "collections/" + suggestion_coll +
"/documents/import?action=emplace";
std::string res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(resource_url, import_payload,
res, res_headers, 10*1000);
if(status_code != 200) {
LOG(ERROR) << "Error while sending query suggestions events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Sent query suggestions to leader for aggregation.";
popularQueries->reset_local_counts();
if(raft_server->is_leader()) {
// try to run top-K compaction of suggestion collection
auto coll = CollectionManager::get_instance().get_collection(suggestion_coll);
if (coll == nullptr) {
LOG(ERROR) << "No collection found for suggestions aggregation: " + suggestion_coll;
continue;
}
coll->truncate_after_top_k("count", popularQueries->get_k());
}
}
}
}
persist_suggestions(raft_server, prev_persistence_s);
lk.unlock();
}
@ -279,12 +217,88 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
dispose();
}
void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s) {
    // lock is held by caller
    //
    // NOTE(review): `prev_persistence_s` is taken by value, so the refreshed
    // timestamp never reaches the caller — confirm the caller advances its own
    // copy, otherwise persistence fires on every wake-up once the first
    // interval has elapsed.

    // Capture "now" once so every suggestion sink is measured against the same
    // instant, and decide up-front whether the flush interval has elapsed.
    // Previously the first sink that flushed overwrote `prev_persistence_s`,
    // which made every later sink in the same pass see an elapsed time of ~0
    // and skip its flush.
    const auto now = std::chrono::system_clock::now().time_since_epoch();
    const auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
    const auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(now).count();
    const bool interval_elapsed = (now_ts_seconds - prev_persistence_s) >=
                                  Config::get_instance().get_analytics_flush_interval();

    for(const auto& suggestion_config: suggestion_configs) {
        const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;

        auto popular_queries_it = popular_queries.find(suggestion_coll);
        if(popular_queries_it == popular_queries.end()) {
            continue;
        }

        // need to prepare the counts as JSON docs for import into the suggestion collection
        // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
        PopularQueries* popularQueries = popular_queries_it->second;

        // aggregate prefix queries to their final form — done on every pass,
        // independent of whether we flush this time
        popularQueries->compact_user_queries(now_ts_us);

        if(!interval_elapsed) {
            // flush interval (analytics flush interval config) not reached yet
            continue;
        }

        std::string import_payload;
        popularQueries->serialize_as_docs(import_payload);

        if(import_payload.empty()) {
            continue;
        }

        // send the aggregated counts to the leader over HTTP
        std::string leader_url = raft_server->get_leader_url();
        if(leader_url.empty()) {
            continue;
        }

        const std::string& base_url = leader_url + "collections/" + suggestion_coll;
        std::string res;

        const std::string& update_url = base_url + "/documents/import?action=emplace";
        std::map<std::string, std::string> res_headers;
        long status_code = HttpClient::post_response(update_url, import_payload,
                                                     res, res_headers, 10*1000);

        if(status_code != 200) {
            LOG(ERROR) << "Error while sending query suggestions events to leader. "
                       << "Status code: " << status_code << ", response: " << res;
            continue;
        }

        LOG(INFO) << "Query aggregation for collection: " + suggestion_coll;
        // local counts were accepted by the leader, so they can be dropped
        popularQueries->reset_local_counts();

        if(raft_server->is_leader()) {
            // try to run top-K compaction of suggestion collection
            const std::string top_k_param = "count:" + std::to_string(popularQueries->get_k());
            const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param;
            res.clear();
            res_headers.clear();
            status_code = HttpClient::delete_response(truncate_topk_url, res, res_headers, 10*1000);
            if(status_code != 200) {
                LOG(ERROR) << "Error while running top K for query suggestions collection. "
                           << "Status code: " << status_code << ", response: " << res;
            } else {
                LOG(INFO) << "Top K aggregation for collection: " + suggestion_coll;
            }
        }
    }
}
// Signals the background run() loop to shut down.
// The flag is set BEFORE notifying so a waiter that wakes up re-checks `quit`
// and sees the updated value; reversing the order could miss the wake-up.
void AnalyticsManager::stop() {
    quit = true;
    cv.notify_all();
}
void AnalyticsManager::dispose() {
std::unique_lock lk(mutex);
for(auto& kv: popular_queries) {
delete kv.second;
}
@ -296,3 +310,7 @@ void AnalyticsManager::init(Store* store) {
this->store = store;
}
// Returns a snapshot copy of the per-collection popular-query accumulators.
// Copied under the manager lock so the caller gets a consistent view of the map.
std::unordered_map<std::string, PopularQueries*> AnalyticsManager::get_popular_queries() {
    std::unique_lock lock(mutex);
    auto queries_snapshot = popular_queries;
    return queries_snapshot;
}

View File

@ -734,7 +734,8 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
}
CollectionManager & collectionManager = CollectionManager::get_instance();
auto collection = collectionManager.get_collection(req_params["collection"]);
const std::string& orig_coll_name = req_params["collection"];
auto collection = collectionManager.get_collection(orig_coll_name);
if(collection == nullptr) {
return Option<bool>(404, "Not found.");
@ -1064,9 +1065,8 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
if(Config::get_instance().get_enable_search_analytics()) {
if(result.count("found") != 0 && result["found"].get<size_t>() != 0) {
std::string processed_query = raw_query;
Tokenizer::normalize_ascii(processed_query);
AnalyticsManager::get_instance().add_suggestion(collection->get_name(), processed_query,
std::string analytics_query = raw_query;
AnalyticsManager::get_instance().add_suggestion(orig_coll_name, analytics_query,
true, req_params["x-typesense-user-id"]);
}
}

View File

@ -1114,6 +1114,37 @@ bool del_remove_documents(const std::shared_ptr<http_req>& req, const std::share
const char *BATCH_SIZE = "batch_size";
const char *FILTER_BY = "filter_by";
const char *TOP_K_BY = "top_k_by";
if(req->params.count(TOP_K_BY) != 0) {
std::vector<std::string> parts;
StringUtils::split(req->params[TOP_K_BY], parts, ":");
if(parts.size() != 2 || !StringUtils::is_uint32_t(parts[1])) {
req->last_chunk_aggregate = true;
res->final = true;
res->set_400("The `top_k_by` parameter is not valid.");
stream_response(req, res);
return false;
}
const std::string& field_name = parts[0];
const size_t k = std::stoull(parts[1]);
auto op = collection->truncate_after_top_k(field_name, k);
req->last_chunk_aggregate = true;
res->final = true;
if(!op.ok()) {
res->set_500(op.error());
stream_response(req, res);
return false;
}
res->set_200(R"({"ok": true})");
stream_response(req, res);
return true;
}
if(req->params.count(BATCH_SIZE) == 0) {
req->params[BATCH_SIZE] = "1000000000"; // 1 Billion

View File

@ -0,0 +1,91 @@
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include <collection_manager.h>
#include <analytics_manager.h>
#include "collection.h"
// Test fixture that provisions a throw-away on-disk store plus the
// CollectionManager and AnalyticsManager singletons for each test case.
class AnalyticsManagerTest : public ::testing::Test {
protected:
    // Raw owning pointer; created in setupCollection() and freed in TearDown().
    Store *store;
    CollectionManager& collectionManager = CollectionManager::get_instance();
    std::atomic<bool> quit = false;

    std::vector<std::string> query_fields;
    std::vector<sort_by> sort_fields;

    AnalyticsManager& analyticsManager = AnalyticsManager::get_instance();

    // Wipes and re-creates the test state directory, then initializes the
    // collection and analytics managers against the fresh store.
    void setupCollection() {
        std::string state_dir_path = "/tmp/typesense_test/analytics_manager_test";
        LOG(INFO) << "Truncating and creating: " << state_dir_path;
        system(("rm -rf "+state_dir_path+" && mkdir -p "+state_dir_path).c_str());
        system("mkdir -p /tmp/typesense_test/models");

        store = new Store(state_dir_path);
        collectionManager.init(store, 1.0, "auth_key", quit);
        collectionManager.load(8, 1000);
        analyticsManager.init(store);
    }

    virtual void SetUp() {
        setupCollection();
    }

    virtual void TearDown() {
        collectionManager.dispose();
        delete store;
    }
};
// Verifies that a query recorded via add_suggestion() on a source collection
// is buffered in the PopularQueries accumulator keyed by the destination
// (suggestion) collection, under the supplied user id.
TEST_F(AnalyticsManagerTest, AddSuggestion) {
    nlohmann::json titles_schema = R"({
            "name": "titles",
            "fields": [
                {"name": "title", "type": "string"}
            ]
        })"_json;

    Collection* titles_coll = collectionManager.create_collection(titles_schema).get();

    nlohmann::json doc;
    doc["title"] = "Cool trousers";
    ASSERT_TRUE(titles_coll->add(doc.dump()).ok());

    // create a collection to store suggestions
    nlohmann::json suggestions_schema = R"({
        "name": "top_queries",
        "fields": [
          {"name": "q", "type": "string" },
          {"name": "count", "type": "int32" }
        ]
      })"_json;

    Collection* suggestions_coll = collectionManager.create_collection(suggestions_schema).get();

    // rule routes popular queries on "titles" into the "top_queries" collection
    nlohmann::json analytics_rule = R"({
        "name": "top_search_queries",
        "type": "popular_queries",
        "params": {
            "limit": 100,
            "source": {
                "collections": ["titles"]
            },
            "destination": {
                "collection": "top_queries"
            }
        }
    })"_json;

    auto create_op = analyticsManager.create_rule(analytics_rule);
    ASSERT_TRUE(create_op.ok());

    std::string q = "foobar";
    analyticsManager.add_suggestion("titles", q, true, "1");

    // buffered queries are keyed by destination collection, then by user id
    auto popularQueries = analyticsManager.get_popular_queries();
    auto userQueries = popularQueries["top_queries"]->get_user_prefix_queries()["1"];

    ASSERT_EQ(1, userQueries.size());
    ASSERT_EQ("foobar", userQueries[0].query);
}