Combine usage of add() / add_many().

This commit is contained in:
Kishore Nallan 2020-10-17 15:05:37 +05:30
parent 9adbdd1576
commit 73f3e0620e
4 changed files with 39 additions and 90 deletions

View File

@@ -231,7 +231,8 @@ public:
Option<nlohmann::json> add(const std::string & json_str, const bool upsert=false, const std::string& id="");
nlohmann::json add_many(std::vector<std::string>& json_lines, const bool upsert=false);
nlohmann::json add_many(std::vector<std::string>& json_lines, nlohmann::json& document,
const bool upsert=false, const std::string& id="");
Option<nlohmann::json> search(const std::string & query, const std::vector<std::string> & search_fields,
const std::string & simple_filter_query, const std::vector<std::string> & facet_fields,

View File

@@ -181,77 +181,23 @@ nlohmann::json Collection::get_summary_json() {
Option<nlohmann::json> Collection::add(const std::string & json_str, const bool upsert, const std::string& id) {
nlohmann::json document;
Option<doc_seq_id_t> doc_seq_id_op = to_doc(json_str, document, upsert, id);
std::vector<std::string> json_lines = {json_str};
const nlohmann::json& res = add_many(json_lines, document, upsert, id);
if(!doc_seq_id_op.ok()) {
return Option<nlohmann::json>(doc_seq_id_op.code(), doc_seq_id_op.error());
if(!res["success"].get<bool>()) {
nlohmann::json res_doc;
try {
res_doc = nlohmann::json::parse(json_lines[0]);
} catch(const std::exception& e) {
LOG(ERROR) << "JSON error: " << e.what();
return Option<nlohmann::json>(400, std::string("Bad JSON: ") + e.what());
}
return Option<nlohmann::json>(res_doc["code"].get<size_t>(), res_doc["error"].get<std::string>());
}
/*if(is_exceeding_memory_threshold()) {
return Option<nlohmann::json>(403, "Max memory ratio exceeded.");
}*/
const bool is_update = !doc_seq_id_op.get().is_new;
const uint32_t seq_id = doc_seq_id_op.get().seq_id;
const std::string seq_id_str = std::to_string(seq_id);
if(is_update) {
// validate first
Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
search_schema, facet_schema, is_update);
if(!validation_op.ok()) {
return Option<nlohmann::json>(validation_op.code(), validation_op.error());
}
// fetch original doc from store, create new doc and write it back
nlohmann::json old_doc;
nlohmann::json new_doc;
nlohmann::json del_doc; // to identify fields that should be potentially removed
get_document_from_store(get_seq_id_key(seq_id), old_doc);
get_doc_changes(document, old_doc, new_doc, del_doc);
//LOG(INFO) << "del_doc: " << del_doc;
remove_document(del_doc, seq_id, false);
const Option<uint32_t> & index_memory_op = index_in_memory(document, seq_id, is_update);
if(!index_memory_op.ok()) {
index_in_memory(del_doc, seq_id, true);
return Option<nlohmann::json>(index_memory_op.code(), index_memory_op.error());
}
const std::string& serialized_json = new_doc.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
bool write_ok = store->insert(get_seq_id_key(seq_id), serialized_json);
if(!write_ok) {
// we will attempt to reindex the old doc on a best-effort basis
remove_document(new_doc, seq_id, false);
index_in_memory(old_doc, seq_id, false);
return Option<nlohmann::json>(500, "Could not write to on-disk storage.");
}
return Option<nlohmann::json>(new_doc);
} else {
const Option<uint32_t> & index_memory_op = index_in_memory(document, seq_id, is_update);
if(!index_memory_op.ok()) {
return Option<nlohmann::json>(index_memory_op.code(), index_memory_op.error());
}
const std::string& serialized_json = document.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
rocksdb::WriteBatch batch;
batch.Put(get_doc_id_key(document["id"]), seq_id_str);
batch.Put(get_seq_id_key(seq_id), serialized_json);
bool write_ok = store->batch_write(batch);
if(!write_ok) {
remove_document(document, seq_id, false);
return Option<nlohmann::json>(500, "Could not write to on-disk storage.");
}
return Option<nlohmann::json>(document);
}
return Option<nlohmann::json>(document);
}
void Collection::get_doc_changes(const nlohmann::json &document, nlohmann::json &old_doc,
@@ -270,9 +216,9 @@ void Collection::get_doc_changes(const nlohmann::json &document, nlohmann::json
}
}
nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, const bool upsert) {
nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, nlohmann::json& document,
const bool upsert, const std::string& id) {
//LOG(INFO) << "Memory ratio. Max = " << max_memory_ratio << ", Used = " << SystemMetrics::used_memory_ratio();
std::vector<std::vector<index_record>> iter_batch;
for(size_t i = 0; i < num_memory_shards; i++) {
@@ -285,8 +231,7 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, const
for(size_t i=0; i < json_lines.size(); i++) {
const std::string & json_line = json_lines[i];
nlohmann::json document;
Option<doc_seq_id_t> doc_seq_id_op = to_doc(json_line, document, upsert);
Option<doc_seq_id_t> doc_seq_id_op = to_doc(json_line, document, upsert, id);
const uint32_t seq_id = doc_seq_id_op.ok() ? doc_seq_id_op.get().seq_id : 0;
index_record record(i, seq_id, document, CREATE);
@@ -393,11 +338,13 @@ void Collection::batch_index(std::vector<std::vector<index_record>> &index_batch
if(!index_record.indexed.ok()) {
res["document"] = json_out[index_record.position];
res["error"] = index_record.indexed.error();
res["code"] = index_record.indexed.code();
}
} else {
res["success"] = false;
res["document"] = json_out[index_record.position];
res["error"] = index_record.indexed.error();
res["code"] = index_record.indexed.code();
}
json_out[index_record.position] = res.dump();

View File

@@ -695,7 +695,8 @@ bool post_import_documents(http_req& req, http_res& res) {
//LOG(INFO) << "single_partial_record_body: " << single_partial_record_body;
if(!single_partial_record_body) {
nlohmann::json json_res = collection->add_many(json_lines, upsert);
nlohmann::json document;
nlohmann::json json_res = collection->add_many(json_lines, document, upsert);
//const std::string& import_summary_json = json_res.dump();
//response_stream << import_summary_json << "\n";

View File

@@ -1320,8 +1320,8 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
}
// try importing records
nlohmann::json import_response = coll_mul_fields->add_many(import_records);
nlohmann::json document;
nlohmann::json import_response = coll_mul_fields->add_many(import_records, document);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(18, import_response["num_imported"].get<int>());
@@ -1331,7 +1331,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
R"({"id": "18", "title": "Back Again Forest", "points": 45, "starring": "Ronald Wells", "cast": ["Dant Saren"]})",
R"({"id": "6", "points": 77})"};
import_response = coll_mul_fields->add_many(more_records, true);
import_response = coll_mul_fields->add_many(more_records, document, true);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(4, import_response["num_imported"].get<int>());
@@ -1367,11 +1367,11 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
// updates mixed with errors
more_records = {R"({"id": "1", "title": "Wake up, Harry"})",
R"({"id": "90", "cast": ["Kim Werrel", "Random Wake"]})", // error due to missing fields
R"({"id": "90", "cast": ["Kim Werrel", "Random Wake"]})", // missing fields
R"({"id": "5", "points": 60})",
R"({"id": "24", "points": 11})"}; // error due to missing fields
R"({"id": "24", "starring": "John", "cast": ["John Kim"], "points": 11})"}; // missing fields
import_response = coll_mul_fields->add_many(more_records, true);
import_response = coll_mul_fields->add_many(more_records, document, true);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());
@@ -1387,7 +1387,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
more_records = {R"({"id": "1", "title": "Wake up, Harry"})",
R"({"id": "5", "points": 60})"};
import_response = coll_mul_fields->add_many(more_records, false);
import_response = coll_mul_fields->add_many(more_records, document, false);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
@@ -1402,7 +1402,7 @@ TEST_F(CollectionTest, ImportDocumentsUpsert) {
"points":70,"starring":"Robin Williams","starring_facet":"Robin Williams",
"title":"Good Will Hunting"})"};
import_response = coll_mul_fields->add_many(more_records, true);
import_response = coll_mul_fields->add_many(more_records, document, true);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());
@@ -1434,8 +1434,8 @@ TEST_F(CollectionTest, ImportDocuments) {
}
// try importing records
nlohmann::json import_response = coll_mul_fields->add_many(import_records);
nlohmann::json document;
nlohmann::json import_response = coll_mul_fields->add_many(import_records, document);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(18, import_response["num_imported"].get<int>());
@@ -1460,7 +1460,7 @@ TEST_F(CollectionTest, ImportDocuments) {
// verify that empty import is handled gracefully
std::vector<std::string> empty_records;
import_response = coll_mul_fields->add_many(empty_records);
import_response = coll_mul_fields->add_many(empty_records, document);
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
@@ -1474,7 +1474,7 @@ TEST_F(CollectionTest, ImportDocuments) {
"{\"title\": \"Test4\", \"points\": 55, "
"\"cast\": [\"Tom Skerritt\"] }"};
import_response = coll_mul_fields->add_many(more_records);
import_response = coll_mul_fields->add_many(more_records, document);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());
@@ -1499,7 +1499,7 @@ TEST_F(CollectionTest, ImportDocuments) {
"{\"id\": \"id1\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
"\"cast\": [\"Tom Skerritt\"] }"};
import_response = coll_mul_fields->add_many(more_records);
import_response = coll_mul_fields->add_many(more_records, document);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());
@@ -1517,7 +1517,7 @@ TEST_F(CollectionTest, ImportDocuments) {
// valid JSON but not a document
more_records = {"[]"};
import_response = coll_mul_fields->add_many(more_records);
import_response = coll_mul_fields->add_many(more_records, document);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
@@ -1531,7 +1531,7 @@ TEST_F(CollectionTest, ImportDocuments) {
// invalid JSON
more_records = {"{"};
import_response = coll_mul_fields->add_many(more_records);
import_response = coll_mul_fields->add_many(more_records, document);
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(0, import_response["num_imported"].get<int>());
@@ -1870,7 +1870,7 @@ TEST_F(CollectionTest, IndexingWithBadData) {
sample_collection = collectionManager.create_collection("sample_collection", 4, fields, "age").get();
}
const Option<nlohmann::json> & search_fields_missing_op1 = sample_collection->add("{\"namezz\": \"foo\", \"age\": 29, \"average\": 78}");
const Option<nlohmann::json> & search_fields_missing_op1 = sample_collection->add("{\"name\": \"foo\", \"age\": 29, \"average\": 78}");
ASSERT_FALSE(search_fields_missing_op1.ok());
ASSERT_STREQ("Field `tags` has been declared in the schema, but is not found in the document.",
search_fields_missing_op1.error().c_str());