From ba205222b7be90de3a8273eeed3a579004c3fb24 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 25 May 2019 22:28:13 +0530 Subject: [PATCH] Refactor bulk indexing process. --- include/collection.h | 7 ++- include/collection_manager.h | 3 +- include/index.h | 53 +++++++++++++++- src/collection.cpp | 116 ++++++++++++++++++++++++----------- src/collection_manager.cpp | 34 ++++++---- src/index.cpp | 35 ++++++----- test/collection_test.cpp | 42 ++++++++++--- 7 files changed, 214 insertions(+), 76 deletions(-) diff --git a/include/collection.h b/include/collection.h index ecc1451c..29b26e07 100644 --- a/include/collection.h +++ b/include/collection.h @@ -81,6 +81,8 @@ public: ~Collection(); + long long int micros = 0; + static std::string get_next_seq_id_key(const std::string & collection_name); static std::string get_meta_key(const std::string & collection_name); @@ -113,6 +115,8 @@ public: std::string get_default_sorting_field(); + Option to_doc(const std::string & json_str, nlohmann::json & document); + Option add(const std::string & json_str); Option add_many(const std::string & json_str); @@ -136,7 +140,8 @@ public: Option index_in_memory(const nlohmann::json & document, uint32_t seq_id); - Option par_index_in_memory(const std::vector>> & iter_batch); + void par_index_in_memory(std::vector> & iter_batch, + batch_index_result & result); static void prune_document(nlohmann::json &document, const spp::sparse_hash_set include_fields, const spp::sparse_hash_set exclude_fields); diff --git a/include/collection_manager.h b/include/collection_manager.h index 0fffd5ad..c3a692a6 100644 --- a/include/collection_manager.h +++ b/include/collection_manager.h @@ -49,7 +49,8 @@ public: Option init(Store *store, const size_t default_num_indices, const std::string & auth_key, - const std::string & search_only_auth_key); + const std::string & search_only_auth_key, + const size_t init_batch_size=1000); // frees in-memory data structures when server is shutdown - helps us run a memory leak detecter properly void dispose(); diff --git a/include/index.h b/include/index.h index 87c683e8..2999f703 100644 --- a/include/index.h +++ b/include/index.h @@ -53,6 +53,55 @@ struct search_args { } }; +struct index_record { + size_t record_pos; // position of record in the original request + + uint32_t seq_id; + std::string json_str; + nlohmann::json document; + + index_record(size_t record_pos, uint32_t seq_id, const std::string & json_str, const nlohmann::json & doc): + record_pos(record_pos), seq_id(seq_id), json_str(json_str), document(doc) { + + } +}; + +struct index_result { + index_record record; + Option index_op; // indicates if the indexing operation was a success + + index_result(const index_record & record, const Option & index_op): + record(record), index_op(index_op) { + + } + + bool operator<(const index_result & a) const { + return record.record_pos < a.record.record_pos; + } +}; + +struct batch_index_result { + std::vector items; + size_t num_indexed = 0; + + batch_index_result() { + + } + + void failure(const index_record & record, const uint32_t err_code, const std::string & err_msg) { + Option index_op_failure(err_code, err_msg); + index_result res(record, index_op_failure); + items.push_back(res); + } + + void success(const index_record & record) { + Option index_op_success(true); + index_result res(record, index_op_success); + items.push_back(res); + num_indexed++; + } +}; + class Index { private: std::string name; @@ -168,8 +217,8 @@ public: const std::unordered_map & search_schema, const std::unordered_map & facet_schema); - static Option batch_index(Index *index, - const std::vector> & iter_batch, + static batch_index_result batch_memory_index(Index *index, + std::vector & iter_batch, const std::string & default_sorting_field, const std::unordered_map & search_schema, const std::unordered_map & facet_schema); diff --git a/src/collection.cpp b/src/collection.cpp index 124223d2..074f52fd 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -87,13 +87,12 @@ void Collection::increment_next_seq_id_field() { next_seq_id++; } -Option Collection::add(const std::string & json_str) { - nlohmann::json document; +Option Collection::to_doc(const std::string & json_str, nlohmann::json & document) { try { document = nlohmann::json::parse(json_str); } catch(const std::exception& e) { LOG(ERR) << "JSON error: " << e.what(); - return Option(400, "Bad JSON."); + return Option(400, "Bad JSON."); } uint32_t seq_id = get_next_seq_id(); @@ -102,7 +101,7 @@ Option Collection::add(const std::string & json_str) { if(document.count("id") == 0) { document["id"] = seq_id_str; } else if(!document["id"].is_string()) { - return Option(400, "Document's `id` field should be a string."); + return Option(400, "Document's `id` field should be a string."); } std::string doc_id = document["id"]; @@ -110,9 +109,23 @@ Option Collection::add(const std::string & json_str) { // we need to check if document ID already exists before attempting to index if(doc_option.ok()) { - return Option(409, std::string("A document with id ") + doc_id + " already exists."); + return Option(409, std::string("A document with id ") + doc_id + " already exists."); } + return Option(seq_id); +} + +Option Collection::add(const std::string & json_str) { + nlohmann::json document; + Option doc_seq_id_op = to_doc(json_str, document); + + if(!doc_seq_id_op.ok()) { + return Option(doc_seq_id_op.code(), doc_seq_id_op.error()); + } + + const uint32_t seq_id = doc_seq_id_op.get(); + const std::string seq_id_str = std::to_string(seq_id); + const Option & index_memory_op = index_in_memory(document, seq_id); if(!index_memory_op.ok()) { @@ -139,36 +152,68 @@ Option Collection::add_many(const std::string & json_lines_str) return Option(400, "The request body was empty. So, no records were imported."); } - std::vector> errors; - size_t record_num = 1; - size_t record_imported = 0; + std::vector> iter_batch; + batch_index_result result; - for(const std::string & json_line: json_lines) { - Option op = add(json_line); + for(size_t i = 0; i < num_indices; i++) { + iter_batch.push_back(std::vector()); + } - if(!op.ok()) { - std::string err_msg = std::string("Error importing record in line number ") + - std::to_string(record_num) + ": " + op.error(); - Option err = Option(op.code(), err_msg); - errors.push_back(err); - } else { - record_imported++; + for(size_t i=0; i < json_lines.size(); i++) { + const std::string & json_line = json_lines[i]; + + nlohmann::json document; + Option doc_seq_id_op = to_doc(json_line, document); + + if(!doc_seq_id_op.ok()) { + index_record record(i, 0, "", document); + result.failure(record, doc_seq_id_op.code(), doc_seq_id_op.error()); + continue; } - record_num++; + const uint32_t seq_id = doc_seq_id_op.get(); + index_record record(i, seq_id, json_line, document); + iter_batch[seq_id % this->get_num_indices()].push_back(record); + } + + par_index_in_memory(iter_batch, result); + + std::sort(result.items.begin(), result.items.end()); + + // store documents only documents that were indexed in-memory successfully + for(index_result & item: result.items) { + if(item.index_op.ok()) { + rocksdb::WriteBatch batch; + const std::string seq_id_str = std::to_string(item.record.seq_id); + + batch.Put(get_doc_id_key(item.record.document["id"]), seq_id_str); + batch.Put(get_seq_id_key(item.record.seq_id), item.record.document.dump()); + bool write_ok = store->batch_write(batch); + + if(!write_ok) { + Option index_op_failure(500, "Could not write to on-disk storage."); + item.index_op = index_op_failure; + } + } } nlohmann::json resp; - resp["ok"] = (errors.size() == 0); - resp["num_imported"] = record_imported; + resp["success"] = (result.num_indexed == json_lines.size()); + resp["num_imported"] = result.num_indexed; - if(errors.size() != 0) { - resp["errors"] = nlohmann::json::array(); - for(const Option & err: errors) { - nlohmann::json err_obj; - err_obj["message"] = err.error(); - resp["errors"].push_back(err_obj); + resp["items"] = nlohmann::json::array(); + + for(const index_result & item: result.items) { + nlohmann::json item_obj; + + if(!item.index_op.ok()) { + item_obj["error"] = item.index_op.error(); + item_obj["success"] = false; + } else { + item_obj["success"] = true; } + + resp["items"].push_back(item_obj); } return Option(resp); @@ -189,23 +234,24 @@ Option Collection::index_in_memory(const nlohmann::json &document, uin return Option<>(200); } -Option Collection::par_index_in_memory(const std::vector>> & iter_batch) { - std::vector>> futures; +void Collection::par_index_in_memory(std::vector> & iter_batch, + batch_index_result & result) { + + std::vector> futures; + for(size_t i=0; i < num_indices; i++) { futures.push_back( - std::async(&Index::batch_index, indices[i], iter_batch[i], default_sorting_field, + std::async(&Index::batch_memory_index, indices[i], std::ref(iter_batch[i]), default_sorting_field, search_schema, facet_schema) ); } for(size_t i=0; i < futures.size(); i++) { - Option res = futures[i].get(); - if(!res.ok()) { - return res; - } + batch_index_result future_res = futures[i].get(); + result.items.insert(result.items.end(), future_res.items.begin(), future_res.items.end()); + result.num_indexed += future_res.num_indexed; + num_documents += future_res.num_indexed; } - - return Option(201); } void Collection::prune_document(nlohmann::json &document, const spp::sparse_hash_set include_fields, diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 6af74123..d3e57e48 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -43,7 +43,8 @@ void CollectionManager::add_to_collections(Collection* collection) { Option CollectionManager::init(Store *store, const size_t default_num_indices, const std::string & auth_key, - const std::string & search_only_auth_key) { + const std::string & search_only_auth_key, + const size_t init_batch_size) { this->store = store; this->auth_key = auth_key; this->search_only_auth_key = search_only_auth_key; @@ -101,27 +102,36 @@ Option CollectionManager::init(Store *store, const std::string seq_id_prefix = collection->get_seq_id_collection_prefix(); rocksdb::Iterator* iter = store->scan(seq_id_prefix); - std::vector>> iter_batch; + std::vector> iter_batch; for(size_t i = 0; i < collection->get_num_indices(); i++) { - iter_batch.push_back(std::vector>()); + iter_batch.push_back(std::vector()); } while(iter->Valid() && iter->key().starts_with(seq_id_prefix)) { const uint32_t seq_id = Collection::get_seq_id_key(iter->key().ToString()); + + nlohmann::json document; + Option seq_id_doc_op = collection->to_doc(iter->value().ToString(), document); + + if(!seq_id_doc_op.ok()) { + return Option(500, "Error while parsing document."); // FIXME: populate error + } + iter_batch[seq_id % collection->get_num_indices()].push_back( - std::make_pair(Collection::get_seq_id_key(iter->key().ToString()), iter->value().ToString()) + index_record(0, seq_id, iter->value().ToString(), document) ); - if(iter_batch.size() == 1000) { - Option res = collection->par_index_in_memory(iter_batch); + if(iter_batch.size() == init_batch_size) { + batch_index_result res; + collection->par_index_in_memory(iter_batch, res); for(size_t i = 0; i < collection->get_num_indices(); i++) { iter_batch[i].clear(); } - if(!res.ok()) { + if(res.num_indexed != iter_batch.size()) { delete iter; - return Option(false, res.error()); + return Option(false, "Error while loading records."); // FIXME: populate actual record error } } @@ -130,9 +140,11 @@ Option CollectionManager::init(Store *store, delete iter; - Option res = collection->par_index_in_memory(iter_batch); - if(!res.ok()) { - return Option(false, res.error()); + batch_index_result res; + collection->par_index_in_memory(iter_batch, res); + + if(res.num_indexed != iter_batch.size()) { + return Option(false, "Error while loading records."); // FIXME: populate actual record error } add_to_collections(collection); diff --git a/src/index.cpp b/src/index.cpp index 6436c57a..c1345a4c 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -268,37 +268,38 @@ Option Index::validate_index_in_memory(const nlohmann::json &document, return Option<>(200); } -Option Index::batch_index(Index *index, const std::vector> & iter_batch, +batch_index_result Index::batch_memory_index(Index *index, std::vector & iter_batch, const std::string & default_sorting_field, const std::unordered_map & search_schema, const std::unordered_map & facet_schema) { - for(auto & kv: iter_batch) { - uint32_t seq_id = kv.first; - nlohmann::json document; - try { - document = nlohmann::json::parse(kv.second); - } catch(...) { - return Option(500, std::string("Error while parsing stored document with sequence ID: " + - std::to_string(seq_id))); + batch_index_result result; + + for(auto & index_rec: iter_batch) { + if(index_rec.json_str.empty()) { + // indicates bad record (upstream validation failure) + continue; } - Option validation_op = validate_index_in_memory(document, seq_id, default_sorting_field, + Option validation_op = validate_index_in_memory(index_rec.document, index_rec.seq_id, + default_sorting_field, search_schema, facet_schema); if(!validation_op.ok()) { - std::string error_msg = std::string("Error validating document with ID: ") + - document["id"].get() + " - " + validation_op.error(); - return Option<>(validation_op.code(), error_msg); + result.failure(index_rec, validation_op.code(), validation_op.error()); + continue; } - Option res = index->index_in_memory(document, seq_id, default_sorting_field); - if(!res.ok()) { - return res; + Option index_mem_op = index->index_in_memory(index_rec.document, index_rec.seq_id, default_sorting_field); + if(!index_mem_op.ok()) { + result.failure(index_rec, index_mem_op.code(), index_mem_op.error()); + continue; } + + result.success(index_rec); } - return Option<>(201); + return result; } void Index::insert_doc(const uint32_t score, art_tree *t, uint32_t seq_id, diff --git a/test/collection_test.cpp b/test/collection_test.cpp index 55f1fb68..90d528f8 100644 --- a/test/collection_test.cpp +++ b/test/collection_test.cpp @@ -1133,7 +1133,7 @@ TEST_F(CollectionTest, ImportDocuments) { Option import_res = coll_mul_fields->add_many(import_records); ASSERT_TRUE(import_res.ok()); nlohmann::json import_response = import_res.get(); - ASSERT_TRUE(import_response["ok"].get()); + ASSERT_TRUE(import_response["success"].get()); ASSERT_EQ(18, import_response["num_imported"].get()); ASSERT_EQ(0, import_response.count("errors")); @@ -1162,7 +1162,7 @@ TEST_F(CollectionTest, ImportDocuments) { ASSERT_STREQ("The request body was empty. So, no records were imported.", import_res.error().c_str()); // verify that only bad records are rejected, rest must be imported (records 2 and 4 are bad) - std::string more_records = std::string("{\"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, " + std::string more_records = std::string("{\"id\": \"id1\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, " "\"cast\": [\"Tom Skerritt\"] }\n") + "{\"title\": 123, \"starring\": \"Jazz Gosh\", \"points\": 23, " "\"cast\": [\"Tom Skerritt\"] }\n" + @@ -1175,16 +1175,40 @@ TEST_F(CollectionTest, ImportDocuments) { ASSERT_TRUE(import_res.ok()); import_response = import_res.get(); - ASSERT_FALSE(import_response["ok"].get()); + ASSERT_FALSE(import_response["success"].get()); ASSERT_EQ(2, import_response["num_imported"].get()); - ASSERT_EQ(1, import_response.count("errors")); - ASSERT_EQ(2, import_response["errors"].size()); + ASSERT_EQ(4, import_response["items"].size()); - ASSERT_STREQ("Error importing record in line number 2: Field `title` must be a string.", - import_response["errors"][0]["message"].get().c_str()); + ASSERT_TRUE(import_response["items"][0]["success"].get()); + ASSERT_FALSE(import_response["items"][1]["success"].get()); + ASSERT_TRUE(import_response["items"][2]["success"].get()); + ASSERT_FALSE(import_response["items"][3]["success"].get()); - ASSERT_STREQ("Error importing record in line number 4: Field `starring` has been declared in the schema, but is not " - "found in the document.", import_response["errors"][1]["message"].get().c_str()); + ASSERT_STREQ("Field `title` must be a string.", import_response["items"][1]["error"].get().c_str()); + ASSERT_STREQ("Field `starring` has been declared in the schema, but is not found in the document.", + import_response["items"][3]["error"].get().c_str()); + + // record with duplicate IDs + + more_records = std::string("{\"id\": \"id1\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, " + "\"cast\": [\"Tom Skerritt\"] }\n") + + "{\"id\": \"id2\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, " + "\"cast\": [\"Tom Skerritt\"] }"; + + import_res = coll_mul_fields->add_many(more_records); + ASSERT_TRUE(import_res.ok()); + + import_response = import_res.get(); + + ASSERT_FALSE(import_response["success"].get()); + ASSERT_EQ(1, import_response["num_imported"].get()); + + std::cout << import_response << std::endl; + + ASSERT_FALSE(import_response["items"][0]["success"].get()); + ASSERT_TRUE(import_response["items"][1]["success"].get()); + + ASSERT_STREQ("A document with id id1 already exists.", import_response["items"][0]["error"].get().c_str()); collectionManager.drop_collection("coll_mul_fields"); }