Support upsert during import.

Kishore Nallan 2020-10-10 18:09:17 +05:30
parent 7ced978520
commit 60d4e9bf5a
7 changed files with 244 additions and 51 deletions
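For orientation, a minimal usage sketch (illustrative only, not part of the diff): with the new upsert flag, import records whose id already exists are merged into the stored document instead of being rejected as duplicates. It assumes a Collection* collection is already available; the sample documents are taken from the tests further below.

// Illustrative sketch: importing with upsert enabled (assumes Collection* collection).
std::vector<std::string> json_lines = {
    R"({"id": "0", "title": "Wake up, Harry"})",   // partial update of an existing document
    R"({"id": "18", "title": "Back Again Forest", "points": 45, "starring": "Ronald Wells", "cast": ["Dant Saren"]})"  // new document
};
nlohmann::json summary = collection->add_many(json_lines, true);  // upsert=true
// add_many() overwrites each entry of json_lines with its per-record result,
// e.g. {"success": true} or {"success": false, "error": "...", "document": "..."},
// and returns an overall summary such as {"success": true, "num_imported": 2}.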


@ -231,7 +231,7 @@ public:
Option<nlohmann::json> add(const std::string & json_str, const bool upsert=false, const std::string& id="");
-nlohmann::json add_many(std::vector<std::string>& json_lines);
+nlohmann::json add_many(std::vector<std::string>& json_lines, const bool upsert=false);
Option<nlohmann::json> search(const std::string & query, const std::vector<std::string> & search_fields,
const std::string & simple_filter_query, const std::vector<std::string> & facet_fields,
@ -305,5 +305,9 @@ public:
size_t &num_indexed);
bool is_exceeding_memory_threshold() const;
void get_doc_changes(const nlohmann::json &document, nlohmann::json &old_doc,
nlohmann::json &new_doc,
nlohmann::json &changed_doc);
};


@ -86,15 +86,20 @@ enum index_operation_t {
};
struct index_record {
size_t position; // position of record in the original request
uint32_t seq_id;
-nlohmann::json document;
+nlohmann::json doc;
nlohmann::json old_doc;
nlohmann::json new_doc;
nlohmann::json changed_doc;
index_operation_t operation;
Option<bool> indexed; // indicates if the indexing operation was a success
index_record(size_t record_pos, uint32_t seq_id, const nlohmann::json& doc, index_operation_t operation):
-position(record_pos), seq_id(seq_id), document(doc), operation(operation), indexed(true) {
+position(record_pos), seq_id(seq_id), doc(doc), operation(operation), indexed(false) {
}
@ -102,7 +107,7 @@ struct index_record {
indexed = Option<bool>(err_code, err_msg);
}
-void index_success(const index_record & record) {
+void index_success() {
indexed = Option<bool>(true);
}
};
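A small illustrative sketch (not from the commit) of how the import path drives this struct: one record is created per input line, switched to UPDATE when the id already exists and upsert was requested, and its outcome is recorded via index_success()/index_failure(), which batch_index() later inspects through indexed.ok().
// Illustrative only; mirrors the usage in Collection::add_many() further below.
nlohmann::json document = nlohmann::json::parse(R"({"id": "42", "points": 10})");
index_record record(0 /* position in request */, 42 /* seq_id */, document, CREATE);
bool doc_already_exists = true;  // assumption for the sketch
if(doc_already_exists) {
    record.operation = UPDATE;   // upsert: switch the record to the update path
}
// after attempting to index/persist, the outcome is recorded explicitly:
record.index_success();
// or: record.index_failure(500, "Could not write to on-disk storage.");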
@ -257,10 +262,10 @@ public:
bool is_update);
static size_t batch_memory_index(Index *index,
std::vector<index_record> & iter_batch,
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
const std::map<std::string, field> & facet_schema);
const spp::sparse_hash_map<std::string, art_tree *> &_get_search_index() const;


@ -199,6 +199,14 @@ struct StringUtils {
return (*p == 0) && val >= std::numeric_limits<int32_t>::min() && val <= std::numeric_limits<int32_t>::max();
}
static bool is_bool(const std::string &s) {
if(s.empty()) {
return false;
}
return s == "true" || s == "false";
}
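The helper is used below in post_import_documents() to validate the new upsert parameter. A few illustrative calls (note that only the lowercase literals are accepted):
StringUtils::is_bool("true");   // true
StringUtils::is_bool("false");  // true
StringUtils::is_bool("TRUE");   // false -- the comparison is case-sensitive
StringUtils::is_bool("");       // false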
static void toupper(std::string& str) {
std::transform(str.begin(), str.end(), str.begin(), ::toupper);
}


@ -196,29 +196,29 @@ Option<nlohmann::json> Collection::add(const std::string & json_str, const bool
const std::string seq_id_str = std::to_string(seq_id);
if(is_update) {
// validate first
Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
search_schema, facet_schema, is_update);
if(!validation_op.ok()) {
return Option<nlohmann::json>(validation_op.code(), validation_op.error());
}
// fetch original doc from store, create new doc and write it back
nlohmann::json old_doc;
get_document_from_store(get_seq_id_key(seq_id), old_doc);
nlohmann::json new_doc;
for(const auto& it: old_doc.items()) {
new_doc[it.key()] = it.value();
}
nlohmann::json changed_doc; // to identify changed fields
for(const auto& it: document.items()) {
new_doc[it.key()] = it.value();
if(old_doc.count(it.key()) != 0) {
changed_doc[it.key()] = old_doc[it.key()];
}
}
get_document_from_store(get_seq_id_key(seq_id), old_doc);
get_doc_changes(document, old_doc, new_doc, changed_doc);
//LOG(INFO) << "changed_doc: " << changed_doc;
remove_document(changed_doc, seq_id, false);
const Option<uint32_t> & index_memory_op = index_in_memory(document, seq_id, is_update);
if(!index_memory_op.ok()) {
index_in_memory(changed_doc, seq_id, true);
return Option<nlohmann::json>(index_memory_op.code(), index_memory_op.error());
}
@ -255,7 +255,22 @@ Option<nlohmann::json> Collection::add(const std::string & json_str, const bool
}
}
-nlohmann::json Collection::add_many(std::vector<std::string>& json_lines) {
void Collection::get_doc_changes(const nlohmann::json &document, nlohmann::json &old_doc,
nlohmann::json &new_doc, nlohmann::json &changed_doc) {
for(const auto& it: old_doc.items()) {
new_doc[it.key()] = it.value();
}
for(const auto& it: document.items()) {
new_doc[it.key()] = it.value();
if(old_doc.count(it.key()) != 0) {
changed_doc[it.key()] = old_doc[it.key()];
}
}
}
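A worked example (illustrative values, assuming a Collection* collection) of what get_doc_changes() produces for a partial update:
nlohmann::json old_doc = nlohmann::json::parse(R"({"id": "0", "title": "Old title", "points": 10})");
nlohmann::json update  = nlohmann::json::parse(R"({"id": "0", "title": "New title"})");
nlohmann::json new_doc, changed_doc;
collection->get_doc_changes(update, old_doc, new_doc, changed_doc);
// new_doc     -> {"id": "0", "points": 10, "title": "New title"}   (written back to the store)
// changed_doc -> {"id": "0", "title": "Old title"}                 (old values removed from the index before re-indexing)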
+nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, const bool upsert) {
//LOG(INFO) << "Memory ratio. Max = " << max_memory_ratio << ", Used = " << SystemMetrics::used_memory_ratio();
std::vector<std::vector<index_record>> iter_batch;
@ -271,15 +286,24 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines) {
for(size_t i=0; i < json_lines.size(); i++) {
const std::string & json_line = json_lines[i];
nlohmann::json document;
-Option<doc_seq_id_t> doc_seq_id_op = to_doc(json_line, document, false);
+Option<doc_seq_id_t> doc_seq_id_op = to_doc(json_line, document, upsert);
const uint32_t seq_id = doc_seq_id_op.ok() ? doc_seq_id_op.get().seq_id : 0;
index_record record(i, seq_id, document, CREATE);
// NOTE: we overwrite the input json_lines with result to avoid memory pressure
bool is_update = false;
if(!doc_seq_id_op.ok()) {
record.index_failure(doc_seq_id_op.code(), doc_seq_id_op.error());
} else {
is_update = !doc_seq_id_op.get().is_new;
if(is_update) {
record.operation = UPDATE;
get_document_from_store(get_seq_id_key(seq_id), record.old_doc);
get_doc_changes(document, record.old_doc, record.new_doc, record.changed_doc);
}
}
/*
@ -328,41 +352,68 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
// store only documents that were indexed in-memory successfully
for(auto& index_batch: index_batches) {
for(auto& index_record: index_batch) {
nlohmann::json res;
if(index_record.indexed.ok()) {
const std::string& seq_id_str = std::to_string(index_record.seq_id);
const std::string& serialized_json = index_record.document.dump(-1, ' ', false,
nlohmann::detail::error_handler_t::ignore);
if(index_record.operation == UPDATE) {
const std::string& serialized_json = index_record.new_doc.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
bool write_ok = store->insert(get_seq_id_key(index_record.seq_id), serialized_json);
rocksdb::WriteBatch batch;
batch.Put(get_doc_id_key(index_record.document["id"]), seq_id_str);
batch.Put(get_seq_id_key(index_record.seq_id), serialized_json);
bool write_ok = store->batch_write(batch);
if(!write_ok) {
// we will attempt to reindex the old doc on a best-effort basis
remove_document(index_record.new_doc, index_record.seq_id, false);
index_in_memory(index_record.old_doc, index_record.seq_id, false);
index_record.index_failure(500, "Could not write to on-disk storage.");
} else {
num_indexed++;
index_record.index_success();
}
if(!write_ok) {
index_record.indexed = Option<bool>(500, "Could not write to on-disk storage.");;
// remove from in-memory store to keep the state synced
remove_document(index_record.document, index_record.seq_id, false);
} else if(index_record.operation == CREATE) {
const std::string& seq_id_str = std::to_string(index_record.seq_id);
const std::string& serialized_json = index_record.doc.dump(-1, ' ', false,
nlohmann::detail::error_handler_t::ignore);
rocksdb::WriteBatch batch;
batch.Put(get_doc_id_key(index_record.doc["id"]), seq_id_str);
batch.Put(get_seq_id_key(index_record.seq_id), serialized_json);
bool write_ok = store->batch_write(batch);
if(!write_ok) {
// remove from in-memory store to keep the state synced
remove_document(index_record.doc, index_record.seq_id, false);
index_record.index_failure(500, "Could not write to on-disk storage.");
} else {
num_indexed++;
index_record.index_success();
}
}
json_out[index_record.position] = R"({"success": true})";
num_indexed++;
res["success"] = index_record.indexed.ok();
if(!index_record.indexed.ok()) {
res["document"] = json_out[index_record.position];
res["error"] = index_record.indexed.error();
}
} else {
nlohmann::json res;
res["success"] = false;
res["error"] = index_record.indexed.error();
res["document"] = json_out[index_record.position];
json_out[index_record.position] = res.dump();
res["error"] = index_record.indexed.error();
}
json_out[index_record.position] = res.dump();
}
}
}
Option<uint32_t> Collection::index_in_memory(const nlohmann::json &document, uint32_t seq_id, bool is_update) {
-Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
-search_schema, facet_schema, is_update);
+if(!is_update) {
+// for update, validation should be done prior
+Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
+search_schema, facet_schema, is_update);
-if(!validation_op.ok()) {
-return validation_op;
+if(!validation_op.ok()) {
+return validation_op;
}
+}
Index* index = indices[seq_id % num_memory_shards];

View File

@ -592,11 +592,16 @@ bool post_import_documents(http_req& req, http_res& res) {
//LOG(INFO) << "post_import_documents";
//LOG(INFO) << "req.first_chunk=" << req.first_chunk_aggregate << ", last_chunk=" << req.last_chunk_aggregate;
const char *BATCH_SIZE = "batch_size";
const char *UPSERT = "upsert";
if(req.params.count(BATCH_SIZE) == 0) {
req.params[BATCH_SIZE] = "40";
}
if(req.params.count(UPSERT) == 0) {
req.params[UPSERT] = "false";
}
if(!StringUtils::is_uint32_t(req.params[BATCH_SIZE])) {
req.last_chunk_aggregate = true;
res.final = true;
@ -605,7 +610,16 @@ bool post_import_documents(http_req& req, http_res& res) {
return false;
}
if(!StringUtils::is_bool(req.params[UPSERT])) {
req.last_chunk_aggregate = true;
res.final = true;
res.set_400("Parameter `" + std::string(UPSERT) + "` must be a boolean.");
HttpServer::stream_response(req, res);
return false;
}
const size_t IMPORT_BATCH_SIZE = std::stoi(req.params[BATCH_SIZE]);
const bool upsert = (req.params[UPSERT] == "true");
if(IMPORT_BATCH_SIZE == 0) {
res.set_400("Parameter `" + std::string(BATCH_SIZE) + "` must be a positive integer.");
@ -681,7 +695,7 @@ bool post_import_documents(http_req& req, http_res& res) {
//LOG(INFO) << "single_partial_record_body: " << single_partial_record_body;
if(!single_partial_record_body) {
-nlohmann::json json_res = collection->add_many(json_lines);
+nlohmann::json json_res = collection->add_many(json_lines, upsert);
//const std::string& import_summary_json = json_res.dump();
//response_stream << import_summary_json << "\n";


@ -334,23 +334,31 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
continue;
}
-if(index_rec.operation == CREATE) {
-Option<uint32_t> validation_op = validate_index_in_memory(index_rec.document, index_rec.seq_id,
+bool is_update = (index_rec.operation == UPDATE);
+if(index_rec.operation == CREATE || index_rec.operation == UPDATE) {
+Option<uint32_t> validation_op = validate_index_in_memory(index_rec.doc, index_rec.seq_id,
default_sorting_field,
-search_schema, facet_schema, false);
+search_schema, facet_schema, is_update);
if(!validation_op.ok()) {
index_rec.index_failure(validation_op.code(), validation_op.error());
continue;
}
-Option<uint32_t> index_mem_op = index->index_in_memory(index_rec.document, index_rec.seq_id, default_sorting_field, false);
+if(is_update) {
+index->remove(index_rec.seq_id, index_rec.changed_doc);
+}
+Option<uint32_t> index_mem_op = index->index_in_memory(index_rec.doc, index_rec.seq_id,
+default_sorting_field, is_update);
if(!index_mem_op.ok()) {
index->index_in_memory(index_rec.changed_doc, index_rec.seq_id, default_sorting_field, true);
index_rec.index_failure(index_mem_op.code(), index_mem_op.error());
continue;
}
-index_rec.index_success(index_rec);
+index_rec.index_success();
num_indexed++;
}
}


@ -1296,6 +1296,97 @@ std::vector<nlohmann::json> import_res_to_json(const std::vector<std::string>& i
return out;
}
TEST_F(CollectionTest, ImportDocumentsUpsert) {
Collection *coll_mul_fields;
std::ifstream infile(std::string(ROOT_DIR)+"test/multi_field_documents.jsonl");
std::stringstream strstream;
strstream << infile.rdbuf();
infile.close();
std::vector<std::string> import_records;
StringUtils::split(strstream.str(), import_records, "\n");
std::vector<field> fields = {
field("title", field_types::STRING, false),
field("starring", field_types::STRING, false),
field("cast", field_types::STRING_ARRAY, false),
field("points", field_types::INT32, false)
};
coll_mul_fields = collectionManager.get_collection("coll_mul_fields");
if(coll_mul_fields == nullptr) {
coll_mul_fields = collectionManager.create_collection("coll_mul_fields", 4, fields, "points").get();
}
// try importing records
nlohmann::json import_response = coll_mul_fields->add_many(import_records);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(18, import_response["num_imported"].get<int>());
// import some new records along with updates
std::vector<std::string> more_records = {R"({"id": "0", "title": "Wake up, Harry"})",
R"({"id": "2", "cast": ["Kim Werrel", "Random Wake"]})",
R"({"id": "18", "title": "Back Again Forest", "points": 45, "starring": "Ronald Wells", "cast": ["Dant Saren"]})",
R"({"id": "6", "points": 77})"};
import_response = coll_mul_fields->add_many(more_records, true);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(4, import_response["num_imported"].get<int>());
std::vector<nlohmann::json> import_results = import_res_to_json(more_records);
ASSERT_EQ(4, import_results.size());
for(size_t i=0; i<4; i++) {
ASSERT_TRUE(import_results[i]["success"].get<bool>());
ASSERT_EQ(1, import_results[i].size());
}
auto results = coll_mul_fields->search("burgundy", query_fields, "", {}, sort_fields, 0, 10, 1, FREQUENCY, false).get();
ASSERT_EQ(0, results["hits"].size());
results = coll_mul_fields->search("harry", query_fields, "", {}, sort_fields, 0, 10, 1, FREQUENCY, false).get();
ASSERT_EQ(1, results["hits"].size());
results = coll_mul_fields->search("captain america", query_fields, "", {}, sort_fields, 0, 10, 1, FREQUENCY, false).get();
ASSERT_EQ(1, results["hits"].size());
ASSERT_EQ(77, results["hits"][0]["document"]["points"].get<size_t>());
// updates mixed with errors
more_records = {R"({"id": "1", "title": "Wake up, Harry"})",
R"({"id": "90", "cast": ["Kim Werrel", "Random Wake"]})", // error due to missing fields
R"({"id": "5", "points": 60})",
R"({"id": "24", "points": 11})"}; // error due to missing fields
import_response = coll_mul_fields->add_many(more_records, true);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());
import_results = import_res_to_json(more_records);
ASSERT_FALSE(import_results[1]["success"].get<bool>());
ASSERT_FALSE(import_results[3]["success"].get<bool>());
ASSERT_STREQ("Field `points` has been declared as a default sorting field, but is not found in the document.", import_results[1]["error"].get<std::string>().c_str());
ASSERT_STREQ("Field `title` has been declared in the schema, but is not found in the document.", import_results[3]["error"].get<std::string>().c_str());
// try to update without upsert option
more_records = {R"({"id": "1", "title": "Wake up, Harry"})",
R"({"id": "5", "points": 60})"};
import_response = coll_mul_fields->add_many(more_records, false);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
import_results = import_res_to_json(more_records);
ASSERT_FALSE(import_results[0]["success"].get<bool>());
ASSERT_FALSE(import_results[1]["success"].get<bool>());
ASSERT_STREQ("A document with id 1 already exists.", import_results[0]["error"].get<std::string>().c_str());
ASSERT_STREQ("A document with id 5 already exists.", import_results[1]["error"].get<std::string>().c_str());
}
TEST_F(CollectionTest, ImportDocuments) {
Collection *coll_mul_fields;
@ -2333,6 +2424,18 @@ TEST_F(CollectionTest, UpdateDocument) {
ASSERT_EQ(1, res["hits"].size());
ASSERT_EQ(105, res["hits"][0]["document"]["points"].get<size_t>());
// try to change a field with bad value and verify that old document is put back
doc4["points"] = "abc";
add_op = coll1->add(doc4.dump(), true, "100");
ASSERT_FALSE(add_op.ok());
res = coll1->search("*", {"tags"}, "points: > 101", {}, sort_fields, 0, 10, 1,
token_ordering::FREQUENCY, true, 10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 5, 5, "title").get();
ASSERT_EQ(1, res["hits"].size());
ASSERT_EQ(105, res["hits"][0]["document"]["points"].get<size_t>());
}
TEST_F(CollectionTest, SearchHighlightFieldFully) {