Fix search cutoff for wildcard filtering.

This commit is contained in:
Kishore Nallan 2021-11-13 13:55:12 +05:30
parent 13d4c14889
commit 483afa5b34
3 changed files with 29 additions and 13 deletions

View File

@ -1,6 +1,9 @@
#include <chrono>
// All variables declared here are `thread_local`: every thread has its own independent copy.
extern thread_local int64_t write_log_index;
// NOTE(review): `begin` appears only on the older circuit-breaker comparison lines; confirm whether
// it is still meant to be declared alongside `search_begin`.
extern thread_local std::chrono::high_resolution_clock::time_point begin;
// The variables below are used for circuit breaking (cutting off) long-running search requests.
// NOTE: because they are thread-local, a worker thread forked off the main search thread does not
// see the parent's values; it must explicitly initialize its own copies from the parent thread's values.
// Reference time point from which the circuit-breaker macros measure elapsed time.
extern thread_local std::chrono::high_resolution_clock::time_point search_begin;
// Time budget in milliseconds; the circuit-breaker macros trip once the elapsed time exceeds it.
extern thread_local int64_t search_stop_ms;
// Set to true by the circuit-breaker macros when the time budget has been exceeded on this thread.
extern thread_local bool search_cutoff;

View File

@ -22,13 +22,13 @@
#include "logger.h"
// Circuit breaker for use inside a function that returns void: if the number of milliseconds elapsed
// since the reference time point exceeds `search_stop_ms`, it records the cutoff on the current
// thread (`search_cutoff = true`) and returns from the enclosing function.
// NOTE(review): the body shows two consecutive comparison lines (one against `begin`, one against
// `search_begin`); they look like the before/after of the same line -- confirm that only the
// `search_begin` comparison is intended.
#define RETURN_CIRCUIT_BREAKER if(std::chrono::duration_cast<std::chrono::milliseconds>(\
std::chrono::high_resolution_clock::now() - begin).count() > search_stop_ms) { \
std::chrono::high_resolution_clock::now() - search_begin).count() > search_stop_ms) { \
search_cutoff = true; \
return ;\
}
// Circuit breaker for use inside a loop: if the number of milliseconds elapsed since the reference
// time point exceeds `search_stop_ms`, it records the cutoff on the current thread
// (`search_cutoff = true`) and breaks out of the innermost enclosing loop.
// NOTE(review): the body shows two consecutive comparison lines (one against `begin`, one against
// `search_begin`); they look like the before/after of the same line -- confirm that only the
// `search_begin` comparison is intended.
#define BREAK_CIRCUIT_BREAKER if(std::chrono::duration_cast<std::chrono::milliseconds>(\
std::chrono::high_resolution_clock::now() - begin).count() > search_stop_ms) { \
std::chrono::high_resolution_clock::now() - search_begin).count() > search_stop_ms) { \
search_cutoff = true; \
break;\
}
@ -1985,8 +1985,9 @@ void Index::search(std::vector<query_tokens_t>& field_query_tokens,
size_t min_len_1typo,
size_t min_len_2typo) const {
begin = std::chrono::high_resolution_clock::now();
search_begin = std::chrono::high_resolution_clock::now();
search_stop_ms = search_cutoff_ms;
search_cutoff = false;
// process the filters
@ -2670,6 +2671,10 @@ void Index::search_wildcard(const std::vector<std::string>& qtokens, const std::
size_t num_queued = 0;
size_t filter_index = 0;
const auto parent_search_begin = search_begin;
const auto parent_search_stop_ms = search_stop_ms;
auto parent_search_cutoff = search_cutoff;
for(size_t thread_id = 0; thread_id < num_threads && filter_index < filter_ids_length; thread_id++) {
size_t batch_res_len = window_size;
@ -2682,12 +2687,17 @@ void Index::search_wildcard(const std::vector<std::string>& qtokens, const std::
topsters[thread_id] = new Topster(topster->MAX_SIZE, topster->distinct);
thread_pool->enqueue([this, thread_id, &sort_fields_std, &searched_queries, &field_id,
&group_limit, &group_by_fields, &topsters, &tgroups_processed,
&sort_order, field_values, &geopoint_indices, &token_bits, &plists,
check_for_circuit_break,
batch_result_ids, batch_res_len,
&num_processed, &m_process, &cv_process]() {
thread_pool->enqueue([this, &parent_search_begin, &parent_search_stop_ms, &parent_search_cutoff,
thread_id, &sort_fields_std, &searched_queries, &field_id,
&group_limit, &group_by_fields, &topsters, &tgroups_processed,
&sort_order, field_values, &geopoint_indices, &token_bits, &plists,
check_for_circuit_break,
batch_result_ids, batch_res_len,
&num_processed, &m_process, &cv_process]() {
search_begin = parent_search_begin;
search_stop_ms = parent_search_stop_ms;
search_cutoff = parent_search_cutoff;
for(size_t i = 0; i < batch_res_len; i++) {
const uint32_t seq_id = batch_result_ids[i];
@ -2696,14 +2706,15 @@ void Index::search_wildcard(const std::vector<std::string>& qtokens, const std::
geopoint_indices, group_limit, group_by_fields, token_bits,
false, false, plists);
if(check_for_circuit_break && i % (1 << 17) == 0) {
// check only once every 2^17 docs to reduce overhead
if(check_for_circuit_break && ((i + 1) % (1 << 15)) == 0) {
// check only once every 2^15 docs to reduce overhead
BREAK_CIRCUIT_BREAKER
}
}
std::unique_lock<std::mutex> lock(m_process);
num_processed++;
parent_search_cutoff = parent_search_cutoff || search_cutoff;
cv_process.notify_one();
});
@ -2713,6 +2724,8 @@ void Index::search_wildcard(const std::vector<std::string>& qtokens, const std::
std::unique_lock<std::mutex> lock_process(m_process);
cv_process.wait(lock_process, [&](){ return num_processed == num_queued; });
search_cutoff = parent_search_cutoff;
for(size_t thread_id = 0; thread_id < num_processed; thread_id++) {
groups_processed.insert(tgroups_processed[thread_id].begin(), tgroups_processed[thread_id].end());
aggregate_topster(topster, topsters[thread_id]);

View File

@ -2,6 +2,6 @@
#include "thread_local_vars.h"
// Definitions of the thread-local variables; every thread gets its own copy of each.
thread_local int64_t write_log_index = 0;
// NOTE(review): `begin` appears only on the older circuit-breaker comparison lines; confirm whether
// it is still meant to be defined alongside `search_begin`.
thread_local std::chrono::high_resolution_clock::time_point begin;
// Reference time point for the search circuit breaker; Index::search assigns it when a search starts.
thread_local std::chrono::high_resolution_clock::time_point search_begin;
// Search time budget in milliseconds; no explicit initializer here, Index::search assigns it per search.
thread_local int64_t search_stop_ms;
// True once the circuit breaker has tripped on this thread; Index::search resets it to false.
thread_local bool search_cutoff = false;