Merge pull request #61 from typesense/bulk-import

Import multiple documents in one go
This commit is contained in:
Kishore Nallan 2019-05-11 19:12:53 +05:30 committed by GitHub
commit 567a1c1275
6 changed files with 155 additions and 5 deletions

View File

@ -115,6 +115,8 @@ public:
Option<nlohmann::json> add(const std::string & json_str);
Option<nlohmann::json> add_many(const std::string & json_str);
Option<nlohmann::json> search(std::string query, const std::vector<std::string> search_fields,
const std::string & simple_filter_query, const std::vector<std::string> & facet_fields,
const std::vector<sort_by> & sort_fields, const int num_typos,

View File

@ -18,10 +18,12 @@ void get_search(http_req & req, http_res & res);
void get_collection_summary(http_req & req, http_res & res);
void get_collection_export(http_req & req, http_res & res);
void get_export_documents(http_req & req, http_res & res);
void post_add_document(http_req & req, http_res & res);
void post_import_documents(http_req & req, http_res & res);
void get_fetch_document(http_req & req, http_res & res);
void del_remove_document(http_req & req, http_res & res);

View File

@ -131,6 +131,49 @@ Option<nlohmann::json> Collection::add(const std::string & json_str) {
return Option<nlohmann::json>(document);
}
Option<nlohmann::json> Collection::add_many(const std::string & json_lines_str) {
std::vector<std::string> json_lines;
StringUtils::split(json_lines_str, json_lines, "\n");
if(json_lines.size() == 0) {
return Option<nlohmann::json>(400, "The request body was empty. So, no records were imported.");
}
std::vector<Option<bool>> errors;
size_t record_num = 1;
size_t record_imported = 0;
for(const std::string & json_line: json_lines) {
Option<nlohmann::json> op = add(json_line);
if(!op.ok()) {
std::string err_msg = std::string("Error importing record in line number ") +
std::to_string(record_num) + ": " + op.error();
Option<bool> err = Option<bool>(op.code(), err_msg);
errors.push_back(err);
} else {
record_imported++;
}
record_num++;
}
nlohmann::json resp;
resp["ok"] = (errors.size() == 0);
resp["num_imported"] = record_imported;
if(errors.size() != 0) {
resp["errors"] = nlohmann::json::array();
for(const Option<bool> & err: errors) {
nlohmann::json err_obj;
err_obj["message"] = err.error();
resp["errors"].push_back(err_obj);
}
}
return Option<nlohmann::json>(resp);
}
Option<uint32_t> Collection::index_in_memory(const nlohmann::json &document, uint32_t seq_id) {
Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
search_schema, facet_schema);

View File

@ -362,7 +362,7 @@ void collection_export_handler(http_req* req, http_res* res, void* data) {
}
}
void get_collection_export(http_req & req, http_res & res) {
void get_export_documents(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
@ -399,6 +399,24 @@ void post_add_document(http_req & req, http_res & res) {
}
}
void post_import_documents(http_req & req, http_res & res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
Collection* collection = collectionManager.get_collection(req.params["collection"]);
if(collection == nullptr) {
return res.send_404();
}
Option<nlohmann::json> result = collection->add_many(req.body);
if(!result.ok()) {
res.send(result.code(), result.error());
return ;
}
res.send_200(result.get().dump());
}
void get_fetch_document(http_req & req, http_res & res) {
std::string doc_id = req.params["id"];
@ -484,4 +502,4 @@ void on_send_response(void *data) {
request_response* req_res = static_cast<request_response*>(data);
req_res->response->server->send_response(req_res->req, req_res->response);
delete req_res;
}
}

View File

@ -12,7 +12,10 @@ void master_server_routes() {
// document management - `/documents/:id` end-points must be placed last in the list
server->post("/collections/:collection/documents", post_add_document);
server->get("/collections/:collection/documents/search", get_search);
server->get("/collections/:collection/documents/export", get_collection_export, true);
server->post("/collections/:collection/documents/import", post_import_documents);
server->get("/collections/:collection/documents/export", get_export_documents, true);
server->get("/collections/:collection/documents/:id", get_fetch_document);
server->del("/collections/:collection/documents/:id", del_remove_document);
@ -31,7 +34,7 @@ void replica_server_routes() {
// document management - `/documents/:id` end-points must be placed last in the list
server->get("/collections/:collection/documents/search", get_search);
server->get("/collections/:collection/documents/export", get_collection_export, true);
server->get("/collections/:collection/documents/export", get_export_documents, true);
server->get("/collections/:collection/documents/:id", get_fetch_document);
// meta

View File

@ -1107,6 +1107,88 @@ TEST_F(CollectionTest, FilterOnFloatFields) {
collectionManager.drop_collection("coll_array_fields");
}
TEST_F(CollectionTest, ImportDocuments) {
Collection *coll_mul_fields;
std::ifstream infile(std::string(ROOT_DIR)+"test/multi_field_documents.jsonl");
std::stringstream strstream;
strstream << infile.rdbuf();
std::string import_records = strstream.str();
infile.close();
std::vector<field> fields = {
field("title", field_types::STRING, false),
field("starring", field_types::STRING, false),
field("cast", field_types::STRING_ARRAY, false),
field("points", field_types::INT32, false)
};
coll_mul_fields = collectionManager.get_collection("coll_mul_fields");
if(coll_mul_fields == nullptr) {
coll_mul_fields = collectionManager.create_collection("coll_mul_fields", fields, "points").get();
}
// try importing records
Option<nlohmann::json> import_res = coll_mul_fields->add_many(import_records);
ASSERT_TRUE(import_res.ok());
nlohmann::json import_response = import_res.get();
ASSERT_TRUE(import_response["ok"].get<bool>());
ASSERT_EQ(18, import_response["num_imported"].get<int>());
ASSERT_EQ(0, import_response.count("errors"));
// now try searching for records
query_fields = {"title", "starring"};
std::vector<std::string> facets;
auto x = coll_mul_fields->search("Will", query_fields, "", facets, sort_fields, 0, 10, 1, FREQUENCY, false);
nlohmann::json results = coll_mul_fields->search("Will", query_fields, "", facets, sort_fields, 0, 10, 1, FREQUENCY, false).get();
ASSERT_EQ(4, results["hits"].size());
std::vector<std::string> ids = {"3", "2", "1", "0"};
for(size_t i = 0; i < results["hits"].size(); i++) {
nlohmann::json result = results["hits"].at(i);
std::string result_id = result["document"]["id"];
std::string id = ids.at(i);
ASSERT_STREQ(id.c_str(), result_id.c_str());
}
// verify that empty import is caught gracefully
import_res = coll_mul_fields->add_many("");
ASSERT_FALSE(import_res.ok());
ASSERT_STREQ("The request body was empty. So, no records were imported.", import_res.error().c_str());
// verify that only bad records are rejected, rest must be imported (records 2 and 4 are bad)
std::string more_records = std::string("{\"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
"\"cast\": [\"Tom Skerritt\"] }\n") +
"{\"title\": 123, \"starring\": \"Jazz Gosh\", \"points\": 23, "
"\"cast\": [\"Tom Skerritt\"] }\n" +
"{\"title\": \"Test3\", \"starring\": \"Brad Fin\", \"points\": 11, "
"\"cast\": [\"Tom Skerritt\"] }\n" +
"{\"title\": \"Test4\", \"points\": 55, "
"\"cast\": [\"Tom Skerritt\"] }\n";
import_res = coll_mul_fields->add_many(more_records);
ASSERT_TRUE(import_res.ok());
import_response = import_res.get();
ASSERT_FALSE(import_response["ok"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());
ASSERT_EQ(1, import_response.count("errors"));
ASSERT_EQ(2, import_response["errors"].size());
ASSERT_STREQ("Error importing record in line number 2: Field `title` must be a string.",
import_response["errors"][0]["message"].get<std::string>().c_str());
ASSERT_STREQ("Error importing record in line number 4: Field `starring` has been declared in the schema, but is not "
"found in the document.", import_response["errors"][1]["message"].get<std::string>().c_str());
collectionManager.drop_collection("coll_mul_fields");
}
TEST_F(CollectionTest, SortOnFloatFields) {
Collection *coll_float_fields;