diff --git a/include/analytics_manager.h b/include/analytics_manager.h index 281e03c3..08c44010 100644 --- a/include/analytics_manager.h +++ b/include/analytics_manager.h @@ -200,6 +200,8 @@ public: void dispose(); + Store* get_analytics_store(); + void persist_query_events(ReplicationState *raft_server, uint64_t prev_persistence_s); std::unordered_map get_popular_queries(); diff --git a/src/analytics_manager.cpp b/src/analytics_manager.cpp index ad6003ac..6db4978b 100644 --- a/src/analytics_manager.cpp +++ b/src/analytics_manager.cpp @@ -598,6 +598,10 @@ void AnalyticsManager::init(Store* store, Store* analytics_store) { this->analytics_store = analytics_store; } +Store* AnalyticsManager::get_analytics_store() { + return this->analytics_store; +} + std::unordered_map AnalyticsManager::get_popular_queries() { std::unique_lock lk(mutex); return popular_queries; diff --git a/src/core_api.cpp b/src/core_api.cpp index 8c640b82..d75214b5 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -2804,10 +2804,69 @@ bool put_conversation_model(const std::shared_ptr& req, const std::sha res->set_200(model.dump()); return true; } -bool get_click_events(const std::shared_ptr& req, const std::shared_ptr& res) { - auto click_events = AnalyticsManager::get_instance().get_click_events(); - res->set_200(click_events.dump()); +bool get_click_events(const std::shared_ptr& req, const std::shared_ptr& res) { + auto analytics_store = AnalyticsManager::get_instance().get_analytics_store(); + if(analytics_store) { + export_state_t *export_state = nullptr; + auto click_event_prefix = std::string(AnalyticsManager::CLICK_EVENT) + "_"; + if(req->data == nullptr) { + export_state = new export_state_t(); + req->data = export_state; + + export_state->iter_upper_bound_key = std::string(AnalyticsManager::CLICK_EVENT) + "`"; + export_state->iter_upper_bound = new rocksdb::Slice(export_state->iter_upper_bound_key); + export_state->it = analytics_store->scan(click_event_prefix, export_state->iter_upper_bound); + } else { + export_state = dynamic_cast(req->data); + } + + if (export_state->it != nullptr) { + rocksdb::Iterator *it = export_state->it; + size_t batch_counter = 0; + std::string().swap(res->body); + + if(!it->Valid()) { + LOG(ERROR) << "No click events found in db."; + req->last_chunk_aggregate = true; + res->final = true; + res->set_404(); + stream_response(req, res); + return false; + } + + while (it->Valid() && it->key().ToString().compare(0, click_event_prefix.size(), click_event_prefix) == 0) { + res->body += it->value().ToString(); + it->Next(); + + // append a new line character if there is going to be one more record to send + if (it->Valid() && + it->key().ToString().compare(0, click_event_prefix.size(), click_event_prefix) == 0) { + res->body += "\n"; + req->last_chunk_aggregate = false; + res->final = false; + } else { + req->last_chunk_aggregate = true; + res->final = true; + } + + batch_counter++; + if (batch_counter == export_state->export_batch_size) { + break; + } + } + } else { + req->last_chunk_aggregate = true; + res->final = true; + } + + res->content_type_header = "text/plain; charset=utf-8"; + res->status_code = 200; + + stream_response(req, res); + } else { + LOG(ERROR) << "Analytics store not initialized."; + } return true; } diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index f4993434..a52db200 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -82,7 +82,7 @@ void master_server_routes() { server->post("/analytics/events", post_create_event); //collection based query click events - server->get("/analytics/click_events", get_click_events); + server->get("/analytics/click_events", get_click_events, false, true); server->post("/analytics/click_events", post_create_event); server->post("/analytics/click_events/replicate", post_replicate_events); server->get("/analytics/query_hits_counts", get_query_hits_counts);