Search for documents parallely.

This commit is contained in:
Kishore Nallan 2017-11-18 22:22:52 +05:30
parent 78b9ee52ec
commit eff61d2c25
4 changed files with 143 additions and 14 deletions

View File

@ -1,7 +1,8 @@
#pragma once
#include <string>
#include <vector>
#include <string>
#include <thread>
#include <art.h>
#include <index.h>
#include <number.h>
@ -24,6 +25,8 @@ private:
std::vector<Index*> indices;
std::vector<std::thread*> index_threads;
// Auto incrementing record ID used internally for indexing - not exposed to the client
uint32_t next_seq_id;

View File

@ -17,6 +17,35 @@ struct token_candidates {
std::vector<art_leaf*> candidates;
};
struct search_args {
std::string query;
std::vector<std::string> search_fields;
std::string simple_filter_query;
std::vector<facet> facets;
std::vector<sort_by> sort_fields_std;
int num_typos;
size_t per_page;
size_t page;
token_ordering token_order;
bool prefix;
std::vector<std::pair<int, Topster<100>::KV>> field_order_kvs;
size_t all_result_ids_len;
std::vector<std::vector<art_leaf*>> searched_queries;
search_args() {
}
search_args(std::string query, std::vector<std::string> search_fields, std::string simple_filter_query,
std::vector<facet> facets, std::vector<sort_by> sort_fields_std, int num_typos,
size_t per_page, size_t page, token_ordering token_order, bool prefix):
query(query), search_fields(search_fields), simple_filter_query(simple_filter_query), facets(facets),
sort_fields_std(sort_fields_std), num_typos(num_typos), per_page(per_page), page(page),
token_order(token_order), prefix(prefix), all_result_ids_len(0) {
}
};
class Index {
private:
std::string name;
@ -94,6 +123,8 @@ public:
~Index();
void run_search();
Option<size_t> search(std::string query, const std::vector<std::string> search_fields,
const std::string & simple_filter_query, std::vector<facet> & facets,
std::vector<sort_by> sort_fields_std, const int num_typos,
@ -121,5 +152,19 @@ public:
static constexpr const char* COLLECTION_NEXT_SEQ_PREFIX = "$CS";
static constexpr const char* SEQ_ID_PREFIX = "$SI";
static constexpr const char* DOC_ID_PREFIX = "$DI";
/*
* Concurrency Primitives
*/
// Used for passing control back and forth between main and worker threads
std::mutex m;
std::condition_variable cv;
bool ready; // prevents spurious wake up of the worker thread
bool processed; // prevents spurious wake up of the main thread
bool terminate; // used for interrupting the thread during tear down
search_args search_params;
};

View File

@ -27,15 +27,25 @@ Collection::Collection(const std::string name, const uint32_t collection_id, con
num_indices = 4;
for(auto i = 0; i < num_indices; i++) {
indices.push_back(new Index(name+std::to_string(i), search_schema, facet_schema, sort_schema));
Index* index = new Index(name+std::to_string(i), search_schema, facet_schema, sort_schema);
indices.push_back(index);
std::thread* thread = new std::thread(&Index::run_search, index);
index_threads.push_back(thread);
}
num_documents = 0;
}
Collection::~Collection() {
for(auto index: indices) {
delete index;
for(auto i = 0; i < indices.size(); i++) {
std::thread *t = index_threads[i];
Index* index = indices[i];
index->ready = true;
index->terminate = true;
index->cv.notify_one();
t->join();
delete t;
delete indices[i];
}
}
@ -293,15 +303,56 @@ Option<nlohmann::json> Collection::search(std::string query, const std::vector<s
// all search queries that were used for generating the results
std::vector<std::vector<art_leaf*>> searched_queries;
std::vector<std::pair<int, Topster<100>::KV>> field_order_kvs;
size_t total_found = 0;
// send data to individual index threads
for(Index* index: indices) {
size_t all_result_ids_len = 0;
index->search(query, search_fields, simple_filter_query, facets, sort_fields_std, num_typos,
per_page, page, token_order, prefix, field_order_kvs, all_result_ids_len, searched_queries);
total_found += all_result_ids_len;
index->search_params = search_args(query, search_fields, simple_filter_query, facets, sort_fields_std,
num_typos, per_page, page, token_order, prefix);
{
std::lock_guard<std::mutex> lk(index->m);
index->ready = true;
index->processed = false;
}
index->cv.notify_one();
}
for(Index* index: indices) {
// wait for the worker
{
std::unique_lock<std::mutex> lk(index->m);
index->cv.wait(lk, [index]{return index->processed;});
}
// need to remap the search query index before appending
for(auto & field_order_kv: index->search_params.field_order_kvs) {
field_order_kv.second.query_index += searched_queries.size();
field_order_kvs.push_back(field_order_kv);
}
searched_queries.insert(searched_queries.end(), index->search_params.searched_queries.begin(),
index->search_params.searched_queries.end());
for(auto fi = 0; fi < index->search_params.facets.size(); fi++) {
auto & this_facet = index->search_params.facets[fi];
auto & acc_facet = facets[fi];
for(auto & facet_kv: this_facet.result_map) {
size_t count = 0;
if(acc_facet.result_map.count(facet_kv.first) == 0) {
// not found, so set it
count = facet_kv.second;
} else {
count = acc_facet.result_map[facet_kv.first] + facet_kv.second;
}
acc_facet.result_map[facet_kv.first] = count;
}
}
total_found += index->search_params.all_result_ids_len;
}
// All fields are sorted descending

View File

@ -28,6 +28,10 @@ Index::Index(const std::string name, spp::sparse_hash_map<std::string, field> se
}
num_documents = 0;
ready = false;
processed = false;
terminate = false;
}
Index::~Index() {
@ -565,12 +569,38 @@ Option<uint32_t> Index::do_filtering(uint32_t** filter_ids_out, const std::strin
return Option<>(filter_ids_length);
}
void Index::run_search() {
while(true) {
// wait until main thread sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [this]{return ready;});
if(terminate) {
break;
}
// after the wait, we own the lock.
search(search_params.query, search_params.search_fields, search_params.simple_filter_query, search_params.facets,
search_params.sort_fields_std, search_params.num_typos, search_params.per_page, search_params.page,
search_params.token_order, search_params.prefix, search_params.field_order_kvs,
search_params.all_result_ids_len, search_params.searched_queries);
// hand control back to main thread
processed = true;
ready = false;
// manual unlocking is done before notifying, to avoid waking up the waiting thread only to block again
lk.unlock();
cv.notify_one();
}
}
Option<size_t> Index::search(std::string query, const std::vector<std::string> search_fields,
const std::string & simple_filter_query, std::vector<facet> & facets,
std::vector<sort_by> sort_fields_std, const int num_typos,
const size_t per_page, const size_t page, const token_ordering token_order,
const bool prefix, std::vector<std::pair<int, Topster<100>::KV>> & field_order_kvs,
size_t & all_result_ids_len, std::vector<std::vector<art_leaf*>> & searched_queries) {
const std::string & simple_filter_query, std::vector<facet> & facets,
std::vector<sort_by> sort_fields_std, const int num_typos,
const size_t per_page, const size_t page, const token_ordering token_order,
const bool prefix, std::vector<std::pair<int, Topster<100>::KV>> & field_order_kvs,
size_t & all_result_ids_len, std::vector<std::vector<art_leaf*>> & searched_queries) {
const size_t num_results = (page * per_page);