From 4d3f6dd0de24ca1e0cd910b97ba72632c48b77b6 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 25 Jun 2022 17:37:33 +0530 Subject: [PATCH] Fix iterator skip performance. --- include/or_iterator.h | 73 ++++++++++++++++++--------------- include/posting.h | 84 +++----------------------------------- include/posting_list.h | 7 +++- src/or_iterator.cpp | 6 +-- src/posting.cpp | 53 ------------------------ src/posting_list.cpp | 81 +++++++++++++++++++----------------- test/or_iterator_test.cpp | 17 +++++++- test/posting_list_test.cpp | 63 ---------------------------- 8 files changed, 113 insertions(+), 271 deletions(-) diff --git a/include/or_iterator.h b/include/or_iterator.h index 566d310a..bdabfee7 100644 --- a/include/or_iterator.h +++ b/include/or_iterator.h @@ -69,40 +69,43 @@ bool or_iterator_t::intersect(std::vector& its, result_iter_state auto id = its[0].id(); if(take_id(istate, id, is_excluded)) { func(id, its); - its[0].next(); - } else { - // skip iterator till next id available in filter - if(istate.filter_ids_length != 0 && !is_excluded) { - if(istate.filter_ids_index < istate.filter_ids_length) { - its[0].skip_to(istate.filter_ids[istate.filter_ids_index]); - } else { - break; - } + } + + if(istate.filter_ids_length != 0 && !is_excluded) { + if(istate.filter_ids_index < istate.filter_ids_length) { + // skip iterator till next id available in filter + its[0].skip_to(istate.filter_ids[istate.filter_ids_index]); } else { - its[0].next(); + break; } + } else { + its[0].next(); } } break; case 2: + if(istate.filter_ids_length != 0) { + its[0].skip_to(istate.filter_ids[istate.filter_ids_index]); + its[1].skip_to(istate.filter_ids[istate.filter_ids_index]); + } + while(its.size() == it_size && !at_end2(its)) { if(equals2(its)) { auto id = its[0].id(); if(take_id(istate, id, is_excluded)) { func(id, its); - advance_all2(its); - } else { - // skip iterator till next id available in filter - if(istate.filter_ids_length != 0 && !is_excluded) { - if(istate.filter_ids_index < istate.filter_ids_length) { - its[0].skip_to(istate.filter_ids[istate.filter_ids_index]); - its[1].skip_to(istate.filter_ids[istate.filter_ids_index]); - } else { - break; - } + } + + if(istate.filter_ids_length != 0 && !is_excluded) { + if(istate.filter_ids_index < istate.filter_ids_length) { + // skip iterator till next id available in filter + its[0].skip_to(istate.filter_ids[istate.filter_ids_index]); + its[1].skip_to(istate.filter_ids[istate.filter_ids_index]); } else { - advance_all2(its); + break; } + } else { + advance_all2(its); } } else { advance_non_largest2(its); @@ -110,26 +113,30 @@ bool or_iterator_t::intersect(std::vector& its, result_iter_state } break; default: + if(istate.filter_ids_length != 0) { + for(auto& it: its) { + it.skip_to(istate.filter_ids[istate.filter_ids_index]); + } + } + while(its.size() == it_size && !at_end(its)) { if(equals(its)) { - //LOG(INFO) << its[0].id(); auto id = its[0].id(); if(take_id(istate, id, is_excluded)) { func(id, its); - advance_all(its); - } else { - // skip iterator till next id available in filter - if(istate.filter_ids_length != 0 && !is_excluded) { - if(istate.filter_ids_index < istate.filter_ids_length) { - for(auto& it: its) { - it.skip_to(istate.filter_ids[istate.filter_ids_index]); - } - } else { - break; + } + + if(istate.filter_ids_length != 0 && !is_excluded) { + if(istate.filter_ids_index < istate.filter_ids_length) { + // skip iterator till next id available in filter + for(auto& it: its) { + it.skip_to(istate.filter_ids[istate.filter_ids_index]); } } else { - advance_all2(its); + break; } + } else { + advance_all2(its); } } else { advance_non_largest(its); diff --git a/include/posting.h b/include/posting.h index 0e7595f8..9bb68c9e 100644 --- a/include/posting.h +++ b/include/posting.h @@ -49,14 +49,12 @@ public: std::vector expanded_plists; result_iter_state_t& iter_state; ThreadPool* thread_pool; - size_t parallelize_min_ids; block_intersector_t(const std::vector& raw_posting_lists, result_iter_state_t& iter_state, ThreadPool* thread_pool, size_t parallelize_min_ids = 1): - iter_state(iter_state), thread_pool(thread_pool), - parallelize_min_ids(parallelize_min_ids) { + iter_state(iter_state), thread_pool(thread_pool) { to_expanded_plists(raw_posting_lists, plists, expanded_plists); @@ -75,8 +73,6 @@ public: template bool intersect(T func, size_t concurrency=4); - - void split_lists(size_t concurrency, std::vector>& partial_its_vec); }; static void to_expanded_plists(const std::vector& raw_posting_lists, std::vector& plists, @@ -120,85 +116,17 @@ public: template bool posting_t::block_intersector_t::intersect(T func, size_t concurrency) { - // Split posting lists into N chunks and intersect them in-parallel - // 1. Sort posting lists by number of blocks - // 2. Iterate on the posting list with least number of blocks on N-block windows - // 3. On each window, pick the last block's last ID and identify blocks from other lists containing that ID - // 4. Construct N groups of iterators this way (the last block must overlap on both sides of the window) - if(plists.empty()) { return true; } - if(plists[0]->num_ids() < parallelize_min_ids) { - std::vector its; - its.reserve(plists.size()); + std::vector its; + its.reserve(plists.size()); - for(const auto& posting_list: plists) { - its.push_back(posting_list->new_iterator()); - } - - posting_list_t::block_intersect(its, iter_state, func); - return true; + for(const auto& posting_list: plists) { + its.push_back(posting_list->new_iterator()); } - std::vector> partial_its_vec(concurrency); - split_lists(concurrency, partial_its_vec); - - /*for(size_t i = 0; i < partial_its_vec.size(); i++) { - auto& partial_its = partial_its_vec[i]; - - if (partial_its.empty()) { - continue; - } - - LOG(INFO) << "Vec " << i; - - for (auto& it: partial_its) { - while (it.valid()) { - LOG(INFO) << it.id(); - it.next(); - } - - LOG(INFO) << "---"; - } - }*/ - - size_t num_processed = 0; - std::mutex m_process; - std::condition_variable cv_process; - size_t num_non_empty = 0; - - for(size_t i = 0; i < partial_its_vec.size(); i++) { - auto& partial_its = partial_its_vec[i]; - - if(partial_its.empty()) { - continue; - } - - num_non_empty++; - - /*for(auto& it: partial_its) { - while(it.valid()) { - LOG(INFO) << it.id(); - it.next(); - } - - LOG(INFO) << "---"; - }*/ - - thread_pool->enqueue([this, i, &func, &partial_its, &num_processed, &m_process, &cv_process]() { - auto iter_state_copy = iter_state; - iter_state_copy.index = i; - posting_list_t::block_intersect(partial_its, iter_state_copy, func); - std::unique_lock lock(m_process); - num_processed++; - cv_process.notify_one(); - }); - } - - std::unique_lock lock_process(m_process); - cv_process.wait(lock_process, [&](){ return num_processed == num_non_empty; }); - + posting_list_t::block_intersect(its, iter_state, func); return true; } diff --git a/include/posting_list.h b/include/posting_list.h index 7500fb5c..d044333e 100644 --- a/include/posting_list.h +++ b/include/posting_list.h @@ -62,6 +62,7 @@ public: class iterator_t { private: + const std::map* id_block_map; block_t* curr_block; uint32_t curr_index; block_t* end_block; @@ -75,18 +76,20 @@ public: uint32_t* offset_index = nullptr; uint32_t* offsets = nullptr; - explicit iterator_t(block_t* start, block_t* end, bool auto_destroy = true, uint32_t field_id = 0); + explicit iterator_t(const std::map* id_block_map, + block_t* start, block_t* end, bool auto_destroy = true, uint32_t field_id = 0); ~iterator_t(); iterator_t(iterator_t&& rhs) noexcept; iterator_t& operator=(iterator_t&& rhs) noexcept; - void destroy(); + void reset_cache(); [[nodiscard]] bool valid() const; void next(); void skip_to(uint32_t id); void set_index(uint32_t index); [[nodiscard]] uint32_t id() const; + [[nodiscard]] uint32_t last_block_id() const; [[nodiscard]] inline uint32_t index() const; [[nodiscard]] inline block_t* block() const; [[nodiscard]] uint32_t get_field_id() const; diff --git a/src/or_iterator.cpp b/src/or_iterator.cpp index e22ab63f..dbfe9e31 100644 --- a/src/or_iterator.cpp +++ b/src/or_iterator.cpp @@ -128,7 +128,7 @@ void or_iterator_t::advance_smallest() { } if(!its[i].valid()) { - its[i].destroy(); + its[i].reset_cache(); its.erase(its.cbegin() + i); i--; } @@ -152,7 +152,7 @@ bool or_iterator_t::skip_to(uint32_t id) { it.skip_to(id); if(!it.valid()) { - its[i].destroy(); + its[i].reset_cache(); its.erase(its.begin() + i); i--; } else { @@ -238,7 +238,7 @@ const std::vector& or_iterator_t::get_its() const { or_iterator_t::~or_iterator_t() noexcept { for(auto& it: its) { - it.destroy(); + it.reset_cache(); } } diff --git a/src/posting.cpp b/src/posting.cpp index e6be0a9e..8b72f078 100644 --- a/src/posting.cpp +++ b/src/posting.cpp @@ -517,56 +517,3 @@ void posting_t::get_phrase_matches(const std::vector& raw_posting_lists, delete expanded_plist; } } - -void posting_t::block_intersector_t::split_lists(size_t concurrency, - std::vector>& partial_its_vec) { - const size_t num_blocks = this->plists[0]->num_blocks(); - const size_t window_size = (num_blocks + concurrency - 1) / concurrency; // rounds up - - size_t blocks_traversed = 0; - posting_list_t::block_t* start_block = this->plists[0]->get_root(); - posting_list_t::block_t* curr_block = start_block; - - size_t window_index = 0; - - while(curr_block != nullptr) { - blocks_traversed++; - if(blocks_traversed % window_size == 0 || blocks_traversed == num_blocks) { - // construct partial iterators and intersect within them - - std::vector& partial_its = partial_its_vec[window_index]; - - for(size_t i = 0; i < this->plists.size(); i++) { - posting_list_t::block_t* p_start_block = nullptr; - posting_list_t::block_t* p_end_block = nullptr; - - // [1, 2] [3, 4] [5, 6] - // [3, 5] [6] - - if(i == 0) { - p_start_block = start_block; - p_end_block = curr_block->next; - } else { - auto start_block_first_id = start_block->ids.at(0); - auto end_block_last_id = curr_block->ids.last(); - - p_start_block = this->plists[i]->block_of(start_block_first_id); - posting_list_t::block_t* last_block = this->plists[i]->block_of(end_block_last_id); - - if(p_start_block == last_block && p_start_block != nullptr) { - p_end_block = p_start_block->next; - } else { - p_end_block = last_block == nullptr ? nullptr : last_block->next; - } - } - - partial_its.emplace_back(p_start_block, p_end_block); - } - - start_block = curr_block->next; - window_index++; - } - - curr_block = curr_block->next; - } -} diff --git a/src/posting_list.cpp b/src/posting_list.cpp index f3966cb5..2de0fdde 100644 --- a/src/posting_list.cpp +++ b/src/posting_list.cpp @@ -794,13 +794,6 @@ bool posting_list_t::get_offsets(const std::vector& its, continue; } - /*uint32_t* offsets = curr_block->offsets.uncompress(); - - uint32_t start_offset = curr_block->offset_index.at(curr_index); - uint32_t end_offset = (curr_index == curr_block->size() - 1) ? - curr_block->offsets.getLength() : - curr_block->offset_index.at(curr_index + 1);*/ - uint32_t* offsets = its[j].offsets; uint32_t start_offset = its[j].offset_index[curr_index]; @@ -859,8 +852,6 @@ bool posting_list_t::get_offsets(const std::vector& its, // for plain string fields array_token_pos[0].push_back(token_positions_t{is_last_token, positions}); } - - //delete [] offsets; } return true; @@ -941,7 +932,7 @@ bool posting_list_t::equals2(std::vector& its) { posting_list_t::iterator_t posting_list_t::new_iterator(block_t* start_block, block_t* end_block, uint32_t field_id) { start_block = (start_block == nullptr) ? &root_block : start_block; - return posting_list_t::iterator_t(start_block, end_block, true, field_id); + return posting_list_t::iterator_t(&id_block_map, start_block, end_block, true, field_id); } void posting_list_t::advance_all(std::vector& its) { @@ -1391,9 +1382,11 @@ size_t posting_list_t::get_last_offset(const posting_list_t::iterator_t& it, boo /* iterator_t operations */ -posting_list_t::iterator_t::iterator_t(posting_list_t::block_t* start, posting_list_t::block_t* end, +posting_list_t::iterator_t::iterator_t(const std::map* id_block_map, + posting_list_t::block_t* start, posting_list_t::block_t* end, bool auto_destroy, uint32_t field_id): - curr_block(start), curr_index(0), end_block(end), auto_destroy(auto_destroy), field_id(field_id) { + id_block_map(id_block_map), curr_block(start), curr_index(0), end_block(end), + auto_destroy(auto_destroy), field_id(field_id) { if(curr_block != end_block) { ids = curr_block->ids.uncompress(); @@ -1426,6 +1419,10 @@ void posting_list_t::iterator_t::next() { } } +uint32_t posting_list_t::iterator_t::last_block_id() const { + return ids[curr_block->size() - 1]; +} + uint32_t posting_list_t::iterator_t::id() const { return ids[curr_index]; } @@ -1439,52 +1436,56 @@ posting_list_t::block_t* posting_list_t::iterator_t::block() const { } void posting_list_t::iterator_t::skip_to(uint32_t id) { - bool skipped_block = false; - while(curr_block != end_block && curr_block->ids.last() < id) { - curr_block = curr_block->next; - - delete [] ids; - delete [] offset_index; - delete [] offsets; - - ids = offset_index = offsets = nullptr; - - if(curr_block != end_block) { - ids = curr_block->ids.uncompress(); - offset_index = curr_block->offset_index.uncompress(); - offsets = curr_block->offsets.uncompress(); + // first look to skip within current block + if(id <= this->last_block_id()) { + while(curr_index < curr_block->size() && this->id() < id) { + curr_index++; } - skipped_block = true; + return ; } - if(skipped_block) { - curr_index = 0; + // identify the block where the id could exist and skip to that + reset_cache(); + + const auto it = id_block_map->lower_bound(id); + if(it == id_block_map->end()) { + return; } - while(curr_block != end_block && curr_index < curr_block->size() && this->id() < id) { + curr_block = it->second; + curr_index = 0; + ids = curr_block->ids.uncompress(); + offset_index = curr_block->offset_index.uncompress(); + offsets = curr_block->offsets.uncompress(); + + while(curr_index < curr_block->size() && this->id() < id) { curr_index++; } + + if(curr_index == curr_block->size()) { + reset_cache(); + } } posting_list_t::iterator_t::~iterator_t() { if(auto_destroy) { - destroy(); + reset_cache(); } } -void posting_list_t::iterator_t::destroy() { +void posting_list_t::iterator_t::reset_cache() { delete [] ids; - ids = nullptr; - delete [] offsets; - offsets = nullptr; - delete [] offset_index; - offset_index = nullptr; + + ids = offset_index = offsets = nullptr; + curr_index = 0; + curr_block = end_block = nullptr; } posting_list_t::iterator_t::iterator_t(iterator_t&& rhs) noexcept { + id_block_map = rhs.id_block_map; curr_block = rhs.curr_block; curr_index = rhs.curr_index; end_block = rhs.end_block; @@ -1494,6 +1495,7 @@ posting_list_t::iterator_t::iterator_t(iterator_t&& rhs) noexcept { auto_destroy = rhs.auto_destroy; field_id = rhs.field_id; + rhs.id_block_map = nullptr; rhs.curr_block = nullptr; rhs.end_block = nullptr; rhs.ids = nullptr; @@ -1502,6 +1504,7 @@ posting_list_t::iterator_t::iterator_t(iterator_t&& rhs) noexcept { } posting_list_t::iterator_t& posting_list_t::iterator_t::operator=(posting_list_t::iterator_t&& rhs) noexcept { + id_block_map = rhs.id_block_map; curr_block = rhs.curr_block; curr_index = rhs.curr_index; end_block = rhs.end_block; @@ -1511,6 +1514,7 @@ posting_list_t::iterator_t& posting_list_t::iterator_t::operator=(posting_list_t auto_destroy = rhs.auto_destroy; field_id = rhs.field_id; + rhs.id_block_map = nullptr; rhs.curr_block = nullptr; rhs.end_block = nullptr; rhs.ids = nullptr; @@ -1525,7 +1529,8 @@ void posting_list_t::iterator_t::set_index(uint32_t index) { } posting_list_t::iterator_t posting_list_t::iterator_t::clone() const { - posting_list_t::iterator_t it(nullptr, nullptr); + posting_list_t::iterator_t it(nullptr, nullptr, nullptr); + it.id_block_map = id_block_map; it.curr_block = curr_block; it.curr_index = curr_index; it.end_block = end_block; diff --git a/test/or_iterator_test.cpp b/test/or_iterator_test.cpp index 19c949b0..4472da61 100644 --- a/test/or_iterator_test.cpp +++ b/test/or_iterator_test.cpp @@ -58,7 +58,6 @@ TEST(OrIteratorTest, IntersectTwoListsWith3SubLists) { std::vector results; or_iterator_t::intersect(or_its, istate, [&results](uint32_t id, std::vector& its) { - LOG(INFO) << "id: " << id; results.push_back(id); }); @@ -69,6 +68,14 @@ TEST(OrIteratorTest, IntersectTwoListsWith3SubLists) { for(size_t i = 0; i < expected_results.size(); i++) { ASSERT_EQ(expected_results[i], results[i]); } + + for(auto p: postings1) { + delete p; + } + + for(auto p: postings2) { + delete p; + } } TEST(OrIteratorTest, IntersectTwoListsWith4SubLists) { @@ -138,4 +145,12 @@ TEST(OrIteratorTest, IntersectTwoListsWith4SubLists) { for(size_t i = 0; i < expected_results.size(); i++) { ASSERT_EQ(expected_results[i], results[i]); } + + for(auto p: postings1) { + delete p; + } + + for(auto p: postings2) { + delete p; + } } \ No newline at end of file diff --git a/test/posting_list_test.cpp b/test/posting_list_test.cpp index 7c41028b..55495152 100644 --- a/test/posting_list_test.cpp +++ b/test/posting_list_test.cpp @@ -600,69 +600,6 @@ TEST_F(PostingListTest, MergeBasics) { } } -TEST_F(PostingListTest, SplittingOfListsSimple) { - std::vector offsets = {0, 1, 3}; - - // [0, 2] [3, 20] - // [1, 3], [5, 10], [20] - // [2, 3], [5, 7], [20] - - posting_list_t p1(2); - p1.upsert(0, offsets); - p1.upsert(2, offsets); - p1.upsert(3, offsets); - p1.upsert(20, offsets); - - posting_list_t p2(2); - p2.upsert(1, offsets); - p2.upsert(3, offsets); - p2.upsert(5, offsets); - p2.upsert(10, offsets); - p2.upsert(20, offsets); - - posting_list_t p3(2); - p3.upsert(2, offsets); - p3.upsert(3, offsets); - p3.upsert(5, offsets); - p3.upsert(7, offsets); - p3.upsert(20, offsets); - - std::vector raw_lists = {&p1, &p2, &p3}; - - std::vector its; - result_iter_state_t iter_state; - posting_t::block_intersector_t intersector(raw_lists, iter_state, pool); - - std::vector> partial_its_vec(4); - intersector.split_lists(4, partial_its_vec); - - std::vector>> split_ids = { - {{0, 2}, {1, 3}, {2, 3}}, - {{3, 20}, {1, 3, 5, 10, 20}, {2, 3, 5, 7, 20}} - }; - - ASSERT_EQ(4, partial_its_vec.size()); - ASSERT_EQ(3, partial_its_vec[0].size()); - ASSERT_EQ(3, partial_its_vec[1].size()); - - for(size_t i = 0; i < partial_its_vec.size(); i++) { - auto& partial_its = partial_its_vec[i]; - for (size_t j = 0; j < partial_its.size(); j++) { - auto& it = partial_its[j]; - size_t k = 0; - - while (it.valid()) { - //LOG(INFO) << it.id(); - ASSERT_EQ(split_ids[i][j][k], it.id()); - k++; - it.next(); - } - - //LOG(INFO) << "---"; - } - } -} - TEST_F(PostingListTest, IntersectionBasics) { std::vector offsets = {0, 1, 3};