Refactor / simplify schema update logic.

This commit is contained in:
Kishore Nallan 2022-09-09 14:14:21 +05:30
parent 4d86106bc3
commit 0e39736327
3 changed files with 38 additions and 58 deletions

View File

@ -196,17 +196,14 @@ private:
Option<bool> persist_collection_meta();
Option<bool> batch_alter_data(const tsl::htrie_map<char, field>& schema_additions,
const std::unordered_map<std::string, field>& new_dynamic_fields,
Option<bool> batch_alter_data(const std::vector<field>& alter_fields,
const std::vector<field>& del_fields,
const std::string& this_fallback_field_type,
const bool do_validation);
Option<bool> validate_alter_payload(nlohmann::json& schema_changes,
tsl::htrie_map<char, field>& schema_additions,
tsl::htrie_map<char, field>& schema_reindex,
std::unordered_map<std::string, field>& addition_dynamic_fields,
std::unordered_map<std::string, field>& reindex_dynamic_fields,
std::vector<field>& addition_fields,
std::vector<field>& reindex_fields,
std::vector<field>& del_fields,
std::string& fallback_field_type);

View File

@ -3201,29 +3201,29 @@ Option<bool> Collection::persist_collection_meta() {
return Option<bool>(true);
}
Option<bool> Collection::batch_alter_data(const tsl::htrie_map<char, field>& schema_additions,
const std::unordered_map<std::string, field>& new_dynamic_fields,
Option<bool> Collection::batch_alter_data(const std::vector<field>& alter_fields,
const std::vector<field>& del_fields,
const std::string& this_fallback_field_type,
const bool do_validation) {
// Update schema with additions (deletions can only be made later)
std::vector<field> new_fields;
for(auto& f: schema_additions) {
tsl::htrie_map<char, field> schema_additions;
for(auto& f: alter_fields) {
if(f.name == ".*") {
fields.push_back(f);
continue;
}
search_schema.emplace(f.name, f);
new_fields.push_back(f);
fields.push_back(f);
}
if(f.is_dynamic()) {
dynamic_fields.emplace(f.name, f);
} else {
schema_additions.emplace(f.name, f);
search_schema.emplace(f.name, f);
new_fields.push_back(f);
}
for(auto& kv: new_dynamic_fields) {
// regexp fields and fields with auto type are treated as dynamic fields
const auto& f = kv.second;
dynamic_fields.emplace(f.name, f);
fields.push_back(f);
}
@ -3324,15 +3324,13 @@ Option<bool> Collection::alter(nlohmann::json& alter_payload) {
std::unique_lock lock(mutex);
// Validate that all stored documents are compatible with the proposed schema changes.
tsl::htrie_map<char, field> schema_additions;
tsl::htrie_map<char, field> schema_reindex;
std::unordered_map<std::string, field> addition_dynamic_fields;
std::unordered_map<std::string, field> reindex_dynamic_fields;
std::vector<field> del_fields;
std::vector<field> addition_fields;
std::vector<field> reindex_fields;
std::string this_fallback_field_type;
auto validate_op = validate_alter_payload(alter_payload, schema_additions, schema_reindex,
addition_dynamic_fields, reindex_dynamic_fields,
auto validate_op = validate_alter_payload(alter_payload, addition_fields, reindex_fields,
del_fields, this_fallback_field_type);
if(!validate_op.ok()) {
return validate_op;
@ -3347,21 +3345,21 @@ Option<bool> Collection::alter(nlohmann::json& alter_payload) {
}
LOG(INFO) << "Alter payload validation is successful...";
if(!schema_reindex.empty()) {
if(!reindex_fields.empty()) {
LOG(INFO) << "Processing field additions and deletions first...";
}
auto batch_alter_op = batch_alter_data(schema_additions, addition_dynamic_fields, del_fields, fallback_field_type, false);
auto batch_alter_op = batch_alter_data(addition_fields, del_fields, fallback_field_type, false);
if(!batch_alter_op.ok()) {
return batch_alter_op;
}
if(!schema_reindex.empty()) {
if(!reindex_fields.empty()) {
LOG(INFO) << "Processing field modifications now...";
// we have to re-run validation because, during a schema change, some coercion might be needed
// e.g. "123" -> 123 (string to integer)
bool do_validation = true;
batch_alter_op = batch_alter_data(schema_reindex, reindex_dynamic_fields, {}, fallback_field_type, do_validation);
batch_alter_op = batch_alter_data(reindex_fields, {}, fallback_field_type, do_validation);
if(!batch_alter_op.ok()) {
return batch_alter_op;
}
@ -3461,10 +3459,8 @@ void Collection::prune_doc(nlohmann::json& doc,
}
Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
tsl::htrie_map<char, field>& schema_additions,
tsl::htrie_map<char, field>& schema_reindex,
std::unordered_map<std::string, field>& addition_dynamic_fields,
std::unordered_map<std::string, field>& reindex_dynamic_fields,
std::vector<field>& addition_fields,
std::vector<field>& reindex_fields,
std::vector<field>& del_fields,
std::string& fallback_field_type) {
if(!schema_changes.is_object()) {
@ -3525,8 +3521,7 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
auto found_field = (field_it != search_schema.end());
auto dyn_field_it = dynamic_fields.find(field_name);
auto found_dyn_field = (dyn_field_it != dynamic_fields.end()) ||
(found_field && field_types::is_string_or_array(field_it.value().type));
auto found_dyn_field = (dyn_field_it != dynamic_fields.end());
if(kv.value().contains("drop")) {
if(!kv.value()["drop"].is_boolean() || !kv.value()["drop"].get<bool>()) {
@ -3545,16 +3540,10 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
if(found_field) {
del_fields.push_back(field_it.value());
updated_search_schema.erase(field_it.key());
// we will also have to resolve the dynamic field names which match the static field name
for(auto& field_kv: dynamic_fields) {
if(std::regex_match(field_kv.first, std::regex(field_it.key()))) {
del_fields.push_back(field_kv.second);
}
}
}
else if(found_dyn_field) {
// NOTE: fields with type "auto" or "string*" will exist in both `search_schema` and `dynamic_fields`
if(found_dyn_field) {
del_fields.push_back(dyn_field_it->second);
// we will also have to resolve the actual field names which match the dynamic field pattern
for(auto& a_field: search_schema) {
@ -3587,21 +3576,15 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
const auto& f = diff_fields.back();
if(f.is_dynamic()) {
new_dynamic_fields.emplace(f.name, f);
if(is_reindex) {
reindex_dynamic_fields.emplace(f.name, f);
} else {
addition_dynamic_fields.emplace(f.name, f);
}
new_dynamic_fields[f.name] = f;
} else {
updated_search_schema[f.name] = f;
if(is_reindex) {
// delete + reindex: we will handle these fields separately
schema_reindex.emplace(f.name, f);
} else {
schema_additions.emplace(f.name, f);
}
}
if(is_reindex) {
reindex_fields.push_back(f);
} else {
addition_fields.push_back(f);
}
} else {
// partial update is not supported for now
@ -3636,7 +3619,7 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
nlohmann::detail::error_handler_t::ignore));
}
if(!fallback_field_type.empty() || !addition_dynamic_fields.empty() || !reindex_dynamic_fields.empty()) {
if(!fallback_field_type.empty() || !new_dynamic_fields.empty()) {
std::vector<field> new_fields;
Option<bool> new_fields_op = detect_new_fields(document, DIRTY_VALUES::DROP,
updated_search_schema, new_dynamic_fields,
@ -3649,7 +3632,7 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
for(auto& new_field: new_fields) {
updated_search_schema[new_field.name] = new_field;
schema_reindex[new_field.name] = new_field;
reindex_fields.push_back(new_field);
}
}

View File

@ -124,8 +124,8 @@ TEST_F(CollectionSchemaChangeTest, AddNewFieldsToCollection) {
auto coll_fields = coll1->get_fields();
ASSERT_EQ(7, coll_fields.size());
ASSERT_EQ("age", coll_fields[5].name);
ASSERT_EQ(".*_bool", coll_fields[6].name);
ASSERT_EQ(".*_bool", coll_fields[5].name);
ASSERT_EQ("age", coll_fields[6].name);
doc["id"] = "1";
doc["title"] = "The one";