Refactor underlying APIs to support insert/update/upsert.

This commit is contained in:
Kishore Nallan 2020-10-24 09:23:33 +05:30
parent bbda412cc8
commit 2041de033f
6 changed files with 77 additions and 64 deletions

View File

@@ -224,15 +224,16 @@ public:
std::string get_default_sorting_field();
Option<doc_seq_id_t> to_doc(const std::string& json_str, nlohmann::json& document, bool upsert,
const std::string& id="");
Option<doc_seq_id_t> to_doc(const std::string& json_str, nlohmann::json& document,
const index_operation_t& operation, const std::string& id="");
nlohmann::json get_summary_json();
Option<nlohmann::json> add(const std::string & json_str, const bool upsert=false, const std::string& id="");
Option<nlohmann::json> add(const std::string & json_str,
const index_operation_t& operation=CREATE, const std::string& id="");
nlohmann::json add_many(std::vector<std::string>& json_lines, nlohmann::json& document,
const bool upsert=false, const std::string& id="");
const index_operation_t& operation=CREATE, const std::string& id="");
Option<nlohmann::json> search(const std::string & query, const std::vector<std::string> & search_fields,
const std::string & simple_filter_query, const std::vector<std::string> & facet_fields,

View File

@@ -81,6 +81,7 @@ struct search_args {
enum index_operation_t {
CREATE,
UPSERT,
UPDATE,
DELETE
};
@@ -95,6 +96,7 @@ struct index_record {
nlohmann::json del_doc;
index_operation_t operation;
bool is_update;
Option<bool> indexed; // indicates if the indexing operation was a success

View File

@@ -98,8 +98,8 @@ void Collection::increment_next_seq_id_field() {
next_seq_id++;
}
Option<doc_seq_id_t> Collection::to_doc(const std::string & json_str, nlohmann::json & document,
bool upsert, const std::string& id) {
Option<doc_seq_id_t> Collection::to_doc(const std::string & json_str, nlohmann::json& document,
const index_operation_t& operation, const std::string& id) {
try {
document = nlohmann::json::parse(json_str);
} catch(const std::exception& e) {
@@ -111,14 +111,18 @@ Option<doc_seq_id_t> Collection::to_doc(const std::string & json_str, nlohmann::
return Option<doc_seq_id_t>(400, "Bad JSON: not a properly formed document.");
}
if(upsert && document.count("id") == 0 && !id.empty()) {
// operation could be: insert, upsert or delete and we have to validate based on that
if(document.count("id") == 0 && !id.empty()) {
// use the explicit ID (usually from a PUT request) if document body does not have it
document["id"] = id;
}
if(document.count("id") == 0) {
if(upsert) {
return Option<doc_seq_id_t>(400, "For update, the `id` key must be present.");
if(operation == UPDATE) {
return Option<doc_seq_id_t>(400, "For update, the `id` key must be provided.");
}
// for UPSERT or CREATE, if a document does not have an ID, we will treat it as a new doc
uint32_t seq_id = get_next_seq_id();
document["id"] = std::to_string(seq_id);
return Option<doc_seq_id_t>(doc_seq_id_t{seq_id, true});
@@ -129,29 +133,33 @@ Option<doc_seq_id_t> Collection::to_doc(const std::string & json_str, nlohmann::
const std::string& doc_id = document["id"];
if(upsert) {
// try to get the corresponding sequence id from disk if present
std::string seq_id_str;
StoreStatus seq_id_status = store->get(get_doc_id_key(doc_id), seq_id_str);
// try to get the corresponding sequence id from disk if present
std::string seq_id_str;
StoreStatus seq_id_status = store->get(get_doc_id_key(doc_id), seq_id_str);
if(seq_id_status == StoreStatus::ERROR) {
return Option<doc_seq_id_t>(500, "Error fetching the sequence key for document with id: " + doc_id);
}
if(seq_id_status == StoreStatus::FOUND) {
uint32_t seq_id = (uint32_t) std::stoul(seq_id_str);
return Option<doc_seq_id_t>(doc_seq_id_t{seq_id, false});
}
// if key is not found, we will fallback to the "insert" workflow
if(seq_id_status == StoreStatus::ERROR) {
return Option<doc_seq_id_t>(500, "Error fetching the sequence key for document with id: " + doc_id);
}
if(doc_exists(doc_id)) {
return Option<doc_seq_id_t>(409, std::string("A document with id ") + doc_id + " already exists.");
}
if(seq_id_status == StoreStatus::FOUND) {
if(operation == CREATE) {
return Option<doc_seq_id_t>(409, std::string("A document with id ") + doc_id + " already exists.");
}
uint32_t seq_id = get_next_seq_id();
return Option<doc_seq_id_t>(doc_seq_id_t{seq_id, true});
// UPSERT or UPDATE
uint32_t seq_id = (uint32_t) std::stoul(seq_id_str);
return Option<doc_seq_id_t>(doc_seq_id_t{seq_id, false});
} else {
if(operation == UPDATE) {
// for UPDATE, a document with given ID must be found
return Option<doc_seq_id_t>(400, "Could not find a document with id: " + doc_id);
} else {
// for UPSERT or CREATE, if a document with given ID is not found, we will treat it as a new doc
uint32_t seq_id = get_next_seq_id();
return Option<doc_seq_id_t>(doc_seq_id_t{seq_id, true});
}
}
}
}
@@ -179,10 +187,11 @@ nlohmann::json Collection::get_summary_json() {
return json_response;
}
Option<nlohmann::json> Collection::add(const std::string & json_str, const bool upsert, const std::string& id) {
Option<nlohmann::json> Collection::add(const std::string & json_str,
const index_operation_t& operation, const std::string& id) {
nlohmann::json document;
std::vector<std::string> json_lines = {json_str};
const nlohmann::json& res = add_many(json_lines, document, upsert, id);
const nlohmann::json& res = add_many(json_lines, document, operation, id);
if(!res["success"].get<bool>()) {
nlohmann::json res_doc;
@@ -218,7 +227,7 @@ void Collection::get_doc_changes(const nlohmann::json &document, nlohmann::json
}
nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, nlohmann::json& document,
const bool upsert, const std::string& id) {
const index_operation_t& operation, const std::string& id) {
//LOG(INFO) << "Memory ratio. Max = " << max_memory_ratio << ", Used = " << SystemMetrics::used_memory_ratio();
std::vector<std::vector<index_record>> iter_batch;
@@ -232,21 +241,20 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, nlohma
for(size_t i=0; i < json_lines.size(); i++) {
const std::string & json_line = json_lines[i];
Option<doc_seq_id_t> doc_seq_id_op = to_doc(json_line, document, upsert, id);
Option<doc_seq_id_t> doc_seq_id_op = to_doc(json_line, document, operation, id);
const uint32_t seq_id = doc_seq_id_op.ok() ? doc_seq_id_op.get().seq_id : 0;
index_record record(i, seq_id, document, CREATE);
index_record record(i, seq_id, document, operation);
// NOTE: we overwrite the input json_lines with result to avoid memory pressure
bool is_update = false;
record.is_update = false;
if(!doc_seq_id_op.ok()) {
record.index_failure(doc_seq_id_op.code(), doc_seq_id_op.error());
} else {
is_update = !doc_seq_id_op.get().is_new;
if(is_update) {
record.operation = UPDATE;
record.is_update = !doc_seq_id_op.get().is_new;
if(record.is_update) {
get_document_from_store(get_seq_id_key(seq_id), record.old_doc);
get_doc_changes(document, record.old_doc, record.new_doc, record.del_doc);
}
@@ -301,7 +309,7 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
nlohmann::json res;
if(index_record.indexed.ok()) {
if(index_record.operation == UPDATE) {
if(index_record.operation == UPDATE || index_record.operation == UPSERT) {
const std::string& serialized_json = index_record.new_doc.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
bool write_ok = store->insert(get_seq_id_key(index_record.seq_id), serialized_json);

View File

@@ -694,9 +694,11 @@ bool post_import_documents(http_req& req, http_res& res) {
//LOG(INFO) << "single_partial_record_body: " << single_partial_record_body;
const index_operation_t operation = upsert ? index_operation_t::UPSERT : index_operation_t::CREATE;
if(!single_partial_record_body) {
nlohmann::json document;
nlohmann::json json_res = collection->add_many(json_lines, document, upsert);
nlohmann::json json_res = collection->add_many(json_lines, document, operation);
//const std::string& import_summary_json = json_res.dump();
//response_stream << import_summary_json << "\n";
@@ -746,7 +748,8 @@ bool post_add_document(http_req & req, http_res & res) {
return false;
}
Option<nlohmann::json> inserted_doc_op = collection->add(req.body, upsert);
index_operation_t operation = upsert ? index_operation_t::UPSERT : index_operation_t::CREATE;
Option<nlohmann::json> inserted_doc_op = collection->add(req.body, operation);
if(!inserted_doc_op.ok()) {
res.set(inserted_doc_op.code(), inserted_doc_op.error());
@@ -768,7 +771,7 @@ bool put_upsert_document(http_req & req, http_res & res) {
return false;
}
Option<nlohmann::json> upserted_doc_op = collection->add(req.body, false, doc_id);
Option<nlohmann::json> upserted_doc_op = collection->add(req.body, index_operation_t::UPDATE, doc_id);
if(!upserted_doc_op.ok()) {
res.set(upserted_doc_op.code(), upserted_doc_op.error());

View File

@@ -376,26 +376,24 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
continue;
}
bool is_update = (index_rec.operation == UPDATE);
if(index_rec.operation == CREATE || index_rec.operation == UPDATE) {
if(index_rec.operation != DELETE) {
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.doc, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema, is_update);
search_schema, facet_schema, index_rec.is_update);
if(!validation_op.ok()) {
index_rec.index_failure(validation_op.code(), validation_op.error());
continue;
}
if(is_update) {
if(index_rec.is_update) {
// scrub string fields to reduce delete ops
index->scrub_reindex_doc(index_rec.doc, index_rec.del_doc, index_rec.old_doc);
index->remove(index_rec.seq_id, index_rec.del_doc);
}
Option<uint32_t> index_mem_op = index->index_in_memory(index_rec.doc, index_rec.seq_id,
default_sorting_field, is_update);
default_sorting_field, index_rec.is_update);
if(!index_mem_op.ok()) {
index->index_in_memory(index_rec.del_doc, index_rec.seq_id, default_sorting_field, true);
index_rec.index_failure(index_mem_op.code(), index_mem_op.error());
@@ -404,7 +402,7 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
index_rec.index_success();
if(!is_update) {
if(!index_rec.is_update) {
num_indexed++;
}
}

View File

@@ -1352,7 +1352,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
R"({"id": "18", "title": "Back Again Forest", "points": 45, "starring": "Ronald Wells", "cast": ["Dant Saren"]})",
R"({"id": "6", "points": 77})"};
import_response = coll_mul_fields->add_many(more_records, document, true);
import_response = coll_mul_fields->add_many(more_records, document, UPSERT);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(4, import_response["num_imported"].get<int>());
@@ -1392,7 +1392,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
R"({"id": "5", "points": 60})",
R"({"id": "24", "starring": "John", "cast": ["John Kim"], "points": 11})"}; // missing fields
import_response = coll_mul_fields->add_many(more_records, document, true);
import_response = coll_mul_fields->add_many(more_records, document, UPSERT);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());
@@ -1403,12 +1403,12 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
ASSERT_STREQ("Field `points` has been declared as a default sorting field, but is not found in the document.", import_results[1]["error"].get<std::string>().c_str());
ASSERT_STREQ("Field `title` has been declared in the schema, but is not found in the document.", import_results[3]["error"].get<std::string>().c_str());
// try to update without upsert option
// try to add without upsert option
more_records = {R"({"id": "1", "title": "Wake up, Harry"})",
R"({"id": "5", "points": 60})"};
import_response = coll_mul_fields->add_many(more_records, document, false);
import_response = coll_mul_fields->add_many(more_records, document, CREATE);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
@@ -1423,7 +1423,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
"points":70,"starring":"Robin Williams","starring_facet":"Robin Williams",
"title":"Good Will Hunting"})"};
import_response = coll_mul_fields->add_many(more_records, document, true);
import_response = coll_mul_fields->add_many(more_records, document, UPSERT);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());
@@ -1458,7 +1458,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsertOptional) {
// import records without title
nlohmann::json document;
nlohmann::json import_response = coll1->add_many(records, document, false);
nlohmann::json import_response = coll1->add_many(records, document, CREATE);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(1000, import_response["num_imported"].get<int>());
@@ -1479,7 +1479,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsertOptional) {
}
auto begin = std::chrono::high_resolution_clock::now();
import_response = coll1->add_many(records, document, true);
import_response = coll1->add_many(records, document, UPSERT);
auto time_micros = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - begin).count();
@@ -1505,7 +1505,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsertOptional) {
}
begin = std::chrono::high_resolution_clock::now();
import_response = coll1->add_many(records, document, true);
import_response = coll1->add_many(records, document, UPSERT);
time_micros = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - begin).count();
@@ -2481,7 +2481,7 @@ TEST_F(CollectionTest, UpdateDocument) {
// try changing the title and searching for an older token
doc["title"] = "The quick brown fox.";
add_op = coll1->add(doc.dump(), true);
add_op = coll1->add(doc.dump(), UPSERT);
ASSERT_TRUE(add_op.ok());
ASSERT_EQ(1, coll1->get_num_documents());
@@ -2499,16 +2499,16 @@ TEST_F(CollectionTest, UpdateDocument) {
ASSERT_EQ(1, res["hits"].size());
ASSERT_STREQ("The quick brown fox.", res["hits"][0]["document"]["title"].get<std::string>().c_str());
// try to change tags without `id`
// try to update document tags without `id`
nlohmann::json doc2;
doc2["tags"] = {"SENTENCE"};
add_op = coll1->add(doc2.dump(), true);
add_op = coll1->add(doc2.dump(), UPDATE);
ASSERT_FALSE(add_op.ok());
ASSERT_STREQ("For update, the `id` key must be present.", add_op.error().c_str());
ASSERT_STREQ("For update, the `id` key must be provided.", add_op.error().c_str());
// now change tags with id
doc2["id"] = "100";
add_op = coll1->add(doc2.dump(), true);
add_op = coll1->add(doc2.dump(), UPDATE);
ASSERT_TRUE(add_op.ok());
// check for old tag
@@ -2531,7 +2531,7 @@ TEST_F(CollectionTest, UpdateDocument) {
doc3["points"] = 99;
doc3["id"] = "100";
add_op = coll1->add(doc3.dump(), true);
add_op = coll1->add(doc3.dump(), UPDATE);
ASSERT_TRUE(add_op.ok());
res = coll1->search("*", {"tags"}, "points: > 90", {}, sort_fields, 0, 10, 1,
@@ -2545,7 +2545,8 @@ TEST_F(CollectionTest, UpdateDocument) {
nlohmann::json doc4;
doc4["points"] = 105;
add_op = coll1->add(doc4.dump(), true, "100");
add_op = coll1->add(doc4.dump(), UPSERT, "100");
LOG(INFO) << add_op.error();
ASSERT_TRUE(add_op.ok());
res = coll1->search("*", {"tags"}, "points: > 101", {}, sort_fields, 0, 10, 1,
@@ -2557,7 +2558,7 @@ TEST_F(CollectionTest, UpdateDocument) {
// try to change a field with bad value and verify that old document is put back
doc4["points"] = "abc";
add_op = coll1->add(doc4.dump(), true, "100");
add_op = coll1->add(doc4.dump(), UPSERT, "100");
ASSERT_FALSE(add_op.ok());
res = coll1->search("*", {"tags"}, "points: > 101", {}, sort_fields, 0, 10, 1,