diff --git a/include/topster.h b/include/topster.h index e564a0e2..ab6d752b 100644 --- a/include/topster.h +++ b/include/topster.h @@ -7,12 +7,12 @@ #include struct KV { - uint8_t field_id; - uint16_t query_index; - uint16_t array_index; - uint64_t key; - uint64_t distinct_key; - uint64_t match_score; + uint8_t field_id{}; + uint16_t query_index{}; + uint16_t array_index{}; + uint64_t key{}; + uint64_t distinct_key{}; + uint64_t match_score{}; int64_t scores[3]{}; // match score + 2 custom attributes KV(uint8_t fieldId, uint16_t queryIndex, uint64_t key, uint64_t distinct_key, @@ -24,7 +24,7 @@ struct KV { this->scores[2] = scores[2]; } - KV() {} + KV() = default; }; /* @@ -37,9 +37,9 @@ struct Topster { KV *data; KV** kvs; + // For distinct, stores the min heap kv of each group_kv_map topster value spp::sparse_hash_map kv_map; - KV* group_min_kv; spp::sparse_hash_map group_kv_map; size_t distinct; @@ -61,14 +61,11 @@ struct Topster { data[i].match_score = 0; kvs[i] = &data[i]; } - - group_min_kv = new KV(); } ~Topster() { delete[] data; delete[] kvs; - delete group_min_kv; for(auto& kv: group_kv_map) { delete kv.second; } @@ -84,22 +81,12 @@ struct Topster { (*b)->array_index = a_index; } - static inline void copyMe(KV* a, KV* b) { - size_t b_index = b->array_index; - *b = *a; - b->array_index = b_index; - } - bool add(KV* kv) { //LOG(INFO) << "kv_map size: " << kv_map.size() << " -- kvs[0]: " << kvs[0]->match_score; /*for(auto kv: kv_map) { LOG(INFO) << "kv key: " << kv.first << " => " << kv.second->match_score; }*/ - /*if(kv->key == 5) { - LOG(INFO) << "Key is 5"; - }*/ - bool less_than_min_heap = (size >= MAX_SIZE) && is_smaller_equal(kv, kvs[0]); size_t heap_op_index = 0; @@ -115,45 +102,61 @@ struct Topster { bool is_duplicate_key = (found_it != group_kv_map.end()); if(!is_duplicate_key && less_than_min_heap) { - // for distinct, if a non duplicate kv is < than min heap we also ignore + // for distinct, if a non duplicate kv is < than min heap we ignore 
return false; } if(is_duplicate_key) { - // if min heap (group_topster.kvs[0]) changes, we have to update kvs and sift down + // if min heap (group_topster.kvs[0]) changes, we have to update kvs and sift Topster* group_topster = found_it->second; - uint16_t old_min_heap_array_index = group_min_kv->array_index; + KV old_min_heap_kv = *kv_map[kv->distinct_key]; bool added = group_topster->add(kv); + if(!added) { return false; } - // if added, guaranteed to be larger than old_min_heap_ele - copyMe(kv, group_min_kv); - heap_op_index = old_min_heap_array_index; - } else { - // create fresh topster for this distinct group key since it does not exist + // if new kv score is greater than previous min heap score we sift down, otherwise sift up + SIFT_DOWN = is_greater_kv(kv, &old_min_heap_kv); + // new kv is different from old_min_heap_kv so we have to sift heap + heap_op_index = old_min_heap_kv.array_index; + + // erase current min heap key from kv_map + kv_map.erase(old_min_heap_kv.distinct_key); + + // kv will be copied into the pointer at heap_op_index + kv_map.emplace(kv->distinct_key, kvs[heap_op_index]); + } else { + // kv is guaranteed to be > current min heap: kvs[0] + // create fresh topster for this distinct group key since it does not exist Topster* group_topster = new Topster(distinct, 0); group_topster->add(kv); - copyMe(kv, group_min_kv); - - if(size < MAX_SIZE) { - // we just copy to end of array - heap_op_index = size; - size++; - } else { - // kv is guaranteed to be > current min heap (group_topster.kvs[0]) - // so we have to replace min heap element (kvs[0]) - heap_op_index = 0; - - // remove current min heap group key from map - delete group_kv_map[kvs[heap_op_index]->distinct_key]; - group_kv_map.erase(kvs[heap_op_index]->distinct_key); - } // add new group key to map group_kv_map.emplace(kv->distinct_key, group_topster); + + // find heap operation index for updating kvs + + if(size < MAX_SIZE) { + // there is enough space in heap we just copy to end +
SIFT_DOWN = false; + heap_op_index = size; + size++; + } else { + SIFT_DOWN = true; + + // max size is reached so we are forced to replace current min heap element (kvs[0]) + heap_op_index = 0; + + // remove current min heap group key from maps + delete group_kv_map[kvs[heap_op_index]->distinct_key]; + group_kv_map.erase(kvs[heap_op_index]->distinct_key); + kv_map.erase(kvs[heap_op_index]->distinct_key); + } + + // kv will be copied into the pointer at heap_op_index + kv_map.emplace(kv->distinct_key, kvs[heap_op_index]); } } else { // not distinct @@ -184,11 +187,10 @@ struct Topster { heap_op_index = existing_kv->array_index; kv_map.erase(kvs[heap_op_index]->key); - // kv will be swapped into heap_op_index + // kv will be copied into the pointer at heap_op_index kv_map.emplace(kv->key, kvs[heap_op_index]); - } else { // not duplicate - + if(size < MAX_SIZE) { // we just copy to end of array SIFT_DOWN = false; @@ -202,13 +204,14 @@ struct Topster { kv_map.erase(kvs[heap_op_index]->key); } - // kv will be swapped into heap_op_index pointer + // kv will be copied into the pointer at heap_op_index kv_map.emplace(kv->key, kvs[heap_op_index]); } } // we have to replace the existing element in the heap and sift down - copyMe(kv, kvs[heap_op_index]); + kv->array_index = heap_op_index; + *kvs[heap_op_index] = *kv; // sift up/down to maintain heap property @@ -262,6 +265,9 @@ struct Topster { // topster must be sorted before iterated upon to remove dead array entries void sort() { std::stable_sort(kvs, kvs+size, is_greater_kv); + for(auto &group_topster: group_kv_map) { + group_topster.second->sort(); + } } void clear(){ @@ -272,6 +278,10 @@ struct Topster { return kvs[index]->key; } + uint64_t getDistinctKeyAt(uint32_t index) { + return kvs[index]->distinct_key; + } + KV* getKV(uint32_t index) { return kvs[index]; } diff --git a/test/topster_test.cpp b/test/topster_test.cpp index 3c4b4521..104e941e 100644 --- a/test/topster_test.cpp +++ b/test/topster_test.cpp @@ -99,4 
+99,64 @@ TEST(TopsterTest, MaxFloatValues) { for(uint32_t i = 0; i < topster.size; i++) { EXPECT_EQ(ids[i], topster.getKeyAt(i)); } +} + +TEST(TopsterTest, DistinctIntValues) { + Topster dist_topster(5, 2); + + struct { + uint8_t field_id; + uint16_t query_index; + uint64_t distinct_key; + uint64_t match_score; + int64_t primary_attr; + int64_t secondary_attr; + } data[14] = { + {1, 0, 1, 11, 20, 30}, + {1, 0, 1, 12, 20, 32}, + {1, 0, 2, 4, 20, 30}, + {1, 2, 3, 7, 20, 30}, + {1, 0, 4, 14, 20, 30}, + {1, 1, 5, 9, 20, 30}, + {1, 1, 5, 10, 20, 32}, + {1, 1, 5, 9, 20, 30}, + {1, 0, 6, 6, 20, 30}, + {1, 2, 7, 6, 22, 30}, + {1, 2, 7, 6, 22, 30}, + {1, 1, 8, 9, 20, 30}, + {1, 0, 9, 8, 20, 30}, + {1, 3, 10, 5, 20, 30}, + }; + + for(int i = 0; i < 14; i++) { + int64_t scores[3]; + scores[0] = int64_t(data[i].match_score); + scores[1] = data[i].primary_attr; + scores[2] = data[i].secondary_attr; + + KV kv(data[i].field_id, data[i].query_index, i+100, data[i].distinct_key, data[i].match_score, scores); + dist_topster.add(&kv); + } + + dist_topster.sort(); + + std::vector distinct_ids = {4, 1, 5, 8, 9}; + + for(uint32_t i = 0; i < dist_topster.size; i++) { + EXPECT_EQ(distinct_ids[i], dist_topster.getDistinctKeyAt(i)); + + if(distinct_ids[i] == 1) { + EXPECT_EQ(12, (int) dist_topster.getKV(i)->match_score); + EXPECT_EQ(2, dist_topster.group_kv_map[dist_topster.getDistinctKeyAt(i)]->size); + EXPECT_EQ(12, dist_topster.group_kv_map[dist_topster.getDistinctKeyAt(i)]->getKV(0)->match_score); + EXPECT_EQ(11, dist_topster.group_kv_map[dist_topster.getDistinctKeyAt(i)]->getKV(1)->match_score); + } + + if(distinct_ids[i] == 5) { + EXPECT_EQ(10, (int) dist_topster.getKV(i)->match_score); + EXPECT_EQ(2, dist_topster.group_kv_map[dist_topster.getDistinctKeyAt(i)]->size); + EXPECT_EQ(10, dist_topster.group_kv_map[dist_topster.getDistinctKeyAt(i)]->getKV(0)->match_score); + EXPECT_EQ(9, dist_topster.group_kv_map[dist_topster.getDistinctKeyAt(i)]->getKV(1)->match_score); + } + } } \ No 
newline at end of file