Fix upsert behavior: an upsert should accept only a whole document.

Kishore Nallan 2021-06-25 21:02:33 +05:30
parent 315d4d0aed
commit 5cbf810fe5
6 changed files with 119 additions and 64 deletions
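The intent, before the file-by-file diffs: an upsert must now carry the whole document, which replaces the stored document outright, while an update may stay partial and is merged into what is already stored. A minimal standalone sketch of the two behaviours (hypothetical apply_write helper using nlohmann::json; not the Typesense API):

#include <nlohmann/json.hpp>
#include <iostream>

enum index_operation_t { CREATE, UPSERT, UPDATE };  // operations referenced in this diff; exact definition assumed

// Sketch only: UPDATE merges the partial payload into the stored document,
// while UPSERT (like CREATE) treats the incoming payload as the whole document.
nlohmann::json apply_write(const nlohmann::json& stored, const nlohmann::json& incoming,
                           index_operation_t op) {
    if(op == UPDATE) {
        nlohmann::json merged = stored;
        for(auto it = incoming.begin(); it != incoming.end(); ++it) {
            merged[it.key()] = it.value();           // add or override individual fields
        }
        return merged;
    }
    return incoming;                                 // whole-document semantics
}

int main() {
    nlohmann::json stored  = {{"id", "100"}, {"title", "The quick brown fox"}, {"points", 100}};
    nlohmann::json partial = {{"id", "100"}, {"points", 105}};

    std::cout << apply_write(stored, partial, UPDATE).dump() << "\n";
    // {"id":"100","points":105,"title":"The quick brown fox"}
    std::cout << apply_write(stored, partial, UPSERT).dump() << "\n";
    // {"id":"100","points":105}  ("title" is dropped; required-field validation
    // now runs for upserts just as it does for fresh documents)
}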

@@ -473,7 +473,7 @@ public:
Option<bool> get_document_from_store(const std::string & seq_id_key, nlohmann::json & document) const;
Option<uint32_t> index_in_memory(nlohmann::json & document, uint32_t seq_id,
bool is_update, const DIRTY_VALUES& dirty_values);
const index_operation_t op, const DIRTY_VALUES& dirty_values);
static void prune_document(nlohmann::json &document, const spp::sparse_hash_set<std::string> & include_fields,
const spp::sparse_hash_set<std::string> & exclude_fields);

@@ -255,8 +255,8 @@ private:
static void compute_facet_stats(facet &a_facet, uint64_t raw_value, const std::string & field_type);
static void get_doc_changes(const nlohmann::json &document, nlohmann::json &old_doc,
nlohmann::json &new_doc, nlohmann::json &del_doc);
static void get_doc_changes(const index_operation_t op, const nlohmann::json &update_doc,
const nlohmann::json &old_doc, nlohmann::json &new_doc, nlohmann::json &del_doc);
static Option<uint32_t> coerce_string(const DIRTY_VALUES& dirty_values, const std::string& fallback_field_type,
const field& a_field, nlohmann::json &document,
@@ -330,7 +330,7 @@ public:
const std::vector<art_leaf *> &query_suggestion,
uint32_t *exact_strt_ids, size_t& exact_strt_size) const;
void scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, nlohmann::json& old_doc);
void scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, const nlohmann::json& old_doc);
static void tokenize_string_field(const nlohmann::json& document,
const field& search_field, std::vector<std::string>& tokens,
@@ -365,7 +365,7 @@ public:
Option<uint32_t> index_in_memory(const nlohmann::json & document, uint32_t seq_id,
const std::string & default_sorting_field,
const bool is_update);
const index_operation_t op);
static size_t batch_memory_index(Index *index,
std::vector<index_record> & iter_batch,
@@ -400,7 +400,7 @@ public:
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
const std::map<std::string, field> & facet_schema,
bool is_update,
const index_operation_t op,
const std::string& fallback_field_type,
const DIRTY_VALUES& dirty_values);

@@ -58,6 +58,8 @@ private:
mutable std::shared_mutex mutex;
rocksdb::Status init_db() {
LOG(INFO) << "Initializing DB by opening state dir: " << state_dir_path;
rocksdb::Status s = rocksdb::DB::Open(options, state_dir_path, &db);
if(!s.ok()) {
LOG(ERROR) << "Error while initializing store: " << s.ToString();

@@ -298,8 +298,9 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
if(!write_ok) {
// we will attempt to reindex the old doc on a best-effort basis
LOG(ERROR) << "Update to disk failed. Will restore old document";
remove_document(index_record.new_doc, index_record.seq_id, false);
index_in_memory(index_record.old_doc, index_record.seq_id, false, index_record.dirty_values);
index_in_memory(index_record.old_doc, index_record.seq_id, index_record.operation, index_record.dirty_values);
index_record.index_failure(500, "Could not write to on-disk storage.");
} else {
num_indexed++;
@@ -318,6 +319,7 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
if(!write_ok) {
// remove from in-memory store to keep the state synced
LOG(ERROR) << "Write to disk failed. Will restore old document";
remove_document(index_record.doc, index_record.seq_id, false);
index_record.index_failure(500, "Could not write to on-disk storage.");
} else {
@@ -345,11 +347,11 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
}
Option<uint32_t> Collection::index_in_memory(nlohmann::json &document, uint32_t seq_id,
bool is_update, const DIRTY_VALUES& dirty_values) {
const index_operation_t op, const DIRTY_VALUES& dirty_values) {
std::unique_lock lock(mutex);
Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
search_schema, facet_schema, is_update,
search_schema, facet_schema, op,
fallback_field_type, dirty_values);
if(!validation_op.ok()) {
@@ -357,7 +359,7 @@ Option<uint32_t> Collection::index_in_memory(nlohmann::json &document, uint32_t
}
Index* index = indices[seq_id % num_memory_shards];
index->index_in_memory(document, seq_id, default_sorting_field, is_update);
index->index_in_memory(document, seq_id, default_sorting_field, op);
num_documents += 1;
return Option<>(200);

@@ -115,7 +115,7 @@ int64_t Index::float_to_in64_t(float f) {
Option<uint32_t> Index::index_in_memory(const nlohmann::json &document, uint32_t seq_id,
const std::string & default_sorting_field,
const bool is_update) {
const index_operation_t op) {
std::unique_lock lock(mutex);
@@ -131,7 +131,7 @@ Option<uint32_t> Index::index_in_memory(const nlohmann::json &document, uint32_t
points = get_points_from_doc(document, default_sorting_field);
}
if(!is_update) {
if(op != UPDATE && op != UPSERT) {
// for updates, the seq_id will already exist
seq_ids.append(seq_id);
}
@@ -283,13 +283,13 @@ Option<uint32_t> Index::validate_index_in_memory(nlohmann::json& document, uint3
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
const std::map<std::string, field> & facet_schema,
bool is_update,
const index_operation_t op,
const std::string& fallback_field_type,
const DIRTY_VALUES& dirty_values) {
bool missing_default_sort_field = (!default_sorting_field.empty() && document.count(default_sorting_field) == 0);
if(!is_update && missing_default_sort_field) {
if(op != UPDATE && missing_default_sort_field) {
return Option<>(400, "Field `" + default_sorting_field + "` has been declared as a default sorting field, "
"but is not found in the document.");
}
@@ -298,7 +298,7 @@ Option<uint32_t> Index::validate_index_in_memory(nlohmann::json& document, uint3
const std::string& field_name = field_pair.first;
const field& a_field = field_pair.second;
if((a_field.optional || is_update) && document.count(field_name) == 0) {
if((a_field.optional || op == UPDATE) && document.count(field_name) == 0) {
continue;
}
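Put differently, after this change only UPDATE may omit schema fields; CREATE and UPSERT are validated against the full schema. A standalone sketch of just that rule (hypothetical helper; the real validator also handles optional fields, the default sorting field and type coercion):

#include <nlohmann/json.hpp>
#include <string>
#include <vector>

enum index_operation_t { CREATE, UPSERT, UPDATE };

// Sketch only: every required schema field must be present unless the
// operation is a partial UPDATE.
bool has_required_fields(const nlohmann::json& document,
                         const std::vector<std::string>& required_fields,
                         index_operation_t op, std::string& error) {
    for(const std::string& field_name : required_fields) {
        if(op != UPDATE && document.count(field_name) == 0) {
            error = "Field `" + field_name + "` has been declared in the schema, "
                    "but is not found in the document.";
            return false;
        }
    }
    return true;
}

int main() {
    std::string error;
    nlohmann::json doc = {{"points", 60}};  // partial document sent as an upsert
    return has_required_fields(doc, {"title", "points"}, UPSERT, error) ? 0 : 1;  // fails: no "title"
}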
@@ -408,37 +408,6 @@ Option<uint32_t> Index::validate_index_in_memory(nlohmann::json& document, uint3
return Option<>(200);
}
void Index::scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, nlohmann::json& old_doc) {
std::vector<std::string> del_keys;
for(auto it = del_doc.cbegin(); it != del_doc.cend(); it++) {
const std::string& field_name = it.key();
std::shared_lock lock(mutex);
const auto& search_field_it = search_schema.find(field_name);
if(search_field_it == search_schema.end()) {
continue;
}
const auto search_field = search_field_it->second; // copy, don't use reference!
lock.unlock();
// compare values between old and update docs:
// if they match, we will remove them from both del and update docs
if(update_doc[search_field.name] == old_doc[search_field.name]) {
del_keys.push_back(field_name);
}
}
for(const auto& del_key: del_keys) {
del_doc.erase(del_key);
update_doc.erase(del_key);
}
}
size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_batch,
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
@@ -457,7 +426,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.doc, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema,
index_rec.is_update,
index_rec.operation,
fallback_field_type,
index_rec.dirty_values);
@@ -468,7 +437,8 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
if(index_rec.is_update) {
// scrub string fields to reduce delete ops
get_doc_changes(index_rec.doc, index_rec.old_doc, index_rec.new_doc, index_rec.del_doc);
get_doc_changes(index_rec.operation, index_rec.doc, index_rec.old_doc, index_rec.new_doc,
index_rec.del_doc);
index->scrub_reindex_doc(index_rec.doc, index_rec.del_doc, index_rec.old_doc);
index->remove(index_rec.seq_id, index_rec.del_doc, index_rec.is_update);
}
@@ -476,7 +446,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
Option<uint32_t> index_mem_op(0);
try {
index_mem_op = index->index_in_memory(index_rec.doc, index_rec.seq_id, default_sorting_field, index_rec.is_update);
index_mem_op = index->index_in_memory(index_rec.doc, index_rec.seq_id, default_sorting_field, index_rec.operation);
} catch(const std::exception& e) {
const std::string& error_msg = std::string("Fatal error during indexing: ") + e.what();
LOG(ERROR) << error_msg << ", document: " << index_rec.doc;
@@ -484,7 +454,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
}
if(!index_mem_op.ok()) {
index->index_in_memory(index_rec.del_doc, index_rec.seq_id, default_sorting_field, true);
index->index_in_memory(index_rec.del_doc, index_rec.seq_id, default_sorting_field, index_rec.operation);
index_rec.index_failure(index_mem_op.code(), index_mem_op.error());
continue;
}
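Taken together, the update and upsert path in batch_memory_index() now runs as: validate against the schema, diff the incoming document against the stored one, scrub fields that did not actually change, un-index the old values, then index the merged document. A toy illustration of that ordering (FakeIndex is purely illustrative and stands in for the real Index class):

#include <nlohmann/json.hpp>
#include <iostream>
#include <map>
#include <string>

// Toy stand-in for the in-memory index: one stored value per field name.
struct FakeIndex {
    std::map<std::string, nlohmann::json> fields;

    void remove(const nlohmann::json& del_doc) {
        for(auto it = del_doc.begin(); it != del_doc.end(); ++it) {
            fields.erase(it.key());                  // un-index old values first
        }
    }
    void index(const nlohmann::json& doc) {
        for(auto it = doc.begin(); it != doc.end(); ++it) {
            fields[it.key()] = it.value();           // then index the merged document
        }
    }
};

int main() {
    FakeIndex idx;
    idx.index({{"title", "The quick brown fox"}, {"points", 100}});

    // after get_doc_changes() and scrubbing, only "points" actually changed:
    nlohmann::json del_doc = {{"points", 100}};
    nlohmann::json new_doc = {{"title", "The quick brown fox"}, {"points", 105}};

    idx.remove(del_doc);
    idx.index(new_doc);
    std::cout << idx.fields.at("points").dump() << "\n";  // prints 105
}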
@@ -1255,7 +1225,7 @@ uint32_t Index::do_filtering(uint32_t** filter_ids_out, const std::vector<filter
// field being a facet is already enforced upstream
uint32_t* exact_strt_ids = new uint32_t[strt_ids_size];
size_t exact_strt_size = 0;
for(size_t strt_ids_index = 0; strt_ids_index < strt_ids_size; strt_ids_index++) {
uint32_t seq_id = strt_ids[strt_ids_index];
const auto& fvalues = facet_index_v3.at(f.name)->at(seq_id);
@@ -2909,17 +2879,22 @@ Option<uint32_t> Index::coerce_float(const DIRTY_VALUES& dirty_values, const fie
return Option<uint32_t>(200);
}
void Index::get_doc_changes(const nlohmann::json &document, nlohmann::json &old_doc, nlohmann::json &new_doc,
nlohmann::json &del_doc) {
void Index::get_doc_changes(const index_operation_t op, const nlohmann::json& update_doc,
const nlohmann::json& old_doc, nlohmann::json& new_doc, nlohmann::json& del_doc) {
for(auto it = old_doc.begin(); it != old_doc.end(); ++it) {
new_doc[it.key()] = it.value();
if(op == UPSERT && !update_doc.contains(it.key())) {
del_doc[it.key()] = it.value();
} else {
new_doc[it.key()] = it.value();
}
}
for(auto it = document.begin(); it != document.end(); ++it) {
for(auto it = update_doc.begin(); it != update_doc.end(); ++it) {
// adds new key or overrides existing key from `old_doc`
new_doc[it.key()] = it.value();
// if the update document contains a field that exists in old, we record that (for delete + reindex)
// if the update update_doc contains a field that exists in old, we record that (for delete + reindex)
bool field_exists_in_old_doc = (old_doc.count(it.key()) != 0);
if(field_exists_in_old_doc) {
// key exists in the stored doc, so it must be reindexed
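To see what the reworked get_doc_changes() produces, here is a close, self-contained re-statement of its logic as a free function (for illustration only), applied to a partial upsert. The stored field that the incoming document omits lands in del_doc instead of being carried into new_doc:

#include <nlohmann/json.hpp>
#include <iostream>

enum index_operation_t { CREATE, UPSERT, UPDATE };

void doc_changes(index_operation_t op, const nlohmann::json& update_doc,
                 const nlohmann::json& old_doc, nlohmann::json& new_doc,
                 nlohmann::json& del_doc) {
    for(auto it = old_doc.begin(); it != old_doc.end(); ++it) {
        if(op == UPSERT && !update_doc.contains(it.key())) {
            del_doc[it.key()] = it.value();          // omitted in the upsert: delete it
        } else {
            new_doc[it.key()] = it.value();          // carry it over from the stored doc
        }
    }
    for(auto it = update_doc.begin(); it != update_doc.end(); ++it) {
        new_doc[it.key()] = it.value();              // add or override
        if(old_doc.count(it.key()) != 0) {
            del_doc[it.key()] = old_doc[it.key()];   // old value must be un-indexed before reindexing
        }
    }
}

int main() {
    nlohmann::json old_doc    = {{"title", "Rocket"}, {"tags", {"NICE", "LAZY"}}, {"points", 100}};
    nlohmann::json update_doc = {{"title", "Rocket"}, {"points", 105}};
    nlohmann::json new_doc, del_doc;

    doc_changes(UPSERT, update_doc, old_doc, new_doc, del_doc);
    std::cout << new_doc.dump() << "\n";  // {"points":105,"title":"Rocket"}
    std::cout << del_doc.dump() << "\n";  // {"points":100,"tags":["NICE","LAZY"],"title":"Rocket"}
}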
@@ -2929,6 +2904,37 @@ void Index::get_doc_changes(const nlohmann::json &document, nlohmann::json &old_
}
}
void Index::scrub_reindex_doc(nlohmann::json& update_doc, nlohmann::json& del_doc, const nlohmann::json& old_doc) {
std::vector<std::string> del_keys;
for(auto it = del_doc.cbegin(); it != del_doc.cend(); it++) {
const std::string& field_name = it.key();
std::shared_lock lock(mutex);
const auto& search_field_it = search_schema.find(field_name);
if(search_field_it == search_schema.end()) {
continue;
}
const auto search_field = search_field_it->second; // copy, don't use reference!
lock.unlock();
// compare values between old and update docs:
// if they match, we will remove them from both del and update docs
if(update_doc.contains(search_field.name) && update_doc[search_field.name] == old_doc[search_field.name]) {
del_keys.push_back(field_name);
}
}
for(const auto& del_key: del_keys) {
del_doc.erase(del_key);
update_doc.erase(del_key);
}
}
// https://stackoverflow.com/questions/924171/geo-fencing-point-inside-outside-polygon
// NOTE: polygon and point should have been transformed with `transform_for_180th_meridian`
bool Index::is_point_in_polygon(const Geofence& poly, const GeoCoord &point) {
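Returning to the upsert change: scrub_reindex_doc(), shown a few lines above, exists so that fields whose values did not change are neither un-indexed nor re-indexed. A small standalone illustration of that idea (the schema lookup is omitted here): the unchanged "title" drops out of both documents, leaving only the fields that really changed.

#include <nlohmann/json.hpp>
#include <iostream>
#include <string>
#include <vector>

// Illustration only: drop from del_doc and update_doc every field whose value
// is identical in the old and update documents.
void scrub_unchanged(nlohmann::json& update_doc, nlohmann::json& del_doc,
                     const nlohmann::json& old_doc) {
    std::vector<std::string> del_keys;
    for(auto it = del_doc.cbegin(); it != del_doc.cend(); ++it) {
        const std::string& field_name = it.key();
        if(update_doc.contains(field_name) && old_doc.contains(field_name) &&
           update_doc[field_name] == old_doc[field_name]) {
            del_keys.push_back(field_name);
        }
    }
    for(const std::string& key : del_keys) {
        del_doc.erase(key);
        update_doc.erase(key);
    }
}

int main() {
    nlohmann::json old_doc    = {{"title", "Rocket"}, {"points", 100}};
    nlohmann::json update_doc = {{"title", "Rocket"}, {"points", 105}};
    nlohmann::json del_doc    = {{"title", "Rocket"}, {"points", 100}};

    scrub_unchanged(update_doc, del_doc, old_doc);
    std::cout << update_doc.dump() << "\n";  // {"points":105}
    std::cout << del_doc.dump() << "\n";     // {"points":100}
}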

@@ -1055,10 +1055,10 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
ASSERT_EQ(1, import_response["num_imported"].get<int>());
// update + upsert records
std::vector<std::string> more_records = {R"({"id": "0", "title": "The Fifth Harry", "starring": "Will Ferrell"})",
R"({"id": "2", "cast": ["Chris Fisher", "Rand Alan"]})",
std::vector<std::string> more_records = {R"({"id": "0", "title": "The Fifth Harry", "starring": "Will Ferrell", "points":62, "cast":["Adam McKay","Steve Carell","Paul Rudd"]})",
R"({"id": "2", "cast": ["Chris Fisher", "Rand Alan"], "points":81, "starring":"Daniel Day-Lewis","title":"There Will Be Blood"})",
R"({"id": "18", "title": "Back Again Forest", "points": 45, "starring": "Ronald Wells", "cast": ["Dant Saren"]})",
R"({"id": "6", "points": 77})"};
R"({"id": "6", "points": 77, "cast":["Chris Evans","Scarlett Johansson"], "starring":"Samuel L. Jackson","title":"Captain America: The Winter Soldier"})"};
import_response = coll_mul_fields->add_many(more_records, document, UPSERT);
@@ -1104,9 +1104,9 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
ASSERT_EQ(77, results["hits"][0]["document"]["points"].get<size_t>());
// upserting with some bad docs
more_records = {R"({"id": "1", "title": "Wake up, Harry"})",
more_records = {R"({"id": "1", "title": "Wake up, Harry", "cast":["Josh Lawson","Chris Parnell"],"points":63,"starring":"Will Ferrell"})",
R"({"id": "90", "cast": ["Kim Werrel", "Random Wake"]})", // missing fields
R"({"id": "5", "points": 60})",
R"({"id": "5", "points": 60, "cast":["Logan Lerman","Alexandra Daddario"],"starring":"Ron Perlman","starring_facet":"Ron Perlman","title":"Percy Jackson: Sea of Monsters"})",
R"({"id": "24", "starring": "John", "cast": ["John Kim"], "points": 11})"}; // missing fields
import_response = coll_mul_fields->add_many(more_records, document, UPSERT);
@@ -1220,6 +1220,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsertOptional) {
for(size_t i=0; i<NUM_RECORDS; i++) {
nlohmann::json updoc;
updoc["id"] = std::to_string(i);
updoc["points"] = i;
updoc["title"] = {
get_text(10),
get_text(10),
@@ -1246,6 +1247,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsertOptional) {
for(size_t i=0; i<NUM_RECORDS; i++) {
nlohmann::json updoc;
updoc["id"] = std::to_string(i);
updoc["points"] = i;
updoc["title"] = {
get_text(10),
get_text(10),
@@ -1264,6 +1266,27 @@ TEST_F(CollectionTest, ImportDocumentsUpsertOptional) {
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(1000, import_response["num_imported"].get<int>());
// update records (can contain partial fields)
records.clear();
for(size_t i=0; i<NUM_RECORDS; i++) {
nlohmann::json updoc;
updoc["id"] = std::to_string(i);
// no points field
updoc["title"] = {
get_text(10),
get_text(10),
get_text(10),
get_text(10),
};
records.push_back(updoc.dump());
}
import_response = coll1->add_many(records, document, UPDATE);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(1000, import_response["num_imported"].get<int>());
}
TEST_F(CollectionTest, ImportDocuments) {
@@ -2099,7 +2122,7 @@ TEST_F(CollectionTest, UpdateDocument) {
Collection *coll1;
std::vector<field> fields = {field("title", field_types::STRING, true),
field("tags", field_types::STRING_ARRAY, true),
field("tags", field_types::STRING_ARRAY, true, true),
field("points", field_types::INT32, false)};
std::vector<sort_by> sort_fields = {sort_by("points", "DESC")};
@@ -2145,6 +2168,27 @@ TEST_F(CollectionTest, UpdateDocument) {
ASSERT_STREQ("LAZY", res["facet_counts"][0]["counts"][1]["value"].get<std::string>().c_str());
ASSERT_EQ(1, (int) res["facet_counts"][0]["counts"][1]["count"]);
// upsert only part of the document -- document should be REPLACED
nlohmann::json partial_doc = doc;
partial_doc.erase("tags");
add_op = coll1->add(partial_doc.dump(), UPSERT);
ASSERT_TRUE(add_op.ok());
res = coll1->search("lazy", {"title"}, "", {}, sort_fields, {0}, 10, 1,
token_ordering::FREQUENCY, {true}, 10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 5, 5, "title").get();
ASSERT_EQ(1, res["hits"].size());
ASSERT_FALSE(res["hits"][0].contains("tags"));
// upserting without a mandatory field should be an error
partial_doc = doc;
partial_doc.erase("title");
LOG(INFO) << partial_doc.dump();
add_op = coll1->add(partial_doc.dump(), UPSERT);
ASSERT_FALSE(add_op.ok());
ASSERT_EQ("Field `title` has been declared in the schema, but is not found in the document.", add_op.error());
// try changing the title and searching for an older token
doc["title"] = "The quick brown fox.";
add_op = coll1->add(doc.dump(), UPSERT);
@@ -2211,7 +2255,7 @@ TEST_F(CollectionTest, UpdateDocument) {
nlohmann::json doc4;
doc4["points"] = 105;
add_op = coll1->add(doc4.dump(), UPSERT, "100");
add_op = coll1->add(doc4.dump(), UPDATE, "100");
ASSERT_TRUE(add_op.ok());
res = coll1->search("*", {"tags"}, "points: > 101", {"tags"}, sort_fields, {0}, 10, 1,
@@ -2223,8 +2267,9 @@ TEST_F(CollectionTest, UpdateDocument) {
// try to change a field with bad value and verify that old document is put back
doc4["points"] = "abc";
add_op = coll1->add(doc4.dump(), UPSERT, "100");
add_op = coll1->add(doc4.dump(), UPDATE, "100");
ASSERT_FALSE(add_op.ok());
ASSERT_EQ("Field `points` must be an int32.", add_op.error());
res = coll1->search("*", {"tags"}, "points: > 101", {"tags"}, sort_fields, {0}, 10, 1,
token_ordering::FREQUENCY, {true}, 10, spp::sparse_hash_set<std::string>(),