Mirror of https://github.com/typesense/typesense.git (synced 2025-05-22 06:40:30 +08:00)

commit 1f4384797e

Merge branch 'single-index-concurrency' into v0.23

# Conflicts:
#	test/collection_specific_test.cpp
@@ -35,7 +35,8 @@ rm -rf /tmp/typesense-server-$TS_VERSION /tmp/typesense-server-$TS_VERSION.tar.g

sed -i "s/\$VERSION/$TS_VERSION/g" `find /tmp/typesense-deb-build -maxdepth 10 -type f`

dpkg -b /tmp/typesense-deb-build/typesense-server "/tmp/typesense-deb-build/typesense-server-${TS_VERSION}-amd64.deb"
dpkg-deb -Zgzip -z6 \
-b /tmp/typesense-deb-build/typesense-server "/tmp/typesense-deb-build/typesense-server-${TS_VERSION}-amd64.deb"

# Generate RPM
@@ -29,6 +29,11 @@ using array_mapped_facet_t = std::array<facet_map_t*, ARRAY_FACET_DIM>;
struct token_t {
size_t position;
std::string value;
bool prefix;

token_t(size_t position, const std::string& value, bool prefix): position(position), value(value), prefix(prefix) {

}
};

struct token_candidates {
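The new `prefix` flag on token_t records whether a query token may be prefix-matched; the call sites changed later in this commit set it only for the last token of a query. Below is a minimal standalone sketch (not part of the commit) of how such tokens can be built; the struct is copied from the hunk above, while the query literals and the main() harness are illustrative assumptions.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Copied from the struct introduced in the hunk above.
struct token_t {
    size_t position;
    std::string value;
    bool prefix;
    token_t(size_t position, const std::string& value, bool prefix):
        position(position), value(value), prefix(prefix) {}
};

int main() {
    // Only the last token of the query is treated as a prefix candidate,
    // mirroring the emplace_back(..., is_prefix) call sites in this commit.
    std::vector<std::string> query = {"harry", "pot"};
    std::vector<token_t> tokens;
    for(size_t i = 0; i < query.size(); i++) {
        bool is_prefix = (i == query.size() - 1);
        tokens.emplace_back(i, query[i], is_prefix);
    }
    for(const auto& t: tokens) {
        std::cout << t.position << " " << t.value << " prefix=" << t.prefix << "\n";
    }
    return 0;
}
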
@@ -510,7 +515,8 @@ private:
bool prioritize_exact_match,
bool exhaustive_search,
size_t concurrency,
std::set<uint64>& query_hashes) const;
std::set<uint64>& query_hashes,
std::vector<uint32_t>& id_buff) const;

void do_filtering(uint32_t*& filter_ids, uint32_t& filter_ids_length, const std::vector<filter>& filters,
const bool enable_short_circuit) const;

@@ -65,9 +65,11 @@ public:

to_expanded_plists(raw_posting_lists, plists, expanded_plists);

std::sort(this->plists.begin(), this->plists.end(), [](posting_list_t* a, posting_list_t* b) {
return a->num_blocks() < b->num_blocks();
});
if(plists.size() > 1) {
std::sort(this->plists.begin(), this->plists.end(), [](posting_list_t* a, posting_list_t* b) {
return a->num_blocks() < b->num_blocks();
});
}
}

~block_intersector_t() {

@@ -105,10 +105,9 @@ struct Topster {
KV *data;
KV** kvs;

// For distinct, stores the min heap kv of each group_kv_map topster value
std::unordered_map<uint64_t, KV*> kv_map;

std::unordered_map<uint64_t, Topster*> group_kv_map;
spp::sparse_hash_map<uint64_t, Topster*> group_kv_map;
size_t distinct;

explicit Topster(size_t capacity): Topster(capacity, 0) {
@@ -172,64 +171,17 @@ struct Topster {
bool SIFT_DOWN = true;

if(distinct) {
const auto& found_it = group_kv_map.find(kv->distinct_key);
bool is_duplicate_key = (found_it != group_kv_map.end());

if(!is_duplicate_key && less_than_min_heap) {
// for distinct, if a non duplicate kv is < than min heap we ignore
return false;
}

if(is_duplicate_key) {
// if min heap (group_topster.kvs[0]) changes, we have to update kvs and sift
Topster* group_topster = found_it->second;
KV old_min_heap_kv = *kv_map[kv->distinct_key];
bool added = group_topster->add(kv);

if(!added) {
return false;
}

// if new kv score is greater than previous min heap score we sift down, otherwise sift up
SIFT_DOWN = is_greater(kv, &old_min_heap_kv);

// new kv is different from old_min_heap_kv so we have to sift heap
heap_op_index = old_min_heap_kv.array_index;

// erase current min heap key from kv_map
kv_map.erase(old_min_heap_kv.distinct_key);

// Grouping cannot be a streaming operation, so aggregate the KVs associated with every group.
auto kvs_it = group_kv_map.find(kv->distinct_key);
if(kvs_it != group_kv_map.end()) {
kvs_it->second->add(kv);
} else {
// kv is guaranteed to be > current min heap: kvs[0]
// create fresh topster for this distinct group key since it does not exist
Topster* group_topster = new Topster(distinct, 0);
group_topster->add(kv);

// add new group key to map
group_kv_map.emplace(kv->distinct_key, group_topster);

// find heap operation index for updating kvs

if(size < MAX_SIZE) {
// there is enough space in heap we just copy to end
SIFT_DOWN = false;
heap_op_index = size;
size++;
} else {
SIFT_DOWN = true;

// max size is reached so we are forced to replace current min heap element (kvs[0])
heap_op_index = 0;

// remove current min heap group key from maps
delete group_kv_map[kvs[heap_op_index]->distinct_key];
group_kv_map.erase(kvs[heap_op_index]->distinct_key);
kv_map.erase(kvs[heap_op_index]->distinct_key);
}
Topster* g_topster = new Topster(distinct, 0);
g_topster->add(kv);
group_kv_map.insert({kv->distinct_key, g_topster});
}

// kv will be copied into the pointer at heap_op_index
kv_map.emplace(kv->distinct_key, kvs[heap_op_index]);

return true;

} else { // not distinct
//LOG(INFO) << "Searching for key: " << kv->key;

@@ -334,9 +286,8 @@ struct Topster {

// topster must be sorted before iterated upon to remove dead array entries
void sort() {
std::stable_sort(kvs, kvs + size, is_greater);
for(auto &group_topster: group_kv_map) {
group_topster.second->sort();
if(!distinct) {
std::stable_sort(kvs, kvs + size, is_greater);
}
}
src/art.cpp
@@ -950,6 +950,8 @@ int art_topk_iter(const art_node *root, token_ordering token_order, size_t max_r

q.push(root);

size_t num_large_lists = 0;

while(!q.empty() && results.size() < max_results*4) {
art_node *n = (art_node *) q.top();
q.pop();

@@ -974,6 +976,10 @@ int art_topk_iter(const art_node *root, token_ordering token_order, size_t max_r
results.push_back(l);
} else {
// we will push leaf only if filter matches with leaf IDs
if(!IS_COMPACT_POSTING(l->values)) {
num_large_lists++;
}

bool found_atleast_one = posting_t::contains_atleast_one(l->values, filter_ids, filter_ids_length);
if(found_atleast_one) {
results.push_back(l);

@@ -1024,6 +1030,10 @@ int art_topk_iter(const art_node *root, token_ordering token_order, size_t max_r
}
}

/*LOG(INFO) << "leaf results.size: " << results.size()
<< ", filter_ids_length: " << filter_ids_length
<< ", num_large_lists: " << num_large_lists;*/

printf("OUTSIDE art_topk_iter: results size: %d\n", results.size());
return 0;
}
@@ -255,9 +255,11 @@ void BatchedIndexer::run() {
const std::string& req_key_prefix = get_req_prefix_key(it->second.start_ts);
store->delete_range(req_key_prefix, req_key_prefix + StringUtils::serialize_uint32_t(UINT32_MAX));

it->second.res->final = true;
async_req_res_t* async_req_res = new async_req_res_t(it->second.req, it->second.res, true);
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, async_req_res);
if(it->second.res->is_alive) {
it->second.res->final = true;
async_req_res_t* async_req_res = new async_req_res_t(it->second.req, it->second.res, true);
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, async_req_res);
}

it = req_res_map.erase(it);
} else {

@@ -352,7 +354,7 @@ void BatchedIndexer::load_state(const nlohmann::json& state) {
queued_writes = state["queued_writes"].get<int64_t>();

size_t num_reqs_restored = 0;
std::vector<uint64_t> queue_ids;
std::set<uint64_t> queue_ids;

for(auto& kv: state["req_res_map"].items()) {
std::shared_ptr<http_req> req = std::make_shared<http_req>();
@@ -380,19 +382,19 @@ void BatchedIndexer::load_state(const nlohmann::json& state) {

const std::string& coll_name = get_collection_name(req);
uint64_t queue_id = StringUtils::hash_wy(coll_name.c_str(), coll_name.size()) % num_threads;
queue_ids.push_back(queue_id);
queue_ids.insert(queue_id);
std::unique_lock qlk(qmutuxes[queue_id].mcv);
queues[queue_id].emplace_back(req->start_ts);
qmutuxes[queue_id].cv.notify_one();
}

num_reqs_restored++;
}

// need to sort on `start_ts` to preserve original order
// need to sort on `start_ts` to preserve original order before notifying queues
for(auto queue_id: queue_ids) {
std::unique_lock lk(qmutuxes[queue_id].mcv);
std::sort(queues[queue_id].begin(), queues[queue_id].end());
qmutuxes[queue_id].cv.notify_one();
}

LOG(INFO) << "Restored " << num_reqs_restored << " in-flight requests from snapshot.";
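The restore path now collects queue ids in a std::set so that each indexing queue is sorted on start_ts and notified exactly once after all requests are re-queued. A standalone sketch (not the actual BatchedIndexer code) of that idea with plain containers; the queue/hash shapes below are simplified assumptions.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <utility>
#include <vector>

int main() {
    const size_t num_threads = 4;
    std::map<size_t, std::vector<uint64_t>> queues;   // queue_id -> restored start_ts values
    std::set<size_t> touched_queues;                  // dedup: each queue handled once

    // pretend these are restored requests: (hash of collection name, start_ts)
    std::vector<std::pair<size_t, uint64_t>> restored = {{7, 300}, {7, 100}, {3, 200}};

    for(const auto& [coll_hash, start_ts]: restored) {
        size_t queue_id = coll_hash % num_threads;
        queues[queue_id].push_back(start_ts);
        touched_queues.insert(queue_id);
    }

    // sort on start_ts to preserve the original request order before notifying
    for(size_t queue_id: touched_queues) {
        auto& q = queues[queue_id];
        std::sort(q.begin(), q.end());
        std::cout << "queue " << queue_id << " notified with " << q.size() << " requests\n";
    }
    return 0;
}
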
@@ -223,22 +223,28 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, nlohma
// NOTE: we overwrite the input json_lines with result to avoid memory pressure

record.is_update = false;
bool repeated_doc = false;

if(!doc_seq_id_op.ok()) {
record.index_failure(doc_seq_id_op.code(), doc_seq_id_op.error());
} else {
const std::string& doc_id = record.doc["id"].get<std::string>();
repeated_doc = (batch_doc_ids.find(doc_id) != batch_doc_ids.end());

if(repeated_doc) {
// when a document repeats, we send the batch until this document so that we can deal with conflicts
i--;
goto do_batched_index;
}

record.is_update = !doc_seq_id_op.get().is_new;

if(record.is_update) {
get_document_from_store(get_seq_id_key(seq_id), record.old_doc);
} else {
const std::string& doc_id = record.doc["id"].get<std::string>();
if(batch_doc_ids.find(doc_id) != batch_doc_ids.end()) {
record.index_failure(400, "Document with `id` " + doc_id + " already exists in the batch.");
} else {
batch_doc_ids.emplace(doc_id);
}
}

batch_doc_ids.insert(doc_id);

// if `fallback_field_type` or `dynamic_fields` is enabled, update schema first before indexing
if(!fallback_field_type.empty() || !dynamic_fields.empty()) {
Option<bool> schema_change_op = check_and_update_schema(record.doc, dirty_values);

@@ -248,25 +254,11 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, nlohma
}
}

/*
// check for memory threshold before allowing subsequent batches
if(is_exceeding_memory_threshold()) {
exceeds_memory_limit = true;
}

if(exceeds_memory_limit) {
nlohmann::json index_res;
index_res["error"] = "Max memory ratio exceeded.";
index_res["success"] = false;
index_res["document"] = json_line;
json_lines[i] = index_res.dump();
record.index_failure(500, "Max memory ratio exceeded.");
}
*/

index_records.emplace_back(std::move(record));

if((i+1) % index_batch_size == 0 || i == json_lines.size()-1) {
do_batched_index:

if((i+1) % index_batch_size == 0 || i == json_lines.size()-1 || repeated_doc) {
batch_index(index_records, json_lines, num_indexed);

// to return the document for the single doc add cases
@@ -1393,12 +1385,27 @@ void Collection::parse_search_query(const std::string &query, std::vector<std::s

void Collection::populate_result_kvs(Topster *topster, std::vector<std::vector<KV *>> &result_kvs) {
if(topster->distinct) {
for(auto &group_topster_entry: topster->group_kv_map) {
Topster* group_topster = group_topster_entry.second;
const std::vector<KV*> group_kvs(group_topster->kvs, group_topster->kvs+group_topster->size);
result_kvs.emplace_back(group_kvs);
// we have to pick top-K groups
Topster gtopster(topster->MAX_SIZE);

for(auto& group_topster: topster->group_kv_map) {
group_topster.second->sort();
if(group_topster.second->size != 0) {
KV* kv_head = group_topster.second->getKV(0);
gtopster.add(kv_head);
}
}

gtopster.sort();

for(size_t i = 0; i < gtopster.size; i++) {
KV* kv = gtopster.getKV(i);
const std::vector<KV*> group_kvs(
topster->group_kv_map[kv->distinct_key]->kvs,
topster->group_kv_map[kv->distinct_key]->kvs+topster->group_kv_map[kv->distinct_key]->size
);
result_kvs.emplace_back(group_kvs);
}
} else {
for(uint32_t t = 0; t < topster->size; t++) {
KV* kv = topster->getKV(t);
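The rewritten populate_result_kvs first ranks groups by the head (best) KV of each group topster and only then emits every hit of the top-K groups. A standalone sketch (not the Typesense Topster API) of that selection using plain STL containers; the hit_t struct, scores, and max_groups value are illustrative assumptions.

#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <unordered_map>
#include <utility>
#include <vector>

struct hit_t { uint64_t doc_id; int64_t score; };

int main() {
    // group_key -> hits of that group, best hit first (each group already sorted)
    std::unordered_map<uint64_t, std::vector<hit_t>> groups = {
        {1, {{10, 90}, {11, 40}}},
        {2, {{20, 70}}},
        {3, {{30, 95}, {31, 10}}},
    };
    const size_t max_groups = 2;   // stand-in for topster->MAX_SIZE

    // rank groups by the score of their head (best) hit
    std::vector<std::pair<int64_t, uint64_t>> group_heads;  // (head score, group key)
    for(const auto& [key, hits]: groups) {
        if(!hits.empty()) {
            group_heads.push_back({hits.front().score, key});
        }
    }
    std::sort(group_heads.begin(), group_heads.end(), std::greater<>());
    if(group_heads.size() > max_groups) {
        group_heads.resize(max_groups);
    }

    // emit every hit of each selected group, best groups first
    for(const auto& [score, key]: group_heads) {
        std::cout << "group " << key << " (head score " << score << "):";
        for(const auto& hit: groups[key]) {
            std::cout << " doc " << hit.doc_id;
        }
        std::cout << "\n";
    }
    return 0;
}
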
@@ -34,9 +34,15 @@ bool handle_authentication(std::map<std::string, std::string>& req_params, const

void stream_response(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
if(!res->is_alive) {
// underlying request is dead or this is a raft log playback
return ;
}

if(req->_req->res.status != 0) {
// not the first response chunk, so wait for previous chunk to finish
res->wait();
}

auto req_res = new async_req_res_t(req, res, true);
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
}

@@ -767,6 +767,7 @@ void HttpServer::stream_response(stream_response_state_t& state) {

if(state.res_body.len == 0 && state.send_state != H2O_SEND_STATE_FINAL) {
// without this guard, http streaming will break
state.generator->proceed(state.generator, req);
return;
}

@@ -908,13 +909,15 @@ bool HttpServer::on_request_proceed_message(void *data) {
// This callback will run concurrently to batch indexer's run() so care must be taken to protect access
// to variables that are written to by the batch indexer, which for now is only: last_chunk_aggregate (atomic)
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(data);
auto stream_state = (req_res->req->last_chunk_aggregate) ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
if(req_res->res->is_alive) {
auto stream_state = (req_res->req->last_chunk_aggregate) ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;

size_t written = req_res->req->chunk_len;
req_res->req->chunk_len = 0;
size_t written = req_res->req->chunk_len;
req_res->req->chunk_len = 0;

if(req_res->req->_req && req_res->req->_req->proceed_req) {
req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
if(req_res->req->_req && req_res->req->_req->proceed_req) {
req_res->req->_req->proceed_req(req_res->req->_req, written, stream_state);
}
}

if(req_res->destroy_after_use) {
@@ -1075,7 +1075,8 @@ void Index::search_candidates(const uint8_t & field_id, bool field_is_array,
bool prioritize_exact_match,
const bool exhaustive_search,
const size_t concurrency,
std::set<uint64>& query_hashes) const {
std::set<uint64>& query_hashes,
std::vector<uint32_t>& id_buff) const {

auto product = []( long long a, token_candidates & b ) { return a*b.candidates.size(); };
long long int N = std::accumulate(token_candidates_vec.begin(), token_candidates_vec.end(), 1LL, product);

@@ -1180,11 +1181,15 @@ void Index::search_candidates(const uint8_t & field_id, bool field_is_array,
for(size_t i = 0; i < concurrency; i++) {
// empty vec can happen if not all threads produce results
if (!result_id_vecs[i].empty()) {
uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &result_id_vecs[i][0],
result_id_vecs[i].size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
if(exhaustive_search) {
id_buff.insert(id_buff.end(), result_id_vecs[i].begin(), result_id_vecs[i].end());
} else {
uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &result_id_vecs[i][0],
result_id_vecs[i].size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
}

num_result_ids += result_id_vecs[i].size();
@@ -1200,6 +1205,20 @@ void Index::search_candidates(const uint8_t & field_id, bool field_is_array,
}
}

if(id_buff.size() > 100000) {
// prevents too many ORs during exhaustive searching
std::sort(id_buff.begin(), id_buff.end());
id_buff.erase(std::unique( id_buff.begin(), id_buff.end() ), id_buff.end());

uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &id_buff[0],
id_buff.size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
num_result_ids += id_buff.size();
id_buff.clear();
}

if(num_result_ids == 0) {
continue;
}
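During exhaustive search, candidate ids are now buffered in id_buff and OR-merged into the running result set only after the buffer grows large (100000 in the hunk above), with a sort + unique pass first, instead of issuing an or_scalar per candidate batch. A standalone sketch (not the ArrayUtils API) of that buffering pattern; the or_merge helper, flush threshold, and sample batches are illustrative assumptions.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <vector>

// Merge a sorted, de-duplicated buffer into the running sorted result set,
// then clear the buffer. Stand-in for ArrayUtils::or_scalar in the real code.
static void or_merge(std::vector<uint32_t>& all_ids, std::vector<uint32_t>& buff) {
    std::vector<uint32_t> merged;
    std::set_union(all_ids.begin(), all_ids.end(), buff.begin(), buff.end(),
                   std::back_inserter(merged));
    all_ids.swap(merged);
    buff.clear();
}

int main() {
    const size_t flush_threshold = 8;   // the hunk above uses 100000
    std::vector<uint32_t> all_ids;
    std::vector<uint32_t> id_buff;

    std::vector<std::vector<uint32_t>> candidate_batches = {
        {5, 1, 9, 1}, {2, 7, 5}, {3, 8, 8, 4}, {6}
    };

    for(auto& batch: candidate_batches) {
        id_buff.insert(id_buff.end(), batch.begin(), batch.end());
        if(id_buff.size() > flush_threshold) {
            // prevents too many ORs: dedupe once, then merge once
            std::sort(id_buff.begin(), id_buff.end());
            id_buff.erase(std::unique(id_buff.begin(), id_buff.end()), id_buff.end());
            or_merge(all_ids, id_buff);
        }
    }

    if(!id_buff.empty()) {              // final flush, like the end of search_field
        std::sort(id_buff.begin(), id_buff.end());
        id_buff.erase(std::unique(id_buff.begin(), id_buff.end()), id_buff.end());
        or_merge(all_ids, id_buff);
    }

    for(uint32_t id: all_ids) std::cout << id << " ";   // prints 1..9 without duplicates
    std::cout << "\n";
    return 0;
}
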
@@ -1904,7 +1923,8 @@ bool Index::check_for_overrides(const token_ordering& token_order, const string&
std::vector<token_t> window_tokens;
std::set<std::string> window_tokens_set;
for (size_t i = start_index; i < start_index + window_len; i++) {
window_tokens.push_back({i, tokens[i]});
bool is_prefix = (i == (start_index + window_len - 1));
window_tokens.emplace_back(i, tokens[i], is_prefix);
window_tokens_set.emplace(tokens[i]);
}

@@ -2081,14 +2101,16 @@ void Index::search(std::vector<query_tokens_t>& field_query_tokens,
for(size_t i = 0; i < num_search_fields; i++) {
std::vector<token_t> q_include_pos_tokens;
for(size_t j=0; j < field_query_tokens[i].q_include_tokens.size(); j++) {
q_include_pos_tokens.push_back({j, field_query_tokens[i].q_include_tokens[j]});
bool is_prefix = (j == field_query_tokens[i].q_include_tokens.size()-1);
q_include_pos_tokens.emplace_back(j, field_query_tokens[i].q_include_tokens[j], is_prefix);
}

std::vector<std::vector<token_t>> q_pos_synonyms;
for(const auto& q_syn_vec: field_query_tokens[i].q_synonyms) {
std::vector<token_t> q_pos_syn;
for(size_t j=0; j < q_syn_vec.size(); j++) {
q_pos_syn.push_back({j, q_syn_vec[j]});
bool is_prefix = (j == q_syn_vec.size()-1);
q_pos_syn.emplace_back(j, q_syn_vec[j], is_prefix);
}
q_pos_synonyms.emplace_back(q_pos_syn);
}

@@ -2175,6 +2197,8 @@ void Index::search(std::vector<query_tokens_t>& field_query_tokens,
}
}

//auto begin0 = std::chrono::high_resolution_clock::now();

for(auto& seq_id_kvs: topster_ids) {
const uint64_t seq_id = seq_id_kvs.first;
auto& kvs = seq_id_kvs.second; // each `kv` can be from a different field

@@ -2354,6 +2378,11 @@ void Index::search(std::vector<query_tokens_t>& field_query_tokens,
kvs[0]->scores[kvs[0]->match_score_index] = aggregated_score;
topster->add(kvs[0]);
}

/*auto timeMillis0 = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - begin0).count();

LOG(INFO) << "Time taken for multi-field aggregation: " << timeMillis0 << "ms";*/
}

//LOG(INFO) << "topster size: " << topster->size;
@@ -2517,8 +2546,9 @@ void Index::compute_facet_infos(const std::vector<facet>& facets, facet_query_t&
std::vector<token_t> search_tokens, qtokens;

for (size_t qtoken_index = 0; qtoken_index < query_tokens.size(); qtoken_index++) {
search_tokens.emplace_back(token_t{qtoken_index, query_tokens[qtoken_index]});
qtokens.emplace_back(token_t{qtoken_index, query_tokens[qtoken_index]});
bool is_prefix = (qtoken_index == query_tokens.size()-1);
search_tokens.emplace_back(qtoken_index, query_tokens[qtoken_index], is_prefix);
qtokens.emplace_back(qtoken_index, query_tokens[qtoken_index], is_prefix);
}

std::vector<std::vector<art_leaf*>> searched_queries;

@@ -2867,21 +2897,28 @@ void Index::search_field(const uint8_t & field_id,
const std::string token_cost_hash = token + std::to_string(costs[token_index]);

std::vector<art_leaf*> leaves;
//LOG(INFO) << "Searching for field: " << field << ", token:" << token << " - cost: " << costs[token_index];
const bool prefix_search = prefix && search_tokens[token_index].prefix;

const bool prefix_search = prefix && (token_index == search_tokens.size()-1);
/*LOG(INFO) << "Searching for field: " << the_field.name << ", token:"
<< token << " - cost: " << costs[token_index] << ", prefix_search: " << prefix_search;*/

if(token_cost_cache.count(token_cost_hash) != 0) {
leaves = token_cost_cache[token_cost_hash];
} else {
// prefix should apply only for last token
const size_t token_len = prefix_search ? (int) token.length() : (int) token.length() + 1;

//auto begin = std::chrono::high_resolution_clock::now();

// need less candidates for filtered searches since we already only pick tokens with results
art_fuzzy_search(search_index.at(field_name), (const unsigned char *) token.c_str(), token_len,
costs[token_index], costs[token_index], num_fuzzy_candidates, token_order, prefix_search,
filter_ids, filter_ids_length, leaves, unique_tokens);

/*auto timeMillis = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - begin).count();

LOG(INFO) << "Time taken for fuzzy search: " << timeMillis << "ms";*/

if(!leaves.empty()) {
token_cost_cache.emplace(token_cost_hash, leaves);
for(auto leaf: leaves) {
@@ -2926,13 +2963,26 @@ void Index::search_field(const uint8_t & field_id,
}

if(!token_candidates_vec.empty()) {
std::vector<uint32_t> id_buff;

// If atleast one token is found, go ahead and search for candidates
search_candidates(field_id, the_field.is_array(), filter_ids, filter_ids_length,
exclude_token_ids, exclude_token_ids_size,
curated_ids, sort_fields, token_candidates_vec, searched_queries, topster,
groups_processed, all_result_ids, all_result_ids_len, field_num_results,
typo_tokens_threshold, group_limit, group_by_fields, query_tokens,
prioritize_exact_match, combination_limit, concurrency, query_hashes);
prioritize_exact_match, combination_limit, concurrency, query_hashes, id_buff);

if(id_buff.size() > 1) {
std::sort(id_buff.begin(), id_buff.end());
id_buff.erase(std::unique( id_buff.begin(), id_buff.end() ), id_buff.end());
}

uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &id_buff[0],
id_buff.size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
}

resume_typo_loop:

@@ -2962,13 +3012,13 @@ void Index::search_field(const uint8_t & field_id,
// drop from right
size_t end_index = (query_tokens.size() - 1) - num_tokens_dropped;
for(size_t i=0; i <= end_index; i++) {
truncated_tokens.push_back({query_tokens[i].position, query_tokens[i].value});
truncated_tokens.emplace_back(query_tokens[i].position, query_tokens[i].value, query_tokens[i].prefix);
}
} else {
// drop from left
size_t start_index = (num_tokens_dropped - mid_index);
for(size_t i=start_index; i<query_tokens.size(); i++) {
truncated_tokens.push_back({query_tokens[i].position, query_tokens[i].value});
truncated_tokens.emplace_back(query_tokens[i].position, query_tokens[i].value, query_tokens[i].prefix);
}
}
@@ -221,17 +221,23 @@ bool compact_posting_list_t::contains_atleast_one(const uint32_t* target_ids, si
size_t num_existing_offsets = id_offsets[i];
size_t existing_id = id_offsets[i + num_existing_offsets + 1];

if(existing_id == target_ids[target_ids_index]) {
return true;
// Returns iterator to the first element that is >= to value or last if no such element is found.
size_t found_index = std::lower_bound(target_ids + target_ids_index,
target_ids + target_ids_size, existing_id) - target_ids;

if(found_index == target_ids_size) {
// all elements are lesser than lowest value (existing_id), so we can stop looking
return false;
} else {
if(target_ids[found_index] == existing_id) {
return true;
}

// adjust lower bound to found_index+1 whose value is >= `existing_id`
target_ids_index = found_index;
}

if(target_ids[target_ids_index] < existing_id) {
while(target_ids_index < target_ids_size && target_ids[target_ids_index] < existing_id) {
target_ids_index++;
}
} else {
i += num_existing_offsets + 2;
}
i += num_existing_offsets + 2;
}

return false;
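The change above replaces a per-element comparison with std::lower_bound, so the scan jumps straight to the first target id that is >= the current posting id and can bail out early when every remaining target is smaller. A standalone sketch of the same idea over two plain sorted vectors (without the compact posting list's offset bookkeeping); the function name and sample data are illustrative.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Returns true if the two sorted id lists share at least one element.
static bool contains_atleast_one(const std::vector<uint32_t>& existing_ids,
                                 const std::vector<uint32_t>& target_ids) {
    size_t target_index = 0;
    for(uint32_t existing_id: existing_ids) {
        // first target that is >= existing_id
        auto it = std::lower_bound(target_ids.begin() + target_index,
                                   target_ids.end(), existing_id);
        if(it == target_ids.end()) {
            return false;                        // every remaining target is smaller
        }
        if(*it == existing_id) {
            return true;                         // exact match found
        }
        target_index = it - target_ids.begin();  // resume from this lower bound
    }
    return false;
}

int main() {
    printf("%d\n", contains_atleast_one({2, 9, 14}, {3, 5, 9}));   // prints 1
    printf("%d\n", contains_atleast_one({2, 9, 14}, {0, 1}));      // prints 0
    return 0;
}
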
@@ -186,7 +186,9 @@ bool posting_list_t::block_t::contains(uint32_t id) {
/* posting_list_t operations */

posting_list_t::posting_list_t(uint16_t max_block_elements): BLOCK_MAX_ELEMENTS(max_block_elements) {

if(max_block_elements <= 1) {
throw std::invalid_argument("max_block_elements must be > 1");
}
}

posting_list_t::~posting_list_t() {

@@ -267,7 +269,8 @@ void posting_list_t::merge_adjacent_blocks(posting_list_t::block_t* block1, post
size_t new_block1_offsets_size = block1->offsets.getLength() + num_block2_offsets_to_move;
uint32_t* new_block1_offsets = new uint32_t[new_block1_offsets_size];

uint32_t min = offsets1[0], max = offsets1[0];
uint32_t min = block1->offsets.getLength() != 0 ? offsets1[0] : 0;
uint32_t max = min;

// we have to manually copy over so we can find the new min and max
for(size_t i = 0; i < block1->offsets.getLength(); i++) {

@@ -423,7 +426,7 @@ void posting_list_t::upsert(const uint32_t id, const std::vector<uint32_t>& offs
} else {
const auto it = id_block_map.lower_bound(id);
upsert_block = (it == id_block_map.end()) ? id_block_map.rbegin()->second : it->second;
before_upsert_last_id = upsert_block->ids.at(upsert_block->size() - 1);
before_upsert_last_id = upsert_block->ids.last();
}

// happy path: upsert_block is not full

@@ -431,7 +434,7 @@ void posting_list_t::upsert(const uint32_t id, const std::vector<uint32_t>& offs
uint32_t num_inserted = upsert_block->upsert(id, offsets);
ids_length += num_inserted;

last_id_t after_upsert_last_id = upsert_block->ids.at(upsert_block->size() - 1);
last_id_t after_upsert_last_id = upsert_block->ids.last();
if(before_upsert_last_id != after_upsert_last_id) {
id_block_map.erase(before_upsert_last_id);
id_block_map.emplace(after_upsert_last_id, upsert_block);

@@ -451,12 +454,12 @@ void posting_list_t::upsert(const uint32_t id, const std::vector<uint32_t>& offs
// evenly divide elements between both blocks
split_block(upsert_block, new_block);

last_id_t after_upsert_last_id = upsert_block->ids.at(upsert_block->size() - 1);
last_id_t after_upsert_last_id = upsert_block->ids.last();
id_block_map.erase(before_upsert_last_id);
id_block_map.emplace(after_upsert_last_id, upsert_block);
}

last_id_t after_new_block_id = new_block->ids.at(new_block->size() - 1);
last_id_t after_new_block_id = new_block->ids.last();
id_block_map.emplace(after_new_block_id, new_block);

new_block->next = upsert_block->next;

@@ -485,6 +488,17 @@ void posting_list_t::erase(const uint32_t id) {
// since we will be deleting the empty node, set the previous node's next pointer to null
std::prev(it)->second->next = nullptr;
delete erase_block;
} else {
// The root block cannot be empty if there are other blocks so we will pull some contents from next block
// This is only an issue for blocks with max size of 2
if(root_block.next != nullptr) {
auto next_block_last_id = erase_block->next->ids.last();
merge_adjacent_blocks(erase_block, erase_block->next, erase_block->next->size()/2);
id_block_map.erase(next_block_last_id);

id_block_map.emplace(erase_block->next->ids.last(), erase_block->next);
id_block_map.emplace(erase_block->ids.last(), erase_block);
}
}

id_block_map.erase(before_last_id);

@@ -493,7 +507,7 @@ void posting_list_t::erase(const uint32_t id) {
}

if(new_ids_length >= BLOCK_MAX_ELEMENTS/2 || erase_block->next == nullptr) {
last_id_t after_last_id = erase_block->ids.at(new_ids_length-1);
last_id_t after_last_id = erase_block->ids.last();
if(before_last_id != after_last_id) {
id_block_map.erase(before_last_id);
id_block_map.emplace(after_last_id, erase_block);

@@ -505,7 +519,7 @@ void posting_list_t::erase(const uint32_t id) {
// block is less than 50% of max capacity and contains a next node which we can refill from

auto next_block = erase_block->next;
last_id_t next_block_last_id = next_block->ids.at(next_block->ids.getLength()-1);
last_id_t next_block_last_id = next_block->ids.last();

if(erase_block->size() + next_block->size() <= BLOCK_MAX_ELEMENTS) {
// we can merge the contents of next block with `erase_block` and delete the next block

@@ -515,13 +529,15 @@ void posting_list_t::erase(const uint32_t id) {

id_block_map.erase(next_block_last_id);
} else {
// only part of the next block can be moved over
size_t num_block2_ids = BLOCK_MAX_ELEMENTS - erase_block->size();
// Only part of the next block can be moved over.
// We will move only 50% of max elements to ensure that we don't end up "flipping" adjacent blocks:
// 1, 5 -> 5, 1
size_t num_block2_ids = BLOCK_MAX_ELEMENTS/2;
merge_adjacent_blocks(erase_block, next_block, num_block2_ids);
// NOTE: we don't have to update `id_block_map` for `next_block` as last element doesn't change
}

last_id_t after_last_id = erase_block->ids.at(erase_block->ids.getLength()-1);
last_id_t after_last_id = erase_block->ids.last();
if(before_last_id != after_last_id) {
id_block_map.erase(before_last_id);
id_block_map.emplace(after_last_id, erase_block);
@@ -666,37 +682,18 @@ void posting_list_t::intersect(const std::vector<posting_list_t*>& posting_lists
bool posting_list_t::take_id(result_iter_state_t& istate, uint32_t id) {
// decide if this result id should be excluded
if(istate.excluded_result_ids_size != 0) {
while(istate.excluded_result_ids_index < istate.excluded_result_ids_size &&
istate.excluded_result_ids[istate.excluded_result_ids_index] < id) {
istate.excluded_result_ids_index++;
}

if(istate.excluded_result_ids_index < istate.excluded_result_ids_size &&
id == istate.excluded_result_ids[istate.excluded_result_ids_index]) {
istate.excluded_result_ids_index++;
if (std::binary_search(istate.excluded_result_ids,
istate.excluded_result_ids + istate.excluded_result_ids_size, id)) {
return false;
}
}

bool id_found_in_filter = true;

// decide if this result be matched with filter results
if(istate.filter_ids_length != 0) {
id_found_in_filter = false;

// e.g. [1, 3] vs [2, 3]

while(istate.filter_ids_index < istate.filter_ids_length && istate.filter_ids[istate.filter_ids_index] < id) {
istate.filter_ids_index++;
}

if(istate.filter_ids_index < istate.filter_ids_length && istate.filter_ids[istate.filter_ids_index] == id) {
istate.filter_ids_index++;
id_found_in_filter = true;
}
return std::binary_search(istate.filter_ids, istate.filter_ids + istate.filter_ids_length, id);
}

return id_found_in_filter;
return true;
}

bool posting_list_t::get_offsets(const std::vector<iterator_t>& its,
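take_id now relies on std::binary_search over the sorted exclusion and filter id arrays instead of advancing running indices through them. A standalone sketch of the same decision logic over plain vectors (the real code works on raw pointers inside result_iter_state_t); the sample ids are illustrative.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Decide whether a candidate id should be kept, given sorted exclusion and
// filter id lists.
static bool take_id(const std::vector<uint32_t>& excluded_ids,
                    const std::vector<uint32_t>& filter_ids,
                    uint32_t id) {
    if(!excluded_ids.empty() &&
       std::binary_search(excluded_ids.begin(), excluded_ids.end(), id)) {
        return false;                       // explicitly excluded
    }
    if(!filter_ids.empty()) {
        // when a filter is active, the id must be present in the filter results
        return std::binary_search(filter_ids.begin(), filter_ids.end(), id);
    }
    return true;                            // no filter: keep the id
}

int main() {
    std::vector<uint32_t> excluded = {4, 10};
    std::vector<uint32_t> filter = {1, 3, 7};
    printf("%d %d %d\n", take_id(excluded, filter, 3),   // 1: in filter, not excluded
                         take_id(excluded, filter, 4),   // 0: excluded
                         take_id(excluded, filter, 5));  // 0: not in filter
    return 0;
}
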
@@ -96,7 +96,7 @@ bool Tokenizer::next(std::string &token, size_t& token_index, size_t& start_inde
LOG(ERROR) << "Unicode error during parsing: " << errcode;
}
} else {
token = unicode_text.tempSubString(prev_position, length).toUTF8String(word);
token = unicode_text.toLower().tempSubString(prev_position, length).toUTF8String(word);
}

if(!token.empty()) {
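The tokenizer now lowercases the ICU UnicodeString before extracting the token, which is what lets the CyrillicText test below match "тест" against both "Тест" and "ТЕСТ". A tiny standalone sketch of that ICU lowercasing step, assuming ICU is available; the build command is only indicative.

// build (assumption): g++ -std=c++17 lower.cpp $(pkg-config --cflags --libs icu-uc icu-i18n)
#include <iostream>
#include <string>
#include <unicode/unistr.h>

int main() {
    // ICU lowercasing handles non-ASCII scripts, unlike a byte-wise std::tolower.
    std::string word;
    icu::UnicodeString unicode_text = icu::UnicodeString::fromUTF8("TEST ТЕСТ");
    unicode_text.toLower();            // in-place lowercasing (default locale)
    unicode_text.toUTF8String(word);
    std::cout << word << "\n";         // prints: test тест
    return 0;
}
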
@@ -154,8 +154,8 @@ TEST_F(CollectionGroupingTest, GroupingCompoundKey) {

ASSERT_EQ(10, res["found"].get<size_t>());
ASSERT_EQ(10, res["grouped_hits"].size());
ASSERT_EQ(11, res["grouped_hits"][0]["group_key"][0].get<size_t>());

ASSERT_EQ(11, res["grouped_hits"][0]["group_key"][0].get<size_t>());
ASSERT_STREQ("Beta", res["grouped_hits"][0]["group_key"][1].get<std::string>().c_str());

// optional field should have no value in the group key component

@@ -428,4 +428,108 @@ TEST_F(CollectionGroupingTest, GroupingWithArrayFieldAndOverride) {
ASSERT_EQ(1, (int) res["facet_counts"][0]["counts"][3]["count"]);
ASSERT_STREQ("Zeta", res["facet_counts"][0]["counts"][3]["value"].get<std::string>().c_str());
}
}

TEST_F(CollectionGroupingTest, GroupOrderIndependence) {
Collection *coll1;

std::vector<field> fields = {field("group", field_types::STRING, true),
field("points", field_types::INT32, false),};

coll1 = collectionManager.get_collection("coll1").get();
if(coll1 == nullptr) {
coll1 = collectionManager.create_collection("coll1", 1, fields, "points").get();
}

nlohmann::json doc;

for(size_t i = 0; i < 256; i++) {
int64_t points = 100 + i;
doc["id"] = std::to_string(i);
doc["group"] = std::to_string(i);
doc["points"] = points;

ASSERT_TRUE(coll1->add(doc.dump()).ok());
}

// doc id "255" will have points of 255
// try to insert doc id "256" with group "256" but having lesser points than all records

doc["id"] = "256";
doc["group"] = "256";
doc["points"] = 50;
ASSERT_TRUE(coll1->add(doc.dump()).ok());

// insert doc id "257" of same group "256" with greatest point

doc["id"] = "257";
doc["group"] = "256";
doc["points"] = 500;
ASSERT_TRUE(coll1->add(doc.dump()).ok());

// when we search by grouping records, sorting descending on points, both records of group "256" should show up

std::vector<sort_by> sort_fields = {sort_by("points", "DESC")};

auto res = coll1->search("*", {}, "", {}, sort_fields, {0}, 10, 1, FREQUENCY,
{false}, Index::DROP_TOKENS_THRESHOLD,
spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 5,
"", 10,
{}, {}, {"group"}, 10).get();

ASSERT_EQ(1, res["grouped_hits"][0]["group_key"].size());
ASSERT_STREQ("256", res["grouped_hits"][0]["group_key"][0].get<std::string>().c_str());
ASSERT_EQ(2, res["grouped_hits"][0]["hits"].size());
}

TEST_F(CollectionGroupingTest, UseHighestValueInGroupForOrdering) {
Collection *coll1;

std::vector<field> fields = {field("group", field_types::STRING, true),
field("points", field_types::INT32, false),};

coll1 = collectionManager.get_collection("coll1").get();
if(coll1 == nullptr) {
coll1 = collectionManager.create_collection("coll1", 1, fields, "points").get();
}

nlohmann::json doc;

for(size_t i = 0; i < 250; i++) {
int64_t points = 100 + i;
doc["id"] = std::to_string(i);
doc["group"] = std::to_string(i);
doc["points"] = points;

ASSERT_TRUE(coll1->add(doc.dump()).ok());
}

// points: 100 -> 349

// group with highest point is "249" with 349 points
// insert another document for that group with 50 points
doc["id"] = "250";
doc["group"] = "249";
doc["points"] = 50;
ASSERT_TRUE(coll1->add(doc.dump()).ok());

// now insert another new group whose points is greater than 50
doc["id"] = "251";
doc["group"] = "1000";
doc["points"] = 60;
ASSERT_TRUE(coll1->add(doc.dump()).ok());

std::vector<sort_by> sort_fields = {sort_by("points", "DESC")};

auto res = coll1->search("*", {}, "", {}, sort_fields, {0}, 10, 1, FREQUENCY,
{false}, Index::DROP_TOKENS_THRESHOLD,
spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 5,
"", 10,
{}, {}, {"group"}, 10).get();

ASSERT_EQ(1, res["grouped_hits"][0]["group_key"].size());
ASSERT_STREQ("249", res["grouped_hits"][0]["group_key"][0].get<std::string>().c_str());
ASSERT_EQ(2, res["grouped_hits"][0]["hits"].size());
}
@@ -1452,7 +1452,7 @@ TEST_F(CollectionSpecificTest, ImportDocumentWithRepeatingIDInTheSameBatch) {

ASSERT_TRUE(nlohmann::json::parse(import_records[0])["success"].get<bool>());
ASSERT_FALSE(nlohmann::json::parse(import_records[1])["success"].get<bool>());
ASSERT_EQ("Document with `id` 0 already exists in the batch.",
ASSERT_EQ("A document with id 0 already exists.",
nlohmann::json::parse(import_records[1])["error"].get<std::string>());

auto results = coll1->search("levis", {"name"},

@@ -1485,7 +1485,7 @@ TEST_F(CollectionSpecificTest, ImportDocumentWithRepeatingIDInTheSameBatch) {
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());

// repeated ID is rejected even if the first ID is not indexed due to some error
// repeated ID is NOT rejected if the first ID is not indexed due to some error
import_records.clear();
doc1.erase("name");
doc1["id"] = "100";

@@ -1497,12 +1497,199 @@ TEST_F(CollectionSpecificTest, ImportDocumentWithRepeatingIDInTheSameBatch) {
import_response = coll1->add_many(import_records, document);

ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());

ASSERT_FALSE(nlohmann::json::parse(import_records[0])["success"].get<bool>());
ASSERT_FALSE(nlohmann::json::parse(import_records[1])["success"].get<bool>());
ASSERT_EQ("Document with `id` 100 already exists in the batch.",
nlohmann::json::parse(import_records[1])["error"].get<std::string>());
ASSERT_EQ("Field `name` has been declared in the schema, but is not found in the document.",
nlohmann::json::parse(import_records[0])["error"].get<std::string>());

ASSERT_TRUE(nlohmann::json::parse(import_records[1])["success"].get<bool>());

collectionManager.drop_collection("coll1");
}

TEST_F(CollectionSpecificTest, UpdateOfTwoDocsWithSameIdWithinSameBatch) {
std::vector<field> fields = {field("last_chance", field_types::BOOL, false, true),
field("points", field_types::INT32, false),};

Collection* coll1 = collectionManager.create_collection("coll1", 1, fields, "points").get();

nlohmann::json doc1;
doc1["id"] = "0";
doc1["points"] = 100;

ASSERT_TRUE(coll1->add(doc1.dump()).ok());

// second update should reflect the result of first update
std::vector<std::string> updates = {
R"({"id": "0", "last_chance": false})",
R"({"id": "0", "points": 200})",
};

nlohmann::json update_doc;
auto import_response = coll1->add_many(updates, update_doc, UPDATE);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());

auto results = coll1->search("*", {},
"", {}, {}, {0}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true).get();

collectionManager.drop_collection("coll1");
}

TEST_F(CollectionSpecificTest, CyrillicText) {
// when the first document containing a token already cannot fit compact posting list

std::vector<field> fields = {field("title", field_types::STRING, false, false, true, "sr"),};

Collection* coll1 = collectionManager.create_collection("coll1", 1, fields).get();

nlohmann::json doc;
doc["title"] = "Test Тест";
ASSERT_TRUE(coll1->add(doc.dump()).ok());

doc["title"] = "TEST ТЕСТ";
ASSERT_TRUE(coll1->add(doc.dump()).ok());

auto results = coll1->search("тест", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();

ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("1", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("0", results["hits"][1]["document"]["id"].get<std::string>());

collectionManager.drop_collection("coll1");
}

TEST_F(CollectionSpecificTest, UpsertOfTwoDocsWithSameIdWithinSameBatch) {
std::vector<field> fields = {field("last_chance", field_types::BOOL, false, true),
field("points", field_types::INT32, false, true),};

Collection* coll1 = collectionManager.create_collection("coll1", 1, fields, "").get();

nlohmann::json doc1;
doc1["id"] = "0";
doc1["points"] = 100;

ASSERT_TRUE(coll1->add(doc1.dump()).ok());

// first upsert removes both fields, so second upsert should only insert "points"
std::vector<std::string> upserts = {
R"({"id": "0", "last_chance": true})",
R"({"id": "0", "points": 200})",
};

nlohmann::json update_doc;
auto import_response = coll1->add_many(upserts, update_doc, UPSERT);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());

auto results = coll1->search("*", {},
"", {}, {}, {0}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true).get();

ASSERT_EQ(1, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_TRUE(results["hits"][0]["document"].contains("points"));
ASSERT_FALSE(results["hits"][0]["document"].contains("last_chance"));
ASSERT_EQ(200, results["hits"][0]["document"]["points"].get<int32_t>());

ASSERT_EQ(1, coll1->_get_index()->_get_numerical_index().at("points")->size());
ASSERT_EQ(0, coll1->_get_index()->_get_numerical_index().at("last_chance")->size());

// update without doc id

upserts = {
R"({"last_chance": true})",
};

import_response = coll1->add_many(upserts, update_doc, UPDATE);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());

collectionManager.drop_collection("coll1");
}

TEST_F(CollectionSpecificTest, UpdateUpsertOfDocWithMissingFields) {
std::vector<field> fields = {field("last_chance", field_types::BOOL, false, true),
field("points", field_types::INT32, false, true),};

Collection* coll1 = collectionManager.create_collection("coll1", 1, fields, "").get();

nlohmann::json doc1;
doc1["id"] = "0";
doc1["last_chance"] = true;
doc1["points"] = 100;

ASSERT_TRUE(coll1->add(doc1.dump()).ok());

// upsert doc with missing fields: should be removed from index
std::vector<std::string> upserts = {
R"({"id": "0"})"
};

nlohmann::json update_doc;
auto import_response = coll1->add_many(upserts, update_doc, UPSERT);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());

auto results = coll1->search("*", {},
"", {}, {}, {0}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true).get();

ASSERT_EQ(1, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ(1, results["hits"][0]["document"].size());

ASSERT_EQ(0, coll1->_get_index()->_get_numerical_index().at("points")->size());
ASSERT_EQ(0, coll1->_get_index()->_get_numerical_index().at("last_chance")->size());

// put the original doc back
ASSERT_TRUE(coll1->add(doc1.dump(), UPSERT).ok());

results = coll1->search("*", {},
"", {}, {}, {0}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true).get();

ASSERT_EQ(1, results["hits"].size());
ASSERT_EQ(3, results["hits"][0]["document"].size());

// update doc with missing fields: existing fields should NOT be removed

upserts = {
R"({"id": "0"})"
};

import_response = coll1->add_many(upserts, update_doc, UPDATE);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());

results = coll1->search("*", {},
"", {}, {}, {0}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true).get();

ASSERT_EQ(1, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ(3, results["hits"][0]["document"].size());

ASSERT_EQ(1, coll1->_get_index()->_get_numerical_index().at("points")->size());
ASSERT_EQ(1, coll1->_get_index()->_get_numerical_index().at("last_chance")->size());

collectionManager.drop_collection("coll1");
}
@@ -1603,9 +1790,10 @@ TEST_F(CollectionSpecificTest, VerbatimMatchShouldConsiderTokensMatchedAcrossAll
ASSERT_EQ("3", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("2", results["hits"][1]["document"]["id"].get<std::string>());

ASSERT_EQ(2, results["hits"].size());

collectionManager.drop_collection("coll1");
}

TEST_F(CollectionSpecificTest, CustomNumTyposConfiguration) {
// dropped tokens on a single field cannot be deemed as verbatim match

@@ -1728,6 +1916,49 @@ TEST_F(CollectionSpecificTest, HighlightOnPrefixRegression) {
collectionManager.drop_collection("coll1");
}

TEST_F(CollectionSpecificTest, DroppedTokensShouldNotBeUsedForPrefixSearch) {
std::vector<field> fields = {field("title", field_types::STRING, false),
field("points", field_types::INT32, false),};

Collection* coll1 = collectionManager.create_collection("coll1", 1, fields, "points").get();

nlohmann::json doc1;
doc1["id"] = "0";
doc1["title"] = "Dog Shoemaker";
doc1["points"] = 100;

nlohmann::json doc2;
doc2["id"] = "1";
doc2["title"] = "Shoe and Sock";
doc2["points"] = 200;

ASSERT_TRUE(coll1->add(doc1.dump()).ok());
ASSERT_TRUE(coll1->add(doc2.dump()).ok());

auto results = coll1->search("shoe cat", {"title"},
"", {}, {}, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true).get();

ASSERT_EQ(1, results["hits"].size());
ASSERT_EQ("1", results["hits"][0]["document"]["id"].get<std::string>());

results = coll1->search("cat shoe", {"title"},
"", {}, {}, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true).get();

ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("1", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("0", results["hits"][1]["document"]["id"].get<std::string>());

collectionManager.drop_collection("coll1");
}

TEST_F(CollectionSpecificTest, SearchShouldJoinToken) {
// when the first document containing a token already cannot fit compact posting list
std::vector<field> fields = {field("title", field_types::STRING, false),};

@@ -1758,11 +1989,13 @@ TEST_F(CollectionSpecificTest, SearchShouldJoinToken) {

// only first 5 words of the query are used for concat/split

results = coll1->search("nonstick pressure cooker is a greatinvention", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
results = coll1->search("nonstick pressure cooker is a greatinvention", {"title"}, "", {}, {}, {0}, 10, 1,
FREQUENCY, {false}, 0).get();
ASSERT_EQ(0, results["hits"].size());

results = coll1->search("nonstick pressure cooker is a gr eat", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
results = coll1->search("nonstick pressure cooker is a gr eat", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY,
{false}, 0).get();
ASSERT_EQ(0, results["hits"].size());

collectionManager.drop_collection("coll1");
}
}
@@ -1939,7 +1939,7 @@ TEST_F(CollectionTest, DeletionOfDocumentArrayFields) {
token_ordering::FREQUENCY, {true}, 10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10).get();

ASSERT_EQ(1, res["found"]);
ASSERT_EQ(1, res["found"].get<size_t>());

Option<std::string> rem_op = coll1->remove("100");
@@ -447,13 +447,13 @@ TEST_F(PostingListTest, RemovalsOnLaterBlocks) {
// only part of the next node contents can be moved over when we delete 8 since (1 + 5) > 5
pl.erase(8);

// [0..4], [9], [10..14] => [0..4], [9,10,11,12,13], [14]
// [0..4], [9], [10..14] => [0..4], [9,10,11], [12,13,14]

ASSERT_EQ(3, pl.num_blocks());
ASSERT_EQ(11, pl.num_ids());
ASSERT_EQ(5, pl.get_root()->next->size());
ASSERT_EQ(1, pl.get_root()->next->next->size());
ASSERT_EQ(13, pl.get_root()->next->ids.last());
ASSERT_EQ(3, pl.get_root()->next->size());
ASSERT_EQ(3, pl.get_root()->next->next->size());
ASSERT_EQ(11, pl.get_root()->next->ids.last());
ASSERT_EQ(14, pl.get_root()->next->next->ids.last());

for(size_t i = 0; i < pl.get_root()->next->offset_index.getLength(); i++) {

@@ -616,27 +616,6 @@ TEST_F(PostingListTest, SplittingOfListsSimple) {
std::vector<std::vector<posting_list_t::iterator_t>> partial_its_vec(4);
intersector.split_lists(4, partial_its_vec);

/*for(size_t i = 0; i < partial_its_vec.size(); i++) {
auto& partial_its = partial_its_vec[i];

if (partial_its.empty()) {
continue;
}

LOG(INFO) << "Vec " << i;

for (auto& it: partial_its) {
while (it.valid()) {
LOG(INFO) << it.id();
it.next();
}

LOG(INFO) << "---";
}
}

return ;*/

std::vector<std::vector<std::vector<uint32_t>>> split_ids = {
{{0, 2}, {1, 3}, {2, 3}},
{{3, 20}, {1, 3, 5, 10, 20}, {2, 3, 5, 7, 20}}

@@ -1325,6 +1304,12 @@ TEST_F(PostingListTest, CompactPostingListContainsAtleastOne) {
ASSERT_TRUE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids3[0], target_ids3.size()));
ASSERT_FALSE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids4[0], target_ids4.size()));

std::vector<uint32_t> target_ids5 = {2, 3};
ASSERT_TRUE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids5[0], target_ids5.size()));

std::vector<uint32_t> target_ids6 = {0, 1, 2};
ASSERT_FALSE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids6[0], target_ids6.size()));

posting_t::destroy_list(obj);
}

@@ -1378,6 +1363,94 @@ TEST_F(PostingListTest, BlockIntersectionOnMixedLists) {
free(list1);
}

TEST_F(PostingListTest, InsertAndEraseSequence) {
std::vector<uint32_t> offsets = {0, 1, 3};
posting_list_t pl(5);

pl.upsert(0, offsets);
pl.upsert(2, offsets);
pl.upsert(4, offsets);
pl.upsert(6, offsets);
pl.upsert(8, offsets);

// this will cause a split of the root block
pl.upsert(3, offsets); // 0,2,3 | 4,6,8
pl.erase(0); // 2,3 | 4,6,8
pl.upsert(5, offsets); // 2,3 | 4,5,6,8
pl.upsert(7, offsets); // 2,3 | 4,5,6,7,8
pl.upsert(10, offsets); // 2,3 | 4,5,6,7,8 | 10

// this will cause adjacent block refill
pl.erase(2); // 3,4,5,6,7 | 8 | 10

// deletes second block
pl.erase(8);

// remove all elements
pl.erase(3);
pl.erase(4);
pl.erase(5);
pl.erase(6);
pl.erase(7);
pl.erase(10);

ASSERT_EQ(0, pl.num_ids());
}

TEST_F(PostingListTest, InsertAndEraseSequenceWithBlockSizeTwo) {
std::vector<uint32_t> offsets = {0, 1, 3};
posting_list_t pl(2);

pl.upsert(2, offsets);
pl.upsert(3, offsets);
pl.upsert(1, offsets); // inserting 2 again here? // inserting 4 here?

// 1 | 2,3

pl.erase(1);

ASSERT_EQ(1, pl.get_root()->size());
ASSERT_EQ(2, pl.num_blocks());

pl.erase(3);
pl.erase(2);

ASSERT_EQ(0, pl.get_root()->size());
}

TEST_F(PostingListTest, PostingListMustHaveAtleast1Element) {
try {
std::vector<uint32_t> offsets = {0, 1, 3};
posting_list_t pl(1);
FAIL() << "Expected std::invalid_argument";
}
catch(std::invalid_argument const & err) {
EXPECT_EQ(err.what(),std::string("max_block_elements must be > 1"));
} catch(...) {
FAIL() << "Expected std::invalid_argument";
}
}

TEST_F(PostingListTest, DISABLED_RandInsertAndErase) {
std::vector<uint32_t> offsets = {0, 1, 3};
posting_list_t pl(5);

time_t t;
srand((unsigned) time(&t));

for(size_t i = 0; i < 10000; i++) {
LOG(INFO) << "i: " << i;
uint32_t add_id = rand() % 15;
pl.upsert(add_id, offsets);

uint32_t del_id = rand() % 15;
LOG(INFO) << "add: " << add_id << ", erase: " << del_id;
pl.erase(del_id);
}

LOG(INFO) << "Num ids: " << pl.num_ids() << ", num bocks: " << pl.num_blocks();
}

TEST_F(PostingListTest, DISABLED_Benchmark) {
std::vector<uint32_t> offsets = {0, 1, 3};
posting_list_t pl(4096);