diff --git a/include/thread_local_vars.h b/include/thread_local_vars.h index 0b5f024d..d4dd725f 100644 --- a/include/thread_local_vars.h +++ b/include/thread_local_vars.h @@ -1,6 +1,9 @@ #include extern thread_local int64_t write_log_index; -extern thread_local std::chrono::high_resolution_clock::time_point begin; + +// These are used for circuit breaking search requests +// NOTE: if you fork off main search thread, care must be taken to initialize these from parent thread values +extern thread_local std::chrono::high_resolution_clock::time_point search_begin; extern thread_local int64_t search_stop_ms; extern thread_local bool search_cutoff; \ No newline at end of file diff --git a/src/index.cpp b/src/index.cpp index 9d2af0f4..e3c0130d 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -22,13 +22,13 @@ #include "logger.h" #define RETURN_CIRCUIT_BREAKER if(std::chrono::duration_cast(\ - std::chrono::high_resolution_clock::now() - begin).count() > search_stop_ms) { \ + std::chrono::high_resolution_clock::now() - search_begin).count() > search_stop_ms) { \ search_cutoff = true; \ return ;\ } #define BREAK_CIRCUIT_BREAKER if(std::chrono::duration_cast(\ - std::chrono::high_resolution_clock::now() - begin).count() > search_stop_ms) { \ + std::chrono::high_resolution_clock::now() - search_begin).count() > search_stop_ms) { \ search_cutoff = true; \ break;\ } @@ -1985,8 +1985,9 @@ void Index::search(std::vector& field_query_tokens, size_t min_len_1typo, size_t min_len_2typo) const { - begin = std::chrono::high_resolution_clock::now(); + search_begin = std::chrono::high_resolution_clock::now(); search_stop_ms = search_cutoff_ms; + search_cutoff = false; // process the filters @@ -2670,6 +2671,10 @@ void Index::search_wildcard(const std::vector& qtokens, const std:: size_t num_queued = 0; size_t filter_index = 0; + const auto parent_search_begin = search_begin; + const auto parent_search_stop_ms = search_stop_ms; + auto parent_search_cutoff = search_cutoff; + for(size_t thread_id = 0; thread_id < num_threads && filter_index < filter_ids_length; thread_id++) { size_t batch_res_len = window_size; @@ -2682,12 +2687,17 @@ void Index::search_wildcard(const std::vector& qtokens, const std:: topsters[thread_id] = new Topster(topster->MAX_SIZE, topster->distinct); - thread_pool->enqueue([this, thread_id, &sort_fields_std, &searched_queries, &field_id, - &group_limit, &group_by_fields, &topsters, &tgroups_processed, - &sort_order, field_values, &geopoint_indices, &token_bits, &plists, - check_for_circuit_break, - batch_result_ids, batch_res_len, - &num_processed, &m_process, &cv_process]() { + thread_pool->enqueue([this, &parent_search_begin, &parent_search_stop_ms, &parent_search_cutoff, + thread_id, &sort_fields_std, &searched_queries, &field_id, + &group_limit, &group_by_fields, &topsters, &tgroups_processed, + &sort_order, field_values, &geopoint_indices, &token_bits, &plists, + check_for_circuit_break, + batch_result_ids, batch_res_len, + &num_processed, &m_process, &cv_process]() { + + search_begin = parent_search_begin; + search_stop_ms = parent_search_stop_ms; + search_cutoff = parent_search_cutoff; for(size_t i = 0; i < batch_res_len; i++) { const uint32_t seq_id = batch_result_ids[i]; @@ -2696,14 +2706,15 @@ void Index::search_wildcard(const std::vector& qtokens, const std:: geopoint_indices, group_limit, group_by_fields, token_bits, false, false, plists); - if(check_for_circuit_break && i % (1 << 17) == 0) { - // check only once every 2^17 docs to reduce overhead + if(check_for_circuit_break && ((i + 1) % (1 << 15)) == 0) { + // check only once every 2^15 docs to reduce overhead BREAK_CIRCUIT_BREAKER } } std::unique_lock lock(m_process); num_processed++; + parent_search_cutoff = parent_search_cutoff || search_cutoff; cv_process.notify_one(); }); @@ -2713,6 +2724,8 @@ void Index::search_wildcard(const std::vector& qtokens, const std:: std::unique_lock lock_process(m_process); cv_process.wait(lock_process, [&](){ return num_processed == num_queued; }); + search_cutoff = parent_search_cutoff; + for(size_t thread_id = 0; thread_id < num_processed; thread_id++) { groups_processed.insert(tgroups_processed[thread_id].begin(), tgroups_processed[thread_id].end()); aggregate_topster(topster, topsters[thread_id]); diff --git a/src/thread_local_vars.cpp b/src/thread_local_vars.cpp index 816a898e..4cb3562a 100644 --- a/src/thread_local_vars.cpp +++ b/src/thread_local_vars.cpp @@ -2,6 +2,6 @@ #include "thread_local_vars.h" thread_local int64_t write_log_index = 0; -thread_local std::chrono::high_resolution_clock::time_point begin; +thread_local std::chrono::high_resolution_clock::time_point search_begin; thread_local int64_t search_stop_ms; thread_local bool search_cutoff = false;