Allow filtering in export endpoint.

Kishore Nallan 2021-06-27 21:16:10 +05:30
parent e4a33b8586
commit d406bb4715
5 changed files with 194 additions and 12 deletions

include/collection.h

@@ -472,6 +472,8 @@ public:
    Option<bool> get_document_from_store(const std::string & seq_id_key, nlohmann::json & document) const;
    Option<bool> get_document_from_store(const uint32_t& seq_id, nlohmann::json & document) const;

    Option<uint32_t> index_in_memory(nlohmann::json & document, uint32_t seq_id,
                                     const index_operation_t op, const DIRTY_VALUES& dirty_values);
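
Both get_document_from_store overloads report failure through Typesense's Option<T> wrapper instead of throwing. A minimal calling sketch, assuming the ok()/code()/error() accessors used elsewhere in this diff; `collection` and `seq_id` here stand in for a Collection pointer and a sequence id from the caller's scope:

    nlohmann::json document;
    Option<bool> get_op = collection->get_document_from_store(seq_id, document);

    if(!get_op.ok()) {
        // propagate the HTTP-style status code and message upward instead of throwing
        return Option<bool>(get_op.code(), get_op.error());
    }

    // `document` now holds the parsed JSON for this sequence ID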

include/core_api_utils.h

@@ -17,4 +17,26 @@ struct deletion_state_t {
    }
};

Option<bool> stateful_remove_docs(deletion_state_t* deletion_state, size_t batch_size, bool& done);

struct export_state_t {
    Collection* collection;

    std::vector<std::pair<size_t, uint32_t*>> index_ids;
    std::vector<size_t> offsets;

    std::set<std::string> include_fields;
    std::set<std::string> exclude_fields;

    std::string* res_body;
    bool filtered_export = false;
    rocksdb::Iterator* it = nullptr;

    ~export_state_t() {
        for(auto& kv: index_ids) {
            delete [] kv.second;
        }

        delete it;
    }
};

Option<bool> stateful_remove_docs(deletion_state_t* deletion_state, size_t batch_size, bool& done);
Option<bool> stateful_export_docs(export_state_t* export_state, size_t batch_size, bool& done);
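
export_state_t carries everything needed to resume an export across handler invocations: either a live RocksDB iterator for the unfiltered path, or the filtered id lists (index_ids) plus per-list offsets for the filtered one. A rough sketch of driving the batched API to completion; populating the state is elided here, since get_export_documents below does the real wiring:

    export_state_t* export_state = new export_state_t();
    // ... fill collection, index_ids, offsets and res_body as get_export_documents does ...

    bool done = false;
    while(!done) {
        // each call appends up to 100 documents to *export_state->res_body
        // and advances export_state->offsets, so the loop always makes progress
        stateful_export_docs(export_state, 100, done);
    }

    delete export_state;  // destructor frees the id arrays and the iterator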

src/collection.cpp

@@ -1977,6 +1977,23 @@ std::string Collection::get_default_sorting_field() {
    return default_sorting_field;
}

Option<bool> Collection::get_document_from_store(const uint32_t& seq_id, nlohmann::json& document) const {
    std::string json_doc_str;
    StoreStatus json_doc_status = store->get(get_seq_id_key(seq_id), json_doc_str);

    if(json_doc_status != StoreStatus::FOUND) {
        return Option<bool>(500, "Could not locate the JSON document for sequence ID: " + std::to_string(seq_id));
    }

    try {
        document = nlohmann::json::parse(json_doc_str);
    } catch(...) {
        return Option<bool>(500, "Error while parsing stored document with sequence ID: " + std::to_string(seq_id));
    }

    return Option<bool>(true);
}

Option<bool> Collection::get_document_from_store(const std::string &seq_id_key, nlohmann::json & document) const {
    std::string json_doc_str;
    StoreStatus json_doc_status = store->get(seq_id_key, json_doc_str);
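
nlohmann::json::parse throws on malformed input, which is why the new overload wraps it in try/catch and converts failures into an Option error. For reference, the library also offers a non-throwing mode; a sketch of that alternative (not what this commit uses):

    #include <nlohmann/json.hpp>
    #include <string>

    // with allow_exceptions = false, parse() returns a "discarded" value on failure
    bool parse_stored_doc(const std::string& json_doc_str, nlohmann::json& document) {
        document = nlohmann::json::parse(json_doc_str, nullptr, false);
        return !document.is_discarded();
    }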

src/core_api.cpp

@@ -391,30 +391,111 @@ bool get_export_documents(const std::shared_ptr<http_req>& req, const std::share
        return false;
    }

    const char* FILTER_BY = "filter_by";
    const char* INCLUDE_FIELDS = "include_fields";
    const char* EXCLUDE_FIELDS = "exclude_fields";

    export_state_t* export_state = nullptr;
    const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
    rocksdb::Iterator* it = nullptr;

    if(req->data == nullptr) {
        it = collectionManager.get_store()->get_iterator();
        it->Seek(seq_id_prefix);
        req->data = it;
        export_state = new export_state_t();

        std::string simple_filter_query;

        if(req->params.count(FILTER_BY) != 0) {
            simple_filter_query = req->params[FILTER_BY];
        }

        if(req->params.count(INCLUDE_FIELDS) != 0) {
            std::vector<std::string> include_fields_vec;
            StringUtils::split(req->params[INCLUDE_FIELDS], include_fields_vec, ",");
            export_state->include_fields = std::set<std::string>(include_fields_vec.begin(), include_fields_vec.end());
        }

        if(req->params.count(EXCLUDE_FIELDS) != 0) {
            std::vector<std::string> exclude_fields_vec;
            StringUtils::split(req->params[EXCLUDE_FIELDS], exclude_fields_vec, ",");
            export_state->exclude_fields = std::set<std::string>(exclude_fields_vec.begin(), exclude_fields_vec.end());
        }

        if(simple_filter_query.empty()) {
            export_state->it = collectionManager.get_store()->get_iterator();
            export_state->it->Seek(seq_id_prefix);
        } else {
            auto filter_ids_op = collection->get_filter_ids(simple_filter_query, export_state->index_ids);
            if(!filter_ids_op.ok()) {
                res->set(filter_ids_op.code(), filter_ids_op.error());
                req->last_chunk_aggregate = true;
                res->final = true;
                stream_response(req, res);
                delete export_state;
                return false;
            }

            for(size_t i=0; i<export_state->index_ids.size(); i++) {
                export_state->offsets.push_back(0);
            }

            export_state->res_body = &res->body;
            export_state->collection = collection.get();
        }
    } else {
        it = static_cast<rocksdb::Iterator*>(req->data);
        export_state = static_cast<export_state_t*>(req->data);
    }

    if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
        res->body = it->value().ToString();
        it->Next();

    req->data = export_state;

    if(export_state->it != nullptr) {
        rocksdb::Iterator* it = export_state->it;

        // append a new line character if there is going to be one more record to send
        if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
            res->body += "\n";
            if(export_state->include_fields.empty() && export_state->exclude_fields.empty()) {
                res->body = it->value().ToString();
            } else {
                nlohmann::json doc = nlohmann::json::parse(it->value().ToString());
                nlohmann::json filtered_doc;
                for(const auto& kv: doc.items()) {
                    bool must_include = export_state->include_fields.empty() ||
                                        (export_state->include_fields.count(kv.key()) != 0);
                    bool must_exclude = !export_state->exclude_fields.empty() &&
                                        (export_state->exclude_fields.count(kv.key()) != 0);
                    if(must_include && !must_exclude) {
                        filtered_doc[kv.key()] = kv.value();
                    }
                }
                res->body = filtered_doc.dump();
            }

            it->Next();

            // append a new line character if there is going to be one more record to send
            if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
                res->body += "\n";
                req->last_chunk_aggregate = false;
                res->final = false;
            } else {
                req->last_chunk_aggregate = true;
                res->final = true;
                delete export_state;
                req->data = nullptr;
            }
        }
    } else {
        bool done;
        stateful_export_docs(export_state, 100, done);

        if(!done) {
            req->last_chunk_aggregate = false;
            res->final = false;
        } else {
            req->last_chunk_aggregate = true;
            res->final = true;
            delete it;
            delete export_state;
            req->data = nullptr;
        }
    }
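
The unfiltered branch above is a plain RocksDB prefix scan: seek to the collection's sequence-id prefix and emit values until a key no longer starts with that prefix. The same pattern against the raw RocksDB API, as a standalone sketch (the handler goes through Typesense's Store wrapper and yields between chunks instead of looping to the end):

    #include <rocksdb/db.h>
    #include <iostream>
    #include <string>

    // print every value whose key begins with `prefix`, one record per line
    void scan_prefix(rocksdb::DB* db, const std::string& prefix) {
        rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
        for(it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) {
            std::cout << it->value().ToString() << "\n";
        }
        delete it;
    }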
@@ -757,6 +838,7 @@ bool del_remove_documents(const std::shared_ptr<http_req>& req, const std::share
        req->last_chunk_aggregate = true;
        res->final = true;
        stream_response(req, res);
        delete deletion_state;
        return false;
    }
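
This second hunk plugs a leak on the error path: del_remove_documents returned early without freeing deletion_state, the same hazard the new export code guards against with export_state. A sketch of how std::unique_ptr would rule out such leaks by construction; hypothetical shape only, since the real handlers park raw pointers in req->data:

    #include <memory>

    bool remove_docs_sketch() {
        auto deletion_state = std::make_unique<deletion_state_t>();

        bool error_occurred = true;  // stand-in for the real validation
        if(error_occurred) {
            return false;  // deletion_state is freed automatically here
        }

        // on the happy path, ownership would move into req->data:
        // req->data = deletion_state.release();
        return true;
    }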

src/core_api_utils.cpp

@@ -42,4 +42,63 @@ Option<bool> stateful_remove_docs(deletion_state_t* deletion_state, size_t batch
    }

    return Option<bool>(removed);
}

Option<bool> stateful_export_docs(export_state_t* export_state, size_t batch_size, bool& done) {
    size_t batch_count = 0;

    for(size_t i = 0; i < export_state->index_ids.size(); i++) {
        std::pair<size_t, uint32_t*>& size_ids = export_state->index_ids[i];
        size_t ids_len = size_ids.first;
        uint32_t* ids = size_ids.second;

        size_t start_index = export_state->offsets[i];
        size_t batched_len = std::min(ids_len, (start_index+batch_size));

        for(size_t j = start_index; j < batched_len; j++) {
            auto seq_id = ids[j];
            nlohmann::json doc;
            Option<bool> get_op = export_state->collection->get_document_from_store(seq_id, doc);

            if(get_op.ok()) {
                if(export_state->include_fields.empty() && export_state->exclude_fields.empty()) {
                    export_state->res_body->append(doc.dump());
                } else {
                    nlohmann::json filtered_doc;
                    for(const auto& kv: doc.items()) {
                        bool must_include = export_state->include_fields.empty() ||
                                            (export_state->include_fields.count(kv.key()) != 0);
                        bool must_exclude = !export_state->exclude_fields.empty() &&
                                            (export_state->exclude_fields.count(kv.key()) != 0);

                        if(must_include && !must_exclude) {
                            filtered_doc[kv.key()] = kv.value();
                        }
                    }

                    export_state->res_body->append(filtered_doc.dump());
                }

                export_state->res_body->append("\n");
            }

            export_state->offsets[i]++;
            batch_count++;

            if(batch_count == batch_size) {
                goto END;
            }
        }
    }

    END:

    done = true;

    for(size_t i=0; i<export_state->index_ids.size(); i++) {
        size_t current_offset = export_state->offsets[i];
        done = done && (current_offset == export_state->index_ids[i].first);
    }

    return Option<bool>(true);
}
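
The include/exclude rule, implemented identically in the handler and in stateful_export_docs, is: keep a key when the include set is empty or names it, unless the exclude set names it; exclusion always wins. A self-contained illustration of the same predicate with made-up data:

    #include <nlohmann/json.hpp>
    #include <iostream>
    #include <set>
    #include <string>

    int main() {
        nlohmann::json doc = {{"id", "1"}, {"title", "Example"}, {"secret", "x"}};
        std::set<std::string> include_fields;                  // empty => include everything
        std::set<std::string> exclude_fields = {"secret"};     // exclusion always wins

        nlohmann::json filtered_doc;
        for(const auto& kv: doc.items()) {
            bool must_include = include_fields.empty() || include_fields.count(kv.key()) != 0;
            bool must_exclude = !exclude_fields.empty() && exclude_fields.count(kv.key()) != 0;
            if(must_include && !must_exclude) {
                filtered_doc[kv.key()] = kv.value();
            }
        }

        std::cout << filtered_doc.dump() << std::endl;  // {"id":"1","title":"Example"}
    }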