From ade8e1946f4087006e1043af5f5643f5e41aa75d Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 10 May 2019 21:35:49 +0530 Subject: [PATCH 1/2] Bulk import API. --- include/collection.h | 2 ++ include/core_api.h | 4 +++- src/collection.cpp | 43 +++++++++++++++++++++++++++++++++++ src/core_api.cpp | 22 ++++++++++++++++-- src/main/typesense_server.cpp | 7 ++++-- 5 files changed, 73 insertions(+), 5 deletions(-) diff --git a/include/collection.h b/include/collection.h index d08bbea0..ecc1451c 100644 --- a/include/collection.h +++ b/include/collection.h @@ -115,6 +115,8 @@ public: Option add(const std::string & json_str); + Option add_many(const std::string & json_str); + Option search(std::string query, const std::vector search_fields, const std::string & simple_filter_query, const std::vector & facet_fields, const std::vector & sort_fields, const int num_typos, diff --git a/include/core_api.h b/include/core_api.h index 48489519..0a0e2eb7 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -18,10 +18,12 @@ void get_search(http_req & req, http_res & res); void get_collection_summary(http_req & req, http_res & res); -void get_collection_export(http_req & req, http_res & res); +void get_export_documents(http_req & req, http_res & res); void post_add_document(http_req & req, http_res & res); +void post_import_documents(http_req & req, http_res & res); + void get_fetch_document(http_req & req, http_res & res); void del_remove_document(http_req & req, http_res & res); diff --git a/src/collection.cpp b/src/collection.cpp index b487121b..33a872cb 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -131,6 +131,49 @@ Option Collection::add(const std::string & json_str) { return Option(document); } +Option Collection::add_many(const std::string & json_lines_str) { + std::vector json_lines; + StringUtils::split(json_lines_str, json_lines, "\n"); + + if(json_lines.size() == 0) { + return Option(400, "The request body was empty. So, no records were imported."); + } + + std::vector> errors; + size_t record_num = 1; + size_t record_imported = 0; + + for(const std::string & json_line: json_lines) { + Option op = add(json_line); + + if(!op.ok()) { + std::string err_msg = std::string("Error importing record in line number ") + + std::to_string(record_num) + ": " + op.error() + " On record number: "; + Option err = Option(op.code(), err_msg); + errors.push_back(err); + } else { + record_imported++; + } + + record_num++; + } + + nlohmann::json resp; + resp["ok"] = (errors.size() == 0); + resp["num_imported"] = record_imported; + + if(errors.size() != 0) { + resp["errors"] = nlohmann::json::array(); + for(const Option & err: errors) { + nlohmann::json err_obj; + err_obj["message"] = err.error(); + resp["errors"].push_back(err_obj); + } + } + + return Option(resp); +} + Option Collection::index_in_memory(const nlohmann::json &document, uint32_t seq_id) { Option validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field, search_schema, facet_schema); diff --git a/src/core_api.cpp b/src/core_api.cpp index 3cf89aae..2e61b29f 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -362,7 +362,7 @@ void collection_export_handler(http_req* req, http_res* res, void* data) { } } -void get_collection_export(http_req & req, http_res & res) { +void get_export_documents(http_req & req, http_res & res) { CollectionManager & collectionManager = CollectionManager::get_instance(); Collection* collection = collectionManager.get_collection(req.params["collection"]); @@ -399,6 +399,24 @@ void post_add_document(http_req & req, http_res & res) { } } +void post_import_documents(http_req & req, http_res & res) { + CollectionManager & collectionManager = CollectionManager::get_instance(); + Collection* collection = collectionManager.get_collection(req.params["collection"]); + + if(collection == nullptr) { + return res.send_404(); + } + + Option result = collection->add_many(req.body); + + if(!result.ok()) { + res.send(result.code(), result.error()); + return ; + } + + res.send_200(result.get().dump()); +} + void get_fetch_document(http_req & req, http_res & res) { std::string doc_id = req.params["id"]; @@ -484,4 +502,4 @@ void on_send_response(void *data) { request_response* req_res = static_cast(data); req_res->response->server->send_response(req_res->req, req_res->response); delete req_res; -} \ No newline at end of file +} diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index ed60b662..a2887475 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -12,7 +12,10 @@ void master_server_routes() { // document management - `/documents/:id` end-points must be placed last in the list server->post("/collections/:collection/documents", post_add_document); server->get("/collections/:collection/documents/search", get_search); - server->get("/collections/:collection/documents/export", get_collection_export, true); + + server->post("/collections/:collection/documents/import", post_import_documents); + server->get("/collections/:collection/documents/export", get_export_documents, true); + server->get("/collections/:collection/documents/:id", get_fetch_document); server->del("/collections/:collection/documents/:id", del_remove_document); @@ -31,7 +34,7 @@ void replica_server_routes() { // document management - `/documents/:id` end-points must be placed last in the list server->get("/collections/:collection/documents/search", get_search); - server->get("/collections/:collection/documents/export", get_collection_export, true); + server->get("/collections/:collection/documents/export", get_export_documents, true); server->get("/collections/:collection/documents/:id", get_fetch_document); // meta From 8f167dd7df5bd834c4b3c765c7a16e2bd1612cf9 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 11 May 2019 17:07:23 +0530 Subject: [PATCH 2/2] Test for import. --- src/collection.cpp | 2 +- test/collection_test.cpp | 82 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/src/collection.cpp b/src/collection.cpp index 33a872cb..124223d2 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -148,7 +148,7 @@ Option Collection::add_many(const std::string & json_lines_str) if(!op.ok()) { std::string err_msg = std::string("Error importing record in line number ") + - std::to_string(record_num) + ": " + op.error() + " On record number: "; + std::to_string(record_num) + ": " + op.error(); Option err = Option(op.code(), err_msg); errors.push_back(err); } else { diff --git a/test/collection_test.cpp b/test/collection_test.cpp index 1eb6abd9..ae255659 100644 --- a/test/collection_test.cpp +++ b/test/collection_test.cpp @@ -1107,6 +1107,88 @@ TEST_F(CollectionTest, FilterOnFloatFields) { collectionManager.drop_collection("coll_array_fields"); } +TEST_F(CollectionTest, ImportDocuments) { + Collection *coll_mul_fields; + + std::ifstream infile(std::string(ROOT_DIR)+"test/multi_field_documents.jsonl"); + std::stringstream strstream; + strstream << infile.rdbuf(); + std::string import_records = strstream.str(); + infile.close(); + + std::vector fields = { + field("title", field_types::STRING, false), + field("starring", field_types::STRING, false), + field("cast", field_types::STRING_ARRAY, false), + field("points", field_types::INT32, false) + }; + + coll_mul_fields = collectionManager.get_collection("coll_mul_fields"); + if(coll_mul_fields == nullptr) { + coll_mul_fields = collectionManager.create_collection("coll_mul_fields", fields, "points").get(); + } + + // try importing records + + Option import_res = coll_mul_fields->add_many(import_records); + ASSERT_TRUE(import_res.ok()); + nlohmann::json import_response = import_res.get(); + ASSERT_TRUE(import_response["ok"].get()); + ASSERT_EQ(18, import_response["num_imported"].get()); + ASSERT_EQ(0, import_response.count("errors")); + + // now try searching for records + + query_fields = {"title", "starring"}; + std::vector facets; + + auto x = coll_mul_fields->search("Will", query_fields, "", facets, sort_fields, 0, 10, 1, FREQUENCY, false); + + nlohmann::json results = coll_mul_fields->search("Will", query_fields, "", facets, sort_fields, 0, 10, 1, FREQUENCY, false).get(); + ASSERT_EQ(4, results["hits"].size()); + + std::vector ids = {"3", "2", "1", "0"}; + + for(size_t i = 0; i < results["hits"].size(); i++) { + nlohmann::json result = results["hits"].at(i); + std::string result_id = result["document"]["id"]; + std::string id = ids.at(i); + ASSERT_STREQ(id.c_str(), result_id.c_str()); + } + + // verify that empty import is caught gracefully + import_res = coll_mul_fields->add_many(""); + ASSERT_FALSE(import_res.ok()); + ASSERT_STREQ("The request body was empty. So, no records were imported.", import_res.error().c_str()); + + // verify that only bad records are rejected, rest must be imported (records 2 and 4 are bad) + std::string more_records = std::string("{\"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, " + "\"cast\": [\"Tom Skerritt\"] }\n") + + "{\"title\": 123, \"starring\": \"Jazz Gosh\", \"points\": 23, " + "\"cast\": [\"Tom Skerritt\"] }\n" + + "{\"title\": \"Test3\", \"starring\": \"Brad Fin\", \"points\": 11, " + "\"cast\": [\"Tom Skerritt\"] }\n" + + "{\"title\": \"Test4\", \"points\": 55, " + "\"cast\": [\"Tom Skerritt\"] }\n"; + + import_res = coll_mul_fields->add_many(more_records); + ASSERT_TRUE(import_res.ok()); + + import_response = import_res.get(); + ASSERT_FALSE(import_response["ok"].get()); + ASSERT_EQ(2, import_response["num_imported"].get()); + ASSERT_EQ(1, import_response.count("errors")); + ASSERT_EQ(2, import_response["errors"].size()); + + ASSERT_STREQ("Error importing record in line number 2: Field `title` must be a string.", + import_response["errors"][0]["message"].get().c_str()); + + ASSERT_STREQ("Error importing record in line number 4: Field `starring` has been declared in the schema, but is not " + "found in the document.", import_response["errors"][1]["message"].get().c_str()); + + collectionManager.drop_collection("coll_mul_fields"); +} + TEST_F(CollectionTest, SortOnFloatFields) { Collection *coll_float_fields;