refactor analytics queries

This commit is contained in:
krunal 2023-11-22 15:06:34 +05:30
parent e6c3fbf415
commit 6335f7b87f
7 changed files with 76 additions and 295 deletions

View File

@ -1,12 +1,11 @@
#pragma once
#include "popular_queries.h"
#include "query_analytics.h"
#include "option.h"
#include "raft_server.h"
#include <vector>
#include <string>
#include <unordered_map>
#include <shared_mutex>
#include "noresults_queries.h"
struct ClickEvent {
std::string query;
@ -92,10 +91,10 @@ private:
std::unordered_map<std::string, std::vector<std::string>> query_collection_mapping;
// suggestion collection => popular queries
std::unordered_map<std::string, PopularQueries*> popular_queries;
std::unordered_map<std::string, QueryAnalytics*> popular_queries;
// suggestion collection => noresults queries
std::unordered_map<std::string, NoresultsQueries*> noresults_queries;
std::unordered_map<std::string, QueryAnalytics*> noresults_queries;
//query collection => click events
std::unordered_map<std::string, std::vector<ClickEvent>> query_collection_click_events;
@ -147,9 +146,9 @@ public:
void dispose();
void persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s);
void persist_query_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
std::unordered_map<std::string, PopularQueries*> get_popular_queries();
std::unordered_map<std::string, QueryAnalytics*> get_popular_queries();
Option<bool> add_click_event(const std::string& query_collection, const std::string& query, const std::string& user_id,
std::string doc_id, uint64_t position, const std::string& client_ip);
@ -163,9 +162,7 @@ public:
void add_noresults_query(const std::string& query_collection,
const std::string& query, bool live_query, const std::string& user_id);
void persist_noresults_queries(ReplicationState *raft_server, uint64_t prev_persistence_s);
std::unordered_map<std::string, NoresultsQueries*> get_noresults_queries();
std::unordered_map<std::string, QueryAnalytics*> get_noresults_queries();
void resetRateLimit();
};

View File

@ -1,51 +0,0 @@
#pragma once
#include <string>
#include <vector>
#include <tsl/htrie_map.h>
#include <json.hpp>
#include <atomic>
#include <shared_mutex>
// Per-node accumulator of query counts. Queries typed live are first buffered per user
// (they may be prefixes of the final query) and later folded into `local_counts` by
// compact_user_queries(); non-live queries are counted directly.
class NoresultsQueries {
public:
// A buffered live query together with the time it was recorded (microseconds).
struct QWithTimestamp {
std::string query;
uint64_t timestamp;
QWithTimestamp(const std::string& query, uint64_t timestamp) : query(query), timestamp(timestamp) {}
};
// 4 seconds, in microseconds: a buffered query must be at least this old, and followed by
// a gap longer than this, before compact_user_queries() counts it.
static const size_t QUERY_FINALIZATION_INTERVAL_MICROS = 4 * 1000 * 1000;
private:
// value passed to the constructor; exposed through get_k()
size_t k;
// upper bound on the number of distinct keys in `local_counts` (the constructor sets it to k * 2)
const size_t max_size;
// counts aggregated within the current node
tsl::htrie_map<char, uint32_t> local_counts;
// guards `local_counts`
std::shared_mutex lmutex;
// user id => live queries recorded for that user that have not been consolidated yet
std::unordered_map<std::string, std::vector<QWithTimestamp>> user_prefix_queries;
// guards `user_prefix_queries`
std::shared_mutex umutex;
public:
NoresultsQueries(size_t k);
// Records `value`. When `live_query` is true the query is buffered under `user_id` with
// timestamp `now_ts_us` (0 means "use the current system time"); otherwise its count is
// incremented directly. The call is silently dropped if the relevant mutex is busy.
void add(const std::string& value, const bool live_query, const std::string& user_id,
uint64_t now_ts_us = 0);
// Moves buffered live queries that have settled (relative to `now_ts_us`) into the counts
// and discards the prefixes that preceded them.
void compact_user_queries(uint64_t now_ts_us);
// Appends one JSON document per counted query to `docs`, separated by newlines.
void serialize_as_docs(std::string& docs);
// Clears all accumulated counts.
void reset_local_counts();
// Returns the `k` given at construction.
size_t get_k();
// Returns a copy of the per-user buffer of pending live queries.
std::unordered_map<std::string, std::vector<QWithTimestamp>> get_user_prefix_queries();
// Returns a copy of the accumulated counts.
tsl::htrie_map<char, uint32_t> get_local_counts();
};

View File

@ -7,7 +7,7 @@
#include <atomic>
#include <shared_mutex>
class PopularQueries {
class QueryAnalytics {
public:
struct QWithTimestamp {
std::string query;
@ -32,7 +32,7 @@ private:
public:
PopularQueries(size_t k);
QueryAnalytics(size_t k);
void add(const std::string& value, const bool live_query, const std::string& user_id,
uint64_t now_ts_us = 0);

View File

@ -124,10 +124,10 @@ Option<bool> AnalyticsManager::create_queries_index(nlohmann::json &payload, boo
}
if(payload["type"] == POPULAR_QUERIES_TYPE) {
PopularQueries *popularQueries = new PopularQueries(limit);
QueryAnalytics *popularQueries = new QueryAnalytics(limit);
popular_queries.emplace(suggestion_collection, popularQueries);
} else if(payload["type"] == NORESULTS_QUERIES_TYPE) {
NoresultsQueries *noresultsQueries = new NoresultsQueries(limit);
QueryAnalytics *noresultsQueries = new QueryAnalytics(limit);
noresults_queries.emplace(suggestion_collection, noresultsQueries);
}
@ -322,9 +322,8 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
continue;
}
persist_suggestions(raft_server, prev_persistence_s);
persist_query_events(raft_server, prev_persistence_s);
persist_click_events(raft_server, prev_persistence_s);
persist_noresults_queries(raft_server, prev_persistence_s);
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -334,34 +333,11 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
dispose();
}
void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s) {
void AnalyticsManager::persist_query_events(ReplicationState *raft_server, uint64_t prev_persistence_s) {
// lock is held by caller
for(const auto& suggestion_config: suggestion_configs) {
const std::string& sink_name = suggestion_config.first;
const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
auto popular_queries_it = popular_queries.find(suggestion_coll);
if(popular_queries_it == popular_queries.end()) {
continue;
}
// need to prepare the counts as JSON docs for import into the suggestion collection
// {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
PopularQueries* popularQueries = popular_queries_it->second;
// aggregate prefix queries to their final form
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
popularQueries->compact_user_queries(now_ts_us);
std::string import_payload;
popularQueries->serialize_as_docs(import_payload);
if(import_payload.empty()) {
continue;
}
auto send_http_response = [&](QueryAnalytics* queryAnalyticsPtr,
const std::string& import_payload, const std::string& suggestion_coll) {
// send http request
std::string leader_url = raft_server->get_leader_url();
if(!leader_url.empty()) {
@ -378,11 +354,11 @@ void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Query aggregation for collection: " + suggestion_coll;
popularQueries->reset_local_counts();
queryAnalyticsPtr->reset_local_counts();
if(raft_server->is_leader()) {
// try to run top-K compaction of suggestion collection
const std::string top_k_param = "count:" + std::to_string(popularQueries->get_k());
const std::string top_k_param = "count:" + std::to_string(queryAnalyticsPtr->get_k());
const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param;
res.clear();
res_headers.clear();
@ -396,6 +372,47 @@ void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64
}
}
}
};
// Flush the locally aggregated counts of every configured suggestion collection.
// A collection may be backed by a popular-queries tracker, a no-results tracker, or both;
// each tracker is compacted, serialized and sent independently.
for(const auto& suggestion_config: suggestion_configs) {
    const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;

    QueryAnalytics* trackers[2] = {nullptr, nullptr};

    auto popular_queries_it = popular_queries.find(suggestion_coll);
    if(popular_queries_it != popular_queries.end()) {
        trackers[0] = popular_queries_it->second;
    }

    auto noresults_queries_it = noresults_queries.find(suggestion_coll);
    if(noresults_queries_it != noresults_queries.end()) {
        trackers[1] = noresults_queries_it->second;
    }

    for(QueryAnalytics* tracker: trackers) {
        if(tracker == nullptr) {
            continue;
        }

        // aggregate prefix queries to their final form
        auto now = std::chrono::system_clock::now().time_since_epoch();
        auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
        tracker->compact_user_queries(now_ts_us);

        // need to prepare the counts as JSON docs for import into the suggestion collection
        // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
        std::string import_payload;
        tracker->serialize_as_docs(import_payload);

        if(import_payload.empty()) {
            // nothing was counted since the last flush: don't issue an empty import request
            continue;
        }

        // no-results counts must be sent as well, otherwise they are never persisted nor reset
        send_http_response(tracker, import_payload, suggestion_coll);
    }
}
}
@ -440,71 +457,6 @@ void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint6
}
}
void AnalyticsManager::persist_noresults_queries(ReplicationState *raft_server, uint64_t prev_persistence_s) {
// lock is held by caller
for(const auto& suggestion_config: suggestion_configs) {
const std::string& sink_name = suggestion_config.first;
const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
auto noresults_queries_it = noresults_queries.find(suggestion_coll);
if(noresults_queries_it == noresults_queries.end()) {
continue;
}
// need to prepare the counts as JSON docs for import into the suggestion collection
// {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
NoresultsQueries* noresultsQueries = noresults_queries_it->second;
// aggregate prefix queries to their final form
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
noresultsQueries->compact_user_queries(now_ts_us);
std::string import_payload;
noresultsQueries->serialize_as_docs(import_payload);
if(import_payload.empty()) {
continue;
}
// send http request
std::string leader_url = raft_server->get_leader_url();
if(!leader_url.empty()) {
const std::string& base_url = leader_url + "collections/" + suggestion_coll;
std::string res;
const std::string& update_url = base_url + "/documents/import?action=emplace";
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(update_url, import_payload,
res, res_headers, {}, 10*1000, true);
if(status_code != 200) {
LOG(ERROR) << "Error while sending query suggestions events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Query aggregation for collection: " + suggestion_coll;
noresultsQueries->reset_local_counts();
if(raft_server->is_leader()) {
// try to run top-K compaction of suggestion collection
const std::string top_k_param = "count:" + std::to_string(noresultsQueries->get_k());
const std::string& truncate_topk_url = base_url + "/documents?top_k_by=" + top_k_param;
res.clear();
res_headers.clear();
status_code = HttpClient::delete_response(truncate_topk_url, res, res_headers, 10*1000, true);
if(status_code != 200) {
LOG(ERROR) << "Error while running top K for query suggestions collection. "
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Top K aggregation for collection: " + suggestion_coll;
}
}
}
}
}
}
void AnalyticsManager::stop() {
quit = true;
cv.notify_all();
@ -531,12 +483,12 @@ void AnalyticsManager::init(Store* store, Store* analytics_store) {
this->analytics_store = analytics_store;
}
std::unordered_map<std::string, PopularQueries*> AnalyticsManager::get_popular_queries() {
std::unordered_map<std::string, QueryAnalytics*> AnalyticsManager::get_popular_queries() {
std::unique_lock lk(mutex);
return popular_queries;
}
std::unordered_map<std::string, NoresultsQueries*> AnalyticsManager::get_noresults_queries() {
std::unordered_map<std::string, QueryAnalytics*> AnalyticsManager::get_noresults_queries() {
std::unique_lock lk(mutex);
return noresults_queries;
}

View File

@ -1,117 +0,0 @@
#include "noresults_queries.h"
#include "logger.h"
#include <algorithm>
#include <mutex>
#include "string_utils.h"
// Remembers `k` and allows the local count map to hold up to twice as many distinct keys.
NoresultsQueries::NoresultsQueries(size_t k)
        : k(k),
          max_size(k * 2) {}
// Records one occurrence of `key`.
//
// live_query == true : the query may still be a prefix of what the user is typing, so it is
//                      only buffered under `user_id` (stamped with `now_ts_us`, or the current
//                      system time when `now_ts_us` is 0) for later consolidation.
// live_query == false: the count of `key` is incremented directly.
//
// Both paths only *try* to take their mutex: when it is busy the query is dropped rather than
// blocking the caller. The locks are RAII-managed so that an exception thrown while inserting
// (e.g. on allocation failure) cannot leave a mutex locked.
void NoresultsQueries::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) {
    // cap on buffered live queries per user
    constexpr size_t MAX_PENDING_QUERIES_PER_USER = 100;

    if(live_query) {
        // live query must be aggregated first to their final form as they could be prefix queries
        if(now_ts_us == 0) {
            now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(
                    std::chrono::system_clock::now().time_since_epoch()).count();
        }

        std::unique_lock<std::shared_mutex> user_lock(umutex, std::try_to_lock);
        if(!user_lock.owns_lock()) {
            // instead of locking we just skip incrementing keys during consolidation time
            return ;
        }

        auto& queries = user_prefix_queries[user_id];
        if(queries.size() < MAX_PENDING_QUERIES_PER_USER) {
            queries.emplace_back(key, now_ts_us);
        }

        return ;
    }

    std::unique_lock<std::shared_mutex> counts_lock(lmutex, std::try_to_lock);
    if(!counts_lock.owns_lock()) {
        // instead of locking we just skip incrementing keys during consolidation time
        return ;
    }

    auto it = local_counts.find(key);
    if(it != local_counts.end()) {
        it.value()++;
    } else if(local_counts.size() < max_size) {
        // skip count when map has become too large (to prevent abuse)
        local_counts.emplace(key, 1);
    }
}
// Appends the accumulated counts to `docs` as newline-separated JSON documents of the form
// {"id": "<hash of q>", "q": "<query>", "$operations": {"increment": {"count": <n>}}}
//
// No trailing newline is left after the last document. `docs` is left untouched when there
// are no counts, so content the caller already placed in it is never truncated.
void NoresultsQueries::serialize_as_docs(std::string& docs) {
    std::shared_lock lk(lmutex);

    const size_t size_on_entry = docs.size();
    std::string key_buffer;

    for(auto it = local_counts.begin(); it != local_counts.end(); ++it) {
        it.key(key_buffer);

        nlohmann::json doc;
        doc["id"] = std::to_string(StringUtils::hash_wy(key_buffer.c_str(), key_buffer.size()));
        doc["q"] = key_buffer;
        doc["$operations"]["increment"]["count"] = it.value();

        docs += doc.dump();
        docs += '\n';
    }

    // drop only the separator appended by this call
    if(docs.size() > size_on_entry) {
        docs.pop_back();
    }
}
void NoresultsQueries::reset_local_counts() {
std::unique_lock lk(lmutex);
local_counts.clear();
}
size_t NoresultsQueries::get_k() {
return k;
}
void NoresultsQueries::compact_user_queries(uint64_t now_ts_us) {
std::unique_lock lk(umutex);
std::vector<std::string> keys_to_delete;
for(auto& kv: user_prefix_queries) {
auto& queries = kv.second;
int64_t last_consolidated_index = -1;
for(uint32_t i = 0; i < queries.size(); i++) {
if(now_ts_us - queries[i].timestamp < QUERY_FINALIZATION_INTERVAL_MICROS) {
break;
}
uint64_t diff_micros = (i == queries.size()-1) ? (now_ts_us - queries[i].timestamp) :
(queries[i + 1].timestamp - queries[i].timestamp);
if(diff_micros > QUERY_FINALIZATION_INTERVAL_MICROS) {
add(queries[i].query, false, "");
last_consolidated_index = i;
}
}
queries.erase(queries.begin(), queries.begin() + last_consolidated_index+1);
if(queries.empty()) {
keys_to_delete.push_back(kv.first);
}
}
for(auto& key: keys_to_delete) {
user_prefix_queries.erase(key);
}
}
// Returns a copy of the per-user buffer of pending live queries, taken under the user mutex.
std::unordered_map<std::string, std::vector<NoresultsQueries::QWithTimestamp>> NoresultsQueries::get_user_prefix_queries() {
    std::unique_lock<std::shared_mutex> users_guard{umutex};
    auto snapshot = user_prefix_queries;
    return snapshot;
}
// Returns a copy of the accumulated counts, taken under the counts mutex.
tsl::htrie_map<char, uint32_t> NoresultsQueries::get_local_counts() {
    std::unique_lock<std::shared_mutex> counts_guard{lmutex};
    auto snapshot = local_counts;
    return snapshot;
}

View File

@ -1,14 +1,14 @@
#include "popular_queries.h"
#include "query_analytics.h"
#include "logger.h"
#include <algorithm>
#include <mutex>
#include "string_utils.h"
PopularQueries::PopularQueries(size_t k) : k(k), max_size(k * 2) {
QueryAnalytics::QueryAnalytics(size_t k) : k(k), max_size(k * 2) {
}
void PopularQueries::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) {
void QueryAnalytics::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) {
if(live_query) {
// live query must be aggregated first to their final form as they could be prefix queries
if(now_ts_us == 0) {
@ -47,7 +47,7 @@ void PopularQueries::add(const std::string& key, const bool live_query, const st
}
}
void PopularQueries::serialize_as_docs(std::string& docs) {
void QueryAnalytics::serialize_as_docs(std::string& docs) {
std::shared_lock lk(lmutex);
std::string key_buffer;
@ -65,16 +65,16 @@ void PopularQueries::serialize_as_docs(std::string& docs) {
}
}
void PopularQueries::reset_local_counts() {
void QueryAnalytics::reset_local_counts() {
std::unique_lock lk(lmutex);
local_counts.clear();
}
size_t PopularQueries::get_k() {
size_t QueryAnalytics::get_k() {
return k;
}
void PopularQueries::compact_user_queries(uint64_t now_ts_us) {
void QueryAnalytics::compact_user_queries(uint64_t now_ts_us) {
std::unique_lock lk(umutex);
std::vector<std::string> keys_to_delete;
@ -107,12 +107,12 @@ void PopularQueries::compact_user_queries(uint64_t now_ts_us) {
}
}
std::unordered_map<std::string, std::vector<PopularQueries::QWithTimestamp>> PopularQueries::get_user_prefix_queries() {
std::unordered_map<std::string, std::vector<QueryAnalytics::QWithTimestamp>> QueryAnalytics::get_user_prefix_queries() {
std::unique_lock lk(umutex);
return user_prefix_queries;
}
tsl::htrie_map<char, uint32_t> PopularQueries::get_local_counts() {
tsl::htrie_map<char, uint32_t> QueryAnalytics::get_local_counts() {
std::unique_lock lk(lmutex);
return local_counts;
}

View File

@ -1,5 +1,5 @@
#include <gtest/gtest.h>
#include "popular_queries.h"
#include "query_analytics.h"
#include "logger.h"
class PopularQueriesTest : public ::testing::Test {
@ -14,7 +14,7 @@ protected:
};
TEST_F(PopularQueriesTest, PrefixQueryCompaction) {
PopularQueries pq(10);
QueryAnalytics pq(10);
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -36,7 +36,7 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) {
ASSERT_EQ(0, pq.get_local_counts().size());
// compaction interval has happened
pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100);
pq.compact_user_queries(now_ts_us + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(0, queries.size());
auto local_counts = pq.get_local_counts();
@ -49,7 +49,7 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) {
pq.add("f", true, "0", now_ts_us+1);
pq.add("fo", true, "0", now_ts_us+2);
pq.add("foo", true, "0", now_ts_us+3);
pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100);
pq.compact_user_queries(now_ts_us + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(0, queries.size());
local_counts = pq.get_local_counts();
@ -62,8 +62,8 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) {
pq.add("f", true, "0", now_ts_us+1);
pq.add("fo", true, "0", now_ts_us+2);
pq.add("foo", true, "0", now_ts_us+3);
pq.add("b", true, "0", now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100);
pq.compact_user_queries(now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1);
pq.add("b", true, "0", now_ts_us + 3 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100);
pq.compact_user_queries(now_ts_us + 3 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100 + 1);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(1, queries.size());
ASSERT_EQ(1, queries["0"].size());
@ -74,10 +74,10 @@ TEST_F(PopularQueriesTest, PrefixQueryCompaction) {
ASSERT_EQ(1, local_counts["foo"]);
// continue with that query
auto prev_ts = now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1;
auto prev_ts = now_ts_us + 3 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 100 + 1;
pq.add("ba", true, "0", prev_ts+1);
pq.add("bar", true, "0", prev_ts+2);
pq.compact_user_queries(prev_ts+2+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+1);
pq.compact_user_queries(prev_ts + 2 + QueryAnalytics::QUERY_FINALIZATION_INTERVAL_MICROS + 1);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(0, queries.size());
local_counts = pq.get_local_counts();