Add reference_fields map in Collection.

This commit is contained in:
Harpreet Sangar 2023-02-02 11:23:09 +05:30 committed by ozanarmagan
parent 8a45475239
commit 84fbcf37fd
6 changed files with 143 additions and 90 deletions

View File

@ -39,6 +39,13 @@ struct highlight_field_t {
}
};
// Target of a reference field: the collection being referenced and the field inside it.
struct reference_pair {
    std::string collection;  // name of the referenced collection
    std::string field;       // name of the field within the referenced collection

    // Takes both names by value and moves them into the members.
    reference_pair(std::string collection_name, std::string field_name)
        : collection(std::move(collection_name)),
          field(std::move(field_name)) {}
};
class Collection {
private:
@ -119,10 +126,14 @@ private:
std::vector<char> token_separators;
Index* index;
SynonymIndex* synonym_index;
// "field name" -> reference_pair
spp::sparse_hash_map<std::string, reference_pair> reference_fields;
// Keep index as the last field since it is initialized in the constructor via init_index(). Add a new field before it.
Index* index;
// methods
std::string get_doc_id_key(const std::string & doc_id) const;
@ -282,6 +293,8 @@ public:
static constexpr const char* COLLECTION_SYMBOLS_TO_INDEX = "symbols_to_index";
static constexpr const char* COLLECTION_SEPARATORS = "token_separators";
static constexpr const char* REFERENCE_HELPER_FIELD_SUFFIX = "_sequence_id";
// methods
Collection() = delete;
@ -488,6 +501,8 @@ public:
SynonymIndex* get_synonym_index();
spp::sparse_hash_map<std::string, reference_pair> get_reference_fields();
// highlight ops
static void highlight_text(const string& highlight_start_tag, const string& highlight_end_tag,

View File

@ -724,7 +724,7 @@ public:
void do_reference_filtering_with_lock(std::pair<uint32_t, uint32_t*>& reference_index_ids,
filter_node_t* filter_tree_root,
const std::string& reference_field_name) const;
const std::string& reference_helper_field_name) const;
void refresh_schemas(const std::vector<field>& new_fields, const std::vector<field>& del_fields);

View File

@ -100,6 +100,56 @@ Option<doc_seq_id_t> Collection::to_doc(const std::string & json_str, nlohmann::
// for UPSERT, EMPLACE or CREATE, if a document does not have an ID, we will treat it as a new doc
uint32_t seq_id = get_next_seq_id();
document["id"] = std::to_string(seq_id);
// Add reference helper fields in the document: for every reference field present in the document,
// look up the referenced document and store its sequence id under
// `<field name><REFERENCE_HELPER_FIELD_SUFFIX>`.
for (auto const& pair: reference_fields) {
    // Bind by reference to avoid copying the strings on every iteration.
    const auto& field_name = pair.first;
    const auto& reference_collection_name = pair.second.collection;
    const auto& reference_field_name = pair.second.field;

    if (document.count(field_name) != 1) {
        // An absent optional reference is skipped; an absent required one rejects the document.
        if (get_schema().at(field_name).optional) {
            continue;
        }
        return Option<doc_seq_id_t>(400, "Missing the required reference field `" + field_name
                                         + "` in the document.");
    }

    auto& cm = CollectionManager::get_instance();
    auto collection = cm.get_collection(reference_collection_name);
    if (collection == nullptr) {
        return Option<doc_seq_id_t>(400, "Referenced collection `" + reference_collection_name
                                         + "` not found.");
    }

    // Fetch the referenced schema once and use it for both checks below.
    const auto& reference_schema = collection->get_schema();
    if (reference_schema.count(reference_field_name) == 0) {
        return Option<doc_seq_id_t>(400, "Referenced field `" + reference_field_name +
                                         "` not found in the collection `" + reference_collection_name + "`.");
    }
    if (!reference_schema.at(reference_field_name).index) {
        return Option<doc_seq_id_t>(400, "Referenced field `" + reference_field_name +
                                         "` in the collection `" + reference_collection_name + "` must be indexed.");
    }

    // NOTE(review): get<std::string>() assumes the reference value is a JSON string -- confirm that
    // non-string values are rejected before this point.
    std::vector<std::pair<size_t, uint32_t*>> documents;
    auto value = document[field_name].get<std::string>();
    collection->get_filter_ids(reference_field_name + ":=" + value, documents);

    // Guard against get_filter_ids() returning without appending a result (presumably when the
    // filter query is rejected -- TODO confirm): indexing documents[0] on an empty vector would be
    // undefined behaviour. An empty result is reported as "not found".
    const size_t match_count = documents.empty() ? 0 : documents[0].first;
    uint32_t* match_ids = documents.empty() ? nullptr : documents[0].second;

    // Exactly one referenced document must match.
    if (match_count != 1) {
        delete [] match_ids;
        auto match = " `" + reference_field_name + ": " + value + "` ";
        return Option<doc_seq_id_t>(400, match_count < 1 ?
                                         "Referenced document having" + match + "not found in the collection `"
                                         + reference_collection_name + "`." :
                                         "Multiple documents having" + match + "found in the collection `" +
                                         reference_collection_name + "`.");
    }

    document[field_name + REFERENCE_HELPER_FIELD_SUFFIX] = match_ids[0];
    delete [] match_ids;
}
return Option<doc_seq_id_t>(doc_seq_id_t{seq_id, true});
} else {
if(!document["id"].is_string()) {
@ -2374,11 +2424,10 @@ Option<bool> Collection::get_reference_filter_ids(const std::string & filter_que
std::shared_lock lock(mutex);
std::string reference_field_name;
for (auto const& field: fields) {
if (!field.reference.empty() &&
field.reference.find(collection_name) == 0 &&
field.reference.find('.') == collection_name.size()) {
reference_field_name = field.name;
// Find this collection's field that references `collection_name`. The map key (this collection's
// own field name) is what the helper field `<name><REFERENCE_HELPER_FIELD_SUFFIX>` is derived
// from, so it must be used here rather than the field name inside the referenced collection.
for (auto const& pair: reference_fields) {
    if (pair.second.collection == collection_name) {
        reference_field_name = pair.first;
        break;
    }
}
@ -2396,7 +2445,8 @@ Option<bool> Collection::get_reference_filter_ids(const std::string & filter_que
return filter_op;
}
reference_field_name += "_sequence_id";
// Reference helper field has the sequence id of other collection's documents.
reference_field_name += REFERENCE_HELPER_FIELD_SUFFIX;
index->do_reference_filtering_with_lock(reference_index_ids, filter_tree_root, reference_field_name);
delete filter_tree_root;
@ -3406,6 +3456,10 @@ SynonymIndex* Collection::get_synonym_index() {
return synonym_index;
}
// Returns a copy of this collection's "field name" -> reference_pair map.
spp::sparse_hash_map<std::string, reference_pair> Collection::get_reference_fields() {
    return this->reference_fields;
}
Option<bool> Collection::persist_collection_meta() {
// first compact nested fields (to keep only parents of expanded children)
field::compact_nested_fields(nested_fields);
@ -4179,6 +4233,14 @@ Index* Collection::init_index() {
if(field.nested) {
nested_fields.emplace(field.name, field);
}
if(!field.reference.empty()) {
auto dot_index = field.reference.find('.');
auto collection_name = field.reference.substr(0, dot_index);
auto field_name = field.reference.substr(dot_index + 1);
reference_fields.emplace(field.name, reference_pair(collection_name, field_name));
}
}
field::compact_nested_fields(nested_fields);

View File

@ -727,6 +727,15 @@ Option<bool> field::json_field_to_field(bool enable_nested_fields, nlohmann::jso
auto vec_dist = magic_enum::enum_cast<vector_distance_type_t>(field_json[fields::vec_dist].get<std::string>()).value();
if (!field_json[fields::reference].get<std::string>().empty()) {
std::vector<std::string> tokens;
StringUtils::split(field_json[fields::reference].get<std::string>(), tokens, ".");
if (tokens.size() < 2) {
return Option<bool>(400, "Invalid reference `" + field_json[fields::reference].get<std::string>() + "`.");
}
}
the_fields.emplace_back(
field(field_json[fields::name], field_json[fields::type], field_json[fields::facet],
field_json[fields::optional], field_json[fields::index], field_json[fields::locale],
@ -737,8 +746,8 @@ Option<bool> field::json_field_to_field(bool enable_nested_fields, nlohmann::jso
if (!field_json[fields::reference].get<std::string>().empty()) {
the_fields.emplace_back(
field(field_json[fields::name].get<std::string>() + "_sequence_id", "int64", false,
field_json[fields::optional], true)
field(field_json[fields::name].get<std::string>() + Collection::REFERENCE_HELPER_FIELD_SUFFIX,
"int64", false, field_json[fields::optional], true)
);
}

View File

@ -432,50 +432,6 @@ Option<uint32_t> Index::validate_index_in_memory(nlohmann::json& document, uint3
continue;
}
if (!a_field.reference.empty()) {
// Add foo_sequence_id field in the document.
std::vector<std::string> tokens;
StringUtils::split(a_field.reference, tokens, ".");
if (tokens.size() < 2) {
return Option<>(400, "Invalid reference `" + a_field.reference + "`.");
}
auto& cm = CollectionManager::get_instance();
auto collection = cm.get_collection(tokens[0]);
if (collection == nullptr) {
return Option<>(400, "Referenced collection `" + tokens[0] + "` not found.");
}
if (collection->get_schema().count(tokens[1]) == 0) {
return Option<>(400, "Referenced field `" + tokens[1] + "` not found in the collection `"
+ tokens[0] + "`.");
}
auto referenced_field_name = tokens[1];
if (!collection->get_schema().at(referenced_field_name).index) {
return Option<>(400, "Referenced field `" + tokens[1] + "` in the collection `"
+ tokens[0] + "` must be indexed.");
}
std::vector<std::pair<size_t, uint32_t*>> documents;
auto value = document[a_field.name].get<std::string>();
collection->get_filter_ids(referenced_field_name + ":=" + value, documents);
if (documents[0].first != 1) {
delete [] documents[0].second;
auto match = " `" + referenced_field_name + "` = `" + value + "` ";
return Option<>(400, documents[0].first < 1 ?
"Referenced document having" + match + "not found in the collection `" + tokens[0] + "`." :
"Multiple documents having" + match + "found in the collection `" + tokens[0] + "`.");
}
document[a_field.name + "_sequence_id"] = *(documents[0].second);
delete [] documents[0].second;
}
if(document.count(field_name) == 0) {
return Option<>(400, "Field `" + field_name + "` has been declared in the schema, "
"but is not found in the document.");
@ -2186,7 +2142,7 @@ void Index::do_filtering_with_lock(uint32_t*& filter_ids,
void Index::do_reference_filtering_with_lock(std::pair<uint32_t, uint32_t*>& reference_index_ids,
filter_node_t* filter_tree_root,
const std::string& reference_field_name) const {
const std::string& reference_helper_field_name) const {
std::shared_lock lock(mutex);
adaptive_filter(reference_index_ids.second, reference_index_ids.first, filter_tree_root, false);
@ -2196,8 +2152,8 @@ void Index::do_reference_filtering_with_lock(std::pair<uint32_t, uint32_t*>& ref
for (uint32_t i = 0; i < reference_index_ids.first; i++) {
auto filtered_doc_id = reference_index_ids.second[i];
// Extract the sequence_id from the reference field.
vector.push_back(sort_index.at(reference_field_name)->at(filtered_doc_id));
// Extract the sequence id.
vector.push_back(sort_index.at(reference_helper_field_name)->at(filtered_doc_id));
}
std::sort(vector.begin(), vector.end());

View File

@ -78,20 +78,39 @@ TEST_F(CollectionJoinTest, SchemaReferenceField) {
R"({
"name": "Customers",
"fields": [
{"name": "product_id", "type": "string", "reference": "Products.product_id"},
{"name": "product_id", "type": "string", "reference": "foo"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"}
]
})"_json;
collection_create_op = collectionManager.create_collection(schema_json);
ASSERT_TRUE(collection_create_op.ok());
ASSERT_FALSE(collection_create_op.ok());
ASSERT_EQ("Invalid reference `foo`.", collection_create_op.error());
schema_json =
R"({
"name": "Customers",
"fields": [
{"name": "product_id", "type": "string", "reference": "Products.product_id"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"}
]
})"_json;
collection_create_op = collectionManager.create_collection(schema_json);
ASSERT_TRUE(collection_create_op.ok());
auto collection = collection_create_op.get();
auto schema = collection->get_schema();
ASSERT_EQ(schema.at("customer_name").reference, "");
ASSERT_EQ(schema.at("product_id").reference, "Products.product_id");
ASSERT_EQ(schema.count("customer_name"), 1);
ASSERT_TRUE(schema.at("customer_name").reference.empty());
ASSERT_EQ(schema.count("product_id"), 1);
ASSERT_FALSE(schema.at("product_id").reference.empty());
auto reference_fields = collection->get_reference_fields();
ASSERT_EQ(reference_fields.count("product_id"), 1);
ASSERT_EQ(reference_fields.at("product_id").collection, "Products");
ASSERT_EQ(reference_fields.at("product_id").field, "product_id");
// Add a `foo_sequence_id` field in the schema for `foo` reference field.
ASSERT_EQ(schema.count("product_id_sequence_id"), 1);
@ -108,11 +127,12 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
{"name": "customer_id", "type": "string"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"},
{"name": "product_id", "type": "string", "reference": "foo"}
{"name": "reference_id", "type": "string", "reference": "products.product_id"}
]
})"_json;
auto collection_create_op = collectionManager.create_collection(customers_schema_json);
ASSERT_TRUE(collection_create_op.ok());
auto customer_collection = collection_create_op.get();
nlohmann::json customer_json = R"({
"customer_id": "customer_a",
@ -120,27 +140,17 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
"product_price": 143,
"product_id": "a"
})"_json;
auto customer_collection = collection_create_op.get();
auto add_doc_op = customer_collection->add(customer_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Invalid reference `foo`.", add_doc_op.error());
collectionManager.drop_collection("Customers");
ASSERT_EQ("Missing the required reference field `reference_id` in the document.", add_doc_op.error());
customers_schema_json =
R"({
"name": "Customers",
"fields": [
{"name": "customer_id", "type": "string"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"},
{"name": "product_id", "type": "string", "reference": "products.product_id"}
]
})"_json;
collection_create_op = collectionManager.create_collection(customers_schema_json);
ASSERT_TRUE(collection_create_op.ok());
customer_collection = collection_create_op.get();
customer_json = R"({
"customer_id": "customer_a",
"customer_name": "Joe",
"product_price": 143,
"reference_id": "a"
})"_json;
add_doc_op = customer_collection->add(customer_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Referenced collection `products` not found.", add_doc_op.error());
@ -153,7 +163,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
{"name": "customer_id", "type": "string"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"},
{"name": "product_id", "type": "string", "reference": "Products.id"}
{"name": "reference_id", "type": "string", "reference": "Products.id"}
]
})"_json;
collection_create_op = collectionManager.create_collection(customers_schema_json);
@ -184,7 +194,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
{"name": "customer_id", "type": "string"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"},
{"name": "product_id", "type": "string", "reference": "Products.product_id"}
{"name": "reference_id", "type": "string", "reference": "Products.product_id"}
]
})"_json;
collection_create_op = collectionManager.create_collection(customers_schema_json);
@ -209,7 +219,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_TRUE(collection_create_op.ok());
add_doc_op = customer_collection->add(customer_json.dump());
ASSERT_EQ("Referenced document having `product_id` = `a` not found in the collection `Products`.", add_doc_op.error());
ASSERT_EQ("Referenced document having `product_id: a` not found in the collection `Products`.", add_doc_op.error());
std::vector<nlohmann::json> products = {
R"({
@ -231,9 +241,9 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_TRUE(add_op.ok());
}
customer_json["product_id"] = "product_a";
customer_json["reference_id"] = "product_a";
add_doc_op = customer_collection->add(customer_json.dump());
ASSERT_EQ("Multiple documents having `product_id` = `product_a` found in the collection `Products`.", add_doc_op.error());
ASSERT_EQ("Multiple documents having `product_id: product_a` found in the collection `Products`.", add_doc_op.error());
collectionManager.drop_collection("Products");
products[1]["product_id"] = "product_b";
@ -255,6 +265,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
}
ASSERT_TRUE(add_op.ok());
}
collectionManager.drop_collection("Customers");
customers_schema_json =
R"({
@ -263,7 +274,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
{"name": "customer_id", "type": "string"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"},
{"name": "product_id", "type": "string", "reference": "Products.product_id"}
{"name": "reference_id", "type": "string", "reference": "Products.product_id"}
]
})"_json;
collection_create_op = collectionManager.create_collection(customers_schema_json);
@ -272,12 +283,12 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
customer_collection = collection_create_op.get();
add_doc_op = customer_collection->add(customer_json.dump());
ASSERT_TRUE(add_doc_op.ok());
ASSERT_EQ(customer_collection->get("0").get().count("product_id_sequence_id"), 1);
ASSERT_EQ(customer_collection->get("0").get().count("reference_id_sequence_id"), 1);
nlohmann::json document;
// Referenced document's sequence_id must be valid.
auto get_op = collectionManager.get_collection("Products")->get_document_from_store(
customer_collection->get("0").get()["product_id_sequence_id"].get<uint32_t>(),
customer_collection->get("0").get()["reference_id_sequence_id"].get<uint32_t>(),
document);
ASSERT_TRUE(get_op.ok());
ASSERT_EQ(document.count("product_id"), 1);
@ -393,4 +404,4 @@ TEST_F(CollectionJoinTest, FilterByReferenceField) {
ASSERT_EQ(1, result["found"].get<size_t>());
ASSERT_EQ(1, result["hits"].size());
ASSERT_EQ("soap", result["hits"][0]["document"]["product_name"].get<std::string>());
}
}