Don't remove vector during update: use update api directly.

This commit is contained in:
Kishore Nallan 2023-09-14 19:16:33 +05:30
parent 2af676916a
commit 0db3dd0d00
3 changed files with 101 additions and 5 deletions

View File

@ -665,7 +665,8 @@ public:
const vector_query_t& vector_query, size_t facet_sample_percent, size_t facet_sample_threshold,
const std::string& collection_name) const;
void remove_field(uint32_t seq_id, const nlohmann::json& document, const std::string& field_name);
void remove_field(uint32_t seq_id, const nlohmann::json& document, const std::string& field_name,
const bool is_update);
Option<uint32_t> remove(const uint32_t seq_id, const nlohmann::json & document,
const std::vector<field>& del_fields, const bool is_update);

View File

@ -5852,7 +5852,8 @@ void Index::remove_facet_token(const field& search_field, spp::sparse_hash_map<s
}
}
void Index::remove_field(uint32_t seq_id, const nlohmann::json& document, const std::string& field_name) {
void Index::remove_field(uint32_t seq_id, const nlohmann::json& document, const std::string& field_name,
const bool is_update) {
const auto& search_field_it = search_schema.find(field_name);
if(search_field_it == search_schema.end()) {
return;
@ -5912,7 +5913,10 @@ void Index::remove_field(uint32_t seq_id, const nlohmann::json& document, const
}
}
} else if(search_field.num_dim) {
vector_index[search_field.name]->vecdex->markDelete(seq_id);
if(!is_update) {
// since vector index supports upsert natively, we should not attempt to delete for update
vector_index[search_field.name]->vecdex->markDelete(seq_id);
}
} else if(search_field.is_float()) {
const std::vector<float>& values = search_field.is_single_float() ?
std::vector<float>{document[field_name].get<float>()} :
@ -6018,7 +6022,7 @@ Option<uint32_t> Index::remove(const uint32_t seq_id, const nlohmann::json & doc
}
try {
remove_field(seq_id, document, the_field.name);
remove_field(seq_id, document, the_field.name, is_update);
} catch(const std::exception& e) {
LOG(WARNING) << "Error while removing field `" << the_field.name << "` from document, message: "
<< e.what();
@ -6028,7 +6032,7 @@ Option<uint32_t> Index::remove(const uint32_t seq_id, const nlohmann::json & doc
for(auto it = document.begin(); it != document.end(); ++it) {
const std::string& field_name = it.key();
try {
remove_field(seq_id, document, field_name);
remove_field(seq_id, document, field_name, is_update);
} catch(const std::exception& e) {
LOG(WARNING) << "Error while removing field `" << field_name << "` from document, message: "
<< e.what();

View File

@ -287,6 +287,97 @@ TEST_F(CollectionVectorTest, VectorUnchangedUpsert) {
ASSERT_EQ(1, results["found"].get<size_t>());
}
TEST_F(CollectionVectorTest, VectorManyUpserts) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "title", "type": "string"},
{"name": "points", "type": "int32"},
{"name": "vec", "type": "float[]", "num_dim": 3}
]
})"_json;
Collection* coll1 = collectionManager.create_collection(schema).get();
size_t d = 3;
size_t n = 50;
std::mt19937 rng;
rng.seed(47);
std::uniform_real_distribution<> distrib;
std::vector<std::string> import_records;
// first insert n docs
for (size_t i = 0; i < n; i++) {
nlohmann::json doc;
doc["id"] = std::to_string(i);
doc["title"] = std::to_string(i) + " title";
doc["points"] = i;
std::vector<float> values;
for (size_t j = 0; j < d; j++) {
values.push_back(distrib(rng));
}
doc["vec"] = values;
import_records.push_back(doc.dump());
}
nlohmann::json document;
nlohmann::json import_response = coll1->add_many(import_records, document);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(n, import_response["num_imported"].get<int>());
import_records.clear();
size_t num_new_records = 0;
// upsert mix of old + new docs50
for (size_t i = 0; i < n; i++) {
nlohmann::json doc;
auto id = i;
if(i % 2 != 0) {
id = (i + 1000);
num_new_records++;
}
doc["id"] = std::to_string(id);
doc["title"] = std::to_string(id) + " title";
doc["points"] = id;
std::vector<float> values;
for (size_t j = 0; j < d; j++) {
values.push_back(distrib(rng) + 0.01);
}
doc["vec"] = values;
import_records.push_back(doc.dump());
}
import_response = coll1->add_many(import_records, document, UPSERT);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(n, import_response["num_imported"].get<int>());
import_records.clear();
/*for(size_t i = 0; i < 100; i++) {
auto results = coll1->search("*", {}, "", {}, {}, {0}, 200, 1, FREQUENCY, {true}, Index::DROP_TOKENS_THRESHOLD,
spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 5,
"", 10, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true, false, true, "", false, 6000 * 1000, 4, 7, fallback,
4, {off}, 32767, 32767, 2,
false, true, "vec:([0.12, 0.44, 0.55])").get();
if(results["found"].get<size_t>() != n+num_new_records) {
LOG(INFO) << results["found"].get<size_t>();
}
}*/
//LOG(INFO) << "Expected: " << n + num_new_records;
//ASSERT_EQ(n + num_new_records, results["found"].get<size_t>());
//ASSERT_EQ(n + num_new_records, results["hits"].size());
}
TEST_F(CollectionVectorTest, VectorPartialUpdate) {
nlohmann::json schema = R"({
"name": "coll1",