initial click event analytics

This commit is contained in:
krunal 2023-11-02 15:47:19 +05:30
parent bf01fbb773
commit 7dbd53cbc6
6 changed files with 167 additions and 0 deletions

View File

@ -7,6 +7,45 @@
#include <unordered_map>
#include <shared_mutex>
struct ClickEvent {
std::string query;
uint64_t timestamp;
uint64_t product_id;
uint64_t position;
ClickEvent() = delete;
~ClickEvent() = default;
ClickEvent(std::string q, uint64_t ts, uint64_t pid, uint64_t pos) {
query = q;
timestamp = ts;
product_id = pid;
position = pos;
}
ClickEvent& operator=(ClickEvent& other) {
if (this != &other) {
query = other.query;
timestamp = other.timestamp;
product_id = other.product_id;
position = other.position;
return *this;
}
}
void to_json(nlohmann::json& obj) const {
obj["query"] = query;
obj["timestamp"] = timestamp;
obj["product_id"] = product_id;
obj["position"] = position;
}
bool operator < (const ClickEvent& rhs) const {
return this->timestamp < rhs.timestamp;
}
};
class AnalyticsManager {
private:
mutable std::mutex mutex;
@ -41,6 +80,9 @@ private:
// suggestion collection => popular queries
std::unordered_map<std::string, PopularQueries*> popular_queries;
//query collection => click events
std::unordered_map<std::string, std::set<ClickEvent>> query_collection_click_events;
Store* store = nullptr;
AnalyticsManager() {}
@ -57,6 +99,7 @@ public:
static constexpr const char* ANALYTICS_RULE_PREFIX = "$AR";
static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries";
static constexpr const char* CLICK_EVENT = "$CE";
static AnalyticsManager& get_instance() {
static AnalyticsManager instance;
@ -88,4 +131,11 @@ public:
void persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s);
std::unordered_map<std::string, PopularQueries*> get_popular_queries();
void add_click_event(const std::string& query_collection, const std::string& query,
uint64_t product_id, uint64_t position);
void persist_click_event(ReplicationState *raft_server, uint64_t prev_persistence_s);
std::set<ClickEvent> get_click_events(const std::string& name);
};

View File

@ -169,6 +169,8 @@ bool put_upsert_analytics_rules(const std::shared_ptr<http_req>& req, const std:
bool del_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool get_click_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Misc helpers
void get_collections_for_auth(std::map<std::string, std::string>& req_params, const std::string& body,

View File

@ -218,6 +218,28 @@ void AnalyticsManager::add_suggestion(const std::string &query_collection, const
}
}
void AnalyticsManager::add_click_event(const std::string &query_collection, const std::string &query,
uint64_t product_id, uint64_t position) {
std::unique_lock lock(mutex);
auto &click_events_set = query_collection_click_events[query_collection];
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
ClickEvent click_event(query, now_ts_seconds, product_id, position);
click_events_set.insert(click_event);
if(store) {
const std::string key = std::string(CLICK_EVENT) + "_" + query_collection + "_" + std::to_string(now_ts_seconds);
nlohmann::json click_event_json;
click_event.to_json(click_event_json);
bool inserted = store->insert(key, click_event_json.dump());
if (!inserted) {
LOG(ERROR) << "Unable to insert clickevent into store.";
}
}
}
void AnalyticsManager::run(ReplicationState* raft_server) {
uint64_t prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -243,6 +265,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
}
persist_suggestions(raft_server, prev_persistence_s);
persist_click_event(raft_server, prev_persistence_s);
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -317,6 +340,38 @@ void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64
}
}
void AnalyticsManager::persist_click_event(ReplicationState *raft_server, uint64_t prev_persistence_s) {
// lock is held by caller
for (const auto &click_events_collection_it: query_collection_click_events) {
for (const auto &click_event: click_events_collection_it.second) {
// send http request
nlohmann::json click_event_json;
click_event.to_json(click_event_json);
const std::string import_payload = click_event_json.dump();
if(import_payload.empty()) {
continue;
}
std::string leader_url = raft_server->get_leader_url();
if (!leader_url.empty()) {
const std::string &base_url = leader_url + "collections/" + click_events_collection_it.first;
std::string res;
const std::string &update_url = base_url + "/clickevents";
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(update_url, import_payload,
res, res_headers, {}, 10 * 1000, true);
if (status_code != 200) {
LOG(ERROR) << "Error while sending query suggestions events to leader. "
<< "Status code: " << status_code << ", response: " << res;
}
}
}
}
}
void AnalyticsManager::stop() {
quit = true;
cv.notify_all();
@ -340,3 +395,12 @@ std::unordered_map<std::string, PopularQueries*> AnalyticsManager::get_popular_q
std::unique_lock lk(mutex);
return popular_queries;
}
std::set<ClickEvent> AnalyticsManager::get_click_events(const std::string& coll) {
std::unique_lock lk(mutex);
const auto it = query_collection_click_events.find(coll);
if(it == query_collection_click_events.end()) {
return {};
}
return it->second;
}

View File

@ -2379,3 +2379,23 @@ bool post_proxy(const std::shared_ptr<http_req>& req, const std::shared_ptr<http
res->set_200(response.body);
return true;
}
bool get_click_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto click_events = AnalyticsManager::get_instance().get_click_events(req->params["collection"]);
if(click_events.empty()) {
return false;
}
nlohmann::json res_json, doc;
res_json["name"] = req->params["collection"];
res_json["click_events"] = nlohmann::json::array();
for(const auto& click_event : click_events) {
click_event.to_json(doc);
res_json["click_events"].push_back(doc);
}
res->set_200(res_json.dump());
return true;
}

View File

@ -47,6 +47,33 @@ bool EventManager::add_event(const nlohmann::json& event) {
std::string query = event_data_query_it.get<std::string>();
AnalyticsManager::get_instance().add_suggestion(coll.get<std::string>(), query, false, "");
}
} else if(event_type == "query_click") {
if (!event.contains("data")) {
return false;
}
const auto &event_data_val = event[EVENT_DATA];
if (!event_data_val.is_object()) {
return false;
}
if (!event_data_val.contains("q") || !event_data_val.contains("product_id")
|| !event_data_val.contains("position") || !event_data_val.contains("collection")) {
return false;
}
if (!event_data_val["q"].is_string() || !event_data_val["product_id"].is_number_unsigned()
|| !event_data_val["position"].is_number_unsigned() || !event_data_val["collection"].is_string()) {
return false;
}
const std::string query = event_data_val["q"].get<std::string>();
uint64_t product_id = event_data_val["product_id"].get<uint64_t>();
uint64_t position = event_data_val["position"].get<uint64_t>();
const std::string& collection = event_data_val["collection"].get<std::string>();
AnalyticsManager::get_instance().add_click_event(collection, query, product_id, position);
}
}

View File

@ -81,6 +81,10 @@ void master_server_routes() {
server->del("/analytics/rules/:name", del_analytics_rules);
server->post("/analytics/events", post_create_event);
//collection based query click events
server->get("/collections/:collection/click_events", get_click_events);
server->post("/collections/:collection/click_events", post_create_event);
// meta
server->get("/metrics.json", get_metrics_json);
server->get("/stats.json", get_stats_json);