Store purchase and click events in TSV log file

krunal 2024-01-08 17:14:57 +05:30
parent ee414dfb1d
commit 29943c5089
6 changed files with 178 additions and 504 deletions
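The analytics log is a plain TSV file, analytics_events.tsv, opened in append mode under the configured analytics directory (see the init() change below). Each persisted click or purchase event is written as one tab-separated row: timestamp, user_id, a one-character event code (C for query_click, otherwise P), query, doc_id. A minimal standalone sketch of that row format, using an illustrative event struct rather than the project's real event type:

#include <cstdint>
#include <fstream>
#include <string>

// Illustrative stand-in for the project's event type; field names follow the diff below.
struct event_t {
    std::string event_type;   // "query_click" or "query_purchase"
    std::string user_id;
    std::string query;
    std::string doc_id;
    uint64_t timestamp;
};

// Append one event as a TSV row, mirroring the write in AnalyticsManager::persist_events().
void log_event_tsv(std::ofstream& analytics_logs, const event_t& event) {
    if(!analytics_logs.is_open()) {
        return;
    }
    char event_type_short = event.event_type == "query_click" ? 'C' : 'P';
    analytics_logs << event.timestamp << "\t" << event.user_id << "\t"
                   << event_type_short << "\t" << event.query << "\t" << event.doc_id << "\n";
}

int main() {
    // opened the same way init() does: append to <analytics_dir>/analytics_events.tsv
    std::ofstream analytics_logs("analytics_events.tsv", std::ofstream::out | std::ofstream::app);
    log_event_tsv(analytics_logs, {"query_click", "13", "technology", "21", 1521512521});
    log_event_tsv(analytics_logs, {"query_purchase", "11", "technology", "21", 1521514354});
    return 0;
}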

View File

@ -164,13 +164,14 @@ private:
Store* store = nullptr;
Store* analytics_store = nullptr;
std::ofstream analytics_logs;
bool isRateLimitEnabled = false;
AnalyticsManager() {}
~AnalyticsManager();
Option<bool> remove_queries_index(const std::string& name);
Option<bool> remove_index(const std::string& name);
Option<bool> create_index(nlohmann::json &payload,
bool upsert,
@ -225,9 +226,9 @@ public:
void persist_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
void persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s);
void persist_popular_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
nlohmann::json get_events(const std::string& event_type);
nlohmann::json get_events(const std::string& coll, const std::string& event_type);
std::unordered_map<std::string, counter_event_t> get_popular_clicks();

View File

@ -169,8 +169,6 @@ bool put_upsert_analytics_rules(const std::shared_ptr<http_req>& req, const std:
bool del_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool get_analytics_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_replicate_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool get_query_hits_counts(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);

View File

@ -6,6 +6,7 @@
#include "collection_manager.h"
#include "lru/lru.hpp"
#include "string_utils.h"
#include "tsconfig.h"
LRU::Cache<std::string, event_cache_t> events_cache;
#define EVENTS_RATE_LIMIT_SEC 60
@ -152,7 +153,7 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json &payload, bool upsert
if(already_exists) {
// remove the previous configuration with same name (upsert)
Option<bool> remove_op = remove_queries_index(suggestion_config_name);
Option<bool> remove_op = remove_index(suggestion_config_name);
if(!remove_op.ok()) {
return Option<bool>(500, "Error erasing the existing configuration.");;
}
@ -235,13 +236,13 @@ Option<bool> AnalyticsManager::remove_rule(const std::string &name) {
auto suggestion_configs_it = suggestion_configs.find(name);
if(suggestion_configs_it != suggestion_configs.end()) {
return remove_queries_index(name);
return remove_index(name);
}
return Option<bool>(404, "Rule not found.");
}
Option<bool> AnalyticsManager::remove_queries_index(const std::string &name) {
Option<bool> AnalyticsManager::remove_index(const std::string &name) {
// lock is held by caller
auto suggestion_configs_it = suggestion_configs.find(name);
@ -265,6 +266,10 @@ Option<bool> AnalyticsManager::remove_queries_index(const std::string &name) {
nohits_queries.erase(suggestion_collection);
}
if(counter_events.count(suggestion_collection) != 0) {
counter_events.erase(suggestion_collection);
}
suggestion_configs.erase(name);
auto suggestion_key = std::string(ANALYTICS_RULE_PREFIX) + "_" + name;
@ -411,7 +416,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
checkEventsExpiry();
persist_query_events(raft_server, prev_persistence_s);
persist_events(raft_server, prev_persistence_s);
persist_popular_clicks(raft_server, prev_persistence_s);
persist_popular_events(raft_server, prev_persistence_s);
prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
@ -538,23 +543,18 @@ void AnalyticsManager::persist_events(ReplicationState *raft_server, uint64_t pr
};
for (const auto &events_collection_it: query_collection_events) {
auto collection_id = CollectionManager::get_instance().get_collection(
events_collection_it.first)->get_collection_id();
for (const auto &event: events_collection_it.second) {
// send http request
nlohmann::json event_json;
event.to_json(event_json);
event_json["collection_id"] = std::to_string(collection_id);
payload_json.push_back(event_json);
if(analytics_logs.is_open()) {
//store events to log file
char event_type_short = event.event_type == "query_click" ? 'C' : 'P';
analytics_logs << event.timestamp << "\t" << event.user_id << "\t"
<< event_type_short << "\t" << event.query << "\t" << event.doc_id << "\n";
}
}
}
if(send_http_response()) {
query_collection_events.clear();
}
payload_json.clear();
query_collection_events.clear();
for (const auto &query_collection_hits_count_it: query_collection_hits_count) {
auto collection_id = CollectionManager::get_instance().get_collection(
@ -575,7 +575,7 @@ void AnalyticsManager::persist_events(ReplicationState *raft_server, uint64_t pr
payload_json.clear();
}
void AnalyticsManager::persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s) {
void AnalyticsManager::persist_popular_events(ReplicationState *raft_server, uint64_t prev_persistence_s) {
auto send_http_response = [&](const std::string& import_payload, const std::string& collection) {
std::string leader_url = raft_server->get_leader_url();
if (!leader_url.empty()) {
@ -630,6 +630,13 @@ void AnalyticsManager::dispose() {
void AnalyticsManager::init(Store* store, Store* analytics_store) {
this->store = store;
this->analytics_store = analytics_store;
if(analytics_store) {
const auto analytics_dir = Config::get_instance().get_analytics_dir();
const auto analytics_log_path = analytics_dir + "/analytics_events.tsv";
analytics_logs.open(analytics_log_path, std::ofstream::out | std::ofstream::app);
}
}
Store* AnalyticsManager::get_analytics_store() {
@ -651,20 +658,19 @@ std::unordered_map<std::string, counter_event_t> AnalyticsManager::get_popular_c
return counter_events;
}
nlohmann::json AnalyticsManager::get_events(const std::string& event_type) {
nlohmann::json AnalyticsManager::get_events(const std::string& coll, const std::string& event_type) {
std::unique_lock lk(mutex);
std::vector<std::string> event_jsons;
nlohmann::json event_json;
nlohmann::json result_json = nlohmann::json::array();
if (analytics_store) {
auto event_prefix = event_type.find("click_events") != std::string::npos ? std::string(CLICK_EVENT)
: std::string(PURCHASE_EVENT);
analytics_store->scan_fill(event_prefix + "_", event_prefix + "`",
event_jsons);
for (const auto &event_json: event_jsons) {
nlohmann::json event = nlohmann::json::parse(event_json);
result_json.push_back(event);
auto query_collection_events_it = query_collection_events.find(coll);
if (query_collection_events_it != query_collection_events.end()) {
auto events = query_collection_events_it->second;
for (const auto &event: events) {
if(event.event_type == event_type) {
event.to_json(event_json);
result_json.push_back(event_json);
}
}
}
@ -695,14 +701,7 @@ Option<bool> AnalyticsManager::write_events_to_store(nlohmann::json &event_jsons
auto collection_id = event_json["collection_id"].get<std::string>();
auto timestamp = event_json["timestamp"].get<uint64_t>();
std::string key = "_" + StringUtils::serialize_uint64_t(timestamp) + "_" + collection_id;
if(event_json["event_type"] == "query_click") {
key = std::string(CLICK_EVENT) + key;
} else if(event_json["event_type"] == "query_hits_counts") {
key = std::string(QUERY_HITS_COUNT) + key;
} else if(event_json["event_type"] == "query_purchase") {
key = std::string(PURCHASE_EVENT) + key;
}
std::string key = std::string(QUERY_HITS_COUNT) + "_" + StringUtils::serialize_uint64_t(timestamp) + "_" + collection_id;
if(analytics_store) {
bool inserted = analytics_store->insert(key, event_json.dump());
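Only query_hits_counts events are still keyed into the analytics store, under "<QUERY_HITS_COUNT>_<serialized timestamp>_<collection_id>". A hedged sketch of that key layout follows, assuming StringUtils::serialize_uint64_t is a fixed-width big-endian encoding (which is what lets the lexicographic range deletes in checkEventsExpiry below track timestamp order); the prefix constant in the sketch is a placeholder, not the project's real value:

#include <cstdint>
#include <string>

// Assumed behaviour of StringUtils::serialize_uint64_t: a fixed-width big-endian
// byte encoding, so lexicographic key order matches timestamp order.
std::string serialize_uint64_be(uint64_t value) {
    std::string out(8, '\0');
    for(int i = 7; i >= 0; --i) {
        out[i] = static_cast<char>(value & 0xFF);
        value >>= 8;
    }
    return out;
}

// Build the analytics-store key used for query_hits_counts events:
// "<prefix>_<serialized timestamp>_<collection_id>". The prefix value here is a placeholder.
std::string make_query_hits_key(uint64_t timestamp, const std::string& collection_id) {
    const std::string QUERY_HITS_COUNT = "$QH";  // stand-in for AnalyticsManager::QUERY_HITS_COUNT
    return QUERY_HITS_COUNT + "_" + serialize_uint64_be(timestamp) + "_" + collection_id;
}

int main() {
    std::string k1 = make_query_hits_key(1701851341000000ULL, "0");
    std::string k2 = make_query_hits_key(1701851342000000ULL, "0");
    // big-endian encoding keeps byte-wise key ordering consistent with timestamp ordering
    return k1 < k2 ? 0 : 1;
}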
@ -723,18 +722,11 @@ void AnalyticsManager::resetToggleRateLimit(bool toggle) {
}
void AnalyticsManager::resetAnalyticsStore() {
const std::string click_events_prefix = std::string(CLICK_EVENT) + "_";
const std::string query_hits_prefix = std::string(QUERY_HITS_COUNT) + "_";
//delete click events
auto delete_prefix_begin = click_events_prefix;
auto delete_prefix_end = click_events_prefix + "`";
analytics_store->delete_range(delete_prefix_begin, delete_prefix_end);
//delete query hits counts
delete_prefix_begin = query_hits_prefix;
delete_prefix_end = query_hits_prefix + "`";
auto delete_prefix_begin = query_hits_prefix;
auto delete_prefix_end = query_hits_prefix + "`";
analytics_store->delete_range(delete_prefix_begin, delete_prefix_end);
}
@ -761,24 +753,13 @@ void AnalyticsManager::checkEventsExpiry() {
//we check events against a 30-day validity window; events older than 30 days will be removed from the db
auto ts_ttl_useconds = get_current_time_us() - EVENTS_TTL_INTERVAL_US;
const std::string click_events_prefix = std::string(CLICK_EVENT) + "_";
const std::string query_hits_prefix = std::string(QUERY_HITS_COUNT) + "_";
//first remove click events
auto delete_prefix_begin = click_events_prefix;
auto delete_prefix_end = delete_prefix_begin + StringUtils::serialize_uint64_t(ts_ttl_useconds);
auto iter = analytics_store->get_iterator();
iter->Seek(delete_prefix_end);
if (!iter->Valid()) { //exact key or key greater than not found
delete_prefix_end = std::string(CLICK_EVENT) + "`";
}
analytics_store->delete_range(delete_prefix_begin, delete_prefix_end);
//now remove query hits counts
delete_prefix_begin = query_hits_prefix;
delete_prefix_end = std::string(QUERY_HITS_COUNT) + "_" +
auto delete_prefix_begin = query_hits_prefix;
auto delete_prefix_end = std::string(QUERY_HITS_COUNT) + "_" +
StringUtils::serialize_uint64_t(ts_ttl_useconds);
iter->SeekToFirst();
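With the store-backed scan gone, get_events() now answers from the in-memory per-collection event list and filters by event type. A self-contained sketch of that lookup, where the map and struct are illustrative stand-ins for the project's query_collection_events and event_t (nlohmann/json is the JSON library the codebase already uses):

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>
#include <nlohmann/json.hpp>

// Illustrative event record; the real event_t carries the same fields.
struct event_t {
    std::string event_type;
    std::string query;
    std::string user_id;
    std::string doc_id;
    uint64_t position;

    void to_json(nlohmann::json& obj) const {
        obj = {{"query", query}, {"user_id", user_id}, {"doc_id", doc_id}, {"position", position}};
    }
};

// Return every event of the requested type recorded for one collection, as a JSON array.
nlohmann::json get_events(const std::unordered_map<std::string, std::vector<event_t>>& query_collection_events,
                          const std::string& coll, const std::string& event_type) {
    nlohmann::json result_json = nlohmann::json::array();
    auto it = query_collection_events.find(coll);
    if(it != query_collection_events.end()) {
        for(const auto& event: it->second) {
            if(event.event_type == event_type) {
                nlohmann::json event_json;
                event.to_json(event_json);
                result_json.push_back(event_json);
            }
        }
    }
    return result_json;
}

int main() {
    std::unordered_map<std::string, std::vector<event_t>> query_collection_events;
    query_collection_events["titles"].push_back({"query_click", "technology", "13", "21", 2});
    query_collection_events["titles"].push_back({"query_purchase", "technology", "11", "21", 4});
    // only the click event is returned; the purchase event is filtered out by type
    std::cout << get_events(query_collection_events, "titles", "query_click").dump() << std::endl;
    return 0;
}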

View File

@ -2792,83 +2792,6 @@ bool put_conversation_model(const std::shared_ptr<http_req>& req, const std::sha
return true;
}
bool get_analytics_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto analytics_store = AnalyticsManager::get_instance().get_analytics_store();
if (!analytics_store) {
LOG(ERROR) << "Analytics store not initialized.";
return true;
}
auto event_type = req->params["name"];
if((event_type.find("click_events") == std::string::npos)
&& (event_type.find("purchase_events") == std::string::npos)) {
LOG(ERROR) << "Unknown event : " << event_type;
LOG(ERROR) << "get_analytics_events supports only click_events and purchase_events.";
return true;
}
export_state_t *export_state = nullptr;
auto event_prefix = event_type.find("click_event") != std::string::npos ? std::string(AnalyticsManager::CLICK_EVENT)
: std::string(AnalyticsManager::PURCHASE_EVENT);
if (req->data == nullptr) {
export_state = new export_state_t();
req->data = export_state;
export_state->iter_upper_bound_key = event_prefix + "`";
export_state->iter_upper_bound = new rocksdb::Slice(export_state->iter_upper_bound_key);
export_state->it = analytics_store->scan(event_prefix + "_", export_state->iter_upper_bound);
} else {
export_state = dynamic_cast<export_state_t *>(req->data);
}
if (export_state->it != nullptr) {
rocksdb::Iterator *it = export_state->it;
size_t batch_counter = 0;
std::string().swap(res->body);
if (!it->Valid()) {
LOG(ERROR) << "No events found in db.";
req->last_chunk_aggregate = true;
res->final = true;
res->set_404();
stream_response(req, res);
return false;
}
while (it->Valid() && it->key().ToString().compare(0, event_prefix.size(), event_prefix) == 0) {
res->body += it->value().ToString();
it->Next();
// append a new line character if there is going to be one more record to send
if (it->Valid() &&
it->key().ToString().compare(0, event_prefix.size(), event_prefix) == 0) {
res->body += "\n";
req->last_chunk_aggregate = false;
res->final = false;
} else {
req->last_chunk_aggregate = true;
res->final = true;
}
batch_counter++;
if (batch_counter == export_state->export_batch_size) {
break;
}
}
} else {
req->last_chunk_aggregate = true;
res->final = true;
}
res->content_type_header = "text/plain; charset=utf-8";
res->status_code = 200;
stream_response(req, res);
return true;
}
bool post_replicate_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;

View File

@ -258,7 +258,115 @@ TEST_F(AnalyticsManagerTest, GetAndDeleteSuggestions) {
ASSERT_FALSE(missing_rule_op.ok());
}
TEST_F(AnalyticsManagerTest, ClickEventsValidation) {
TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) {
//stores events in memory and reads them from there, since events are only persisted to a file on disk
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection* titles_coll = collectionManager.create_collection(titles_schema).get();
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
nlohmann::json event1 = R"({
"type": "query_click",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 2,
"user_id": "13"
}
})"_json;
req->body = event1.dump();
ASSERT_TRUE(post_create_event(req, res));
nlohmann::json event2 = R"({
"type": "query_click",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 4,
"user_id": "11"
}
})"_json;
req->body = event2.dump();
ASSERT_TRUE(post_create_event(req, res));
auto result = analyticsManager.get_events("titles", "query_click");
ASSERT_EQ("13", result[0]["user_id"]);
ASSERT_EQ("21", result[0]["doc_id"]);
ASSERT_EQ(2, result[0]["position"]);
ASSERT_EQ("technology", result[0]["query"]);
ASSERT_EQ("11", result[1]["user_id"]);
ASSERT_EQ("21", result[1]["doc_id"]);
ASSERT_EQ(4, result[1]["position"]);
ASSERT_EQ("technology", result[1]["query"]);
}
TEST_F(AnalyticsManagerTest, PurchaseEventsStoreRetrieval) {
//stores events in memory and reads them from there, since events are only persisted to a file on disk
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection *titles_coll = collectionManager.create_collection(titles_schema).get();
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
nlohmann::json event1 = R"({
"type": "query_purchase",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 2,
"user_id": "13"
}
})"_json;
req->body = event1.dump();
ASSERT_TRUE(post_create_event(req, res));
nlohmann::json event2 = R"({
"type": "query_purchase",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 4,
"user_id": "11"
}
})"_json;
req->body = event2.dump();
ASSERT_TRUE(post_create_event(req, res));
auto result = analyticsManager.get_events("titles", "query_purchase");
ASSERT_EQ("13", result[0]["user_id"]);
ASSERT_EQ("21", result[0]["doc_id"]);
ASSERT_EQ(2, result[0]["position"]);
ASSERT_EQ("technology", result[0]["query"]);
ASSERT_EQ("11", result[1]["user_id"]);
ASSERT_EQ("21", result[1]["doc_id"]);
ASSERT_EQ(4, result[1]["position"]);
ASSERT_EQ("technology", result[1]["query"]);
}
TEST_F(AnalyticsManagerTest, EventsValidation) {
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
@ -304,7 +412,7 @@ TEST_F(AnalyticsManagerTest, ClickEventsValidation) {
//should be string type
nlohmann::json event3 = R"({
"type": "query_click",
"type": "query_purchase",
"data": {
"q": "technology",
"collection": "titles",
@ -332,9 +440,23 @@ TEST_F(AnalyticsManagerTest, ClickEventsValidation) {
req->body = event4.dump();
ASSERT_TRUE(post_create_event(req, res));
nlohmann::json event5 = R"({
"type": "query_purchase",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 4,
"user_id": "11"
}
})"_json;
req->body = event5.dump();
ASSERT_TRUE(post_create_event(req, res));
}
TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) {
TEST_F(AnalyticsManagerTest, EventsRateLimitTest) {
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
@ -347,6 +469,8 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) {
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
std::vector<nlohmann::json> events;
nlohmann::json event1 = R"({
"type": "query_click",
"data": {
@ -358,11 +482,8 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) {
}
})"_json;
req->body = event1.dump();
ASSERT_TRUE(post_create_event(req, res));
nlohmann::json event2 = R"({
"type": "query_click",
"type": "query_purchase",
"data": {
"q": "technology",
"collection": "titles",
@ -372,67 +493,14 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) {
}
})"_json;
req->body = event2.dump();
ASSERT_TRUE(post_create_event(req, res));
event1["collection_id"] = "0";
event1["timestamp"] = 1521512521;
event1["event_type"] = "query_click";
event2["collection_id"] = "0";
event2["timestamp"] = 1521514354;
event2["event_type"] = "query_click";
nlohmann::json click_events = nlohmann::json::array();
click_events.push_back(event1);
click_events.push_back(event2);
req->body = click_events.dump();
ASSERT_TRUE(post_replicate_events(req, res));
auto result = analyticsManager.get_events("click_events");
ASSERT_EQ("0", result[0]["collection_id"]);
ASSERT_EQ("13", result[0]["data"]["user_id"]);
ASSERT_EQ("21", result[0]["data"]["doc_id"]);
ASSERT_EQ(2, result[0]["data"]["position"]);
ASSERT_EQ("technology", result[0]["data"]["q"]);
ASSERT_EQ("0", result[1]["collection_id"]);
ASSERT_EQ("11", result[1]["data"]["user_id"]);
ASSERT_EQ("21", result[1]["data"]["doc_id"]);
ASSERT_EQ(4, result[1]["data"]["position"]);
ASSERT_EQ("technology", result[1]["data"]["q"]);
}
TEST_F(AnalyticsManagerTest, ClickEventsRateLimitTest) {
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection* titles_coll = collectionManager.create_collection(titles_schema).get();
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
nlohmann::json event = R"({
"type": "query_click",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 2,
"user_id": "13"
}
})"_json;
events.push_back(event1);
events.push_back(event2);
//reset the LRU cache to test the rate limit
analyticsManager.resetToggleRateLimit(true);
req->body = event.dump();
for(auto i = 0; i < 5; ++i) {
req->body = events[i%2].dump();
ASSERT_TRUE(post_create_event(req, res));
}
@ -648,39 +716,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryBasic) {
nlohmann::json events = nlohmann::json::array();
auto ts = 1701851341000000;
nlohmann::json event;
event["event_type"] = "query_click";
event["q"] = "technology";
event["collection_id"] = "0";
event["doc_id"] = "21";
event["position"] = 2;
event["user_id"] = 13;
event["timestamp"] = ts;
events.push_back(event);
event["doc_id"] = "12";
event["position"] = 3;
event["timestamp"] = ts+1000000; //1 sec later
events.push_back(event);
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
nlohmann::json resp = analyticsManager.get_events("click_events");
ASSERT_EQ(2, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("21", resp[0]["doc_id"]);
ASSERT_EQ(2, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ("technology", resp[1]["q"]);
ASSERT_EQ("12", resp[1]["doc_id"]);
ASSERT_EQ(3, resp[1]["position"]);
ASSERT_EQ(13, resp[1]["user_id"]);
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_events("click_events");
ASSERT_EQ(0, resp.size());
//add query hits events with click events
nlohmann::json event2;
@ -695,19 +730,15 @@ TEST_F(AnalyticsManagerTest, EventsExpiryBasic) {
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
resp = analyticsManager.get_query_hits_counts();
auto resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ(124, resp[0]["hits_count"]);
//now old click events will be deleted when expiry is checked, but query hits events will remain
// assuming ttl is ts + 5 sec
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_events("click_events");
ASSERT_EQ(0, resp.size());
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
@ -731,21 +762,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryAll) {
nlohmann::json events = nlohmann::json::array();
auto ts = 1701851341000000;
nlohmann::json event;
event["event_type"] = "query_click";
event["q"] = "technology";
event["collection_id"] = "0";
event["doc_id"] = "21";
event["position"] = 2;
event["user_id"] = 13;
event["timestamp"] = ts;
events.push_back(event);
event["doc_id"] = "12";
event["position"] = 3;
event["timestamp"] = ts + 1000000; //after 1s
events.push_back(event);
nlohmann::json event2;
event2["event_type"] = "query_hits_counts";
event2["q"] = "technology";
@ -758,19 +774,7 @@ TEST_F(AnalyticsManagerTest, EventsExpiryAll) {
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
nlohmann::json resp = analyticsManager.get_events("click_events");
ASSERT_EQ(2, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("21", resp[0]["doc_id"]);
ASSERT_EQ(2, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ("technology", resp[1]["q"]);
ASSERT_EQ("12", resp[1]["doc_id"]);
ASSERT_EQ(3, resp[1]["position"]);
ASSERT_EQ(13, resp[1]["user_id"]);
resp = analyticsManager.get_query_hits_counts();
auto resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
@ -780,9 +784,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryAll) {
// assuming ttl is ts + 5 sec
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_events("click_events");
ASSERT_EQ(0, resp.size());
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(0, resp.size());
}
@ -803,25 +804,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) {
nlohmann::json events = nlohmann::json::array();
auto ts = 1701851341000000;
nlohmann::json event;
event["event_type"] = "query_click";
event["q"] = "technology";
event["collection_id"] = "0";
event["doc_id"] = "21";
event["position"] = 2;
event["user_id"] = 13;
event["timestamp"] = ts;
events.push_back(event);
event["doc_id"] = "12";
event["position"] = 3;
event["timestamp"] = ts + 1000000; //after 1s
events.push_back(event);
event["doc_id"] = "19";
event["position"] = 1;
event["timestamp"] = ts + 6000000; //after 6s
events.push_back(event);
nlohmann::json event2;
event2["event_type"] = "query_hits_counts";
@ -846,24 +828,7 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) {
ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok());
nlohmann::json resp = analyticsManager.get_events("click_events");
ASSERT_EQ(3, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("21", resp[0]["doc_id"]);
ASSERT_EQ(2, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
ASSERT_EQ("technology", resp[1]["q"]);
ASSERT_EQ("12", resp[1]["doc_id"]);
ASSERT_EQ(3, resp[1]["position"]);
ASSERT_EQ(13, resp[1]["user_id"]);
ASSERT_EQ("technology", resp[2]["q"]);
ASSERT_EQ("19", resp[2]["doc_id"]);
ASSERT_EQ(1, resp[2]["position"]);
ASSERT_EQ(13, resp[2]["user_id"]);
resp = analyticsManager.get_query_hits_counts();
auto resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(3, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ(13, resp[0]["user_id"]);
@ -882,13 +847,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) {
//only events in ttl interval will be removed
analyticsManager.checkEventsExpiry();
resp = analyticsManager.get_events("click_events");
ASSERT_EQ(1, resp.size());
ASSERT_EQ("technology", resp[0]["q"]);
ASSERT_EQ("19", resp[0]["doc_id"]);
ASSERT_EQ(1, resp[0]["position"]);
ASSERT_EQ(13, resp[0]["user_id"]);
resp = analyticsManager.get_query_hits_counts();
ASSERT_EQ(1, resp.size());
ASSERT_EQ("management", resp[0]["q"]);
@ -1179,75 +1137,4 @@ TEST_F(AnalyticsManagerTest, PopularityScoreValidation) {
create_op = analyticsManager.create_rule(analytics_rule, false, true);
ASSERT_FALSE(create_op.ok());
ASSERT_EQ("Bad or missing events.", create_op.error());
}
TEST_F(AnalyticsManagerTest, PurchaseEventsStoreRetrieval) {
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection *titles_coll = collectionManager.create_collection(titles_schema).get();
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
nlohmann::json event1 = R"({
"type": "query_purchase",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 2,
"user_id": "13"
}
})"_json;
req->body = event1.dump();
ASSERT_TRUE(post_create_event(req, res));
nlohmann::json event2 = R"({
"type": "query_purchase",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 4,
"user_id": "11"
}
})"_json;
req->body = event2.dump();
ASSERT_TRUE(post_create_event(req, res));
event1["collection_id"] = "0";
event1["timestamp"] = 1521512521;
event1["event_type"] = "query_purchase";
event2["collection_id"] = "0";
event2["timestamp"] = 1521514354;
event2["event_type"] = "query_purchase";
nlohmann::json events = nlohmann::json::array();
events.push_back(event1);
events.push_back(event2);
req->body = events.dump();
ASSERT_TRUE(post_replicate_events(req, res));
auto result = analyticsManager.get_events("purchase_events");
ASSERT_EQ("0", result[0]["collection_id"]);
ASSERT_EQ("13", result[0]["data"]["user_id"]);
ASSERT_EQ("21", result[0]["data"]["doc_id"]);
ASSERT_EQ(2, result[0]["data"]["position"]);
ASSERT_EQ("technology", result[0]["data"]["q"]);
ASSERT_EQ("0", result[1]["collection_id"]);
ASSERT_EQ("11", result[1]["data"]["user_id"]);
ASSERT_EQ("21", result[1]["data"]["doc_id"]);
ASSERT_EQ(4, result[1]["data"]["position"]);
ASSERT_EQ("technology", result[1]["data"]["q"]);
}

View File

@ -1497,120 +1497,4 @@ TEST_F(CoreAPIUtilsTest, TestInvalidConversationModels) {
ASSERT_EQ(400, resp->status_code);
ASSERT_EQ("Property `model_name` is not provided or not a string.", nlohmann::json::parse(resp->body)["message"]);
}
TEST_F(CoreAPIUtilsTest, GetClickEvents) {
//reset analytics store
analyticsManager.resetAnalyticsStore();
nlohmann::json schema = R"({
"name": "titles",
"fields": [
{"name": "name", "type": "string" },
{"name": "points", "type": "int32" }
]
})"_json;
auto op = collectionManager.create_collection(schema);
ASSERT_TRUE(op.ok());
Collection* titles = op.get();
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
// no events in db
req->params["name"] = "click_events";
get_analytics_events(req, res);
ASSERT_EQ("{\"message\": \"Not Found\"}", res->body);
//add some events
nlohmann::json event1 = R"({
"type": "query_click",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "21",
"position": 2,
"user_id": "13"
}
})"_json;
req->body = event1.dump();
ASSERT_TRUE(post_create_event(req, res));
nlohmann::json event2 = R"({
"type": "query_click",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "12",
"position": 1,
"user_id": "13"
}
})"_json;
req->body = event2.dump();
ASSERT_TRUE(post_create_event(req, res));
nlohmann::json event3 = R"({
"type": "query_click",
"data": {
"q": "technology",
"collection": "titles",
"doc_id": "52",
"position": 5,
"user_id": "13"
}
})"_json;
req->body = event3.dump();
ASSERT_TRUE(post_create_event(req, res));
event1["collection_id"] = "0";
event1["timestamp"] = 1521512521;
event1["event_type"] = "query_click";
event2["collection_id"] = "0";
event2["timestamp"] = 1521514354;
event2["event_type"] = "query_click";
event3["collection_id"] = "0";
event3["timestamp"] = 1521515382;
event3["event_type"] = "query_click";
nlohmann::json click_events = nlohmann::json::array();
click_events.push_back(event1);
click_events.push_back(event2);
click_events.push_back(event3);
req->body = click_events.dump();
ASSERT_TRUE(post_replicate_events(req, res));
//get click events
req->data = nullptr;
req->params["name"] = "click_events";
get_analytics_events(req, res);
std::vector<std::string> res_strs;
StringUtils::split(res->body, res_strs, "\n");
auto result = nlohmann::json::array();
result.push_back(nlohmann::json::parse(res_strs[0]));
result.push_back(nlohmann::json::parse(res_strs[1]));
result.push_back(nlohmann::json::parse(res_strs[2]));
ASSERT_EQ("0", result[0]["collection_id"]);
ASSERT_EQ("13", result[0]["data"]["user_id"]);
ASSERT_EQ("21", result[0]["data"]["doc_id"]);
ASSERT_EQ(2, result[0]["data"]["position"]);
ASSERT_EQ("technology", result[0]["data"]["q"]);
ASSERT_EQ("0", result[1]["collection_id"]);
ASSERT_EQ("13", result[1]["data"]["user_id"]);
ASSERT_EQ("12", result[1]["data"]["doc_id"]);
ASSERT_EQ(1, result[1]["data"]["position"]);
ASSERT_EQ("technology", result[1]["data"]["q"]);
ASSERT_EQ("0", result[2]["collection_id"]);
ASSERT_EQ("13", result[2]["data"]["user_id"]);
ASSERT_EQ("52", result[2]["data"]["doc_id"]);
ASSERT_EQ(5, result[2]["data"]["position"]);
ASSERT_EQ("technology", result[2]["data"]["q"]);
}