From eff61d2c25fc1513f3ff7759954d867930479c21 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 18 Nov 2017 22:22:52 +0530 Subject: [PATCH] Search for documents parallely. --- include/collection.h | 5 +++- include/index.h | 45 +++++++++++++++++++++++++++++ src/collection.cpp | 67 ++++++++++++++++++++++++++++++++++++++------ src/index.cpp | 40 ++++++++++++++++++++++---- 4 files changed, 143 insertions(+), 14 deletions(-) diff --git a/include/collection.h b/include/collection.h index b2e15a8a..9c07f7f5 100644 --- a/include/collection.h +++ b/include/collection.h @@ -1,7 +1,8 @@ #pragma once #include -#include +#include +#include #include #include #include @@ -24,6 +25,8 @@ private: std::vector indices; + std::vector index_threads; + // Auto incrementing record ID used internally for indexing - not exposed to the client uint32_t next_seq_id; diff --git a/include/index.h b/include/index.h index ebce5288..0f68130d 100644 --- a/include/index.h +++ b/include/index.h @@ -17,6 +17,35 @@ struct token_candidates { std::vector candidates; }; +struct search_args { + std::string query; + std::vector search_fields; + std::string simple_filter_query; + std::vector facets; + std::vector sort_fields_std; + int num_typos; + size_t per_page; + size_t page; + token_ordering token_order; + bool prefix; + std::vector::KV>> field_order_kvs; + size_t all_result_ids_len; + std::vector> searched_queries; + + search_args() { + + } + + search_args(std::string query, std::vector search_fields, std::string simple_filter_query, + std::vector facets, std::vector sort_fields_std, int num_typos, + size_t per_page, size_t page, token_ordering token_order, bool prefix): + query(query), search_fields(search_fields), simple_filter_query(simple_filter_query), facets(facets), + sort_fields_std(sort_fields_std), num_typos(num_typos), per_page(per_page), page(page), + token_order(token_order), prefix(prefix), all_result_ids_len(0) { + + } +}; + class Index { private: std::string name; @@ -94,6 +123,8 @@ public: ~Index(); + void run_search(); + Option search(std::string query, const std::vector search_fields, const std::string & simple_filter_query, std::vector & facets, std::vector sort_fields_std, const int num_typos, @@ -121,5 +152,19 @@ public: static constexpr const char* COLLECTION_NEXT_SEQ_PREFIX = "$CS"; static constexpr const char* SEQ_ID_PREFIX = "$SI"; static constexpr const char* DOC_ID_PREFIX = "$DI"; + + /* + * Concurrency Primitives + */ + + // Used for passing control back and forth between main and worker threads + std::mutex m; + std::condition_variable cv; + + bool ready; // prevents spurious wake up of the worker thread + bool processed; // prevents spurious wake up of the main thread + bool terminate; // used for interrupting the thread during tear down + + search_args search_params; }; diff --git a/src/collection.cpp b/src/collection.cpp index 8afdd46a..324283da 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -27,15 +27,25 @@ Collection::Collection(const std::string name, const uint32_t collection_id, con num_indices = 4; for(auto i = 0; i < num_indices; i++) { - indices.push_back(new Index(name+std::to_string(i), search_schema, facet_schema, sort_schema)); + Index* index = new Index(name+std::to_string(i), search_schema, facet_schema, sort_schema); + indices.push_back(index); + std::thread* thread = new std::thread(&Index::run_search, index); + index_threads.push_back(thread); } num_documents = 0; } Collection::~Collection() { - for(auto index: indices) { - delete index; + for(auto i = 0; i < indices.size(); i++) { + std::thread *t = index_threads[i]; + Index* index = indices[i]; + index->ready = true; + index->terminate = true; + index->cv.notify_one(); + t->join(); + delete t; + delete indices[i]; } } @@ -293,15 +303,56 @@ Option Collection::search(std::string query, const std::vector> searched_queries; - std::vector::KV>> field_order_kvs; size_t total_found = 0; + // send data to individual index threads for(Index* index: indices) { - size_t all_result_ids_len = 0; - index->search(query, search_fields, simple_filter_query, facets, sort_fields_std, num_typos, - per_page, page, token_order, prefix, field_order_kvs, all_result_ids_len, searched_queries); - total_found += all_result_ids_len; + index->search_params = search_args(query, search_fields, simple_filter_query, facets, sort_fields_std, + num_typos, per_page, page, token_order, prefix); + { + std::lock_guard lk(index->m); + index->ready = true; + index->processed = false; + } + index->cv.notify_one(); + } + + for(Index* index: indices) { + // wait for the worker + { + std::unique_lock lk(index->m); + index->cv.wait(lk, [index]{return index->processed;}); + } + + // need to remap the search query index before appending + for(auto & field_order_kv: index->search_params.field_order_kvs) { + field_order_kv.second.query_index += searched_queries.size(); + field_order_kvs.push_back(field_order_kv); + } + + searched_queries.insert(searched_queries.end(), index->search_params.searched_queries.begin(), + index->search_params.searched_queries.end()); + + for(auto fi = 0; fi < index->search_params.facets.size(); fi++) { + auto & this_facet = index->search_params.facets[fi]; + auto & acc_facet = facets[fi]; + + for(auto & facet_kv: this_facet.result_map) { + size_t count = 0; + + if(acc_facet.result_map.count(facet_kv.first) == 0) { + // not found, so set it + count = facet_kv.second; + } else { + count = acc_facet.result_map[facet_kv.first] + facet_kv.second; + } + + acc_facet.result_map[facet_kv.first] = count; + } + } + + total_found += index->search_params.all_result_ids_len; } // All fields are sorted descending diff --git a/src/index.cpp b/src/index.cpp index c84d4a45..0024e9df 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -28,6 +28,10 @@ Index::Index(const std::string name, spp::sparse_hash_map se } num_documents = 0; + + ready = false; + processed = false; + terminate = false; } Index::~Index() { @@ -565,12 +569,38 @@ Option Index::do_filtering(uint32_t** filter_ids_out, const std::strin return Option<>(filter_ids_length); } +void Index::run_search() { + while(true) { + // wait until main thread sends data + std::unique_lock lk(m); + cv.wait(lk, [this]{return ready;}); + + if(terminate) { + break; + } + + // after the wait, we own the lock. + search(search_params.query, search_params.search_fields, search_params.simple_filter_query, search_params.facets, + search_params.sort_fields_std, search_params.num_typos, search_params.per_page, search_params.page, + search_params.token_order, search_params.prefix, search_params.field_order_kvs, + search_params.all_result_ids_len, search_params.searched_queries); + + // hand control back to main thread + processed = true; + ready = false; + + // manual unlocking is done before notifying, to avoid waking up the waiting thread only to block again + lk.unlock(); + cv.notify_one(); + } +} + Option Index::search(std::string query, const std::vector search_fields, - const std::string & simple_filter_query, std::vector & facets, - std::vector sort_fields_std, const int num_typos, - const size_t per_page, const size_t page, const token_ordering token_order, - const bool prefix, std::vector::KV>> & field_order_kvs, - size_t & all_result_ids_len, std::vector> & searched_queries) { + const std::string & simple_filter_query, std::vector & facets, + std::vector sort_fields_std, const int num_typos, + const size_t per_page, const size_t page, const token_ordering token_order, + const bool prefix, std::vector::KV>> & field_order_kvs, + size_t & all_result_ids_len, std::vector> & searched_queries) { const size_t num_results = (page * per_page);