Refactor block intersector.

This commit is contained in:
Kishore Nallan 2021-06-16 12:25:38 +05:30
parent 7641b5fc15
commit fba1c498e8
3 changed files with 95 additions and 48 deletions

View File

@ -47,6 +47,37 @@ private:
public:
struct block_intersector_t {
size_t batch_size;
std::vector<posting_list_t::iterator_t> its;
std::vector<posting_list_t*> plists;
std::vector<uint32_t> expanded_plist_indices;
posting_list_t::result_iter_state_t& iter_state;
block_intersector_t(const std::vector<void*>& raw_posting_lists,
size_t batch_size,
posting_list_t::result_iter_state_t& iter_state):
batch_size(batch_size), iter_state(iter_state) {
to_expanded_plists(raw_posting_lists, plists, expanded_plist_indices);
its.reserve(plists.size());
for(const auto& posting_list: plists) {
its.push_back(posting_list->new_iterator());
}
}
~block_intersector_t() {
for(uint32_t expanded_plist_index: expanded_plist_indices) {
delete plists[expanded_plist_index];
}
}
bool intersect() {
return posting_list_t::block_intersect(plists, batch_size, its, iter_state);;
}
};
static void upsert(void*& obj, uint32_t id, const std::vector<uint32_t>& offsets);
static void erase(void*& obj, uint32_t id);
@ -64,11 +95,4 @@ public:
static void merge(const std::vector<void*>& posting_lists, std::vector<uint32_t>& result_ids);
static void intersect(const std::vector<void*>& posting_lists, std::vector<uint32_t>& result_ids);
static bool block_intersect(
const std::vector<void*>& posting_lists,
size_t batch_size,
std::vector<posting_list_t::iterator_t>& its,
posting_list_t::result_iter_state_t& iter_state
);
};

View File

@ -8,13 +8,13 @@ int64_t compact_posting_list_t::upsert(const uint32_t id, const std::vector<uint
int64_t compact_posting_list_t::upsert(const uint32_t id, const uint32_t* offsets, uint32_t num_offsets) {
// format: num_offsets, offset1,..,offsetn, id1 | num_offsets, offset1,..,offsetn, id2
uint32_t last_id = (length == 0) ? 0 : id_offsets[length - 1];
int64_t new_storage_needed = 0;
int64_t extra_length_needed = 0;
if(length == 0 || id > last_id) {
new_storage_needed = sizeof(uint32_t) * (num_offsets + 2);
if(length + new_storage_needed > capacity) {
extra_length_needed = (num_offsets + 2);
if(length + extra_length_needed > capacity) {
// enough storage should have been provided upstream
return (length + new_storage_needed) - capacity;
return (length + extra_length_needed) - capacity;
}
// can just append to the end
@ -34,21 +34,21 @@ int64_t compact_posting_list_t::upsert(const uint32_t id, const uint32_t* offset
size_t existing_id = id_offsets[i + num_existing_offsets + 1];
if(existing_id == id) {
new_storage_needed = sizeof(uint32_t) * (num_offsets - num_existing_offsets);
if(new_storage_needed > 0) {
if(length + new_storage_needed > capacity) {
extra_length_needed = (num_offsets - num_existing_offsets);
if(extra_length_needed > 0) {
if(length + extra_length_needed > capacity) {
// enough storage should have been provided upstream
return (length + new_storage_needed) - capacity;
return (length + extra_length_needed) - capacity;
}
// shift offsets to the right to make space
int64_t shift_index = int64_t(length)+new_storage_needed-1;
while(shift_index >= i && (shift_index - new_storage_needed) >= 0) {
id_offsets[shift_index] = id_offsets[shift_index - new_storage_needed];
int64_t shift_index = int64_t(length) + extra_length_needed - 1;
while(shift_index >= i && (shift_index - extra_length_needed) >= 0) {
id_offsets[shift_index] = id_offsets[shift_index - extra_length_needed];
shift_index--;
}
} else if(new_storage_needed < 0) {
} else if(extra_length_needed < 0) {
// shift offsets to the left to reduce space
// [num_offsets][0][2][4][id]
// [num_offsets][0][id]
@ -71,18 +71,18 @@ int64_t compact_posting_list_t::upsert(const uint32_t id, const uint32_t* offset
}
else if(existing_id > id) {
new_storage_needed = sizeof(uint32_t) * (num_offsets + 2);
if(length + new_storage_needed > capacity) {
extra_length_needed = (num_offsets + 2);
if(length + extra_length_needed > capacity) {
// enough storage should have been provided upstream
return (length + new_storage_needed) - capacity;
return (length + extra_length_needed) - capacity;
}
// shift index [i..length-1] by `new_storage_needed` positions
int64_t shift_index = length+new_storage_needed-1;
while((shift_index - new_storage_needed) >= 0 && shift_index >= i) {
// shift index [i..length-1] by `extra_length_needed` positions
int64_t shift_index = length + extra_length_needed - 1;
while((shift_index - extra_length_needed) >= 0 && shift_index >= i) {
// [*1 1 4] [1 1 7]
// [1 1 3]
id_offsets[shift_index] = id_offsets[shift_index - new_storage_needed];
id_offsets[shift_index] = id_offsets[shift_index - extra_length_needed];
shift_index--;
}
// now store the new offsets in the shifted space
@ -100,7 +100,7 @@ int64_t compact_posting_list_t::upsert(const uint32_t id, const uint32_t* offset
i += num_existing_offsets + 2;
}
length += new_storage_needed; // new_storage_needed can be negative here but that's okay
length += extra_length_needed; // extra_length_needed can be negative here but that's okay
}
return 0;
@ -247,7 +247,7 @@ void posting_t::upsert(void*& obj, uint32_t id, const std::vector<uint32_t>& off
if(extra_capacity_required != 0) {
// grow the container by 30%
size_t new_capacity = (list->capacity + extra_capacity_required) * 1.3;
size_t new_capacity_bytes = new_capacity * sizeof(uint32_t);
size_t new_capacity_bytes = sizeof(compact_posting_list_t) + (new_capacity * sizeof(uint32_t));
auto new_list = (compact_posting_list_t *) realloc(list, new_capacity_bytes);
if(new_list == nullptr) {
abort();
@ -283,7 +283,7 @@ void posting_t::erase(void*& obj, uint32_t id) {
if(list->length < list->capacity/2) {
// resize container
size_t new_capacity = list->capacity/2;
size_t new_capacity_bytes = new_capacity * sizeof(uint32_t);
size_t new_capacity_bytes = sizeof(compact_posting_list_t) + (new_capacity * sizeof(uint32_t));
auto new_list = (compact_posting_list_t *) realloc(list, new_capacity_bytes);
if(new_list == nullptr) {
abort();
@ -400,25 +400,6 @@ void posting_t::to_expanded_plists(const std::vector<void*>& raw_posting_lists,
}
}
/**
 * Intersects a batch of ids across the given raw posting lists.
 *
 * Every call first runs `to_expanded_plists` on `raw_posting_lists` and then
 * forwards the resulting lists, together with `batch_size`, `its` and
 * `iter_state`, to `posting_list_t::block_intersect`.
 *
 * NOTE(review): the lists recorded as expanded are deleted only on the call
 * that returns false; on calls that return true they are not freed here --
 * confirm who is expected to release them.
 *
 * @return the value returned by `posting_list_t::block_intersect`.
 */
bool posting_t::block_intersect(const std::vector<void*>& raw_posting_lists, size_t batch_size,
                                std::vector<posting_list_t::iterator_t>& its,
                                posting_list_t::result_iter_state_t& iter_state) {
    // compact posting lists (if any) have to be converted to their full form first
    std::vector<posting_list_t*> full_lists;
    std::vector<uint32_t> owned_indices;
    to_expanded_plists(raw_posting_lists, full_lists, owned_indices);

    const bool more_pending = posting_list_t::block_intersect(full_lists, batch_size, its, iter_state);

    if(more_pending) {
        return true;
    }

    // last batch: release the lists recorded as expanded by this call
    for(size_t pos = 0; pos < owned_indices.size(); pos++) {
        delete full_lists[owned_indices[pos]];
    }

    return false;
}
void posting_t::destroy_list(void*& obj) {
if(obj == nullptr) {
return;

View File

@ -1059,6 +1059,48 @@ TEST(PostingListTest, CompactPostingListContainsAtleastOne) {
posting_t::destroy_list(obj);
}
// Verifies that converting a compact posting list with four ids into a full
// posting list yields a list that reports the same number of ids.
TEST(PostingListTest, CompactToFullPostingListConversion) {
    uint32_t ids[] = {5, 6, 7, 8};
    uint32_t offset_index[] = {0, 3, 6, 9};
    uint32_t offsets[] = {0, 3, 4, 0, 3, 4, 0, 3, 4, 0, 3, 4};

    compact_posting_list_t* c1 = compact_posting_list_t::create(4, ids, offset_index, 12, offsets);
    posting_list_t* p1 = c1->to_full_posting_list();

    ASSERT_EQ(4, c1->num_ids());
    ASSERT_EQ(4, p1->num_ids());

    // Release both lists so the test does not leak: the compact list is
    // released with free() (as the other compact-list test in this file does)
    // and the expanded list with delete.
    free(c1);
    delete p1;
}
// Intersects a compact list holding ids {5, 6, 7, 8} with a full list holding
// ids {0, 5, 8, 20}, one id per batch, through posting_t::block_intersector_t.
TEST(PostingListTest, BlockIntersectionOnMixedLists) {
    // compact side of the intersection
    uint32_t compact_ids[] = {5, 6, 7, 8};
    uint32_t compact_offset_index[] = {0, 3, 6, 9};
    uint32_t compact_offsets[] = {0, 3, 4, 0, 3, 4, 0, 3, 4, 0, 3, 4};
    compact_posting_list_t* compact_list = compact_posting_list_t::create(
        4, compact_ids, compact_offset_index, 12, compact_offsets
    );

    // full side of the intersection: every id gets the same offsets
    posting_list_t full_list(2);
    std::vector<uint32_t> shared_offsets = {2, 4};
    const uint32_t full_ids[] = {0, 5, 8, 20};
    for(uint32_t full_id: full_ids) {
        full_list.upsert(full_id, shared_offsets);
    }

    std::vector<void*> mixed_lists = {SET_COMPACT_POSTING(compact_list), &full_list};

    posting_list_t::result_iter_state_t state;
    posting_t::block_intersector_t block_intersector(mixed_lists, 1, state);

    // first batch: a single id (5), and the intersector reports true
    ASSERT_TRUE(block_intersector.intersect());
    ASSERT_EQ(1, state.ids.size());
    ASSERT_EQ(5, state.ids[0]);

    // second batch: a single id (8), and the intersector reports false
    ASSERT_FALSE(block_intersector.intersect());
    ASSERT_EQ(1, state.ids.size());
    ASSERT_EQ(8, state.ids[0]);

    free(compact_list);
}
TEST(PostingListTest, DISABLED_Benchmark) {
std::vector<uint32_t> offsets = {0, 1, 3};
posting_list_t pl(4096);