From 5b462f5233b8743d163f9b0fa38b7c9d640ba6e8 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Thu, 14 Apr 2022 12:23:38 +0530 Subject: [PATCH] Drop/readd within same schema change set. --- include/collection.h | 6 ++ include/field.h | 2 + src/collection.cpp | 129 ++++++++++++++++++------- src/field.cpp | 6 ++ test/collection_schema_change_test.cpp | 90 +++++++++++++++++ 5 files changed, 198 insertions(+), 35 deletions(-) diff --git a/include/collection.h b/include/collection.h index 441bbdeb..7cace8c9 100644 --- a/include/collection.h +++ b/include/collection.h @@ -158,8 +158,14 @@ private: Option persist_collection_meta(); + Option batch_alter_data(const std::unordered_map& schema_additions, + const std::vector& del_fields, + const std::string& this_fallback_field_type, + const bool do_validation); + Option validate_alter_payload(nlohmann::json& schema_changes, std::unordered_map& schema_additions, + std::unordered_map& schema_reindex, std::vector& del_fields, std::string& fallback_field_type); diff --git a/include/field.h b/include/field.h index d948c644..fcd1cea0 100644 --- a/include/field.h +++ b/include/field.h @@ -52,6 +52,8 @@ struct field { bool sort; bool infix; + field() {} + field(const std::string &name, const std::string &type, const bool facet, const bool optional = false, bool index = true, std::string locale = "", int sort = -1, int infix = -1) : name(name), type(type), facet(facet), optional(optional), index(index), locale(locale) { diff --git a/src/collection.cpp b/src/collection.cpp index c9e63491..4726d32e 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -2494,27 +2494,10 @@ Option Collection::persist_collection_meta() { return Option(true); } -Option Collection::alter(nlohmann::json& alter_payload) { - std::unique_lock lock(mutex); - - // Validate that all stored documents are compatible with the proposed schema changes. - std::unordered_map schema_additions; - std::vector del_fields; - std::string this_fallback_field_type; - - auto validate_op = validate_alter_payload(alter_payload, schema_additions, del_fields, this_fallback_field_type); - if(!validate_op.ok()) { - return validate_op; - } - - if(!this_fallback_field_type.empty() && !fallback_field_type.empty()) { - return Option(400, "The schema already contains a `.*` field."); - } - - if(!this_fallback_field_type.empty() && fallback_field_type.empty()) { - fallback_field_type = this_fallback_field_type; - } - +Option Collection::batch_alter_data(const std::unordered_map& schema_additions, + const std::vector& del_fields, + const std::string& this_fallback_field_type, + const bool do_validation) { // Update schema with additions (deletions can only be made later) std::vector new_fields; @@ -2563,7 +2546,7 @@ Option Collection::alter(nlohmann::json& alter_payload) { nlohmann::detail::error_handler_t::ignore)); } - index_record record(num_found_docs, seq_id, document, index_operation_t::CREATE, DIRTY_VALUES::DROP); + index_record record(num_found_docs, seq_id, document, index_operation_t::CREATE, DIRTY_VALUES::REJECT); iter_batch.emplace_back(std::move(record)); // Peek and check for last record right here so that we handle batched indexing correctly @@ -2572,15 +2555,16 @@ Option Collection::alter(nlohmann::json& alter_payload) { bool last_record = !(iter->Valid() && iter->key().starts_with(seq_id_prefix)); if(num_found_docs % index_batch_size == 0 || last_record) { - Index::batch_memory_index(index, iter_batch, default_sorting_field, schema_additions, fallback_field_type, - token_separators, symbols_to_index, false); - + // put delete first because a field could be deleted and added in the same change set if(!del_fields.empty()) { for(auto& rec: iter_batch) { index->remove(seq_id, rec.doc, del_fields, true); } } + Index::batch_memory_index(index, iter_batch, default_sorting_field, schema_additions, + fallback_field_type, token_separators, symbols_to_index, do_validation); + iter_batch.clear(); } @@ -2601,7 +2585,7 @@ Option Collection::alter(nlohmann::json& alter_payload) { for(auto& del_field: del_fields) { search_schema.erase(del_field.name); auto new_end = std::remove_if(fields.begin(), fields.end(), [&del_field](const field& f) { - return f.name == del_field.name; + return f.name == del_field.name; }); fields.erase(new_end, fields.end()); @@ -2625,8 +2609,56 @@ Option Collection::alter(nlohmann::json& alter_payload) { return Option(true); } +Option Collection::alter(nlohmann::json& alter_payload) { + std::unique_lock lock(mutex); + + // Validate that all stored documents are compatible with the proposed schema changes. + std::unordered_map schema_additions; + std::unordered_map schema_reindex; + std::vector del_fields; + std::string this_fallback_field_type; + + auto validate_op = validate_alter_payload(alter_payload, schema_additions, schema_reindex, + del_fields, this_fallback_field_type); + if(!validate_op.ok()) { + return validate_op; + } + + if(!this_fallback_field_type.empty() && !fallback_field_type.empty()) { + return Option(400, "The schema already contains a `.*` field."); + } + + if(!this_fallback_field_type.empty() && fallback_field_type.empty()) { + fallback_field_type = this_fallback_field_type; + } + + LOG(INFO) << "Alter payload validation is successful..."; + if(!schema_reindex.empty()) { + LOG(INFO) << "Processing field additions and deletions first..."; + } + + auto batch_alter_op = batch_alter_data(schema_additions, del_fields, fallback_field_type, false); + if(!batch_alter_op.ok()) { + return batch_alter_op; + } + + if(!schema_reindex.empty()) { + LOG(INFO) << "Processing field modifications now..."; + // we've to run revaliation because during schema change, some coercion might be needed + // e.g. "123" -> 123 (string to integer) + bool do_validation = true; + batch_alter_op = batch_alter_data(schema_reindex, {}, fallback_field_type, do_validation); + if(!batch_alter_op.ok()) { + return batch_alter_op; + } + } + + return Option(true); +} + Option Collection::validate_alter_payload(nlohmann::json& schema_changes, std::unordered_map& schema_additions, + std::unordered_map& schema_reindex, std::vector& del_fields, std::string& fallback_field_type) { if(!schema_changes.is_object()) { @@ -2649,12 +2681,16 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, std::unordered_map updated_search_schema = search_schema; size_t num_auto_detect_fields = 0; + // since fields can be deleted and added in the same change set, + // we will first do a pass at basic validations and pick out fields to be deleted + std::set delete_field_names; + for(const auto& kv: schema_changes["fields"].items()) { - if(!kv.value().is_object()) { + if (!kv.value().is_object()) { return Option(400, err_msg); } - if(!kv.value().contains("name")) { + if (!kv.value().contains("name")) { return Option(400, err_msg); } @@ -2662,6 +2698,16 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, const auto& field_it = search_schema.find(field_name); auto found_field = (field_it != search_schema.end()); + if(kv.value().contains("drop")) { + delete_field_names.insert(field_name); + } + } + + for(const auto& kv: schema_changes["fields"].items()) { + const std::string& field_name = kv.value()["name"].get(); + const auto& field_it = search_schema.find(field_name); + auto found_field = (field_it != search_schema.end()); + if(kv.value().contains("drop")) { if(field_name == ".*") { del_fields.emplace_back(".*", field_types::AUTO, false); @@ -2681,8 +2727,15 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, } else { // add or update existing field auto is_addition = (!found_field); - if(is_addition) { - // addition: must validate fields + auto is_reindex = (delete_field_names.count(field_name) != 0); + + if(is_addition && is_reindex) { + return Option(400, "Field `" + field_name + + "` cannot be added and deleted at the same time."); + } + + if(is_addition || is_reindex) { + // must validate fields auto parse_op = field::json_field_to_field(kv.value(), diff_fields, fallback_field_type, num_auto_detect_fields); if (!parse_op.ok()) { @@ -2690,12 +2743,18 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, } const auto& f = diff_fields.back(); - updated_search_schema.emplace(f.name, f); - schema_additions.emplace(f.name, f); + updated_search_schema[f.name] = f; + + if(is_reindex) { + // delete + reindex is fine, we will handle these fields separately + schema_reindex.emplace(f.name, f); + } else { + schema_additions.emplace(f.name, f); + } } else { - // update, not supported for now + // partial update is not supported for now return Option(400, "Field `" + field_name + "` is already part of schema: only " - "field additions and deletions are supported for now."); + "field additions and deletions are supported for now."); } } } @@ -2730,7 +2789,7 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, updated_search_schema, index_operation_t::CREATE, fallback_field_type, - DIRTY_VALUES::COERCE_OR_REJECT); + DIRTY_VALUES::REJECT); if(!validate_op.ok()) { std::string schema_err = "Schema change does not match on-disk data, error: " + validate_op.error(); return Option(validate_op.code(), schema_err); diff --git a/src/field.cpp b/src/field.cpp index 5d4730c2..8ae9a4ff 100644 --- a/src/field.cpp +++ b/src/field.cpp @@ -380,6 +380,12 @@ Option field::json_field_to_field(nlohmann::json& field_json, std::vector< "`name`, `type`, `optional` and `facet` properties."); } + if(field_json.count("drop") != 0) { + return Option(400, std::string("Invalid property `drop` on field `") + + field_json[fields::name].get() + std::string("`: it is allowed only " + "during schema update.")); + } + if(field_json.count(fields::facet) != 0 && !field_json.at(fields::facet).is_boolean()) { return Option(400, std::string("The `facet` property of the field `") + field_json[fields::name].get() + std::string("` should be a boolean.")); diff --git a/test/collection_schema_change_test.cpp b/test/collection_schema_change_test.cpp index c2eba296..fff388b6 100644 --- a/test/collection_schema_change_test.cpp +++ b/test/collection_schema_change_test.cpp @@ -422,3 +422,93 @@ TEST_F(CollectionSchemaChangeTest, AlterValidations) { collectionManager.drop_collection("coll1"); } + +TEST_F(CollectionSchemaChangeTest, DropPropertyShouldNotBeAllowedInSchemaCreation) { + nlohmann::json req_json = R"({ + "name": "coll1", + "fields": [{"name": "title", "type": "string", "drop": true}] + })"_json; + + auto coll1_op = collectionManager.create_collection(req_json); + ASSERT_FALSE(coll1_op.ok()); + ASSERT_EQ("Invalid property `drop` on field `title`: it is allowed only during schema update.", coll1_op.error()); + + collectionManager.drop_collection("coll1"); +} + +TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) { + nlohmann::json req_json = R"({ + "name": "coll1", + "fields": [ + {"name": "title", "type": "string"}, + {"name": "timestamp", "type": "int32"} + ] + })"_json; + + auto coll1_op = collectionManager.create_collection(req_json); + ASSERT_TRUE(coll1_op.ok()); + + auto coll1 = coll1_op.get(); + + nlohmann::json doc; + doc["id"] = "0"; + doc["title"] = "123"; + doc["timestamp"] = 3433232; + + ASSERT_TRUE(coll1->add(doc.dump()).ok()); + + // try to alter with a bad type + + auto schema_changes = R"({ + "fields": [ + {"name": "title", "drop": true}, + {"name": "title", "type": "int32"} + ] + })"_json; + + auto alter_op = coll1->alter(schema_changes); + ASSERT_FALSE(alter_op.ok()); + ASSERT_EQ("Schema change does not match on-disk data, error: Field `title` must be an int32.", alter_op.error()); + + // existing data should not have been touched + auto res = coll1->search("12", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {true}, 10).get(); + ASSERT_EQ(1, res["hits"].size()); + ASSERT_EQ("0", res["hits"][0]["document"]["id"].get()); + + // drop re-add with facet index + schema_changes = R"({ + "fields": [ + {"name": "title", "drop": true}, + {"name": "title", "type": "string", "facet": true} + ] + })"_json; + + alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); + + res = coll1->search("*", + {}, "", {"title"}, {}, {0}, 3, 1, FREQUENCY, {true}).get(); + + ASSERT_EQ(1, res["found"].get()); + ASSERT_EQ("0", res["hits"][0]["document"]["id"].get()); + ASSERT_EQ(1, res["facet_counts"].size()); + ASSERT_EQ(3, res["facet_counts"][0].size()); + ASSERT_EQ("title", res["facet_counts"][0]["field_name"]); + ASSERT_EQ(1, res["facet_counts"][0]["counts"].size()); + ASSERT_EQ("123", res["facet_counts"][0]["counts"][0]["value"].get()); + + // migrate int32 to int64 + schema_changes = R"({ + "fields": [ + {"name": "timestamp", "drop": true}, + {"name": "timestamp", "type": "int64"} + ] + })"_json; + + alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); + + ASSERT_EQ("int64", coll1->get_schema()["timestamp"].type); + + collectionManager.drop_collection("coll1"); +}