Async reference field. (#1835)

* Add `async_reference` property to reference field.

* Accept `"async_reference": true` only when `reference` is provided.

* Avoid reference lookup during an update if the reference helper field is already present.

* Clear `referenced_in_backlog` on dispose.

* Add references asynchronously to singular reference field.

Store only the reference field name in the indexes instead of the reference helper field name.

* Only update async references when a document is indexed successfully.

* Remove unused params.

* Refactor reference index.

* Move `update_async_references` into `Index`.

* Refactor `Collection::update_async_references_with_lock`.

* Handle async update of reference array field.

* Add test case.

* Remove `update_async_references` method from `Collection`.

* Add `Join` class.
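
As a rough illustration of the schema-level change (the collection and field names below are hypothetical, not part of this commit), a reference field marked as async could be declared like this:

```cpp
#include <nlohmann/json.hpp>

// Accepted: `async_reference` accompanies a non-empty `reference`
// (here, a reference to the `id` field of a hypothetical `products` collection).
// If the referenced document does not exist yet, the reference helper value is
// set to a sentinel (UINT32_MAX) and patched asynchronously once the referenced
// document is indexed.
static const nlohmann::json valid_field = {
    {"name", "product_id"},
    {"type", "string"},
    {"reference", "products.id"},
    {"async_reference", true}
};

// Rejected with a 400 during field validation: `async_reference` is only
// applicable when `reference` is specified.
static const nlohmann::json invalid_field = {
    {"name", "product_id"},
    {"type", "string"},
    {"async_reference", true}
};
```
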
Harpreet Sangar 2024-08-30 11:20:05 +05:30 committed by GitHub
parent 9378de62f7
commit c8cd7e0472
13 changed files with 1992 additions and 843 deletions

@ -20,6 +20,7 @@
#include "tokenizer.h"
#include "synonym_index.h"
#include "vq_model_manager.h"
#include "join.h"
struct doc_seq_id_t {
uint32_t seq_id;
@ -39,17 +40,6 @@ struct highlight_field_t {
}
};
struct reference_pair {
std::string collection;
std::string field;
reference_pair(std::string collection, std::string field) : collection(std::move(collection)), field(std::move(field)) {}
bool operator < (const reference_pair& pair) const {
return collection < pair.collection;
}
};
class Collection {
private:
@ -140,14 +130,17 @@ private:
SynonymIndex* synonym_index;
/// "field name" -> reference_pair(referenced_collection_name, referenced_field_name)
spp::sparse_hash_map<std::string, reference_pair> reference_fields;
/// "field name" -> reference_info(referenced_collection_name, referenced_field_name, is_async)
spp::sparse_hash_map<std::string, reference_info_t> reference_fields;
/// Contains the info where the current collection is referenced.
/// Useful to perform operations such as cascading delete.
/// collection_name -> field_name
spp::sparse_hash_map<std::string, std::string> referenced_in;
/// "field name" -> List of <collection, field> pairs where this collection is referenced and is marked as `async`.
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>> async_referenced_ins;
/// Reference helper fields that are part of an object. The reference doc of these fields will be included in the
/// object rather than in the document.
tsl::htrie_set<char> object_reference_helper_fields;
@ -220,7 +213,7 @@ private:
bool is_update,
std::vector<field>& new_fields,
bool enable_nested_fields,
const spp::sparse_hash_map<std::string, reference_pair>& reference_fields,
const spp::sparse_hash_map<std::string, reference_info_t>& reference_fields,
tsl::htrie_set<char>& object_reference_helper_fields);
static bool facet_count_compare(const facet_count_t& a, const facet_count_t& b) {
@ -331,13 +324,6 @@ private:
Option<std::string> get_referenced_in_field(const std::string& collection_name) const;
Option<bool> get_related_ids(const std::string& ref_field_name, const uint32_t& seq_id,
std::vector<uint32_t>& result) const;
Option<bool> get_object_array_related_id(const std::string& ref_field_name,
const uint32_t& seq_id, const uint32_t& object_index,
uint32_t& result) const;
void remove_embedding_field(const std::string& field_name);
Option<bool> parse_and_validate_vector_query(const std::string& vector_query_str,
@ -375,6 +361,9 @@ public:
static constexpr const char* COLLECTION_METADATA = "metadata";
/// Value used when async_reference is true and a reference doc is not found.
static constexpr int64_t reference_helper_sentinel_value = UINT32_MAX;
// methods
Collection() = delete;
@ -386,7 +375,9 @@ public:
const std::vector<std::string>& symbols_to_index, const std::vector<std::string>& token_separators,
const bool enable_nested_fields, std::shared_ptr<VQModel> vq_model = nullptr,
spp::sparse_hash_map<std::string, std::string> referenced_in = spp::sparse_hash_map<std::string, std::string>(),
const nlohmann::json& metadata = {});
const nlohmann::json& metadata = {},
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>> async_referenced_ins =
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>());
~Collection();
@ -434,11 +425,6 @@ public:
void update_metadata(const nlohmann::json& meta);
static Option<bool> add_reference_helper_fields(nlohmann::json& document, const tsl::htrie_map<char, field>& schema,
const spp::sparse_hash_map<std::string, reference_pair>& reference_fields,
tsl::htrie_set<char>& object_reference_helper_fields,
const bool& is_update);
Option<doc_seq_id_t> to_doc(const std::string& json_str, nlohmann::json& document,
const index_operation_t& operation,
const DIRTY_VALUES dirty_values,
@ -458,18 +444,6 @@ public:
static void remove_reference_helper_fields(nlohmann::json& document);
static Option<bool> prune_ref_doc(nlohmann::json& doc,
const reference_filter_result_t& references,
const tsl::htrie_set<char>& ref_include_fields_full,
const tsl::htrie_set<char>& ref_exclude_fields_full,
const bool& is_reference_array,
const ref_include_exclude_fields& ref_include_exclude);
static Option<bool> include_references(nlohmann::json& doc, const uint32_t& seq_id, Collection *const collection,
const std::map<std::string, reference_filter_result_t>& reference_filter_results,
const std::vector<ref_include_exclude_fields>& ref_include_exclude_fields_vec,
const nlohmann::json& original_doc);
Option<bool> prune_doc_with_lock(nlohmann::json& doc, const tsl::htrie_set<char>& include_names,
const tsl::htrie_set<char>& exclude_names,
const std::map<std::string, reference_filter_result_t>& reference_filter_results = {},
@ -617,7 +591,7 @@ public:
Option<nlohmann::json> get(const std::string & id) const;
void cascade_remove_docs(const std::string& ref_helper_field_name, const uint32_t& ref_seq_id,
void cascade_remove_docs(const std::string& field_name, const uint32_t& ref_seq_id,
const nlohmann::json& ref_doc, bool remove_from_store = true);
Option<std::string> remove(const std::string & id, bool remove_from_store = true);
@ -666,7 +640,9 @@ public:
SynonymIndex* get_synonym_index();
spp::sparse_hash_map<std::string, reference_pair> get_reference_fields();
spp::sparse_hash_map<std::string, reference_info_t> get_reference_fields();
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>> get_async_referenced_ins();
// highlight ops
@ -710,17 +686,20 @@ public:
bool is_referenced_in(const std::string& collection_name) const;
void add_referenced_in(const reference_pair& pair);
void add_referenced_ins(const std::set<reference_info_t>& ref_infos);
void add_referenced_ins(const std::set<reference_pair>& pairs);
void add_referenced_in(const std::string& collection_name, const std::string& field_name);
void add_referenced_in(const std::string& collection_name, const std::string& field_name,
const bool& is_async, const std::string& referenced_field_name);
Option<std::string> get_referenced_in_field_with_lock(const std::string& collection_name) const;
Option<bool> get_related_ids_with_lock(const std::string& field_name, const uint32_t& seq_id,
std::vector<uint32_t>& result) const;
Option<bool> update_async_references_with_lock(const std::string& ref_coll_name, const std::string& filter,
const std::set<std::string>& filter_values,
const uint32_t ref_seq_id, const std::string& field_name);
Option<uint32_t> get_sort_index_value_with_lock(const std::string& field_name, const uint32_t& seq_id) const;
static void hide_credential(nlohmann::json& json, const std::string& credential_name);
@ -732,6 +711,13 @@ public:
void expand_search_query(const std::string& raw_query, size_t offset, size_t total, const search_args* search_params,
const std::vector<std::vector<KV*>>& result_group_kvs,
const std::vector<std::string>& raw_search_fields, std::string& first_q) const;
Option<bool> get_object_array_related_id(const std::string& ref_field_name,
const uint32_t& seq_id, const uint32_t& object_index,
uint32_t& result) const;
Option<bool> get_related_ids(const std::string& ref_field_name, const uint32_t& seq_id,
std::vector<uint32_t>& result) const;
};
template<class T>

@ -79,7 +79,7 @@ private:
std::atomic<bool>* quit;
// All the references to a particular collection are stored until it is created.
std::map<std::string, std::set<reference_pair>> referenced_in_backlog;
std::map<std::string, std::set<reference_info_t>> referenced_in_backlog;
CollectionManager();
@ -125,13 +125,15 @@ public:
const uint32_t collection_next_seq_id,
Store* store,
float max_memory_ratio,
spp::sparse_hash_map<std::string, std::string>& referenced_in);
spp::sparse_hash_map<std::string, std::string>& referenced_in,
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>& async_referenced_ins);
static Option<bool> load_collection(const nlohmann::json& collection_meta,
const size_t batch_size,
const StoreStatus& next_coll_id_status,
const std::atomic<bool>& quit,
spp::sparse_hash_map<std::string, std::string>& referenced_in);
spp::sparse_hash_map<std::string, std::string>& referenced_in,
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>& async_referenced_ins);
Option<Collection*> clone_collection(const std::string& existing_name, const nlohmann::json& req_json);
@ -234,14 +236,15 @@ public:
std::vector<std::string>& exclude_fields_vec,
std::vector<ref_include_exclude_fields>& ref_include_exclude_fields_vec);
void add_referenced_in_backlog(const std::string& collection_name, reference_pair&& pair);
void add_referenced_in_backlog(const std::string& collection_name, reference_info_t&& ref_info);
std::map<std::string, std::set<reference_pair>> _get_referenced_in_backlog() const;
std::map<std::string, std::set<reference_info_t>> _get_referenced_in_backlog() const;
void process_embedding_field_delete(const std::string& model_name);
static void _populate_referenced_ins(const std::string& collection_meta_json,
std::map<std::string, spp::sparse_hash_map<std::string, std::string>>& referenced_ins);
std::map<std::string, spp::sparse_hash_map<std::string, std::string>>& referenced_ins,
std::map<std::string, spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>>& async_referenced_ins);
std::unordered_set<std::string> get_collection_references(const std::string& coll_name);

@ -60,6 +60,7 @@ namespace fields {
static const std::string num_dim = "num_dim";
static const std::string vec_dist = "vec_dist";
static const std::string reference = "reference";
static const std::string async_reference = "async_reference";
static const std::string embed = "embed";
static const std::string from = "from";
static const std::string model_name = "model_name";
@ -86,6 +87,14 @@ enum vector_distance_type_t {
cosine
};
struct reference_pair_t {
std::string collection;
std::string field;
reference_pair_t(std::string collection, std::string field) : collection(std::move(collection)),
field(std::move(field)) {}
};
struct field {
std::string name;
std::string type;
@ -112,6 +121,7 @@ struct field {
static constexpr int VAL_UNKNOWN = 2;
std::string reference; // Foo.bar (reference to bar field in Foo collection).
bool is_async_reference = false;
bool range_index;
@ -127,10 +137,13 @@ struct field {
field(const std::string &name, const std::string &type, const bool facet, const bool optional = false,
bool index = true, std::string locale = "", int sort = -1, int infix = -1, bool nested = false,
int nested_array = 0, size_t num_dim = 0, vector_distance_type_t vec_dist = cosine,
std::string reference = "", const nlohmann::json& embed = nlohmann::json(), const bool range_index = false, const bool store = true, const bool stem = false, const nlohmann::json hnsw_params = nlohmann::json()) :
std::string reference = "", const nlohmann::json& embed = nlohmann::json(), const bool range_index = false,
const bool store = true, const bool stem = false, const nlohmann::json hnsw_params = nlohmann::json(),
const bool async_reference = false) :
name(name), type(type), facet(facet), optional(optional), index(index), locale(locale),
nested(nested), nested_array(nested_array), num_dim(num_dim), vec_dist(vec_dist), reference(reference),
embed(embed), range_index(range_index), store(store), stem(stem), hnsw_params(hnsw_params) {
embed(embed), range_index(range_index), store(store), stem(stem), hnsw_params(hnsw_params),
is_async_reference(async_reference) {
set_computed_defaults(sort, infix);

@ -569,6 +569,10 @@ private:
const index_record* record,
const std::vector<embedding_res_t>& embedding_results,
size_t& count, const field& the_field);
void update_async_references(const std::string& collection_name, const field& afield,
std::vector<index_record>& iter_batch,
const std::vector<reference_pair_t>& async_referenced_ins = {});
public:
// for limiting number of results on multiple candidates / query rewrites
enum {TYPO_TOKENS_THRESHOLD = 1};
@ -740,9 +744,14 @@ public:
const size_t remote_embedding_timeout_ms = 60000,
const size_t remote_embedding_num_tries = 2, const bool generate_embeddings = true,
const bool use_addition_fields = false,
const tsl::htrie_map<char, field>& addition_fields = tsl::htrie_map<char, field>());
const tsl::htrie_map<char, field>& addition_fields = tsl::htrie_map<char, field>(),
const std::string& collection_name = "",
const spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>& async_referenced_ins =
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>());
void index_field_in_memory(const field& afield, std::vector<index_record>& iter_batch);
void index_field_in_memory(const std::string& collection_name, const field& afield,
std::vector<index_record>& iter_batch,
const std::vector<reference_pair_t>& async_referenced_ins = {});
template<class T>
void iterate_and_index_numerical_field(std::vector<index_record>& iter_batch, const field& afield, T func);
@ -762,9 +771,9 @@ public:
Option<bool> do_reference_filtering_with_lock(filter_node_t* const filter_tree_root,
filter_result_t& filter_result,
const std::string& ref_collection_name,
const std::string& reference_helper_field_name) const;
const std::string& field_name) const;
Option<filter_result_t> do_filtering_with_reference_ids(const std::string& reference_helper_field_name,
Option<filter_result_t> do_filtering_with_reference_ids(const std::string& field_name,
const std::string& ref_collection_name,
filter_result_t&& ref_filter_result) const;
@ -1037,7 +1046,7 @@ public:
std::vector<uint32_t>& outside_seq_ids);
Option<bool> get_related_ids(const std::string& collection_name,
const std::string& reference_helper_field_name,
const std::string& field_name,
const uint32_t& seq_id, std::vector<uint32_t>& result) const;
Option<bool> get_object_array_related_id(const std::string& collection_name,

include/join.h (new file, 46 lines)

@ -0,0 +1,46 @@
#pragma once
#include "option.h"
#include "json.hpp"
#include "tsl/htrie_map.h"
#include "field.h"
#include "tsl/htrie_set.h"
#include "filter_result_iterator.h"
struct reference_info_t {
std::string collection;
std::string field;
bool is_async;
std::string referenced_field_name;
reference_info_t(std::string collection, std::string field, bool is_async, std::string referenced_field_name = "") :
collection(std::move(collection)), field(std::move(field)), is_async(is_async),
referenced_field_name(std::move(referenced_field_name)) {}
bool operator < (const reference_info_t& pair) const {
return collection < pair.collection;
}
};
class Join {
public:
static Option<bool> add_reference_helper_fields(nlohmann::json& document,
const tsl::htrie_map<char, field>& schema,
const spp::sparse_hash_map<std::string, reference_info_t>& reference_fields,
tsl::htrie_set<char>& object_reference_helper_fields,
const bool& is_update);
static Option<bool> prune_ref_doc(nlohmann::json& doc,
const reference_filter_result_t& references,
const tsl::htrie_set<char>& ref_include_fields_full,
const tsl::htrie_set<char>& ref_exclude_fields_full,
const bool& is_reference_array,
const ref_include_exclude_fields& ref_include_exclude);
static Option<bool> include_references(nlohmann::json& doc, const uint32_t& seq_id, Collection *const collection,
const std::map<std::string, reference_filter_result_t>& reference_filter_results,
const std::vector<ref_include_exclude_fields>& ref_include_exclude_fields_vec,
const nlohmann::json& original_doc);
};

File diff suppressed because it is too large.

@ -22,7 +22,8 @@ Collection* CollectionManager::init_collection(const nlohmann::json & collection
const uint32_t collection_next_seq_id,
Store* store,
float max_memory_ratio,
spp::sparse_hash_map<std::string, std::string>& referenced_in) {
spp::sparse_hash_map<std::string, std::string>& referenced_in,
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>& async_referenced_ins) {
std::string this_collection_name = collection_meta[Collection::COLLECTION_NAME_KEY].get<std::string>();
std::vector<field> fields;
@ -201,7 +202,7 @@ Collection* CollectionManager::init_collection(const nlohmann::json & collection
symbols_to_index,
token_separators,
enable_nested_fields, model, std::move(referenced_in),
metadata);
metadata, std::move(async_referenced_ins));
return collection;
}
@ -238,7 +239,8 @@ void CollectionManager::init(Store *store, const float max_memory_ratio, const s
}
void CollectionManager::_populate_referenced_ins(const std::string& collection_meta_json,
std::map<std::string, spp::sparse_hash_map<std::string, std::string>>& referenced_ins) {
std::map<std::string, spp::sparse_hash_map<std::string, std::string>>& referenced_ins,
std::map<std::string, spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>>& async_referenced_ins) {
auto const& obj = nlohmann::json::parse(collection_meta_json, nullptr, false);
if (!obj.is_discarded() && obj.is_object() && obj.contains("name") && obj["name"].is_string() &&
@ -249,10 +251,18 @@ void CollectionManager::_populate_referenced_ins(const std::string& collection_m
if (!field.contains("name") || !field.contains("reference")) {
continue;
}
auto field_name = std::string(field["name"]) + fields::REFERENCE_HELPER_FIELD_SUFFIX;
auto field_name = std::string(field["name"]);
auto const& reference = field["reference"].get<std::string>();
std::vector<std::string> split_result;
StringUtils::split(field["reference"], split_result, ".");
auto ref_coll_name = split_result.front();
StringUtils::split(reference, split_result, ".");
if (split_result.size() < 2) {
LOG(ERROR) << "Invalid reference `" << reference << "`.";
continue;
}
auto ref_coll_name = split_result[0];
auto ref_field_name = reference.substr(ref_coll_name.size() + 1);
// Resolves alias if used in schema.
auto actual_ref_coll_it = CollectionManager::get_instance().collection_symlinks.find(ref_coll_name);
@ -264,6 +274,11 @@ void CollectionManager::_populate_referenced_ins(const std::string& collection_m
}
referenced_ins[ref_coll_name].emplace(collection_name, field_name);
if (field.contains(fields::async_reference) &&
field[fields::async_reference].is_boolean() && field[fields::async_reference].get<bool>()) {
async_referenced_ins[ref_coll_name][ref_field_name].emplace_back(collection_name, field_name);
}
}
}
}
@ -321,8 +336,10 @@ Option<bool> CollectionManager::load(const size_t collection_batch_size, const s
// Collection name -> Ref collection name -> Ref field name
std::map<std::string, spp::sparse_hash_map<std::string, std::string>> referenced_ins;
// Collection name -> field name -> {Ref collection name, Ref field name}
std::map<std::string, spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>> async_referenced_ins;
for (const auto &collection_meta_json: collection_meta_jsons) {
_populate_referenced_ins(collection_meta_json, referenced_ins);
_populate_referenced_ins(collection_meta_json, referenced_ins, async_referenced_ins);
}
size_t num_processed = 0;
@ -343,7 +360,7 @@ Option<bool> CollectionManager::load(const size_t collection_batch_size, const s
auto captured_store = store;
loading_pool.enqueue([captured_store, num_collections, collection_meta, document_batch_size,
&m_process, &cv_process, &num_processed, &next_coll_id_status, quit = quit,
&referenced_ins, collection_name]() {
&referenced_ins, &async_referenced_ins, collection_name]() {
spp::sparse_hash_map<std::string, std::string> referenced_in;
auto const& it = referenced_ins.find(collection_name);
@ -351,9 +368,15 @@ Option<bool> CollectionManager::load(const size_t collection_batch_size, const s
referenced_in = it->second;
}
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>> async_referenced_in;
auto const& async_it = async_referenced_ins.find(collection_name);
if (async_it != async_referenced_ins.end()) {
async_referenced_in = async_it->second;
}
//auto begin = std::chrono::high_resolution_clock::now();
Option<bool> res = load_collection(collection_meta, document_batch_size, next_coll_id_status, *quit,
referenced_in);
referenced_in, async_referenced_in);
/*long long int timeMillis =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - begin).count();
LOG(INFO) << "Time taken for indexing: " << timeMillis << "ms";*/
@ -457,6 +480,7 @@ void CollectionManager::dispose() {
collections.clear();
collection_symlinks.clear();
preset_configs.clear();
referenced_in_backlog.clear();
store->close();
}
@ -554,10 +578,10 @@ Option<Collection*> CollectionManager::create_collection(const std::string& name
add_to_collections(new_collection);
lock.lock();
if (referenced_in_backlog.count(name) > 0) {
new_collection->add_referenced_ins(referenced_in_backlog.at(name));
referenced_in_backlog.erase(name);
auto it = referenced_in_backlog.find(name);
if (it != referenced_in_backlog.end()) {
new_collection->add_referenced_ins(it->second);
referenced_in_backlog.erase(it);
}
return Option<Collection*>(new_collection);
@ -2286,7 +2310,8 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
const size_t batch_size,
const StoreStatus& next_coll_id_status,
const std::atomic<bool>& quit,
spp::sparse_hash_map<std::string, std::string>& referenced_in) {
spp::sparse_hash_map<std::string, std::string>& referenced_in,
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>& async_referenced_ins) {
auto& cm = CollectionManager::get_instance();
@ -2332,7 +2357,8 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
}
}
Collection* collection = init_collection(collection_meta, collection_next_seq_id, cm.store, 1.0f, referenced_in);
Collection* collection = init_collection(collection_meta, collection_next_seq_id, cm.store, 1.0f, referenced_in,
async_referenced_ins);
LOG(INFO) << "Loading collection " << collection->get_name();
@ -2564,12 +2590,12 @@ Option<Collection*> CollectionManager::clone_collection(const string& existing_n
return Option<Collection*>(new_coll);
}
void CollectionManager::add_referenced_in_backlog(const std::string& collection_name, reference_pair&& pair) {
void CollectionManager::add_referenced_in_backlog(const std::string& collection_name, reference_info_t&& ref_info) {
std::shared_lock lock(mutex);
referenced_in_backlog[collection_name].insert(pair);
referenced_in_backlog[collection_name].insert(ref_info);
}
std::map<std::string, std::set<reference_pair>> CollectionManager::_get_referenced_in_backlog() const {
std::map<std::string, std::set<reference_info_t>> CollectionManager::_get_referenced_in_backlog() const {
std::shared_lock lock(mutex);
return referenced_in_backlog;
}

@ -81,6 +81,18 @@ Option<bool> field::json_field_to_field(bool enable_nested_fields, nlohmann::jso
field_json[fields::reference] = "";
}
if (field_json.count(fields::async_reference) == 0) {
field_json[fields::async_reference] = false;
} else if (!field_json.at(fields::async_reference).is_boolean()) {
return Option<bool>(400, std::string("The `async_reference` property of the field `") +
field_json[fields::name].get<std::string>() + std::string("` should be a boolean."));
} else if (field_json[fields::async_reference].get<bool>() &&
field_json[fields::reference].get<std::string>().empty()) {
return Option<bool>(400, std::string("The `async_reference` property of the field `") +
field_json[fields::name].get<std::string>() + std::string("` is only applicable if "
"`reference` is specified."));
}
if(field_json.count(fields::stem) != 0) {
if(!field_json.at(fields::stem).is_boolean()) {
return Option<bool>(400, std::string("The `stem` property of the field `") +
@ -411,7 +423,8 @@ Option<bool> field::json_field_to_field(bool enable_nested_fields, nlohmann::jso
field_json[fields::sort], field_json[fields::infix], field_json[fields::nested],
field_json[fields::nested_array], field_json[fields::num_dim], vec_dist,
field_json[fields::reference], field_json[fields::embed], field_json[fields::range_index],
field_json[fields::store], field_json[fields::stem], field_json[fields::hnsw_params])
field_json[fields::store], field_json[fields::stem], field_json[fields::hnsw_params],
field_json[fields::async_reference])
);
if (!field_json[fields::reference].get<std::string>().empty()) {
@ -812,6 +825,7 @@ Option<bool> field::fields_to_json_fields(const std::vector<field>& fields, cons
if (!field.reference.empty()) {
field_val[fields::reference] = field.reference;
field_val[fields::async_reference] = field.is_async_reference;
}
fields_json.push_back(field_val);

@ -837,8 +837,8 @@ void filter_result_iterator_t::init() {
return;
}
auto const& reference_helper_field_name = get_reference_field_op.get();
auto op = index->do_filtering_with_reference_ids(reference_helper_field_name, ref_collection_name,
auto const& ref_field_name = get_reference_field_op.get();
auto op = index->do_filtering_with_reference_ids(ref_field_name, ref_collection_name,
std::move(result));
if (!op.ok()) {
status = Option<bool>(op.code(), op.error());

@ -565,7 +565,9 @@ batch_memory_index(Index *index,
const bool do_validation, const size_t remote_embedding_batch_size,
const size_t remote_embedding_timeout_ms, const size_t remote_embedding_num_tries,
const bool generate_embeddings,
const bool use_addition_fields, const tsl::htrie_map<char, field>& addition_fields) {
const bool use_addition_fields, const tsl::htrie_map<char, field>& addition_fields,
const std::string& collection_name,
const spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>& async_referenced_ins) {
const size_t concurrency = 4;
const size_t num_threads = std::min(concurrency, iter_batch.size());
const size_t window_size = (num_threads == 0) ? 0 :
@ -649,8 +651,14 @@ batch_memory_index(Index *index,
const field& f = (field_name == "id") ?
field("id", field_types::STRING, false) : indexable_schema.at(field_name);
std::vector<reference_pair_t> async_references;
auto it = async_referenced_ins.find(field_name);
if (it != async_referenced_ins.end()) {
async_references = it->second;
}
try {
index->index_field_in_memory(f, iter_batch);
index->index_field_in_memory(collection_name, f, iter_batch, async_references);
} catch(std::exception& e) {
LOG(ERROR) << "Unhandled Typesense error: " << e.what();
for(auto& record: iter_batch) {
@ -672,7 +680,9 @@ batch_memory_index(Index *index,
return num_indexed;
}
void Index::index_field_in_memory(const field& afield, std::vector<index_record>& iter_batch) {
void Index::index_field_in_memory(const std::string& collection_name, const field& afield,
std::vector<index_record>& iter_batch,
const std::vector<reference_pair_t>& async_referenced_ins) {
// indexes a given field of all documents in the batch
if(afield.name == "id") {
@ -687,6 +697,9 @@ void Index::index_field_in_memory(const field& afield, std::vector<index_record>
}
}
if (!async_referenced_ins.empty()) {
update_async_references(collection_name, afield, iter_batch, async_referenced_ins);
}
return;
}
@ -1150,6 +1163,127 @@ void Index::index_field_in_memory(const field& afield, std::vector<index_record>
}
}
}
if (!async_referenced_ins.empty()) {
update_async_references(collection_name, afield, iter_batch, async_referenced_ins);
}
}
void Index::update_async_references(const std::string& collection_name, const field& afield,
std::vector<index_record>& iter_batch,
const std::vector<reference_pair_t>& async_referenced_ins) {
for (auto& record: iter_batch) {
if (!record.indexed.ok() || record.is_update) {
continue;
}
auto const& document = record.doc;
auto const& is_update = record.is_update;
auto const& seq_id = record.seq_id;
for (const auto& pair: async_referenced_ins) {
auto const& reference_collection_name = pair.collection;
auto const& reference_field_name = pair.field;
auto& cm = CollectionManager::get_instance();
auto ref_coll = cm.get_collection(reference_collection_name);
if (ref_coll == nullptr) {
record.index_failure(400, "Collection `" + reference_collection_name + "` with async_reference to the"
" collection `" += collection_name + "` not found.");
continue;
}
auto const& ref_fields = ref_coll->get_reference_fields();
auto const ref_field_it = ref_fields.find(reference_field_name);
if (ref_field_it == ref_fields.end()) {
record.index_failure(400, "Field `" + reference_field_name + "` not found in the ref schema of `" +=
reference_collection_name + "` having async_reference to `" += collection_name +
"` collection.");
continue;
}
if (ref_field_it->second.collection != collection_name) {
record.index_failure(400, "`" + reference_collection_name + "." += reference_field_name +
"` does not have a reference to `" += collection_name + "` collection.");
continue;
}
auto const& ref_schema = ref_coll->get_schema();
if (ref_schema.count(reference_field_name) == 0) {
record.index_failure(400, "Field `" + reference_field_name + "` not found in the schema of `" +=
reference_collection_name + "` having async_reference to `" +=
collection_name + "` collection.");
continue;
}
auto const& field_name = ref_field_it->second.field;
if (field_name != "id" && search_schema.count(field_name) == 0) {
record.index_failure(400, "Field `" + field_name + "`, referenced by `" += reference_collection_name +
"." += reference_field_name + "`, not found in `" += collection_name +
"` collection.");
continue;
}
auto const& optional = field_name != "id" && search_schema.at(field_name).optional;
auto is_required = !is_update && !optional;
if (is_required && document.count(field_name) != 1) {
record.index_failure(400, "Missing the required field `" + field_name + "` in the document.");
continue;
} else if (document.count(field_name) != 1) {
continue;
}
// After collecting the value(s) present in the field referenced by the other collection(ref_coll), we will add
// this document's seq_id as a reference where the value(s) match.
std::string ref_filter_value;
std::set<std::string> values;
if (document.at(field_name).is_array()) {
ref_filter_value = "[";
for (auto const& value: document[field_name]) {
if (value.is_number_integer()) {
auto const& v = std::to_string(value.get<int64_t>());
ref_filter_value += v;
values.insert(v);
} else if (value.is_string()) {
auto const& v = value.get<std::string>();
ref_filter_value += v;
values.insert(v);
} else {
record.index_failure(400, "Field `" + field_name + "` must only have string/int32/int64 values.");
continue;
}
ref_filter_value += ",";
}
ref_filter_value[ref_filter_value.size() - 1] = ']';
} else {
auto const& value = document[field_name];
if (value.is_number_integer()) {
auto const& v = std::to_string(value.get<int64_t>());
ref_filter_value += v;
values.insert(v);
} else if (value.is_string()) {
auto const& v = value.get<std::string>();
ref_filter_value += v;
values.insert(v);
} else {
record.index_failure(400, "Field `" + field_name + "` must only have string/int32/int64 values.");
continue;
}
}
if (values.empty()) {
continue;
}
auto const ref_filter = reference_field_name + ":= " += ref_filter_value;
auto update_op = ref_coll->update_async_references_with_lock(collection_name, ref_filter, values, seq_id,
reference_field_name);
if (!update_op.ok()) {
record.index_failure(400, "Error while updating async reference field `" + reference_field_name +
"` of collection `" += reference_collection_name + "`: " += update_op.error());
}
}
}
}
void Index::tokenize_string(const std::string& text, const field& a_field,
@ -1727,7 +1861,7 @@ void aggregate_nested_references(single_filter_result_t *const reference_result,
Option<bool> Index::do_reference_filtering_with_lock(filter_node_t* const filter_tree_root,
filter_result_t& filter_result,
const std::string& ref_collection_name,
const std::string& reference_helper_field_name) const {
const std::string& field_name) const {
std::shared_lock lock(mutex);
auto ref_filter_result_iterator = filter_result_iterator_t(ref_collection_name, this, filter_tree_root,
@ -1755,6 +1889,7 @@ Option<bool> Index::do_reference_filtering_with_lock(filter_node_t* const filter
ref_filter_result->docs = nullptr;
std::unique_ptr<uint32_t[]> docs_guard(reference_docs);
auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX;
auto const is_nested_join = !ref_filter_result_iterator.reference.empty();
if (search_schema.at(reference_helper_field_name).is_singular()) { // Only one reference per doc.
if (sort_index.count(reference_helper_field_name) == 0) {
@ -1838,6 +1973,10 @@ Option<bool> Index::do_reference_filtering_with_lock(filter_node_t* const filter
}
auto doc_id = ref_index.at(reference_doc_id);
if (doc_id == Collection::reference_helper_sentinel_value) {
continue;
}
id_pairs.emplace_back(std::make_pair(doc_id, reference_doc_id));
unique_doc_ids.insert(doc_id);
}
@ -2036,7 +2175,7 @@ Option<bool> Index::do_reference_filtering_with_lock(filter_node_t* const filter
return Option(true);
}
Option<filter_result_t> Index::do_filtering_with_reference_ids(const std::string& reference_helper_field_name,
Option<filter_result_t> Index::do_filtering_with_reference_ids(const std::string& field_name,
const std::string& ref_collection_name,
filter_result_t&& ref_filter_result) const {
filter_result_t filter_result;
@ -2048,6 +2187,7 @@ Option<filter_result_t> Index::do_filtering_with_reference_ids(const std::string
return Option<filter_result_t>(filter_result);
}
auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX;
if (numerical_index.count(reference_helper_field_name) == 0) {
return Option<filter_result_t>(400, "`" + reference_helper_field_name + "` is not present in index.");
}
@ -4831,12 +4971,14 @@ Option<bool> Index::ref_compute_sort_scores(const sort_by& sort_field, const uin
return Option<bool>(get_reference_field_op.code(), get_reference_field_op.error());
}
auto const& field_name = get_reference_field_op.get();
if (sort_index.count(field_name) == 0) {
return Option<bool>(400, "Could not find `" + field_name + "` in sort_index.");
} else if (sort_index.at(field_name)->count(seq_id) == 0) {
auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX;
if (sort_index.count(reference_helper_field_name) == 0) {
return Option<bool>(400, "Could not find `" + reference_helper_field_name + "` in sort_index.");
} else if (sort_index.at(reference_helper_field_name)->count(seq_id) == 0) {
reference_found = false;
} else {
ref_seq_id = sort_index.at(field_name)->at(seq_id);
ref_seq_id = sort_index.at(reference_helper_field_name)->at(seq_id);
}
}
// Joined collection has a reference
@ -7649,28 +7791,39 @@ int64_t Index::reference_string_sort_score(const string &field_name, const uint3
Option<bool> Index::get_related_ids(const std::string& collection_name, const string& field_name,
const uint32_t& seq_id, std::vector<uint32_t>& result) const {
std::shared_lock lock(mutex);
if (search_schema.count(field_name) == 0) {
return Option<bool>(400, "Could not find `" + field_name + "` in the collection `" + collection_name + "`.");
auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX;
if (search_schema.count(reference_helper_field_name) == 0) {
return Option<bool>(400, "Could not find `" + reference_helper_field_name + "` in the collection `" + collection_name + "`.");
}
auto const no_match_op = Option<bool>(400, "Could not find `" + field_name + "` value for doc `" +
std::to_string(seq_id) + "`.");
if (search_schema.at(field_name).is_singular()) {
if (sort_index.count(field_name) == 0 || sort_index.at(field_name)->count(seq_id) == 0) {
auto const no_match_op = Option<bool>(400, "Could not find `" + reference_helper_field_name + "` value for doc `" +
std::to_string(seq_id) + "`.");
if (search_schema.at(reference_helper_field_name).is_singular()) {
if (sort_index.count(reference_helper_field_name) == 0) {
return no_match_op;
}
result.emplace_back(sort_index.at(field_name)->at(seq_id));
auto const& ref_index = sort_index.at(reference_helper_field_name);
auto const it = ref_index->find(seq_id);
if (it == ref_index->end()) {
return no_match_op;
}
const uint32_t id = it->second;
if (id != Collection::reference_helper_sentinel_value) {
result.emplace_back(id);
}
return Option<bool>(true);
}
if (reference_index.count(field_name) == 0) {
if (reference_index.count(reference_helper_field_name) == 0) {
return no_match_op;
}
size_t ids_len = 0;
uint32_t* ids = nullptr;
reference_index.at(field_name)->search(EQUALS, seq_id, &ids, ids_len);
reference_index.at(reference_helper_field_name)->search(EQUALS, seq_id, &ids, ids_len);
if (ids_len == 0) {
return no_match_op;
}
@ -7703,17 +7856,21 @@ Option<uint32_t> Index::get_sort_index_value_with_lock(const std::string& collec
const std::string& field_name,
const uint32_t& seq_id) const {
std::shared_lock lock(mutex);
if (search_schema.count(field_name) == 0) {
return Option<uint32_t>(400, "Could not find `" + field_name + "` in the collection `" + collection_name + "`.");
} else if (search_schema.at(field_name).is_array()) {
return Option<uint32_t>(400, "Cannot sort on `" + field_name + "` in the collection, `" + collection_name +
"` is `" + search_schema.at(field_name).type + "`.");
} else if (sort_index.count(field_name) == 0 || sort_index.at(field_name)->count(seq_id) == 0) {
return Option<uint32_t>(404, "Could not find `" + field_name + "` value for doc `" +
auto const reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX;
if (search_schema.count(reference_helper_field_name) == 0) {
return Option<uint32_t>(400, "Could not find `" + reference_helper_field_name + "` in the collection `" +
collection_name + "`.");
} else if (search_schema.at(reference_helper_field_name).is_array()) {
return Option<uint32_t>(400, "Cannot sort on `" + reference_helper_field_name + "` in the collection, `" +
collection_name + "` is `" + search_schema.at(reference_helper_field_name).type + "`.");
} else if (sort_index.count(reference_helper_field_name) == 0 ||
sort_index.at(reference_helper_field_name)->count(seq_id) == 0) {
return Option<uint32_t>(404, "Could not find `" + reference_helper_field_name + "` value for doc `" +
std::to_string(seq_id) + "`.");;
}
return Option<uint32_t>(sort_index.at(field_name)->at(seq_id));
return Option<uint32_t>(sort_index.at(reference_helper_field_name)->at(seq_id));
}
float Index::get_distance(const string& geo_field_name, const uint32_t& seq_id,

src/join.cpp (new file, 688 lines)

@ -0,0 +1,688 @@
#include "join.h"
#include <collection_manager.h>
#include "collection.h"
#include "logger.h"
#include <timsort.hpp>
Option<bool> single_value_filter_query(nlohmann::json& document, const std::string& field_name,
const std::string& ref_field_type, std::string& filter_query) {
auto const& value = document[field_name];
if (value.is_null()) {
return Option<bool>(422, "Field `" + field_name + "` has `null` value.");
}
if (value.is_string() && ref_field_type == field_types::STRING) {
filter_query += value.get<std::string>();
} else if (value.is_number_integer() && (ref_field_type == field_types::INT64 ||
(ref_field_type == field_types::INT32 &&
StringUtils::is_int32_t(std::to_string(value.get<int64_t>()))))) {
filter_query += std::to_string(value.get<int64_t>());
} else {
return Option<bool>(400, "Field `" + field_name + "` must have `" + ref_field_type + "` value.");
}
return Option<bool>(true);
}
Option<bool> Join::add_reference_helper_fields(nlohmann::json& document,
const tsl::htrie_map<char, field>& schema,
const spp::sparse_hash_map<std::string, reference_info_t>& reference_fields,
tsl::htrie_set<char>& object_reference_helper_fields,
const bool& is_update) {
tsl::htrie_set<char> flat_fields;
if (!reference_fields.empty() && document.contains(".flat")) {
for (const auto &item: document[".flat"].get<std::vector<std::string>>()) {
flat_fields.insert(item);
}
}
// Add reference helper fields in the document.
for (auto const& pair: reference_fields) {
auto field_name = pair.first;
auto const reference_helper_field = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX;
auto const& field = schema.at(field_name);
auto const& optional = field.optional;
auto const& is_async_reference = field.is_async_reference;
// Strict checking for presence of non-optional reference field during indexing operation.
auto is_required = !is_update && !optional;
if (is_required && document.count(field_name) != 1) {
return Option<bool>(400, "Missing the required reference field `" + field_name
+ "` in the document.");
} else if (document.count(field_name) != 1) {
if (is_update) {
document[fields::reference_helper_fields] += reference_helper_field;
}
continue;
}
auto reference_pair = pair.second;
auto reference_collection_name = reference_pair.collection;
auto reference_field_name = reference_pair.field;
auto& cm = CollectionManager::get_instance();
auto ref_collection = cm.get_collection(reference_collection_name);
if (is_update && document.contains(reference_helper_field) &&
(!document[field_name].is_array() || document[field_name].size() == document[reference_helper_field].size())) {
// No need to look up the reference collection since reference helper field is already populated.
// Saves needless computation in cases where references are known beforehand. For example, when cascade
// deleting the related docs.
document[fields::reference_helper_fields] += reference_helper_field;
continue;
}
if (ref_collection == nullptr && is_async_reference) {
document[fields::reference_helper_fields] += reference_helper_field;
if (document[field_name].is_array()) {
document[reference_helper_field] = nlohmann::json::array();
// Having the same number of values makes it easier to update the references in the future.
document[reference_helper_field].insert(document[reference_helper_field].begin(),
document[field_name].size(),
Collection::reference_helper_sentinel_value);
} else {
document[reference_helper_field] = Collection::reference_helper_sentinel_value;
}
continue;
} else if (ref_collection == nullptr) {
return Option<bool>(400, "Referenced collection `" + reference_collection_name
+ "` not found.");
}
bool is_object_reference_field = flat_fields.count(field_name) != 0;
std::string object_key;
bool is_object_array = false;
if (is_object_reference_field) {
object_reference_helper_fields.insert(reference_helper_field);
std::vector<std::string> tokens;
StringUtils::split(field_name, tokens, ".");
if (schema.count(tokens[0]) == 0) {
return Option<bool>(400, "Could not find `" + tokens[0] + "` object/object[] field in the schema.");
}
object_key = tokens[0];
is_object_array = schema.at(object_key).is_array();
}
if (reference_field_name == "id") {
auto id_field_type_error_op = Option<bool>(400, "Field `" + field_name + "` must have string value.");
if (is_object_array) {
if (!document[field_name].is_array()) {
return Option<bool>(400, "Expected `" + field_name + "` to be an array.");
}
document[reference_helper_field] = nlohmann::json::array();
document[fields::reference_helper_fields] += reference_helper_field;
std::vector<std::string> keys;
StringUtils::split(field_name, keys, ".");
auto const& object_array = document[keys[0]];
for (uint32_t i = 0; i < object_array.size(); i++) {
if (optional && object_array[i].count(keys[1]) == 0) {
continue;
} else if (object_array[i].count(keys[1]) == 0) {
return Option<bool>(400, "Object at index `" + std::to_string(i) + "` is missing `" + field_name + "`.");
} else if (!object_array[i].at(keys[1]).is_string()) {
return id_field_type_error_op;
}
auto id = object_array[i].at(keys[1]).get<std::string>();
auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id);
if (!ref_doc_id_op.ok() && is_async_reference) {
auto const& value = nlohmann::json::array({i, Collection::reference_helper_sentinel_value});
document[reference_helper_field] += value;
} else if (!ref_doc_id_op.ok()) {
return Option<bool>(400, "Referenced document having `id: " + id +
"` not found in the collection `" +=
reference_collection_name + "`." );
} else {
// Adding the index of the object along with referenced doc id to account for the scenario where a
// reference field of an object array might be optional and missing.
document[reference_helper_field] += nlohmann::json::array({i, ref_doc_id_op.get()});
}
}
} else if (document[field_name].is_array()) {
document[reference_helper_field] = nlohmann::json::array();
document[fields::reference_helper_fields] += reference_helper_field;
for (const auto &item: document[field_name].items()) {
if (optional && item.value().is_null()) {
continue;
} else if (!item.value().is_string()) {
return id_field_type_error_op;
}
auto id = item.value().get<std::string>();
auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id);
if (!ref_doc_id_op.ok() && is_async_reference) {
document[reference_helper_field] += Collection::reference_helper_sentinel_value;
} else if (!ref_doc_id_op.ok()) {
return Option<bool>(400, "Referenced document having `id: " + id +
"` not found in the collection `" +=
reference_collection_name + "`." );
} else {
document[reference_helper_field] += ref_doc_id_op.get();
}
}
} else if (document[field_name].is_string()) {
document[fields::reference_helper_fields] += reference_helper_field;
auto id = document[field_name].get<std::string>();
auto ref_doc_id_op = ref_collection->doc_id_to_seq_id_with_lock(id);
if (!ref_doc_id_op.ok() && is_async_reference) {
document[reference_helper_field] = Collection::reference_helper_sentinel_value;
} else if (!ref_doc_id_op.ok()) {
return Option<bool>(400, "Referenced document having `id: " + id +
"` not found in the collection `" +=
reference_collection_name + "`." );
} else {
document[reference_helper_field] = ref_doc_id_op.get();
}
} else if (optional && document[field_name].is_null()) {
// Reference helper field should also be removed along with reference field.
if (is_update) {
document[reference_helper_field] = nullptr;
}
continue;
} else {
return id_field_type_error_op;
}
continue;
}
if (ref_collection->get_schema().count(reference_field_name) == 0) {
return Option<bool>(400, "Referenced field `" + reference_field_name +
"` not found in the collection `" += reference_collection_name + "`.");
}
auto const ref_field = ref_collection->get_schema().at(reference_field_name);
if (!ref_field.index) {
return Option<bool>(400, "Referenced field `" + reference_field_name +
"` in the collection `" += reference_collection_name + "` must be indexed.");
}
std::string ref_field_type = ref_field.is_string() ? field_types::STRING :
ref_field.is_int32() ? field_types::INT32 :
ref_field.is_int64() ? field_types::INT64 : field_types::NIL;
if (ref_field_type == field_types::NIL) {
return Option<bool>(400, "Cannot add a reference to `" + reference_collection_name + "." += reference_field_name +
"` of type `" += ref_field.type + "`.");
}
if (is_object_array) {
if (!document[field_name].is_array()) {
return Option<bool>(400, "Expected `" + field_name + "` to be an array.");
}
document[reference_helper_field] = nlohmann::json::array();
document[fields::reference_helper_fields] += reference_helper_field;
nlohmann::json temp_doc; // To store singular values of `field_name` field.
std::vector<std::string> keys;
StringUtils::split(field_name, keys, ".");
auto const& object_array = document[keys[0]];
for (uint32_t i = 0; i < object_array.size(); i++) {
if (optional && object_array[i].count(keys[1]) == 0) {
continue;
} else if (object_array[i].count(keys[1]) == 0) {
return Option<bool>(400, "Object at index `" + std::to_string(i) + "` is missing `" + field_name + "`.");
}
temp_doc[field_name] = object_array[i].at(keys[1]);
std::string filter_query = reference_field_name + ":= ";
auto single_value_filter_query_op = single_value_filter_query(temp_doc, field_name, ref_field_type,
filter_query);
if (!single_value_filter_query_op.ok()) {
if (optional && single_value_filter_query_op.code() == 422) {
continue;
}
return Option<bool>(400, single_value_filter_query_op.error());
}
filter_result_t filter_result;
auto filter_ids_op = ref_collection->get_filter_ids(filter_query, filter_result);
if (!filter_ids_op.ok()) {
return filter_ids_op;
}
if (filter_result.count == 0 && is_async_reference) {
document[reference_helper_field] += nlohmann::json::array({i, Collection::reference_helper_sentinel_value});
} else if (filter_result.count != 1) {
// Constraints similar to foreign key apply here. The reference match must be unique and not null.
return Option<bool>(400, filter_result.count < 1 ?
"Reference document having `" + filter_query + "` not found in the collection `"
+= reference_collection_name + "`." :
"Multiple documents having `" + filter_query + "` found in the collection `" +=
reference_collection_name + "`.");
} else {
// Adding the index of the object along with referenced doc id to account for the scenario where a
// reference field of an object array might be optional and missing.
document[reference_helper_field] += nlohmann::json::array({i, filter_result.docs[0]});
}
}
continue;
}
auto const is_reference_array_field = field.is_array();
std::vector<std::string> filter_values;
if (is_reference_array_field) {
if (document[field_name].is_null()) {
document[reference_helper_field] = nlohmann::json::array();
document[fields::reference_helper_fields] += reference_helper_field;
continue;
} else if (!document[field_name].is_array()) {
return Option<bool>(400, "Expected `" + field_name + "` to be an array.");
}
for (const auto &item: document[field_name].items()) {
auto const& item_value = item.value();
if (item_value.is_string() && ref_field_type == field_types::STRING) {
filter_values.emplace_back(item_value.get<std::string>());
} else if (item_value.is_number_integer() && (ref_field_type == field_types::INT64 ||
(ref_field_type == field_types::INT32 &&
StringUtils::is_int32_t(std::to_string(item_value.get<int64_t>()))))) {
filter_values.emplace_back(std::to_string(item_value.get<int64_t>()));
} else {
return Option<bool>(400, "Field `" + field_name + "` must only have `" += ref_field_type + "` values.");
}
}
document[reference_helper_field] = nlohmann::json::array();
document[fields::reference_helper_fields] += reference_helper_field;
if (filter_values.empty()) {
continue;
}
} else {
std::string value;
auto single_value_filter_query_op = single_value_filter_query(document, field_name, ref_field_type, value);
if (!single_value_filter_query_op.ok()) {
if (optional && single_value_filter_query_op.code() == 422) {
// Reference helper field should also be removed along with reference field.
if (is_update) {
document[reference_helper_field] = nullptr;
}
continue;
}
return Option<bool>(400, single_value_filter_query_op.error());
}
filter_values.emplace_back(value);
document[fields::reference_helper_fields] += reference_helper_field;
}
for (const auto& filter_value: filter_values) {
std::string filter_query = reference_field_name + (field.is_string() ? ":= " : ": ") += filter_value;
filter_result_t filter_result;
auto filter_ids_op = ref_collection->get_filter_ids(filter_query, filter_result);
if (!filter_ids_op.ok()) {
return filter_ids_op;
}
if (filter_result.count == 0 && is_async_reference) {
if (is_reference_array_field) {
document[reference_helper_field] += Collection::reference_helper_sentinel_value;
} else {
document[reference_helper_field] = Collection::reference_helper_sentinel_value;
}
} else if (filter_result.count != 1) {
// Constraints similar to foreign key apply here. The reference match must be unique and not null.
return Option<bool>(400, filter_result.count < 1 ?
"Reference document having `" + filter_query + "` not found in the collection `"
+= reference_collection_name + "`." :
"Multiple documents having `" + filter_query + "` found in the collection `" +=
reference_collection_name + "`.");
} else {
if (is_reference_array_field) {
document[reference_helper_field] += filter_result.docs[0];
} else {
document[reference_helper_field] = filter_result.docs[0];
}
}
}
}
return Option<bool>(true);
}
Option<bool> Join::prune_ref_doc(nlohmann::json& doc,
const reference_filter_result_t& references,
const tsl::htrie_set<char>& ref_include_fields_full,
const tsl::htrie_set<char>& ref_exclude_fields_full,
const bool& is_reference_array,
const ref_include_exclude_fields& ref_include_exclude) {
nlohmann::json original_doc;
if (!ref_include_exclude.nested_join_includes.empty()) {
original_doc = doc;
}
auto const& ref_collection_name = ref_include_exclude.collection_name;
auto& cm = CollectionManager::get_instance();
auto ref_collection = cm.get_collection(ref_collection_name);
if (ref_collection == nullptr) {
return Option<bool>(400, "Referenced collection `" + ref_collection_name + "` in `include_fields` not found.");
}
auto const& alias = ref_include_exclude.alias;
auto const& strategy = ref_include_exclude.strategy;
auto error_prefix = "Referenced collection `" + ref_collection_name + "`: ";
// One-to-one relation.
if (strategy != ref_include::nest_array && !is_reference_array && references.count == 1) {
auto ref_doc_seq_id = references.docs[0];
nlohmann::json ref_doc;
auto get_doc_op = ref_collection->get_document_from_store(ref_doc_seq_id, ref_doc);
if (!get_doc_op.ok()) {
if (ref_doc_seq_id == Collection::reference_helper_sentinel_value) {
return Option<bool>(true);
}
return Option<bool>(get_doc_op.code(), error_prefix + get_doc_op.error());
}
Collection::remove_flat_fields(ref_doc);
Collection::remove_reference_helper_fields(ref_doc);
auto prune_op = Collection::prune_doc(ref_doc, ref_include_fields_full, ref_exclude_fields_full);
if (!prune_op.ok()) {
return Option<bool>(prune_op.code(), error_prefix + prune_op.error());
}
auto const key = alias.empty() ? ref_collection_name : alias;
auto const& nest_ref_doc = (strategy == ref_include::nest);
if (!ref_doc.empty()) {
if (nest_ref_doc) {
doc[key] = ref_doc;
} else {
if (!alias.empty()) {
auto temp_doc = ref_doc;
ref_doc.clear();
for (const auto &item: temp_doc.items()) {
ref_doc[alias + item.key()] = item.value();
}
}
doc.update(ref_doc);
}
}
// Include nested join references.
if (!ref_include_exclude.nested_join_includes.empty()) {
// Passing empty references in case the nested include collection is not joined, but it still can be included
// if we have a reference to it.
std::map<std::string, reference_filter_result_t> refs;
auto nested_include_exclude_op = include_references(nest_ref_doc ? doc[key] : doc, ref_doc_seq_id,
ref_collection.get(),
references.coll_to_references == nullptr ? refs :
references.coll_to_references[0],
ref_include_exclude.nested_join_includes, original_doc);
if (!nested_include_exclude_op.ok()) {
return nested_include_exclude_op;
}
}
return Option<bool>(true);
}
// One-to-many relation.
for (uint32_t i = 0; i < references.count; i++) {
auto ref_doc_seq_id = references.docs[i];
nlohmann::json ref_doc;
std::string key;
auto const& nest_ref_doc = (strategy == ref_include::nest || strategy == ref_include::nest_array);
auto get_doc_op = ref_collection->get_document_from_store(ref_doc_seq_id, ref_doc);
if (!get_doc_op.ok()) {
// Referenced document is not yet indexed.
if (ref_doc_seq_id == Collection::reference_helper_sentinel_value) {
continue;
}
return Option<bool>(get_doc_op.code(), error_prefix + get_doc_op.error());
}
Collection::remove_flat_fields(ref_doc);
Collection::remove_reference_helper_fields(ref_doc);
auto prune_op = Collection::prune_doc(ref_doc, ref_include_fields_full, ref_exclude_fields_full);
if (!prune_op.ok()) {
return Option<bool>(prune_op.code(), error_prefix + prune_op.error());
}
if (!ref_doc.empty()) {
if (nest_ref_doc) {
key = alias.empty() ? ref_collection_name : alias;
if (doc.contains(key) && !doc[key].is_array()) {
return Option<bool>(400, "Could not include the reference document of `" + ref_collection_name +
"` collection. Expected `" += key + "` to be an array. Try " +
(alias.empty() ? "adding an" : "renaming the") + " alias.");
}
doc[key] += ref_doc;
} else {
for (auto ref_doc_it = ref_doc.begin(); ref_doc_it != ref_doc.end(); ref_doc_it++) {
auto const& ref_doc_key = ref_doc_it.key();
key = alias + ref_doc_key;
if (doc.contains(key) && !doc[key].is_array()) {
return Option<bool>(400, "Could not include the value of `" + ref_doc_key +
"` key of the reference document of `" += ref_collection_name +
"` collection. Expected `" += key + "` to be an array. Try " +
(alias.empty() ? "adding an" : "renaming the") + " alias.");
}
// Add the values of ref_doc as JSON array into doc.
doc[key] += ref_doc_it.value();
}
}
}
// Include nested join references.
if (!ref_include_exclude.nested_join_includes.empty()) {
// Pass empty references when the nested include collection is not joined; it can still be
// included through a reference to it.
std::map<std::string, reference_filter_result_t> refs;
auto nested_include_exclude_op = include_references(nest_ref_doc ? doc[key].at(i) : doc, ref_doc_seq_id,
ref_collection.get(),
references.coll_to_references == nullptr ? refs :
references.coll_to_references[i],
ref_include_exclude.nested_join_includes, original_doc);
if (!nested_include_exclude_op.ok()) {
return nested_include_exclude_op;
}
}
}
return Option<bool>(true);
}
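// Includes the referenced documents requested via `ref_include_exclude_fields_vec` into `doc`,
// handling one entry per referenced collection.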
Option<bool> Join::include_references(nlohmann::json& doc, const uint32_t& seq_id, Collection *const collection,
const std::map<std::string, reference_filter_result_t>& reference_filter_results,
const std::vector<ref_include_exclude_fields>& ref_include_exclude_fields_vec,
const nlohmann::json& original_doc) {
for (auto const& ref_include_exclude: ref_include_exclude_fields_vec) {
auto ref_collection_name = ref_include_exclude.collection_name;
auto& cm = CollectionManager::get_instance();
auto ref_collection = cm.get_collection(ref_collection_name);
if (ref_collection == nullptr) {
return Option<bool>(400, "Referenced collection `" + ref_collection_name + "` in `include_fields` not found.");
}
// `CollectionManager::get_collection` resolves any collection alias and returns a pointer to the
// original collection.
ref_collection_name = ref_collection->get_name();
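// A requested reference can be resolved in three ways: from the join filter results, from a reference
// field on this document's own collection, or transitively through another joined collection.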
auto const joined_on_ref_collection = reference_filter_results.count(ref_collection_name) > 0,
has_filter_reference = (joined_on_ref_collection &&
reference_filter_results.at(ref_collection_name).count > 0);
auto doc_has_reference = false, joined_coll_has_reference = false;
// Reference include without a join; check whether the doc's own collection contains the reference.
if (!joined_on_ref_collection && collection != nullptr) {
doc_has_reference = ref_collection->is_referenced_in(collection->get_name());
}
std::string joined_coll_having_reference;
// Check if the joined collection has a reference.
if (!joined_on_ref_collection && !doc_has_reference) {
for (const auto &reference_filter_result: reference_filter_results) {
joined_coll_has_reference = ref_collection->is_referenced_in(reference_filter_result.first);
if (joined_coll_has_reference) {
joined_coll_having_reference = reference_filter_result.first;
break;
}
}
}
if (!has_filter_reference && !doc_has_reference && !joined_coll_has_reference) {
continue;
}
std::vector<std::string> ref_include_fields_vec, ref_exclude_fields_vec;
StringUtils::split(ref_include_exclude.include_fields, ref_include_fields_vec, ",");
StringUtils::split(ref_include_exclude.exclude_fields, ref_exclude_fields_vec, ",");
spp::sparse_hash_set<std::string> ref_include_fields, ref_exclude_fields;
ref_include_fields.insert(ref_include_fields_vec.begin(), ref_include_fields_vec.end());
ref_exclude_fields.insert(ref_exclude_fields_vec.begin(), ref_exclude_fields_vec.end());
tsl::htrie_set<char> ref_include_fields_full, ref_exclude_fields_full;
auto include_exclude_op = ref_collection->populate_include_exclude_fields_lk(ref_include_fields,
ref_exclude_fields,
ref_include_fields_full,
ref_exclude_fields_full);
auto error_prefix = "Referenced collection `" + ref_collection_name + "`: ";
if (!include_exclude_op.ok()) {
return Option<bool>(include_exclude_op.code(), error_prefix + include_exclude_op.error());
}
Option<bool> prune_doc_op = Option<bool>(true);
auto const& ref_collection_alias = ref_include_exclude.alias;
if (has_filter_reference) {
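// The join filter already produced the matching documents of the referenced collection; include them directly.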
auto const& ref_filter_result = reference_filter_results.at(ref_collection_name);
prune_doc_op = prune_ref_doc(doc, ref_filter_result, ref_include_fields_full, ref_exclude_fields_full,
ref_filter_result.is_reference_array_field, ref_include_exclude);
} else if (doc_has_reference) {
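// This document's collection has a reference field pointing to the referenced collection;
// resolve the referenced doc ids through its reference helper field.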
auto get_reference_field_op = ref_collection->get_referenced_in_field_with_lock(collection->get_name());
if (!get_reference_field_op.ok()) {
continue;
}
auto const& field_name = get_reference_field_op.get();
auto const& reference_helper_field_name = field_name + fields::REFERENCE_HELPER_FIELD_SUFFIX;
if (collection->get_schema().count(reference_helper_field_name) == 0) {
continue;
}
if (collection->get_object_reference_helper_fields().count(field_name) != 0) {
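// The reference field is nested under an object key, so each referenced doc is included inside the
// corresponding object rather than at the document root.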
std::vector<std::string> keys;
StringUtils::split(field_name, keys, ".");
auto const& key = keys[0];
if (!doc.contains(key)) {
if (!original_doc.contains(key)) {
return Option<bool>(400, "Could not find `" + key +
"` key in the document to include the referenced document.");
}
// The key was excluded from the doc by the query; insert empty object(s) so the referenced doc can be
// included in them.
if (original_doc[key].is_array()) {
doc[key] = nlohmann::json::array();
doc[key].insert(doc[key].begin(), original_doc[key].size(), nlohmann::json::object());
} else {
doc[key] = nlohmann::json::object();
}
}
if (doc[key].is_array()) {
for (uint32_t i = 0; i < doc[key].size(); i++) {
uint32_t ref_doc_id;
auto op = collection->get_object_array_related_id(reference_helper_field_name, seq_id, i, ref_doc_id);
if (!op.ok()) {
if (op.code() == 404) { // field_name is not indexed.
break;
} else { // No reference found for this object.
continue;
}
}
reference_filter_result_t result(1, new uint32_t[1]{ref_doc_id});
prune_doc_op = prune_ref_doc(doc[key][i], result,
ref_include_fields_full, ref_exclude_fields_full,
false, ref_include_exclude);
if (!prune_doc_op.ok()) {
return prune_doc_op;
}
}
} else {
std::vector<uint32_t> ids;
auto get_references_op = collection->get_related_ids(field_name, seq_id, ids);
if (!get_references_op.ok()) {
LOG(ERROR) << "Error while getting related ids: " + get_references_op.error();
continue;
}
reference_filter_result_t result(ids.size(), &ids[0]);
prune_doc_op = prune_ref_doc(doc[key], result, ref_include_fields_full, ref_exclude_fields_full,
collection->get_schema().at(field_name).is_array(), ref_include_exclude);
result.docs = nullptr;
}
} else {
std::vector<uint32_t> ids;
auto get_references_op = collection->get_related_ids(field_name, seq_id, ids);
if (!get_references_op.ok()) {
LOG(ERROR) << "Error while getting related ids: " + get_references_op.error();
continue;
}
reference_filter_result_t result(ids.size(), &ids[0]);
prune_doc_op = prune_ref_doc(doc, result, ref_include_fields_full, ref_exclude_fields_full,
collection->get_schema().at(field_name).is_array(), ref_include_exclude);
result.docs = nullptr;
}
} else if (joined_coll_has_reference) {
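// Another joined collection references the requested collection; resolve the referenced doc ids
// transitively through that collection's filter results.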
auto joined_collection = cm.get_collection(joined_coll_having_reference);
if (joined_collection == nullptr) {
continue;
}
auto reference_field_name_op = ref_collection->get_referenced_in_field_with_lock(joined_coll_having_reference);
if (!reference_field_name_op.ok() || joined_collection->get_schema().count(reference_field_name_op.get()) == 0) {
continue;
}
auto const& reference_field_name = reference_field_name_op.get();
auto const& reference_filter_result = reference_filter_results.at(joined_coll_having_reference);
auto const& count = reference_filter_result.count;
std::vector<uint32_t> ids;
ids.reserve(count);
for (uint32_t i = 0; i < count; i++) {
joined_collection->get_related_ids_with_lock(reference_field_name, reference_filter_result.docs[i], ids);
}
if (ids.empty()) {
continue;
}
gfx::timsort(ids.begin(), ids.end());
ids.erase(unique(ids.begin(), ids.end()), ids.end());
reference_filter_result_t result;
result.count = ids.size();
result.docs = &ids[0];
prune_doc_op = prune_ref_doc(doc, result, ref_include_fields_full, ref_exclude_fields_full,
joined_collection->get_schema().at(reference_field_name).is_array(),
ref_include_exclude);
result.docs = nullptr;
}
if (!prune_doc_op.ok()) {
return prune_doc_op;
}
}
return Option<bool>(true);
}


@ -525,7 +525,14 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Field `ref_string_array_field` must be an array of string.", add_doc_op.error());
ASSERT_EQ("Field `ref_string_array_field` must only have `string` values.", add_doc_op.error());
doc_json = R"({
"ref_string_array_field": ["foo"]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Reference document having `string_array_field:= foo` not found in the collection `coll1`.", add_doc_op.error());
collectionManager.drop_collection("coll2");
temp_json = schema_json;
@ -556,29 +563,16 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_EQ(0, doc.count("ref_string_field_sequence_id"));
ASSERT_EQ(0, doc.count(".ref"));
doc_json = R"({
"ref_string_array_field": ["foo"]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
result = coll2->search("*", {}, "", {}, {}, {0}).get();
ASSERT_EQ(0, result["hits"][0]["document"]["ref_string_array_field_sequence_id"].size());
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("ref_string_array_field_sequence_id"));
ASSERT_EQ(0, doc["ref_string_array_field_sequence_id"].size());
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("ref_string_array_field_sequence_id", doc[".ref"][0]);
doc_json = R"({
"ref_string_array_field": ["b", "foo"]
"ref_string_array_field": ["b"]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("3").get();
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("ref_string_array_field_sequence_id"));
ASSERT_EQ(1, doc["ref_string_array_field_sequence_id"].size());
ASSERT_EQ(0, doc["ref_string_array_field_sequence_id"][0]);
@ -589,7 +583,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("4").get();
doc = coll2->get("3").get();
ASSERT_EQ(1, doc.count("ref_string_array_field_sequence_id"));
ASSERT_EQ(2, doc["ref_string_array_field_sequence_id"].size());
ASSERT_EQ(0, doc["ref_string_array_field_sequence_id"][0]);
@ -665,6 +659,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Field `ref_int32_array_field` must only have `int32` values.", add_doc_op.error());
doc_json = R"({
"ref_int32_array_field": [1]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Reference document having `int32_array_field: 1` not found in the collection `coll1`.", add_doc_op.error());
collectionManager.drop_collection("coll2");
temp_json = schema_json;
collection_create_op = collectionManager.create_collection(temp_json);
@ -685,26 +686,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_EQ("ref_int32_field_sequence_id", doc[".ref"][0]);
doc_json = R"({
"ref_int32_array_field": [1]
"ref_int32_array_field": [2]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("1").get();
ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id"));
ASSERT_EQ(0, doc["ref_int32_array_field_sequence_id"].size());
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("ref_int32_array_field_sequence_id", doc[".ref"][0]);
doc_json = R"({
"ref_int32_array_field": [1, 2]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id"));
ASSERT_EQ(1, doc["ref_int32_array_field_sequence_id"].size());
ASSERT_EQ(3, doc["ref_int32_array_field_sequence_id"][0]);
@ -714,7 +702,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("3").get();
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id"));
ASSERT_EQ(2, doc["ref_int32_array_field_sequence_id"].size());
ASSERT_EQ(3, doc["ref_int32_array_field_sequence_id"][0]);
@ -726,7 +714,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("4").get();
doc = coll2->get("3").get();
ASSERT_EQ(1, doc.count("ref_int32_array_field_sequence_id"));
ASSERT_EQ(1, doc["ref_int32_array_field_sequence_id"].size());
ASSERT_EQ(3, doc["ref_int32_array_field_sequence_id"][0]);
@ -794,6 +782,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Field `ref_int64_array_field` must only have `int64` values.", add_doc_op.error());
doc_json = R"({
"ref_int64_array_field": [1]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Reference document having `int64_array_field: 1` not found in the collection `coll1`.", add_doc_op.error());
collectionManager.drop_collection("coll2");
temp_json = schema_json;
collection_create_op = collectionManager.create_collection(temp_json);
@ -814,26 +809,13 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_EQ("ref_int64_field_sequence_id", doc[".ref"][0]);
doc_json = R"({
"ref_int64_array_field": [1]
"ref_int64_array_field": [2]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("1").get();
ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id"));
ASSERT_EQ(0, doc["ref_int64_array_field_sequence_id"].size());
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("ref_int64_array_field_sequence_id", doc[".ref"][0]);
doc_json = R"({
"ref_int64_array_field": [1, 2]
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id"));
ASSERT_EQ(1, doc["ref_int64_array_field_sequence_id"].size());
ASSERT_EQ(6, doc["ref_int64_array_field_sequence_id"][0]);
@ -843,7 +825,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("3").get();
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id"));
ASSERT_EQ(2, doc["ref_int64_array_field_sequence_id"].size());
ASSERT_EQ(6, doc["ref_int64_array_field_sequence_id"][0]);
@ -855,7 +837,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("4").get();
doc = coll2->get("3").get();
ASSERT_EQ(1, doc.count("ref_int64_array_field_sequence_id"));
ASSERT_EQ(1, doc["ref_int64_array_field_sequence_id"].size());
ASSERT_EQ(6, doc["ref_int64_array_field_sequence_id"][0]);
@ -933,6 +915,15 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Reference document having `string_array_field:= foo` not found in the collection `coll1`.", add_doc_op.error());
doc_json = R"({
"object": {
"ref_array_field": ["foo"]
}
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Reference document having `string_array_field:= foo` not found in the collection `coll1`.", add_doc_op.error());
collectionManager.drop_collection("coll2");
temp_json = schema_json;
collection_create_op = collectionManager.create_collection(temp_json);
@ -957,7 +948,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
doc_json = R"({
"object": {
"ref_array_field": ["foo"]
"ref_array_field": ["b"]
}
})"_json;
add_doc_op = coll2->add(doc_json.dump());
@ -965,21 +956,6 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
doc = coll2->get("1").get();
ASSERT_EQ(1, doc.count("object.ref_array_field_sequence_id"));
ASSERT_EQ(0, doc["object.ref_array_field_sequence_id"].size());
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("object.ref_array_field_sequence_id", doc[".ref"][0]);
doc_json = R"({
"object": {
"ref_array_field": ["b", "foo"]
}
})"_json;
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("object.ref_array_field_sequence_id"));
ASSERT_EQ(1, doc["object.ref_array_field_sequence_id"].size());
ASSERT_EQ(0, doc["object.ref_array_field_sequence_id"][0]);
@ -996,7 +972,7 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
add_doc_op = coll2->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll2->get("3").get();
doc = coll2->get("2").get();
ASSERT_EQ(1, doc.count("object_array.ref_array_field_sequence_id"));
ASSERT_EQ(2, doc["object_array.ref_array_field_sequence_id"].size());
ASSERT_EQ(2, doc["object_array.ref_array_field_sequence_id"][0].size());
@ -1076,6 +1052,644 @@ TEST_F(CollectionJoinTest, IndexDocumentHavingReferenceField) {
ASSERT_EQ("Cannot add a reference to `coll1.object_array_field` of type `object[]`.", add_doc_op.error());
}
TEST_F(CollectionJoinTest, IndexDocumentHavingAsyncReferenceField) {
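// Documents may reference documents that are not indexed yet: the reference helper field holds a
// sentinel value until the referenced document is added and the reference is updated asynchronously.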
auto schema_json =
R"({
"name": "Customers",
"fields": [
{"name": "customer_id", "type": "string"},
{"name": "customer_name", "type": "string"},
{"name": "product_price", "type": "float"},
{"name": "product_id", "type": "string", "reference": "Products.product_id", "async_reference": true}
]
})"_json;
std::vector<nlohmann::json> documents = {
R"({
"customer_id": "customer_a",
"customer_name": "Joe",
"product_price": 143,
"product_id": "product_a"
})"_json,
R"({
"customer_id": "customer_a",
"customer_name": "Joe",
"product_price": 73.5,
"product_id": "product_b"
})"_json,
R"({
"customer_id": "customer_b",
"customer_name": "Dan",
"product_price": 75,
"product_id": "product_a"
})"_json
};
auto collection_create_op = collectionManager.create_collection(schema_json);
ASSERT_TRUE(collection_create_op.ok());
for (auto const &json: documents) {
auto add_op = collection_create_op.get()->add(json.dump());
if (!add_op.ok()) {
LOG(INFO) << add_op.error();
}
ASSERT_TRUE(add_op.ok());
}
for (auto i = 0; i < 3; i++) {
auto const doc_id = std::to_string(i);
auto doc = collection_create_op.get()->get(doc_id).get();
ASSERT_EQ(doc_id, doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("product_id_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("product_id_sequence_id"));
// Referenced documents don't exist yet, so a dummy value is present in the reference helper field.
ASSERT_EQ(UINT32_MAX, doc["product_id_sequence_id"]);
}
schema_json =
R"({
"name": "coll1",
"fields": [
{"name": "coll_id", "type": "string"},
{
"name": "object.reference",
"type": "string",
"reference": "Products.product_id",
"optional": true,
"async_reference": true
},
{"name": "object", "type": "object"}
],
"enable_nested_fields": true
})"_json;
documents = {
R"({
"coll_id": "a",
"object": {}
})"_json,
R"({
"coll_id": "b",
"object": {
"reference": "product_b"
}
})"_json,
R"({
"coll_id": "c",
"object": {
"reference": "product_a"
}
})"_json
};
collection_create_op = collectionManager.create_collection(schema_json);
ASSERT_TRUE(collection_create_op.ok());
for (auto const &json: documents) {
auto add_op = collection_create_op.get()->add(json.dump());
if (!add_op.ok()) {
LOG(INFO) << add_op.error();
}
ASSERT_TRUE(add_op.ok());
}
for (auto i = 0; i < 3; i++) {
auto const doc_id = std::to_string(i);
auto doc = collection_create_op.get()->get(doc_id).get();
ASSERT_EQ(doc_id, doc["id"]);
if (i == 0) {
ASSERT_EQ(0, doc.count(".ref"));
ASSERT_EQ(0, doc.count("object.reference_sequence_id"));
continue;
}
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("object.reference_sequence_id"));
// Referenced documents don't exist yet, so a dummy value is present in the reference helper field.
ASSERT_EQ(UINT32_MAX, doc["object.reference_sequence_id"]);
}
schema_json =
R"({
"name": "Products",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "product_name", "type": "string"},
{"name": "product_description", "type": "string"},
{"name": "rating", "type": "int32"}
]
})"_json;
documents = {
R"({
"product_id": "product_a",
"product_name": "shampoo",
"product_description": "Our new moisturizing shampoo is perfect for those with dry or damaged hair.",
"rating": "2"
})"_json,
R"({
"product_id": "product_c",
"product_name": "comb",
"product_description": "Experience the natural elegance and gentle care of our handcrafted wooden combs because your hair deserves the best.",
"rating": "3"
})"_json
};
collection_create_op = collectionManager.create_collection(schema_json);
ASSERT_TRUE(collection_create_op.ok());
for (auto const &json: documents) {
auto add_op = collection_create_op.get()->add(json.dump());
if (!add_op.ok()) {
LOG(INFO) << add_op.error();
}
ASSERT_TRUE(add_op.ok());
}
std::map<std::string, std::string> req_params = {
{"collection", "Products"},
{"q", "*"},
{"query_by", "product_name"},
{"filter_by", "id:* || $Customers(id:*)"},
{"include_fields", "$Customers(id, strategy:nest_array) as Customers"}
};
nlohmann::json embedded_params;
std::string json_res;
auto now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
auto search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts);
ASSERT_TRUE(search_op.ok());
auto res_obj = nlohmann::json::parse(json_res);
ASSERT_EQ(2, res_obj["found"].get<size_t>());
ASSERT_EQ(2, res_obj["hits"].size());
ASSERT_EQ("1", res_obj["hits"][0]["document"]["id"]);
ASSERT_EQ(0, res_obj["hits"][0]["document"].count("Customers"));
ASSERT_EQ("0", res_obj["hits"][1]["document"]["id"]);
ASSERT_EQ(1, res_obj["hits"][1]["document"].count("Customers"));
ASSERT_EQ(2, res_obj["hits"][1]["document"]["Customers"].size());
ASSERT_EQ("0", res_obj["hits"][1]["document"]["Customers"][0]["id"]);
ASSERT_EQ("2", res_obj["hits"][1]["document"]["Customers"][1]["id"]);
req_params = {
{"collection", "coll1"},
{"q", "*"},
{"include_fields", "$Products(product_id)"}
};
search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts);
ASSERT_TRUE(search_op.ok());
res_obj = nlohmann::json::parse(json_res);
ASSERT_EQ(3, res_obj["found"].get<size_t>());
ASSERT_EQ(3, res_obj["hits"].size());
ASSERT_EQ("2", res_obj["hits"][0]["document"]["id"]);
ASSERT_EQ(1, res_obj["hits"][0]["document"]["object"].count("Products"));
ASSERT_EQ("product_a", res_obj["hits"][0]["document"]["object"]["Products"]["product_id"]);
ASSERT_EQ("1", res_obj["hits"][1]["document"]["id"]);
ASSERT_EQ(0, res_obj["hits"][1]["document"]["object"].count("Products"));
ASSERT_EQ(1, res_obj["hits"][1]["document"]["object"].count("reference"));
ASSERT_EQ("product_b", res_obj["hits"][1]["document"]["object"]["reference"]);
ASSERT_EQ(0, res_obj["hits"][1]["document"].count("Products"));
ASSERT_EQ("0", res_obj["hits"][2]["document"]["id"]);
ASSERT_EQ(0, res_obj["hits"][2]["document"].count("Products"));
ASSERT_EQ(0, res_obj["hits"][2]["document"]["object"].count("reference"));
auto doc_json = R"({
"product_id": "product_b",
"product_name": "soap",
"product_description": "Introducing our all-natural, organic soap bar made with essential oils and botanical ingredients.",
"rating": "4"
})"_json;
auto add_doc_op = collection_create_op.get()->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
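// Indexing product_b should backfill the async references in the documents that pointed to it,
// so it now appears joined with its Customers in the search below.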
req_params = {
{"collection", "Products"},
{"q", "*"},
{"query_by", "product_name"},
{"filter_by", "id:* || $Customers(id:*)"},
{"include_fields", "$Customers(id, strategy:nest_array) as Customers"}
};
search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts);
ASSERT_TRUE(search_op.ok());
res_obj = nlohmann::json::parse(json_res);
ASSERT_EQ(3, res_obj["found"].get<size_t>());
ASSERT_EQ(3, res_obj["hits"].size());
ASSERT_EQ("2", res_obj["hits"][0]["document"]["id"]);
ASSERT_EQ(1, res_obj["hits"][0]["document"].count("Customers"));
ASSERT_EQ(1, res_obj["hits"][0]["document"]["Customers"].size());
ASSERT_EQ("1", res_obj["hits"][0]["document"]["Customers"][0]["id"]);
ASSERT_EQ("1", res_obj["hits"][1]["document"]["id"]);
ASSERT_EQ(0, res_obj["hits"][1]["document"].count("Customers"));
ASSERT_EQ("0", res_obj["hits"][2]["document"]["id"]);
ASSERT_EQ(1, res_obj["hits"][2]["document"].count("Customers"));
ASSERT_EQ(2, res_obj["hits"][2]["document"]["Customers"].size());
ASSERT_EQ("0", res_obj["hits"][2]["document"]["Customers"][0]["id"]);
ASSERT_EQ("2", res_obj["hits"][2]["document"]["Customers"][1]["id"]);
{
auto const& customers = collectionManager.get_collection_unsafe("Customers");
doc_json = R"({
"customer_id": "customer_b",
"customer_name": "Dan",
"product_price": 140,
"product_id": "product_b"
})"_json;
add_doc_op = customers->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
auto doc = customers->get("3").get();
ASSERT_EQ("3", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("product_id_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("product_id_sequence_id"));
// When the referenced document is already present, the reference helper field is initialized to its seq_id.
ASSERT_EQ(2, doc["product_id_sequence_id"]);
}
req_params = {
{"collection", "Products"},
{"q", "*"},
{"query_by", "product_name"},
{"filter_by", "id:* || $Customers(id:*)"},
{"include_fields", "$Customers(id, strategy:nest_array) as Customers"}
};
search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts);
ASSERT_TRUE(search_op.ok());
res_obj = nlohmann::json::parse(json_res);
ASSERT_EQ(3, res_obj["found"].get<size_t>());
ASSERT_EQ(3, res_obj["hits"].size());
ASSERT_EQ("2", res_obj["hits"][0]["document"]["id"]);
ASSERT_EQ(1, res_obj["hits"][0]["document"].count("Customers"));
ASSERT_EQ(2, res_obj["hits"][0]["document"]["Customers"].size());
ASSERT_EQ("1", res_obj["hits"][0]["document"]["Customers"][0]["id"]);
ASSERT_EQ("3", res_obj["hits"][0]["document"]["Customers"][1]["id"]);
ASSERT_EQ("1", res_obj["hits"][1]["document"]["id"]);
ASSERT_EQ(0, res_obj["hits"][1]["document"].count("Customers"));
ASSERT_EQ("0", res_obj["hits"][2]["document"]["id"]);
ASSERT_EQ(1, res_obj["hits"][2]["document"].count("Customers"));
ASSERT_EQ(2, res_obj["hits"][2]["document"]["Customers"].size());
ASSERT_EQ("0", res_obj["hits"][2]["document"]["Customers"][0]["id"]);
ASSERT_EQ("2", res_obj["hits"][2]["document"]["Customers"][1]["id"]);
{
auto const& coll1 = collectionManager.get_collection_unsafe("coll1");
doc_json = R"({
"coll_id": "d",
"object": {
"reference": "product_d"
}
})"_json;
add_doc_op = coll1->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
auto doc = coll1->get("3").get();
ASSERT_EQ("3", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("object.reference_sequence_id"));
// product_d doesn't exist yet, so a dummy value is present in the reference helper field.
ASSERT_EQ(UINT32_MAX, doc["object.reference_sequence_id"]);
doc_json = R"({
"product_id": "product_d",
"product_name": "hair oil",
"product_description": "Revitalize your hair with our nourishing hair oil nature's secret to lustrous, healthy locks.",
"rating": "foo"
})"_json;
add_doc_op = collection_create_op.get()->add(doc_json.dump());
ASSERT_FALSE(add_doc_op.ok());
ASSERT_EQ("Field `rating` must be an int32.", add_doc_op.error());
doc = coll1->get("3").get();
ASSERT_EQ("3", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("object.reference_sequence_id"));
// product_d was not indexed, so the reference helper field should remain unchanged.
ASSERT_EQ(UINT32_MAX, doc["object.reference_sequence_id"]);
doc_json = R"({
"product_id": "product_a",
"product_name": "hair oil",
"product_description": "Revitalize your hair with our nourishing hair oil nature's secret to lustrous, healthy locks.",
"rating": "4"
})"_json;
add_doc_op = collection_create_op.get()->add(doc_json.dump());
ASSERT_FALSE(add_doc_op.ok());
// A singular reference field can only reference one document.
ASSERT_EQ("Error while updating async reference field `object.reference` of collection `coll1`: "
"Document `id: 2` already has a reference to document `0` of `Products` collection, "
"having reference value `product_a`.", add_doc_op.error());
doc = coll1->get("2").get();
ASSERT_EQ("2", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("object.reference_sequence_id"));
// product_a already existed, so the reference helper field should remain unchanged.
ASSERT_EQ(0, doc["object.reference_sequence_id"]);
doc_json = R"({
"product_id": "product_d",
"product_name": "hair oil",
"product_description": "Revitalize your hair with our nourishing hair oil nature's secret to lustrous, healthy locks.",
"rating": "4"
})"_json;
add_doc_op = collection_create_op.get()->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = coll1->get("3").get();
ASSERT_EQ("3", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("object.reference_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("object.reference_sequence_id"));
ASSERT_EQ(5, doc["object.reference_sequence_id"]);
}
schema_json =
R"({
"name": "songs",
"fields": [
{ "name": "title", "type": "string" },
{ "name": "genres", "type": "string[]", "reference": "genres.id", "async_reference": true}
]
})"_json;
documents = {
R"({"title":"Dil De Rani", "genres":[]})"_json,
R"({"title":"Corduroy", "genres":["1"]})"_json,
};
collection_create_op = collectionManager.create_collection(schema_json);
ASSERT_TRUE(collection_create_op.ok());
for (auto const &json: documents) {
auto add_op = collection_create_op.get()->add(json.dump());
if (!add_op.ok()) {
LOG(INFO) << add_op.error();
}
ASSERT_TRUE(add_op.ok());
}
{
auto doc = collection_create_op.get()->get("0").get();
ASSERT_EQ("0", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("genres_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("genres_sequence_id"));
ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size());
ASSERT_EQ(0, doc["genres_sequence_id"].size());
doc = collection_create_op.get()->get("1").get();
ASSERT_EQ("1", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("genres_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("genres_sequence_id"));
ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size());
ASSERT_EQ(1, doc["genres_sequence_id"].size());
ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]);
}
schema_json =
R"({
"name": "genres",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "name", "type": "string" }
]
})"_json;
documents = {
R"({"id":"0","name":"Grunge"})"_json,
R"({"id":"1","name":"Arena rock"})"_json
};
collection_create_op = collectionManager.create_collection(schema_json);
ASSERT_TRUE(collection_create_op.ok());
for (auto const &json: documents) {
auto add_op = collection_create_op.get()->add(json.dump());
if (!add_op.ok()) {
LOG(INFO) << add_op.error();
}
ASSERT_TRUE(add_op.ok());
}
req_params = {
{"collection", "songs"},
{"q", "*"},
{"include_fields", "$genres(name, strategy:nest) as genre"}
};
search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts);
ASSERT_TRUE(search_op.ok());
res_obj = nlohmann::json::parse(json_res);
ASSERT_EQ(2, res_obj["found"].get<size_t>());
ASSERT_EQ(2, res_obj["hits"].size());
ASSERT_EQ("Corduroy", res_obj["hits"][0]["document"]["title"].get<std::string>());
ASSERT_EQ(1, res_obj["hits"][0]["document"]["genre"].size());
ASSERT_EQ("Arena rock", res_obj["hits"][0]["document"]["genre"][0]["name"]);
ASSERT_EQ("Dil De Rani", res_obj["hits"][1]["document"]["title"].get<std::string>());
ASSERT_EQ(0, res_obj["hits"][1]["document"]["genre"].size());
{
auto const& songs_coll = collectionManager.get_collection_unsafe("songs");
doc_json = R"({"title":"Achilles Last Stand", "genres":["3","0","2"]})"_json;
add_doc_op = songs_coll->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
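// Only genre "0" exists so far, so the entries for "3" and "2" hold the sentinel value.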
auto doc = songs_coll->get("2").get();
ASSERT_EQ("2", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("genres_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("genres_sequence_id"));
ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size());
ASSERT_EQ(3, doc["genres_sequence_id"].size());
ASSERT_EQ("3", doc["genres"][0]);
ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]);
ASSERT_EQ("0", doc["genres"][1]);
ASSERT_EQ(0, doc["genres_sequence_id"][1]);
ASSERT_EQ("2", doc["genres"][2]);
ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][2]);
auto remove_op = collection_create_op.get()->remove("0");
ASSERT_TRUE(remove_op.ok());
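// Removing genre "0" cascades to the song: the corresponding entry is dropped from both `genres`
// and its reference helper field.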
doc = songs_coll->get("2").get();
ASSERT_EQ("2", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("genres_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("genres_sequence_id"));
ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size());
ASSERT_EQ(2, doc["genres_sequence_id"].size());
ASSERT_EQ("3", doc["genres"][0]);
ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]);
ASSERT_EQ("2", doc["genres"][1]);
ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][1]);
doc_json = R"({"id":"2","name":"Blues"})"_json;
add_doc_op = collection_create_op.get()->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
doc = songs_coll->get("2").get();
ASSERT_EQ("2", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("genres_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("genres_sequence_id"));
ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size());
ASSERT_EQ(2, doc["genres_sequence_id"].size());
ASSERT_EQ("3", doc["genres"][0]);
ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]);
ASSERT_EQ("2", doc["genres"][1]);
ASSERT_EQ(2, doc["genres_sequence_id"][1]);
}
req_params = {
{"collection", "songs"},
{"q", "*"},
{"include_fields", "$genres(name, strategy:nest) as genre"}
};
search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts);
ASSERT_TRUE(search_op.ok());
res_obj = nlohmann::json::parse(json_res);
ASSERT_EQ(3, res_obj["found"].get<size_t>());
ASSERT_EQ(3, res_obj["hits"].size());
ASSERT_EQ("Achilles Last Stand", res_obj["hits"][0]["document"]["title"].get<std::string>());
ASSERT_EQ(1, res_obj["hits"][0]["document"]["genre"].size());
ASSERT_EQ("Blues", res_obj["hits"][0]["document"]["genre"][0]["name"]);
ASSERT_EQ("Corduroy", res_obj["hits"][1]["document"]["title"].get<std::string>());
ASSERT_EQ(1, res_obj["hits"][1]["document"]["genre"].size());
ASSERT_EQ("Arena rock", res_obj["hits"][1]["document"]["genre"][0]["name"]);
ASSERT_EQ("Dil De Rani", res_obj["hits"][2]["document"]["title"].get<std::string>());
ASSERT_EQ(0, res_obj["hits"][2]["document"]["genre"].size());
collectionManager.dispose();
delete store;
store = new Store(state_dir_path);
collectionManager.init(store, 1.0, "auth_key", quit);
auto load_op = collectionManager.load(8, 1000);
if(!load_op.ok()) {
LOG(ERROR) << load_op.error();
}
ASSERT_TRUE(load_op.ok());
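// The async reference state, including pending sentinel values, should be restored from the
// disk-backed store after a restart.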
req_params = {
{"collection", "songs"},
{"q", "*"},
{"include_fields", "$genres(name, strategy:nest) as genre"}
};
search_op = collectionManager.do_search(req_params, embedded_params, json_res, now_ts);
ASSERT_TRUE(search_op.ok());
res_obj = nlohmann::json::parse(json_res);
ASSERT_EQ(3, res_obj["found"].get<size_t>());
ASSERT_EQ(3, res_obj["hits"].size());
ASSERT_EQ("Achilles Last Stand", res_obj["hits"][0]["document"]["title"].get<std::string>());
ASSERT_EQ(1, res_obj["hits"][0]["document"]["genre"].size());
ASSERT_EQ("Blues", res_obj["hits"][0]["document"]["genre"][0]["name"]);
ASSERT_EQ("Corduroy", res_obj["hits"][1]["document"]["title"].get<std::string>());
ASSERT_EQ(1, res_obj["hits"][1]["document"]["genre"].size());
ASSERT_EQ("Arena rock", res_obj["hits"][1]["document"]["genre"][0]["name"]);
ASSERT_EQ("Dil De Rani", res_obj["hits"][2]["document"]["title"].get<std::string>());
ASSERT_EQ(0, res_obj["hits"][2]["document"]["genre"].size());
{
auto const& songs_coll = collectionManager.get_collection_unsafe("songs");
auto doc = songs_coll->get("2").get();
ASSERT_EQ("2", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("genres_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("genres_sequence_id"));
ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size());
ASSERT_EQ(2, doc["genres_sequence_id"].size());
ASSERT_EQ("3", doc["genres"][0]);
ASSERT_EQ(UINT32_MAX, doc["genres_sequence_id"][0]);
ASSERT_EQ("2", doc["genres"][1]);
ASSERT_EQ(2, doc["genres_sequence_id"][1]);
auto const& genres_coll = collectionManager.get_collection_unsafe("genres");
doc_json = R"({"id":"3","name":"Metal"})"_json;
add_doc_op = genres_coll->add(doc_json.dump());
ASSERT_TRUE(add_doc_op.ok());
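// Genre "3" is now indexed, so the pending sentinel in song "2" is replaced with its seq_id.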
doc = songs_coll->get("2").get();
ASSERT_EQ("2", doc["id"]);
ASSERT_EQ(1, doc.count(".ref"));
ASSERT_EQ(1, doc[".ref"].size());
ASSERT_EQ("genres_sequence_id", doc[".ref"][0]);
ASSERT_EQ(1, doc.count("genres_sequence_id"));
ASSERT_TRUE(doc["genres"].size() == doc["genres_sequence_id"].size());
ASSERT_EQ(2, doc["genres_sequence_id"].size());
ASSERT_EQ("3", doc["genres"][0]);
ASSERT_EQ(3, doc["genres_sequence_id"][0]);
ASSERT_EQ("2", doc["genres"][1]);
ASSERT_EQ(2, doc["genres_sequence_id"][1]);
}
}
TEST_F(CollectionJoinTest, UpdateDocumentHavingReferenceField) {
auto schema_json =
R"({
@ -2573,6 +3187,7 @@ TEST_F(CollectionJoinTest, FilterByNestedReferences) {
ASSERT_EQ(1, res_obj["hits"][0]["document"]["Coll_B"].size());
ASSERT_EQ("coll_b_3", res_obj["hits"][0]["document"]["Coll_B"][0]["title"]);
ASSERT_EQ(1, res_obj["hits"][0]["document"]["Coll_B"][0].count("Coll_A"));
ASSERT_EQ(1, res_obj["hits"][0]["document"]["Coll_B"][0]["Coll_A"].size());
ASSERT_EQ("coll_a_0", res_obj["hits"][0]["document"]["Coll_B"][0]["Coll_A"]["title"]);


@ -43,7 +43,7 @@ protected:
{"name": "points", "type": "int32"},
{"name": "person", "type": "object", "optional": true},
{"name": "vec", "type": "float[]", "num_dim": 128, "optional": true},
{"name": "product_id", "type": "string", "reference": "Products.product_id", "optional": true}
{"name": "product_id", "type": "string", "reference": "Products.product_id", "optional": true, "async_reference": true}
],
"default_sorting_field": "points",
"symbols_to_index":["+"],
@ -255,6 +255,7 @@ TEST_F(CollectionManagerTest, CollectionCreation) {
"stem":false
},
{
"async_reference":true,
"facet":false,
"index":true,
"infix":false,
@ -340,7 +341,10 @@ TEST_F(CollectionManagerTest, ShouldInitCollection) {
"\"string\", \"facet\": false}], \"default_sorting_field\": \"foo\"}");
spp::sparse_hash_map<std::string, std::string> referenced_in;
Collection *collection = collectionManager.init_collection(collection_meta1, 100, store, 1.0f, referenced_in);
spp::sparse_hash_map<std::string, std::vector<reference_pair_t>> async_referenced_ins;
Collection *collection = collectionManager.init_collection(collection_meta1, 100, store, 1.0f, referenced_in,
async_referenced_ins);
ASSERT_EQ("foobar", collection->get_name());
ASSERT_EQ(100, collection->get_collection_id());
ASSERT_EQ(1, collection->get_fields().size());
@ -362,7 +366,8 @@ TEST_F(CollectionManagerTest, ShouldInitCollection) {
"\"symbols_to_index\": [\"+\"], \"token_separators\": [\"-\"]}");
collection = collectionManager.init_collection(collection_meta2, 100, store, 1.0f, referenced_in);
collection = collectionManager.init_collection(collection_meta2, 100, store, 1.0f, referenced_in,
async_referenced_ins);
ASSERT_EQ(12345, collection->get_created_at());
std::vector<char> expected_symbols = {'+'};
@ -500,6 +505,25 @@ TEST_F(CollectionManagerTest, RestoreRecordsOnRestart) {
tsl::htrie_map<char, field> schema = collection1->get_schema();
ASSERT_EQ(schema.count("product_id_sequence_id"), 1);
auto products_schema_json =
R"({
"name": "Products",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "product_name", "type": "string"},
{"name": "product_description", "type": "string"}
]
})"_json;
auto const& collection_create_op = collectionManager.create_collection(products_schema_json);
ASSERT_TRUE(collection_create_op.ok());
auto async_ref_fields = collection_create_op.get()->get_async_referenced_ins();
ASSERT_EQ(1, async_ref_fields.size());
ASSERT_EQ(1, async_ref_fields.count("product_id"));
ASSERT_EQ(1, async_ref_fields["product_id"].size());
ASSERT_EQ("collection1", async_ref_fields["product_id"][0].collection);
ASSERT_EQ("product_id", async_ref_fields["product_id"][0].field);
// Recreate the collection manager to ensure that it restores the records from the disk-backed store.
collectionManager.dispose();
delete store;
@ -573,6 +597,13 @@ TEST_F(CollectionManagerTest, RestoreRecordsOnRestart) {
results = collection1->search("thomas", search_fields, "", facets, sort_fields, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(4, results["hits"].size());
async_ref_fields = collectionManager.get_collection("Products").get()->get_async_referenced_ins();
ASSERT_EQ(1, async_ref_fields.size());
ASSERT_EQ(1, async_ref_fields.count("product_id"));
ASSERT_EQ(1, async_ref_fields["product_id"].size());
ASSERT_EQ("collection1", async_ref_fields["product_id"][0].collection);
ASSERT_EQ("product_id", async_ref_fields["product_id"][0].field);
}
TEST_F(CollectionManagerTest, VerifyEmbeddedParametersOfScopedAPIKey) {
@ -1852,7 +1883,7 @@ TEST_F(CollectionManagerTest, ReferencedInBacklog) {
auto const& references = referenced_ins_backlog.at("Products");
ASSERT_EQ(1, references.size());
ASSERT_EQ("collection1", references.cbegin()->collection);
ASSERT_EQ("product_id_sequence_id", references.cbegin()->field);
ASSERT_EQ("product_id", references.cbegin()->field);
auto schema_json =
R"({
@ -1871,7 +1902,7 @@ TEST_F(CollectionManagerTest, ReferencedInBacklog) {
auto get_reference_field_op = create_op.get()->get_referenced_in_field_with_lock("collection1");
ASSERT_TRUE(get_reference_field_op.ok());
ASSERT_EQ("product_id_sequence_id", get_reference_field_op.get());
ASSERT_EQ("product_id", get_reference_field_op.get());
get_reference_field_op = create_op.get()->get_referenced_in_field_with_lock("foo");
ASSERT_FALSE(get_reference_field_op.ok());
@ -2071,7 +2102,8 @@ TEST_F(CollectionManagerTest, PopulateReferencedIns) {
"name": "B",
"fields": [
{"name": "b_id", "type": "string"},
{"name": "b_ref", "type": "string", "reference": "A.a_id"}
{"name": "a_ref", "type": "string", "reference": "A.a_id"},
{"name": "c_ref", "type": "string", "reference": "C.c_id", "async_reference": true}
]
})"_json.dump(),
R"({
@ -2082,16 +2114,29 @@ TEST_F(CollectionManagerTest, PopulateReferencedIns) {
})"_json.dump(),
};
std::map<std::string, spp::sparse_hash_map<std::string, std::string>> referenced_ins;
std::map<std::string, spp::sparse_hash_map<std::string, std::vector<reference_pair_t>>> async_referenced_ins;
for (const auto &collection_meta_json: collection_meta_jsons) {
CollectionManager::_populate_referenced_ins(collection_meta_json, referenced_ins);
CollectionManager::_populate_referenced_ins(collection_meta_json, referenced_ins, async_referenced_ins);
}
ASSERT_EQ(1, referenced_ins.size());
ASSERT_EQ(2, referenced_ins.size());
ASSERT_EQ(1, referenced_ins.count("A"));
ASSERT_EQ(1, referenced_ins["A"].size());
ASSERT_EQ(1, referenced_ins["A"].count("B"));
ASSERT_EQ("b_ref_sequence_id", referenced_ins["A"]["B"]);
ASSERT_EQ("a_ref", referenced_ins["A"]["B"]);
ASSERT_EQ(1, referenced_ins.count("C"));
ASSERT_EQ(1, referenced_ins["C"].size());
ASSERT_EQ(1, referenced_ins["C"].count("B"));
ASSERT_EQ("c_ref", referenced_ins["C"]["B"]);
ASSERT_EQ(1, async_referenced_ins.count("C"));
ASSERT_EQ(1, async_referenced_ins["C"].size());
ASSERT_EQ(1, async_referenced_ins["C"].count("c_id"));
ASSERT_EQ(1, async_referenced_ins["C"]["c_id"].size());
ASSERT_EQ("B", async_referenced_ins["C"]["c_id"][0].collection);
ASSERT_EQ("c_ref", async_referenced_ins["C"]["c_id"][0].field);
}
TEST_F(CollectionManagerTest, CollectionPagination) {