From c8cd7e04729630c3a920b219c1cca9a50e6e7917 Mon Sep 17 00:00:00 2001 From: Harpreet Sangar Date: Fri, 30 Aug 2024 11:20:05 +0530 Subject: [PATCH] Async reference field. (#1835) * Add `async_reference` property to reference field. * Accept `"async_reference": true` only when `reference` is provided. * Avoid reference lookup during updation if ref helper field is present. * Clear `referenced_in_backlog` on dispose. * Add references asynchronously to singular reference field. Store only the reference field name in the indexes instead of reference helper field name. * Only update async references when a document is indexed successfully. * Remove unused params. * Refactor reference index. * Move `update_async_references` into `Index`. * Refactor `Collection::update_async_references_with_lock`. * Handle async update of reference array field. * Add test case. * Remove `update_async_references` method from `Collection`. * Add `Join` class. --- include/collection.h | 76 ++- include/collection_manager.h | 15 +- include/field.h | 17 +- include/index.h | 19 +- include/join.h | 46 ++ src/collection.cpp | 873 ++++++++----------------------- src/collection_manager.cpp | 62 ++- src/field.cpp | 16 +- src/filter_result_iterator.cpp | 4 +- src/index.cpp | 209 +++++++- src/join.cpp | 688 ++++++++++++++++++++++++ test/collection_join_test.cpp | 747 +++++++++++++++++++++++--- test/collection_manager_test.cpp | 63 ++- 13 files changed, 1992 insertions(+), 843 deletions(-) create mode 100644 include/join.h create mode 100644 src/join.cpp diff --git a/include/collection.h b/include/collection.h index 8ac8bc55..31e08f11 100644 --- a/include/collection.h +++ b/include/collection.h @@ -20,6 +20,7 @@ #include "tokenizer.h" #include "synonym_index.h" #include "vq_model_manager.h" +#include "join.h" struct doc_seq_id_t { uint32_t seq_id; @@ -39,17 +40,6 @@ struct highlight_field_t { } }; -struct reference_pair { - std::string collection; - std::string field; - - reference_pair(std::string collection, std::string field) : collection(std::move(collection)), field(std::move(field)) {} - - bool operator < (const reference_pair& pair) const { - return collection < pair.collection; - } -}; - class Collection { private: @@ -140,14 +130,17 @@ private: SynonymIndex* synonym_index; - /// "field name" -> reference_pair(referenced_collection_name, referenced_field_name) - spp::sparse_hash_map reference_fields; + /// "field name" -> reference_info(referenced_collection_name, referenced_field_name, is_async) + spp::sparse_hash_map reference_fields; /// Contains the info where the current collection is referenced. /// Useful to perform operations such as cascading delete. /// collection_name -> field_name spp::sparse_hash_map referenced_in; + /// "field name" -> List of pairs where this collection is referenced and is marked as `async`. + spp::sparse_hash_map> async_referenced_ins; + /// Reference helper fields that are part of an object. The reference doc of these fields will be included in the /// object rather than in the document. tsl::htrie_set object_reference_helper_fields; @@ -220,7 +213,7 @@ private: bool is_update, std::vector& new_fields, bool enable_nested_fields, - const spp::sparse_hash_map& reference_fields, + const spp::sparse_hash_map& reference_fields, tsl::htrie_set& object_reference_helper_fields); static bool facet_count_compare(const facet_count_t& a, const facet_count_t& b) { @@ -331,13 +324,6 @@ private: Option get_referenced_in_field(const std::string& collection_name) const; - Option get_related_ids(const std::string& ref_field_name, const uint32_t& seq_id, - std::vector& result) const; - - Option get_object_array_related_id(const std::string& ref_field_name, - const uint32_t& seq_id, const uint32_t& object_index, - uint32_t& result) const; - void remove_embedding_field(const std::string& field_name); Option parse_and_validate_vector_query(const std::string& vector_query_str, @@ -375,6 +361,9 @@ public: static constexpr const char* COLLECTION_METADATA = "metadata"; + /// Value used when async_reference is true and a reference doc is not found. + static constexpr int64_t reference_helper_sentinel_value = UINT32_MAX; + // methods Collection() = delete; @@ -386,7 +375,9 @@ public: const std::vector& symbols_to_index, const std::vector& token_separators, const bool enable_nested_fields, std::shared_ptr vq_model = nullptr, spp::sparse_hash_map referenced_in = spp::sparse_hash_map(), - const nlohmann::json& metadata = {}); + const nlohmann::json& metadata = {}, + spp::sparse_hash_map> async_referenced_ins = + spp::sparse_hash_map>()); ~Collection(); @@ -434,11 +425,6 @@ public: void update_metadata(const nlohmann::json& meta); - static Option add_reference_helper_fields(nlohmann::json& document, const tsl::htrie_map& schema, - const spp::sparse_hash_map& reference_fields, - tsl::htrie_set& object_reference_helper_fields, - const bool& is_update); - Option to_doc(const std::string& json_str, nlohmann::json& document, const index_operation_t& operation, const DIRTY_VALUES dirty_values, @@ -458,18 +444,6 @@ public: static void remove_reference_helper_fields(nlohmann::json& document); - static Option prune_ref_doc(nlohmann::json& doc, - const reference_filter_result_t& references, - const tsl::htrie_set& ref_include_fields_full, - const tsl::htrie_set& ref_exclude_fields_full, - const bool& is_reference_array, - const ref_include_exclude_fields& ref_include_exclude); - - static Option include_references(nlohmann::json& doc, const uint32_t& seq_id, Collection *const collection, - const std::map& reference_filter_results, - const std::vector& ref_include_exclude_fields_vec, - const nlohmann::json& original_doc); - Option prune_doc_with_lock(nlohmann::json& doc, const tsl::htrie_set& include_names, const tsl::htrie_set& exclude_names, const std::map& reference_filter_results = {}, @@ -617,7 +591,7 @@ public: Option get(const std::string & id) const; - void cascade_remove_docs(const std::string& ref_helper_field_name, const uint32_t& ref_seq_id, + void cascade_remove_docs(const std::string& field_name, const uint32_t& ref_seq_id, const nlohmann::json& ref_doc, bool remove_from_store = true); Option remove(const std::string & id, bool remove_from_store = true); @@ -666,7 +640,9 @@ public: SynonymIndex* get_synonym_index(); - spp::sparse_hash_map get_reference_fields(); + spp::sparse_hash_map get_reference_fields(); + + spp::sparse_hash_map> get_async_referenced_ins(); // highlight ops @@ -710,17 +686,20 @@ public: bool is_referenced_in(const std::string& collection_name) const; - void add_referenced_in(const reference_pair& pair); + void add_referenced_ins(const std::set& ref_infos); - void add_referenced_ins(const std::set& pairs); - - void add_referenced_in(const std::string& collection_name, const std::string& field_name); + void add_referenced_in(const std::string& collection_name, const std::string& field_name, + const bool& is_async, const std::string& referenced_field_name); Option get_referenced_in_field_with_lock(const std::string& collection_name) const; Option get_related_ids_with_lock(const std::string& field_name, const uint32_t& seq_id, std::vector& result) const; + Option update_async_references_with_lock(const std::string& ref_coll_name, const std::string& filter, + const std::set& filter_values, + const uint32_t ref_seq_id, const std::string& field_name); + Option get_sort_index_value_with_lock(const std::string& field_name, const uint32_t& seq_id) const; static void hide_credential(nlohmann::json& json, const std::string& credential_name); @@ -732,6 +711,13 @@ public: void expand_search_query(const std::string& raw_query, size_t offset, size_t total, const search_args* search_params, const std::vector>& result_group_kvs, const std::vector& raw_search_fields, std::string& first_q) const; + + Option get_object_array_related_id(const std::string& ref_field_name, + const uint32_t& seq_id, const uint32_t& object_index, + uint32_t& result) const; + + Option get_related_ids(const std::string& ref_field_name, const uint32_t& seq_id, + std::vector& result) const; }; template diff --git a/include/collection_manager.h b/include/collection_manager.h index 4d988dbf..a5d3d547 100644 --- a/include/collection_manager.h +++ b/include/collection_manager.h @@ -79,7 +79,7 @@ private: std::atomic* quit; // All the references to a particular collection are stored until it is created. - std::map> referenced_in_backlog; + std::map> referenced_in_backlog; CollectionManager(); @@ -125,13 +125,15 @@ public: const uint32_t collection_next_seq_id, Store* store, float max_memory_ratio, - spp::sparse_hash_map& referenced_in); + spp::sparse_hash_map& referenced_in, + spp::sparse_hash_map>& async_referenced_ins); static Option load_collection(const nlohmann::json& collection_meta, const size_t batch_size, const StoreStatus& next_coll_id_status, const std::atomic& quit, - spp::sparse_hash_map& referenced_in); + spp::sparse_hash_map& referenced_in, + spp::sparse_hash_map>& async_referenced_ins); Option clone_collection(const std::string& existing_name, const nlohmann::json& req_json); @@ -234,14 +236,15 @@ public: std::vector& exclude_fields_vec, std::vector& ref_include_exclude_fields_vec); - void add_referenced_in_backlog(const std::string& collection_name, reference_pair&& pair); + void add_referenced_in_backlog(const std::string& collection_name, reference_info_t&& ref_info); - std::map> _get_referenced_in_backlog() const; + std::map> _get_referenced_in_backlog() const; void process_embedding_field_delete(const std::string& model_name); static void _populate_referenced_ins(const std::string& collection_meta_json, - std::map>& referenced_ins); + std::map>& referenced_ins, + std::map>>& async_referenced_ins); std::unordered_set get_collection_references(const std::string& coll_name); diff --git a/include/field.h b/include/field.h index b98a0309..5e58b216 100644 --- a/include/field.h +++ b/include/field.h @@ -60,6 +60,7 @@ namespace fields { static const std::string num_dim = "num_dim"; static const std::string vec_dist = "vec_dist"; static const std::string reference = "reference"; + static const std::string async_reference = "async_reference"; static const std::string embed = "embed"; static const std::string from = "from"; static const std::string model_name = "model_name"; @@ -86,6 +87,14 @@ enum vector_distance_type_t { cosine }; +struct reference_pair_t { + std::string collection; + std::string field; + + reference_pair_t(std::string collection, std::string field) : collection(std::move(collection)), + field(std::move(field)) {} +}; + struct field { std::string name; std::string type; @@ -112,6 +121,7 @@ struct field { static constexpr int VAL_UNKNOWN = 2; std::string reference; // Foo.bar (reference to bar field in Foo collection). + bool is_async_reference = false; bool range_index; @@ -127,10 +137,13 @@ struct field { field(const std::string &name, const std::string &type, const bool facet, const bool optional = false, bool index = true, std::string locale = "", int sort = -1, int infix = -1, bool nested = false, int nested_array = 0, size_t num_dim = 0, vector_distance_type_t vec_dist = cosine, - std::string reference = "", const nlohmann::json& embed = nlohmann::json(), const bool range_index = false, const bool store = true, const bool stem = false, const nlohmann::json hnsw_params = nlohmann::json()) : + std::string reference = "", const nlohmann::json& embed = nlohmann::json(), const bool range_index = false, + const bool store = true, const bool stem = false, const nlohmann::json hnsw_params = nlohmann::json(), + const bool async_reference = false) : name(name), type(type), facet(facet), optional(optional), index(index), locale(locale), nested(nested), nested_array(nested_array), num_dim(num_dim), vec_dist(vec_dist), reference(reference), - embed(embed), range_index(range_index), store(store), stem(stem), hnsw_params(hnsw_params) { + embed(embed), range_index(range_index), store(store), stem(stem), hnsw_params(hnsw_params), + is_async_reference(async_reference) { set_computed_defaults(sort, infix); diff --git a/include/index.h b/include/index.h index c18a0127..522e0e7d 100644 --- a/include/index.h +++ b/include/index.h @@ -569,6 +569,10 @@ private: const index_record* record, const std::vector& embedding_results, size_t& count, const field& the_field); + + void update_async_references(const std::string& collection_name, const field& afield, + std::vector& iter_batch, + const std::vector& async_referenced_ins = {}); public: // for limiting number of results on multiple candidates / query rewrites enum {TYPO_TOKENS_THRESHOLD = 1}; @@ -740,9 +744,14 @@ public: const size_t remote_embedding_timeout_ms = 60000, const size_t remote_embedding_num_tries = 2, const bool generate_embeddings = true, const bool use_addition_fields = false, - const tsl::htrie_map& addition_fields = tsl::htrie_map()); + const tsl::htrie_map& addition_fields = tsl::htrie_map(), + const std::string& collection_name = "", + const spp::sparse_hash_map>& async_referenced_ins = + spp::sparse_hash_map>()); - void index_field_in_memory(const field& afield, std::vector& iter_batch); + void index_field_in_memory(const std::string& collection_name, const field& afield, + std::vector& iter_batch, + const std::vector& async_referenced_ins = {}); template void iterate_and_index_numerical_field(std::vector& iter_batch, const field& afield, T func); @@ -762,9 +771,9 @@ public: Option do_reference_filtering_with_lock(filter_node_t* const filter_tree_root, filter_result_t& filter_result, const std::string& ref_collection_name, - const std::string& reference_helper_field_name) const; + const std::string& field_name) const; - Option do_filtering_with_reference_ids(const std::string& reference_helper_field_name, + Option do_filtering_with_reference_ids(const std::string& field_name, const std::string& ref_collection_name, filter_result_t&& ref_filter_result) const; @@ -1037,7 +1046,7 @@ public: std::vector& outside_seq_ids); Option get_related_ids(const std::string& collection_name, - const std::string& reference_helper_field_name, + const std::string& field_name, const uint32_t& seq_id, std::vector& result) const; Option get_object_array_related_id(const std::string& collection_name, diff --git a/include/join.h b/include/join.h new file mode 100644 index 00000000..5805a282 --- /dev/null +++ b/include/join.h @@ -0,0 +1,46 @@ +#pragma once + +#include "option.h" +#include "json.hpp" +#include "tsl/htrie_map.h" +#include "field.h" +#include "tsl/htrie_set.h" +#include "filter_result_iterator.h" + +struct reference_info_t { + std::string collection; + std::string field; + bool is_async; + + std::string referenced_field_name; + + reference_info_t(std::string collection, std::string field, bool is_async, std::string referenced_field_name = "") : + collection(std::move(collection)), field(std::move(field)), is_async(is_async), + referenced_field_name(std::move(referenced_field_name)) {} + + bool operator < (const reference_info_t& pair) const { + return collection < pair.collection; + } +}; + +class Join { +public: + + static Option add_reference_helper_fields(nlohmann::json& document, + const tsl::htrie_map& schema, + const spp::sparse_hash_map& reference_fields, + tsl::htrie_set& object_reference_helper_fields, + const bool& is_update); + + static Option prune_ref_doc(nlohmann::json& doc, + const reference_filter_result_t& references, + const tsl::htrie_set& ref_include_fields_full, + const tsl::htrie_set& ref_exclude_fields_full, + const bool& is_reference_array, + const ref_include_exclude_fields& ref_include_exclude); + + static Option include_references(nlohmann::json& doc, const uint32_t& seq_id, Collection *const collection, + const std::map& reference_filter_results, + const std::vector& ref_include_exclude_fields_vec, + const nlohmann::json& original_doc); +}; \ No newline at end of file diff --git a/src/collection.cpp b/src/collection.cpp index 570a9fb8..6a657d93 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -24,6 +24,7 @@ #include "conversation_manager.h" #include "conversation_model_manager.h" #include "field.h" +#include "join.h" const std::string override_t::MATCH_EXACT = "exact"; const std::string override_t::MATCH_CONTAINS = "contains"; @@ -54,7 +55,8 @@ Collection::Collection(const std::string& name, const uint32_t collection_id, co const std::vector& token_separators, const bool enable_nested_fields, std::shared_ptr vq_model, spp::sparse_hash_map referenced_in, - const nlohmann::json& metadata) : + const nlohmann::json& metadata, + spp::sparse_hash_map> async_referenced_ins) : name(name), collection_id(collection_id), created_at(created_at), next_seq_id(next_seq_id), store(store), fields(fields), default_sorting_field(default_sorting_field), enable_nested_fields(enable_nested_fields), @@ -63,7 +65,7 @@ Collection::Collection(const std::string& name, const uint32_t collection_id, co symbols_to_index(to_char_array(symbols_to_index)), token_separators(to_char_array(token_separators)), index(init_index()), vq_model(vq_model), referenced_in(std::move(referenced_in)), - metadata(metadata) { + metadata(metadata), async_referenced_ins(std::move(async_referenced_ins)) { if (vq_model) { vq_model->inc_collection_ref_count(); @@ -92,317 +94,140 @@ uint32_t Collection::get_next_seq_id() { return next_seq_id++; } -Option single_value_filter_query(nlohmann::json& document, const std::string& field_name, - const std::string& ref_field_type, std::string& filter_query) { - auto const& value = document[field_name]; - - if (value.is_null()) { - return Option(422, "Field has `null` value."); - } - - if (value.is_string() && ref_field_type == field_types::STRING) { - filter_query[filter_query.size() - 1] = '='; - filter_query += (" " + value.get()); - } else if (value.is_number_integer() && (ref_field_type == field_types::INT64 || - (ref_field_type == field_types::INT32 && - StringUtils::is_int32_t(std::to_string(value.get()))))) { - filter_query += std::to_string(value.get()); - } else { - return Option(400, "Field `" + field_name + "` must have `" + ref_field_type + "` value."); - } - - return Option(true); +inline std::string get_field_value(const nlohmann::json& doc, const std::string& field_name) { + return doc[field_name].is_number_integer() ? + std::to_string(doc[field_name].get()) : + doc[field_name].is_string() ? + doc[field_name].get() : + doc[field_name].dump(); } -Option Collection::add_reference_helper_fields(nlohmann::json& document, const tsl::htrie_map& schema, - const spp::sparse_hash_map& reference_fields, - tsl::htrie_set& object_reference_helper_fields, - const bool& is_update) { - tsl::htrie_set flat_fields; - if (!reference_fields.empty() && document.contains(".flat")) { - for (const auto &item: document[".flat"].get>()) { - flat_fields.insert(item); +inline std::string get_array_field_value(const nlohmann::json& doc, const std::string& field_name, const size_t& index) { + return doc[field_name][index].is_number_integer() ? + std::to_string(doc[field_name][index].get()) : + doc[field_name][index].is_string() ? + doc[field_name][index].get() : + doc[field_name][index].dump(); +} + +Option Collection::update_async_references_with_lock(const std::string& ref_coll_name, const std::string& filter, + const std::set& filter_values, + const uint32_t ref_seq_id, const std::string& field_name) { + // Update reference helper field of the docs matching the filter. + filter_result_t filter_result; + get_filter_ids(filter, filter_result); + + if (filter_result.count == 0) { + return Option(true); + } + + field field; + { + std::shared_lock lock(mutex); + + auto it = search_schema.find(field_name); + if (it == search_schema.end()) { + return Option(400, "Could not find field `" + field_name + "` in the schema."); + } + field = it.value(); + } + + std::vector buffer; + buffer.reserve(filter_result.count); + + for (uint32_t i = 0; i < filter_result.count; i++) { + auto const& seq_id = filter_result.docs[i]; + + nlohmann::json existing_document; + auto get_doc_op = get_document_from_store(get_seq_id_key(seq_id), existing_document); + if (!get_doc_op.ok()) { + if (get_doc_op.code() == 404) { + LOG(ERROR) << "`" << name << "` collection: Sequence ID `" << seq_id << "` exists, but document is missing."; + continue; + } + + LOG(ERROR) << "`" << name << "` collection: " << get_doc_op.error(); + continue; + } + auto const id = existing_document["id"].get(); + auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; + + if (field.is_singular()) { + // Referenced value must be unique. + if (existing_document.contains(reference_helper_field_name) && + existing_document[reference_helper_field_name].is_number_integer()) { + const int64_t existing_ref_seq_id = existing_document[reference_helper_field_name].get(); + if (existing_ref_seq_id != Collection::reference_helper_sentinel_value && + existing_ref_seq_id != ref_seq_id) { + return Option(400, "Document `id: " + id + "` already has a reference to document `" += + std::to_string(existing_ref_seq_id) + "` of `" += ref_coll_name + + "` collection, having reference value `" += + get_field_value(existing_document, field_name) + "`."); + } else if (existing_ref_seq_id == ref_seq_id) { + continue; + } + } + + // Set reference helper field of all the docs that matched filter to `ref_seq_id`. + nlohmann::json update_document; + update_document["id"] = id; + update_document[field_name] = existing_document[field_name]; + update_document[reference_helper_field_name] = ref_seq_id; + + buffer.push_back(update_document.dump()); + } else { + if (!existing_document.contains(field_name) || !existing_document[field_name].is_array()) { + return Option(400, "Expected document `id: " + id + "` to have `" += field_name + "` array field " + "that is `" += get_field_value(existing_document, field_name) + "` instead."); + } else if (!existing_document.contains(reference_helper_field_name) || + !existing_document[reference_helper_field_name].is_array()) { + return Option(400, "Expected document `id: " + id + "` to have `" += reference_helper_field_name + + "` array field that is `" += get_field_value(existing_document, field_name) + + "` instead."); + } else if (existing_document[field_name].size() != existing_document[reference_helper_field_name].size()) { + return Option(400, "Expected document `id: " + id + "` to have equal count of elements in `" += + field_name + ": " += get_field_value(existing_document, field_name) + + "` field and `" += reference_helper_field_name + ": " += + get_field_value(existing_document, reference_helper_field_name) + "` field."); + } + + nlohmann::json update_document; + update_document["id"] = id; + update_document[field_name] = existing_document[field_name]; + update_document[reference_helper_field_name] = existing_document[reference_helper_field_name]; + + auto should_update = false; + for (uint32_t j = 0; j < existing_document[field_name].size(); j++) { + auto const& ref_value = get_array_field_value(existing_document, field_name, j); + if (filter_values.count(ref_value) == 0) { + continue; + } + + const int64_t existing_ref_seq_id = existing_document[reference_helper_field_name][j].get(); + if (existing_ref_seq_id != Collection::reference_helper_sentinel_value && + existing_ref_seq_id != ref_seq_id) { + return Option(400, "Document `id: " + id + "` at `" += field_name + + "` reference array field and index `" + std::to_string(j) + + "` already has a reference to document `" += std::to_string(existing_ref_seq_id) + + "` of `" += ref_coll_name + "` collection, having reference value `" += + get_array_field_value(existing_document, field_name, j) + "`."); + } else if (existing_ref_seq_id == ref_seq_id) { + continue; + } + + should_update = true; + // Set reference helper field to `ref_seq_id` at the index corresponding to where reference field has value. + update_document[reference_helper_field_name][j] = ref_seq_id; + } + + if (should_update) { + buffer.push_back(update_document.dump()); + } } } - // Add reference helper fields in the document. - for (auto const& pair: reference_fields) { - auto field_name = pair.first; - auto const reference_helper_field = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; - - auto const& field = schema.at(field_name); - auto const& optional = field.optional; - // Strict checking for presence of non-optional reference field during indexing operation. - auto is_required = !is_update && !optional; - if (is_required && document.count(field_name) != 1) { - return Option(400, "Missing the required reference field `" + field_name - + "` in the document."); - } else if (document.count(field_name) != 1) { - if (is_update) { - document[fields::reference_helper_fields] += reference_helper_field; - } - continue; - } - - auto reference_pair = pair.second; - auto reference_collection_name = reference_pair.collection; - auto reference_field_name = reference_pair.field; - auto& cm = CollectionManager::get_instance(); - auto ref_collection = cm.get_collection(reference_collection_name); - if (ref_collection == nullptr) { - return Option(400, "Referenced collection `" + reference_collection_name - + "` not found."); - } - - bool is_object_reference_field = flat_fields.count(field_name) != 0; - std::string object_key; - bool is_object_array = false; - if (is_object_reference_field) { - object_reference_helper_fields.insert(reference_helper_field); - - std::vector tokens; - StringUtils::split(field_name, tokens, "."); - if (schema.count(tokens[0]) == 0) { - return Option(400, "Could not find `" + tokens[0] + "` object/object[] field in the schema."); - } - object_key = tokens[0]; - is_object_array = schema.at(object_key).is_array(); - } - - if (reference_field_name == "id") { - auto id_field_type_error_op = Option(400, "Field `" + field_name + "` must have string value."); - if (is_object_array) { - if (!document[field_name].is_array()) { - return Option(400, "Expected `" + field_name + "` to be an array."); - } - - document[reference_helper_field] = nlohmann::json::array(); - document[fields::reference_helper_fields] += reference_helper_field; - - std::vector keys; - StringUtils::split(field_name, keys, "."); - auto const& object_array = document[keys[0]]; - - for (uint32_t i = 0; i < object_array.size(); i++) { - if (optional && object_array[i].count(keys[1]) == 0) { - continue; - } else if (object_array[i].count(keys[1]) == 0) { - return Option(400, "Object at index `" + std::to_string(i) + "` is missing `" + field_name + "`."); - } else if (!object_array[i].at(keys[1]).is_string()) { - return id_field_type_error_op; - } - - auto id = object_array[i].at(keys[1]).get(); - auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id); - if (!ref_doc_id_op.ok()) { - return Option(400, "Referenced document having `id: " + id + - "` not found in the collection `" + - reference_collection_name + "`." ); - } - - // Adding the index of the object along with referenced doc id to account for the scenario where a - // reference field of an object array might be optional and missing. - document[reference_helper_field] += nlohmann::json::array({i, ref_doc_id_op.get()}); - } - } else if (document[field_name].is_array()) { - document[reference_helper_field] = nlohmann::json::array(); - document[fields::reference_helper_fields] += reference_helper_field; - - for (const auto &item: document[field_name].items()) { - if (optional && item.value().is_null()) { - continue; - } else if (!item.value().is_string()) { - return id_field_type_error_op; - } - - auto id = item.value().get(); - auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id); - if (!ref_doc_id_op.ok()) { - return Option(400, "Referenced document having `id: " + id + - "` not found in the collection `" + - reference_collection_name + "`." ); - } - - document[reference_helper_field] += ref_doc_id_op.get(); - } - } else if (document[field_name].is_string()) { - auto id = document[field_name].get(); - auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id); - if (!ref_doc_id_op.ok()) { - return Option(400, "Referenced document having `id: " + id + - "` not found in the collection `" + - reference_collection_name + "`." ); - } - - document[reference_helper_field] = ref_doc_id_op.get(); - document[fields::reference_helper_fields] += reference_helper_field; - } else if (optional && document[field_name].is_null()) { - // Reference helper field should also be removed along with reference field. - if (is_update) { - document[reference_helper_field] = nullptr; - } - continue; - } else { - return id_field_type_error_op; - } - - continue; - } - - if (ref_collection->get_schema().count(reference_field_name) == 0) { - return Option(400, "Referenced field `" + reference_field_name + - "` not found in the collection `" + reference_collection_name + "`."); - } - - auto const ref_field = ref_collection->get_schema().at(reference_field_name); - if (!ref_field.index) { - return Option(400, "Referenced field `" + reference_field_name + - "` in the collection `" + reference_collection_name + "` must be indexed."); - } - - // Create filter query from the value(s) in the reference field and get the reference doc id(s). - std::string filter_query = reference_field_name + ": "; - std::string ref_field_type = ref_field.is_string() ? field_types::STRING : - ref_field.is_int32() ? field_types::INT32 : - ref_field.is_int64() ? field_types::INT64 : field_types::NIL; - - if (ref_field_type == field_types::NIL) { - return Option(400, "Cannot add a reference to `" + reference_collection_name + "." + reference_field_name + - "` of type `" + ref_field.type + "`."); - } - - if (is_object_array) { - if (!document[field_name].is_array()) { - return Option(400, "Expected `" + field_name + "` to be an array."); - } - - document[reference_helper_field] = nlohmann::json::array(); - document[fields::reference_helper_fields] += reference_helper_field; - nlohmann::json temp_doc; // To store singular values of `field_name` field. - - std::vector keys; - StringUtils::split(field_name, keys, "."); - auto const& object_array = document[keys[0]]; - - for (uint32_t i = 0; i < object_array.size(); i++) { - if (optional && object_array[i].count(keys[1]) == 0) { - continue; - } else if (object_array[i].count(keys[1]) == 0) { - return Option(400, "Object at index `" + std::to_string(i) + "` is missing `" + field_name + "`."); - } - - temp_doc[field_name] = object_array[i].at(keys[1]); - auto single_value_filter_query_op = single_value_filter_query(temp_doc, field_name, ref_field_type, - filter_query); - if (!single_value_filter_query_op.ok()) { - if (single_value_filter_query_op.code() == 422) { - continue; - } - return single_value_filter_query_op; - } - - filter_result_t filter_result; - auto filter_ids_op = ref_collection->get_filter_ids(filter_query, filter_result); - if (!filter_ids_op.ok()) { - return filter_ids_op; - } - - if (filter_result.count != 1) { - // Constraints similar to foreign key apply here. The reference match must be unique and not null. - return Option(400, filter_result.count < 1 ? - "Reference document having `" + filter_query + "` not found in the collection `" - + reference_collection_name + "`." : - "Multiple documents having `" + filter_query + "` found in the collection `" + - reference_collection_name + "`."); - } - - // Adding the index of the object along with referenced doc id to account for the scenario where a - // reference field of an object array might be optional and missing. - document[reference_helper_field] += nlohmann::json::array({i, filter_result.docs[0]}); - filter_query = reference_field_name + ": "; - } - continue; - } - - if (document[field_name].is_array()) { - if (ref_field_type == field_types::STRING) { - filter_query[filter_query.size() - 1] = '='; - filter_query += " ["; - } else { - filter_query += "["; - } - bool filter_values_added = false; - for (const auto &item: document[field_name].items()) { - auto const& item_value = item.value(); - if (item_value.is_string() && ref_field_type == field_types::STRING) { - filter_query += item_value.get(); - filter_values_added = true; - } else if (item_value.is_number_integer() && (ref_field_type == field_types::INT64 || - (ref_field_type == field_types::INT32 && - StringUtils::is_int32_t(std::to_string(item_value.get()))))) { - filter_query += std::to_string(item_value.get()); - filter_values_added = true; - } else if (optional && item_value.is_null()) { - continue; - } else { - return Option(400, "Field `" + field_name + "` must only have `" + ref_field_type + "` values."); - } - filter_query += ","; - } - if (!filter_values_added) { - document[reference_helper_field] = nlohmann::json::array(); - document[fields::reference_helper_fields] += reference_helper_field; - - continue; - } - filter_query[filter_query.size() - 1] = ']'; - } else if (field.is_array() && document[field_name].is_null()) { - document[reference_helper_field] = nlohmann::json::array(); - document[fields::reference_helper_fields] += reference_helper_field; - - continue; - } else { - auto single_value_filter_query_op = single_value_filter_query(document, field_name, ref_field_type, - filter_query); - if (!single_value_filter_query_op.ok()) { - if (optional && single_value_filter_query_op.code() == 422) { - // Reference helper field should also be removed along with reference field. - if (is_update) { - document[reference_helper_field] = nullptr; - } - continue; - } - return single_value_filter_query_op; - } - } - - filter_result_t filter_result; - auto filter_ids_op = ref_collection->get_filter_ids(filter_query, filter_result); - if (!filter_ids_op.ok()) { - return filter_ids_op; - } - - if (document[field_name].is_array()) { - document[reference_helper_field] = nlohmann::json::array(); - document[fields::reference_helper_fields] += reference_helper_field; - - for (uint32_t i = 0; i < filter_result.count; i++) { - document[reference_helper_field] += filter_result.docs[i]; - } - } else { - if (filter_result.count != 1) { - // Constraints similar to foreign key apply here. The reference match must be unique and not null. - return Option(400, filter_result.count < 1 ? - "Reference document having `" + filter_query + "` not found in the collection `" - + reference_collection_name + "`." : - "Multiple documents having `" + filter_query + "` found in the collection `" + - reference_collection_name + "`."); - } - - document[reference_helper_field] = filter_result.docs[0]; - document[fields::reference_helper_fields] += reference_helper_field; - } - } + nlohmann::json dummy; + add_many(buffer, dummy, index_operation_t::UPDATE); return Option(true); } @@ -644,10 +469,12 @@ nlohmann::json Collection::add_many(std::vector& json_lines, nlohma batch_doc_ids.insert(doc_id); + std::unique_lock lock(mutex); + // if `fallback_field_type` or `dynamic_fields` is enabled, update schema first before indexing - if(!fallback_field_type.empty() || !dynamic_fields.empty() || !nested_fields.empty() || !reference_fields.empty()) { + if(!fallback_field_type.empty() || !dynamic_fields.empty() || !nested_fields.empty() || + !reference_fields.empty() || !async_referenced_ins.empty()) { std::vector new_fields; - std::unique_lock lock(mutex); Option new_fields_op = detect_new_fields(record.doc, dirty_values, search_schema, dynamic_fields, @@ -701,6 +528,7 @@ nlohmann::json Collection::add_many(std::vector& json_lines, nlohma remove_flat_fields(document); remove_reference_helper_fields(document); } + index_records.clear(); batch_doc_ids.clear(); } @@ -935,7 +763,9 @@ size_t Collection::batch_index_in_memory(std::vector& index_record std::unique_lock lock(mutex); size_t num_indexed = Index::batch_memory_index(index, index_records, default_sorting_field, search_schema, embedding_fields, fallback_field_type, - token_separators, symbols_to_index, true, remote_embedding_batch_size, remote_embedding_timeout_ms, remote_embedding_num_tries, generate_embeddings); + token_separators, symbols_to_index, true, remote_embedding_batch_size, + remote_embedding_timeout_ms, remote_embedding_num_tries,generate_embeddings, + false, tsl::htrie_map(), name, async_referenced_ins); num_documents += num_indexed; return num_indexed; } @@ -4560,9 +4390,9 @@ void Collection::remove_document(nlohmann::json & document, const uint32_t seq_i } } -void Collection::cascade_remove_docs(const std::string& ref_helper_field_name, const uint32_t& ref_seq_id, +void Collection::cascade_remove_docs(const std::string& field_name, const uint32_t& ref_seq_id, const nlohmann::json& ref_doc, bool remove_from_store) { - auto field_name = ref_helper_field_name.substr(0, ref_helper_field_name.size() - fields::REFERENCE_HELPER_FIELD_SUFFIX.size()); + auto const ref_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; filter_result_t filter_result; get_filter_ids(ref_helper_field_name + ":" + std::to_string(ref_seq_id), filter_result); @@ -4583,6 +4413,9 @@ void Collection::cascade_remove_docs(const std::string& ref_helper_field_name, c is_field_optional = it.value().optional; } + std::vector buffer; + buffer.reserve(filter_result.count); + if (is_field_singular) { // Delete all the docs where reference helper field has value `seq_id`. for (uint32_t i = 0; i < filter_result.count; i++) { @@ -4613,7 +4446,7 @@ void Collection::cascade_remove_docs(const std::string& ref_helper_field_name, c update_document["id"] = id; update_document[field_name] = nullptr; - add(update_document.dump(), index_operation_t::UPDATE, id); + buffer.push_back(update_document.dump()); } else { remove_document(existing_document, seq_id, remove_from_store); } @@ -4641,9 +4474,6 @@ void Collection::cascade_remove_docs(const std::string& ref_helper_field_name, c return; } - std::vector buffer; - buffer.reserve(filter_result.count); - // Delete all references to `seq_id` in the docs. for (uint32_t i = 0; i < filter_result.count; i++) { auto const& seq_id = filter_result.docs[i]; @@ -4676,6 +4506,16 @@ void Collection::cascade_remove_docs(const std::string& ref_helper_field_name, c LOG(ERROR) << "`" << name << "` collection doc `" << existing_document.dump() << "` at field `" << field_name << "` elements do not match the type of `" << ref_coll_name << "` collection doc `"<< ref_doc.dump() << "` at field `" << ref_field_name << "`."; + } else if (existing_document.count(ref_helper_field_name) == 0) { + LOG(ERROR) << "`" << name << "` collection doc `" << existing_document.dump() << "` is missing `" << + ref_helper_field_name << "` field."; + } else if (!existing_document.at(ref_helper_field_name).is_array()) { + LOG(ERROR) << "`" << name << "` collection doc `" << existing_document.dump() << "` field `" << + ref_helper_field_name << "` is not an array."; + } else if (existing_document[field_name].size() != existing_document[ref_helper_field_name].size()) { + LOG(ERROR) << "`" << name << "` collection doc `" << existing_document.dump() << "` reference field `" << + field_name << "` values and its reference helper field `" << ref_helper_field_name << + "` values differ in count."; } // If there are more than one references present in this document, we cannot delete the whole doc. Only remove // `ref_seq_id` from reference helper field. @@ -4685,13 +4525,18 @@ void Collection::cascade_remove_docs(const std::string& ref_helper_field_name, c update_document[field_name] = nlohmann::json::array(); auto removed_ref_value_found = false; - for (const auto& ref_value: existing_document.at(field_name)) { + + // We assume here that the value in reference field at a particular index corresponds to the value + // present at the same index in the reference helper field. + for (uint32_t j = 0; j < existing_document[field_name].size(); j++) { + auto const& ref_value = existing_document[field_name][j]; if (ref_value == ref_doc.at(ref_field_name)) { removed_ref_value_found = true; continue; } update_document[field_name] += ref_value; + update_document[ref_helper_field_name] += existing_document[ref_helper_field_name][j]; } if (removed_ref_value_found) { @@ -4717,10 +4562,10 @@ void Collection::cascade_remove_docs(const std::string& ref_helper_field_name, c remove_document(existing_document, seq_id, remove_from_store); } } - - nlohmann::json dummy; - add_many(buffer, dummy, index_operation_t::UPDATE); } + + nlohmann::json dummy; + add_many(buffer, dummy, index_operation_t::UPDATE); } Option Collection::remove(const std::string & id, const bool remove_from_store) { @@ -5143,11 +4988,16 @@ SynonymIndex* Collection::get_synonym_index() { return synonym_index; } -spp::sparse_hash_map Collection::get_reference_fields() { +spp::sparse_hash_map Collection::get_reference_fields() { std::shared_lock lock(mutex); return reference_fields; } +spp::sparse_hash_map> Collection::get_async_referenced_ins() { + std::shared_lock lock(mutex); + return async_referenced_ins; +}; + Option Collection::persist_collection_meta() { // first compact nested fields (to keep only parents of expanded children) field::compact_nested_fields(nested_fields); @@ -5458,329 +5308,6 @@ void Collection::remove_reference_helper_fields(nlohmann::json& document) { } } -Option Collection::prune_ref_doc(nlohmann::json& doc, - const reference_filter_result_t& references, - const tsl::htrie_set& ref_include_fields_full, - const tsl::htrie_set& ref_exclude_fields_full, - const bool& is_reference_array, - const ref_include_exclude_fields& ref_include_exclude) { - nlohmann::json original_doc; - if (!ref_include_exclude.nested_join_includes.empty()) { - original_doc = doc; - } - - auto const& ref_collection_name = ref_include_exclude.collection_name; - auto& cm = CollectionManager::get_instance(); - auto ref_collection = cm.get_collection(ref_collection_name); - if (ref_collection == nullptr) { - return Option(400, "Referenced collection `" + ref_collection_name + "` in `include_fields` not found."); - } - - auto const& alias = ref_include_exclude.alias; - auto const& strategy = ref_include_exclude.strategy; - auto error_prefix = "Referenced collection `" + ref_collection_name + "`: "; - - // One-to-one relation. - if (strategy != ref_include::nest_array && !is_reference_array && references.count == 1) { - auto ref_doc_seq_id = references.docs[0]; - - nlohmann::json ref_doc; - auto get_doc_op = ref_collection->get_document_from_store(ref_doc_seq_id, ref_doc); - if (!get_doc_op.ok()) { - return Option(get_doc_op.code(), error_prefix + get_doc_op.error()); - } - - remove_flat_fields(ref_doc); - remove_reference_helper_fields(ref_doc); - - auto prune_op = prune_doc(ref_doc, ref_include_fields_full, ref_exclude_fields_full); - if (!prune_op.ok()) { - return Option(prune_op.code(), error_prefix + prune_op.error()); - } - - auto const key = alias.empty() ? ref_collection_name : alias; - auto const& nest_ref_doc = (strategy == ref_include::nest); - if (!ref_doc.empty()) { - if (nest_ref_doc) { - doc[key] = ref_doc; - } else { - if (!alias.empty()) { - auto temp_doc = ref_doc; - ref_doc.clear(); - for (const auto &item: temp_doc.items()) { - ref_doc[alias + item.key()] = item.value(); - } - } - doc.update(ref_doc); - } - } - - // Include nested join references. - if (!ref_include_exclude.nested_join_includes.empty()) { - // Passing empty references in case the nested include collection is not joined, but it still can be included - // if we have a reference to it. - std::map refs; - auto nested_include_exclude_op = include_references(nest_ref_doc ? doc[key] : doc, ref_doc_seq_id, - ref_collection.get(), - references.coll_to_references == nullptr ? refs : - references.coll_to_references[0], - ref_include_exclude.nested_join_includes, original_doc); - if (!nested_include_exclude_op.ok()) { - return nested_include_exclude_op; - } - } - - return Option(true); - } - - // One-to-many relation. - for (uint32_t i = 0; i < references.count; i++) { - auto ref_doc_seq_id = references.docs[i]; - - nlohmann::json ref_doc; - auto get_doc_op = ref_collection->get_document_from_store(ref_doc_seq_id, ref_doc); - if (!get_doc_op.ok()) { - return Option(get_doc_op.code(), error_prefix + get_doc_op.error()); - } - - remove_flat_fields(ref_doc); - remove_reference_helper_fields(ref_doc); - - auto prune_op = prune_doc(ref_doc, ref_include_fields_full, ref_exclude_fields_full); - if (!prune_op.ok()) { - return Option(prune_op.code(), error_prefix + prune_op.error()); - } - - std::string key; - auto const& nest_ref_doc = (strategy == ref_include::nest || strategy == ref_include::nest_array); - if (!ref_doc.empty()) { - if (nest_ref_doc) { - key = alias.empty() ? ref_collection_name : alias; - if (doc.contains(key) && !doc[key].is_array()) { - return Option(400, "Could not include the reference document of `" + ref_collection_name + - "` collection. Expected `" + key + "` to be an array. Try " + - (alias.empty() ? "adding an" : "renaming the") + " alias."); - } - - doc[key] += ref_doc; - } else { - for (auto ref_doc_it = ref_doc.begin(); ref_doc_it != ref_doc.end(); ref_doc_it++) { - auto const& ref_doc_key = ref_doc_it.key(); - key = alias + ref_doc_key; - if (doc.contains(key) && !doc[key].is_array()) { - return Option(400, "Could not include the value of `" + ref_doc_key + - "` key of the reference document of `" + ref_collection_name + - "` collection. Expected `" + key + "` to be an array. Try " + - (alias.empty() ? "adding an" : "renaming the") + " alias."); - } - - // Add the values of ref_doc as JSON array into doc. - doc[key] += ref_doc_it.value(); - } - } - } - - // Include nested join references. - if (!ref_include_exclude.nested_join_includes.empty()) { - // Passing empty references in case the nested include collection is not joined, but it still can be included - // if we have a reference to it. - std::map refs; - auto nested_include_exclude_op = include_references(nest_ref_doc ? doc[key].at(i) : doc, ref_doc_seq_id, - ref_collection.get(), - references.coll_to_references == nullptr ? refs : - references.coll_to_references[i], - ref_include_exclude.nested_join_includes, original_doc); - if (!nested_include_exclude_op.ok()) { - return nested_include_exclude_op; - } - } - } - - return Option(true); -} - -Option Collection::include_references(nlohmann::json& doc, const uint32_t& seq_id, Collection *const collection, - const std::map& reference_filter_results, - const std::vector& ref_include_exclude_fields_vec, - const nlohmann::json& original_doc) { - for (auto const& ref_include_exclude: ref_include_exclude_fields_vec) { - auto ref_collection_name = ref_include_exclude.collection_name; - - auto& cm = CollectionManager::get_instance(); - auto ref_collection = cm.get_collection(ref_collection_name); - if (ref_collection == nullptr) { - return Option(400, "Referenced collection `" + ref_collection_name + "` in `include_fields` not found."); - } - // `CollectionManager::get_collection` accounts for collection alias being used and provides pointer to the - // original collection. - ref_collection_name = ref_collection->name; - - auto const joined_on_ref_collection = reference_filter_results.count(ref_collection_name) > 0, - has_filter_reference = (joined_on_ref_collection && - reference_filter_results.at(ref_collection_name).count > 0); - auto doc_has_reference = false, joined_coll_has_reference = false; - - // Reference include_by without join, check if doc itself contains the reference. - if (!joined_on_ref_collection && collection != nullptr) { - doc_has_reference = ref_collection->is_referenced_in(collection->name); - } - - std::string joined_coll_having_reference; - // Check if the joined collection has a reference. - if (!joined_on_ref_collection && !doc_has_reference) { - for (const auto &reference_filter_result: reference_filter_results) { - joined_coll_has_reference = ref_collection->is_referenced_in(reference_filter_result.first); - if (joined_coll_has_reference) { - joined_coll_having_reference = reference_filter_result.first; - break; - } - } - } - - if (!has_filter_reference && !doc_has_reference && !joined_coll_has_reference) { - continue; - } - - std::vector ref_include_fields_vec, ref_exclude_fields_vec; - StringUtils::split(ref_include_exclude.include_fields, ref_include_fields_vec, ","); - StringUtils::split(ref_include_exclude.exclude_fields, ref_exclude_fields_vec, ","); - - spp::sparse_hash_set ref_include_fields, ref_exclude_fields; - ref_include_fields.insert(ref_include_fields_vec.begin(), ref_include_fields_vec.end()); - ref_exclude_fields.insert(ref_exclude_fields_vec.begin(), ref_exclude_fields_vec.end()); - - tsl::htrie_set ref_include_fields_full, ref_exclude_fields_full; - auto include_exclude_op = ref_collection->populate_include_exclude_fields_lk(ref_include_fields, - ref_exclude_fields, - ref_include_fields_full, - ref_exclude_fields_full); - auto error_prefix = "Referenced collection `" + ref_collection_name + "`: "; - if (!include_exclude_op.ok()) { - return Option(include_exclude_op.code(), error_prefix + include_exclude_op.error()); - } - - Option prune_doc_op = Option(true); - auto const& ref_collection_alias = ref_include_exclude.alias; - if (has_filter_reference) { - auto const& ref_filter_result = reference_filter_results.at(ref_collection_name); - prune_doc_op = prune_ref_doc(doc, ref_filter_result, ref_include_fields_full, ref_exclude_fields_full, - ref_filter_result.is_reference_array_field, ref_include_exclude); - } else if (doc_has_reference) { - auto get_reference_field_op = ref_collection->get_referenced_in_field_with_lock(collection->name); - if (!get_reference_field_op.ok()) { - continue; - } - auto const& field_name = get_reference_field_op.get(); - if (collection->search_schema.count(field_name) == 0) { - continue; - } - - if (collection->object_reference_helper_fields.count(field_name) != 0) { - std::vector keys; - StringUtils::split(field_name, keys, "."); - auto const& key = keys[0]; - - if (!doc.contains(key)) { - if (!original_doc.contains(key)) { - return Option(400, "Could not find `" + key + - "` key in the document to include the referenced document."); - } - - // The key is excluded from the doc by the query, inserting empty object(s) so referenced doc can be - // included in it. - if (original_doc[key].is_array()) { - doc[key] = nlohmann::json::array(); - doc[key].insert(doc[key].begin(), original_doc[key].size(), nlohmann::json::object()); - } else { - doc[key] = nlohmann::json::object(); - } - } - - if (doc[key].is_array()) { - for (uint32_t i = 0; i < doc[key].size(); i++) { - uint32_t ref_doc_id; - auto op = collection->get_object_array_related_id(field_name, seq_id, i, ref_doc_id); - if (!op.ok()) { - if (op.code() == 404) { // field_name is not indexed. - break; - } else { // No reference found for this object. - continue; - } - } - - reference_filter_result_t result(1, new uint32_t[1]{ref_doc_id}); - prune_doc_op = prune_ref_doc(doc[key][i], result, - ref_include_fields_full, ref_exclude_fields_full, - false, ref_include_exclude); - if (!prune_doc_op.ok()) { - return prune_doc_op; - } - } - } else { - std::vector ids; - auto get_references_op = collection->get_related_ids(field_name, seq_id, ids); - if (!get_references_op.ok()) { - continue; - } - reference_filter_result_t result(ids.size(), &ids[0]); - prune_doc_op = prune_ref_doc(doc[key], result, ref_include_fields_full, ref_exclude_fields_full, - collection->search_schema.at(field_name).is_array(), ref_include_exclude); - result.docs = nullptr; - } - } else { - std::vector ids; - auto get_references_op = collection->get_related_ids(field_name, seq_id, ids); - if (!get_references_op.ok()) { - continue; - } - reference_filter_result_t result(ids.size(), &ids[0]); - prune_doc_op = prune_ref_doc(doc, result, ref_include_fields_full, ref_exclude_fields_full, - collection->search_schema.at(field_name).is_array(), ref_include_exclude); - result.docs = nullptr; - } - } else if (joined_coll_has_reference) { - auto joined_collection = cm.get_collection(joined_coll_having_reference); - if (joined_collection == nullptr) { - continue; - } - - auto reference_field_name_op = ref_collection->get_referenced_in_field_with_lock(joined_coll_having_reference); - if (!reference_field_name_op.ok() || joined_collection->get_schema().count(reference_field_name_op.get()) == 0) { - continue; - } - - auto const& reference_field_name = reference_field_name_op.get(); - auto const& reference_filter_result = reference_filter_results.at(joined_coll_having_reference); - auto const& count = reference_filter_result.count; - std::vector ids; - ids.reserve(count); - for (uint32_t i = 0; i < count; i++) { - joined_collection->get_related_ids_with_lock(reference_field_name, reference_filter_result.docs[i], ids); - } - if (ids.empty()) { - continue; - } - - gfx::timsort(ids.begin(), ids.end()); - ids.erase(unique(ids.begin(), ids.end()), ids.end()); - - reference_filter_result_t result; - result.count = ids.size(); - result.docs = &ids[0]; - prune_doc_op = prune_ref_doc(doc, result, ref_include_fields_full, ref_exclude_fields_full, - joined_collection->get_schema().at(reference_field_name).is_array(), - ref_include_exclude); - result.docs = nullptr; - } - - if (!prune_doc_op.ok()) { - return prune_doc_op; - } - } - - return Option(true); -} - Option Collection::prune_doc_with_lock(nlohmann::json& doc, const tsl::htrie_set& include_names, const tsl::htrie_set& exclude_names, const std::map& reference_filter_results, @@ -5879,8 +5406,8 @@ Option Collection::prune_doc(nlohmann::json& doc, it++; } - return include_references(doc, seq_id, collection, reference_filter_results, ref_include_exclude_fields_vec, - original_doc); + return Join::include_references(doc, seq_id, collection, reference_filter_results, ref_include_exclude_fields_vec, + original_doc); } Option Collection::validate_alter_payload(nlohmann::json& schema_changes, @@ -6310,7 +5837,7 @@ Option Collection::detect_new_fields(nlohmann::json& document, bool is_update, std::vector& new_fields, const bool enable_nested_fields, - const spp::sparse_hash_map& reference_fields, + const spp::sparse_hash_map& reference_fields, tsl::htrie_set& object_reference_helper_fields) { auto kv = document.begin(); @@ -6395,8 +5922,8 @@ Option Collection::detect_new_fields(nlohmann::json& document, } } - auto add_reference_helper_fields_op = add_reference_helper_fields(document, schema, reference_fields, - object_reference_helper_fields, is_update); + auto add_reference_helper_fields_op = Join::add_reference_helper_fields(document, schema, reference_fields, + object_reference_helper_fields, is_update); if (!add_reference_helper_fields_op.ok()) { return add_reference_helper_fields_op; } @@ -6438,17 +5965,17 @@ Index* Collection::init_index() { // the original collection. ref_coll_name = ref_coll->name; - // Passing reference helper field helps perform operation on doc_id instead of field value. - ref_coll->add_referenced_in(name, field.name + fields::REFERENCE_HELPER_FIELD_SUFFIX); + ref_coll->add_referenced_in(name, field.name, field.is_async_reference, ref_field_name); } else { // Reference collection has not been created yet. collectionManager.add_referenced_in_backlog(ref_coll_name, - reference_pair{name, field.name + fields::REFERENCE_HELPER_FIELD_SUFFIX}); + reference_info_t{name, field.name, field.is_async_reference, + ref_field_name}); } - reference_fields.emplace(field.name, reference_pair(ref_coll_name, ref_field_name)); + reference_fields.emplace(field.name, reference_info_t(ref_coll_name, ref_field_name, field.is_async_reference)); if (field.nested) { - object_reference_helper_fields.insert(field.name + fields::REFERENCE_HELPER_FIELD_SUFFIX); + object_reference_helper_fields.insert(field.name); } } } @@ -6925,20 +6452,40 @@ bool Collection::is_referenced_in(const std::string& collection_name) const { return referenced_in.count(collection_name) > 0; } -void Collection::add_referenced_in(const reference_pair& pair) { - return add_referenced_in(pair.collection, pair.field); -} - -void Collection::add_referenced_ins(const std::set& pairs) { +void Collection::add_referenced_ins(const std::set& ref_infos) { std::shared_lock lock(mutex); - for (const auto &pair: pairs) { - referenced_in.emplace(pair.collection, pair.field); + for (const auto &ref_info: ref_infos) { + auto const& referenced_field_name = ref_info.referenced_field_name; + + auto it = search_schema.find(referenced_field_name); + if (referenced_field_name != "id" && it == search_schema.end()) { + LOG(ERROR) << "Field `" << referenced_field_name << "` not found in the collection `" << name << + "` which is referenced in `" << ref_info.collection << "." << ref_info.field + "`."; + continue; + } + + referenced_in.emplace(ref_info.collection, ref_info.field); + if (ref_info.is_async) { + async_referenced_ins[referenced_field_name].emplace_back(ref_info.collection, ref_info.field); + } } } -void Collection::add_referenced_in(const std::string& collection_name, const std::string& field_name) { +void Collection::add_referenced_in(const std::string& collection_name, const std::string& field_name, + const bool& is_async, const std::string& referenced_field_name) { std::shared_lock lock(mutex); + + auto it = search_schema.find(referenced_field_name); + if (referenced_field_name != "id" && it == search_schema.end()) { + LOG(ERROR) << "Field `" << referenced_field_name << "` not found in the collection `" << name << + "` which is referenced in `" << collection_name << "." << field_name + "`."; + return; + } + referenced_in.emplace(collection_name, field_name); + if (is_async) { + async_referenced_ins[referenced_field_name].emplace_back(collection_name, field_name); + } } Option Collection::get_referenced_in_field_with_lock(const std::string& collection_name) const { diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 0f332652..1b152d0e 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -22,7 +22,8 @@ Collection* CollectionManager::init_collection(const nlohmann::json & collection const uint32_t collection_next_seq_id, Store* store, float max_memory_ratio, - spp::sparse_hash_map& referenced_in) { + spp::sparse_hash_map& referenced_in, + spp::sparse_hash_map>& async_referenced_ins) { std::string this_collection_name = collection_meta[Collection::COLLECTION_NAME_KEY].get(); std::vector fields; @@ -201,7 +202,7 @@ Collection* CollectionManager::init_collection(const nlohmann::json & collection symbols_to_index, token_separators, enable_nested_fields, model, std::move(referenced_in), - metadata); + metadata, std::move(async_referenced_ins)); return collection; } @@ -238,7 +239,8 @@ void CollectionManager::init(Store *store, const float max_memory_ratio, const s } void CollectionManager::_populate_referenced_ins(const std::string& collection_meta_json, - std::map>& referenced_ins) { + std::map>& referenced_ins, + std::map>>& async_referenced_ins) { auto const& obj = nlohmann::json::parse(collection_meta_json, nullptr, false); if (!obj.is_discarded() && obj.is_object() && obj.contains("name") && obj["name"].is_string() && @@ -249,10 +251,18 @@ void CollectionManager::_populate_referenced_ins(const std::string& collection_m if (!field.contains("name") || !field.contains("reference")) { continue; } - auto field_name = std::string(field["name"]) + fields::REFERENCE_HELPER_FIELD_SUFFIX; + auto field_name = std::string(field["name"]); + + auto const& reference = field["reference"].get(); std::vector split_result; - StringUtils::split(field["reference"], split_result, "."); - auto ref_coll_name = split_result.front(); + StringUtils::split(reference, split_result, "."); + if (split_result.size() < 2) { + LOG(ERROR) << "Invalid reference `" << reference << "`."; + continue; + } + + auto ref_coll_name = split_result[0]; + auto ref_field_name = reference.substr(ref_coll_name.size() + 1); // Resolves alias if used in schema. auto actual_ref_coll_it = CollectionManager::get_instance().collection_symlinks.find(ref_coll_name); @@ -264,6 +274,11 @@ void CollectionManager::_populate_referenced_ins(const std::string& collection_m } referenced_ins[ref_coll_name].emplace(collection_name, field_name); + + if (field.contains(fields::async_reference) && + field[fields::async_reference].is_boolean() && field[fields::async_reference].get()) { + async_referenced_ins[ref_coll_name][ref_field_name].emplace_back(collection_name, field_name); + } } } } @@ -321,8 +336,10 @@ Option CollectionManager::load(const size_t collection_batch_size, const s // Collection name -> Ref collection name -> Ref field name std::map> referenced_ins; + // Collection name -> field name -> {Ref collection name, Ref field name} + std::map>> async_referenced_ins; for (const auto &collection_meta_json: collection_meta_jsons) { - _populate_referenced_ins(collection_meta_json, referenced_ins); + _populate_referenced_ins(collection_meta_json, referenced_ins, async_referenced_ins); } size_t num_processed = 0; @@ -343,7 +360,7 @@ Option CollectionManager::load(const size_t collection_batch_size, const s auto captured_store = store; loading_pool.enqueue([captured_store, num_collections, collection_meta, document_batch_size, &m_process, &cv_process, &num_processed, &next_coll_id_status, quit = quit, - &referenced_ins, collection_name]() { + &referenced_ins, &async_referenced_ins, collection_name]() { spp::sparse_hash_map referenced_in; auto const& it = referenced_ins.find(collection_name); @@ -351,9 +368,15 @@ Option CollectionManager::load(const size_t collection_batch_size, const s referenced_in = it->second; } + spp::sparse_hash_map> async_referenced_in; + auto const& async_it = async_referenced_ins.find(collection_name); + if (async_it != async_referenced_ins.end()) { + async_referenced_in = async_it->second; + } + //auto begin = std::chrono::high_resolution_clock::now(); Option res = load_collection(collection_meta, document_batch_size, next_coll_id_status, *quit, - referenced_in); + referenced_in, async_referenced_in); /*long long int timeMillis = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - begin).count(); LOG(INFO) << "Time taken for indexing: " << timeMillis << "ms";*/ @@ -457,6 +480,7 @@ void CollectionManager::dispose() { collections.clear(); collection_symlinks.clear(); preset_configs.clear(); + referenced_in_backlog.clear(); store->close(); } @@ -554,10 +578,10 @@ Option CollectionManager::create_collection(const std::string& name add_to_collections(new_collection); lock.lock(); - - if (referenced_in_backlog.count(name) > 0) { - new_collection->add_referenced_ins(referenced_in_backlog.at(name)); - referenced_in_backlog.erase(name); + auto it = referenced_in_backlog.find(name); + if (it != referenced_in_backlog.end()) { + new_collection->add_referenced_ins(it->second); + referenced_in_backlog.erase(it); } return Option(new_collection); @@ -2286,7 +2310,8 @@ Option CollectionManager::load_collection(const nlohmann::json &collection const size_t batch_size, const StoreStatus& next_coll_id_status, const std::atomic& quit, - spp::sparse_hash_map& referenced_in) { + spp::sparse_hash_map& referenced_in, + spp::sparse_hash_map>& async_referenced_ins) { auto& cm = CollectionManager::get_instance(); @@ -2332,7 +2357,8 @@ Option CollectionManager::load_collection(const nlohmann::json &collection } } - Collection* collection = init_collection(collection_meta, collection_next_seq_id, cm.store, 1.0f, referenced_in); + Collection* collection = init_collection(collection_meta, collection_next_seq_id, cm.store, 1.0f, referenced_in, + async_referenced_ins); LOG(INFO) << "Loading collection " << collection->get_name(); @@ -2564,12 +2590,12 @@ Option CollectionManager::clone_collection(const string& existing_n return Option(new_coll); } -void CollectionManager::add_referenced_in_backlog(const std::string& collection_name, reference_pair&& pair) { +void CollectionManager::add_referenced_in_backlog(const std::string& collection_name, reference_info_t&& ref_info) { std::shared_lock lock(mutex); - referenced_in_backlog[collection_name].insert(pair); + referenced_in_backlog[collection_name].insert(ref_info); } -std::map> CollectionManager::_get_referenced_in_backlog() const { +std::map> CollectionManager::_get_referenced_in_backlog() const { std::shared_lock lock(mutex); return referenced_in_backlog; } diff --git a/src/field.cpp b/src/field.cpp index 9bbb3f34..8c98c11d 100644 --- a/src/field.cpp +++ b/src/field.cpp @@ -81,6 +81,18 @@ Option field::json_field_to_field(bool enable_nested_fields, nlohmann::jso field_json[fields::reference] = ""; } + if (field_json.count(fields::async_reference) == 0) { + field_json[fields::async_reference] = false; + } else if (!field_json.at(fields::async_reference).is_boolean()) { + return Option(400, std::string("The `async_reference` property of the field `") + + field_json[fields::name].get() + std::string("` should be a boolean.")); + } else if (field_json[fields::async_reference].get() && + field_json[fields::reference].get().empty()) { + return Option(400, std::string("The `async_reference` property of the field `") + + field_json[fields::name].get() + std::string("` is only applicable if " + "`reference` is specified.")); + } + if(field_json.count(fields::stem) != 0) { if(!field_json.at(fields::stem).is_boolean()) { return Option(400, std::string("The `stem` property of the field `") + @@ -411,7 +423,8 @@ Option field::json_field_to_field(bool enable_nested_fields, nlohmann::jso field_json[fields::sort], field_json[fields::infix], field_json[fields::nested], field_json[fields::nested_array], field_json[fields::num_dim], vec_dist, field_json[fields::reference], field_json[fields::embed], field_json[fields::range_index], - field_json[fields::store], field_json[fields::stem], field_json[fields::hnsw_params]) + field_json[fields::store], field_json[fields::stem], field_json[fields::hnsw_params], + field_json[fields::async_reference]) ); if (!field_json[fields::reference].get().empty()) { @@ -812,6 +825,7 @@ Option field::fields_to_json_fields(const std::vector& fields, cons if (!field.reference.empty()) { field_val[fields::reference] = field.reference; + field_val[fields::async_reference] = field.is_async_reference; } fields_json.push_back(field_val); diff --git a/src/filter_result_iterator.cpp b/src/filter_result_iterator.cpp index 936332a0..0f3c8cd1 100644 --- a/src/filter_result_iterator.cpp +++ b/src/filter_result_iterator.cpp @@ -837,8 +837,8 @@ void filter_result_iterator_t::init() { return; } - auto const& reference_helper_field_name = get_reference_field_op.get(); - auto op = index->do_filtering_with_reference_ids(reference_helper_field_name, ref_collection_name, + auto const& ref_field_name = get_reference_field_op.get(); + auto op = index->do_filtering_with_reference_ids(ref_field_name, ref_collection_name, std::move(result)); if (!op.ok()) { status = Option(op.code(), op.error()); diff --git a/src/index.cpp b/src/index.cpp index aea8ab93..b72fbcf5 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -565,7 +565,9 @@ batch_memory_index(Index *index, const bool do_validation, const size_t remote_embedding_batch_size, const size_t remote_embedding_timeout_ms, const size_t remote_embedding_num_tries, const bool generate_embeddings, - const bool use_addition_fields, const tsl::htrie_map& addition_fields) { + const bool use_addition_fields, const tsl::htrie_map& addition_fields, + const std::string& collection_name, + const spp::sparse_hash_map>& async_referenced_ins) { const size_t concurrency = 4; const size_t num_threads = std::min(concurrency, iter_batch.size()); const size_t window_size = (num_threads == 0) ? 0 : @@ -649,8 +651,14 @@ batch_memory_index(Index *index, const field& f = (field_name == "id") ? field("id", field_types::STRING, false) : indexable_schema.at(field_name); + std::vector async_references; + auto it = async_referenced_ins.find(field_name); + if (it != async_referenced_ins.end()) { + async_references = it->second; + } + try { - index->index_field_in_memory(f, iter_batch); + index->index_field_in_memory(collection_name, f, iter_batch, async_references); } catch(std::exception& e) { LOG(ERROR) << "Unhandled Typesense error: " << e.what(); for(auto& record: iter_batch) { @@ -672,7 +680,9 @@ batch_memory_index(Index *index, return num_indexed; } -void Index::index_field_in_memory(const field& afield, std::vector& iter_batch) { +void Index::index_field_in_memory(const std::string& collection_name, const field& afield, + std::vector& iter_batch, + const std::vector& async_referenced_ins) { // indexes a given field of all documents in the batch if(afield.name == "id") { @@ -687,6 +697,9 @@ void Index::index_field_in_memory(const field& afield, std::vector } } + if (!async_referenced_ins.empty()) { + update_async_references(collection_name, afield, iter_batch, async_referenced_ins); + } return; } @@ -1150,6 +1163,127 @@ void Index::index_field_in_memory(const field& afield, std::vector } } } + + if (!async_referenced_ins.empty()) { + update_async_references(collection_name, afield, iter_batch, async_referenced_ins); + } +} + +void Index::update_async_references(const std::string& collection_name, const field& afield, + std::vector& iter_batch, + const std::vector& async_referenced_ins) { + for (auto& record: iter_batch) { + if (!record.indexed.ok() || record.is_update) { + continue; + } + auto const& document = record.doc; + auto const& is_update = record.is_update; + auto const& seq_id = record.seq_id; + + for (const auto& pair: async_referenced_ins) { + auto const& reference_collection_name = pair.collection; + auto const& reference_field_name = pair.field; + + auto& cm = CollectionManager::get_instance(); + auto ref_coll = cm.get_collection(reference_collection_name); + if (ref_coll == nullptr) { + record.index_failure(400, "Collection `" + reference_collection_name + "` with async_reference to the" + " collection `" += collection_name + "` not found."); + continue; + } + + auto const& ref_fields = ref_coll->get_reference_fields(); + auto const ref_field_it = ref_fields.find(reference_field_name); + if (ref_field_it == ref_fields.end()) { + record.index_failure(400, "Field `" + reference_field_name + "` not found in the ref schema of `" += + reference_collection_name + "` having async_reference to `" += collection_name + + "` collection."); + continue; + } + + if (ref_field_it->second.collection != collection_name) { + record.index_failure(400, "`" + reference_collection_name + "." += reference_field_name + + "` does not have a reference to `" += collection_name + "` collection."); + continue; + } + + auto const& ref_schema = ref_coll->get_schema(); + if (ref_schema.count(reference_field_name) == 0) { + record.index_failure(400, "Field `" + reference_field_name + "` not found in the schema of `" += + reference_collection_name + "` having async_reference to `" += + collection_name + "` collection."); + continue; + } + + auto const& field_name = ref_field_it->second.field; + if (field_name != "id" && search_schema.count(field_name) == 0) { + record.index_failure(400, "Field `" + field_name + "`, referenced by `" += reference_collection_name + + "." += reference_field_name + "`, not found in `" += collection_name + + "` collection."); + continue; + } + + auto const& optional = field_name != "id" && search_schema.at(field_name).optional; + auto is_required = !is_update && !optional; + if (is_required && document.count(field_name) != 1) { + record.index_failure(400, "Missing the required field `" + field_name + "` in the document."); + continue; + } else if (document.count(field_name) != 1) { + continue; + } + + // After collecting the value(s) present in the field referenced by the other collection(ref_coll), we will add + // this document's seq_id as a reference where the value(s) match. + std::string ref_filter_value; + std::set values; + if (document.at(field_name).is_array()) { + ref_filter_value = "["; + + for (auto const& value: document[field_name]) { + if (value.is_number_integer()) { + auto const& v = std::to_string(value.get()); + ref_filter_value += v; + values.insert(v); + } else if (value.is_string()) { + auto const& v = value.get(); + ref_filter_value += v; + values.insert(v); + } else { + record.index_failure(400, "Field `" + field_name + "` must only have string/int32/int64 values."); + continue; + } + ref_filter_value += ","; + } + ref_filter_value[ref_filter_value.size() - 1] = ']'; + } else { + auto const& value = document[field_name]; + if (value.is_number_integer()) { + auto const& v = std::to_string(value.get()); + ref_filter_value += v; + values.insert(v); + } else if (value.is_string()) { + auto const& v = value.get(); + ref_filter_value += v; + values.insert(v); + } else { + record.index_failure(400, "Field `" + field_name + "` must only have string/int32/int64 values."); + continue; + } + } + + if (values.empty()) { + continue; + } + + auto const ref_filter = reference_field_name + ":= " += ref_filter_value; + auto update_op = ref_coll->update_async_references_with_lock(collection_name, ref_filter, values, seq_id, + reference_field_name); + if (!update_op.ok()) { + record.index_failure(400, "Error while updating async reference field `" + reference_field_name + + "` of collection `" += reference_collection_name + "`: " += update_op.error()); + } + } + } } void Index::tokenize_string(const std::string& text, const field& a_field, @@ -1727,7 +1861,7 @@ void aggregate_nested_references(single_filter_result_t *const reference_result, Option Index::do_reference_filtering_with_lock(filter_node_t* const filter_tree_root, filter_result_t& filter_result, const std::string& ref_collection_name, - const std::string& reference_helper_field_name) const { + const std::string& field_name) const { std::shared_lock lock(mutex); auto ref_filter_result_iterator = filter_result_iterator_t(ref_collection_name, this, filter_tree_root, @@ -1755,6 +1889,7 @@ Option Index::do_reference_filtering_with_lock(filter_node_t* const filter ref_filter_result->docs = nullptr; std::unique_ptr docs_guard(reference_docs); + auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; auto const is_nested_join = !ref_filter_result_iterator.reference.empty(); if (search_schema.at(reference_helper_field_name).is_singular()) { // Only one reference per doc. if (sort_index.count(reference_helper_field_name) == 0) { @@ -1838,6 +1973,10 @@ Option Index::do_reference_filtering_with_lock(filter_node_t* const filter } auto doc_id = ref_index.at(reference_doc_id); + if (doc_id == Collection::reference_helper_sentinel_value) { + continue; + } + id_pairs.emplace_back(std::make_pair(doc_id, reference_doc_id)); unique_doc_ids.insert(doc_id); } @@ -2036,7 +2175,7 @@ Option Index::do_reference_filtering_with_lock(filter_node_t* const filter return Option(true); } -Option Index::do_filtering_with_reference_ids(const std::string& reference_helper_field_name, +Option Index::do_filtering_with_reference_ids(const std::string& field_name, const std::string& ref_collection_name, filter_result_t&& ref_filter_result) const { filter_result_t filter_result; @@ -2048,6 +2187,7 @@ Option Index::do_filtering_with_reference_ids(const std::string return Option(filter_result); } + auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; if (numerical_index.count(reference_helper_field_name) == 0) { return Option(400, "`" + reference_helper_field_name + "` is not present in index."); } @@ -4831,12 +4971,14 @@ Option Index::ref_compute_sort_scores(const sort_by& sort_field, const uin return Option(get_reference_field_op.code(), get_reference_field_op.error()); } auto const& field_name = get_reference_field_op.get(); - if (sort_index.count(field_name) == 0) { - return Option(400, "Could not find `" + field_name + "` in sort_index."); - } else if (sort_index.at(field_name)->count(seq_id) == 0) { + auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; + + if (sort_index.count(reference_helper_field_name) == 0) { + return Option(400, "Could not find `" + reference_helper_field_name + "` in sort_index."); + } else if (sort_index.at(reference_helper_field_name)->count(seq_id) == 0) { reference_found = false; } else { - ref_seq_id = sort_index.at(field_name)->at(seq_id); + ref_seq_id = sort_index.at(reference_helper_field_name)->at(seq_id); } } // Joined collection has a reference @@ -7649,28 +7791,39 @@ int64_t Index::reference_string_sort_score(const string &field_name, const uint3 Option Index::get_related_ids(const std::string& collection_name, const string& field_name, const uint32_t& seq_id, std::vector& result) const { std::shared_lock lock(mutex); - if (search_schema.count(field_name) == 0) { - return Option(400, "Could not find `" + field_name + "` in the collection `" + collection_name + "`."); + + auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; + if (search_schema.count(reference_helper_field_name) == 0) { + return Option(400, "Could not find `" + reference_helper_field_name + "` in the collection `" + collection_name + "`."); } - auto const no_match_op = Option(400, "Could not find `" + field_name + "` value for doc `" + - std::to_string(seq_id) + "`."); - if (search_schema.at(field_name).is_singular()) { - if (sort_index.count(field_name) == 0 || sort_index.at(field_name)->count(seq_id) == 0) { + auto const no_match_op = Option(400, "Could not find `" + reference_helper_field_name + "` value for doc `" + + std::to_string(seq_id) + "`."); + if (search_schema.at(reference_helper_field_name).is_singular()) { + if (sort_index.count(reference_helper_field_name) == 0) { return no_match_op; } - result.emplace_back(sort_index.at(field_name)->at(seq_id)); + auto const& ref_index = sort_index.at(reference_helper_field_name); + auto const it = ref_index->find(seq_id); + if (it == ref_index->end()) { + return no_match_op; + } + + const uint32_t id = it->second; + if (id != Collection::reference_helper_sentinel_value) { + result.emplace_back(id); + } return Option(true); } - if (reference_index.count(field_name) == 0) { + if (reference_index.count(reference_helper_field_name) == 0) { return no_match_op; } size_t ids_len = 0; uint32_t* ids = nullptr; - reference_index.at(field_name)->search(EQUALS, seq_id, &ids, ids_len); + reference_index.at(reference_helper_field_name)->search(EQUALS, seq_id, &ids, ids_len); if (ids_len == 0) { return no_match_op; } @@ -7703,17 +7856,21 @@ Option Index::get_sort_index_value_with_lock(const std::string& collec const std::string& field_name, const uint32_t& seq_id) const { std::shared_lock lock(mutex); - if (search_schema.count(field_name) == 0) { - return Option(400, "Could not find `" + field_name + "` in the collection `" + collection_name + "`."); - } else if (search_schema.at(field_name).is_array()) { - return Option(400, "Cannot sort on `" + field_name + "` in the collection, `" + collection_name + - "` is `" + search_schema.at(field_name).type + "`."); - } else if (sort_index.count(field_name) == 0 || sort_index.at(field_name)->count(seq_id) == 0) { - return Option(404, "Could not find `" + field_name + "` value for doc `" + + + auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; + if (search_schema.count(reference_helper_field_name) == 0) { + return Option(400, "Could not find `" + reference_helper_field_name + "` in the collection `" + + collection_name + "`."); + } else if (search_schema.at(reference_helper_field_name).is_array()) { + return Option(400, "Cannot sort on `" + reference_helper_field_name + "` in the collection, `" + + collection_name + "` is `" + search_schema.at(reference_helper_field_name).type + "`."); + } else if (sort_index.count(reference_helper_field_name) == 0 || + sort_index.at(reference_helper_field_name)->count(seq_id) == 0) { + return Option(404, "Could not find `" + reference_helper_field_name + "` value for doc `" + std::to_string(seq_id) + "`.");; } - return Option(sort_index.at(field_name)->at(seq_id)); + return Option(sort_index.at(reference_helper_field_name)->at(seq_id)); } float Index::get_distance(const string& geo_field_name, const uint32_t& seq_id, diff --git a/src/join.cpp b/src/join.cpp new file mode 100644 index 00000000..7379d973 --- /dev/null +++ b/src/join.cpp @@ -0,0 +1,688 @@ +#include "join.h" + +#include +#include "collection.h" +#include "logger.h" +#include + +Option single_value_filter_query(nlohmann::json& document, const std::string& field_name, + const std::string& ref_field_type, std::string& filter_query) { + auto const& value = document[field_name]; + + if (value.is_null()) { + return Option(422, "Field `" + field_name + "` has `null` value."); + } + + if (value.is_string() && ref_field_type == field_types::STRING) { + filter_query += value.get(); + } else if (value.is_number_integer() && (ref_field_type == field_types::INT64 || + (ref_field_type == field_types::INT32 && + StringUtils::is_int32_t(std::to_string(value.get()))))) { + filter_query += std::to_string(value.get()); + } else { + return Option(400, "Field `" + field_name + "` must have `" + ref_field_type + "` value."); + } + + return Option(true); +} + +Option Join::add_reference_helper_fields(nlohmann::json& document, + const tsl::htrie_map& schema, + const spp::sparse_hash_map& reference_fields, + tsl::htrie_set& object_reference_helper_fields, + const bool& is_update) { + tsl::htrie_set flat_fields; + if (!reference_fields.empty() && document.contains(".flat")) { + for (const auto &item: document[".flat"].get>()) { + flat_fields.insert(item); + } + } + + // Add reference helper fields in the document. + for (auto const& pair: reference_fields) { + auto field_name = pair.first; + auto const reference_helper_field = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; + + auto const& field = schema.at(field_name); + auto const& optional = field.optional; + auto const& is_async_reference = field.is_async_reference; + // Strict checking for presence of non-optional reference field during indexing operation. + auto is_required = !is_update && !optional; + if (is_required && document.count(field_name) != 1) { + return Option(400, "Missing the required reference field `" + field_name + + "` in the document."); + } else if (document.count(field_name) != 1) { + if (is_update) { + document[fields::reference_helper_fields] += reference_helper_field; + } + continue; + } + + auto reference_pair = pair.second; + auto reference_collection_name = reference_pair.collection; + auto reference_field_name = reference_pair.field; + auto& cm = CollectionManager::get_instance(); + auto ref_collection = cm.get_collection(reference_collection_name); + + if (is_update && document.contains(reference_helper_field) && + (!document[field_name].is_array() || document[field_name].size() == document[reference_helper_field].size())) { + // No need to look up the reference collection since reference helper field is already populated. + // Saves needless computation in cases where references are known beforehand. For example, when cascade + // deleting the related docs. + document[fields::reference_helper_fields] += reference_helper_field; + continue; + } + + if (ref_collection == nullptr && is_async_reference) { + document[fields::reference_helper_fields] += reference_helper_field; + if (document[field_name].is_array()) { + document[reference_helper_field] = nlohmann::json::array(); + // Having the same number of values makes it easier to update the references in the future. + document[reference_helper_field].insert(document[reference_helper_field].begin(), + document[field_name].size(), + Collection::reference_helper_sentinel_value); + } else { + document[reference_helper_field] = Collection::reference_helper_sentinel_value; + } + + continue; + } else if (ref_collection == nullptr) { + return Option(400, "Referenced collection `" + reference_collection_name + + "` not found."); + } + + bool is_object_reference_field = flat_fields.count(field_name) != 0; + std::string object_key; + bool is_object_array = false; + if (is_object_reference_field) { + object_reference_helper_fields.insert(reference_helper_field); + + std::vector tokens; + StringUtils::split(field_name, tokens, "."); + if (schema.count(tokens[0]) == 0) { + return Option(400, "Could not find `" + tokens[0] + "` object/object[] field in the schema."); + } + object_key = tokens[0]; + is_object_array = schema.at(object_key).is_array(); + } + + if (reference_field_name == "id") { + auto id_field_type_error_op = Option(400, "Field `" + field_name + "` must have string value."); + if (is_object_array) { + if (!document[field_name].is_array()) { + return Option(400, "Expected `" + field_name + "` to be an array."); + } + + document[reference_helper_field] = nlohmann::json::array(); + document[fields::reference_helper_fields] += reference_helper_field; + + std::vector keys; + StringUtils::split(field_name, keys, "."); + auto const& object_array = document[keys[0]]; + + for (uint32_t i = 0; i < object_array.size(); i++) { + if (optional && object_array[i].count(keys[1]) == 0) { + continue; + } else if (object_array[i].count(keys[1]) == 0) { + return Option(400, "Object at index `" + std::to_string(i) + "` is missing `" + field_name + "`."); + } else if (!object_array[i].at(keys[1]).is_string()) { + return id_field_type_error_op; + } + + auto id = object_array[i].at(keys[1]).get(); + auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id); + if (!ref_doc_id_op.ok() && is_async_reference) { + auto const& value = nlohmann::json::array({i, Collection::reference_helper_sentinel_value}); + document[reference_helper_field] += value; + } else if (!ref_doc_id_op.ok()) { + return Option(400, "Referenced document having `id: " + id + + "` not found in the collection `" += + reference_collection_name + "`." ); + } else { + // Adding the index of the object along with referenced doc id to account for the scenario where a + // reference field of an object array might be optional and missing. + document[reference_helper_field] += nlohmann::json::array({i, ref_doc_id_op.get()}); + } + } + } else if (document[field_name].is_array()) { + document[reference_helper_field] = nlohmann::json::array(); + document[fields::reference_helper_fields] += reference_helper_field; + + for (const auto &item: document[field_name].items()) { + if (optional && item.value().is_null()) { + continue; + } else if (!item.value().is_string()) { + return id_field_type_error_op; + } + + auto id = item.value().get(); + auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id); + if (!ref_doc_id_op.ok() && is_async_reference) { + document[reference_helper_field] += Collection::reference_helper_sentinel_value; + } else if (!ref_doc_id_op.ok()) { + return Option(400, "Referenced document having `id: " + id + + "` not found in the collection `" += + reference_collection_name + "`." ); + } else { + document[reference_helper_field] += ref_doc_id_op.get(); + } + } + } else if (document[field_name].is_string()) { + document[fields::reference_helper_fields] += reference_helper_field; + + auto id = document[field_name].get(); + auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id); + if (!ref_doc_id_op.ok() && is_async_reference) { + document[reference_helper_field] = Collection::reference_helper_sentinel_value; + } else if (!ref_doc_id_op.ok()) { + return Option(400, "Referenced document having `id: " + id + + "` not found in the collection `" += + reference_collection_name + "`." ); + } else { + document[reference_helper_field] = ref_doc_id_op.get(); + } + } else if (optional && document[field_name].is_null()) { + // Reference helper field should also be removed along with reference field. + if (is_update) { + document[reference_helper_field] = nullptr; + } + continue; + } else { + return id_field_type_error_op; + } + + continue; + } + + if (ref_collection->get_schema().count(reference_field_name) == 0) { + return Option(400, "Referenced field `" + reference_field_name + + "` not found in the collection `" += reference_collection_name + "`."); + } + + auto const ref_field = ref_collection->get_schema().at(reference_field_name); + if (!ref_field.index) { + return Option(400, "Referenced field `" + reference_field_name + + "` in the collection `" += reference_collection_name + "` must be indexed."); + } + + std::string ref_field_type = ref_field.is_string() ? field_types::STRING : + ref_field.is_int32() ? field_types::INT32 : + ref_field.is_int64() ? field_types::INT64 : field_types::NIL; + + if (ref_field_type == field_types::NIL) { + return Option(400, "Cannot add a reference to `" + reference_collection_name + "." += reference_field_name + + "` of type `" += ref_field.type + "`."); + } + + if (is_object_array) { + if (!document[field_name].is_array()) { + return Option(400, "Expected `" + field_name + "` to be an array."); + } + + document[reference_helper_field] = nlohmann::json::array(); + document[fields::reference_helper_fields] += reference_helper_field; + nlohmann::json temp_doc; // To store singular values of `field_name` field. + + std::vector keys; + StringUtils::split(field_name, keys, "."); + auto const& object_array = document[keys[0]]; + + for (uint32_t i = 0; i < object_array.size(); i++) { + if (optional && object_array[i].count(keys[1]) == 0) { + continue; + } else if (object_array[i].count(keys[1]) == 0) { + return Option(400, "Object at index `" + std::to_string(i) + "` is missing `" + field_name + "`."); + } + + temp_doc[field_name] = object_array[i].at(keys[1]); + std::string filter_query = reference_field_name + ":= "; + + auto single_value_filter_query_op = single_value_filter_query(temp_doc, field_name, ref_field_type, + filter_query); + if (!single_value_filter_query_op.ok()) { + if (optional && single_value_filter_query_op.code() == 422) { + continue; + } + return Option(400, single_value_filter_query_op.error()); + } + + filter_result_t filter_result; + auto filter_ids_op = ref_collection->get_filter_ids(filter_query, filter_result); + if (!filter_ids_op.ok()) { + return filter_ids_op; + } + + if (filter_result.count == 0 && is_async_reference) { + document[reference_helper_field] += nlohmann::json::array({i, Collection::reference_helper_sentinel_value}); + } else if (filter_result.count != 1) { + // Constraints similar to foreign key apply here. The reference match must be unique and not null. + return Option(400, filter_result.count < 1 ? + "Reference document having `" + filter_query + "` not found in the collection `" + += reference_collection_name + "`." : + "Multiple documents having `" + filter_query + "` found in the collection `" += + reference_collection_name + "`."); + } else { + // Adding the index of the object along with referenced doc id to account for the scenario where a + // reference field of an object array might be optional and missing. + document[reference_helper_field] += nlohmann::json::array({i, filter_result.docs[0]}); + } + } + continue; + } + + auto const is_reference_array_field = field.is_array(); + std::vector filter_values; + if (is_reference_array_field) { + if (document[field_name].is_null()) { + document[reference_helper_field] = nlohmann::json::array(); + document[fields::reference_helper_fields] += reference_helper_field; + + continue; + } else if (!document[field_name].is_array()) { + return Option(400, "Expected `" + field_name + "` to be an array."); + } + + for (const auto &item: document[field_name].items()) { + auto const& item_value = item.value(); + if (item_value.is_string() && ref_field_type == field_types::STRING) { + filter_values.emplace_back(item_value.get()); + } else if (item_value.is_number_integer() && (ref_field_type == field_types::INT64 || + (ref_field_type == field_types::INT32 && + StringUtils::is_int32_t(std::to_string(item_value.get()))))) { + filter_values.emplace_back(std::to_string(item_value.get())); + } else { + return Option(400, "Field `" + field_name + "` must only have `" += ref_field_type + "` values."); + } + } + + document[reference_helper_field] = nlohmann::json::array(); + document[fields::reference_helper_fields] += reference_helper_field; + + if (filter_values.empty()) { + continue; + } + } else { + std::string value; + auto single_value_filter_query_op = single_value_filter_query(document, field_name, ref_field_type, value); + if (!single_value_filter_query_op.ok()) { + if (optional && single_value_filter_query_op.code() == 422) { + // Reference helper field should also be removed along with reference field. + if (is_update) { + document[reference_helper_field] = nullptr; + } + continue; + } + return Option(400, single_value_filter_query_op.error()); + } + + filter_values.emplace_back(value); + document[fields::reference_helper_fields] += reference_helper_field; + } + + for (const auto& filter_value: filter_values) { + std::string filter_query = reference_field_name + (field.is_string() ? ":= " : ": ") += filter_value; + filter_result_t filter_result; + auto filter_ids_op = ref_collection->get_filter_ids(filter_query, filter_result); + if (!filter_ids_op.ok()) { + return filter_ids_op; + } + + if (filter_result.count == 0 && is_async_reference) { + if (is_reference_array_field) { + document[reference_helper_field] += Collection::reference_helper_sentinel_value; + } else { + document[reference_helper_field] = Collection::reference_helper_sentinel_value; + } + } else if (filter_result.count != 1) { + // Constraints similar to foreign key apply here. The reference match must be unique and not null. + return Option(400, filter_result.count < 1 ? + "Reference document having `" + filter_query + "` not found in the collection `" + += reference_collection_name + "`." : + "Multiple documents having `" + filter_query + "` found in the collection `" += + reference_collection_name + "`."); + } else { + if (is_reference_array_field) { + document[reference_helper_field] += filter_result.docs[0]; + } else { + document[reference_helper_field] = filter_result.docs[0]; + } + } + } + } + + return Option(true); +} + +Option Join::prune_ref_doc(nlohmann::json& doc, + const reference_filter_result_t& references, + const tsl::htrie_set& ref_include_fields_full, + const tsl::htrie_set& ref_exclude_fields_full, + const bool& is_reference_array, + const ref_include_exclude_fields& ref_include_exclude) { + nlohmann::json original_doc; + if (!ref_include_exclude.nested_join_includes.empty()) { + original_doc = doc; + } + + auto const& ref_collection_name = ref_include_exclude.collection_name; + auto& cm = CollectionManager::get_instance(); + auto ref_collection = cm.get_collection(ref_collection_name); + if (ref_collection == nullptr) { + return Option(400, "Referenced collection `" + ref_collection_name + "` in `include_fields` not found."); + } + + auto const& alias = ref_include_exclude.alias; + auto const& strategy = ref_include_exclude.strategy; + auto error_prefix = "Referenced collection `" + ref_collection_name + "`: "; + + // One-to-one relation. + if (strategy != ref_include::nest_array && !is_reference_array && references.count == 1) { + auto ref_doc_seq_id = references.docs[0]; + + nlohmann::json ref_doc; + auto get_doc_op = ref_collection->get_document_from_store(ref_doc_seq_id, ref_doc); + if (!get_doc_op.ok()) { + if (ref_doc_seq_id == Collection::reference_helper_sentinel_value) { + return Option(true); + } + return Option(get_doc_op.code(), error_prefix + get_doc_op.error()); + } + + Collection::remove_flat_fields(ref_doc); + Collection::remove_reference_helper_fields(ref_doc); + + auto prune_op = Collection::prune_doc(ref_doc, ref_include_fields_full, ref_exclude_fields_full); + if (!prune_op.ok()) { + return Option(prune_op.code(), error_prefix + prune_op.error()); + } + + auto const key = alias.empty() ? ref_collection_name : alias; + auto const& nest_ref_doc = (strategy == ref_include::nest); + if (!ref_doc.empty()) { + if (nest_ref_doc) { + doc[key] = ref_doc; + } else { + if (!alias.empty()) { + auto temp_doc = ref_doc; + ref_doc.clear(); + for (const auto &item: temp_doc.items()) { + ref_doc[alias + item.key()] = item.value(); + } + } + doc.update(ref_doc); + } + } + + // Include nested join references. + if (!ref_include_exclude.nested_join_includes.empty()) { + // Passing empty references in case the nested include collection is not joined, but it still can be included + // if we have a reference to it. + std::map refs; + auto nested_include_exclude_op = include_references(nest_ref_doc ? doc[key] : doc, ref_doc_seq_id, + ref_collection.get(), + references.coll_to_references == nullptr ? refs : + references.coll_to_references[0], + ref_include_exclude.nested_join_includes, original_doc); + if (!nested_include_exclude_op.ok()) { + return nested_include_exclude_op; + } + } + + return Option(true); + } + + // One-to-many relation. + for (uint32_t i = 0; i < references.count; i++) { + auto ref_doc_seq_id = references.docs[i]; + + nlohmann::json ref_doc; + std::string key; + auto const& nest_ref_doc = (strategy == ref_include::nest || strategy == ref_include::nest_array); + + auto get_doc_op = ref_collection->get_document_from_store(ref_doc_seq_id, ref_doc); + if (!get_doc_op.ok()) { + // Referenced document is not yet indexed. + if (ref_doc_seq_id == Collection::reference_helper_sentinel_value) { + continue; + } + return Option(get_doc_op.code(), error_prefix + get_doc_op.error()); + } + + Collection::remove_flat_fields(ref_doc); + Collection::remove_reference_helper_fields(ref_doc); + + auto prune_op = Collection::prune_doc(ref_doc, ref_include_fields_full, ref_exclude_fields_full); + if (!prune_op.ok()) { + return Option(prune_op.code(), error_prefix + prune_op.error()); + } + + if (!ref_doc.empty()) { + if (nest_ref_doc) { + key = alias.empty() ? ref_collection_name : alias; + if (doc.contains(key) && !doc[key].is_array()) { + return Option(400, "Could not include the reference document of `" + ref_collection_name + + "` collection. Expected `" += key + "` to be an array. Try " + + (alias.empty() ? "adding an" : "renaming the") + " alias."); + } + + doc[key] += ref_doc; + } else { + for (auto ref_doc_it = ref_doc.begin(); ref_doc_it != ref_doc.end(); ref_doc_it++) { + auto const& ref_doc_key = ref_doc_it.key(); + key = alias + ref_doc_key; + if (doc.contains(key) && !doc[key].is_array()) { + return Option(400, "Could not include the value of `" + ref_doc_key + + "` key of the reference document of `" += ref_collection_name + + "` collection. Expected `" += key + "` to be an array. Try " + + (alias.empty() ? "adding an" : "renaming the") + " alias."); + } + + // Add the values of ref_doc as JSON array into doc. + doc[key] += ref_doc_it.value(); + } + } + } + + // Include nested join references. + if (!ref_include_exclude.nested_join_includes.empty()) { + // Passing empty references in case the nested include collection is not joined, but it still can be included + // if we have a reference to it. + std::map refs; + auto nested_include_exclude_op = include_references(nest_ref_doc ? doc[key].at(i) : doc, ref_doc_seq_id, + ref_collection.get(), + references.coll_to_references == nullptr ? refs : + references.coll_to_references[i], + ref_include_exclude.nested_join_includes, original_doc); + if (!nested_include_exclude_op.ok()) { + return nested_include_exclude_op; + } + } + } + + return Option(true); +} + +Option Join::include_references(nlohmann::json& doc, const uint32_t& seq_id, Collection *const collection, + const std::map& reference_filter_results, + const std::vector& ref_include_exclude_fields_vec, + const nlohmann::json& original_doc) { + for (auto const& ref_include_exclude: ref_include_exclude_fields_vec) { + auto ref_collection_name = ref_include_exclude.collection_name; + + auto& cm = CollectionManager::get_instance(); + auto ref_collection = cm.get_collection(ref_collection_name); + if (ref_collection == nullptr) { + return Option(400, "Referenced collection `" + ref_collection_name + "` in `include_fields` not found."); + } + // `CollectionManager::get_collection` accounts for collection alias being used and provides pointer to the + // original collection. + ref_collection_name = ref_collection->get_name(); + + auto const joined_on_ref_collection = reference_filter_results.count(ref_collection_name) > 0, + has_filter_reference = (joined_on_ref_collection && + reference_filter_results.at(ref_collection_name).count > 0); + auto doc_has_reference = false, joined_coll_has_reference = false; + + // Reference include_by without join, check if doc itself contains the reference. + if (!joined_on_ref_collection && collection != nullptr) { + doc_has_reference = ref_collection->is_referenced_in(collection->get_name()); + } + + std::string joined_coll_having_reference; + // Check if the joined collection has a reference. + if (!joined_on_ref_collection && !doc_has_reference) { + for (const auto &reference_filter_result: reference_filter_results) { + joined_coll_has_reference = ref_collection->is_referenced_in(reference_filter_result.first); + if (joined_coll_has_reference) { + joined_coll_having_reference = reference_filter_result.first; + break; + } + } + } + + if (!has_filter_reference && !doc_has_reference && !joined_coll_has_reference) { + continue; + } + + std::vector ref_include_fields_vec, ref_exclude_fields_vec; + StringUtils::split(ref_include_exclude.include_fields, ref_include_fields_vec, ","); + StringUtils::split(ref_include_exclude.exclude_fields, ref_exclude_fields_vec, ","); + + spp::sparse_hash_set ref_include_fields, ref_exclude_fields; + ref_include_fields.insert(ref_include_fields_vec.begin(), ref_include_fields_vec.end()); + ref_exclude_fields.insert(ref_exclude_fields_vec.begin(), ref_exclude_fields_vec.end()); + + tsl::htrie_set ref_include_fields_full, ref_exclude_fields_full; + auto include_exclude_op = ref_collection->populate_include_exclude_fields_lk(ref_include_fields, + ref_exclude_fields, + ref_include_fields_full, + ref_exclude_fields_full); + auto error_prefix = "Referenced collection `" + ref_collection_name + "`: "; + if (!include_exclude_op.ok()) { + return Option(include_exclude_op.code(), error_prefix + include_exclude_op.error()); + } + + Option prune_doc_op = Option(true); + auto const& ref_collection_alias = ref_include_exclude.alias; + if (has_filter_reference) { + auto const& ref_filter_result = reference_filter_results.at(ref_collection_name); + prune_doc_op = prune_ref_doc(doc, ref_filter_result, ref_include_fields_full, ref_exclude_fields_full, + ref_filter_result.is_reference_array_field, ref_include_exclude); + } else if (doc_has_reference) { + auto get_reference_field_op = ref_collection->get_referenced_in_field_with_lock(collection->get_name()); + if (!get_reference_field_op.ok()) { + continue; + } + auto const& field_name = get_reference_field_op.get(); + auto const& reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX; + if (collection->get_schema().count(reference_helper_field_name) == 0) { + continue; + } + + if (collection->get_object_reference_helper_fields().count(field_name) != 0) { + std::vector keys; + StringUtils::split(field_name, keys, "."); + auto const& key = keys[0]; + + if (!doc.contains(key)) { + if (!original_doc.contains(key)) { + return Option(400, "Could not find `" + key + + "` key in the document to include the referenced document."); + } + + // The key is excluded from the doc by the query, inserting empty object(s) so referenced doc can be + // included in it. + if (original_doc[key].is_array()) { + doc[key] = nlohmann::json::array(); + doc[key].insert(doc[key].begin(), original_doc[key].size(), nlohmann::json::object()); + } else { + doc[key] = nlohmann::json::object(); + } + } + + if (doc[key].is_array()) { + for (uint32_t i = 0; i < doc[key].size(); i++) { + uint32_t ref_doc_id; + auto op = collection->get_object_array_related_id(reference_helper_field_name, seq_id, i, ref_doc_id); + if (!op.ok()) { + if (op.code() == 404) { // field_name is not indexed. + break; + } else { // No reference found for this object. + continue; + } + } + + reference_filter_result_t result(1, new uint32_t[1]{ref_doc_id}); + prune_doc_op = prune_ref_doc(doc[key][i], result, + ref_include_fields_full, ref_exclude_fields_full, + false, ref_include_exclude); + if (!prune_doc_op.ok()) { + return prune_doc_op; + } + } + } else { + std::vector ids; + auto get_references_op = collection->get_related_ids(field_name, seq_id, ids); + if (!get_references_op.ok()) { + LOG(ERROR) << "Error while getting related ids: " + get_references_op.error(); + continue; + } + reference_filter_result_t result(ids.size(), &ids[0]); + prune_doc_op = prune_ref_doc(doc[key], result, ref_include_fields_full, ref_exclude_fields_full, + collection->get_schema().at(field_name).is_array(), ref_include_exclude); + result.docs = nullptr; + } + } else { + std::vector ids; + auto get_references_op = collection->get_related_ids(field_name, seq_id, ids); + if (!get_references_op.ok()) { + LOG(ERROR) << "Error while getting related ids: " + get_references_op.error(); + continue; + } + reference_filter_result_t result(ids.size(), &ids[0]); + prune_doc_op = prune_ref_doc(doc, result, ref_include_fields_full, ref_exclude_fields_full, + collection->get_schema().at(field_name).is_array(), ref_include_exclude); + result.docs = nullptr; + } + } else if (joined_coll_has_reference) { + auto joined_collection = cm.get_collection(joined_coll_having_reference); + if (joined_collection == nullptr) { + continue; + } + + auto reference_field_name_op = ref_collection->get_referenced_in_field_with_lock(joined_coll_having_reference); + if (!reference_field_name_op.ok() || joined_collection->get_schema().count(reference_field_name_op.get()) == 0) { + continue; + } + + auto const& reference_field_name = reference_field_name_op.get(); + auto const& reference_filter_result = reference_filter_results.at(joined_coll_having_reference); + auto const& count = reference_filter_result.count; + std::vector ids; + ids.reserve(count); + for (uint32_t i = 0; i < count; i++) { + joined_collection->get_related_ids_with_lock(reference_field_name, reference_filter_result.docs[i], ids); + } + if (ids.empty()) { + continue; + } + + gfx::timsort(ids.begin(), ids.end()); + ids.erase(unique(ids.begin(), ids.end()), ids.end()); + + reference_filter_result_t result; + result.count = ids.size(); + result.docs = &ids[0]; + prune_doc_op = prune_ref_doc(doc, result, ref_include_fields_full, ref_exclude_fields_full, + joined_collection->get_schema().at(reference_field_name).is_array(), + ref_include_exclude); + result.docs = nullptr; + } + + if (!prune_doc_op.ok()) { + return prune_doc_op; + } + } + + return Option(true); +} \ No newline at end of file diff --git a/test/collection_join_test.cpp b/test/collection_join_test.cpp index b8292a81..625511d1 100644 --- a/test/collection_join_test.cpp +++ b/test/collection_join_test.cpp @@ -525,7 +525,14 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { })"_json; add_doc_op = coll2->add(doc_json.dump()); ASSERT_FALSE(add_doc_op.ok()); - ASSERT_EQ("Field `ref_string_array_field` must be an array of string.", add_doc_op.error()); + ASSERT_EQ("Field `ref_string_array_field` must only have `string` values.", add_doc_op.error()); + + doc_json = R"({ + "ref_string_array_field": ["foo"] + })"_json; + add_doc_op = coll2->add(doc_json.dump()); + ASSERT_FALSE(add_doc_op.ok()); + ASSERT_EQ("Reference document having `string_array_field:= foo` not found in the collection `coll1`.", add_doc_op.error()); collectionManager.drop_collection("coll2"); temp_json = schema_json; @@ -556,29 +563,16 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { ASSERT_EQ(0, doc.count("ref_string_field_sequence_id")); ASSERT_EQ(0, doc.count(".ref")); - doc_json = R"({ - "ref_string_array_field": ["foo"] - })"_json; - add_doc_op = coll2->add(doc_json.dump()); - ASSERT_TRUE(add_doc_op.ok()); - result = coll2->search("*", {}, "", {}, {}, {0}).get(); ASSERT_EQ(0, result["hits"][0]["document"]["ref_string_array_field_sequence_id"].size()); - doc = coll2->get("2").get(); - ASSERT_EQ(1, doc.count("ref_string_array_field_sequence_id")); - ASSERT_EQ(0, doc["ref_string_array_field_sequence_id"].size()); - ASSERT_EQ(1, doc.count(".ref")); - ASSERT_EQ(1, doc[".ref"].size()); - ASSERT_EQ("ref_string_array_field_sequence_id", doc[".ref"][0]); - doc_json = R"({ - "ref_string_array_field": ["b", "foo"] + "ref_string_array_field": ["b"] })"_json; add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); - doc = coll2->get("3").get(); + doc = coll2->get("2").get(); ASSERT_EQ(1, doc.count("ref_string_array_field_sequence_id")); ASSERT_EQ(1, doc["ref_string_array_field_sequence_id"].size()); ASSERT_EQ(0, doc["ref_string_array_field_sequence_id"][0]); @@ -589,7 +583,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); - doc = coll2->get("4").get(); + doc = coll2->get("3").get(); ASSERT_EQ(1, doc.count("ref_string_array_field_sequence_id")); ASSERT_EQ(2, doc["ref_string_array_field_sequence_id"].size()); ASSERT_EQ(0, doc["ref_string_array_field_sequence_id"][0]); @@ -665,6 +659,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { ASSERT_FALSE(add_doc_op.ok()); ASSERT_EQ("Field `ref_int32_array_field` must only have `int32` values.", add_doc_op.error()); + doc_json = R"({ + "ref_int32_array_field": [1] + })"_json; + add_doc_op = coll2->add(doc_json.dump()); + ASSERT_FALSE(add_doc_op.ok()); + ASSERT_EQ("Reference document having `int32_array_field: 1` not found in the collection `coll1`.", add_doc_op.error()); + collectionManager.drop_collection("coll2"); temp_json = schema_json; collection_create_op = collectionManager.create_collection(temp_json); @@ -685,26 +686,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { ASSERT_EQ("ref_int32_field_sequence_id", doc[".ref"][0]); doc_json = R"({ - "ref_int32_array_field": [1] + "ref_int32_array_field": [2] })"_json; add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); doc = coll2->get("1").get(); ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id")); - ASSERT_EQ(0, doc["ref_int32_array_field_sequence_id"].size()); - ASSERT_EQ(1, doc.count(".ref")); - ASSERT_EQ(1, doc[".ref"].size()); - ASSERT_EQ("ref_int32_array_field_sequence_id", doc[".ref"][0]); - - doc_json = R"({ - "ref_int32_array_field": [1, 2] - })"_json; - add_doc_op = coll2->add(doc_json.dump()); - ASSERT_TRUE(add_doc_op.ok()); - - doc = coll2->get("2").get(); - ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id")); ASSERT_EQ(1, doc["ref_int32_array_field_sequence_id"].size()); ASSERT_EQ(3, doc["ref_int32_array_field_sequence_id"][0]); @@ -714,7 +702,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); - doc = coll2->get("3").get(); + doc = coll2->get("2").get(); ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id")); ASSERT_EQ(2, doc["ref_int32_array_field_sequence_id"].size()); ASSERT_EQ(3, doc["ref_int32_array_field_sequence_id"][0]); @@ -726,7 +714,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); - doc = coll2->get("4").get(); + doc = coll2->get("3").get(); ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id")); ASSERT_EQ(1, doc["ref_int32_array_field_sequence_id"].size()); ASSERT_EQ(3, doc["ref_int32_array_field_sequence_id"][0]); @@ -794,6 +782,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { ASSERT_FALSE(add_doc_op.ok()); ASSERT_EQ("Field `ref_int64_array_field` must only have `int64` values.", add_doc_op.error()); + doc_json = R"({ + "ref_int64_array_field": [1] + })"_json; + add_doc_op = coll2->add(doc_json.dump()); + ASSERT_FALSE(add_doc_op.ok()); + ASSERT_EQ("Reference document having `int64_array_field: 1` not found in the collection `coll1`.", add_doc_op.error()); + collectionManager.drop_collection("coll2"); temp_json = schema_json; collection_create_op = collectionManager.create_collection(temp_json); @@ -814,26 +809,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { ASSERT_EQ("ref_int64_field_sequence_id", doc[".ref"][0]); doc_json = R"({ - "ref_int64_array_field": [1] + "ref_int64_array_field": [2] })"_json; add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); doc = coll2->get("1").get(); ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id")); - ASSERT_EQ(0, doc["ref_int64_array_field_sequence_id"].size()); - ASSERT_EQ(1, doc.count(".ref")); - ASSERT_EQ(1, doc[".ref"].size()); - ASSERT_EQ("ref_int64_array_field_sequence_id", doc[".ref"][0]); - - doc_json = R"({ - "ref_int64_array_field": [1, 2] - })"_json; - add_doc_op = coll2->add(doc_json.dump()); - ASSERT_TRUE(add_doc_op.ok()); - - doc = coll2->get("2").get(); - ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id")); ASSERT_EQ(1, doc["ref_int64_array_field_sequence_id"].size()); ASSERT_EQ(6, doc["ref_int64_array_field_sequence_id"][0]); @@ -843,7 +825,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); - doc = coll2->get("3").get(); + doc = coll2->get("2").get(); ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id")); ASSERT_EQ(2, doc["ref_int64_array_field_sequence_id"].size()); ASSERT_EQ(6, doc["ref_int64_array_field_sequence_id"][0]); @@ -855,7 +837,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); - doc = coll2->get("4").get(); + doc = coll2->get("3").get(); ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id")); ASSERT_EQ(1, doc["ref_int64_array_field_sequence_id"].size()); ASSERT_EQ(6, doc["ref_int64_array_field_sequence_id"][0]); @@ -933,6 +915,15 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { ASSERT_FALSE(add_doc_op.ok()); ASSERT_EQ("Reference document having `string_array_field:= foo` not found in the collection `coll1`.", add_doc_op.error()); + doc_json = R"({ + "object": { + "ref_array_field": ["foo"] + } + })"_json; + add_doc_op = coll2->add(doc_json.dump()); + ASSERT_FALSE(add_doc_op.ok()); + ASSERT_EQ("Reference document having `string_array_field:= foo` not found in the collection `coll1`.", add_doc_op.error()); + collectionManager.drop_collection("coll2"); temp_json = schema_json; collection_create_op = collectionManager.create_collection(temp_json); @@ -957,7 +948,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { doc_json = R"({ "object": { - "ref_array_field": ["foo"] + "ref_array_field": ["b"] } })"_json; add_doc_op = coll2->add(doc_json.dump()); @@ -965,21 +956,6 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { doc = coll2->get("1").get(); ASSERT_EQ(1, doc.count("object.ref_array_field_sequence_id")); - ASSERT_EQ(0, doc["object.ref_array_field_sequence_id"].size()); - ASSERT_EQ(1, doc.count(".ref")); - ASSERT_EQ(1, doc[".ref"].size()); - ASSERT_EQ("object.ref_array_field_sequence_id", doc[".ref"][0]); - - doc_json = R"({ - "object": { - "ref_array_field": ["b", "foo"] - } - })"_json; - add_doc_op = coll2->add(doc_json.dump()); - ASSERT_TRUE(add_doc_op.ok()); - - doc = coll2->get("2").get(); - ASSERT_EQ(1, doc.count("object.ref_array_field_sequence_id")); ASSERT_EQ(1, doc["object.ref_array_field_sequence_id"].size()); ASSERT_EQ(0, doc["object.ref_array_field_sequence_id"][0]); @@ -996,7 +972,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { add_doc_op = coll2->add(doc_json.dump()); ASSERT_TRUE(add_doc_op.ok()); - doc = coll2->get("3").get(); + doc = coll2->get("2").get(); ASSERT_EQ(1, doc.count("object_array.ref_array_field_sequence_id")); ASSERT_EQ(2, doc["object_array.ref_array_field_sequence_id"].size()); ASSERT_EQ(2, doc["object_array.ref_array_field_sequence_id"][0].size()); @@ -1076,6 +1052,644 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) { ASSERT_EQ("Cannot add a reference to `coll1.object_array_field` of type `object[]`.", add_doc_op.error()); } +TEST_F(CollectionJoinTest, IndexDocumentHavingAsyncReferenceField) { + auto schema_json = + R"({ + "name": "Customers", + "fields": [ + {"name": "customer_id", "type": "string"}, + {"name": "customer_name", "type": "string"}, + {"name": "product_price", "type": "float"}, + {"name": "product_id", "type": "string", "reference": "Products.product_id", "async_reference": true} + ] + })"_json; + std::vector documents = { + R"({ + "customer_id": "customer_a", + "customer_name": "Joe", + "product_price": 143, + "product_id": "product_a" + })"_json, + R"({ + "customer_id": "customer_a", + "customer_name": "Joe", + "product_price": 73.5, + "product_id": "product_b" + })"_json, + R"({ + "customer_id": "customer_b", + "customer_name": "Dan", + "product_price": 75, + "product_id": "product_a" + })"_json + }; + + auto collection_create_op = collectionManager.create_collection(schema_json); + ASSERT_TRUE(collection_create_op.ok()); + for (auto const &json: documents) { + auto add_op = collection_create_op.get()->add(json.dump()); + if (!add_op.ok()) { + LOG(INFO) << add_op.error(); + } + ASSERT_TRUE(add_op.ok()); + } + + for (auto i = 0; i < 3; i++) { + auto const doc_id = std::to_string(i); + auto doc = collection_create_op.get()->get(doc_id).get(); + ASSERT_EQ(doc_id, doc["id"]); + + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("product_id_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("product_id_sequence_id")); + // Referenced documents don't exist yet, so dummy value is present in the reference helper field. + ASSERT_EQ(UINT32_MAX, doc["product_id_sequence_id"]); + } + + schema_json = + R"({ + "name": "coll1", + "fields": [ + {"name": "coll_id", "type": "string"}, + { + "name": "object.reference", + "type": "string", + "reference": "Products.product_id", + "optional": true, + "async_reference": true + }, + {"name": "object", "type": "object"} + ], + "enable_nested_fields": true + })"_json; + documents = { + R"({ + "coll_id": "a", + "object": {} + })"_json, + R"({ + "coll_id": "b", + "object": { + "reference": "product_b" + } + })"_json, + R"({ + "coll_id": "c", + "object": { + "reference": "product_a" + } + })"_json + }; + + collection_create_op = collectionManager.create_collection(schema_json); + ASSERT_TRUE(collection_create_op.ok()); + for (auto const &json: documents) { + auto add_op = collection_create_op.get()->add(json.dump()); + if (!add_op.ok()) { + LOG(INFO) << add_op.error(); + } + ASSERT_TRUE(add_op.ok()); + } + + for (auto i = 0; i < 3; i++) { + auto const doc_id = std::to_string(i); + auto doc = collection_create_op.get()->get(doc_id).get(); + ASSERT_EQ(doc_id, doc["id"]); + + if (i == 0) { + ASSERT_EQ(0, doc.count(".ref")); + ASSERT_EQ(0, doc.count("object.reference_sequence_id")); + continue; + } + + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("object.reference_sequence_id")); + // Referenced documents don't exist yet, so dummy value is present in the reference helper field. + ASSERT_EQ(UINT32_MAX, doc["object.reference_sequence_id"]); + } + + schema_json = + R"({ + "name": "Products", + "fields": [ + {"name": "product_id", "type": "string"}, + {"name": "product_name", "type": "string"}, + {"name": "product_description", "type": "string"}, + {"name": "rating", "type": "int32"} + ] + })"_json; + documents = { + R"({ + "product_id": "product_a", + "product_name": "shampoo", + "product_description": "Our new moisturizing shampoo is perfect for those with dry or damaged hair.", + "rating": "2" + })"_json, + R"({ + "product_id": "product_c", + "product_name": "comb", + "product_description": "Experience the natural elegance and gentle care of our handcrafted wooden combs – because your hair deserves the best.", + "rating": "3" + })"_json + }; + collection_create_op = collectionManager.create_collection(schema_json); + ASSERT_TRUE(collection_create_op.ok()); + for (auto const &json: documents) { + auto add_op = collection_create_op.get()->add(json.dump()); + if (!add_op.ok()) { + LOG(INFO) << add_op.error(); + } + ASSERT_TRUE(add_op.ok()); + } + + std::map req_params = { + {"collection", "Products"}, + {"q", "*"}, + {"query_by", "product_name"}, + {"filter_by", "id:* || $Customers(id:*)"}, + {"include_fields", "$Customers(id, strategy:nest_array) as Customers"} + }; + nlohmann::json embedded_params; + std::string json_res; + auto now_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + + auto search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts); + ASSERT_TRUE(search_op.ok()); + + auto res_obj = nlohmann::json::parse(json_res); + ASSERT_EQ(2, res_obj["found"].get()); + ASSERT_EQ(2, res_obj["hits"].size()); + ASSERT_EQ("1", res_obj["hits"][0]["document"]["id"]); + ASSERT_EQ(0, res_obj["hits"][0]["document"].count("Customers")); + + ASSERT_EQ("0", res_obj["hits"][1]["document"]["id"]); + ASSERT_EQ(1, res_obj["hits"][1]["document"].count("Customers")); + ASSERT_EQ(2, res_obj["hits"][1]["document"]["Customers"].size()); + ASSERT_EQ("0", res_obj["hits"][1]["document"]["Customers"][0]["id"]); + ASSERT_EQ("2", res_obj["hits"][1]["document"]["Customers"][1]["id"]); + + req_params = { + {"collection", "coll1"}, + {"q", "*"}, + {"include_fields", "$Products(product_id)"} + }; + + search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts); + ASSERT_TRUE(search_op.ok()); + + res_obj = nlohmann::json::parse(json_res); + ASSERT_EQ(3, res_obj["found"].get()); + ASSERT_EQ(3, res_obj["hits"].size()); + + ASSERT_EQ("2", res_obj["hits"][0]["document"]["id"]); + ASSERT_EQ(1, res_obj["hits"][0]["document"]["object"].count("Products")); + ASSERT_EQ("product_a", res_obj["hits"][0]["document"]["object"]["Products"]["product_id"]); + + ASSERT_EQ("1", res_obj["hits"][1]["document"]["id"]); + ASSERT_EQ(0, res_obj["hits"][1]["document"]["object"].count("Products")); + ASSERT_EQ(1, res_obj["hits"][1]["document"]["object"].count("reference")); + ASSERT_EQ("product_b", res_obj["hits"][1]["document"]["object"]["reference"]); + ASSERT_EQ(0, res_obj["hits"][1]["document"].count("Products")); + + ASSERT_EQ("0", res_obj["hits"][2]["document"]["id"]); + ASSERT_EQ(0, res_obj["hits"][2]["document"].count("Products")); + ASSERT_EQ(0, res_obj["hits"][2]["document"]["object"].count("reference")); + + auto doc_json = R"({ + "product_id": "product_b", + "product_name": "soap", + "product_description": "Introducing our all-natural, organic soap bar made with essential oils and botanical ingredients.", + "rating": "4" + })"_json; + auto add_doc_op = collection_create_op.get()->add(doc_json.dump()); + ASSERT_TRUE(add_doc_op.ok()); + + req_params = { + {"collection", "Products"}, + {"q", "*"}, + {"query_by", "product_name"}, + {"filter_by", "id:* || $Customers(id:*)"}, + {"include_fields", "$Customers(id, strategy:nest_array) as Customers"} + }; + + search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts); + ASSERT_TRUE(search_op.ok()); + + res_obj = nlohmann::json::parse(json_res); + ASSERT_EQ(3, res_obj["found"].get()); + ASSERT_EQ(3, res_obj["hits"].size()); + + ASSERT_EQ("2", res_obj["hits"][0]["document"]["id"]); + ASSERT_EQ(1, res_obj["hits"][0]["document"].count("Customers")); + ASSERT_EQ(1, res_obj["hits"][0]["document"]["Customers"].size()); + ASSERT_EQ("1", res_obj["hits"][0]["document"]["Customers"][0]["id"]); + + ASSERT_EQ("1", res_obj["hits"][1]["document"]["id"]); + ASSERT_EQ(0, res_obj["hits"][1]["document"].count("Customers")); + + ASSERT_EQ("0", res_obj["hits"][2]["document"]["id"]); + ASSERT_EQ(1, res_obj["hits"][2]["document"].count("Customers")); + ASSERT_EQ(2, res_obj["hits"][2]["document"]["Customers"].size()); + ASSERT_EQ("0", res_obj["hits"][2]["document"]["Customers"][0]["id"]); + ASSERT_EQ("2", res_obj["hits"][2]["document"]["Customers"][1]["id"]); + + { + auto const& customers = collectionManager.get_collection_unsafe("Customers"); + + doc_json = R"({ + "customer_id": "customer_b", + "customer_name": "Dan", + "product_price": 140, + "product_id": "product_b" + })"_json; + add_doc_op = customers->add(doc_json.dump()); + ASSERT_TRUE(add_doc_op.ok()); + + auto doc = customers->get("3").get(); + ASSERT_EQ("3", doc["id"]); + + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("product_id_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("product_id_sequence_id")); + // When referenced document is already present, reference helper field should be initialized to its seq_id. + ASSERT_EQ(2, doc["product_id_sequence_id"]); + } + + req_params = { + {"collection", "Products"}, + {"q", "*"}, + {"query_by", "product_name"}, + {"filter_by", "id:* || $Customers(id:*)"}, + {"include_fields", "$Customers(id, strategy:nest_array) as Customers"} + }; + + search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts); + ASSERT_TRUE(search_op.ok()); + + res_obj = nlohmann::json::parse(json_res); + ASSERT_EQ(3, res_obj["found"].get()); + ASSERT_EQ(3, res_obj["hits"].size()); + + ASSERT_EQ("2", res_obj["hits"][0]["document"]["id"]); + ASSERT_EQ(1, res_obj["hits"][0]["document"].count("Customers")); + ASSERT_EQ(2, res_obj["hits"][0]["document"]["Customers"].size()); + ASSERT_EQ("1", res_obj["hits"][0]["document"]["Customers"][0]["id"]); + ASSERT_EQ("3", res_obj["hits"][0]["document"]["Customers"][1]["id"]); + + ASSERT_EQ("1", res_obj["hits"][1]["document"]["id"]); + ASSERT_EQ(0, res_obj["hits"][1]["document"].count("Customers")); + + ASSERT_EQ("0", res_obj["hits"][2]["document"]["id"]); + ASSERT_EQ(1, res_obj["hits"][2]["document"].count("Customers")); + ASSERT_EQ(2, res_obj["hits"][2]["document"]["Customers"].size()); + ASSERT_EQ("0", res_obj["hits"][2]["document"]["Customers"][0]["id"]); + ASSERT_EQ("2", res_obj["hits"][2]["document"]["Customers"][1]["id"]); + + { + auto const& coll1 = collectionManager.get_collection_unsafe("coll1"); + + doc_json = R"({ + "coll_id": "d", + "object": { + "reference": "product_d" + } + })"_json; + add_doc_op = coll1->add(doc_json.dump()); + ASSERT_TRUE(add_doc_op.ok()); + + auto doc = coll1->get("3").get(); + ASSERT_EQ("3", doc["id"]); + + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("object.reference_sequence_id")); + // product_d doesn't exist yet, so dummy value is present in the reference helper field. + ASSERT_EQ(UINT32_MAX, doc["object.reference_sequence_id"]); + + doc_json = R"({ + "product_id": "product_d", + "product_name": "hair oil", + "product_description": "Revitalize your hair with our nourishing hair oil – nature's secret to lustrous, healthy locks.", + "rating": "foo" + })"_json; + add_doc_op = collection_create_op.get()->add(doc_json.dump()); + ASSERT_FALSE(add_doc_op.ok()); + ASSERT_EQ("Field `rating` must be an int32.", add_doc_op.error()); + + doc = coll1->get("3").get(); + ASSERT_EQ("3", doc["id"]); + + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("object.reference_sequence_id")); + // product_d was not indexed, reference helper field should remain unchanged. + ASSERT_EQ(UINT32_MAX, doc["object.reference_sequence_id"]); + + doc_json = R"({ + "product_id": "product_a", + "product_name": "hair oil", + "product_description": "Revitalize your hair with our nourishing hair oil – nature's secret to lustrous, healthy locks.", + "rating": "4" + })"_json; + add_doc_op = collection_create_op.get()->add(doc_json.dump()); + ASSERT_FALSE(add_doc_op.ok()); + // Singular reference field can only reference one document. + ASSERT_EQ("Error while updating async reference field `object.reference` of collection `coll1`: " + "Document `id: 2` already has a reference to document `0` of `Products` collection, " + "having reference value `product_a`.", add_doc_op.error()); + + doc = coll1->get("2").get(); + ASSERT_EQ("2", doc["id"]); + + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("object.reference_sequence_id")); + // product_a already existed, reference helper field should remain unchanged. + ASSERT_EQ(0, doc["object.reference_sequence_id"]); + + doc_json = R"({ + "product_id": "product_d", + "product_name": "hair oil", + "product_description": "Revitalize your hair with our nourishing hair oil – nature's secret to lustrous, healthy locks.", + "rating": "4" + })"_json; + add_doc_op = collection_create_op.get()->add(doc_json.dump()); + ASSERT_TRUE(add_doc_op.ok()); + + doc = coll1->get("3").get(); + ASSERT_EQ("3", doc["id"]); + + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("object.reference_sequence_id")); + ASSERT_EQ(5, doc["object.reference_sequence_id"]); + } + + schema_json = + R"({ + "name": "songs", + "fields": [ + { "name": "title", "type": "string" }, + { "name": "genres", "type": "string[]", "reference": "genres.id", "async_reference": true} + ] + })"_json; + documents = { + R"({"title":"Dil De Rani", "genres":[]})"_json, + R"({"title":"Corduroy", "genres":["1"]})"_json, + }; + collection_create_op = collectionManager.create_collection(schema_json); + ASSERT_TRUE(collection_create_op.ok()); + for (auto const &json: documents) { + auto add_op = collection_create_op.get()->add(json.dump()); + if (!add_op.ok()) { + LOG(INFO) << add_op.error(); + } + ASSERT_TRUE(add_op.ok()); + } + + { + auto doc = collection_create_op.get()->get("0").get(); + ASSERT_EQ("0", doc["id"]); + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("genres_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("genres_sequence_id")); + ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size()); + ASSERT_EQ(0, doc["genres_sequence_id"].size()); + + doc = collection_create_op.get()->get("1").get(); + ASSERT_EQ("1", doc["id"]); + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("genres_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("genres_sequence_id")); + ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size()); + ASSERT_EQ(1, doc["genres_sequence_id"].size()); + ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]); + } + + schema_json = + R"({ + "name": "genres", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "name", "type": "string" } + ] + })"_json; + documents = { + R"({"id":"0","name":"Grunge"})"_json, + R"({"id":"1","name":"Arena rock"})"_json + }; + collection_create_op = collectionManager.create_collection(schema_json); + ASSERT_TRUE(collection_create_op.ok()); + for (auto const &json: documents) { + auto add_op = collection_create_op.get()->add(json.dump()); + if (!add_op.ok()) { + LOG(INFO) << add_op.error(); + } + ASSERT_TRUE(add_op.ok()); + } + + req_params = { + {"collection", "songs"}, + {"q", "*"}, + {"include_fields", "$genres(name, strategy:nest) as genre"} + }; + + search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts); + ASSERT_TRUE(search_op.ok()); + + res_obj = nlohmann::json::parse(json_res); + ASSERT_EQ(2, res_obj["found"].get()); + ASSERT_EQ(2, res_obj["hits"].size()); + + ASSERT_EQ("Corduroy", res_obj["hits"][0]["document"]["title"].get()); + ASSERT_EQ(1, res_obj["hits"][0]["document"]["genre"].size()); + ASSERT_EQ("Arena rock", res_obj["hits"][0]["document"]["genre"][0]["name"]); + + ASSERT_EQ("Dil De Rani", res_obj["hits"][1]["document"]["title"].get()); + ASSERT_EQ(0, res_obj["hits"][1]["document"]["genre"].size()); + + { + auto const& songs_coll = collectionManager.get_collection_unsafe("songs"); + + doc_json = R"({"title":"Achilles Last Stand", "genres":["3","0","2"]})"_json; + add_doc_op = songs_coll->add(doc_json.dump()); + ASSERT_TRUE(add_doc_op.ok()); + + auto doc = songs_coll->get("2").get(); + ASSERT_EQ("2", doc["id"]); + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("genres_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("genres_sequence_id")); + ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size()); + ASSERT_EQ(3, doc["genres_sequence_id"].size()); + + ASSERT_EQ("3", doc["genres"][0]); + ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]); + ASSERT_EQ("0", doc["genres"][1]); + ASSERT_EQ(0, doc["genres_sequence_id"][1]); + + ASSERT_EQ("2", doc["genres"][2]); + ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][2]); + + auto remove_op = collection_create_op.get()->remove("0"); + ASSERT_TRUE(remove_op.ok()); + + doc = songs_coll->get("2").get(); + ASSERT_EQ("2", doc["id"]); + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("genres_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("genres_sequence_id")); + ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size()); + ASSERT_EQ(2, doc["genres_sequence_id"].size()); + ASSERT_EQ("3", doc["genres"][0]); + ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]); + + ASSERT_EQ("2", doc["genres"][1]); + ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][1]); + + doc_json = R"({"id":"2","name":"Blues"})"_json; + add_doc_op = collection_create_op.get()->add(doc_json.dump()); + ASSERT_TRUE(add_doc_op.ok()); + + doc = songs_coll->get("2").get(); + ASSERT_EQ("2", doc["id"]); + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("genres_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("genres_sequence_id")); + ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size()); + ASSERT_EQ(2, doc["genres_sequence_id"].size()); + ASSERT_EQ("3", doc["genres"][0]); + ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]); + + ASSERT_EQ("2", doc["genres"][1]); + ASSERT_EQ(2, doc["genres_sequence_id"][1]); + } + + req_params = { + {"collection", "songs"}, + {"q", "*"}, + {"include_fields", "$genres(name, strategy:nest) as genre"} + }; + + search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts); + ASSERT_TRUE(search_op.ok()); + + res_obj = nlohmann::json::parse(json_res); + ASSERT_EQ(3, res_obj["found"].get()); + ASSERT_EQ(3, res_obj["hits"].size()); + + ASSERT_EQ("Achilles Last Stand", res_obj["hits"][0]["document"]["title"].get()); + ASSERT_EQ(1, res_obj["hits"][0]["document"]["genre"].size()); + ASSERT_EQ("Blues", res_obj["hits"][0]["document"]["genre"][0]["name"]); + + ASSERT_EQ("Corduroy", res_obj["hits"][1]["document"]["title"].get()); + ASSERT_EQ(1, res_obj["hits"][1]["document"]["genre"].size()); + ASSERT_EQ("Arena rock", res_obj["hits"][1]["document"]["genre"][0]["name"]); + + ASSERT_EQ("Dil De Rani", res_obj["hits"][2]["document"]["title"].get()); + ASSERT_EQ(0, res_obj["hits"][2]["document"]["genre"].size()); + + collectionManager.dispose(); + delete store; + + store = new Store(state_dir_path); + collectionManager.init(store, 1.0, "auth_key", quit); + auto load_op = collectionManager.load(8, 1000); + + if(!load_op.ok()) { + LOG(ERROR) << load_op.error(); + } + ASSERT_TRUE(load_op.ok()); + + req_params = { + {"collection", "songs"}, + {"q", "*"}, + {"include_fields", "$genres(name, strategy:nest) as genre"} + }; + + search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts); + ASSERT_TRUE(search_op.ok()); + + res_obj = nlohmann::json::parse(json_res); + ASSERT_EQ(3, res_obj["found"].get()); + ASSERT_EQ(3, res_obj["hits"].size()); + + ASSERT_EQ("Achilles Last Stand", res_obj["hits"][0]["document"]["title"].get()); + ASSERT_EQ(1, res_obj["hits"][0]["document"]["genre"].size()); + ASSERT_EQ("Blues", res_obj["hits"][0]["document"]["genre"][0]["name"]); + + ASSERT_EQ("Corduroy", res_obj["hits"][1]["document"]["title"].get()); + ASSERT_EQ(1, res_obj["hits"][1]["document"]["genre"].size()); + ASSERT_EQ("Arena rock", res_obj["hits"][1]["document"]["genre"][0]["name"]); + + ASSERT_EQ("Dil De Rani", res_obj["hits"][2]["document"]["title"].get()); + ASSERT_EQ(0, res_obj["hits"][2]["document"]["genre"].size()); + + { + auto const& songs_coll = collectionManager.get_collection_unsafe("songs"); + auto doc = songs_coll->get("2").get(); + ASSERT_EQ("2", doc["id"]); + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("genres_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("genres_sequence_id")); + ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size()); + ASSERT_EQ(2, doc["genres_sequence_id"].size()); + ASSERT_EQ("3", doc["genres"][0]); + ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]); + + ASSERT_EQ("2", doc["genres"][1]); + ASSERT_EQ(2, doc["genres_sequence_id"][1]); + + auto const& genres_coll = collectionManager.get_collection_unsafe("genres"); + doc_json = R"({"id":"3","name":"Metal"})"_json; + add_doc_op = genres_coll->add(doc_json.dump()); + ASSERT_TRUE(add_doc_op.ok()); + + doc = songs_coll->get("2").get(); + ASSERT_EQ("2", doc["id"]); + ASSERT_EQ(1, doc.count(".ref")); + ASSERT_EQ(1, doc[".ref"].size()); + ASSERT_EQ("genres_sequence_id", doc[".ref"][0]); + + ASSERT_EQ(1, doc.count("genres_sequence_id")); + ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size()); + ASSERT_EQ(2, doc["genres_sequence_id"].size()); + ASSERT_EQ("3", doc["genres"][0]); + ASSERT_EQ(3, doc["genres_sequence_id"][0]); + + ASSERT_EQ("2", doc["genres"][1]); + ASSERT_EQ(2, doc["genres_sequence_id"][1]); + } +} + TEST_F(CollectionJoinTest, UpdateDocumentHavingReferenceField) { auto schema_json = R"({ @@ -2573,6 +3187,7 @@ TEST_F(CollectionJoinTest, FilterByNestedReferences) { ASSERT_EQ(1, res_obj["hits"][0]["document"]["Coll_B"].size()); ASSERT_EQ("coll_b_3", res_obj["hits"][0]["document"]["Coll_B"][0]["title"]); + ASSERT_EQ(1, res_obj["hits"][0]["document"]["Coll_B"][0].count("Coll_A")); ASSERT_EQ(1, res_obj["hits"][0]["document"]["Coll_B"][0]["Coll_A"].size()); ASSERT_EQ("coll_a_0", res_obj["hits"][0]["document"]["Coll_B"][0]["Coll_A"]["title"]); diff --git a/test/collection_manager_test.cpp b/test/collection_manager_test.cpp index 8d0cef5e..02b2505a 100644 --- a/test/collection_manager_test.cpp +++ b/test/collection_manager_test.cpp @@ -43,7 +43,7 @@ protected: {"name": "points", "type": "int32"}, {"name": "person", "type": "object", "optional": true}, {"name": "vec", "type": "float[]", "num_dim": 128, "optional": true}, - {"name": "product_id", "type": "string", "reference": "Products.product_id", "optional": true} + {"name": "product_id", "type": "string", "reference": "Products.product_id", "optional": true, "async_reference": true} ], "default_sorting_field": "points", "symbols_to_index":["+"], @@ -255,6 +255,7 @@ TEST_F(CollectionManagerTest, CollectionCreation) { "stem":false }, { + "async_reference":true, "facet":false, "index":true, "infix":false, @@ -340,7 +341,10 @@ TEST_F(CollectionManagerTest, ShouldInitCollection) { "\"string\", \"facet\": false}], \"default_sorting_field\": \"foo\"}"); spp::sparse_hash_map referenced_in; - Collection *collection = collectionManager.init_collection(collection_meta1, 100, store, 1.0f, referenced_in); + spp::sparse_hash_map> async_referenced_ins; + + Collection *collection = collectionManager.init_collection(collection_meta1, 100, store, 1.0f, referenced_in, + async_referenced_ins); ASSERT_EQ("foobar", collection->get_name()); ASSERT_EQ(100, collection->get_collection_id()); ASSERT_EQ(1, collection->get_fields().size()); @@ -362,7 +366,8 @@ TEST_F(CollectionManagerTest, ShouldInitCollection) { "\"symbols_to_index\": [\"+\"], \"token_separators\": [\"-\"]}"); - collection = collectionManager.init_collection(collection_meta2, 100, store, 1.0f, referenced_in); + collection = collectionManager.init_collection(collection_meta2, 100, store, 1.0f, referenced_in, + async_referenced_ins); ASSERT_EQ(12345, collection->get_created_at()); std::vector expected_symbols = {'+'}; @@ -500,6 +505,25 @@ TEST_F(CollectionManagerTest, RestoreRecordsOnRestart) { tsl::htrie_map schema = collection1->get_schema(); ASSERT_EQ(schema.count("product_id_sequence_id"), 1); + auto products_schema_json = + R"({ + "name": "Products", + "fields": [ + {"name": "product_id", "type": "string"}, + {"name": "product_name", "type": "string"}, + {"name": "product_description", "type": "string"} + ] + })"_json; + auto const& collection_create_op = collectionManager.create_collection(products_schema_json); + ASSERT_TRUE(collection_create_op.ok()); + + auto async_ref_fields = collection_create_op.get()->get_async_referenced_ins(); + ASSERT_EQ(1, async_ref_fields.size()); + ASSERT_EQ(1, async_ref_fields.count("product_id")); + ASSERT_EQ(1, async_ref_fields["product_id"].size()); + ASSERT_EQ("collection1", async_ref_fields["product_id"][0].collection); + ASSERT_EQ("product_id", async_ref_fields["product_id"][0].field); + // recreate collection manager to ensure that it restores the records from the disk backed store collectionManager.dispose(); delete store; @@ -573,6 +597,13 @@ TEST_F(CollectionManagerTest, RestoreRecordsOnRestart) { results = collection1->search("thomas", search_fields, "", facets, sort_fields, {0}, 10, 1, FREQUENCY, {false}).get(); ASSERT_EQ(4, results["hits"].size()); + + async_ref_fields = collectionManager.get_collection("Products").get()->get_async_referenced_ins(); + ASSERT_EQ(1, async_ref_fields.size()); + ASSERT_EQ(1, async_ref_fields.count("product_id")); + ASSERT_EQ(1, async_ref_fields["product_id"].size()); + ASSERT_EQ("collection1", async_ref_fields["product_id"][0].collection); + ASSERT_EQ("product_id", async_ref_fields["product_id"][0].field); } TEST_F(CollectionManagerTest, VerifyEmbeddedParametersOfScopedAPIKey) { @@ -1852,7 +1883,7 @@ TEST_F(CollectionManagerTest, ReferencedInBacklog) { auto const& references = referenced_ins_backlog.at("Products"); ASSERT_EQ(1, references.size()); ASSERT_EQ("collection1", references.cbegin()->collection); - ASSERT_EQ("product_id_sequence_id", references.cbegin()->field); + ASSERT_EQ("product_id", references.cbegin()->field); auto schema_json = R"({ @@ -1871,7 +1902,7 @@ TEST_F(CollectionManagerTest, ReferencedInBacklog) { auto get_reference_field_op = create_op.get()->get_referenced_in_field_with_lock("collection1"); ASSERT_TRUE(get_reference_field_op.ok()); - ASSERT_EQ("product_id_sequence_id", get_reference_field_op.get()); + ASSERT_EQ("product_id", get_reference_field_op.get()); get_reference_field_op = create_op.get()->get_referenced_in_field_with_lock("foo"); ASSERT_FALSE(get_reference_field_op.ok()); @@ -2071,7 +2102,8 @@ TEST_F(CollectionManagerTest, PopulateReferencedIns) { "name": "B", "fields": [ {"name": "b_id", "type": "string"}, - {"name": "b_ref", "type": "string", "reference": "A.a_id"} + {"name": "a_ref", "type": "string", "reference": "A.a_id"}, + {"name": "c_ref", "type": "string", "reference": "C.c_id", "async_reference": true} ] })"_json.dump(), R"({ @@ -2082,16 +2114,29 @@ TEST_F(CollectionManagerTest, PopulateReferencedIns) { })"_json.dump(), }; std::map> referenced_ins; + std::map>> async_referenced_ins; for (const auto &collection_meta_json: collection_meta_jsons) { - CollectionManager::_populate_referenced_ins(collection_meta_json, referenced_ins); + CollectionManager::_populate_referenced_ins(collection_meta_json, referenced_ins, async_referenced_ins); } - ASSERT_EQ(1, referenced_ins.size()); + ASSERT_EQ(2, referenced_ins.size()); ASSERT_EQ(1, referenced_ins.count("A")); ASSERT_EQ(1, referenced_ins["A"].size()); ASSERT_EQ(1, referenced_ins["A"].count("B")); - ASSERT_EQ("b_ref_sequence_id", referenced_ins["A"]["B"]); + ASSERT_EQ("a_ref", referenced_ins["A"]["B"]); + + ASSERT_EQ(1, referenced_ins.count("C")); + ASSERT_EQ(1, referenced_ins["C"].size()); + ASSERT_EQ(1, referenced_ins["C"].count("B")); + ASSERT_EQ("c_ref", referenced_ins["C"]["B"]); + + ASSERT_EQ(1, async_referenced_ins.count("C")); + ASSERT_EQ(1, async_referenced_ins["C"].size()); + ASSERT_EQ(1, async_referenced_ins["C"].count("c_id")); + ASSERT_EQ(1, async_referenced_ins["C"]["c_id"].size()); + ASSERT_EQ("B", async_referenced_ins["C"]["c_id"][0].collection); + ASSERT_EQ("c_ref", async_referenced_ins["C"]["c_id"][0].field); } TEST_F(CollectionManagerTest, CollectionPagination) {