Merge branch 'gh-437' into single-index-concurrency

This commit is contained in:
Kishore Nallan 2021-11-23 08:53:03 +05:30
commit e96039a1e5
8 changed files with 99 additions and 47 deletions

View File

@ -510,7 +510,8 @@ private:
bool prioritize_exact_match,
bool exhaustive_search,
size_t concurrency,
std::set<uint64>& query_hashes) const;
std::set<uint64>& query_hashes,
std::vector<uint32_t>& id_buff) const;
void do_filtering(uint32_t*& filter_ids, uint32_t& filter_ids_length, const std::vector<filter>& filters,
const bool enable_short_circuit) const;

View File

@ -65,9 +65,11 @@ public:
to_expanded_plists(raw_posting_lists, plists, expanded_plists);
std::sort(this->plists.begin(), this->plists.end(), [](posting_list_t* a, posting_list_t* b) {
return a->num_blocks() < b->num_blocks();
});
if(plists.size() > 1) {
std::sort(this->plists.begin(), this->plists.end(), [](posting_list_t* a, posting_list_t* b) {
return a->num_blocks() < b->num_blocks();
});
}
}
~block_intersector_t() {

View File

@ -950,6 +950,8 @@ int art_topk_iter(const art_node *root, token_ordering token_order, size_t max_r
q.push(root);
size_t num_large_lists = 0;
while(!q.empty() && results.size() < max_results*4) {
art_node *n = (art_node *) q.top();
q.pop();
@ -974,6 +976,10 @@ int art_topk_iter(const art_node *root, token_ordering token_order, size_t max_r
results.push_back(l);
} else {
// we will push leaf only if filter matches with leaf IDs
if(!IS_COMPACT_POSTING(l->values)) {
num_large_lists++;
}
bool found_atleast_one = posting_t::contains_atleast_one(l->values, filter_ids, filter_ids_length);
if(found_atleast_one) {
results.push_back(l);
@ -1024,6 +1030,10 @@ int art_topk_iter(const art_node *root, token_ordering token_order, size_t max_r
}
}
LOG(INFO) << "leaf results.size: " << results.size()
<< ", filter_ids_length: " << filter_ids_length
<< ", num_large_lists: " << num_large_lists;
printf("OUTSIDE art_topk_iter: results size: %d\n", results.size());
return 0;
}

View File

@ -1075,7 +1075,8 @@ void Index::search_candidates(const uint8_t & field_id, bool field_is_array,
bool prioritize_exact_match,
const bool exhaustive_search,
const size_t concurrency,
std::set<uint64>& query_hashes) const {
std::set<uint64>& query_hashes,
std::vector<uint32_t>& id_buff) const {
auto product = []( long long a, token_candidates & b ) { return a*b.candidates.size(); };
long long int N = std::accumulate(token_candidates_vec.begin(), token_candidates_vec.end(), 1LL, product);
@ -1180,11 +1181,15 @@ void Index::search_candidates(const uint8_t & field_id, bool field_is_array,
for(size_t i = 0; i < concurrency; i++) {
// empty vec can happen if not all threads produce results
if (!result_id_vecs[i].empty()) {
uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &result_id_vecs[i][0],
result_id_vecs[i].size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
if(exhaustive_search) {
id_buff.insert(id_buff.end(), result_id_vecs[i].begin(), result_id_vecs[i].end());
} else {
uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &result_id_vecs[i][0],
result_id_vecs[i].size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
}
num_result_ids += result_id_vecs[i].size();
@ -1200,6 +1205,20 @@ void Index::search_candidates(const uint8_t & field_id, bool field_is_array,
}
}
if(id_buff.size() > 100000) {
// prevents too many ORs during exhaustive searching
std::sort(id_buff.begin(), id_buff.end());
id_buff.erase(std::unique( id_buff.begin(), id_buff.end() ), id_buff.end());
uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &id_buff[0],
id_buff.size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
num_result_ids += id_buff.size();
id_buff.clear();
}
if(num_result_ids == 0) {
continue;
}
@ -1212,7 +1231,7 @@ void Index::search_candidates(const uint8_t & field_id, bool field_is_array,
void Index::do_filtering(uint32_t*& filter_ids, uint32_t& filter_ids_length,
const std::vector<filter>& filters,
const bool enable_short_circuit) const {
//auto begin = std::chrono::high_resolution_clock::now();
auto begin = std::chrono::high_resolution_clock::now();
for(size_t i = 0; i < filters.size(); i++) {
const filter & a_filter = filters[i];
@ -1552,10 +1571,10 @@ void Index::do_filtering(uint32_t*& filter_ids, uint32_t& filter_ids_length,
}
}
/*long long int timeMillis =
long long int timeMillis =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - begin).count();
LOG(INFO) << "Time taken for filtering: " << timeMillis << "ms";*/
LOG(INFO) << "Time taken for filtering: " << timeMillis << "ms";
}
@ -2175,6 +2194,8 @@ void Index::search(std::vector<query_tokens_t>& field_query_tokens,
}
}
auto begin0 = std::chrono::high_resolution_clock::now();
for(auto& seq_id_kvs: topster_ids) {
const uint64_t seq_id = seq_id_kvs.first;
auto& kvs = seq_id_kvs.second; // each `kv` can be from a different field
@ -2354,6 +2375,11 @@ void Index::search(std::vector<query_tokens_t>& field_query_tokens,
kvs[0]->scores[kvs[0]->match_score_index] = aggregated_score;
topster->add(kvs[0]);
}
auto timeMillis0 = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - begin0).count();
LOG(INFO) << "Time taken for multi-field aggregation: " << timeMillis0 << "ms";
}
//LOG(INFO) << "topster size: " << topster->size;
@ -2877,11 +2903,18 @@ void Index::search_field(const uint8_t & field_id,
// prefix should apply only for last token
const size_t token_len = prefix_search ? (int) token.length() : (int) token.length() + 1;
auto begin = std::chrono::high_resolution_clock::now();
// need less candidates for filtered searches since we already only pick tokens with results
art_fuzzy_search(search_index.at(field_name), (const unsigned char *) token.c_str(), token_len,
costs[token_index], costs[token_index], num_fuzzy_candidates, token_order, prefix_search,
filter_ids, filter_ids_length, leaves, unique_tokens);
auto timeMillis = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - begin).count();
LOG(INFO) << "Time taken for fuzzy search: " << timeMillis << "ms";
if(!leaves.empty()) {
token_cost_cache.emplace(token_cost_hash, leaves);
for(auto leaf: leaves) {
@ -2926,13 +2959,26 @@ void Index::search_field(const uint8_t & field_id,
}
if(!token_candidates_vec.empty()) {
std::vector<uint32_t> id_buff;
// If atleast one token is found, go ahead and search for candidates
search_candidates(field_id, the_field.is_array(), filter_ids, filter_ids_length,
exclude_token_ids, exclude_token_ids_size,
curated_ids, sort_fields, token_candidates_vec, searched_queries, topster,
groups_processed, all_result_ids, all_result_ids_len, field_num_results,
typo_tokens_threshold, group_limit, group_by_fields, query_tokens,
prioritize_exact_match, combination_limit, concurrency, query_hashes);
prioritize_exact_match, combination_limit, concurrency, query_hashes, id_buff);
if(id_buff.size() > 1) {
std::sort(id_buff.begin(), id_buff.end());
id_buff.erase(std::unique( id_buff.begin(), id_buff.end() ), id_buff.end());
}
uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(*all_result_ids, all_result_ids_len, &id_buff[0],
id_buff.size(), &new_all_result_ids);
delete[] *all_result_ids;
*all_result_ids = new_all_result_ids;
}
resume_typo_loop:

View File

@ -221,17 +221,23 @@ bool compact_posting_list_t::contains_atleast_one(const uint32_t* target_ids, si
size_t num_existing_offsets = id_offsets[i];
size_t existing_id = id_offsets[i + num_existing_offsets + 1];
if(existing_id == target_ids[target_ids_index]) {
return true;
// Returns iterator to the first element that is >= to value or last if no such element is found.
size_t found_index = std::lower_bound(target_ids + target_ids_index,
target_ids + target_ids_size, existing_id) - target_ids;
if(found_index == target_ids_size) {
// all elements are lesser than lowest value (existing_id), so we can stop looking
return false;
} else {
if(target_ids[found_index] == existing_id) {
return true;
}
// adjust lower bound to found_index+1 whose value is >= `existing_id`
target_ids_index = found_index;
}
if(target_ids[target_ids_index] < existing_id) {
while(target_ids_index < target_ids_size && target_ids[target_ids_index] < existing_id) {
target_ids_index++;
}
} else {
i += num_existing_offsets + 2;
}
i += num_existing_offsets + 2;
}
return false;

View File

@ -666,37 +666,18 @@ void posting_list_t::intersect(const std::vector<posting_list_t*>& posting_lists
bool posting_list_t::take_id(result_iter_state_t& istate, uint32_t id) {
    // Decides whether `id` should be kept in the result set:
    // it is dropped when present in the exclusion list, and (when a filter
    // is active) kept only if present in the filter ids.
    // NOTE(review): both `excluded_result_ids` and `filter_ids` must be sorted
    // ascending for std::binary_search to be valid — confirm at call sites.

    // decide if this result id should be excluded
    if(istate.excluded_result_ids_size != 0) {
        if (std::binary_search(istate.excluded_result_ids,
                               istate.excluded_result_ids + istate.excluded_result_ids_size, id)) {
            return false;
        }
    }

    // decide if this result be matched with filter results
    if(istate.filter_ids_length != 0) {
        return std::binary_search(istate.filter_ids, istate.filter_ids + istate.filter_ids_length, id);
    }

    // no filter active: keep the id (it was not excluded above)
    return true;
}
bool posting_list_t::get_offsets(const std::vector<iterator_t>& its,

View File

@ -1939,7 +1939,7 @@ TEST_F(CollectionTest, DeletionOfDocumentArrayFields) {
token_ordering::FREQUENCY, {true}, 10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10).get();
ASSERT_EQ(1, res["found"]);
ASSERT_EQ(1, res["found"].get<size_t>());
Option<std::string> rem_op = coll1->remove("100");

View File

@ -1325,6 +1325,12 @@ TEST_F(PostingListTest, CompactPostingListContainsAtleastOne) {
ASSERT_TRUE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids3[0], target_ids3.size()));
ASSERT_FALSE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids4[0], target_ids4.size()));
std::vector<uint32_t> target_ids5 = {2, 3};
ASSERT_TRUE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids5[0], target_ids5.size()));
std::vector<uint32_t> target_ids6 = {0, 1, 2};
ASSERT_FALSE(COMPACT_POSTING_PTR(obj)->contains_atleast_one(&target_ids6[0], target_ids6.size()));
posting_t::destroy_list(obj);
}