diff --git a/include/collection.h b/include/collection.h
index 22b12851..ce27fe7e 100644
--- a/include/collection.h
+++ b/include/collection.h
@@ -473,7 +473,7 @@ public:
     Option<bool> get_document_from_store(const std::string & seq_id_key, nlohmann::json & document) const;
 
     Option<uint32_t> index_in_memory(nlohmann::json & document, uint32_t seq_id,
-                                     bool is_update, const DIRTY_VALUES& dirty_values);
+                                     const index_operation_t op, const DIRTY_VALUES& dirty_values);
 
     static void prune_document(nlohmann::json &document, const spp::sparse_hash_set<std::string> & include_fields,
                                const spp::sparse_hash_set<std::string> & exclude_fields);
diff --git a/include/index.h b/include/index.h
index fcd1057c..2b3f0409 100644
--- a/include/index.h
+++ b/include/index.h
@@ -255,8 +255,8 @@ private:
 
     static void compute_facet_stats(facet &a_facet, uint64_t raw_value, const std::string & field_type);
 
-    static void get_doc_changes(const nlohmann::json &document, nlohmann::json &old_doc,
-                                nlohmann::json &new_doc, nlohmann::json &del_doc);
+    static void get_doc_changes(const index_operation_t op, const nlohmann::json &update_doc,
+                                const nlohmann::json &old_doc, nlohmann::json &new_doc, nlohmann::json &del_doc);
 
     static Option<uint32_t> coerce_string(const DIRTY_VALUES& dirty_values, const std::string& fallback_field_type,
                                           const field& a_field, nlohmann::json &document,
@@ -330,7 +330,7 @@ public:
                            const std::vector<art_leaf *> &query_suggestion,
                            uint32_t *exact_strt_ids, size_t& exact_strt_size) const;
 
-    void scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, nlohmann::json& old_doc);
+    void scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, const nlohmann::json& old_doc);
 
     static void tokenize_string_field(const nlohmann::json& document,
                                       const field& search_field, std::vector<std::string>& tokens,
@@ -365,7 +365,7 @@ public:
 
     Option<uint32_t> index_in_memory(const nlohmann::json & document, uint32_t seq_id,
                                      const std::string & default_sorting_field,
-                                     const bool is_update);
+                                     const index_operation_t op);
 
     static size_t batch_memory_index(Index *index,
                                      std::vector<index_record> & iter_batch,
@@ -400,7 +400,7 @@ public:
                                                  const std::string & default_sorting_field,
                                                  const std::unordered_map<std::string, field> & search_schema,
                                                  const std::map<std::string, field> & facet_schema,
-                                                 bool is_update,
+                                                 const index_operation_t op,
                                                  const std::string& fallback_field_type,
                                                  const DIRTY_VALUES& dirty_values);
diff --git a/include/store.h b/include/store.h
index 43dc1a1f..3d293a3a 100644
--- a/include/store.h
+++ b/include/store.h
@@ -58,6 +58,8 @@ private:
     mutable std::shared_mutex mutex;
 
     rocksdb::Status init_db() {
+        LOG(INFO) << "Initializing DB by opening state dir: " << state_dir_path;
+
         rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db);
         if(!s.ok()) {
             LOG(ERROR) << "Error while initializing store: " << s.ToString();
diff --git a/src/collection.cpp b/src/collection.cpp
index 23b4bcb3..0556537b 100644
--- a/src/collection.cpp
+++ b/src/collection.cpp
@@ -298,8 +298,9 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
 
             if(!write_ok) {
                 // we will attempt to reindex the old doc on a best-effort basis
+                LOG(ERROR) << "Update to disk failed. Will restore old document";
                 remove_document(index_record.new_doc, index_record.seq_id, false);
-                index_in_memory(index_record.old_doc, index_record.seq_id, false, index_record.dirty_values);
+                index_in_memory(index_record.old_doc, index_record.seq_id, index_record.operation, index_record.dirty_values);
                 index_record.index_failure(500, "Could not write to on-disk storage.");
             } else {
                 num_indexed++;
@@ -318,6 +319,7 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
 
             if(!write_ok) {
                 // remove from in-memory store to keep the state synced
+                LOG(ERROR) << "Write to disk failed. Will restore old document";
                 remove_document(index_record.doc, index_record.seq_id, false);
                 index_record.index_failure(500, "Could not write to on-disk storage.");
             } else {
@@ -345,11 +347,11 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
 }
 
 Option<uint32_t> Collection::index_in_memory(nlohmann::json &document, uint32_t seq_id,
-                                             bool is_update, const DIRTY_VALUES& dirty_values) {
+                                             const index_operation_t op, const DIRTY_VALUES& dirty_values) {
     std::unique_lock lock(mutex);
 
     Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
-                                                                     search_schema, facet_schema, is_update,
+                                                                     search_schema, facet_schema, op,
                                                                      fallback_field_type, dirty_values);
 
     if(!validation_op.ok()) {
@@ -357,7 +359,7 @@ Option<uint32_t> Collection::index_in_memory(nlohmann::json &document, uint32_t
     }
 
     Index* index = indices[seq_id % num_memory_shards];
-    index->index_in_memory(document, seq_id, default_sorting_field, is_update);
+    index->index_in_memory(document, seq_id, default_sorting_field, op);
 
     num_documents += 1;
     return Option<>(200);
diff --git a/src/index.cpp b/src/index.cpp
index 8f5e2bf6..3955fadf 100644
--- a/src/index.cpp
+++ b/src/index.cpp
@@ -115,7 +115,7 @@ int64_t Index::float_to_in64_t(float f) {
 
 Option<uint32_t> Index::index_in_memory(const nlohmann::json &document, uint32_t seq_id,
                                         const std::string & default_sorting_field,
-                                        const bool is_update) {
+                                        const index_operation_t op) {
 
     std::unique_lock lock(mutex);
 
@@ -131,7 +131,7 @@ Option<uint32_t> Index::index_in_memory(const nlohmann::json &document, uint32_t
         points = get_points_from_doc(document, default_sorting_field);
     }
 
-    if(!is_update) {
+    if(op != UPDATE && op != UPSERT) {
         // for updates, the seq_id will already exist
         seq_ids.append(seq_id);
     }
@@ -283,13 +283,13 @@ Option<uint32_t> Index::validate_index_in_memory(nlohmann::json& document, uint3
                                                  const std::string & default_sorting_field,
                                                  const std::unordered_map<std::string, field> & search_schema,
                                                  const std::map<std::string, field> & facet_schema,
-                                                 bool is_update,
+                                                 const index_operation_t op,
                                                  const std::string& fallback_field_type,
                                                  const DIRTY_VALUES& dirty_values) {
 
     bool missing_default_sort_field = (!default_sorting_field.empty() && document.count(default_sorting_field) == 0);
 
-    if(!is_update && missing_default_sort_field) {
+    if(op != UPDATE && missing_default_sort_field) {
         return Option<>(400, "Field `" + default_sorting_field + "` has been declared as a default sorting field, "
                              "but is not found in the document.");
     }
@@ -298,7 +298,7 @@ Option<uint32_t> Index::validate_index_in_memory(nlohmann::json& document, uint3
         const std::string& field_name = field_pair.first;
         const field& a_field = field_pair.second;
 
-        if((a_field.optional || is_update) && document.count(field_name) == 0) {
+        if((a_field.optional || op == UPDATE) && document.count(field_name) == 0) {
             continue;
         }
 
@@ -408,37 +408,6 @@ Option<uint32_t> Index::validate_index_in_memory(nlohmann::json& document, uint3
     return Option<>(200);
 }
 
-void Index::scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, nlohmann::json& old_doc) {
-    std::vector<std::string> del_keys;
-
-    for(auto it = del_doc.cbegin(); it != del_doc.cend(); it++) {
-        const std::string& field_name = it.key();
-
-        std::shared_lock lock(mutex);
-
-        const auto& search_field_it = search_schema.find(field_name);
-        if(search_field_it == search_schema.end()) {
-            continue;
-        }
-
-        const auto search_field = search_field_it->second;  // copy, don't use reference!
-
-        lock.unlock();
-
-        // compare values between old and update docs:
-        // if they match, we will remove them from both del and update docs
-
-        if(update_doc[search_field.name] == old_doc[search_field.name]) {
-            del_keys.push_back(field_name);
-        }
-    }
-
-    for(const auto& del_key: del_keys) {
-        del_doc.erase(del_key);
-        update_doc.erase(del_key);
-    }
-}
-
 size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_batch,
                                  const std::string & default_sorting_field,
                                  const std::unordered_map<std::string, field> & search_schema,
@@ -457,7 +426,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
 
             Option<uint32_t> validation_op = validate_index_in_memory(index_rec.doc, index_rec.seq_id,
                                                                       default_sorting_field, search_schema,
                                                                       facet_schema,
-                                                                      index_rec.is_update,
+                                                                      index_rec.operation,
                                                                       fallback_field_type,
                                                                       index_rec.dirty_values);
@@ -468,7 +437,8 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
 
             if(index_rec.is_update) {
                 // scrub string fields to reduce delete ops
-                get_doc_changes(index_rec.doc, index_rec.old_doc, index_rec.new_doc, index_rec.del_doc);
+                get_doc_changes(index_rec.operation, index_rec.doc, index_rec.old_doc, index_rec.new_doc,
+                                index_rec.del_doc);
                 index->scrub_reindex_doc(index_rec.doc, index_rec.del_doc, index_rec.old_doc);
                 index->remove(index_rec.seq_id, index_rec.del_doc, index_rec.is_update);
             }
@@ -476,7 +446,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
             Option<uint32_t> index_mem_op(0);
 
             try {
-                index_mem_op = index->index_in_memory(index_rec.doc, index_rec.seq_id, default_sorting_field, index_rec.is_update);
+                index_mem_op = index->index_in_memory(index_rec.doc, index_rec.seq_id, default_sorting_field, index_rec.operation);
             } catch(const std::exception& e) {
                 const std::string& error_msg = std::string("Fatal error during indexing: ") + e.what();
                 LOG(ERROR) << error_msg << ", document: " << index_rec.doc;
@@ -484,7 +454,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
             }
 
             if(!index_mem_op.ok()) {
-                index->index_in_memory(index_rec.del_doc, index_rec.seq_id, default_sorting_field, true);
+                index->index_in_memory(index_rec.del_doc, index_rec.seq_id, default_sorting_field, index_rec.operation);
                 index_rec.index_failure(index_mem_op.code(), index_mem_op.error());
                 continue;
             }
@@ -1255,7 +1225,7 @@ uint32_t Index::do_filtering(uint32_t** filter_ids_out, const std::vector<filter
     ...at(seq_id);
@@ -2909,17 +2879,22 @@ Option<uint32_t> Index::coerce_float(const DIRTY_VALUES& dirty_values, const fie
     return Option<uint32_t>(200);
 }
 
-void Index::get_doc_changes(const nlohmann::json &document, nlohmann::json &old_doc, nlohmann::json &new_doc,
-                            nlohmann::json &del_doc) {
+void Index::get_doc_changes(const index_operation_t op, const nlohmann::json& update_doc,
+                            const nlohmann::json& old_doc, nlohmann::json& new_doc, nlohmann::json& del_doc) {
+
     for(auto it = old_doc.begin(); it != old_doc.end(); ++it) {
-        new_doc[it.key()] = it.value();
+        if(op == UPSERT && !update_doc.contains(it.key())) {
+            del_doc[it.key()] = it.value();
+        } else {
+            new_doc[it.key()] = it.value();
+        }
     }
 
-    for(auto it = document.begin(); it != document.end(); ++it) {
+    for(auto it = update_doc.begin(); it != update_doc.end(); ++it) {
         // adds new key or overrides existing key from `old_doc`
         new_doc[it.key()] = it.value();
 
-        // if the update document contains a field that exists in old, we record that (for delete + reindex)
+        // if the update doc contains a field that exists in old, we record that (for delete + reindex)
         bool field_exists_in_old_doc = (old_doc.count(it.key()) != 0);
         if(field_exists_in_old_doc) {
             // key exists in the stored doc, so it must be reindexed
@@ -2929,6 +2904,37 @@ void Index::get_doc_changes(const nlohmann::json &document, nlohmann::json &old_
     }
 }
 
+void Index::scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, const nlohmann::json& old_doc) {
+    std::vector<std::string> del_keys;
+
+    for(auto it = del_doc.cbegin(); it != del_doc.cend(); it++) {
+        const std::string& field_name = it.key();
+
+        std::shared_lock lock(mutex);
+
+        const auto& search_field_it = search_schema.find(field_name);
+        if(search_field_it == search_schema.end()) {
+            continue;
+        }
+
+        const auto search_field = search_field_it->second;  // copy, don't use reference!
+
+        lock.unlock();
+
+        // compare values between old and update docs:
+        // if they match, we will remove them from both del and update docs
+
+        if(update_doc.contains(search_field.name) && update_doc[search_field.name] == old_doc[search_field.name]) {
+            del_keys.push_back(field_name);
+        }
+    }
+
+    for(const auto& del_key: del_keys) {
+        del_doc.erase(del_key);
+        update_doc.erase(del_key);
+    }
+}
+
 // https://stackoverflow.com/questions/924171/geo-fencing-point-inside-outside-polygon
 // NOTE: polygon and point should have been transformed with `transform_for_180th_meridian`
 bool Index::is_point_in_polygon(const Geofence& poly, const GeoCoord &point) {
diff --git a/test/collection_test.cpp b/test/collection_test.cpp
index ddd9480a..239ba56e 100644
--- a/test/collection_test.cpp
+++ b/test/collection_test.cpp
@@ -1055,10 +1055,10 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
 
     ASSERT_EQ(1, import_response["num_imported"].get<int>());
 
     // update + upsert records
-    std::vector<std::string> more_records = {R"({"id": "0", "title": "The Fifth Harry", "starring": "Will Ferrell"})",
-                                             R"({"id": "2", "cast": ["Chris Fisher", "Rand Alan"]})",
+    std::vector<std::string> more_records = {R"({"id": "0", "title": "The Fifth Harry", "starring": "Will Ferrell", "points":62, "cast":["Adam McKay","Steve Carell","Paul Rudd"]})",
+                                             R"({"id": "2", "cast": ["Chris Fisher", "Rand Alan"], "points":81, "starring":"Daniel Day-Lewis","title":"There Will Be Blood"})",
                                              R"({"id": "18", "title": "Back Again Forest", "points": 45, "starring": "Ronald Wells", "cast": ["Dant Saren"]})",
-                                             R"({"id": "6", "points": 77})"};
+                                             R"({"id": "6", "points": 77, "cast":["Chris Evans","Scarlett Johansson"], "starring":"Samuel L. Jackson","title":"Captain America: The Winter Soldier"})"};
Jackson","title":"Captain America: The Winter Soldier"})"}; import_response = coll_mul_fields->add_many(more_records, document, UPSERT); @@ -1104,9 +1104,9 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) { ASSERT_EQ(77, results["hits"][0]["document"]["points"].get()); // upserting with some bad docs - more_records = {R"({"id": "1", "title": "Wake up, Harry"})", + more_records = {R"({"id": "1", "title": "Wake up, Harry", "cast":["Josh Lawson","Chris Parnell"],"points":63,"starring":"Will Ferrell"})", R"({"id": "90", "cast": ["Kim Werrel", "Random Wake"]})", // missing fields - R"({"id": "5", "points": 60})", + R"({"id": "5", "points": 60, "cast":["Logan Lerman","Alexandra Daddario"],"starring":"Ron Perlman","starring_facet":"Ron Perlman","title":"Percy Jackson: Sea of Monsters"})", R"({"id": "24", "starring": "John", "cast": ["John Kim"], "points": 11})"}; // missing fields import_response = coll_mul_fields->add_many(more_records, document, UPSERT); @@ -1220,6 +1220,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsertOptional) { for(size_t i=0; i()); ASSERT_EQ(1000, import_response["num_imported"].get()); + + // update records (can contain partial fields) + + records.clear(); + + for(size_t i=0; iadd_many(records, document, UPDATE); + ASSERT_TRUE(import_response["success"].get()); + ASSERT_EQ(1000, import_response["num_imported"].get()); } TEST_F(CollectionTest, ImportDocuments) { @@ -2099,7 +2122,7 @@ TEST_F(CollectionTest, UpdateDocument) { Collection *coll1; std::vector fields = {field("title", field_types::STRING, true), - field("tags", field_types::STRING_ARRAY, true), + field("tags", field_types::STRING_ARRAY, true, true), field("points", field_types::INT32, false)}; std::vector sort_fields = {sort_by("points", "DESC")}; @@ -2145,6 +2168,27 @@ TEST_F(CollectionTest, UpdateDocument) { ASSERT_STREQ("LAZY", res["facet_counts"][0]["counts"][1]["value"].get().c_str()); ASSERT_EQ(1, (int) res["facet_counts"][0]["counts"][1]["count"]); + // upsert only part of the document -- document should be REPLACED + nlohmann::json partial_doc = doc; + partial_doc.erase("tags"); + add_op = coll1->add(partial_doc.dump(), UPSERT); + ASSERT_TRUE(add_op.ok()); + + res = coll1->search("lazy", {"title"}, "", {}, sort_fields, {0}, 10, 1, + token_ordering::FREQUENCY, {true}, 10, spp::sparse_hash_set(), + spp::sparse_hash_set(), 10, "", 5, 5, "title").get(); + + ASSERT_EQ(1, res["hits"].size()); + ASSERT_FALSE(res["hits"][0].contains("tags")); + + // upserting without a mandatory field should be an error + partial_doc = doc; + partial_doc.erase("title"); + LOG(INFO) << partial_doc.dump(); + add_op = coll1->add(partial_doc.dump(), UPSERT); + ASSERT_FALSE(add_op.ok()); + ASSERT_EQ("Field `title` has been declared in the schema, but is not found in the document.", add_op.error()); + // try changing the title and searching for an older token doc["title"] = "The quick brown fox."; add_op = coll1->add(doc.dump(), UPSERT); @@ -2211,7 +2255,7 @@ TEST_F(CollectionTest, UpdateDocument) { nlohmann::json doc4; doc4["points"] = 105; - add_op = coll1->add(doc4.dump(), UPSERT, "100"); + add_op = coll1->add(doc4.dump(), UPDATE, "100"); ASSERT_TRUE(add_op.ok()); res = coll1->search("*", {"tags"}, "points: > 101", {"tags"}, sort_fields, {0}, 10, 1, @@ -2223,8 +2267,9 @@ TEST_F(CollectionTest, UpdateDocument) { // try to change a field with bad value and verify that old document is put back doc4["points"] = "abc"; - add_op = coll1->add(doc4.dump(), UPSERT, "100"); + add_op = coll1->add(doc4.dump(), UPDATE, "100"); 
     ASSERT_FALSE(add_op.ok());
+    ASSERT_EQ("Field `points` must be an int32.", add_op.error());
 
     res = coll1->search("*", {"tags"}, "points: > 101", {"tags"}, sort_fields, {0}, 10, 1,
                         token_ordering::FREQUENCY, {true}, 10, spp::sparse_hash_set<std::string>(),
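
For reference, below is a minimal standalone sketch of the delta computation this patch introduces in get_doc_changes(): with UPSERT the stored document is replaced (stored fields missing from the incoming doc are queued for deletion), while with UPDATE the incoming doc is merged into it. The enum, the doc_changes() wrapper and the sample documents are local to this sketch and are not typesense code; only the loop logic mirrors the patch.

#include <iostream>
#include <nlohmann/json.hpp>

using nlohmann::json;

// Local stand-in for the operations relevant here (not the typesense definition).
enum index_operation_t { CREATE, UPSERT, UPDATE };

// Mirrors the patched get_doc_changes(): builds the document to reindex (new_doc)
// and the fields that must be removed from the index (del_doc).
void doc_changes(index_operation_t op, const json& update_doc, const json& old_doc,
                 json& new_doc, json& del_doc) {
    for(auto it = old_doc.begin(); it != old_doc.end(); ++it) {
        if(op == UPSERT && !update_doc.contains(it.key())) {
            del_doc[it.key()] = it.value();   // upsert replaces: stale field gets de-indexed
        } else {
            new_doc[it.key()] = it.value();   // update merges: stored field is carried over
        }
    }

    for(auto it = update_doc.begin(); it != update_doc.end(); ++it) {
        // adds a new key or overrides an existing key from old_doc
        new_doc[it.key()] = it.value();
        if(old_doc.count(it.key()) != 0) {
            del_doc[it.key()] = old_doc[it.key()];  // overwritten field: delete + reindex
        }
    }
}

int main() {
    json old_doc    = {{"id", "100"}, {"title", "The quick brown fox"}, {"points", 100}};
    json update_doc = {{"id", "100"}, {"points", 105}};

    json new_doc, del_doc;
    doc_changes(UPSERT, update_doc, old_doc, new_doc, del_doc);
    // UPSERT: "title" lands in del_doc, so the replaced document loses it
    std::cout << "upsert new: " << new_doc << "\nupsert del: " << del_doc << "\n";

    new_doc.clear(); del_doc.clear();
    doc_changes(UPDATE, update_doc, old_doc, new_doc, del_doc);
    // UPDATE: "title" stays in new_doc; only the overwritten "points" is reindexed
    std::cout << "update new: " << new_doc << "\nupdate del: " << del_doc << "\n";
    return 0;
}

This is also why the patched validate_index_in_memory() skips missing non-optional fields only for UPDATE: an UPSERT must carry every mandatory field, exactly as the new UpdateDocument test asserts for the missing "title".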