diff --git a/include/analytics_manager.h b/include/analytics_manager.h index bd9b9b74..37ddcbf1 100644 --- a/include/analytics_manager.h +++ b/include/analytics_manager.h @@ -198,11 +198,11 @@ public: Option add_click_event(const std::string& query_collection, const std::string& query, const std::string& user_id, std::string doc_id, uint64_t position, const std::string& client_ip); - void persist_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s); + void persist_query_hits_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s); nlohmann::json get_click_events(); - Option write_click_event_to_store(nlohmann::json& click_event_json); + Option write_events_to_store(nlohmann::json& event_jsons); void add_nohits_query(const std::string& query_collection, const std::string& query, bool live_query, const std::string& user_id); @@ -214,9 +214,5 @@ public: void add_query_hits_count(const std::string& query_collection, const std::string& query, const std::string& user_id, uint64_t hits_count); - void persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s); - nlohmann::json get_query_hits_counts(); - - Option write_query_hits_counts_to_store(nlohmann::json& query_hits_counts_json); }; diff --git a/include/core_api.h b/include/core_api.h index 55186153..158c975b 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -171,12 +171,10 @@ bool del_analytics_rules(const std::shared_ptr& req, const std::shared bool get_click_events(const std::shared_ptr& req, const std::shared_ptr& res); -bool post_replicate_click_event(const std::shared_ptr& req, const std::shared_ptr& res); +bool post_replicate_events(const std::shared_ptr& req, const std::shared_ptr& res); bool get_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res); -bool post_replicate_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res); - // Misc helpers void get_collections_for_auth(std::map& req_params, const std::string& body, diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp index 351a89ca..662ec153 100644 --- a/src/analytics_manager.cpp +++ b/src/analytics_manager.cpp @@ -342,8 +342,7 @@ void AnalyticsManager::run(ReplicationState* raft_server) { } persist_query_events(raft_server, prev_persistence_s); - persist_click_events(raft_server, prev_persistence_s); - persist_query_hits_counts(raft_server, prev_persistence_s); + persist_query_hits_click_events(raft_server, prev_persistence_s); prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -438,10 +437,36 @@ void AnalyticsManager::persist_query_events(ReplicationState *raft_server, uint6 } } -void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s) { +void AnalyticsManager::persist_query_hits_click_events(ReplicationState *raft_server, uint64_t prev_persistence_s) { // lock is held by caller nlohmann::json payload_json = nlohmann::json::array(); + auto send_http_response = [&](const std::string& event_type) { + if(payload_json.empty()) { + return; + } + + const std::string import_payload = payload_json.dump(); + + std::string leader_url = raft_server->get_leader_url(); + if (!leader_url.empty()) { + const std::string &base_url = leader_url + "analytics"; + std::string res; + + const std::string &update_url = base_url + "/" + event_type +"/replicate"; + std::map res_headers; + long status_code = HttpClient::post_response(update_url, import_payload, + res, res_headers, {}, 10 * 1000, true); + + if (status_code != 200) { + LOG(ERROR) << "Error while sending click events to leader. " + << "Status code: " << status_code << ", response: " << res; + } else { + query_collection_click_events.clear(); + } + } + }; + for (const auto &click_events_collection_it: query_collection_click_events) { auto collection_id = CollectionManager::get_instance().get_collection( click_events_collection_it.first)->get_collection_id(); @@ -450,37 +475,13 @@ void AnalyticsManager::persist_click_events(ReplicationState *raft_server, uint6 nlohmann::json click_event_json; click_event.to_json(click_event_json); click_event_json["collection_id"] = std::to_string(collection_id); + click_event_json["event_type"] = "click_events"; payload_json.push_back(click_event_json); } } - if(payload_json.empty()) { - return; - } + send_http_response("click_events"); - const std::string import_payload = payload_json.dump(); - - std::string leader_url = raft_server->get_leader_url(); - if (!leader_url.empty()) { - const std::string &base_url = leader_url + "analytics"; - std::string res; - - const std::string &update_url = base_url + "/click_events/replicate"; - std::map res_headers; - long status_code = HttpClient::post_response(update_url, import_payload, - res, res_headers, {}, 10 * 1000, true); - - if (status_code != 200) { - LOG(ERROR) << "Error while sending click events to leader. " - << "Status code: " << status_code << ", response: " << res; - } else { - query_collection_click_events.clear(); - } - } -} - -void AnalyticsManager::persist_query_hits_counts(ReplicationState *raft_server, uint64_t prev_persistence_s) { - nlohmann::json payload_json = nlohmann::json::array(); for (const auto &query_collection_hits_count_it: query_collection_hits_count) { auto collection_id = CollectionManager::get_instance().get_collection( @@ -490,33 +491,11 @@ void AnalyticsManager::persist_query_hits_counts(ReplicationState *raft_server, nlohmann::json query_hits_count_json; query_hits_count.to_json(query_hits_count_json); query_hits_count_json["collection_id"] = std::to_string(collection_id); + query_hits_count_json["event_type"] = "query_hits_counts"; payload_json.push_back(query_hits_count_json); } } - - if (payload_json.empty()) { - return; - } - - const std::string import_payload = payload_json.dump(); - - std::string leader_url = raft_server->get_leader_url(); - if (!leader_url.empty()) { - const std::string &base_url = leader_url + "analytics"; - std::string res; - - const std::string &update_url = base_url + "/query_hits_counts/replicate"; - std::map res_headers; - long status_code = HttpClient::post_response(update_url, import_payload, - res, res_headers, {}, 10 * 1000, true); - - if (status_code != 200) { - LOG(ERROR) << "Error while sending click events to leader. " - << "Status code: " << status_code << ", response: " << res; - } else { - query_collection_hits_count.clear(); - } - } + send_http_response("query_hits_counts"); } void AnalyticsManager::stop() { @@ -591,34 +570,25 @@ nlohmann::json AnalyticsManager::get_query_hits_counts() { return result_json; } -Option AnalyticsManager::write_click_event_to_store(nlohmann::json &click_event_jsons) { - for(const auto& click_event_json : click_event_jsons) { - auto collection_id = click_event_json["collection_id"].get(); - auto timestamp = click_event_json["timestamp"].get(); - const std::string key = std::string(CLICK_EVENT) + "_" + collection_id + "_" + - std::to_string(timestamp); - if(analytics_store) { - bool inserted = analytics_store->insert(key, click_event_json.dump()); - if (!inserted) { - return Option(500, "Unable to insert clickevent into store."); - } - } else { - return Option(500, "Analytics DB not initialized."); - } - } - return Option(true); -} +Option AnalyticsManager::write_events_to_store(nlohmann::json &event_jsons) { + for(const auto& event_json : event_jsons) { + auto collection_id = event_json["collection_id"].get(); + auto timestamp = event_json["timestamp"].get(); + + std::string key = ""; + if(event_json["event_type"] == "click_events") { + key = std::string(CLICK_EVENT) + "_" + collection_id + "_" + + std::to_string(timestamp); + } else if(event_json["event_type"] == "query_hits_counts") { + key = std::string(QUERY_HITS_COUNT) + "_" + collection_id + "_" + + std::to_string(timestamp); + } -Option AnalyticsManager::write_query_hits_counts_to_store(nlohmann::json &query_hits_counts_json) { - for(const auto& query_hits_count : query_hits_counts_json) { - auto collection_id = query_hits_count["collection_id"].get(); - auto timestamp = query_hits_count["timestamp"].get(); - const std::string key = std::string(QUERY_HITS_COUNT) + "_" + collection_id + "_" + - std::to_string(timestamp); if(analytics_store) { - bool inserted = analytics_store->insert(key, query_hits_count.dump()); + bool inserted = analytics_store->insert(key, event_json.dump()); if (!inserted) { - return Option(500, "Unable to insert query hits count into store."); + std::string error = "Unable to insert " + std::string(event_json["event_type"]) + " to store"; + return Option(500, error); } } else { return Option(500, "Analytics DB not initialized."); diff --git a/src/core_api.cpp b/src/core_api.cpp index 8ca74f10..aa538bc3 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -2806,7 +2806,7 @@ bool get_click_events(const std::shared_ptr& req, const std::shared_pt return true; } -bool post_replicate_click_event(const std::shared_ptr& req, const std::shared_ptr& res) { +bool post_replicate_events(const std::shared_ptr& req, const std::shared_ptr& res) { nlohmann::json req_json; try { @@ -2817,13 +2817,13 @@ bool post_replicate_click_event(const std::shared_ptr& req, const std: return false; } - auto op = AnalyticsManager::get_instance().write_click_event_to_store(req_json); + auto op = AnalyticsManager::get_instance().write_events_to_store(req_json); if(!op.ok()) { res->set_body(op.code(), op.error()); return false; } - res->set_200("click_event_t wrote to DB."); + res->set_200("event wrote to DB."); return true; } @@ -2832,25 +2832,4 @@ bool get_query_hits_counts(const std::shared_ptr& req, const std::shar res->set_200(query_hits_counts.dump()); return true; -} - -bool post_replicate_query_hits_counts(const std::shared_ptr& req, const std::shared_ptr& res) { - nlohmann::json req_json; - - try { - req_json = nlohmann::json::parse(req->body); - } catch(const std::exception& e) { - LOG(ERROR) << "JSON error: " << e.what(); - res->set_400("Bad JSON."); - return false; - } - - auto op = AnalyticsManager::get_instance().write_query_hits_counts_to_store(req_json); - if(!op.ok()) { - res->set_body(op.code(), op.error()); - return false; - } - - res->set_200("query hits counts wrote to DB."); - return true; } \ No newline at end of file diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index fc475d71..f4993434 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -84,9 +84,9 @@ void master_server_routes() { //collection based query click events server->get("/analytics/click_events", get_click_events); server->post("/analytics/click_events", post_create_event); - server->post("/analytics/click_events/replicate", post_replicate_click_event); + server->post("/analytics/click_events/replicate", post_replicate_events); server->get("/analytics/query_hits_counts", get_query_hits_counts); - server->post("/analytics/query_hits_counts/replicate", post_replicate_query_hits_counts); + server->post("/analytics/query_hits_counts/replicate", post_replicate_events); // meta server->get("/metrics.json", get_metrics_json); diff --git a/test/analytics_manager_test.cpp b/test/analytics_manager_test.cpp index e0181d7f..27facacb 100644 --- a/test/analytics_manager_test.cpp +++ b/test/analytics_manager_test.cpp @@ -324,14 +324,17 @@ TEST_F(AnalyticsManagerTest, ClickEventsStoreRetrieveal) { event1["collection_id"] = "0"; event1["timestamp"] = 1521512521; + event1["event_type"] = "click_events"; event2["collection_id"] = "0"; event2["timestamp"] = 1521514354; + event2["event_type"] = "click_events"; + nlohmann::json click_events = nlohmann::json::array(); click_events.push_back(event1); click_events.push_back(event2); req->body = click_events.dump(); - ASSERT_TRUE(post_replicate_click_event(req, res)); + ASSERT_TRUE(post_replicate_events(req, res)); auto result = analyticsManager.get_click_events(); @@ -469,6 +472,8 @@ TEST_F(AnalyticsManagerTest, QueryHitsCount) { nlohmann::json obj; obj["collection_id"] = "0"; + obj["event_type"] = "query_hits_counts"; + obj["query"] = "cool"; obj["timestamp"] = 1625365612; obj["user_id"] = "1"; @@ -482,7 +487,7 @@ TEST_F(AnalyticsManagerTest, QueryHitsCount) { query_hits_array.push_back(obj); - auto op = analyticsManager.write_query_hits_counts_to_store(query_hits_array); + auto op = analyticsManager.write_events_to_store(query_hits_array); ASSERT_TRUE(op.ok()); auto result = analyticsManager.get_query_hits_counts();