Handle null-value updates + errors during indexing.

This commit is contained in:
Kishore Nallan 2021-11-17 13:23:42 +05:30
parent 076d5f6711
commit 58f3d4bf66
3 changed files with 152 additions and 53 deletions

View File

@ -545,7 +545,7 @@ private:
static void compute_facet_stats(facet &a_facet, uint64_t raw_value, const std::string & field_type);
static void get_doc_changes(const index_operation_t op, const nlohmann::json &update_doc,
static void get_doc_changes(const index_operation_t op, nlohmann::json &update_doc,
const nlohmann::json &old_doc, nlohmann::json &new_doc, nlohmann::json &del_doc);
static Option<uint32_t> coerce_string(const DIRTY_VALUES& dirty_values, const std::string& fallback_field_type,
@ -753,11 +753,13 @@ public:
const uint32_t* all_result_ids, const size_t& all_result_ids_len,
const std::vector<std::string>& group_by_fields,
std::vector<facet_info_t>& facet_infos) const;
size_t num_seq_ids() const;
};
template<class T>
void Index::iterate_and_index_numerical_field(std::vector<index_record>& iter_batch, const field& afield, T func) {
for(const auto& record: iter_batch) {
for(auto& record: iter_batch) {
if(!record.indexed.ok()) {
continue;
}
@ -769,6 +771,11 @@ void Index::iterate_and_index_numerical_field(std::vector<index_record>& iter_ba
continue;
}
func(record, seq_id);
try {
func(record, seq_id);
} catch(const std::exception &e) {
LOG(INFO) << "Error while indexing numerical field." << e.what();
record.index_failure(400, e.what());
}
}
}

View File

@ -427,56 +427,61 @@ void Index::validate_and_preprocess(Index *index, std::vector<index_record>& ite
for(size_t i = 0; i < batch_size; i++) {
index_record& index_rec = iter_batch[batch_start_index + i];
if(!index_rec.indexed.ok()) {
// some records could have been invalidated upstream
continue;
}
try {
if(!index_rec.indexed.ok()) {
// some records could have been invalidated upstream
continue;
}
if(index_rec.operation == DELETE) {
continue;
}
if(index_rec.operation == DELETE) {
continue;
}
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.doc, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema,
index_rec.operation,
fallback_field_type,
index_rec.dirty_values);
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.doc, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema,
index_rec.operation,
fallback_field_type,
index_rec.dirty_values);
if(!validation_op.ok()) {
index_rec.index_failure(validation_op.code(), validation_op.error());
continue;
}
if(!validation_op.ok()) {
index_rec.index_failure(validation_op.code(), validation_op.error());
continue;
}
if(index_rec.is_update) {
// scrub string fields to reduce delete ops
get_doc_changes(index_rec.operation, index_rec.doc, index_rec.old_doc, index_rec.new_doc,
index_rec.del_doc);
scrub_reindex_doc(search_schema, index_rec.doc, index_rec.del_doc, index_rec.old_doc);
}
if(index_rec.is_update) {
// scrub string fields to reduce delete ops
get_doc_changes(index_rec.operation, index_rec.doc, index_rec.old_doc, index_rec.new_doc,
index_rec.del_doc);
scrub_reindex_doc(search_schema, index_rec.doc, index_rec.del_doc, index_rec.old_doc);
}
compute_token_offsets_facets(index_rec, search_schema, facet_schema, token_separators, symbols_to_index);
compute_token_offsets_facets(index_rec, search_schema, facet_schema, token_separators, symbols_to_index);
int64_t points = 0;
int64_t points = 0;
if(index_rec.doc.count(default_sorting_field) == 0) {
auto default_sorting_field_it = index->sort_index.find(default_sorting_field);
if(default_sorting_field_it != index->sort_index.end()) {
auto seq_id_it = default_sorting_field_it->second->find(index_rec.seq_id);
if(seq_id_it != default_sorting_field_it->second->end()) {
points = seq_id_it->second;
if(index_rec.doc.count(default_sorting_field) == 0) {
auto default_sorting_field_it = index->sort_index.find(default_sorting_field);
if(default_sorting_field_it != index->sort_index.end()) {
auto seq_id_it = default_sorting_field_it->second->find(index_rec.seq_id);
if(seq_id_it != default_sorting_field_it->second->end()) {
points = seq_id_it->second;
} else {
points = INT64_MIN;
}
} else {
points = INT64_MIN;
}
} else {
points = INT64_MIN;
points = get_points_from_doc(index_rec.doc, default_sorting_field);
}
} else {
points = get_points_from_doc(index_rec.doc, default_sorting_field);
}
index_rec.points = points;
index_rec.index_success();
index_rec.points = points;
index_rec.index_success();
} catch(const std::exception &e) {
LOG(INFO) << "Error while validating document: " << e.what();
index_rec.index_failure(400, e.what());
}
}
}
@ -534,7 +539,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record>& iter_b
auto& index_rec = iter_batch[i];
if(index_rec.is_update) {
index->remove(index_rec.seq_id, index_rec.del_doc, index_rec.is_update);
} else {
} else if(index_rec.indexed.ok()) {
num_indexed++;
}
@ -576,7 +581,7 @@ void Index::index_field_in_memory(const field& afield, std::vector<index_record>
if(afield.name == "id") {
for(const auto& record: iter_batch) {
if(!record.is_update) {
if(!record.is_update && record.indexed.ok()) {
// for updates, the seq_id will already exist
seq_ids.append(record.seq_id);
}
@ -600,7 +605,7 @@ void Index::index_field_in_memory(const field& afield, std::vector<index_record>
const auto& document = record.doc;
const auto seq_id = record.seq_id;
if(document.count(afield.name) == 0 || !afield.index) {
if(document.count(afield.name) == 0 || !afield.index || !record.indexed.ok()) {
continue;
}
@ -3839,7 +3844,7 @@ Option<uint32_t> Index::coerce_float(const DIRTY_VALUES& dirty_values, const fie
return Option<uint32_t>(200);
}
void Index::get_doc_changes(const index_operation_t op, const nlohmann::json& update_doc,
void Index::get_doc_changes(const index_operation_t op, nlohmann::json& update_doc,
const nlohmann::json& old_doc, nlohmann::json& new_doc, nlohmann::json& del_doc) {
// construct new doc with old doc values first
@ -3852,15 +3857,8 @@ void Index::get_doc_changes(const index_operation_t op, const nlohmann::json& up
}
// now override new doc with updated doc values and also create del doc
for(auto it = update_doc.begin(); it != update_doc.end(); ++it) {
// adds new key or overrides existing key from `old_doc`
if(!it.value().is_null()) {
// null values should not be indexed
new_doc[it.key()] = it.value();
} else {
new_doc.erase(it.key());
}
auto it = update_doc.begin();
for(; it != update_doc.end(); ) {
// if the update doc contains a field that exists in old, we record that (for delete + reindex)
bool field_exists_in_old_doc = (old_doc.count(it.key()) != 0);
if(field_exists_in_old_doc) {
@ -3868,6 +3866,16 @@ void Index::get_doc_changes(const index_operation_t op, const nlohmann::json& up
// we need to check for this because a field can be optional
del_doc[it.key()] = old_doc[it.key()];
}
// adds new key or overrides existing key from `old_doc`
if(it.value().is_null()) {
// null values should not be indexed
new_doc.erase(it.key());
it = update_doc.erase(it);
} else {
new_doc[it.key()] = it.value();
++it;
}
}
}
@ -3910,6 +3918,11 @@ void Index::scrub_reindex_doc(const std::unordered_map<std::string, field>& sear
}
}
// Returns the number of sequence ids currently tracked by this index.
// Read-only accessor: takes a shared (reader) lock on `mutex` and
// delegates to the `seq_ids` container's length.
size_t Index::num_seq_ids() const {
std::shared_lock lock(mutex);
return seq_ids.getLength();
}
/*
// https://stackoverflow.com/questions/924171/geo-fencing-point-inside-outside-polygon
// NOTE: polygon and point should have been transformed with `transform_for_180th_meridian`

View File

@ -1266,6 +1266,10 @@ TEST_F(CollectionAllFieldsTest, NullValueUpdate) {
add_op = coll1->add(doc.dump(), UPDATE);
ASSERT_TRUE(add_op.ok());
// try updating the doc with null value again
add_op = coll1->add(doc.dump(), UPDATE);
ASSERT_TRUE(add_op.ok());
// ensure that the fields are removed from the document
auto results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
@ -1276,3 +1280,78 @@ TEST_F(CollectionAllFieldsTest, NullValueUpdate) {
collectionManager.drop_collection("coll1");
}
// Verifies handling of null values in array fields during CREATE and UPDATE:
// - a null element inside an array is rejected with a type error,
// - a whole-field null on UPDATE removes that field from the stored document,
// - failed updates leave the document count unchanged.
TEST_F(CollectionAllFieldsTest, NullValueArrayUpdate) {
Collection *coll1;
// Schema with optional array fields, an unindexed string field, and a
// wildcard string-array fallback.
// NOTE(review): field ctor args appear to be (name, type, facet, optional[, index]) — confirm against field definition.
std::vector<field> fields = {field("titles", field_types::STRING_ARRAY, false, true),
field(".*_names", field_types::STRING_ARRAY, true, true),
field("unindexed", field_types::STRING, false, true, false),
field(".*", field_types::STRING_ARRAY, false, true)};
coll1 = collectionManager.get_collection("coll1").get();
if (coll1 == nullptr) {
auto op = collectionManager.create_collection("coll1", 1, fields, "", 0, field_types::STRING_ARRAY);
ASSERT_TRUE(op.ok());
coll1 = op.get();
}
nlohmann::json doc;
doc["id"] = "0";
doc["titles"] = {"Running Shoes"};
doc["company_names"] = {"Nike"};
// array containing a null element must be rejected on CREATE
doc["countries"] = {"USA", nullptr};
doc["unindexed"] = "Hello";
auto add_op = coll1->add(doc.dump(), CREATE);
ASSERT_FALSE(add_op.ok());
ASSERT_EQ("Field `countries` must be an array of string.", add_op.error());
// an array whose only element is null is rejected the same way
doc["countries"] = {nullptr};
add_op = coll1->add(doc.dump(), CREATE);
ASSERT_FALSE(add_op.ok());
ASSERT_EQ("Field `countries` must be an array of string.", add_op.error());
// a valid array now indexes successfully
doc["countries"] = {"USA"};
add_op = coll1->add(doc.dump(), CREATE);
ASSERT_TRUE(add_op.ok());
ASSERT_EQ(1, coll1->get_num_documents());
ASSERT_EQ(1, coll1->_get_index()->num_seq_ids());
// setting whole fields to null on UPDATE should drop them from the document
doc["titles"] = nullptr;
doc["company_names"] = nullptr;
doc["countries"] = nullptr;
add_op = coll1->add(doc.dump(), UPDATE);
ASSERT_TRUE(add_op.ok());
// try updating the doc with null value again
add_op = coll1->add(doc.dump(), UPDATE);
ASSERT_TRUE(add_op.ok());
ASSERT_EQ(1, coll1->get_num_documents());
// ensure that the fields are removed from the document
auto results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(1, results["hits"].size());
// only `id` and the non-null `unindexed` field should remain
ASSERT_EQ(2, results["hits"][0]["document"].size());
ASSERT_EQ("Hello", results["hits"][0]["document"]["unindexed"].get<std::string>());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
// update with null values inside array
doc["countries"] = {nullptr};
add_op = coll1->add(doc.dump(), UPDATE);
ASSERT_FALSE(add_op.ok());
ASSERT_EQ("Field `countries` must be an array of string.", add_op.error());
doc["countries"] = {"USA", nullptr};
add_op = coll1->add(doc.dump(), UPDATE);
ASSERT_FALSE(add_op.ok());
ASSERT_EQ("Field `countries` must be an array of string.", add_op.error());
// failed updates must not alter the document count
ASSERT_EQ(1, coll1->get_num_documents());
collectionManager.drop_collection("coll1");
}