Fix iterator skip performance.

This commit is contained in:
Kishore Nallan 2022-06-25 17:37:33 +05:30
parent 6c54c1f6c7
commit 4d3f6dd0de
8 changed files with 113 additions and 271 deletions

View File

@ -69,40 +69,43 @@ bool or_iterator_t::intersect(std::vector<or_iterator_t>& its, result_iter_state
auto id = its[0].id();
if(take_id(istate, id, is_excluded)) {
func(id, its);
its[0].next();
} else {
// skip iterator till next id available in filter
if(istate.filter_ids_length != 0 && !is_excluded) {
if(istate.filter_ids_index < istate.filter_ids_length) {
its[0].skip_to(istate.filter_ids[istate.filter_ids_index]);
} else {
break;
}
}
if(istate.filter_ids_length != 0 && !is_excluded) {
if(istate.filter_ids_index < istate.filter_ids_length) {
// skip iterator till next id available in filter
its[0].skip_to(istate.filter_ids[istate.filter_ids_index]);
} else {
its[0].next();
break;
}
} else {
its[0].next();
}
}
break;
case 2:
if(istate.filter_ids_length != 0) {
its[0].skip_to(istate.filter_ids[istate.filter_ids_index]);
its[1].skip_to(istate.filter_ids[istate.filter_ids_index]);
}
while(its.size() == it_size && !at_end2(its)) {
if(equals2(its)) {
auto id = its[0].id();
if(take_id(istate, id, is_excluded)) {
func(id, its);
advance_all2(its);
} else {
// skip iterator till next id available in filter
if(istate.filter_ids_length != 0 && !is_excluded) {
if(istate.filter_ids_index < istate.filter_ids_length) {
its[0].skip_to(istate.filter_ids[istate.filter_ids_index]);
its[1].skip_to(istate.filter_ids[istate.filter_ids_index]);
} else {
break;
}
}
if(istate.filter_ids_length != 0 && !is_excluded) {
if(istate.filter_ids_index < istate.filter_ids_length) {
// skip iterator till next id available in filter
its[0].skip_to(istate.filter_ids[istate.filter_ids_index]);
its[1].skip_to(istate.filter_ids[istate.filter_ids_index]);
} else {
advance_all2(its);
break;
}
} else {
advance_all2(its);
}
} else {
advance_non_largest2(its);
@ -110,26 +113,30 @@ bool or_iterator_t::intersect(std::vector<or_iterator_t>& its, result_iter_state
}
break;
default:
if(istate.filter_ids_length != 0) {
for(auto& it: its) {
it.skip_to(istate.filter_ids[istate.filter_ids_index]);
}
}
while(its.size() == it_size && !at_end(its)) {
if(equals(its)) {
//LOG(INFO) << its[0].id();
auto id = its[0].id();
if(take_id(istate, id, is_excluded)) {
func(id, its);
advance_all(its);
} else {
// skip iterator till next id available in filter
if(istate.filter_ids_length != 0 && !is_excluded) {
if(istate.filter_ids_index < istate.filter_ids_length) {
for(auto& it: its) {
it.skip_to(istate.filter_ids[istate.filter_ids_index]);
}
} else {
break;
}
if(istate.filter_ids_length != 0 && !is_excluded) {
if(istate.filter_ids_index < istate.filter_ids_length) {
// skip iterator till next id available in filter
for(auto& it: its) {
it.skip_to(istate.filter_ids[istate.filter_ids_index]);
}
} else {
advance_all2(its);
break;
}
} else {
advance_all2(its);
}
} else {
advance_non_largest(its);

View File

@ -49,14 +49,12 @@ public:
std::vector<posting_list_t*> expanded_plists;
result_iter_state_t& iter_state;
ThreadPool* thread_pool;
size_t parallelize_min_ids;
block_intersector_t(const std::vector<void*>& raw_posting_lists,
result_iter_state_t& iter_state,
ThreadPool* thread_pool,
size_t parallelize_min_ids = 1):
iter_state(iter_state), thread_pool(thread_pool),
parallelize_min_ids(parallelize_min_ids) {
iter_state(iter_state), thread_pool(thread_pool) {
to_expanded_plists(raw_posting_lists, plists, expanded_plists);
@ -75,8 +73,6 @@ public:
template<class T>
bool intersect(T func, size_t concurrency=4);
void split_lists(size_t concurrency, std::vector<std::vector<posting_list_t::iterator_t>>& partial_its_vec);
};
static void to_expanded_plists(const std::vector<void*>& raw_posting_lists, std::vector<posting_list_t*>& plists,
@ -120,85 +116,17 @@ public:
template<class T>
bool posting_t::block_intersector_t::intersect(T func, size_t concurrency) {
// Split posting lists into N chunks and intersect them in-parallel
// 1. Sort posting lists by number of blocks
// 2. Iterate on the posting list with least number of blocks on N-block windows
// 3. On each window, pick the last block's last ID and identify blocks from other lists containing that ID
// 4. Construct N groups of iterators this way (the last block must overlap on both sides of the window)
if(plists.empty()) {
return true;
}
if(plists[0]->num_ids() < parallelize_min_ids) {
std::vector<posting_list_t::iterator_t> its;
its.reserve(plists.size());
std::vector<posting_list_t::iterator_t> its;
its.reserve(plists.size());
for(const auto& posting_list: plists) {
its.push_back(posting_list->new_iterator());
}
posting_list_t::block_intersect<T>(its, iter_state, func);
return true;
for(const auto& posting_list: plists) {
its.push_back(posting_list->new_iterator());
}
std::vector<std::vector<posting_list_t::iterator_t>> partial_its_vec(concurrency);
split_lists(concurrency, partial_its_vec);
/*for(size_t i = 0; i < partial_its_vec.size(); i++) {
auto& partial_its = partial_its_vec[i];
if (partial_its.empty()) {
continue;
}
LOG(INFO) << "Vec " << i;
for (auto& it: partial_its) {
while (it.valid()) {
LOG(INFO) << it.id();
it.next();
}
LOG(INFO) << "---";
}
}*/
size_t num_processed = 0;
std::mutex m_process;
std::condition_variable cv_process;
size_t num_non_empty = 0;
for(size_t i = 0; i < partial_its_vec.size(); i++) {
auto& partial_its = partial_its_vec[i];
if(partial_its.empty()) {
continue;
}
num_non_empty++;
/*for(auto& it: partial_its) {
while(it.valid()) {
LOG(INFO) << it.id();
it.next();
}
LOG(INFO) << "---";
}*/
thread_pool->enqueue([this, i, &func, &partial_its, &num_processed, &m_process, &cv_process]() {
auto iter_state_copy = iter_state;
iter_state_copy.index = i;
posting_list_t::block_intersect<T>(partial_its, iter_state_copy, func);
std::unique_lock<std::mutex> lock(m_process);
num_processed++;
cv_process.notify_one();
});
}
std::unique_lock<std::mutex> lock_process(m_process);
cv_process.wait(lock_process, [&](){ return num_processed == num_non_empty; });
posting_list_t::block_intersect<T>(its, iter_state, func);
return true;
}

View File

@ -62,6 +62,7 @@ public:
class iterator_t {
private:
const std::map<last_id_t, block_t*>* id_block_map;
block_t* curr_block;
uint32_t curr_index;
block_t* end_block;
@ -75,18 +76,20 @@ public:
uint32_t* offset_index = nullptr;
uint32_t* offsets = nullptr;
explicit iterator_t(block_t* start, block_t* end, bool auto_destroy = true, uint32_t field_id = 0);
explicit iterator_t(const std::map<last_id_t, block_t*>* id_block_map,
block_t* start, block_t* end, bool auto_destroy = true, uint32_t field_id = 0);
~iterator_t();
iterator_t(iterator_t&& rhs) noexcept;
iterator_t& operator=(iterator_t&& rhs) noexcept;
void destroy();
void reset_cache();
[[nodiscard]] bool valid() const;
void next();
void skip_to(uint32_t id);
void set_index(uint32_t index);
[[nodiscard]] uint32_t id() const;
[[nodiscard]] uint32_t last_block_id() const;
[[nodiscard]] inline uint32_t index() const;
[[nodiscard]] inline block_t* block() const;
[[nodiscard]] uint32_t get_field_id() const;

View File

@ -128,7 +128,7 @@ void or_iterator_t::advance_smallest() {
}
if(!its[i].valid()) {
its[i].destroy();
its[i].reset_cache();
its.erase(its.cbegin() + i);
i--;
}
@ -152,7 +152,7 @@ bool or_iterator_t::skip_to(uint32_t id) {
it.skip_to(id);
if(!it.valid()) {
its[i].destroy();
its[i].reset_cache();
its.erase(its.begin() + i);
i--;
} else {
@ -238,7 +238,7 @@ const std::vector<posting_list_t::iterator_t>& or_iterator_t::get_its() const {
or_iterator_t::~or_iterator_t() noexcept {
for(auto& it: its) {
it.destroy();
it.reset_cache();
}
}

View File

@ -517,56 +517,3 @@ void posting_t::get_phrase_matches(const std::vector<void*>& raw_posting_lists,
delete expanded_plist;
}
}
void posting_t::block_intersector_t::split_lists(size_t concurrency,
std::vector<std::vector<posting_list_t::iterator_t>>& partial_its_vec) {
const size_t num_blocks = this->plists[0]->num_blocks();
const size_t window_size = (num_blocks + concurrency - 1) / concurrency; // rounds up
size_t blocks_traversed = 0;
posting_list_t::block_t* start_block = this->plists[0]->get_root();
posting_list_t::block_t* curr_block = start_block;
size_t window_index = 0;
while(curr_block != nullptr) {
blocks_traversed++;
if(blocks_traversed % window_size == 0 || blocks_traversed == num_blocks) {
// construct partial iterators and intersect within them
std::vector<posting_list_t::iterator_t>& partial_its = partial_its_vec[window_index];
for(size_t i = 0; i < this->plists.size(); i++) {
posting_list_t::block_t* p_start_block = nullptr;
posting_list_t::block_t* p_end_block = nullptr;
// [1, 2] [3, 4] [5, 6]
// [3, 5] [6]
if(i == 0) {
p_start_block = start_block;
p_end_block = curr_block->next;
} else {
auto start_block_first_id = start_block->ids.at(0);
auto end_block_last_id = curr_block->ids.last();
p_start_block = this->plists[i]->block_of(start_block_first_id);
posting_list_t::block_t* last_block = this->plists[i]->block_of(end_block_last_id);
if(p_start_block == last_block && p_start_block != nullptr) {
p_end_block = p_start_block->next;
} else {
p_end_block = last_block == nullptr ? nullptr : last_block->next;
}
}
partial_its.emplace_back(p_start_block, p_end_block);
}
start_block = curr_block->next;
window_index++;
}
curr_block = curr_block->next;
}
}

View File

@ -794,13 +794,6 @@ bool posting_list_t::get_offsets(const std::vector<iterator_t>& its,
continue;
}
/*uint32_t* offsets = curr_block->offsets.uncompress();
uint32_t start_offset = curr_block->offset_index.at(curr_index);
uint32_t end_offset = (curr_index == curr_block->size() - 1) ?
curr_block->offsets.getLength() :
curr_block->offset_index.at(curr_index + 1);*/
uint32_t* offsets = its[j].offsets;
uint32_t start_offset = its[j].offset_index[curr_index];
@ -859,8 +852,6 @@ bool posting_list_t::get_offsets(const std::vector<iterator_t>& its,
// for plain string fields
array_token_pos[0].push_back(token_positions_t{is_last_token, positions});
}
//delete [] offsets;
}
return true;
@ -941,7 +932,7 @@ bool posting_list_t::equals2(std::vector<posting_list_t::iterator_t>& its) {
posting_list_t::iterator_t posting_list_t::new_iterator(block_t* start_block, block_t* end_block, uint32_t field_id) {
start_block = (start_block == nullptr) ? &root_block : start_block;
return posting_list_t::iterator_t(start_block, end_block, true, field_id);
return posting_list_t::iterator_t(&id_block_map, start_block, end_block, true, field_id);
}
void posting_list_t::advance_all(std::vector<posting_list_t::iterator_t>& its) {
@ -1391,9 +1382,11 @@ size_t posting_list_t::get_last_offset(const posting_list_t::iterator_t& it, boo
/* iterator_t operations */
posting_list_t::iterator_t::iterator_t(posting_list_t::block_t* start, posting_list_t::block_t* end,
posting_list_t::iterator_t::iterator_t(const std::map<last_id_t, block_t*>* id_block_map,
posting_list_t::block_t* start, posting_list_t::block_t* end,
bool auto_destroy, uint32_t field_id):
curr_block(start), curr_index(0), end_block(end), auto_destroy(auto_destroy), field_id(field_id) {
id_block_map(id_block_map), curr_block(start), curr_index(0), end_block(end),
auto_destroy(auto_destroy), field_id(field_id) {
if(curr_block != end_block) {
ids = curr_block->ids.uncompress();
@ -1426,6 +1419,10 @@ void posting_list_t::iterator_t::next() {
}
}
uint32_t posting_list_t::iterator_t::last_block_id() const {
return ids[curr_block->size() - 1];
}
uint32_t posting_list_t::iterator_t::id() const {
return ids[curr_index];
}
@ -1439,52 +1436,56 @@ posting_list_t::block_t* posting_list_t::iterator_t::block() const {
}
void posting_list_t::iterator_t::skip_to(uint32_t id) {
bool skipped_block = false;
while(curr_block != end_block && curr_block->ids.last() < id) {
curr_block = curr_block->next;
delete [] ids;
delete [] offset_index;
delete [] offsets;
ids = offset_index = offsets = nullptr;
if(curr_block != end_block) {
ids = curr_block->ids.uncompress();
offset_index = curr_block->offset_index.uncompress();
offsets = curr_block->offsets.uncompress();
// first look to skip within current block
if(id <= this->last_block_id()) {
while(curr_index < curr_block->size() && this->id() < id) {
curr_index++;
}
skipped_block = true;
return ;
}
if(skipped_block) {
curr_index = 0;
// identify the block where the id could exist and skip to that
reset_cache();
const auto it = id_block_map->lower_bound(id);
if(it == id_block_map->end()) {
return;
}
while(curr_block != end_block && curr_index < curr_block->size() && this->id() < id) {
curr_block = it->second;
curr_index = 0;
ids = curr_block->ids.uncompress();
offset_index = curr_block->offset_index.uncompress();
offsets = curr_block->offsets.uncompress();
while(curr_index < curr_block->size() && this->id() < id) {
curr_index++;
}
if(curr_index == curr_block->size()) {
reset_cache();
}
}
posting_list_t::iterator_t::~iterator_t() {
if(auto_destroy) {
destroy();
reset_cache();
}
}
void posting_list_t::iterator_t::destroy() {
void posting_list_t::iterator_t::reset_cache() {
delete [] ids;
ids = nullptr;
delete [] offsets;
offsets = nullptr;
delete [] offset_index;
offset_index = nullptr;
ids = offset_index = offsets = nullptr;
curr_index = 0;
curr_block = end_block = nullptr;
}
posting_list_t::iterator_t::iterator_t(iterator_t&& rhs) noexcept {
id_block_map = rhs.id_block_map;
curr_block = rhs.curr_block;
curr_index = rhs.curr_index;
end_block = rhs.end_block;
@ -1494,6 +1495,7 @@ posting_list_t::iterator_t::iterator_t(iterator_t&& rhs) noexcept {
auto_destroy = rhs.auto_destroy;
field_id = rhs.field_id;
rhs.id_block_map = nullptr;
rhs.curr_block = nullptr;
rhs.end_block = nullptr;
rhs.ids = nullptr;
@ -1502,6 +1504,7 @@ posting_list_t::iterator_t::iterator_t(iterator_t&& rhs) noexcept {
}
posting_list_t::iterator_t& posting_list_t::iterator_t::operator=(posting_list_t::iterator_t&& rhs) noexcept {
id_block_map = rhs.id_block_map;
curr_block = rhs.curr_block;
curr_index = rhs.curr_index;
end_block = rhs.end_block;
@ -1511,6 +1514,7 @@ posting_list_t::iterator_t& posting_list_t::iterator_t::operator=(posting_list_t
auto_destroy = rhs.auto_destroy;
field_id = rhs.field_id;
rhs.id_block_map = nullptr;
rhs.curr_block = nullptr;
rhs.end_block = nullptr;
rhs.ids = nullptr;
@ -1525,7 +1529,8 @@ void posting_list_t::iterator_t::set_index(uint32_t index) {
}
posting_list_t::iterator_t posting_list_t::iterator_t::clone() const {
posting_list_t::iterator_t it(nullptr, nullptr);
posting_list_t::iterator_t it(nullptr, nullptr, nullptr);
it.id_block_map = id_block_map;
it.curr_block = curr_block;
it.curr_index = curr_index;
it.end_block = end_block;

View File

@ -58,7 +58,6 @@ TEST(OrIteratorTest, IntersectTwoListsWith3SubLists) {
std::vector<uint32_t> results;
or_iterator_t::intersect(or_its, istate, [&results](uint32_t id, std::vector<or_iterator_t>& its) {
LOG(INFO) << "id: " << id;
results.push_back(id);
});
@ -69,6 +68,14 @@ TEST(OrIteratorTest, IntersectTwoListsWith3SubLists) {
for(size_t i = 0; i < expected_results.size(); i++) {
ASSERT_EQ(expected_results[i], results[i]);
}
for(auto p: postings1) {
delete p;
}
for(auto p: postings2) {
delete p;
}
}
TEST(OrIteratorTest, IntersectTwoListsWith4SubLists) {
@ -138,4 +145,12 @@ TEST(OrIteratorTest, IntersectTwoListsWith4SubLists) {
for(size_t i = 0; i < expected_results.size(); i++) {
ASSERT_EQ(expected_results[i], results[i]);
}
for(auto p: postings1) {
delete p;
}
for(auto p: postings2) {
delete p;
}
}

View File

@ -600,69 +600,6 @@ TEST_F(PostingListTest, MergeBasics) {
}
}
TEST_F(PostingListTest, SplittingOfListsSimple) {
std::vector<uint32_t> offsets = {0, 1, 3};
// [0, 2] [3, 20]
// [1, 3], [5, 10], [20]
// [2, 3], [5, 7], [20]
posting_list_t p1(2);
p1.upsert(0, offsets);
p1.upsert(2, offsets);
p1.upsert(3, offsets);
p1.upsert(20, offsets);
posting_list_t p2(2);
p2.upsert(1, offsets);
p2.upsert(3, offsets);
p2.upsert(5, offsets);
p2.upsert(10, offsets);
p2.upsert(20, offsets);
posting_list_t p3(2);
p3.upsert(2, offsets);
p3.upsert(3, offsets);
p3.upsert(5, offsets);
p3.upsert(7, offsets);
p3.upsert(20, offsets);
std::vector<void*> raw_lists = {&p1, &p2, &p3};
std::vector<posting_list_t::iterator_t> its;
result_iter_state_t iter_state;
posting_t::block_intersector_t intersector(raw_lists, iter_state, pool);
std::vector<std::vector<posting_list_t::iterator_t>> partial_its_vec(4);
intersector.split_lists(4, partial_its_vec);
std::vector<std::vector<std::vector<uint32_t>>> split_ids = {
{{0, 2}, {1, 3}, {2, 3}},
{{3, 20}, {1, 3, 5, 10, 20}, {2, 3, 5, 7, 20}}
};
ASSERT_EQ(4, partial_its_vec.size());
ASSERT_EQ(3, partial_its_vec[0].size());
ASSERT_EQ(3, partial_its_vec[1].size());
for(size_t i = 0; i < partial_its_vec.size(); i++) {
auto& partial_its = partial_its_vec[i];
for (size_t j = 0; j < partial_its.size(); j++) {
auto& it = partial_its[j];
size_t k = 0;
while (it.valid()) {
//LOG(INFO) << it.id();
ASSERT_EQ(split_ids[i][j][k], it.id());
k++;
it.next();
}
//LOG(INFO) << "---";
}
}
}
TEST_F(PostingListTest, IntersectionBasics) {
std::vector<uint32_t> offsets = {0, 1, 3};