diff --git a/include/collection.h b/include/collection.h
index ce27fe7e..fe4a72e8 100644
--- a/include/collection.h
+++ b/include/collection.h
@@ -472,6 +472,8 @@ public:
 
     Option<bool> get_document_from_store(const std::string & seq_id_key, nlohmann::json & document) const;
 
+    Option<bool> get_document_from_store(const uint32_t& seq_id, nlohmann::json & document) const;
+
     Option<uint32_t> index_in_memory(nlohmann::json & document, uint32_t seq_id,
                                      const index_operation_t op, const DIRTY_VALUES& dirty_values);
 
diff --git a/include/core_api_utils.h b/include/core_api_utils.h
index b61142bc..f4db135f 100644
--- a/include/core_api_utils.h
+++ b/include/core_api_utils.h
@@ -17,4 +17,26 @@ struct deletion_state_t {
     }
 };
 
-Option<bool> stateful_remove_docs(deletion_state_t* deletion_state, size_t batch_size, bool& done);
\ No newline at end of file
+struct export_state_t {
+    Collection* collection;
+    std::vector<std::pair<size_t, uint32_t*>> index_ids;
+    std::vector<size_t> offsets;
+    std::set<std::string> include_fields;
+    std::set<std::string> exclude_fields;
+    std::string* res_body;
+
+    bool filtered_export = false;
+
+    rocksdb::Iterator* it = nullptr;
+
+    ~export_state_t() {
+        for(auto& kv: index_ids) {
+            delete [] kv.second;
+        }
+
+        delete it;
+    }
+};
+
+Option<bool> stateful_remove_docs(deletion_state_t* deletion_state, size_t batch_size, bool& done);
+Option<bool> stateful_export_docs(export_state_t* export_state, size_t batch_size, bool& done);
\ No newline at end of file
diff --git a/src/collection.cpp b/src/collection.cpp
index d122dd6c..79f5901f 100644
--- a/src/collection.cpp
+++ b/src/collection.cpp
@@ -1977,6 +1977,23 @@ std::string Collection::get_default_sorting_field() {
     return default_sorting_field;
 }
 
+Option<bool> Collection::get_document_from_store(const uint32_t& seq_id, nlohmann::json& document) const {
+    std::string json_doc_str;
+    StoreStatus json_doc_status = store->get(get_seq_id_key(seq_id), json_doc_str);
+
+    if(json_doc_status != StoreStatus::FOUND) {
+        return Option<bool>(500, "Could not locate the JSON document for sequence ID: " + std::to_string(seq_id));
+    }
+
+    try {
+        document = nlohmann::json::parse(json_doc_str);
+    } catch(...) {
+        return Option<bool>(500, "Error while parsing stored document with sequence ID: " + std::to_string(seq_id));
+    }
+
+    return Option<bool>(true);
+}
+
 Option<bool> Collection::get_document_from_store(const std::string &seq_id_key, nlohmann::json & document) const {
     std::string json_doc_str;
     StoreStatus json_doc_status = store->get(seq_id_key, json_doc_str);
diff --git a/src/core_api.cpp b/src/core_api.cpp
index 40968838..c135ff35 100644
--- a/src/core_api.cpp
+++ b/src/core_api.cpp
@@ -391,30 +391,111 @@ bool get_export_documents(const std::shared_ptr<http_req>& req, const std::share
         return false;
     }
 
+    const char* FILTER_BY = "filter_by";
+    const char* INCLUDE_FIELDS = "include_fields";
+    const char* EXCLUDE_FIELDS = "exclude_fields";
+
+    export_state_t* export_state = nullptr;
+
     const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
-    rocksdb::Iterator* it = nullptr;
 
     if(req->data == nullptr) {
-        it = collectionManager.get_store()->get_iterator();
-        it->Seek(seq_id_prefix);
-        req->data = it;
+        export_state = new export_state_t();
+
+        std::string simple_filter_query;
+
+        if(req->params.count(FILTER_BY) != 0) {
+            simple_filter_query = req->params[FILTER_BY];
+        }
+
+        if(req->params.count(INCLUDE_FIELDS) != 0) {
+            std::vector<std::string> include_fields_vec;
+            StringUtils::split(req->params[INCLUDE_FIELDS], include_fields_vec, ",");
+            export_state->include_fields = std::set<std::string>(include_fields_vec.begin(), include_fields_vec.end());
+        }
+
+        if(req->params.count(EXCLUDE_FIELDS) != 0) {
+            std::vector<std::string> exclude_fields_vec;
+            StringUtils::split(req->params[EXCLUDE_FIELDS], exclude_fields_vec, ",");
+            export_state->exclude_fields = std::set<std::string>(exclude_fields_vec.begin(), exclude_fields_vec.end());
+        }
+
+        if(simple_filter_query.empty()) {
+            export_state->it = collectionManager.get_store()->get_iterator();
+            export_state->it->Seek(seq_id_prefix);
+        } else {
+            auto filter_ids_op = collection->get_filter_ids(simple_filter_query, export_state->index_ids);
+
+            if(!filter_ids_op.ok()) {
+                res->set(filter_ids_op.code(), filter_ids_op.error());
+                req->last_chunk_aggregate = true;
+                res->final = true;
+                stream_response(req, res);
+                delete export_state;
+                return false;
+            }
+
+            for(size_t i=0; i<export_state->index_ids.size(); i++) {
+                export_state->offsets.push_back(0);
+            }
+            export_state->res_body = &res->body;
+            export_state->collection = collection.get();
+        }
     } else {
-        it = static_cast<rocksdb::Iterator*>(req->data);
+        export_state = static_cast<export_state_t*>(req->data);
    }
 
-    if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
-        res->body = it->value().ToString();
-        it->Next();
+    req->data = export_state;
+
+    if(export_state->it != nullptr) {
+        rocksdb::Iterator* it = export_state->it;
 
-        // append a new line character if there is going to be one more record to send
         if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
-            res->body += "\n";
+            if(export_state->include_fields.empty() && export_state->exclude_fields.empty()) {
+                res->body = it->value().ToString();
+            } else {
+                nlohmann::json doc = nlohmann::json::parse(it->value().ToString());
+                nlohmann::json filtered_doc;
+                for(const auto& kv: doc.items()) {
+                    bool must_include = export_state->include_fields.empty() ||
+                                        (export_state->include_fields.count(kv.key()) != 0);
+
+                    bool must_exclude = !export_state->exclude_fields.empty() &&
+                                        (export_state->exclude_fields.count(kv.key()) != 0);
+
+                    if(must_include && !must_exclude) {
+                        filtered_doc[kv.key()] = kv.value();
+                    }
+                }
+
+                res->body = filtered_doc.dump();
+            }
+
+            it->Next();
+
+            // append a new line character if there is going to be one more record to send
+            if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) {
+                res->body += "\n";
+                req->last_chunk_aggregate = false;
+                res->final = false;
+            } else {
+                req->last_chunk_aggregate = true;
+                res->final = true;
+                delete export_state;
+                req->data = nullptr;
+            }
+        }
+    } else {
+        bool done;
+        stateful_export_docs(export_state, 100, done);
+
+        if(!done) {
             req->last_chunk_aggregate = false;
             res->final = false;
         } else {
             req->last_chunk_aggregate = true;
             res->final = true;
-            delete it;
+            delete export_state;
             req->data = nullptr;
         }
     }
@@ -757,6 +838,7 @@ bool del_remove_documents(const std::shared_ptr<http_req>& req, const std::share
         req->last_chunk_aggregate = true;
         res->final = true;
         stream_response(req, res);
+        delete deletion_state;
         return false;
     }
 
diff --git a/src/core_api_utils.cpp b/src/core_api_utils.cpp
index b7bdcc2e..24255f20 100644
--- a/src/core_api_utils.cpp
+++ b/src/core_api_utils.cpp
@@ -42,4 +42,63 @@ Option<bool> stateful_remove_docs(deletion_state_t* deletion_state, size_t batch
     }
 
     return Option<bool>(removed);
-}
\ No newline at end of file
+}
+
+Option<bool> stateful_export_docs(export_state_t* export_state, size_t batch_size, bool& done) {
+    size_t batch_count = 0;
+
+    for(size_t i = 0; i < export_state->index_ids.size(); i++) {
+        std::pair<size_t, uint32_t*>& size_ids = export_state->index_ids[i];
+        size_t ids_len = size_ids.first;
+        uint32_t* ids = size_ids.second;
+
+        size_t start_index = export_state->offsets[i];
+        size_t batched_len = std::min(ids_len, (start_index+batch_size));
+
+        for(size_t j = start_index; j < batched_len; j++) {
+            auto seq_id = ids[j];
+            nlohmann::json doc;
+            Option<bool> get_op = export_state->collection->get_document_from_store(seq_id, doc);
+
+            if(get_op.ok()) {
+                if(export_state->include_fields.empty() && export_state->exclude_fields.empty()) {
+                    export_state->res_body->append(doc.dump());
+                } else {
+                    nlohmann::json filtered_doc;
+                    for(const auto& kv: doc.items()) {
+                        bool must_include = export_state->include_fields.empty() ||
+                                            (export_state->include_fields.count(kv.key()) != 0);
+
+                        bool must_exclude = !export_state->exclude_fields.empty() &&
+                                            (export_state->exclude_fields.count(kv.key()) != 0);
+
+                        if(must_include && !must_exclude) {
+                            filtered_doc[kv.key()] = kv.value();
+                        }
+                    }
+
+                    export_state->res_body->append(filtered_doc.dump());
+                }
+
+                export_state->res_body->append("\n");
+            }
+
+            export_state->offsets[i]++;
+            batch_count++;
+
+            if(batch_count == batch_size) {
+                goto END;
+            }
+        }
+    }
+
+    END:
+
+    done = true;
+    for(size_t i=0; i<export_state->index_ids.size(); i++) {
+        size_t current_offset = export_state->offsets[i];
+        done = done && (current_offset == export_state->index_ids[i].first);
+    }
+
+    return Option<bool>(true);
+}
\ No newline at end of file
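
Note on the field selection semantics: both copies of the filtering loop, in get_export_documents and in stateful_export_docs, reduce to the same predicate. A key is kept when it passes the include list (an empty include_fields means "keep everything") and is not named in exclude_fields; exclusion always wins. A standalone sketch of that predicate, with plain STL containers standing in for the parsed nlohmann::json document (keep_field and the document stub are illustrative, not part of this patch):

    #include <iostream>
    #include <map>
    #include <set>
    #include <string>

    // Mirrors the must_include / must_exclude checks from the patch:
    // an empty include set admits every key, and the exclude set
    // takes precedence over the include set.
    bool keep_field(const std::string& key,
                    const std::set<std::string>& include_fields,
                    const std::set<std::string>& exclude_fields) {
        bool must_include = include_fields.empty() || include_fields.count(key) != 0;
        bool must_exclude = !exclude_fields.empty() && exclude_fields.count(key) != 0;
        return must_include && !must_exclude;
    }

    int main() {
        std::map<std::string, std::string> doc = {
            {"id", "42"}, {"title", "Dune"}, {"embedding", "..."}
        };

        std::set<std::string> include;                  // empty => keep everything
        std::set<std::string> exclude = {"embedding"};

        for(const auto& [key, value]: doc) {
            if(keep_field(key, include, exclude)) {
                std::cout << key << ": " << value << "\n";  // prints id and title
            }
        }
    }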
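
Note on the batching scheme: stateful_export_docs is resumable because export_state_t persists one offset per filtered id list across HTTP chunks; each call appends at most batch_size documents (get_export_documents passes 100) and reports done only once every offset has reached the end of its list. A self-contained sketch of the same resumption logic, with the goto replaced by bounded loops (batch_state and export_batch are illustrative names, not the patch's types):

    #include <cstdio>
    #include <vector>

    // Stand-in for export_state_t: several id lists plus a saved
    // offset into each, so a later call resumes where the last stopped.
    struct batch_state {
        std::vector<std::vector<unsigned>> id_lists;
        std::vector<size_t> offsets;   // one offset per id list
    };

    // Emits at most batch_size ids per call; sets done when every
    // offset has reached the end of its list.
    void export_batch(batch_state& state, size_t batch_size, bool& done) {
        size_t batch_count = 0;

        for(size_t i = 0; i < state.id_lists.size() && batch_count < batch_size; i++) {
            const auto& ids = state.id_lists[i];
            size_t& offset = state.offsets[i];

            while(offset < ids.size() && batch_count < batch_size) {
                std::printf("exporting doc %u\n", ids[offset]);  // emit one record
                offset++;
                batch_count++;
            }
        }

        done = true;
        for(size_t i = 0; i < state.id_lists.size(); i++) {
            done = done && (state.offsets[i] == state.id_lists[i].size());
        }
    }

    int main() {
        batch_state state{{{1, 2, 3}, {10, 11}}, {0, 0}};

        // The handler calls this once per response chunk until done.
        bool done = false;
        while(!done) {
            export_batch(state, /*batch_size=*/2, done);
        }
    }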