From 0a308b70c0513a92ec5fb8afa62913f964640985 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 19 Apr 2022 20:50:37 +0530 Subject: [PATCH] Alter for dynamic fields. --- include/collection.h | 11 +- src/collection.cpp | 146 +++++++++++++++++-------- src/index.cpp | 11 +- test/collection_all_fields_test.cpp | 16 +-- test/collection_schema_change_test.cpp | 138 ++++++++++++++++++++++- 5 files changed, 257 insertions(+), 65 deletions(-) diff --git a/include/collection.h b/include/collection.h index d230bd01..457c3495 100644 --- a/include/collection.h +++ b/include/collection.h @@ -89,7 +89,7 @@ private: std::string fallback_field_type; - std::vector dynamic_fields; + std::unordered_map dynamic_fields; std::vector symbols_to_index; @@ -129,7 +129,12 @@ private: std::vector& excluded_ids, std::vector& filter_overrides, bool& filter_curated_hits) const; - Option check_and_update_schema(nlohmann::json& document, const DIRTY_VALUES& dirty_values); + static Option detect_new_fields(nlohmann::json& document, + const DIRTY_VALUES& dirty_values, + const std::unordered_map& schema, + const std::unordered_map& dyn_fields, + const std::string& fallback_field_type, + std::vector& new_fields); static bool facet_count_compare(const std::pair& a, const std::pair& b) { @@ -232,7 +237,7 @@ public: std::vector get_fields(); - std::vector get_dynamic_fields(); + std::unordered_map get_dynamic_fields(); std::unordered_map get_schema(); diff --git a/src/collection.cpp b/src/collection.cpp index 0c2167cd..ebc89de2 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -249,9 +249,30 @@ nlohmann::json Collection::add_many(std::vector& json_lines, nlohma // if `fallback_field_type` or `dynamic_fields` is enabled, update schema first before indexing if(!fallback_field_type.empty() || !dynamic_fields.empty()) { - Option schema_change_op = check_and_update_schema(record.doc, dirty_values); - if(!schema_change_op.ok()) { - record.index_failure(schema_change_op.code(), schema_change_op.error()); + std::vector new_fields; + std::unique_lock lock(mutex); + + Option new_fields_op = detect_new_fields(record.doc, dirty_values, + search_schema, dynamic_fields, + fallback_field_type, + new_fields); + if(!new_fields_op.ok()) { + record.index_failure(new_fields_op.code(), new_fields_op.error()); + } + + else if(!new_fields.empty()) { + for(auto& new_field: new_fields) { + search_schema.emplace(new_field.name, new_field); + fields.emplace_back(new_field); + } + + auto persist_op = persist_collection_meta(); + if(!persist_op.ok()) { + record.index_failure(persist_op.code(), persist_op.error()); + continue; + } + + index->refresh_schemas(new_fields, {}); } } } @@ -2350,7 +2371,7 @@ std::vector Collection::get_fields() { return fields; } -std::vector Collection::get_dynamic_fields() { +std::unordered_map Collection::get_dynamic_fields() { std::shared_lock lock(mutex); return dynamic_fields; } @@ -2523,21 +2544,20 @@ Option Collection::batch_alter_data(const std::unordered_maprefresh_schemas(new_fields, {}); @@ -2610,6 +2630,10 @@ Option Collection::batch_alter_data(const std::unordered_map Collection::validate_alter_payload(nlohmann::json& schema_changes, } const std::string& field_name = kv.value()["name"].get(); - const auto& field_it = search_schema.find(field_name); - auto found_field = (field_it != search_schema.end()); if(kv.value().contains("drop")) { delete_field_names.insert(field_name); } } + std::unordered_map new_dynamic_fields; + for(const auto& kv: schema_changes["fields"].items()) { const std::string& field_name = kv.value()["name"].get(); const auto& field_it = search_schema.find(field_name); auto found_field = (field_it != search_schema.end()); + auto dyn_field_it = dynamic_fields.find(field_name); + auto found_dyn_field = (dyn_field_it != dynamic_fields.end()); + if(kv.value().contains("drop")) { + if(!kv.value()["drop"].is_boolean() || !kv.value()["drop"].get()) { + return Option(400, "Field `" + field_name + "` must have a drop value of `true`."); + } + if(field_name == ".*") { del_fields.emplace_back(".*", field_types::AUTO, false); continue; } - if(!found_field) { + if(!found_field && !found_dyn_field) { return Option(400, "Field `" + field_name + "` is not part of collection schema."); } - if(!kv.value()["drop"].is_boolean() || !kv.value()["drop"].get()) { - return Option(400, "Field `" + field_name + "` must have a drop value of `true`."); + if(found_field) { + del_fields.push_back(field_it->second); + } else if(found_dyn_field) { + del_fields.push_back(dyn_field_it->second); + // we will also have to resolve the actual field names which match the dynamic field pattern + for(auto& field_kv: search_schema) { + if(std::regex_match(field_kv.first, std::regex(dyn_field_it->first))) { + del_fields.push_back(field_kv.second); + // if schema contains explicit fields that match dynamic field that're going to be removed, + // we will have to remove them from the schema so that validation can occur properly + updated_search_schema.erase(field_kv.first); + } + } } - - del_fields.push_back(field_it->second); - } else { // add or update existing field - auto is_addition = (!found_field); + auto is_addition = (!found_field && !found_dyn_field); auto is_reindex = (delete_field_names.count(field_name) != 0); if(is_addition && is_reindex) { @@ -2763,11 +2802,19 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, } const auto& f = diff_fields.back(); - updated_search_schema[f.name] = f; + + if(f.is_dynamic()) { + new_dynamic_fields.emplace(f.name, f); + } else { + updated_search_schema[f.name] = f; + } if(is_reindex) { // delete + reindex is fine, we will handle these fields separately - schema_reindex.emplace(f.name, f); + //if(!f.is_dynamic()) { + // expanded versions of dynamic fields will be discovered during data iteration below + schema_reindex.emplace(f.name, f); + //} } else { schema_additions.emplace(f.name, f); } @@ -2804,6 +2851,22 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, nlohmann::detail::error_handler_t::ignore)); } + if(!fallback_field_type.empty() || !new_dynamic_fields.empty()) { + std::vector new_fields; + Option new_fields_op = detect_new_fields(document, DIRTY_VALUES::DROP, + updated_search_schema, new_dynamic_fields, + fallback_field_type, + new_fields); + if(!new_fields_op.ok()) { + return new_fields_op; + } + + for(auto& new_field: new_fields) { + updated_search_schema.emplace(new_field.name, new_field); + schema_reindex.emplace(new_field.name, new_field); + } + } + // validate existing data on disk for compatibility via updated_search_schema auto validate_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field, updated_search_schema, @@ -2860,15 +2923,16 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, return Option(true); } -Option Collection::check_and_update_schema(nlohmann::json& document, const DIRTY_VALUES& dirty_values) { - std::unique_lock lock(mutex); - - std::vector new_fields; - +Option Collection::detect_new_fields(nlohmann::json& document, + const DIRTY_VALUES& dirty_values, + const std::unordered_map& schema, + const std::unordered_map& dyn_fields, + const std::string& fallback_field_type, + std::vector& new_fields) { auto kv = document.begin(); while(kv != document.end()) { // we will not index the special "id" key - if (search_schema.count(kv.key()) == 0 && kv.key() != "id") { + if (schema.count(kv.key()) == 0 && kv.key() != "id") { const std::string &fname = kv.key(); field new_field(fname, field_types::STRING, false, true); std::string field_type; @@ -2877,9 +2941,9 @@ Option Collection::check_and_update_schema(nlohmann::json& document, const bool found_dynamic_field = false; // check against dynamic field definitions - for(const auto& dynamic_field: dynamic_fields) { - if(std::regex_match (kv.key(), std::regex(dynamic_field.name))) { - new_field = dynamic_field; + for(const auto& dynamic_field: dyn_fields) { + if(std::regex_match (kv.key(), std::regex(dynamic_field.first))) { + new_field = dynamic_field.second; new_field.name = fname; found_dynamic_field = true; break; @@ -2963,20 +3027,6 @@ Option Collection::check_and_update_schema(nlohmann::json& document, const kv++; } - for(auto& new_field: new_fields) { - search_schema.emplace(new_field.name, new_field); - fields.emplace_back(new_field); - } - - if(!new_fields.empty()) { - auto persist_op = persist_collection_meta(); - if(!persist_op.ok()) { - return persist_op; - } - - index->refresh_schemas(new_fields, {}); - } - return Option(true); } @@ -2984,7 +3034,7 @@ Index* Collection::init_index() { for(const field& field: fields) { if(field.is_dynamic()) { // regexp fields and fields with auto type are treated as dynamic fields - dynamic_fields.push_back(field); + dynamic_fields.emplace(field.name, field); continue; } diff --git a/src/index.cpp b/src/index.cpp index c57d247d..f9887b26 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -4509,12 +4509,12 @@ void Index::refresh_schemas(const std::vector& new_fields, const std::vec std::unique_lock lock(mutex); for(const auto & new_field: new_fields) { - search_schema.emplace(new_field.name, new_field); - - if(!new_field.index) { + if(new_field.is_dynamic() || !new_field.index) { continue; } + search_schema.emplace(new_field.name, new_field); + if(new_field.is_sortable()) { if(new_field.is_num_sortable()) { spp::sparse_hash_map * doc_to_score = new spp::sparse_hash_map(); @@ -4569,6 +4569,11 @@ void Index::refresh_schemas(const std::vector& new_fields, const std::vec } for(const auto & del_field: del_fields) { + if(search_schema.count(del_field.name) == 0) { + // could be a dynamic field + continue; + } + search_schema.erase(del_field.name); if(del_field.is_string() || field_types::is_string_or_array(del_field.type)) { diff --git a/test/collection_all_fields_test.cpp b/test/collection_all_fields_test.cpp index 0136b451..ccfb9669 100644 --- a/test/collection_all_fields_test.cpp +++ b/test/collection_all_fields_test.cpp @@ -961,7 +961,7 @@ TEST_F(CollectionAllFieldsTest, DynamicFieldsMustOnlyBeOptional) { coll1 = op.get(); } - ASSERT_TRUE(coll1->get_dynamic_fields()[0].optional); + ASSERT_TRUE(coll1->get_dynamic_fields()[".*_name"].optional); collectionManager.drop_collection("coll1"); } @@ -1044,13 +1044,13 @@ TEST_F(CollectionAllFieldsTest, BothFallbackAndDynamicFields) { ASSERT_EQ(4, coll1->get_fields().size()); ASSERT_EQ(2, coll1->get_dynamic_fields().size()); - ASSERT_EQ(".*_name", coll1->get_dynamic_fields()[0].name); - ASSERT_TRUE(coll1->get_dynamic_fields()[0].optional); - ASSERT_FALSE(coll1->get_dynamic_fields()[0].facet); + ASSERT_TRUE(coll1->get_dynamic_fields().count(".*_name") != 0); + ASSERT_TRUE(coll1->get_dynamic_fields()[".*_name"].optional); + ASSERT_FALSE(coll1->get_dynamic_fields()[".*_name"].facet); - ASSERT_EQ(".*_year", coll1->get_dynamic_fields()[1].name); - ASSERT_TRUE(coll1->get_dynamic_fields()[0].optional); - ASSERT_FALSE(coll1->get_dynamic_fields()[0].facet); + ASSERT_TRUE(coll1->get_dynamic_fields().count(".*_year") != 0); + ASSERT_TRUE(coll1->get_dynamic_fields()[".*_year"].optional); + ASSERT_TRUE(coll1->get_dynamic_fields()[".*_year"].facet); nlohmann::json doc; doc["title"] = "Amazon Inc."; @@ -1485,6 +1485,8 @@ TEST_F(CollectionAllFieldsTest, SchemaUpdateShouldBeAtomicForAllFields) { auto add_op = coll1->add(doc.dump(), CREATE); ASSERT_FALSE(add_op.ok()); + auto f = coll1->get_fields(); + ASSERT_EQ(1, coll1->get_fields().size()); ASSERT_EQ(0, coll1->get_sort_fields().size()); ASSERT_EQ(0, coll1->_get_index()->_get_search_index().size()); diff --git a/test/collection_schema_change_test.cpp b/test/collection_schema_change_test.cpp index f0924c86..bd7806c2 100644 --- a/test/collection_schema_change_test.cpp +++ b/test/collection_schema_change_test.cpp @@ -547,10 +547,12 @@ TEST_F(CollectionSchemaChangeTest, AddAndDropFieldImmediately) { doc["id"] = "0"; doc["title"] = "The quick brown fox was too fast."; doc["points"] = 100; - doc["quantity"] = 1000; + doc["quantity_int"] = 1000; + doc["some_txt"] = "foo"; ASSERT_TRUE(coll1->add(doc.dump()).ok()); ASSERT_EQ(2, coll1->get_schema().size()); + ASSERT_EQ(0, coll1->get_dynamic_fields().size()); auto results = coll1->search("*", {}, "", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get(); @@ -559,24 +561,152 @@ TEST_F(CollectionSchemaChangeTest, AddAndDropFieldImmediately) { ASSERT_EQ(1, results["hits"].size()); ASSERT_STREQ("0", results["hits"][0]["document"]["id"].get().c_str()); - // add a field via alter which we will try dropping immediately + // add a field via alter which we will try dropping later auto schema_changes = R"({ "fields": [ - {"name": "quantity", "type": "int32", "optional": true} + {"name": ".*_int", "type": "int32", "optional": true} ] })"_json; auto alter_op = coll1->alter(schema_changes); ASSERT_TRUE(alter_op.ok()); ASSERT_EQ(3, coll1->get_schema().size()); + ASSERT_EQ(4, coll1->get_fields().size()); + ASSERT_EQ(1, coll1->get_dynamic_fields().size()); + results = coll1->search("*", + {}, "quantity_int: 1000", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get(); + ASSERT_EQ(1, results["found"].get()); + + // drop + re-add dynamic field schema_changes = R"({ "fields": [ - {"name": "quantity", "drop": true} + {"name": ".*_int", "type": "int32", "facet": true}, + {"name": ".*_int", "drop": true} ] })"_json; alter_op = coll1->alter(schema_changes); ASSERT_TRUE(alter_op.ok()); + + ASSERT_EQ(3, coll1->get_schema().size()); + ASSERT_EQ(4, coll1->get_fields().size()); + ASSERT_EQ(1, coll1->get_dynamic_fields().size()); + + results = coll1->search("*", + {}, "", {"quantity_int"}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get(); + ASSERT_EQ(1, results["found"].get()); + ASSERT_EQ(1, results["facet_counts"].size()); + ASSERT_EQ(1, results["facet_counts"][0]["counts"][0]["count"].get()); + ASSERT_EQ("quantity_int", results["facet_counts"][0]["field_name"].get()); + + schema_changes = R"({ + "fields": [ + {"name": ".*_int", "drop": true} + ] + })"_json; + + alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); + ASSERT_EQ(2, coll1->get_schema().size()); + ASSERT_EQ(2, coll1->get_fields().size()); + ASSERT_EQ(0, coll1->get_dynamic_fields().size()); + + // with bad on-disk data + schema_changes = R"({ + "fields": [ + {"name": ".*_txt", "type": "int32"} + ] + })"_json; + + alter_op = coll1->alter(schema_changes); + ASSERT_FALSE(alter_op.ok()); + ASSERT_EQ("Schema change is incompatible with the type of documents already stored in this collection. " + "Existing data for field `some_txt` cannot be coerced into an int32.", alter_op.error()); + + ASSERT_EQ(2, coll1->get_schema().size()); + ASSERT_EQ(2, coll1->get_fields().size()); + ASSERT_EQ(0, coll1->get_dynamic_fields().size()); +} + +TEST_F(CollectionSchemaChangeTest, AddDynamicFieldMatchingMultipleFields) { + std::vector fields = {field("title", field_types::STRING, false, false, true, "", 1, 1), + field("points", field_types::INT32, true),}; + + Collection* coll1 = collectionManager.create_collection("coll1", 1, fields, "points", 0, "").get(); + + nlohmann::json doc; + doc["id"] = "0"; + doc["title"] = "The quick brown fox was too fast."; + doc["points"] = 100; + doc["quantity_int"] = 1000; + doc["year_int"] = 2020; + + ASSERT_TRUE(coll1->add(doc.dump()).ok()); + ASSERT_EQ(2, coll1->get_schema().size()); + ASSERT_EQ(0, coll1->get_dynamic_fields().size()); + + // add a dynamic field via alter that will target both _int fields + auto schema_changes = R"({ + "fields": [ + {"name": ".*_int", "type": "int32", "optional": true} + ] + })"_json; + + auto alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); + ASSERT_EQ(4, coll1->get_schema().size()); + ASSERT_EQ(5, coll1->get_fields().size()); + ASSERT_EQ(1, coll1->get_dynamic_fields().size()); + + auto results = coll1->search("*", + {}, "quantity_int: 1000", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get(); + ASSERT_EQ(1, results["found"].get()); + + results = coll1->search("*", + {}, "year_int: 2020", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get(); + ASSERT_EQ(1, results["found"].get()); + + // drop + re-add dynamic field that targets 2 underlying fields + schema_changes = R"({ + "fields": [ + {"name": ".*_int", "type": "int32", "facet": true}, + {"name": ".*_int", "drop": true} + ] + })"_json; + + alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); + + ASSERT_EQ(4, coll1->get_schema().size()); + ASSERT_EQ(5, coll1->get_fields().size()); + ASSERT_EQ(1, coll1->get_dynamic_fields().size()); + + results = coll1->search("*", + {}, "", {"quantity_int"}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get(); + ASSERT_EQ(1, results["found"].get()); + ASSERT_EQ(1, results["facet_counts"].size()); + ASSERT_EQ(1, results["facet_counts"][0]["counts"][0]["count"].get()); + ASSERT_EQ("quantity_int", results["facet_counts"][0]["field_name"].get()); + + results = coll1->search("*", + {}, "", {"year_int"}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get(); + ASSERT_EQ(1, results["found"].get()); + ASSERT_EQ(1, results["facet_counts"].size()); + ASSERT_EQ(1, results["facet_counts"][0]["counts"][0]["count"].get()); + ASSERT_EQ("year_int", results["facet_counts"][0]["field_name"].get()); + + schema_changes = R"({ + "fields": [ + {"name": ".*_int", "drop": true} + ] + })"_json; + + alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); + + ASSERT_EQ(2, coll1->get_schema().size()); + ASSERT_EQ(2, coll1->get_fields().size()); + ASSERT_EQ(0, coll1->get_dynamic_fields().size()); }