Drop/re-add within the same schema change set.

Kishore Nallan 2022-04-14 12:23:38 +05:30
parent d4e5c36b7f
commit 5b462f5233
5 changed files with 198 additions and 35 deletions
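
This change set teaches the schema alter flow to handle a field that is dropped and re-added (possibly with a different definition) in a single payload. As a minimal sketch of the payload shape, mirroring the tests added in this commit (the standalone main() wrapper and the commented-out `coll` pointer are illustrative additions, not part of the repository):

#include <iostream>
#include <nlohmann/json.hpp>

int main() {
    // drop `title` and re-add it as a faceted string in a single alter call
    nlohmann::json schema_changes = R"({
        "fields": [
            {"name": "title", "drop": true},
            {"name": "title", "type": "string", "facet": true}
        ]
    })"_json;

    // hypothetical call site; `coll` would be an existing Collection pointer:
    // auto alter_op = coll->alter(schema_changes);

    std::cout << schema_changes.dump(2) << std::endl;
    return 0;
}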

View File

@@ -158,8 +158,14 @@ private:
Option<bool> persist_collection_meta();
Option<bool> batch_alter_data(const std::unordered_map<std::string, field>& schema_additions,
const std::vector<field>& del_fields,
const std::string& this_fallback_field_type,
const bool do_validation);
Option<bool> validate_alter_payload(nlohmann::json& schema_changes,
std::unordered_map<std::string, field>& schema_additions,
std::unordered_map<std::string, field>& schema_reindex,
std::vector<field>& del_fields,
std::string& fallback_field_type);

View File

@@ -52,6 +52,8 @@ struct field {
bool sort;
bool infix;
field() {}
field(const std::string &name, const std::string &type, const bool facet, const bool optional = false,
bool index = true, std::string locale = "", int sort = -1, int infix = -1) :
name(name), type(type), facet(facet), optional(optional), index(index), locale(locale) {

View File

@@ -2494,27 +2494,10 @@ Option<bool> Collection::persist_collection_meta() {
return Option<bool>(true);
}
Option<bool> Collection::alter(nlohmann::json& alter_payload) {
std::unique_lock lock(mutex);
// Validate that all stored documents are compatible with the proposed schema changes.
std::unordered_map<std::string, field> schema_additions;
std::vector<field> del_fields;
std::string this_fallback_field_type;
auto validate_op = validate_alter_payload(alter_payload, schema_additions, del_fields, this_fallback_field_type);
if(!validate_op.ok()) {
return validate_op;
}
if(!this_fallback_field_type.empty() && !fallback_field_type.empty()) {
return Option<bool>(400, "The schema already contains a `.*` field.");
}
if(!this_fallback_field_type.empty() && fallback_field_type.empty()) {
fallback_field_type = this_fallback_field_type;
}
Option<bool> Collection::batch_alter_data(const std::unordered_map<std::string, field>& schema_additions,
const std::vector<field>& del_fields,
const std::string& this_fallback_field_type,
const bool do_validation) {
// Update schema with additions (deletions can only be made later)
std::vector<field> new_fields;
@@ -2563,7 +2546,7 @@ Option<bool> Collection::alter(nlohmann::json& alter_payload) {
nlohmann::detail::error_handler_t::ignore));
}
index_record record(num_found_docs, seq_id, document, index_operation_t::CREATE, DIRTY_VALUES::DROP);
index_record record(num_found_docs, seq_id, document, index_operation_t::CREATE, DIRTY_VALUES::REJECT);
iter_batch.emplace_back(std::move(record));
// Peek and check for last record right here so that we handle batched indexing correctly
@@ -2572,15 +2555,16 @@ Option<bool> Collection::alter(nlohmann::json& alter_payload) {
bool last_record = !(iter->Valid() && iter->key().starts_with(seq_id_prefix));
if(num_found_docs % index_batch_size == 0 || last_record) {
Index::batch_memory_index(index, iter_batch, default_sorting_field, schema_additions, fallback_field_type,
token_separators, symbols_to_index, false);
// put delete first because a field could be deleted and added in the same change set
if(!del_fields.empty()) {
for(auto& rec: iter_batch) {
index->remove(seq_id, rec.doc, del_fields, true);
}
}
Index::batch_memory_index(index, iter_batch, default_sorting_field, schema_additions,
fallback_field_type, token_separators, symbols_to_index, do_validation);
iter_batch.clear();
}
@@ -2601,7 +2585,7 @@ Option<bool> Collection::alter(nlohmann::json& alter_payload) {
for(auto& del_field: del_fields) {
search_schema.erase(del_field.name);
auto new_end = std::remove_if(fields.begin(), fields.end(), [&del_field](const field& f) {
return f.name == del_field.name;
});
fields.erase(new_end, fields.end());
@@ -2625,8 +2609,56 @@ Option<bool> Collection::alter(nlohmann::json& alter_payload) {
return Option<bool>(true);
}
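// alter() below now runs in two phases: plain additions and deletions are applied first via
// batch_alter_data(), and fields that were dropped and re-added are re-indexed in a second call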
Option<bool> Collection::alter(nlohmann::json& alter_payload) {
std::unique_lock lock(mutex);
// Validate that all stored documents are compatible with the proposed schema changes.
std::unordered_map<std::string, field> schema_additions;
std::unordered_map<std::string, field> schema_reindex;
std::vector<field> del_fields;
std::string this_fallback_field_type;
auto validate_op = validate_alter_payload(alter_payload, schema_additions, schema_reindex,
del_fields, this_fallback_field_type);
if(!validate_op.ok()) {
return validate_op;
}
if(!this_fallback_field_type.empty() && !fallback_field_type.empty()) {
return Option<bool>(400, "The schema already contains a `.*` field.");
}
if(!this_fallback_field_type.empty() && fallback_field_type.empty()) {
fallback_field_type = this_fallback_field_type;
}
LOG(INFO) << "Alter payload validation is successful...";
if(!schema_reindex.empty()) {
LOG(INFO) << "Processing field additions and deletions first...";
}
auto batch_alter_op = batch_alter_data(schema_additions, del_fields, fallback_field_type, false);
if(!batch_alter_op.ok()) {
return batch_alter_op;
}
if(!schema_reindex.empty()) {
LOG(INFO) << "Processing field modifications now...";
// we have to run revalidation because during a schema change, some coercion might be needed
// e.g. "123" -> 123 (string to integer)
bool do_validation = true;
batch_alter_op = batch_alter_data(schema_reindex, {}, fallback_field_type, do_validation);
if(!batch_alter_op.ok()) {
return batch_alter_op;
}
}
return Option<bool>(true);
}
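// Validates the requested schema changes against existing fields and stored documents, and splits
// them into plain additions, re-index (drop + re-add) fields and deletions.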
Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
std::unordered_map<std::string, field>& schema_additions,
std::unordered_map<std::string, field>& schema_reindex,
std::vector<field>& del_fields,
std::string& fallback_field_type) {
if(!schema_changes.is_object()) {
@@ -2649,12 +2681,16 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
std::unordered_map<std::string, field> updated_search_schema = search_schema;
size_t num_auto_detect_fields = 0;
// since fields can be deleted and added in the same change set,
// we will first do a pass at basic validations and pick out fields to be deleted
std::set<std::string> delete_field_names;
for(const auto& kv: schema_changes["fields"].items()) {
if(!kv.value().is_object()) {
if (!kv.value().is_object()) {
return Option<bool>(400, err_msg);
}
if(!kv.value().contains("name")) {
if (!kv.value().contains("name")) {
return Option<bool>(400, err_msg);
}
@@ -2662,6 +2698,16 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
const auto& field_it = search_schema.find(field_name);
auto found_field = (field_it != search_schema.end());
if(kv.value().contains("drop")) {
delete_field_names.insert(field_name);
}
}
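// second pass: validate each change and classify it as a deletion, an addition or a re-index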
for(const auto& kv: schema_changes["fields"].items()) {
const std::string& field_name = kv.value()["name"].get<std::string>();
const auto& field_it = search_schema.find(field_name);
auto found_field = (field_it != search_schema.end());
if(kv.value().contains("drop")) {
if(field_name == ".*") {
del_fields.emplace_back(".*", field_types::AUTO, false);
@@ -2681,8 +2727,15 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
} else {
// add or update existing field
auto is_addition = (!found_field);
if(is_addition) {
// addition: must validate fields
auto is_reindex = (delete_field_names.count(field_name) != 0);
if(is_addition && is_reindex) {
return Option<bool>(400, "Field `" + field_name +
"` cannot be added and deleted at the same time.");
}
if(is_addition || is_reindex) {
// must validate fields
auto parse_op = field::json_field_to_field(kv.value(), diff_fields, fallback_field_type,
num_auto_detect_fields);
if (!parse_op.ok()) {
@@ -2690,12 +2743,18 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
}
const auto& f = diff_fields.back();
updated_search_schema.emplace(f.name, f);
schema_additions.emplace(f.name, f);
updated_search_schema[f.name] = f;
if(is_reindex) {
// delete + reindex is fine, we will handle these fields separately
schema_reindex.emplace(f.name, f);
} else {
schema_additions.emplace(f.name, f);
}
} else {
// update, not supported for now
// partial update is not supported for now
return Option<bool>(400, "Field `" + field_name + "` is already part of schema: only "
"field additions and deletions are supported for now.");
"field additions and deletions are supported for now.");
}
}
}
@@ -2730,7 +2789,7 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
updated_search_schema,
index_operation_t::CREATE,
fallback_field_type,
DIRTY_VALUES::COERCE_OR_REJECT);
DIRTY_VALUES::REJECT);
if(!validate_op.ok()) {
std::string schema_err = "Schema change does not match on-disk data, error: " + validate_op.error();
return Option<bool>(validate_op.code(), schema_err);
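
To recap the validation flow added above: validate_alter_payload() makes two passes over the requested fields. The first pass only collects the names of dropped fields; the second pass then classifies every non-drop entry as a plain addition or, when the same name was also dropped, as a re-index field, rejecting the add + drop combination for a field that does not exist yet. A rough standalone sketch of that classification, assuming nlohmann::json and illustrative names such as classify_changes that are not part of the codebase:

#include <set>
#include <stdexcept>
#include <string>
#include <nlohmann/json.hpp>

// hypothetical helper mirroring the two-pass classification in validate_alter_payload()
void classify_changes(const nlohmann::json& schema_changes,
                      const std::set<std::string>& existing_fields,
                      std::set<std::string>& additions,
                      std::set<std::string>& reindex,
                      std::set<std::string>& deletions) {
    // pass 1: pick out the fields marked for deletion
    std::set<std::string> delete_field_names;
    for(const auto& kv: schema_changes["fields"].items()) {
        if(kv.value().contains("drop")) {
            delete_field_names.insert(kv.value()["name"].get<std::string>());
        }
    }

    // pass 2: classify the remaining entries
    for(const auto& kv: schema_changes["fields"].items()) {
        const std::string field_name = kv.value()["name"].get<std::string>();
        if(kv.value().contains("drop")) {
            deletions.insert(field_name);
            continue;
        }

        const bool found_field = (existing_fields.count(field_name) != 0);
        const bool is_reindex = (delete_field_names.count(field_name) != 0);

        if(!found_field && is_reindex) {
            // a brand-new field cannot be added and dropped in the same change set
            throw std::invalid_argument("Field `" + field_name +
                                        "` cannot be added and deleted at the same time.");
        }

        // dropped-and-re-added fields are re-indexed separately, after plain
        // additions and deletions have been applied
        if(is_reindex) {
            reindex.insert(field_name);
        } else {
            additions.insert(field_name);
        }
    }
}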

View File

@@ -380,6 +380,12 @@ Option<bool> field::json_field_to_field(nlohmann::json& field_json, std::vector<
"`name`, `type`, `optional` and `facet` properties.");
}
if(field_json.count("drop") != 0) {
return Option<bool>(400, std::string("Invalid property `drop` on field `") +
field_json[fields::name].get<std::string>() + std::string("`: it is allowed only "
"during schema update."));
}
if(field_json.count(fields::facet) != 0 && !field_json.at(fields::facet).is_boolean()) {
return Option<bool>(400, std::string("The `facet` property of the field `") +
field_json[fields::name].get<std::string>() + std::string("` should be a boolean."));

View File

@@ -422,3 +422,93 @@ TEST_F(CollectionSchemaChangeTest, AlterValidations) {
collectionManager.drop_collection("coll1");
}
TEST_F(CollectionSchemaChangeTest, DropPropertyShouldNotBeAllowedInSchemaCreation) {
nlohmann::json req_json = R"({
"name": "coll1",
"fields": [{"name": "title", "type": "string", "drop": true}]
})"_json;
auto coll1_op = collectionManager.create_collection(req_json);
ASSERT_FALSE(coll1_op.ok());
ASSERT_EQ("Invalid property `drop` on field `title`: it is allowed only during schema update.", coll1_op.error());
collectionManager.drop_collection("coll1");
}
TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) {
nlohmann::json req_json = R"({
"name": "coll1",
"fields": [
{"name": "title", "type": "string"},
{"name": "timestamp", "type": "int32"}
]
})"_json;
auto coll1_op = collectionManager.create_collection(req_json);
ASSERT_TRUE(coll1_op.ok());
auto coll1 = coll1_op.get();
nlohmann::json doc;
doc["id"] = "0";
doc["title"] = "123";
doc["timestamp"] = 3433232;
ASSERT_TRUE(coll1->add(doc.dump()).ok());
// try to alter with a bad type
auto schema_changes = R"({
"fields": [
{"name": "title", "drop": true},
{"name": "title", "type": "int32"}
]
})"_json;
auto alter_op = coll1->alter(schema_changes);
ASSERT_FALSE(alter_op.ok());
ASSERT_EQ("Schema change does not match on-disk data, error: Field `title` must be an int32.", alter_op.error());
// existing data should not have been touched
auto res = coll1->search("12", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {true}, 10).get();
ASSERT_EQ(1, res["hits"].size());
ASSERT_EQ("0", res["hits"][0]["document"]["id"].get<std::string>());
// drop re-add with facet index
schema_changes = R"({
"fields": [
{"name": "title", "drop": true},
{"name": "title", "type": "string", "facet": true}
]
})"_json;
alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
res = coll1->search("*",
{}, "", {"title"}, {}, {0}, 3, 1, FREQUENCY, {true}).get();
ASSERT_EQ(1, res["found"].get<size_t>());
ASSERT_EQ("0", res["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ(1, res["facet_counts"].size());
ASSERT_EQ(3, res["facet_counts"][0].size());
ASSERT_EQ("title", res["facet_counts"][0]["field_name"]);
ASSERT_EQ(1, res["facet_counts"][0]["counts"].size());
ASSERT_EQ("123", res["facet_counts"][0]["counts"][0]["value"].get<std::string>());
// migrate int32 to int64
schema_changes = R"({
"fields": [
{"name": "timestamp", "drop": true},
{"name": "timestamp", "type": "int64"}
]
})"_json;
alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
ASSERT_EQ("int64", coll1->get_schema()["timestamp"].type);
collectionManager.drop_collection("coll1");
}