Bulk import API.

This commit is contained in:
Kishore Nallan 2019-05-10 21:35:49 +05:30
parent e6cdcb6a1c
commit ade8e1946f
5 changed files with 73 additions and 5 deletions

View File

@ -115,6 +115,8 @@ public:
Option<nlohmann::json> add(const std::string & json_str);
Option<nlohmann::json> add_many(const std::string & json_str);
Option<nlohmann::json> search(std::string query, const std::vector<std::string> search_fields,
const std::string & simple_filter_query, const std::vector<std::string> & facet_fields,
const std::vector<sort_by> & sort_fields, const int num_typos,

View File

@ -18,10 +18,12 @@ void get_search(http_req & req, http_res & res);
void get_collection_summary(http_req & req, http_res & res);
void get_collection_export(http_req & req, http_res & res);
void get_export_documents(http_req & req, http_res & res);
void post_add_document(http_req & req, http_res & res);
void post_import_documents(http_req & req, http_res & res);
void get_fetch_document(http_req & req, http_res & res);
void del_remove_document(http_req & req, http_res & res);

View File

@ -131,6 +131,49 @@ Option<nlohmann::json> Collection::add(const std::string & json_str) {
return Option<nlohmann::json>(document);
}
Option<nlohmann::json> Collection::add_many(const std::string & json_lines_str) {
std::vector<std::string> json_lines;
StringUtils::split(json_lines_str, json_lines, "\n");
if(json_lines.size() == 0) {
return Option<nlohmann::json>(400, "The request body was empty. So, no records were imported.");
}
std::vector<Option<bool>> errors;
size_t record_num = 1;
size_t record_imported = 0;
for(const std::string & json_line: json_lines) {
Option<nlohmann::json> op = add(json_line);
if(!op.ok()) {
std::string err_msg = std::string("Error importing record in line number ") +
std::to_string(record_num) + ": " + op.error() + " On record number: ";
Option<bool> err = Option<bool>(op.code(), err_msg);
errors.push_back(err);
} else {
record_imported++;
}
record_num++;
}
nlohmann::json resp;
resp["ok"] = (errors.size() == 0);
resp["num_imported"] = record_imported;
if(errors.size() != 0) {
resp["errors"] = nlohmann::json::array();
for(const Option<bool> & err: errors) {
nlohmann::json err_obj;
err_obj["message"] = err.error();
resp["errors"].push_back(err_obj);
}
}
return Option<nlohmann::json>(resp);
}
Option<uint32_t> Collection::index_in_memory(const nlohmann::json &document, uint32_t seq_id) {
Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
search_schema, facet_schema);

View File

@ -362,7 +362,7 @@ void collection_export_handler(http_req* req, http_res* res, void* data) {
}
}
void get_collection_export(http_req & req, http_res & res) {
void get_export_documents(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
@ -399,6 +399,24 @@ void post_add_document(http_req & req, http_res & res) {
}
}
void post_import_documents(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
return res.send_404();
}
Option<nlohmann::json> result = collection->add_many(req.body);
if(!result.ok()) {
res.send(result.code(), result.error());
return ;
}
res.send_200(result.get().dump());
}
void get_fetch_document(http_req & req, http_res & res) {
std::string doc_id = req.params["id"];
@ -484,4 +502,4 @@ void on_send_response(void *data) {
request_response* req_res = static_cast<request_response*>(data);
req_res->response->server->send_response(req_res->req, req_res->response);
delete req_res;
}
}

View File

@ -12,7 +12,10 @@ void master_server_routes() {
// document management - `/documents/:id` end-points must be placed last in the list
server->post("/collections/:collection/documents", post_add_document);
server->get("/collections/:collection/documents/search", get_search);
server->get("/collections/:collection/documents/export", get_collection_export, true);
server->post("/collections/:collection/documents/import", post_import_documents);
server->get("/collections/:collection/documents/export", get_export_documents, true);
server->get("/collections/:collection/documents/:id", get_fetch_document);
server->del("/collections/:collection/documents/:id", del_remove_document);
@ -31,7 +34,7 @@ void replica_server_routes() {
// document management - `/documents/:id` end-points must be placed last in the list
server->get("/collections/:collection/documents/search", get_search);
server->get("/collections/:collection/documents/export", get_collection_export, true);
server->get("/collections/:collection/documents/export", get_export_documents, true);
server->get("/collections/:collection/documents/:id", get_fetch_document);
// meta