Alter for dynamic fields.

This commit is contained in:
Kishore Nallan 2022-04-19 20:50:37 +05:30
parent b39dd3584f
commit 0a308b70c0
5 changed files with 257 additions and 65 deletions

View File

@ -89,7 +89,7 @@ private:
std::string fallback_field_type;
std::vector<field> dynamic_fields;
std::unordered_map<std::string, field> dynamic_fields;
std::vector<char> symbols_to_index;
@ -129,7 +129,12 @@ private:
std::vector<uint32_t>& excluded_ids, std::vector<const override_t*>& filter_overrides,
bool& filter_curated_hits) const;
Option<bool> check_and_update_schema(nlohmann::json& document, const DIRTY_VALUES& dirty_values);
static Option<bool> detect_new_fields(nlohmann::json& document,
const DIRTY_VALUES& dirty_values,
const std::unordered_map<std::string, field>& schema,
const std::unordered_map<std::string, field>& dyn_fields,
const std::string& fallback_field_type,
std::vector<field>& new_fields);
static bool facet_count_compare(const std::pair<uint64_t, facet_count_t>& a,
const std::pair<uint64_t, facet_count_t>& b) {
@ -232,7 +237,7 @@ public:
std::vector<field> get_fields();
std::vector<field> get_dynamic_fields();
std::unordered_map<std::string, field> get_dynamic_fields();
std::unordered_map<std::string, field> get_schema();

View File

@ -249,9 +249,30 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, nlohma
// if `fallback_field_type` or `dynamic_fields` is enabled, update schema first before indexing
if(!fallback_field_type.empty() || !dynamic_fields.empty()) {
Option<bool> schema_change_op = check_and_update_schema(record.doc, dirty_values);
if(!schema_change_op.ok()) {
record.index_failure(schema_change_op.code(), schema_change_op.error());
std::vector<field> new_fields;
std::unique_lock lock(mutex);
Option<bool> new_fields_op = detect_new_fields(record.doc, dirty_values,
search_schema, dynamic_fields,
fallback_field_type,
new_fields);
if(!new_fields_op.ok()) {
record.index_failure(new_fields_op.code(), new_fields_op.error());
}
else if(!new_fields.empty()) {
for(auto& new_field: new_fields) {
search_schema.emplace(new_field.name, new_field);
fields.emplace_back(new_field);
}
auto persist_op = persist_collection_meta();
if(!persist_op.ok()) {
record.index_failure(persist_op.code(), persist_op.error());
continue;
}
index->refresh_schemas(new_fields, {});
}
}
}
@ -2350,7 +2371,7 @@ std::vector<field> Collection::get_fields() {
return fields;
}
std::vector<field> Collection::get_dynamic_fields() {
std::unordered_map<std::string, field> Collection::get_dynamic_fields() {
std::shared_lock lock(mutex);
return dynamic_fields;
}
@ -2523,21 +2544,20 @@ Option<bool> Collection::batch_alter_data(const std::unordered_map<std::string,
for(auto& kv: schema_additions) {
const auto& f = kv.second;
if(f.is_dynamic()) {
// regexp fields and fields with auto type are treated as dynamic fields
dynamic_fields.push_back(f);
fields.push_back(f);
continue;
}
if(f.name == ".*") {
fields.push_back(f);
continue;
}
search_schema.emplace(kv.first, f);
if(f.is_dynamic()) {
// regexp fields and fields with auto type are treated as dynamic fields
dynamic_fields.emplace(f.name, f);
} else {
search_schema.emplace(kv.first, f);
new_fields.push_back(f);
}
fields.push_back(f);
new_fields.push_back(f);
}
index->refresh_schemas(new_fields, {});
@ -2610,6 +2630,10 @@ Option<bool> Collection::batch_alter_data(const std::unordered_map<std::string,
fields.erase(new_end, fields.end());
if(del_field.is_dynamic()) {
dynamic_fields.erase(del_field.name);
}
if(del_field.name == ".*") {
fallback_field_type = "";
}
@ -2715,38 +2739,53 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
}
const std::string& field_name = kv.value()["name"].get<std::string>();
const auto& field_it = search_schema.find(field_name);
auto found_field = (field_it != search_schema.end());
if(kv.value().contains("drop")) {
delete_field_names.insert(field_name);
}
}
std::unordered_map<std::string, field> new_dynamic_fields;
for(const auto& kv: schema_changes["fields"].items()) {
const std::string& field_name = kv.value()["name"].get<std::string>();
const auto& field_it = search_schema.find(field_name);
auto found_field = (field_it != search_schema.end());
auto dyn_field_it = dynamic_fields.find(field_name);
auto found_dyn_field = (dyn_field_it != dynamic_fields.end());
if(kv.value().contains("drop")) {
if(!kv.value()["drop"].is_boolean() || !kv.value()["drop"].get<bool>()) {
return Option<bool>(400, "Field `" + field_name + "` must have a drop value of `true`.");
}
if(field_name == ".*") {
del_fields.emplace_back(".*", field_types::AUTO, false);
continue;
}
if(!found_field) {
if(!found_field && !found_dyn_field) {
return Option<bool>(400, "Field `" + field_name + "` is not part of collection schema.");
}
if(!kv.value()["drop"].is_boolean() || !kv.value()["drop"].get<bool>()) {
return Option<bool>(400, "Field `" + field_name + "` must have a drop value of `true`.");
if(found_field) {
del_fields.push_back(field_it->second);
} else if(found_dyn_field) {
del_fields.push_back(dyn_field_it->second);
// we will also have to resolve the actual field names which match the dynamic field pattern
for(auto& field_kv: search_schema) {
if(std::regex_match(field_kv.first, std::regex(dyn_field_it->first))) {
del_fields.push_back(field_kv.second);
// if schema contains explicit fields that match dynamic field that're going to be removed,
// we will have to remove them from the schema so that validation can occur properly
updated_search_schema.erase(field_kv.first);
}
}
}
del_fields.push_back(field_it->second);
} else {
// add or update existing field
auto is_addition = (!found_field);
auto is_addition = (!found_field && !found_dyn_field);
auto is_reindex = (delete_field_names.count(field_name) != 0);
if(is_addition && is_reindex) {
@ -2763,11 +2802,19 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
}
const auto& f = diff_fields.back();
updated_search_schema[f.name] = f;
if(f.is_dynamic()) {
new_dynamic_fields.emplace(f.name, f);
} else {
updated_search_schema[f.name] = f;
}
if(is_reindex) {
// delete + reindex is fine, we will handle these fields separately
schema_reindex.emplace(f.name, f);
//if(!f.is_dynamic()) {
// expanded versions of dynamic fields will be discovered during data iteration below
schema_reindex.emplace(f.name, f);
//}
} else {
schema_additions.emplace(f.name, f);
}
@ -2804,6 +2851,22 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
nlohmann::detail::error_handler_t::ignore));
}
if(!fallback_field_type.empty() || !new_dynamic_fields.empty()) {
std::vector<field> new_fields;
Option<bool> new_fields_op = detect_new_fields(document, DIRTY_VALUES::DROP,
updated_search_schema, new_dynamic_fields,
fallback_field_type,
new_fields);
if(!new_fields_op.ok()) {
return new_fields_op;
}
for(auto& new_field: new_fields) {
updated_search_schema.emplace(new_field.name, new_field);
schema_reindex.emplace(new_field.name, new_field);
}
}
// validate existing data on disk for compatibility via updated_search_schema
auto validate_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
updated_search_schema,
@ -2860,15 +2923,16 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
return Option<bool>(true);
}
Option<bool> Collection::check_and_update_schema(nlohmann::json& document, const DIRTY_VALUES& dirty_values) {
std::unique_lock lock(mutex);
std::vector<field> new_fields;
Option<bool> Collection::detect_new_fields(nlohmann::json& document,
const DIRTY_VALUES& dirty_values,
const std::unordered_map<std::string, field>& schema,
const std::unordered_map<std::string, field>& dyn_fields,
const std::string& fallback_field_type,
std::vector<field>& new_fields) {
auto kv = document.begin();
while(kv != document.end()) {
// we will not index the special "id" key
if (search_schema.count(kv.key()) == 0 && kv.key() != "id") {
if (schema.count(kv.key()) == 0 && kv.key() != "id") {
const std::string &fname = kv.key();
field new_field(fname, field_types::STRING, false, true);
std::string field_type;
@ -2877,9 +2941,9 @@ Option<bool> Collection::check_and_update_schema(nlohmann::json& document, const
bool found_dynamic_field = false;
// check against dynamic field definitions
for(const auto& dynamic_field: dynamic_fields) {
if(std::regex_match (kv.key(), std::regex(dynamic_field.name))) {
new_field = dynamic_field;
for(const auto& dynamic_field: dyn_fields) {
if(std::regex_match (kv.key(), std::regex(dynamic_field.first))) {
new_field = dynamic_field.second;
new_field.name = fname;
found_dynamic_field = true;
break;
@ -2963,20 +3027,6 @@ Option<bool> Collection::check_and_update_schema(nlohmann::json& document, const
kv++;
}
for(auto& new_field: new_fields) {
search_schema.emplace(new_field.name, new_field);
fields.emplace_back(new_field);
}
if(!new_fields.empty()) {
auto persist_op = persist_collection_meta();
if(!persist_op.ok()) {
return persist_op;
}
index->refresh_schemas(new_fields, {});
}
return Option<bool>(true);
}
@ -2984,7 +3034,7 @@ Index* Collection::init_index() {
for(const field& field: fields) {
if(field.is_dynamic()) {
// regexp fields and fields with auto type are treated as dynamic fields
dynamic_fields.push_back(field);
dynamic_fields.emplace(field.name, field);
continue;
}

View File

@ -4509,12 +4509,12 @@ void Index::refresh_schemas(const std::vector<field>& new_fields, const std::vec
std::unique_lock lock(mutex);
for(const auto & new_field: new_fields) {
search_schema.emplace(new_field.name, new_field);
if(!new_field.index) {
if(new_field.is_dynamic() || !new_field.index) {
continue;
}
search_schema.emplace(new_field.name, new_field);
if(new_field.is_sortable()) {
if(new_field.is_num_sortable()) {
spp::sparse_hash_map<uint32_t, int64_t> * doc_to_score = new spp::sparse_hash_map<uint32_t, int64_t>();
@ -4569,6 +4569,11 @@ void Index::refresh_schemas(const std::vector<field>& new_fields, const std::vec
}
for(const auto & del_field: del_fields) {
if(search_schema.count(del_field.name) == 0) {
// could be a dynamic field
continue;
}
search_schema.erase(del_field.name);
if(del_field.is_string() || field_types::is_string_or_array(del_field.type)) {

View File

@ -961,7 +961,7 @@ TEST_F(CollectionAllFieldsTest, DynamicFieldsMustOnlyBeOptional) {
coll1 = op.get();
}
ASSERT_TRUE(coll1->get_dynamic_fields()[0].optional);
ASSERT_TRUE(coll1->get_dynamic_fields()[".*_name"].optional);
collectionManager.drop_collection("coll1");
}
@ -1044,13 +1044,13 @@ TEST_F(CollectionAllFieldsTest, BothFallbackAndDynamicFields) {
ASSERT_EQ(4, coll1->get_fields().size());
ASSERT_EQ(2, coll1->get_dynamic_fields().size());
ASSERT_EQ(".*_name", coll1->get_dynamic_fields()[0].name);
ASSERT_TRUE(coll1->get_dynamic_fields()[0].optional);
ASSERT_FALSE(coll1->get_dynamic_fields()[0].facet);
ASSERT_TRUE(coll1->get_dynamic_fields().count(".*_name") != 0);
ASSERT_TRUE(coll1->get_dynamic_fields()[".*_name"].optional);
ASSERT_FALSE(coll1->get_dynamic_fields()[".*_name"].facet);
ASSERT_EQ(".*_year", coll1->get_dynamic_fields()[1].name);
ASSERT_TRUE(coll1->get_dynamic_fields()[0].optional);
ASSERT_FALSE(coll1->get_dynamic_fields()[0].facet);
ASSERT_TRUE(coll1->get_dynamic_fields().count(".*_year") != 0);
ASSERT_TRUE(coll1->get_dynamic_fields()[".*_year"].optional);
ASSERT_TRUE(coll1->get_dynamic_fields()[".*_year"].facet);
nlohmann::json doc;
doc["title"] = "Amazon Inc.";
@ -1485,6 +1485,8 @@ TEST_F(CollectionAllFieldsTest, SchemaUpdateShouldBeAtomicForAllFields) {
auto add_op = coll1->add(doc.dump(), CREATE);
ASSERT_FALSE(add_op.ok());
auto f = coll1->get_fields();
ASSERT_EQ(1, coll1->get_fields().size());
ASSERT_EQ(0, coll1->get_sort_fields().size());
ASSERT_EQ(0, coll1->_get_index()->_get_search_index().size());

View File

@ -547,10 +547,12 @@ TEST_F(CollectionSchemaChangeTest, AddAndDropFieldImmediately) {
doc["id"] = "0";
doc["title"] = "The quick brown fox was too fast.";
doc["points"] = 100;
doc["quantity"] = 1000;
doc["quantity_int"] = 1000;
doc["some_txt"] = "foo";
ASSERT_TRUE(coll1->add(doc.dump()).ok());
ASSERT_EQ(2, coll1->get_schema().size());
ASSERT_EQ(0, coll1->get_dynamic_fields().size());
auto results = coll1->search("*",
{}, "", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get();
@ -559,24 +561,152 @@ TEST_F(CollectionSchemaChangeTest, AddAndDropFieldImmediately) {
ASSERT_EQ(1, results["hits"].size());
ASSERT_STREQ("0", results["hits"][0]["document"]["id"].get<std::string>().c_str());
// add a field via alter which we will try dropping immediately
// add a field via alter which we will try dropping later
auto schema_changes = R"({
"fields": [
{"name": "quantity", "type": "int32", "optional": true}
{"name": ".*_int", "type": "int32", "optional": true}
]
})"_json;
auto alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
ASSERT_EQ(3, coll1->get_schema().size());
ASSERT_EQ(4, coll1->get_fields().size());
ASSERT_EQ(1, coll1->get_dynamic_fields().size());
results = coll1->search("*",
{}, "quantity_int: 1000", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get();
ASSERT_EQ(1, results["found"].get<size_t>());
// drop + re-add dynamic field
schema_changes = R"({
"fields": [
{"name": "quantity", "drop": true}
{"name": ".*_int", "type": "int32", "facet": true},
{"name": ".*_int", "drop": true}
]
})"_json;
alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
ASSERT_EQ(3, coll1->get_schema().size());
ASSERT_EQ(4, coll1->get_fields().size());
ASSERT_EQ(1, coll1->get_dynamic_fields().size());
results = coll1->search("*",
{}, "", {"quantity_int"}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get();
ASSERT_EQ(1, results["found"].get<size_t>());
ASSERT_EQ(1, results["facet_counts"].size());
ASSERT_EQ(1, results["facet_counts"][0]["counts"][0]["count"].get<size_t>());
ASSERT_EQ("quantity_int", results["facet_counts"][0]["field_name"].get<std::string>());
schema_changes = R"({
"fields": [
{"name": ".*_int", "drop": true}
]
})"_json;
alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
ASSERT_EQ(2, coll1->get_schema().size());
ASSERT_EQ(2, coll1->get_fields().size());
ASSERT_EQ(0, coll1->get_dynamic_fields().size());
// with bad on-disk data
schema_changes = R"({
"fields": [
{"name": ".*_txt", "type": "int32"}
]
})"_json;
alter_op = coll1->alter(schema_changes);
ASSERT_FALSE(alter_op.ok());
ASSERT_EQ("Schema change is incompatible with the type of documents already stored in this collection. "
"Existing data for field `some_txt` cannot be coerced into an int32.", alter_op.error());
ASSERT_EQ(2, coll1->get_schema().size());
ASSERT_EQ(2, coll1->get_fields().size());
ASSERT_EQ(0, coll1->get_dynamic_fields().size());
}
TEST_F(CollectionSchemaChangeTest, AddDynamicFieldMatchingMultipleFields) {
std::vector<field> fields = {field("title", field_types::STRING, false, false, true, "", 1, 1),
field("points", field_types::INT32, true),};
Collection* coll1 = collectionManager.create_collection("coll1", 1, fields, "points", 0, "").get();
nlohmann::json doc;
doc["id"] = "0";
doc["title"] = "The quick brown fox was too fast.";
doc["points"] = 100;
doc["quantity_int"] = 1000;
doc["year_int"] = 2020;
ASSERT_TRUE(coll1->add(doc.dump()).ok());
ASSERT_EQ(2, coll1->get_schema().size());
ASSERT_EQ(0, coll1->get_dynamic_fields().size());
// add a dynamic field via alter that will target both _int fields
auto schema_changes = R"({
"fields": [
{"name": ".*_int", "type": "int32", "optional": true}
]
})"_json;
auto alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
ASSERT_EQ(4, coll1->get_schema().size());
ASSERT_EQ(5, coll1->get_fields().size());
ASSERT_EQ(1, coll1->get_dynamic_fields().size());
auto results = coll1->search("*",
{}, "quantity_int: 1000", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get();
ASSERT_EQ(1, results["found"].get<size_t>());
results = coll1->search("*",
{}, "year_int: 2020", {}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get();
ASSERT_EQ(1, results["found"].get<size_t>());
// drop + re-add dynamic field that targets 2 underlying fields
schema_changes = R"({
"fields": [
{"name": ".*_int", "type": "int32", "facet": true},
{"name": ".*_int", "drop": true}
]
})"_json;
alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
ASSERT_EQ(4, coll1->get_schema().size());
ASSERT_EQ(5, coll1->get_fields().size());
ASSERT_EQ(1, coll1->get_dynamic_fields().size());
results = coll1->search("*",
{}, "", {"quantity_int"}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get();
ASSERT_EQ(1, results["found"].get<size_t>());
ASSERT_EQ(1, results["facet_counts"].size());
ASSERT_EQ(1, results["facet_counts"][0]["counts"][0]["count"].get<size_t>());
ASSERT_EQ("quantity_int", results["facet_counts"][0]["field_name"].get<std::string>());
results = coll1->search("*",
{}, "", {"year_int"}, {}, {0}, 3, 1, FREQUENCY, {true}, 5).get();
ASSERT_EQ(1, results["found"].get<size_t>());
ASSERT_EQ(1, results["facet_counts"].size());
ASSERT_EQ(1, results["facet_counts"][0]["counts"][0]["count"].get<size_t>());
ASSERT_EQ("year_int", results["facet_counts"][0]["field_name"].get<std::string>());
schema_changes = R"({
"fields": [
{"name": ".*_int", "drop": true}
]
})"_json;
alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
ASSERT_EQ(2, coll1->get_schema().size());
ASSERT_EQ(2, coll1->get_fields().size());
ASSERT_EQ(0, coll1->get_dynamic_fields().size());
}