query hits count analytics

This commit is contained in:
krunal 2023-11-24 15:27:15 +05:30
parent 5b4df3211c
commit a6b404e7ff
7 changed files with 251 additions and 8 deletions

View File

@ -7,18 +7,18 @@
#include <unordered_map>
#include <shared_mutex>
struct ClickEvent {
struct click_event_t {
std::string query;
uint64_t timestamp;
std::string user_id;
std::string doc_id;
uint64_t position;
ClickEvent() = delete;
click_event_t() = delete;
~ClickEvent() = default;
~click_event_t() = default;
ClickEvent(std::string q, uint64_t ts, std::string uid, std::string id, uint64_t pos) {
click_event_t(std::string q, uint64_t ts, std::string uid, std::string id, uint64_t pos) {
query = q;
timestamp = ts;
user_id = uid;
@ -26,7 +26,7 @@ struct ClickEvent {
position = pos;
}
ClickEvent& operator=(ClickEvent& other) {
click_event_t& operator=(click_event_t& other) {
if (this != &other) {
query = other.query;
timestamp = other.timestamp;
@ -46,6 +46,47 @@ struct ClickEvent {
}
};
/**
 * One recorded observation of how many hits a search query produced.
 *
 * Instances are kept in per-collection std::set containers ordered by
 * query_hits_count_comp, and are serialized with to_json() before being
 * shipped to the leader / written to the analytics store.
 */
struct query_hits_count_t {
    std::string query;      // query text as supplied by the caller
    uint64_t timestamp;     // caller-supplied timestamp (add_query_hits_count passes microseconds since epoch)
    std::string user_id;    // id of the user that issued the query
    uint64_t hits_count;    // number of hits reported for the query

    // A record without its fields makes no sense, so there is no default constructor.
    query_hits_count_t() = delete;

    ~query_hits_count_t() = default;

    /**
     * @param q     query text
     * @param ts    timestamp of the observation
     * @param uid   id of the user that issued the query
     * @param count number of hits the query produced
     */
    query_hits_count_t(std::string q, uint64_t ts, std::string uid, uint64_t count)
            : query(q), timestamp(ts), user_id(uid), hits_count(count) {
    }

    /**
     * Copy-assigns every field from `other`.
     *
     * Takes a const reference so that const objects and temporaries can be
     * assigned from, and always returns *this: the previous version returned
     * only inside the self-check branch, so self-assignment flowed off the end
     * of a non-void function (undefined behaviour).
     */
    query_hits_count_t &operator=(const query_hits_count_t &other) {
        if (this != &other) {
            query = other.query;
            timestamp = other.timestamp;
            user_id = other.user_id;
            hits_count = other.hits_count;
        }

        return *this;
    }

    /**
     * Writes the four fields into `obj` under the keys "query", "timestamp",
     * "user_id" and "hits_count". Other keys already present in `obj` are kept.
     */
    void to_json(nlohmann::json &obj) const {
        obj["query"] = query;
        obj["timestamp"] = timestamp;
        obj["user_id"] = user_id;
        obj["hits_count"] = hits_count;
    }
};
struct query_hits_count_comp {
bool operator()(const query_hits_count_t& a, const query_hits_count_t& b) const {
return a.query < b.query;
}
};
struct event_cache_t {
uint64_t last_update_time;
uint64_t count;
@ -97,7 +138,10 @@ private:
std::unordered_map<std::string, QueryAnalytics*> nohits_queries;
//query collection => click events
std::unordered_map<std::string, std::vector<ClickEvent>> query_collection_click_events;
std::unordered_map<std::string, std::vector<click_event_t>> query_collection_click_events;
//query collection => query hits count
std::unordered_map<std::string, std::set<query_hits_count_t, query_hits_count_comp>> query_collection_hits_count;
Store* store = nullptr;
Store* analytics_store = nullptr;
@ -116,6 +160,7 @@ public:
static constexpr const char* ANALYTICS_RULE_PREFIX = "$AR";
static constexpr const char* CLICK_EVENT = "$CE";
static constexpr const char* QUERY_HITS_COUNT = "$QH";
static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries";
static constexpr const char* NOHITS_QUERIES_TYPE = "nohits_queries";
@ -165,4 +210,13 @@ public:
std::unordered_map<std::string, QueryAnalytics*> get_nohits_queries();
void resetRateLimit();
void add_query_hits_count(const std::string& query_collection, const std::string& query, const std::string& user_id,
uint64_t hits_count);
void persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s);
nlohmann::json get_query_hits_counts();
Option<bool> write_query_hits_counts_to_store(nlohmann::json& query_hits_counts_json);
};

View File

@ -173,6 +173,10 @@ bool get_click_events(const std::shared_ptr<http_req>& req, const std::shared_pt
bool post_replicate_click_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool get_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_replicate_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Misc helpers
void get_collections_for_auth(std::map<std::string, std::string>& req_params, const std::string& body,

View File

@ -273,7 +273,7 @@ Option<bool> AnalyticsManager::add_click_event(const std::string &query_collecti
auto now_ts_useconds = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
ClickEvent click_event(query, now_ts_useconds, user_id, doc_id, position);
click_event_t click_event(query, now_ts_useconds, user_id, doc_id, position);
click_events_vec.emplace_back(click_event);
return Option<bool>(true);
@ -298,6 +298,25 @@ void AnalyticsManager::add_nohits_query(const std::string &query_collection, con
}
}
void AnalyticsManager::add_query_hits_count(const std::string &query_collection, const std::string &query,
const std::string &user_id, uint64_t hits_count) {
std::unique_lock lock(mutex);
if(analytics_store) {
auto now_ts_useconds = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
auto &query_hits_count_set = query_collection_hits_count[query_collection];
query_hits_count_t queryHitsCount(query, now_ts_useconds, user_id, hits_count);
auto query_hits_count_set_it = query_hits_count_set.find(queryHitsCount);
if(query_hits_count_set_it != query_hits_count_set.end()) {
query_hits_count_set.erase(query_hits_count_set_it);
}
query_hits_count_set.emplace(queryHitsCount);
}
}
void AnalyticsManager::run(ReplicationState* raft_server) {
uint64_t prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -324,6 +343,8 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
persist_query_events(raft_server, prev_persistence_s);
persist_click_events(raft_server, prev_persistence_s);
persist_query_hits_counts(raft_server, prev_persistence_s);
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -458,6 +479,46 @@ void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint6
}
}
/**
 * Sends every buffered query hits count to the current leader.
 *
 * All buffered records are serialized into one JSON array (each element also
 * carries the "collection_id" of its collection, as a string) and POSTed to
 * `<leader_url>analytics/query_hits_counts/replicate`. The in-memory buffer is
 * cleared only when the leader answers with HTTP 200; otherwise the records
 * are kept so that a later call can retry.
 *
 * @param raft_server        used to look up the leader URL
 * @param prev_persistence_s not used by this function
 */
void AnalyticsManager::persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s) {
    nlohmann::json payload_json = nlohmann::json::array();

    for (const auto &query_collection_hits_count_it: query_collection_hits_count) {
        auto collection = CollectionManager::get_instance().get_collection(query_collection_hits_count_it.first);

        if (collection == nullptr) {
            // The collection no longer resolves (e.g. it was dropped after the counts were
            // buffered). Its id cannot be determined, so skip the entries instead of
            // dereferencing a null collection.
            continue;
        }

        const std::string collection_id = std::to_string(collection->get_collection_id());

        for (const auto &query_hits_count: query_collection_hits_count_it.second) {
            nlohmann::json query_hits_count_json;
            query_hits_count.to_json(query_hits_count_json);
            query_hits_count_json["collection_id"] = collection_id;
            payload_json.push_back(query_hits_count_json);
        }
    }

    if (payload_json.empty()) {
        return;
    }

    const std::string import_payload = payload_json.dump();

    std::string leader_url = raft_server->get_leader_url();
    if (leader_url.empty()) {
        // No known leader: keep the buffered records for a later attempt.
        return;
    }

    const std::string update_url = leader_url + "analytics" + "/query_hits_counts/replicate";
    std::string res;
    std::map<std::string, std::string> res_headers;

    long status_code = HttpClient::post_response(update_url, import_payload,
                                                 res, res_headers, {}, 10 * 1000, true);

    if (status_code != 200) {
        LOG(ERROR) << "Error while sending query hits counts to leader. "
                   << "Status code: " << status_code << ", response: " << res;
    } else {
        query_collection_hits_count.clear();
    }
}
void AnalyticsManager::stop() {
quit = true;
cv.notify_all();
@ -512,6 +573,24 @@ nlohmann::json AnalyticsManager::get_click_events() {
return result_json;
}
/**
 * Reads the query hits counts persisted in the analytics store.
 *
 * Scans the store between the keys `QUERY_HITS_COUNT + "_"` and
 * `QUERY_HITS_COUNT + "`"`, parses every value found as JSON and returns them
 * as a JSON array in scan order.
 *
 * @return JSON array of the stored records; empty when the analytics store is
 *         not set or nothing is stored.
 */
nlohmann::json AnalyticsManager::get_query_hits_counts() {
    std::unique_lock lk(mutex);

    nlohmann::json result_json = nlohmann::json::array();

    if (!analytics_store) {
        return result_json;
    }

    const std::string scan_begin = std::string(QUERY_HITS_COUNT) + "_";
    const std::string scan_end = std::string(QUERY_HITS_COUNT) + "`";

    std::vector<std::string> stored_values;
    analytics_store->scan_fill(scan_begin, scan_end, stored_values);

    for (const auto &stored_value: stored_values) {
        result_json.push_back(nlohmann::json::parse(stored_value));
    }

    return result_json;
}
Option<bool> AnalyticsManager::write_click_event_to_store(nlohmann::json &click_event_jsons) {
for(const auto& click_event_json : click_event_jsons) {
auto collection_id = click_event_json["collection_id"].get<std::string>();
@ -530,6 +609,24 @@ Option<bool> AnalyticsManager::write_click_event_to_store(nlohmann::json &click_
return Option<bool>(true);
}
/**
 * Writes each element of `query_hits_counts_json` into the analytics store.
 *
 * Every element is stored as its JSON dump under the key
 * `QUERY_HITS_COUNT + "_" + <collection_id> + "_" + <timestamp>`.
 * NOTE(review): two elements with the same collection id and timestamp map to
 * the same key — confirm whether overwriting is intended.
 *
 * The payload can originate from an HTTP request body, so each element is
 * validated before its fields are read: indexing a const JSON value with a
 * missing key, or calling get<T>() on a value of the wrong type, does not
 * fail gracefully.
 *
 * @param query_hits_counts_json JSON array of objects, each with a string
 *                               "collection_id" and an integer "timestamp"
 * @return Option(true) on success; code 400 when an element is malformed;
 *         code 500 when the store is not set or an insert fails. Elements
 *         written before the failing one stay in the store.
 */
Option<bool> AnalyticsManager::write_query_hits_counts_to_store(nlohmann::json &query_hits_counts_json) {
    for (const auto &query_hits_count: query_hits_counts_json) {
        const bool well_formed = query_hits_count.is_object() &&
                                 query_hits_count.contains("collection_id") &&
                                 query_hits_count["collection_id"].is_string() &&
                                 query_hits_count.contains("timestamp") &&
                                 query_hits_count["timestamp"].is_number_integer();

        if (!well_formed) {
            return Option<bool>(400, "Query hits count must be an object with a string `collection_id` "
                                     "and an integer `timestamp`.");
        }

        auto collection_id = query_hits_count["collection_id"].get<std::string>();
        auto timestamp = query_hits_count["timestamp"].get<uint64_t>();

        const std::string key = std::string(QUERY_HITS_COUNT) + "_" + collection_id + "_" +
                                std::to_string(timestamp);

        if (analytics_store) {
            bool inserted = analytics_store->insert(key, query_hits_count.dump());
            if (!inserted) {
                return Option<bool>(500, "Unable to insert query hits count into store.");
            }
        } else {
            return Option<bool>(500, "Analytics DB not initialized.");
        }
    }

    return Option<bool>(true);
}
// Empties events_cache, discarding every entry it currently holds.
// NOTE(review): judging by the name, events_cache presumably backs event rate limiting —
// confirm against the code that populates it.
void AnalyticsManager::resetRateLimit() {
events_cache.clear();
}

View File

@ -1534,6 +1534,9 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
std::string analytics_query = Tokenizer::normalize_ascii_no_spaces(raw_query);
AnalyticsManager::get_instance().add_suggestion(orig_coll_name, analytics_query,
true, req_params["x-typesense-user-id"]);
AnalyticsManager::get_instance().add_query_hits_count(orig_coll_name, analytics_query,
req_params["x-typesense-user-id"],
result["found"].get<size_t>());
} else if(result.contains("found") == 0 && result["found"].get<size_t>() == 0) {
std::string analytics_query = Tokenizer::normalize_ascii_no_spaces(raw_query);
AnalyticsManager::get_instance().add_nohits_query(orig_coll_name, analytics_query,

View File

@ -2823,6 +2823,34 @@ bool post_replicate_click_event(const std::shared_ptr<http_req>& req, const std:
return false;
}
res->set_200("ClickEvent wrote to DB.");
res->set_200("click_event_t wrote to DB.");
return true;
}
/**
 * HTTP handler: responds with the stored query hits counts.
 *
 * Fetches the records from AnalyticsManager and sends their JSON dump as a
 * 200 response. The request object is not inspected.
 *
 * @return always true
 */
bool get_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
    res->set_200(AnalyticsManager::get_instance().get_query_hits_counts().dump());
    return true;
}
bool post_replicate_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;
try {
req_json = nlohmann::json::parse(req->body);
} catch(const std::exception& e) {
LOG(ERROR) << "JSON error: " << e.what();
res->set_400("Bad JSON.");
return false;
}
auto op = AnalyticsManager::get_instance().write_query_hits_counts_to_store(req_json);
if(!op.ok()) {
res->set_body(op.code(), op.error());
return false;
}
res->set_200("query hits counts wrote to DB.");
return true;
}

View File

@ -85,6 +85,8 @@ void master_server_routes() {
server->get("/analytics/click_events", get_click_events);
server->post("/analytics/click_events", post_create_event);
server->post("/analytics/click_events/replicate", post_replicate_click_event);
server->get("/analytics/query_hits_counts", get_query_hits_counts);
server->post("/analytics/query_hits_counts/replicate", post_replicate_query_hits_counts);
// meta
server->get("/metrics.json", get_metrics_json);

View File

@ -440,4 +440,59 @@ TEST_F(AnalyticsManagerTest, NoresultsQueries) {
noresults_queries = analyticsManager.get_nohits_queries();
ASSERT_EQ(0, noresults_queries.size());
}
// Writes two query hits count records straight into the analytics store and
// checks that get_query_hits_counts() returns both, with their fields intact.
TEST_F(AnalyticsManagerTest, QueryHitsCount) {
    nlohmann::json titles_schema = R"({
            "name": "titles",
            "fields": [
                {"name": "title", "type": "string"}
            ]
        })"_json;

    Collection *titles_coll = collectionManager.create_collection(titles_schema).get();

    // Seed the collection with four documents.
    nlohmann::json doc;
    for (const char *title: {"Cool trousers", "Cool pants", "Trendy sneakers", "Funky shorts"}) {
        doc["title"] = title;
        ASSERT_TRUE(titles_coll->add(doc.dump()).ok());
    }

    nlohmann::json query_hits_array = nlohmann::json::array();

    // First record: "cool".
    nlohmann::json record;
    record["collection_id"] = "0";
    record["query"] = "cool";
    record["timestamp"] = 1625365612;
    record["user_id"] = "1";
    record["hits_count"] = 2;
    query_hits_array.push_back(record);

    // Second record reuses the object, so "collection_id" carries over.
    record["query"] = "funky";
    record["timestamp"] = 1625365616;
    record["user_id"] = "1";
    record["hits_count"] = 1;
    query_hits_array.push_back(record);

    auto write_op = analyticsManager.write_query_hits_counts_to_store(query_hits_array);
    ASSERT_TRUE(write_op.ok());

    auto stored = analyticsManager.get_query_hits_counts();
    ASSERT_EQ(2, stored.size());

    ASSERT_EQ("cool", stored[0]["query"]);
    ASSERT_EQ(2, stored[0]["hits_count"]);
    ASSERT_EQ(1625365612, stored[0]["timestamp"]);

    ASSERT_EQ("funky", stored[1]["query"]);
    ASSERT_EQ(1, stored[1]["hits_count"]);
    ASSERT_EQ(1625365616, stored[1]["timestamp"]);
}