diff --git a/include/analytics_manager.h b/include/analytics_manager.h index c15f049b..b69e735a 100644 --- a/include/analytics_manager.h +++ b/include/analytics_manager.h @@ -164,13 +164,14 @@ private: Store* store = nullptr; Store* analytics_store = nullptr; + std::ofstream analytics_logs; bool isRateLimitEnabled = false; AnalyticsManager() {} ~AnalyticsManager(); - Option remove_queries_index(const std::string& name); + Option remove_index(const std::string& name); Option create_index(nlohmann::json &payload, bool upsert, @@ -225,9 +226,9 @@ public: void persist_events(ReplicationState *raft_server, uint64_t prev_persistence_s); - void persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s); + void persist_popular_events(ReplicationState *raft_server, uint64_t prev_persistence_s); - nlohmann::json get_events(const std::string& event_type); + nlohmann::json get_events(const std::string& coll, const std::string& event_type); std::unordered_map get_popular_clicks(); diff --git a/include/core_api.h b/include/core_api.h index 39508922..0fa4085c 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -169,8 +169,6 @@ bool put_upsert_analytics_rules(const std::shared_ptr& req, const std: bool del_analytics_rules(const std::shared_ptr& req, const std::shared_ptr& res); -bool get_analytics_events(const std::shared_ptr& req, const std::shared_ptr& res); - bool post_replicate_events(const std::shared_ptr& req, const std::shared_ptr& res); bool get_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res); diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp index e31e7211..727ae4ad 100644 --- a/src/analytics_manager.cpp +++ b/src/analytics_manager.cpp @@ -6,6 +6,7 @@ #include "collection_manager.h" #include "lru/lru.hpp" #include "string_utils.h" +#include "tsconfig.h" LRU::Cache events_cache; #define EVENTS_RATE_LIMIT_SEC 60 @@ -152,7 +153,7 @@ Option AnalyticsManager::create_index(nlohmann::json &payload, bool upsert if(already_exists) { // remove the previous configuration with same name (upsert) - Option remove_op = remove_queries_index(suggestion_config_name); + Option remove_op = remove_index(suggestion_config_name); if(!remove_op.ok()) { return Option(500, "Error erasing the existing configuration.");; } @@ -235,13 +236,13 @@ Option AnalyticsManager::remove_rule(const std::string &name) { auto suggestion_configs_it = suggestion_configs.find(name); if(suggestion_configs_it != suggestion_configs.end()) { - return remove_queries_index(name); + return remove_index(name); } return Option(404, "Rule not found."); } -Option AnalyticsManager::remove_queries_index(const std::string &name) { +Option AnalyticsManager::remove_index(const std::string &name) { // lock is held by caller auto suggestion_configs_it = suggestion_configs.find(name); @@ -265,6 +266,10 @@ Option AnalyticsManager::remove_queries_index(const std::string &name) { nohits_queries.erase(suggestion_collection); } + if(counter_events.count(suggestion_collection) != 0) { + counter_events.erase(suggestion_collection); + } + suggestion_configs.erase(name); auto suggestion_key = std::string(ANALYTICS_RULE_PREFIX) + "_" + name; @@ -411,7 +416,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) { checkEventsExpiry(); persist_query_events(raft_server, prev_persistence_s); persist_events(raft_server, prev_persistence_s); - persist_popular_clicks(raft_server, prev_persistence_s); + persist_popular_events(raft_server, prev_persistence_s); prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -538,23 +543,18 @@ void AnalyticsManager::persist_events(ReplicationState *raft_server, uint64_t pr }; for (const auto &events_collection_it: query_collection_events) { - auto collection_id = CollectionManager::get_instance().get_collection( - events_collection_it.first)->get_collection_id(); for (const auto &event: events_collection_it.second) { - // send http request - nlohmann::json event_json; - event.to_json(event_json); - event_json["collection_id"] = std::to_string(collection_id); - payload_json.push_back(event_json); + if(analytics_logs.is_open()) { + //store events to log file + char event_type_short = event.event_type == "query_click" ? 'C' : 'P'; + + analytics_logs << event.timestamp << "\t" << event.user_id << "\t" + << event_type_short << "\t" << event.query << "\t" << event.doc_id << "\n"; + } } } - if(send_http_response()) { - query_collection_events.clear(); - } - - - payload_json.clear(); + query_collection_events.clear(); for (const auto &query_collection_hits_count_it: query_collection_hits_count) { auto collection_id = CollectionManager::get_instance().get_collection( @@ -575,7 +575,7 @@ void AnalyticsManager::persist_events(ReplicationState *raft_server, uint64_t pr payload_json.clear(); } -void AnalyticsManager::persist_popular_clicks(ReplicationState *raft_server, uint64_t prev_persistence_s) { +void AnalyticsManager::persist_popular_events(ReplicationState *raft_server, uint64_t prev_persistence_s) { auto send_http_response = [&](const std::string& import_payload, const std::string& collection) { std::string leader_url = raft_server->get_leader_url(); if (!leader_url.empty()) { @@ -630,6 +630,13 @@ void AnalyticsManager::dispose() { void AnalyticsManager::init(Store* store, Store* analytics_store) { this->store = store; this->analytics_store = analytics_store; + + if(analytics_store) { + const auto analytics_dir = Config::get_instance().get_analytics_dir(); + const auto analytics_log_path = analytics_dir + "/analytics_events.tsv"; + + analytics_logs.open(analytics_log_path, std::ofstream::out | std::ofstream::app); + } } Store* AnalyticsManager::get_analytics_store() { @@ -651,20 +658,19 @@ std::unordered_map AnalyticsManager::get_popular_c return counter_events; } -nlohmann::json AnalyticsManager::get_events(const std::string& event_type) { +nlohmann::json AnalyticsManager::get_events(const std::string& coll, const std::string& event_type) { std::unique_lock lk(mutex); - std::vector event_jsons; + nlohmann::json event_json; nlohmann::json result_json = nlohmann::json::array(); - if (analytics_store) { - auto event_prefix = event_type.find("click_events") != std::string::npos ? std::string(CLICK_EVENT) - : std::string(PURCHASE_EVENT); - analytics_store->scan_fill(event_prefix + "_", event_prefix + "`", - event_jsons); - - for (const auto &event_json: event_jsons) { - nlohmann::json event = nlohmann::json::parse(event_json); - result_json.push_back(event); + auto query_collection_events_it = query_collection_events.find(coll); + if (query_collection_events_it != query_collection_events.end()) { + auto events = query_collection_events_it->second; + for (const auto &event: events) { + if(event.event_type == event_type) { + event.to_json(event_json); + result_json.push_back(event_json); + } } } @@ -695,14 +701,7 @@ Option AnalyticsManager::write_events_to_store(nlohmann::json &event_jsons auto collection_id = event_json["collection_id"].get(); auto timestamp = event_json["timestamp"].get(); - std::string key = "_" + StringUtils::serialize_uint64_t(timestamp) + "_" + collection_id; - if(event_json["event_type"] == "query_click") { - key = std::string(CLICK_EVENT) + key; - } else if(event_json["event_type"] == "query_hits_counts") { - key = std::string(QUERY_HITS_COUNT) + key; - } else if(event_json["event_type"] == "query_purchase") { - key = std::string(PURCHASE_EVENT) + key; - } + std::string key = std::string(QUERY_HITS_COUNT) + "_" + StringUtils::serialize_uint64_t(timestamp) + "_" + collection_id; if(analytics_store) { bool inserted = analytics_store->insert(key, event_json.dump()); @@ -723,18 +722,11 @@ void AnalyticsManager::resetToggleRateLimit(bool toggle) { } void AnalyticsManager::resetAnalyticsStore() { - const std::string click_events_prefix = std::string(CLICK_EVENT) + "_"; const std::string query_hits_prefix = std::string(QUERY_HITS_COUNT) + "_"; - //delete click events - auto delete_prefix_begin = click_events_prefix; - auto delete_prefix_end = click_events_prefix + "`"; - - analytics_store->delete_range(delete_prefix_begin, delete_prefix_end); - //delete query hits counts - delete_prefix_begin = query_hits_prefix; - delete_prefix_end = query_hits_prefix + "`"; + auto delete_prefix_begin = query_hits_prefix; + auto delete_prefix_end = query_hits_prefix + "`"; analytics_store->delete_range(delete_prefix_begin, delete_prefix_end); } @@ -761,24 +753,13 @@ void AnalyticsManager::checkEventsExpiry() { //we check for 30days events validity, events older than 30 days will be removed from db auto ts_ttl_useconds = get_current_time_us() - EVENTS_TTL_INTERVAL_US; - const std::string click_events_prefix = std::string(CLICK_EVENT) + "_"; const std::string query_hits_prefix = std::string(QUERY_HITS_COUNT) + "_"; - //first remove click events - auto delete_prefix_begin = click_events_prefix; - auto delete_prefix_end = delete_prefix_begin + StringUtils::serialize_uint64_t(ts_ttl_useconds); - auto iter = analytics_store->get_iterator(); - iter->Seek(delete_prefix_end); - if (!iter->Valid()) { //exact key or key greater than not found - delete_prefix_end = std::string(CLICK_EVENT) + "`"; - } - - analytics_store->delete_range(delete_prefix_begin, delete_prefix_end); //now remove query hits counts - delete_prefix_begin = query_hits_prefix; - delete_prefix_end = std::string(QUERY_HITS_COUNT) + "_" + + auto delete_prefix_begin = query_hits_prefix; + auto delete_prefix_end = std::string(QUERY_HITS_COUNT) + "_" + StringUtils::serialize_uint64_t(ts_ttl_useconds); iter->SeekToFirst(); diff --git a/src/core_api.cpp b/src/core_api.cpp index dc7d3859..306f780e 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -2792,83 +2792,6 @@ bool put_conversation_model(const std::shared_ptr& req, const std::sha return true; } -bool get_analytics_events(const std::shared_ptr& req, const std::shared_ptr& res) { - auto analytics_store = AnalyticsManager::get_instance().get_analytics_store(); - if (!analytics_store) { - LOG(ERROR) << "Analytics store not initialized."; - return true; - } - - auto event_type = req->params["name"]; - if((event_type.find("click_events") == std::string::npos) - && (event_type.find("purchase_events") == std::string::npos)) { - - LOG(ERROR) << "Unknown event : " << event_type; - LOG(ERROR) << "get_analytics_events supports only click_events and purchase_events."; - return true; - } - - export_state_t *export_state = nullptr; - auto event_prefix = event_type.find("click_event") != std::string::npos ? std::string(AnalyticsManager::CLICK_EVENT) - : std::string(AnalyticsManager::PURCHASE_EVENT); - if (req->data == nullptr) { - export_state = new export_state_t(); - req->data = export_state; - - export_state->iter_upper_bound_key = event_prefix + "`"; - export_state->iter_upper_bound = new rocksdb::Slice(export_state->iter_upper_bound_key); - export_state->it = analytics_store->scan(event_prefix + "_", export_state->iter_upper_bound); - } else { - export_state = dynamic_cast(req->data); - } - - if (export_state->it != nullptr) { - rocksdb::Iterator *it = export_state->it; - size_t batch_counter = 0; - std::string().swap(res->body); - - if (!it->Valid()) { - LOG(ERROR) << "No events found in db."; - req->last_chunk_aggregate = true; - res->final = true; - res->set_404(); - stream_response(req, res); - return false; - } - - while (it->Valid() && it->key().ToString().compare(0, event_prefix.size(), event_prefix) == 0) { - res->body += it->value().ToString(); - it->Next(); - - // append a new line character if there is going to be one more record to send - if (it->Valid() && - it->key().ToString().compare(0, event_prefix.size(), event_prefix) == 0) { - res->body += "\n"; - req->last_chunk_aggregate = false; - res->final = false; - } else { - req->last_chunk_aggregate = true; - res->final = true; - } - - batch_counter++; - if (batch_counter == export_state->export_batch_size) { - break; - } - } - } else { - req->last_chunk_aggregate = true; - res->final = true; - } - - res->content_type_header = "text/plain; charset=utf-8"; - res->status_code = 200; - - stream_response(req, res); - - return true; -} - bool post_replicate_events(const std::shared_ptr& req, const std::shared_ptr& res) { nlohmann::json req_json; diff --git a/test/analytics_manager_test.cpp b/test/analytics_manager_test.cpp index ea9be206..06f84509 100644 --- a/test/analytics_manager_test.cpp +++ b/test/analytics_manager_test.cpp @@ -258,7 +258,115 @@ TEST_F(AnalyticsManagerTest, GetAndDeleteSuggestions) { ASSERT_FALSE(missing_rule_op.ok()); } -TEST_F(AnalyticsManagerTest, ClickEventsValidation) { +TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) { + //stores events in memory and reads as events are persisted in file on disk + nlohmann::json titles_schema = R"({ + "name": "titles", + "fields": [ + {"name": "title", "type": "string"} + ] + })"_json; + + Collection* titles_coll = collectionManager.create_collection(titles_schema).get(); + + std::shared_ptr req = std::make_shared(); + std::shared_ptr res = std::make_shared(nullptr); + + nlohmann::json event1 = R"({ + "type": "query_click", + "data": { + "q": "technology", + "collection": "titles", + "doc_id": "21", + "position": 2, + "user_id": "13" + } + })"_json; + + req->body = event1.dump(); + ASSERT_TRUE(post_create_event(req, res)); + + nlohmann::json event2 = R"({ + "type": "query_click", + "data": { + "q": "technology", + "collection": "titles", + "doc_id": "21", + "position": 4, + "user_id": "11" + } + })"_json; + + req->body = event2.dump(); + ASSERT_TRUE(post_create_event(req, res)); + + auto result = analyticsManager.get_events("titles", "query_click"); + ASSERT_EQ("13", result[0]["user_id"]); + ASSERT_EQ("21", result[0]["doc_id"]); + ASSERT_EQ(2, result[0]["position"]); + ASSERT_EQ("technology", result[0]["query"]); + + ASSERT_EQ("11", result[1]["user_id"]); + ASSERT_EQ("21", result[1]["doc_id"]); + ASSERT_EQ(4, result[1]["position"]); + ASSERT_EQ("technology", result[1]["query"]); +} + +TEST_F(AnalyticsManagerTest, PurchaseEventsStoreRetrieval) { +//stores events in memory and reads as events are persisted in file on disk + nlohmann::json titles_schema = R"({ + "name": "titles", + "fields": [ + {"name": "title", "type": "string"} + ] + })"_json; + + Collection *titles_coll = collectionManager.create_collection(titles_schema).get(); + + std::shared_ptr req = std::make_shared(); + std::shared_ptr res = std::make_shared(nullptr); + + nlohmann::json event1 = R"({ + "type": "query_purchase", + "data": { + "q": "technology", + "collection": "titles", + "doc_id": "21", + "position": 2, + "user_id": "13" + } + })"_json; + + req->body = event1.dump(); + ASSERT_TRUE(post_create_event(req, res)); + + nlohmann::json event2 = R"({ + "type": "query_purchase", + "data": { + "q": "technology", + "collection": "titles", + "doc_id": "21", + "position": 4, + "user_id": "11" + } + })"_json; + + req->body = event2.dump(); + ASSERT_TRUE(post_create_event(req, res)); + + auto result = analyticsManager.get_events("titles", "query_purchase"); + ASSERT_EQ("13", result[0]["user_id"]); + ASSERT_EQ("21", result[0]["doc_id"]); + ASSERT_EQ(2, result[0]["position"]); + ASSERT_EQ("technology", result[0]["query"]); + + ASSERT_EQ("11", result[1]["user_id"]); + ASSERT_EQ("21", result[1]["doc_id"]); + ASSERT_EQ(4, result[1]["position"]); + ASSERT_EQ("technology", result[1]["query"]); +} + +TEST_F(AnalyticsManagerTest, EventsValidation) { nlohmann::json titles_schema = R"({ "name": "titles", "fields": [ @@ -304,7 +412,7 @@ TEST_F(AnalyticsManagerTest, ClickEventsValidation) { //should be string type nlohmann::json event3 = R"({ - "type": "query_click", + "type": "query_purchase", "data": { "q": "technology", "collection": "titles", @@ -332,9 +440,23 @@ TEST_F(AnalyticsManagerTest, ClickEventsValidation) { req->body = event4.dump(); ASSERT_TRUE(post_create_event(req, res)); + + nlohmann::json event5 = R"({ + "type": "query_purchase", + "data": { + "q": "technology", + "collection": "titles", + "doc_id": "21", + "position": 4, + "user_id": "11" + } + })"_json; + + req->body = event5.dump(); + ASSERT_TRUE(post_create_event(req, res)); } -TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) { +TEST_F(AnalyticsManagerTest, EventsRateLimitTest) { nlohmann::json titles_schema = R"({ "name": "titles", "fields": [ @@ -347,6 +469,8 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) { std::shared_ptr req = std::make_shared(); std::shared_ptr res = std::make_shared(nullptr); + std::vector events; + nlohmann::json event1 = R"({ "type": "query_click", "data": { @@ -358,11 +482,8 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) { } })"_json; - req->body = event1.dump(); - ASSERT_TRUE(post_create_event(req, res)); - nlohmann::json event2 = R"({ - "type": "query_click", + "type": "query_purchase", "data": { "q": "technology", "collection": "titles", @@ -372,67 +493,14 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) { } })"_json; - req->body = event2.dump(); - ASSERT_TRUE(post_create_event(req, res)); - - event1["collection_id"] = "0"; - event1["timestamp"] = 1521512521; - event1["event_type"] = "query_click"; - event2["collection_id"] = "0"; - event2["timestamp"] = 1521514354; - event2["event_type"] = "query_click"; - - nlohmann::json click_events = nlohmann::json::array(); - click_events.push_back(event1); - click_events.push_back(event2); - - req->body = click_events.dump(); - ASSERT_TRUE(post_replicate_events(req, res)); - - auto result = analyticsManager.get_events("click_events"); - - ASSERT_EQ("0", result[0]["collection_id"]); - ASSERT_EQ("13", result[0]["data"]["user_id"]); - ASSERT_EQ("21", result[0]["data"]["doc_id"]); - ASSERT_EQ(2, result[0]["data"]["position"]); - ASSERT_EQ("technology", result[0]["data"]["q"]); - - ASSERT_EQ("0", result[1]["collection_id"]); - ASSERT_EQ("11", result[1]["data"]["user_id"]); - ASSERT_EQ("21", result[1]["data"]["doc_id"]); - ASSERT_EQ(4, result[1]["data"]["position"]); - ASSERT_EQ("technology", result[1]["data"]["q"]); -} - -TEST_F(AnalyticsManagerTest, ClickEventsRateLimitTest) { - nlohmann::json titles_schema = R"({ - "name": "titles", - "fields": [ - {"name": "title", "type": "string"} - ] - })"_json; - - Collection* titles_coll = collectionManager.create_collection(titles_schema).get(); - - std::shared_ptr req = std::make_shared(); - std::shared_ptr res = std::make_shared(nullptr); - - nlohmann::json event = R"({ - "type": "query_click", - "data": { - "q": "technology", - "collection": "titles", - "doc_id": "21", - "position": 2, - "user_id": "13" - } - })"_json; + events.push_back(event1); + events.push_back(event2); //reset the LRU cache to test the rate limit analyticsManager.resetToggleRateLimit(true); - req->body = event.dump(); for(auto i = 0; i < 5; ++i) { + req->body = events[i%2].dump(); ASSERT_TRUE(post_create_event(req, res)); } @@ -648,39 +716,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryBasic) { nlohmann::json events = nlohmann::json::array(); auto ts = 1701851341000000; - nlohmann::json event; - event["event_type"] = "query_click"; - event["q"] = "technology"; - event["collection_id"] = "0"; - event["doc_id"] = "21"; - event["position"] = 2; - event["user_id"] = 13; - event["timestamp"] = ts; - events.push_back(event); - - event["doc_id"] = "12"; - event["position"] = 3; - event["timestamp"] = ts+1000000; //1 sec later - events.push_back(event); - - ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok()); - - nlohmann::json resp = analyticsManager.get_events("click_events"); - ASSERT_EQ(2, resp.size()); - ASSERT_EQ("technology", resp[0]["q"]); - ASSERT_EQ("21", resp[0]["doc_id"]); - ASSERT_EQ(2, resp[0]["position"]); - ASSERT_EQ(13, resp[0]["user_id"]); - - ASSERT_EQ("technology", resp[1]["q"]); - ASSERT_EQ("12", resp[1]["doc_id"]); - ASSERT_EQ(3, resp[1]["position"]); - ASSERT_EQ(13, resp[1]["user_id"]); - - analyticsManager.checkEventsExpiry(); - - resp = analyticsManager.get_events("click_events"); - ASSERT_EQ(0, resp.size()); //add query hits events with click events nlohmann::json event2; @@ -695,19 +730,15 @@ TEST_F(AnalyticsManagerTest, EventsExpiryBasic) { ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok()); - resp = analyticsManager.get_query_hits_counts(); + auto resp = analyticsManager.get_query_hits_counts(); ASSERT_EQ(1, resp.size()); ASSERT_EQ("technology", resp[0]["q"]); ASSERT_EQ(13, resp[0]["user_id"]); ASSERT_EQ(124, resp[0]["hits_count"]); - //now old click events will be deleted on checking expiry but query hits events will be remaining // assumming ttl is ts + 5 sec analyticsManager.checkEventsExpiry(); - resp = analyticsManager.get_events("click_events"); - ASSERT_EQ(0, resp.size()); - resp = analyticsManager.get_query_hits_counts(); ASSERT_EQ(1, resp.size()); ASSERT_EQ("technology", resp[0]["q"]); @@ -731,21 +762,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryAll) { nlohmann::json events = nlohmann::json::array(); auto ts = 1701851341000000; - nlohmann::json event; - event["event_type"] = "query_click"; - event["q"] = "technology"; - event["collection_id"] = "0"; - event["doc_id"] = "21"; - event["position"] = 2; - event["user_id"] = 13; - event["timestamp"] = ts; - events.push_back(event); - - event["doc_id"] = "12"; - event["position"] = 3; - event["timestamp"] = ts + 1000000; //after 1s - events.push_back(event); - nlohmann::json event2; event2["event_type"] = "query_hits_counts"; event2["q"] = "technology"; @@ -758,19 +774,7 @@ TEST_F(AnalyticsManagerTest, EventsExpiryAll) { ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok()); - nlohmann::json resp = analyticsManager.get_events("click_events"); - ASSERT_EQ(2, resp.size()); - ASSERT_EQ("technology", resp[0]["q"]); - ASSERT_EQ("21", resp[0]["doc_id"]); - ASSERT_EQ(2, resp[0]["position"]); - ASSERT_EQ(13, resp[0]["user_id"]); - - ASSERT_EQ("technology", resp[1]["q"]); - ASSERT_EQ("12", resp[1]["doc_id"]); - ASSERT_EQ(3, resp[1]["position"]); - ASSERT_EQ(13, resp[1]["user_id"]); - - resp = analyticsManager.get_query_hits_counts(); + auto resp = analyticsManager.get_query_hits_counts(); ASSERT_EQ(1, resp.size()); ASSERT_EQ("technology", resp[0]["q"]); ASSERT_EQ(13, resp[0]["user_id"]); @@ -780,9 +784,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryAll) { // assuming ttl is ts + 5 sec analyticsManager.checkEventsExpiry(); - resp = analyticsManager.get_events("click_events"); - ASSERT_EQ(0, resp.size()); - resp = analyticsManager.get_query_hits_counts(); ASSERT_EQ(0, resp.size()); } @@ -803,25 +804,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) { nlohmann::json events = nlohmann::json::array(); auto ts = 1701851341000000; - nlohmann::json event; - event["event_type"] = "query_click"; - event["q"] = "technology"; - event["collection_id"] = "0"; - event["doc_id"] = "21"; - event["position"] = 2; - event["user_id"] = 13; - event["timestamp"] = ts; - events.push_back(event); - - event["doc_id"] = "12"; - event["position"] = 3; - event["timestamp"] = ts + 1000000; //after 1s - events.push_back(event); - - event["doc_id"] = "19"; - event["position"] = 1; - event["timestamp"] = ts + 6000000; //after 6s - events.push_back(event); nlohmann::json event2; event2["event_type"] = "query_hits_counts"; @@ -846,24 +828,7 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) { ASSERT_TRUE(analyticsManager.write_events_to_store(events).ok()); - nlohmann::json resp = analyticsManager.get_events("click_events"); - ASSERT_EQ(3, resp.size()); - ASSERT_EQ("technology", resp[0]["q"]); - ASSERT_EQ("21", resp[0]["doc_id"]); - ASSERT_EQ(2, resp[0]["position"]); - ASSERT_EQ(13, resp[0]["user_id"]); - - ASSERT_EQ("technology", resp[1]["q"]); - ASSERT_EQ("12", resp[1]["doc_id"]); - ASSERT_EQ(3, resp[1]["position"]); - ASSERT_EQ(13, resp[1]["user_id"]); - - ASSERT_EQ("technology", resp[2]["q"]); - ASSERT_EQ("19", resp[2]["doc_id"]); - ASSERT_EQ(1, resp[2]["position"]); - ASSERT_EQ(13, resp[2]["user_id"]); - - resp = analyticsManager.get_query_hits_counts(); + auto resp = analyticsManager.get_query_hits_counts(); ASSERT_EQ(3, resp.size()); ASSERT_EQ("technology", resp[0]["q"]); ASSERT_EQ(13, resp[0]["user_id"]); @@ -882,13 +847,6 @@ TEST_F(AnalyticsManagerTest, EventsExpiryPartial) { //only events in ttl interval will be removed analyticsManager.checkEventsExpiry(); - resp = analyticsManager.get_events("click_events"); - ASSERT_EQ(1, resp.size()); - ASSERT_EQ("technology", resp[0]["q"]); - ASSERT_EQ("19", resp[0]["doc_id"]); - ASSERT_EQ(1, resp[0]["position"]); - ASSERT_EQ(13, resp[0]["user_id"]); - resp = analyticsManager.get_query_hits_counts(); ASSERT_EQ(1, resp.size()); ASSERT_EQ("management", resp[0]["q"]); @@ -1179,75 +1137,4 @@ TEST_F(AnalyticsManagerTest, PopularityScoreValidation) { create_op = analyticsManager.create_rule(analytics_rule, false, true); ASSERT_FALSE(create_op.ok()); ASSERT_EQ("Bad or missing events.", create_op.error()); -} - -TEST_F(AnalyticsManagerTest, PurchaseEventsStoreRetrieval) { - - nlohmann::json titles_schema = R"({ - "name": "titles", - "fields": [ - {"name": "title", "type": "string"} - ] - })"_json; - - Collection *titles_coll = collectionManager.create_collection(titles_schema).get(); - - std::shared_ptr req = std::make_shared(); - std::shared_ptr res = std::make_shared(nullptr); - - nlohmann::json event1 = R"({ - "type": "query_purchase", - "data": { - "q": "technology", - "collection": "titles", - "doc_id": "21", - "position": 2, - "user_id": "13" - } - })"_json; - - req->body = event1.dump(); - ASSERT_TRUE(post_create_event(req, res)); - - nlohmann::json event2 = R"({ - "type": "query_purchase", - "data": { - "q": "technology", - "collection": "titles", - "doc_id": "21", - "position": 4, - "user_id": "11" - } - })"_json; - - req->body = event2.dump(); - ASSERT_TRUE(post_create_event(req, res)); - - event1["collection_id"] = "0"; - event1["timestamp"] = 1521512521; - event1["event_type"] = "query_purchase"; - event2["collection_id"] = "0"; - event2["timestamp"] = 1521514354; - event2["event_type"] = "query_purchase"; - - nlohmann::json events = nlohmann::json::array(); - events.push_back(event1); - events.push_back(event2); - - req->body = events.dump(); - ASSERT_TRUE(post_replicate_events(req, res)); - - auto result = analyticsManager.get_events("purchase_events"); - - ASSERT_EQ("0", result[0]["collection_id"]); - ASSERT_EQ("13", result[0]["data"]["user_id"]); - ASSERT_EQ("21", result[0]["data"]["doc_id"]); - ASSERT_EQ(2, result[0]["data"]["position"]); - ASSERT_EQ("technology", result[0]["data"]["q"]); - - ASSERT_EQ("0", result[1]["collection_id"]); - ASSERT_EQ("11", result[1]["data"]["user_id"]); - ASSERT_EQ("21", result[1]["data"]["doc_id"]); - ASSERT_EQ(4, result[1]["data"]["position"]); - ASSERT_EQ("technology", result[1]["data"]["q"]); } \ No newline at end of file diff --git a/test/core_api_utils_test.cpp b/test/core_api_utils_test.cpp index 0331deed..ad143b1f 100644 --- a/test/core_api_utils_test.cpp +++ b/test/core_api_utils_test.cpp @@ -1497,120 +1497,4 @@ TEST_F(CoreAPIUtilsTest, TestInvalidConversationModels) { ASSERT_EQ(400, resp->status_code); ASSERT_EQ("Property `model_name` is not provided or not a string.", nlohmann::json::parse(resp->body)["message"]); -} - -TEST_F(CoreAPIUtilsTest, GetClickEvents) { - //reset analytics store - analyticsManager.resetAnalyticsStore(); - - nlohmann::json schema = R"({ - "name": "titles", - "fields": [ - {"name": "name", "type": "string" }, - {"name": "points", "type": "int32" } - ] - })"_json; - - auto op = collectionManager.create_collection(schema); - ASSERT_TRUE(op.ok()); - Collection* titles = op.get(); - - std::shared_ptr req = std::make_shared(); - std::shared_ptr res = std::make_shared(nullptr); - - // no events in db - req->params["name"] = "click_events"; - get_analytics_events(req, res); - ASSERT_EQ("{\"message\": \"Not Found\"}", res->body); - - //add some events - nlohmann::json event1 = R"({ - "type": "query_click", - "data": { - "q": "technology", - "collection": "titles", - "doc_id": "21", - "position": 2, - "user_id": "13" - } - })"_json; - - req->body = event1.dump(); - ASSERT_TRUE(post_create_event(req, res)); - - nlohmann::json event2 = R"({ - "type": "query_click", - "data": { - "q": "technology", - "collection": "titles", - "doc_id": "12", - "position": 1, - "user_id": "13" - } - })"_json; - req->body = event2.dump(); - ASSERT_TRUE(post_create_event(req, res)); - - nlohmann::json event3 = R"({ - "type": "query_click", - "data": { - "q": "technology", - "collection": "titles", - "doc_id": "52", - "position": 5, - "user_id": "13" - } - })"_json; - req->body = event3.dump(); - ASSERT_TRUE(post_create_event(req, res)); - - event1["collection_id"] = "0"; - event1["timestamp"] = 1521512521; - event1["event_type"] = "query_click"; - event2["collection_id"] = "0"; - event2["timestamp"] = 1521514354; - event2["event_type"] = "query_click"; - event3["collection_id"] = "0"; - event3["timestamp"] = 1521515382; - event3["event_type"] = "query_click"; - - - nlohmann::json click_events = nlohmann::json::array(); - click_events.push_back(event1); - click_events.push_back(event2); - click_events.push_back(event3); - - req->body = click_events.dump(); - ASSERT_TRUE(post_replicate_events(req, res)); - - //get click events - req->data = nullptr; - req->params["name"] = "click_events"; - get_analytics_events(req, res); - - std::vector res_strs; - StringUtils::split(res->body, res_strs, "\n"); - - auto result = nlohmann::json::array(); - result.push_back(nlohmann::json::parse(res_strs[0])); - result.push_back(nlohmann::json::parse(res_strs[1])); - result.push_back(nlohmann::json::parse(res_strs[2])); - - ASSERT_EQ("0", result[0]["collection_id"]); - ASSERT_EQ("13", result[0]["data"]["user_id"]); - ASSERT_EQ("21", result[0]["data"]["doc_id"]); - ASSERT_EQ(2, result[0]["data"]["position"]); - ASSERT_EQ("technology", result[0]["data"]["q"]); - - ASSERT_EQ("0", result[1]["collection_id"]); - ASSERT_EQ("13", result[1]["data"]["user_id"]); - ASSERT_EQ("12", result[1]["data"]["doc_id"]); - ASSERT_EQ(1, result[1]["data"]["position"]); - ASSERT_EQ("technology", result[1]["data"]["q"]); - - ASSERT_EQ("0", result[2]["collection_id"]); - ASSERT_EQ("13", result[2]["data"]["user_id"]); - ASSERT_EQ("52", result[2]["data"]["doc_id"]); - ASSERT_EQ(5, result[2]["data"]["position"]); - ASSERT_EQ("technology", result[2]["data"]["q"]); } \ No newline at end of file