make get_click_events as export

This commit is contained in:
krunal 2023-12-18 16:36:12 +05:30
parent 35ac3b2980
commit e37b9bf775
4 changed files with 69 additions and 4 deletions

View File

@ -200,6 +200,8 @@ public:
void dispose();
Store* get_analytics_store();
void persist_query_events(ReplicationState *raft_server, uint64_t prev_persistence_s);
std::unordered_map<std::string, QueryAnalytics*> get_popular_queries();

View File

@ -598,6 +598,10 @@ void AnalyticsManager::init(Store* store, Store* analytics_store) {
this->analytics_store = analytics_store;
}
Store* AnalyticsManager::get_analytics_store() {
return this->analytics_store;
}
std::unordered_map<std::string, QueryAnalytics*> AnalyticsManager::get_popular_queries() {
std::unique_lock lk(mutex);
return popular_queries;

View File

@ -2804,10 +2804,69 @@ bool put_conversation_model(const std::shared_ptr<http_req>& req, const std::sha
res->set_200(model.dump());
return true;
}
bool get_click_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto click_events = AnalyticsManager::get_instance().get_click_events();
res->set_200(click_events.dump());
bool get_click_events(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto analytics_store = AnalyticsManager::get_instance().get_analytics_store();
if(analytics_store) {
export_state_t *export_state = nullptr;
auto click_event_prefix = std::string(AnalyticsManager::CLICK_EVENT) + "_";
if(req->data == nullptr) {
export_state = new export_state_t();
req->data = export_state;
export_state->iter_upper_bound_key = std::string(AnalyticsManager::CLICK_EVENT) + "`";
export_state->iter_upper_bound = new rocksdb::Slice(export_state->iter_upper_bound_key);
export_state->it = analytics_store->scan(click_event_prefix, export_state->iter_upper_bound);
} else {
export_state = dynamic_cast<export_state_t*>(req->data);
}
if (export_state->it != nullptr) {
rocksdb::Iterator *it = export_state->it;
size_t batch_counter = 0;
std::string().swap(res->body);
if(!it->Valid()) {
LOG(ERROR) << "No click events found in db.";
req->last_chunk_aggregate = true;
res->final = true;
res->set_404();
stream_response(req, res);
return false;
}
while (it->Valid() && it->key().ToString().compare(0, click_event_prefix.size(), click_event_prefix) == 0) {
res->body += it->value().ToString();
it->Next();
// append a new line character if there is going to be one more record to send
if (it->Valid() &&
it->key().ToString().compare(0, click_event_prefix.size(), click_event_prefix) == 0) {
res->body += "\n";
req->last_chunk_aggregate = false;
res->final = false;
} else {
req->last_chunk_aggregate = true;
res->final = true;
}
batch_counter++;
if (batch_counter == export_state->export_batch_size) {
break;
}
}
} else {
req->last_chunk_aggregate = true;
res->final = true;
}
res->content_type_header = "text/plain; charset=utf-8";
res->status_code = 200;
stream_response(req, res);
} else {
LOG(ERROR) << "Analytics store not initialized.";
}
return true;
}

View File

@ -82,7 +82,7 @@ void master_server_routes() {
server->post("/analytics/events", post_create_event);
//collection based query click events
server->get("/analytics/click_events", get_click_events);
server->get("/analytics/click_events", get_click_events, false, true);
server->post("/analytics/click_events", post_create_event);
server->post("/analytics/click_events/replicate", post_replicate_events);
server->get("/analytics/query_hits_counts", get_query_hits_counts);