diff --git a/include/core_api_utils.h b/include/core_api_utils.h index eca4d6af..4e057035 100644 --- a/include/core_api_utils.h +++ b/include/core_api_utils.h @@ -24,6 +24,7 @@ struct export_state_t: public req_state_t { std::vector offsets; std::set include_fields; std::set exclude_fields; + size_t export_batch_size = 100; std::string* res_body; bool filtered_export = false; diff --git a/src/core_api.cpp b/src/core_api.cpp index 6d815582..acb351ce 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -588,6 +588,7 @@ bool get_export_documents(const std::shared_ptr& req, const std::share const char* FILTER_BY = "filter_by"; const char* INCLUDE_FIELDS = "include_fields"; const char* EXCLUDE_FIELDS = "exclude_fields"; + const char* BATCH_SIZE = "batch_size"; export_state_t* export_state = nullptr; @@ -617,6 +618,10 @@ bool get_export_documents(const std::shared_ptr& req, const std::share export_state->exclude_fields = std::set(exclude_fields_vec.begin(), exclude_fields_vec.end()); } + if(req->params.count(BATCH_SIZE) != 0 && StringUtils::is_uint32_t(req->params[BATCH_SIZE])) { + export_state->export_batch_size = std::stoul(req->params[BATCH_SIZE]); + } + if(simple_filter_query.empty()) { export_state->iter_upper_bound_key = collection->get_seq_id_collection_prefix() + "`"; // cannot inline this export_state->iter_upper_bound = new rocksdb::Slice(export_state->iter_upper_bound_key); @@ -644,10 +649,12 @@ bool get_export_documents(const std::shared_ptr& req, const std::share if(export_state->it != nullptr) { rocksdb::Iterator* it = export_state->it; + size_t batch_counter = 0; + res->body.clear(); - if(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) { + while(it->Valid() && it->key().ToString().compare(0, seq_id_prefix.size(), seq_id_prefix) == 0) { if(export_state->include_fields.empty() && export_state->exclude_fields.empty()) { - res->body = it->value().ToString(); + res->body += it->value().ToString(); } else { nlohmann::json doc = nlohmann::json::parse(it->value().ToString()); nlohmann::json filtered_doc; @@ -663,7 +670,7 @@ bool get_export_documents(const std::shared_ptr& req, const std::share } } - res->body = filtered_doc.dump(); + res->body += filtered_doc.dump(); } it->Next(); @@ -677,10 +684,15 @@ bool get_export_documents(const std::shared_ptr& req, const std::share req->last_chunk_aggregate = true; res->final = true; } + + batch_counter++; + if(batch_counter == export_state->export_batch_size) { + break; + } } } else { bool done; - stateful_export_docs(export_state, 100, done); + stateful_export_docs(export_state, export_state->export_batch_size, done); if(!done) { req->last_chunk_aggregate = false;