Analytics manager fixes (#1553)

* add tests for persisting events

* fix test by adding unique event name

* fix persistance with analytics events

* early return raft_server check

* increment populairty count instead of overwrite

* extract method serialize_as_docs

* move func definition from header to source
This commit is contained in:
Krunal Gandhi 2024-02-15 11:21:08 +00:00 committed by GitHub
parent bf8a2fc6e5
commit c7c24e6ab9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 150 additions and 29 deletions

View File

@ -70,6 +70,8 @@ struct counter_event_t {
std::string counter_field;
std::map<std::string, uint64_t> docid_counts;
std::map<std::string, uint16_t> event_weight_map;
void serialize_as_docs(std::string& docs);
};
struct event_cache_t {
@ -209,7 +211,7 @@ public:
Option<bool> add_event(const std::string& client_ip, const std::string& event_type,
const std::string& event_name, const nlohmann::json& event_data);
void persist_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
void persist_events();
void persist_popular_events(ReplicationState *raft_server, uint64_t prev_persistence_s);

View File

@ -483,7 +483,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
}
persist_query_events(raft_server, prev_persistence_s);
persist_events(raft_server, prev_persistence_s);
persist_events();
persist_popular_events(raft_server, prev_persistence_s);
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
@ -579,9 +579,9 @@ void AnalyticsManager::persist_query_events(ReplicationState *raft_server, uint6
}
}
void AnalyticsManager::persist_events(ReplicationState *raft_server, uint64_t prev_persistence_s) {
void AnalyticsManager::persist_events() {
// lock is held by caller
for (const auto &events_collection_it: query_collection_events) {
for (auto &events_collection_it: query_collection_events) {
const auto& collection = events_collection_it.first;
for (const auto &event: events_collection_it.second) {
if (analytics_logs.is_open()) {
@ -598,12 +598,16 @@ void AnalyticsManager::persist_events(ReplicationState *raft_server, uint64_t pr
analytics_logs << std::flush;
}
}
events_collection_it.second.clear();
}
query_collection_events.clear();
}
void AnalyticsManager::persist_popular_events(ReplicationState *raft_server, uint64_t prev_persistence_s) {
auto send_http_response = [&](const std::string& import_payload, const std::string& collection) {
if (raft_server == nullptr) {
return;
}
std::string leader_url = raft_server->get_leader_url();
if (!leader_url.empty()) {
const std::string &base_url = leader_url + "collections/" + collection;
@ -621,17 +625,13 @@ void AnalyticsManager::persist_popular_events(ReplicationState *raft_server, uin
}
};
for(const auto& counter_event_it : counter_events) {
for(auto& counter_event_it : counter_events) {
auto coll = counter_event_it.first;
nlohmann::json doc;
auto counter_field = counter_event_it.second.counter_field;
for(const auto& counter_event : counter_event_it.second.docid_counts) {
doc["id"] = counter_event.first;
doc[counter_field] = counter_event.second;
send_http_response(doc.dump(), coll);
}
std::string docs;
counter_event_it.second.serialize_as_docs(docs);
send_http_response(docs, coll);
counter_event_it.second.docid_counts.clear();
}
counter_events.clear();
}
void AnalyticsManager::stop() {
@ -698,4 +698,17 @@ std::string AnalyticsManager::get_sub_event_type(const std::string &event_type)
return AnalyticsManager::CUSTOM_EVENT;
}
return "";
}
void counter_event_t::serialize_as_docs(std::string &docs) {
for (auto kv: docid_counts) {
nlohmann::json doc;
doc["id"] = kv.first;
doc["$operations"]["increment"][counter_field] = kv.second;
docs += doc.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore) + "\n";
}
if (!docs.empty()) {
docs.pop_back();
}
}

View File

@ -477,6 +477,97 @@ TEST_F(AnalyticsManagerTest, EventsValidation) {
ASSERT_TRUE(post_create_event(req, res));
}
TEST_F(AnalyticsManagerTest, EventsPersist) {
//remove all rules first
analyticsManager.remove_all_rules();
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection *titles_coll = collectionManager.create_collection(titles_schema).get();
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
auto analytics_rule = R"({
"name": "product_click_events",
"type": "clicks",
"params": {
"name": "APC",
"source": {
"collection": "titles"
}
}
})"_json;
auto create_op = analyticsManager.create_rule(analytics_rule, true, true);
ASSERT_TRUE(create_op.ok());
nlohmann::json event = R"({
"type": "click",
"name": "APC",
"data": {
"q": "technology",
"doc_id": "21",
"user_id": "13"
}
})"_json;
req->body = event.dump();
ASSERT_TRUE(post_create_event(req, res));
analyticsManager.persist_events();
auto fileOutput = Config::fetch_file_contents("/tmp/typesense_test/analytics_manager_test/analytics_events.tsv");
std::stringstream strbuff(fileOutput.get());
std::string docid, userid, q, collection, name, timestamp;
strbuff >> timestamp >> name >> collection >> userid >> docid >> q;
ASSERT_EQ("APC", name);
ASSERT_EQ("titles", collection);
ASSERT_EQ("13", userid);
ASSERT_EQ("21", docid);
ASSERT_EQ("technology", q);
event = R"({
"type": "click",
"name": "APC",
"data": {
"q": "technology",
"doc_id": "12",
"user_id": "13"
}
})"_json;
req->body = event.dump();
ASSERT_TRUE(post_create_event(req, res));
analyticsManager.persist_events();
fileOutput = Config::fetch_file_contents("/tmp/typesense_test/analytics_manager_test/analytics_events.tsv");
std::stringstream strbuff2(fileOutput.get());
timestamp.clear();name.clear();collection.clear();userid.clear();q.clear();
strbuff2 >> timestamp >> name >> collection >> userid >> docid >> q;
ASSERT_EQ("APC", name);
ASSERT_EQ("titles", collection);
ASSERT_EQ("13", userid);
ASSERT_EQ("21", docid);
ASSERT_EQ("technology", q);
timestamp.clear();name.clear();collection.clear();userid.clear();q.clear();
strbuff2 >> timestamp >> name >> collection >> userid >> docid >> q;
ASSERT_EQ("APC", name);
ASSERT_EQ("titles", collection);
ASSERT_EQ("13", userid);
ASSERT_EQ("12", docid);
ASSERT_EQ("technology", q);
}
TEST_F(AnalyticsManagerTest, EventsRateLimitTest) {
nlohmann::json titles_schema = R"({
"name": "titles",
@ -813,26 +904,21 @@ TEST_F(AnalyticsManagerTest, PopularityScore) {
ASSERT_EQ(7, popular_clicks["products"].docid_counts["3"]);
ASSERT_EQ(6, popular_clicks["products"].docid_counts["1"]);
//trigger persistance event
for(const auto& popular_clicks_it : popular_clicks) {
auto coll = popular_clicks_it.first;
nlohmann::json doc;
auto counter_field = popular_clicks_it.second.counter_field;
req->params["collection"] = "products";
//trigger persistance event manually
for(auto& popular_clicks_it : popular_clicks) {
std::string docs;
req->params["collection"] = popular_clicks_it.first;
req->params["action"] = "update";
for(const auto& popular_click : popular_clicks_it.second.docid_counts) {
doc["id"] = popular_click.first;
doc[counter_field] = popular_click.second;
req->body = doc.dump();
post_import_documents(req, res);
}
popular_clicks_it.second.serialize_as_docs(docs);
req->body = docs;
post_import_documents(req, res);
}
sort_fields = {sort_by("popularity", "DESC")};
auto results = products_coll->search("*", {}, "", {},
sort_fields, {0}, 10, 1, FREQUENCY,{false},
Index::DROP_TOKENS_THRESHOLD,spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>()).get();
sort_fields, {0}, 10, 1, FREQUENCY,{false},
Index::DROP_TOKENS_THRESHOLD,spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>()).get();
ASSERT_EQ(5, results["hits"].size());
@ -843,6 +929,26 @@ TEST_F(AnalyticsManagerTest, PopularityScore) {
ASSERT_EQ("1", results["hits"][1]["document"]["id"]);
ASSERT_EQ(6, results["hits"][1]["document"]["popularity"]);
ASSERT_EQ("Funky trousers", results["hits"][1]["document"]["title"]);
//after persist should able to add new events
analyticsManager.persist_popular_events(nullptr, 0);
nlohmann::json event5 = R"({
"type": "conversion",
"name": "YZ",
"data": {
"q": "shorts",
"doc_id": "3",
"user_id": "11"
}
})"_json;
req->body = event5.dump();
ASSERT_TRUE(post_create_event(req, res));
popular_clicks = analyticsManager.get_popular_clicks();
ASSERT_EQ(1, popular_clicks.size());
ASSERT_EQ("popularity", popular_clicks["products"].counter_field);
ASSERT_EQ(1, popular_clicks["products"].docid_counts.size());
}
TEST_F(AnalyticsManagerTest, PopularityScoreValidation) {