Refactor bulk indexing process.

This commit is contained in:
Kishore Nallan 2019-05-25 22:28:13 +05:30
parent 94b160e799
commit ba205222b7
7 changed files with 214 additions and 76 deletions

View File

@ -81,6 +81,8 @@ public:
~Collection();
long long int micros = 0;
static std::string get_next_seq_id_key(const std::string & collection_name);
static std::string get_meta_key(const std::string & collection_name);
@ -113,6 +115,8 @@ public:
std::string get_default_sorting_field();
Option<uint32_t> to_doc(const std::string & json_str, nlohmann::json & document);
Option<nlohmann::json> add(const std::string & json_str);
Option<nlohmann::json> add_many(const std::string & json_str);
@ -136,7 +140,8 @@ public:
Option<uint32_t> index_in_memory(const nlohmann::json & document, uint32_t seq_id);
Option<uint32_t> par_index_in_memory(const std::vector<std::vector<std::pair<uint32_t, std::string>>> & iter_batch);
void par_index_in_memory(std::vector<std::vector<index_record>> & iter_batch,
batch_index_result & result);
static void prune_document(nlohmann::json &document, const spp::sparse_hash_set<std::string> include_fields,
const spp::sparse_hash_set<std::string> exclude_fields);

View File

@ -49,7 +49,8 @@ public:
Option<bool> init(Store *store,
const size_t default_num_indices,
const std::string & auth_key,
const std::string & search_only_auth_key);
const std::string & search_only_auth_key,
const size_t init_batch_size=1000);
// frees in-memory data structures when the server is shut down - helps us run a memory leak detector properly
void dispose();

View File

@ -53,6 +53,55 @@ struct search_args {
}
};
struct index_record {
size_t record_pos; // position of record in the original request
uint32_t seq_id;
std::string json_str;
nlohmann::json document;
index_record(size_t record_pos, uint32_t seq_id, const std::string & json_str, const nlohmann::json & doc):
record_pos(record_pos), seq_id(seq_id), json_str(json_str), document(doc) {
}
};
struct index_result {
    index_record record;
    Option<bool> index_op;      // outcome of the indexing operation for this record

    index_result(const index_record & record, const Option<bool> & index_op):
            record(record), index_op(index_op) {
    }

    // Results sort by their position in the original request, so a batch of
    // outcomes can be returned to the client in submission order.
    bool operator<(const index_result & other) const {
        return record.record_pos < other.record.record_pos;
    }
};
struct batch_index_result {
std::vector<index_result> items;
size_t num_indexed = 0;
batch_index_result() {
}
void failure(const index_record & record, const uint32_t err_code, const std::string & err_msg) {
Option<bool> index_op_failure(err_code, err_msg);
index_result res(record, index_op_failure);
items.push_back(res);
}
void success(const index_record & record) {
Option<bool> index_op_success(true);
index_result res(record, index_op_success);
items.push_back(res);
num_indexed++;
}
};
class Index {
private:
std::string name;
@ -168,8 +217,8 @@ public:
const std::unordered_map<std::string, field> & search_schema,
const std::unordered_map<std::string, field> & facet_schema);
static Option<uint32_t> batch_index(Index *index,
const std::vector<std::pair<uint32_t, std::string>> & iter_batch,
static batch_index_result batch_memory_index(Index *index,
std::vector<index_record> & iter_batch,
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
const std::unordered_map<std::string, field> & facet_schema);

View File

@ -87,13 +87,12 @@ void Collection::increment_next_seq_id_field() {
next_seq_id++;
}
Option<nlohmann::json> Collection::add(const std::string & json_str) {
nlohmann::json document;
Option<uint32_t> Collection::to_doc(const std::string & json_str, nlohmann::json & document) {
try {
document = nlohmann::json::parse(json_str);
} catch(const std::exception& e) {
LOG(ERR) << "JSON error: " << e.what();
return Option<nlohmann::json>(400, "Bad JSON.");
return Option<uint32_t>(400, "Bad JSON.");
}
uint32_t seq_id = get_next_seq_id();
@ -102,7 +101,7 @@ Option<nlohmann::json> Collection::add(const std::string & json_str) {
if(document.count("id") == 0) {
document["id"] = seq_id_str;
} else if(!document["id"].is_string()) {
return Option<nlohmann::json>(400, "Document's `id` field should be a string.");
return Option<uint32_t>(400, "Document's `id` field should be a string.");
}
std::string doc_id = document["id"];
@ -110,9 +109,23 @@ Option<nlohmann::json> Collection::add(const std::string & json_str) {
// we need to check if document ID already exists before attempting to index
if(doc_option.ok()) {
return Option<nlohmann::json>(409, std::string("A document with id ") + doc_id + " already exists.");
return Option<uint32_t>(409, std::string("A document with id ") + doc_id + " already exists.");
}
return Option<uint32_t>(seq_id);
}
Option<nlohmann::json> Collection::add(const std::string & json_str) {
nlohmann::json document;
Option<uint32_t> doc_seq_id_op = to_doc(json_str, document);
if(!doc_seq_id_op.ok()) {
return Option<nlohmann::json>(doc_seq_id_op.code(), doc_seq_id_op.error());
}
const uint32_t seq_id = doc_seq_id_op.get();
const std::string seq_id_str = std::to_string(seq_id);
const Option<uint32_t> & index_memory_op = index_in_memory(document, seq_id);
if(!index_memory_op.ok()) {
@ -139,36 +152,68 @@ Option<nlohmann::json> Collection::add_many(const std::string & json_lines_str)
return Option<nlohmann::json>(400, "The request body was empty. So, no records were imported.");
}
std::vector<Option<bool>> errors;
size_t record_num = 1;
size_t record_imported = 0;
std::vector<std::vector<index_record>> iter_batch;
batch_index_result result;
for(const std::string & json_line: json_lines) {
Option<nlohmann::json> op = add(json_line);
for(size_t i = 0; i < num_indices; i++) {
iter_batch.push_back(std::vector<index_record>());
}
if(!op.ok()) {
std::string err_msg = std::string("Error importing record in line number ") +
std::to_string(record_num) + ": " + op.error();
Option<bool> err = Option<bool>(op.code(), err_msg);
errors.push_back(err);
} else {
record_imported++;
for(size_t i=0; i < json_lines.size(); i++) {
const std::string & json_line = json_lines[i];
nlohmann::json document;
Option<uint32_t> doc_seq_id_op = to_doc(json_line, document);
if(!doc_seq_id_op.ok()) {
index_record record(i, 0, "", document);
result.failure(record, doc_seq_id_op.code(), doc_seq_id_op.error());
continue;
}
record_num++;
const uint32_t seq_id = doc_seq_id_op.get();
index_record record(i, seq_id, json_line, document);
iter_batch[seq_id % this->get_num_indices()].push_back(record);
}
par_index_in_memory(iter_batch, result);
std::sort(result.items.begin(), result.items.end());
// store only those documents that were indexed in-memory successfully
for(index_result & item: result.items) {
if(item.index_op.ok()) {
rocksdb::WriteBatch batch;
const std::string seq_id_str = std::to_string(item.record.seq_id);
batch.Put(get_doc_id_key(item.record.document["id"]), seq_id_str);
batch.Put(get_seq_id_key(item.record.seq_id), item.record.document.dump());
bool write_ok = store->batch_write(batch);
if(!write_ok) {
Option<bool> index_op_failure(500, "Could not write to on-disk storage.");
item.index_op = index_op_failure;
}
}
}
nlohmann::json resp;
resp["ok"] = (errors.size() == 0);
resp["num_imported"] = record_imported;
resp["success"] = (result.num_indexed == json_lines.size());
resp["num_imported"] = result.num_indexed;
if(errors.size() != 0) {
resp["errors"] = nlohmann::json::array();
for(const Option<bool> & err: errors) {
nlohmann::json err_obj;
err_obj["message"] = err.error();
resp["errors"].push_back(err_obj);
resp["items"] = nlohmann::json::array();
for(const index_result & item: result.items) {
nlohmann::json item_obj;
if(!item.index_op.ok()) {
item_obj["error"] = item.index_op.error();
item_obj["success"] = false;
} else {
item_obj["success"] = true;
}
resp["items"].push_back(item_obj);
}
return Option<nlohmann::json>(resp);
@ -189,23 +234,24 @@ Option<uint32_t> Collection::index_in_memory(const nlohmann::json &document, uin
return Option<>(200);
}
Option<uint32_t> Collection::par_index_in_memory(const std::vector<std::vector<std::pair<uint32_t, std::string>>> & iter_batch) {
std::vector<std::future<Option<uint32_t>>> futures;
void Collection::par_index_in_memory(std::vector<std::vector<index_record>> & iter_batch,
batch_index_result & result) {
std::vector<std::future<batch_index_result>> futures;
for(size_t i=0; i < num_indices; i++) {
futures.push_back(
std::async(&Index::batch_index, indices[i], iter_batch[i], default_sorting_field,
std::async(&Index::batch_memory_index, indices[i], std::ref(iter_batch[i]), default_sorting_field,
search_schema, facet_schema)
);
}
for(size_t i=0; i < futures.size(); i++) {
Option<uint32_t> res = futures[i].get();
if(!res.ok()) {
return res;
}
batch_index_result future_res = futures[i].get();
result.items.insert(result.items.end(), future_res.items.begin(), future_res.items.end());
result.num_indexed += future_res.num_indexed;
num_documents += future_res.num_indexed;
}
return Option<uint32_t>(201);
}
void Collection::prune_document(nlohmann::json &document, const spp::sparse_hash_set<std::string> include_fields,

View File

@ -43,7 +43,8 @@ void CollectionManager::add_to_collections(Collection* collection) {
Option<bool> CollectionManager::init(Store *store,
const size_t default_num_indices,
const std::string & auth_key,
const std::string & search_only_auth_key) {
const std::string & search_only_auth_key,
const size_t init_batch_size) {
this->store = store;
this->auth_key = auth_key;
this->search_only_auth_key = search_only_auth_key;
@ -101,27 +102,36 @@ Option<bool> CollectionManager::init(Store *store,
const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
rocksdb::Iterator* iter = store->scan(seq_id_prefix);
std::vector<std::vector<std::pair<uint32_t, std::string>>> iter_batch;
std::vector<std::vector<index_record>> iter_batch;
for(size_t i = 0; i < collection->get_num_indices(); i++) {
iter_batch.push_back(std::vector<std::pair<uint32_t, std::string>>());
iter_batch.push_back(std::vector<index_record>());
}
while(iter->Valid() && iter->key().starts_with(seq_id_prefix)) {
const uint32_t seq_id = Collection::get_seq_id_key(iter->key().ToString());
nlohmann::json document;
Option<uint32_t> seq_id_doc_op = collection->to_doc(iter->value().ToString(), document);
if(!seq_id_doc_op.ok()) {
return Option<bool>(500, "Error while parsing document."); // FIXME: populate error
}
iter_batch[seq_id % collection->get_num_indices()].push_back(
std::make_pair(Collection::get_seq_id_key(iter->key().ToString()), iter->value().ToString())
index_record(0, seq_id, iter->value().ToString(), document)
);
if(iter_batch.size() == 1000) {
Option<uint32_t> res = collection->par_index_in_memory(iter_batch);
if(iter_batch.size() == init_batch_size) {
batch_index_result res;
collection->par_index_in_memory(iter_batch, res);
for(size_t i = 0; i < collection->get_num_indices(); i++) {
iter_batch[i].clear();
}
if(!res.ok()) {
if(res.num_indexed != iter_batch.size()) {
delete iter;
return Option<bool>(false, res.error());
return Option<bool>(false, "Error while loading records."); // FIXME: populate actual record error
}
}
@ -130,9 +140,11 @@ Option<bool> CollectionManager::init(Store *store,
delete iter;
Option<uint32_t> res = collection->par_index_in_memory(iter_batch);
if(!res.ok()) {
return Option<bool>(false, res.error());
batch_index_result res;
collection->par_index_in_memory(iter_batch, res);
if(res.num_indexed != iter_batch.size()) {
return Option<bool>(false, "Error while loading records."); // FIXME: populate actual record error
}
add_to_collections(collection);

View File

@ -268,37 +268,38 @@ Option<uint32_t> Index::validate_index_in_memory(const nlohmann::json &document,
return Option<>(200);
}
Option<uint32_t> Index::batch_index(Index *index, const std::vector<std::pair<uint32_t, std::string>> & iter_batch,
batch_index_result Index::batch_memory_index(Index *index, std::vector<index_record> & iter_batch,
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
const std::unordered_map<std::string, field> & facet_schema) {
for(auto & kv: iter_batch) {
uint32_t seq_id = kv.first;
nlohmann::json document;
try {
document = nlohmann::json::parse(kv.second);
} catch(...) {
return Option<uint32_t>(500, std::string("Error while parsing stored document with sequence ID: " +
std::to_string(seq_id)));
batch_index_result result;
for(auto & index_rec: iter_batch) {
if(index_rec.json_str.empty()) {
// indicates bad record (upstream validation failure)
continue;
}
Option<uint32_t> validation_op = validate_index_in_memory(document, seq_id, default_sorting_field,
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.document, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema);
if(!validation_op.ok()) {
std::string error_msg = std::string("Error validating document with ID: ") +
document["id"].get<std::string>() + " - " + validation_op.error();
return Option<>(validation_op.code(), error_msg);
result.failure(index_rec, validation_op.code(), validation_op.error());
continue;
}
Option<uint32_t> res = index->index_in_memory(document, seq_id, default_sorting_field);
if(!res.ok()) {
return res;
Option<uint32_t> index_mem_op = index->index_in_memory(index_rec.document, index_rec.seq_id, default_sorting_field);
if(!index_mem_op.ok()) {
result.failure(index_rec, index_mem_op.code(), index_mem_op.error());
continue;
}
result.success(index_rec);
}
return Option<>(201);
return result;
}
void Index::insert_doc(const uint32_t score, art_tree *t, uint32_t seq_id,

View File

@ -1133,7 +1133,7 @@ TEST_F(CollectionTest, ImportDocuments) {
Option<nlohmann::json> import_res = coll_mul_fields->add_many(import_records);
ASSERT_TRUE(import_res.ok());
nlohmann::json import_response = import_res.get();
ASSERT_TRUE(import_response["ok"].get<bool>());
ASSERT_TRUE(import_response["success"].get<bool>());
ASSERT_EQ(18, import_response["num_imported"].get<int>());
ASSERT_EQ(0, import_response.count("errors"));
@ -1162,7 +1162,7 @@ TEST_F(CollectionTest, ImportDocuments) {
ASSERT_STREQ("The request body was empty. So, no records were imported.", import_res.error().c_str());
// verify that only bad records are rejected, rest must be imported (records 2 and 4 are bad)
std::string more_records = std::string("{\"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
std::string more_records = std::string("{\"id\": \"id1\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
"\"cast\": [\"Tom Skerritt\"] }\n") +
"{\"title\": 123, \"starring\": \"Jazz Gosh\", \"points\": 23, "
"\"cast\": [\"Tom Skerritt\"] }\n" +
@ -1175,16 +1175,40 @@ TEST_F(CollectionTest, ImportDocuments) {
ASSERT_TRUE(import_res.ok());
import_response = import_res.get();
ASSERT_FALSE(import_response["ok"].get<bool>());
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(2, import_response["num_imported"].get<int>());
ASSERT_EQ(1, import_response.count("errors"));
ASSERT_EQ(2, import_response["errors"].size());
ASSERT_EQ(4, import_response["items"].size());
ASSERT_STREQ("Error importing record in line number 2: Field `title` must be a string.",
import_response["errors"][0]["message"].get<std::string>().c_str());
ASSERT_TRUE(import_response["items"][0]["success"].get<bool>());
ASSERT_FALSE(import_response["items"][1]["success"].get<bool>());
ASSERT_TRUE(import_response["items"][2]["success"].get<bool>());
ASSERT_FALSE(import_response["items"][3]["success"].get<bool>());
ASSERT_STREQ("Error importing record in line number 4: Field `starring` has been declared in the schema, but is not "
"found in the document.", import_response["errors"][1]["message"].get<std::string>().c_str());
ASSERT_STREQ("Field `title` must be a string.", import_response["items"][1]["error"].get<std::string>().c_str());
ASSERT_STREQ("Field `starring` has been declared in the schema, but is not found in the document.",
import_response["items"][3]["error"].get<std::string>().c_str());
// record with duplicate IDs
more_records = std::string("{\"id\": \"id1\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
"\"cast\": [\"Tom Skerritt\"] }\n") +
"{\"id\": \"id2\", \"title\": \"Test1\", \"starring\": \"Rand Fish\", \"points\": 12, "
"\"cast\": [\"Tom Skerritt\"] }";
import_res = coll_mul_fields->add_many(more_records);
ASSERT_TRUE(import_res.ok());
import_response = import_res.get();
ASSERT_FALSE(import_response["success"].get<bool>());
ASSERT_EQ(1, import_response["num_imported"].get<int>());
std::cout << import_response << std::endl;
ASSERT_FALSE(import_response["items"][0]["success"].get<bool>());
ASSERT_TRUE(import_response["items"][1]["success"].get<bool>());
ASSERT_STREQ("A document with id id1 already exists.", import_response["items"][0]["error"].get<std::string>().c_str());
collectionManager.drop_collection("coll_mul_fields");
}