Merge pull request #53 from typesense/concurrent_init_loading

Load the indices in parallel from disk.
This commit is contained in:
Kishore Nallan 2019-02-09 18:54:14 +05:30 committed by GitHub
commit 365c6a4cdc
6 changed files with 263 additions and 162 deletions

View File

@ -68,8 +68,6 @@ private:
std::string get_seq_id_key(uint32_t seq_id);
Option<uint32_t> validate_index_in_memory(const nlohmann::json &document, uint32_t seq_id);
void highlight_result(const field &search_field, const std::vector<std::vector<art_leaf *>> &searched_queries,
const Topster<512>::KV &field_order_kv, const nlohmann::json &document,
StringUtils & string_utils, highlight_t &highlight);
@ -130,8 +128,14 @@ public:
Option<std::string> remove(const std::string & id, const bool remove_from_store = true);
size_t get_num_indices();
static uint32_t get_seq_id_key(const std::string & key);
Option<uint32_t> index_in_memory(const nlohmann::json & document, uint32_t seq_id);
// Indexes pre-grouped stored documents into the in-memory indices: iter_batch[i] is the list of
// (sequence ID, raw JSON string) pairs handed to the i-th index.
Option<uint32_t> par_index_in_memory(const std::vector<std::vector<std::pair<uint32_t, std::string>>> & iter_batch);
static void prune_document(nlohmann::json &document, const spp::sparse_hash_set<std::string> include_fields,
const spp::sparse_hash_set<std::string> exclude_fields);

View File

@ -158,7 +158,21 @@ public:
const uint32_t total_cost, Topster<512> &topster, const std::vector<art_leaf *> & query_suggestion,
const uint32_t *result_ids, const size_t result_size) const;
Option<uint32_t> index_in_memory(const nlohmann::json & document, uint32_t seq_id, int32_t points);
// Derives the int32 sort score from the document's default sorting field (0 when the field name is empty).
static int32_t get_points_from_doc(const nlohmann::json &document, const std::string & default_sorting_field);
Option<uint32_t> index_in_memory(const nlohmann::json & document, uint32_t seq_id,
const std::string & default_sorting_field);
// Checks a document against the default sorting field and the given search / facet schemas.
// Static: the schemas are passed in explicitly instead of being read from an Index instance.
static Option<uint32_t> validate_index_in_memory(const nlohmann::json &document, uint32_t seq_id,
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
const std::unordered_map<std::string, field> & facet_schema);
// Parses, validates and indexes every (sequence ID, raw JSON string) pair of iter_batch into `index`,
// returning at the first failure.
static Option<uint32_t> batch_index(Index *index,
const std::vector<std::pair<uint32_t, std::string>> & iter_batch,
const std::string & default_sorting_field,
const std::unordered_map<std::string, field> & search_schema,
const std::unordered_map<std::string, field> & facet_schema);
// for limiting number of results on multiple candidates / query rewrites
enum {SEARCH_LIMIT_NUM = 100};

View File

@ -7,6 +7,7 @@
#include <string_utils.h>
#include <art.h>
#include <thread>
#include <future>
#include <chrono>
#include <rocksdb/write_batch.h>
#include "topster.h"
@ -130,156 +131,40 @@ Option<nlohmann::json> Collection::add(const std::string & json_str) {
return Option<nlohmann::json>(document);
}
// Validates a document against this collection's default sorting field, search schema and facet schema.
// Checks run in the order written and the first failing check decides the returned Option<>(400, message);
// a document that passes every check yields Option<>(200).
// `seq_id` is accepted but not read by any check in this body.
Option<uint32_t> Collection::validate_index_in_memory(const nlohmann::json &document, uint32_t seq_id) {
// 1. The default sorting field must be present ...
if(document.count(default_sorting_field) == 0) {
return Option<>(400, "Field `" + default_sorting_field + "` has been declared as a default sorting field, "
"but is not found in the document.");
}
// ... and must be an integer or a float.
if(!document[default_sorting_field].is_number_integer() && !document[default_sorting_field].is_number_float()) {
return Option<>(400, "Default sorting field `" + default_sorting_field + "` must be of type int32 or float.");
}
// NOTE(review): only the upper int32 bound is checked; values below the int32 minimum are not rejected here.
if(document[default_sorting_field].is_number_integer() &&
document[default_sorting_field].get<int64_t>() > std::numeric_limits<int32_t>::max()) {
return Option<>(400, "Default sorting field `" + default_sorting_field + "` exceeds maximum value of an int32.");
}
// NOTE(review): the value is converted to float before being compared with the float maximum - confirm
// this rejects out-of-range values as intended.
if(document[default_sorting_field].is_number_float() &&
document[default_sorting_field].get<float>() > std::numeric_limits<float>::max()) {
return Option<>(400, "Default sorting field `" + default_sorting_field + "` exceeds maximum value of a float.");
}
// 2. Every field of the search schema must be present and match its declared type.
//    For array types only the first element (if any) is type-checked.
for(const std::pair<std::string, field> & field_pair: search_schema) {
const std::string & field_name = field_pair.first;
if(document.count(field_name) == 0) {
return Option<>(400, "Field `" + field_name + "` has been declared in the schema, "
"but is not found in the document.");
}
if(field_pair.second.type == field_types::STRING) {
if(!document[field_name].is_string()) {
return Option<>(400, "Field `" + field_name + "` must be a string.");
}
} else if(field_pair.second.type == field_types::INT32) {
if(!document[field_name].is_number_integer()) {
return Option<>(400, "Field `" + field_name + "` must be an int32.");
}
// upper bound only, same as for the default sorting field above
if(document[field_name].get<int64_t>() > INT32_MAX) {
return Option<>(400, "Field `" + field_name + "` exceeds maximum value of int32.");
}
} else if(field_pair.second.type == field_types::INT64) {
if(!document[field_name].is_number_integer()) {
return Option<>(400, "Field `" + field_name + "` must be an int64.");
}
} else if(field_pair.second.type == field_types::FLOAT) {
if(!document[field_name].is_number()) { // allows integer to be passed to a float field
return Option<>(400, "Field `" + field_name + "` must be a float.");
}
} else if(field_pair.second.type == field_types::BOOL) {
if(!document[field_name].is_boolean()) {
return Option<>(400, "Field `" + field_name + "` must be a bool.");
}
} else if(field_pair.second.type == field_types::STRING_ARRAY) {
if(!document[field_name].is_array()) {
return Option<>(400, "Field `" + field_name + "` must be a string array.");
}
if(document[field_name].size() > 0 && !document[field_name][0].is_string()) {
return Option<>(400, "Field `" + field_name + "` must be a string array.");
}
} else if(field_pair.second.type == field_types::INT32_ARRAY) {
if(!document[field_name].is_array()) {
return Option<>(400, "Field `" + field_name + "` must be an int32 array.");
}
if(document[field_name].size() > 0 && !document[field_name][0].is_number_integer()) {
return Option<>(400, "Field `" + field_name + "` must be an int32 array.");
}
} else if(field_pair.second.type == field_types::INT64_ARRAY) {
if(!document[field_name].is_array()) {
return Option<>(400, "Field `" + field_name + "` must be an int64 array.");
}
if(document[field_name].size() > 0 && !document[field_name][0].is_number_integer()) {
return Option<>(400, "Field `" + field_name + "` must be an int64 array.");
}
} else if(field_pair.second.type == field_types::FLOAT_ARRAY) {
if(!document[field_name].is_array()) {
return Option<>(400, "Field `" + field_name + "` must be a float array.");
}
// unlike the scalar FLOAT branch, the first element must be a float (an integer is rejected)
if(document[field_name].size() > 0 && !document[field_name][0].is_number_float()) {
return Option<>(400, "Field `" + field_name + "` must be a float array.");
}
} else if(field_pair.second.type == field_types::BOOL_ARRAY) {
if(!document[field_name].is_array()) {
return Option<>(400, "Field `" + field_name + "` must be a bool array.");
}
if(document[field_name].size() > 0 && !document[field_name][0].is_boolean()) {
return Option<>(400, "Field `" + field_name + "` must be a bool array.");
}
}
}
// 3. Every facet field must be present and be a string or a string array; any other declared type is rejected.
for(const std::pair<std::string, field> & field_pair: facet_schema) {
const std::string & field_name = field_pair.first;
if(document.count(field_name) == 0) {
return Option<>(400, "Field `" + field_name + "` has been declared as a facet field in the schema, "
"but is not found in the document.");
}
if(field_pair.second.type == field_types::STRING) {
if(!document[field_name].is_string()) {
return Option<>(400, "Facet field `" + field_name + "` must be a string.");
}
} else if(field_pair.second.type == field_types::STRING_ARRAY) {
if(!document[field_name].is_array()) {
return Option<>(400, "Facet field `" + field_name + "` must be a string array.");
}
if(document[field_name].size() > 0 && !document[field_name][0].is_string()) {
return Option<>(400, "Facet field `" + field_name + "` must be a string array.");
}
} else {
return Option<>(400, "Facet field `" + field_name + "` must be a string or a string[].");
}
}
return Option<>(200);
}
/**
 * Validates a single document and adds it to one of this collection's in-memory indices.
 *
 * The target index is chosen as `indices[seq_id % num_indices]`. The sort score is derived inside
 * Index::index_in_memory from `default_sorting_field`, so it is not computed here.
 *
 * @param document  parsed JSON document to index
 * @param seq_id    sequence ID of the document; also selects the target index
 * @return the failed validation or indexing Option when either step fails, otherwise Option<>(200).
 */
Option<uint32_t> Collection::index_in_memory(const nlohmann::json &document, uint32_t seq_id) {
    // validate exactly once, against this collection's schemas
    Option<uint32_t> validation_op = Index::validate_index_in_memory(document, seq_id, default_sorting_field,
                                                                     search_schema, facet_schema);
    if(!validation_op.ok()) {
        return validation_op;
    }

    // index exactly once; a second call would insert the same document twice
    Index* index = indices[seq_id % num_indices];
    Option<uint32_t> index_op = index->index_in_memory(document, seq_id, default_sorting_field);
    if(!index_op.ok()) {
        // do not count a document that the index refused
        return index_op;
    }

    num_documents += 1;
    return Option<>(200);
}
/**
 * Indexes one batch of stored documents per in-memory index, running the batches concurrently.
 *
 * @param iter_batch  iter_batch[i] holds the (sequence ID, raw JSON string) pairs destined for indices[i];
 *                    it must contain at least `num_indices` entries.
 * @return the first failing Option (in index order) or Option<uint32_t>(201) when every batch succeeded.
 *
 * Notes:
 *  - std::launch::async is requested explicitly. With the default policy an implementation may defer
 *    the task until get() is called, which would run the batches one after another.
 *  - The tasks capture `iter_batch` and the schema members by reference instead of copying them per task.
 *    This is safe because every future is waited on before this function returns: either through get()
 *    below or, on an early return, by the destructor of a future obtained from std::async.
 *  - `num_documents` is advanced by the size of each successfully indexed batch, mirroring what
 *    Collection::index_in_memory does per document.
 */
Option<uint32_t> Collection::par_index_in_memory(const std::vector<std::vector<std::pair<uint32_t, std::string>>> & iter_batch) {
    if(iter_batch.size() < num_indices) {
        // guards the iter_batch[i] accesses below
        return Option<uint32_t>(500, "Expected one document batch per index, but got only " +
                                     std::to_string(iter_batch.size()) + ".");
    }

    std::vector<std::future<Option<uint32_t>>> futures;
    futures.reserve(num_indices);

    for(size_t i = 0; i < num_indices; i++) {
        futures.push_back(
            std::async(std::launch::async, [this, &iter_batch, i]() {
                return Index::batch_index(indices[i], iter_batch[i], default_sorting_field,
                                          search_schema, facet_schema);
            })
        );
    }

    for(size_t i = 0; i < futures.size(); i++) {
        Option<uint32_t> res = futures[i].get();
        if(!res.ok()) {
            // remaining tasks are joined when `futures` is destroyed
            return res;
        }

        num_documents += iter_batch[i].size();
    }

    return Option<uint32_t>(201);
}
void Collection::prune_document(nlohmann::json &document, const spp::sparse_hash_set<std::string> include_fields,
const spp::sparse_hash_set<std::string> exclude_fields) {
auto it = document.begin();
@ -850,6 +735,16 @@ Option<std::string> Collection::remove(const std::string & id, const bool remove
return Option<std::string>(id);
}
// Returns the number of in-memory indices this collection spreads its documents over.
size_t Collection::get_num_indices() {
    const size_t index_count = num_indices;
    return index_count;
}
// Extracts the sequence ID from a store key: the key's trailing 4 bytes hold the serialized ID.
uint32_t Collection::get_seq_id_key(const std::string & key) {
    const size_t suffix_start = key.length() - 4;
    return StringUtils::deserialize_uint32_t(key.substr(suffix_start));
}
// Builds the store key under which a collection's next sequence ID is kept:
// COLLECTION_NEXT_SEQ_PREFIX, an underscore, then the collection name.
std::string Collection::get_next_seq_id_key(const std::string & collection_name) {
    std::string next_seq_key(COLLECTION_NEXT_SEQ_PREFIX);
    next_seq_key += "_";
    next_seq_key += collection_name;
    return next_seq_key;
}

View File

@ -2,6 +2,7 @@
#include <vector>
#include <json.hpp>
#include "collection_manager.h"
#include "logger.h"
CollectionManager::CollectionManager() {
@ -89,37 +90,47 @@ Option<bool> CollectionManager::init(Store *store, const std::string & auth_key,
Collection* collection = init_collection(collection_meta, collection_next_seq_id);
LOG(INFO) << "Loading collection " << collection->get_name() << std::endl;
// Fetch records from the store and re-create memory index
std::vector<std::string> documents;
const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
rocksdb::Iterator* iter = store->scan(seq_id_prefix);
std::vector<std::vector<std::pair<uint32_t, std::string>>> iter_batch;
for(size_t i = 0; i < collection->get_num_indices(); i++) {
iter_batch.push_back(std::vector<std::pair<uint32_t, std::string>>());
}
while(iter->Valid() && iter->key().starts_with(seq_id_prefix)) {
const std::string doc_json_str = iter->value().ToString();
const uint32_t seq_id = Collection::get_seq_id_key(iter->key().ToString());
iter_batch[seq_id % collection->get_num_indices()].push_back(
std::make_pair(Collection::get_seq_id_key(iter->key().ToString()), iter->value().ToString())
);
nlohmann::json document;
try {
document = nlohmann::json::parse(doc_json_str);
} catch(...) {
delete iter;
return Option<bool>(500, std::string("Error while parsing stored document from collection " +
collection->get_name() + " with key: ") + iter->key().ToString());
if(iter_batch.size() == 1000) {
Option<uint32_t> res = collection->par_index_in_memory(iter_batch);
for(size_t i = 0; i < collection->get_num_indices(); i++) {
iter_batch[i].clear();
}
if(!res.ok()) {
delete iter;
return Option<bool>(false, res.error());
}
}
Option<uint32_t> seq_id_op = collection->doc_id_to_seq_id(document["id"]);
if(!seq_id_op.ok()) {
delete iter;
return Option<bool>(500, std::string("Error while fetching sequence id of document id " +
document["id"].get<std::string>() + " in collection `" +
collection->get_name() + "`"));
}
collection->index_in_memory(document, seq_id_op.get());
iter->Next();
}
delete iter;
Option<uint32_t> res = collection->par_index_in_memory(iter_batch);
if(!res.ok()) {
return Option<bool>(false, res.error());
}
add_to_collections(collection);
}

View File

@ -53,7 +53,28 @@ Index::~Index() {
sort_index.clear();
}
Option<uint32_t> Index::index_in_memory(const nlohmann::json &document, uint32_t seq_id, int32_t points) {
// Derives the int32 "points" value used for sorting from the document's default sorting field.
// Returns 0 when `default_sorting_field` is empty. A float value is mapped to an int32 through its bit
// pattern; any other value is converted to int32 directly.
// The field is read without checking that it exists, so the document is presumably validated by the
// caller beforehand - confirm.
int32_t Index::get_points_from_doc(const nlohmann::json &document, const std::string & default_sorting_field) {
int32_t points = 0;
if(!default_sorting_field.empty()) {
if(document[default_sorting_field].is_number_float()) {
// serialize float to an integer and reverse the inverted range
float n = document[default_sorting_field];
// copy the float's raw bits into points
memcpy(&points, &n, sizeof(int32_t));
// NOTE(review): the shift count is digits - 1, i.e. 30 for int32 rather than 31 - confirm this is intended.
points ^= ((points >> (std::numeric_limits<int32_t>::digits - 1)) | INT32_MIN);
// NOTE(review): for a non-negative float the XOR above sets the sign bit, so INT32_MAX - points falls
// outside the int32 range; this presumably relies on wrap-around - confirm.
points = -1 * (INT32_MAX - points);
} else {
// non-float value: implicit JSON -> int32 conversion
points = document[default_sorting_field];
}
}
return points;
}
Option<uint32_t> Index::index_in_memory(const nlohmann::json &document, uint32_t seq_id,
const std::string & default_sorting_field) {
int32_t points = get_points_from_doc(document, default_sorting_field);
// assumes that validation has already been done
for(const std::pair<std::string, field> & field_pair: search_schema) {
const std::string & field_name = field_pair.first;
@ -119,9 +140,167 @@ Option<uint32_t> Index::index_in_memory(const nlohmann::json &document, uint32_t
}
num_documents += 1;
return Option<>(201);
}
/**
 * Validates a document against the default sorting field and the given search / facet schemas.
 *
 * Checks run in a fixed order and the first failure is returned:
 *   1. the default sorting field must exist, be an integer or a float, and lie within the int32 / float range;
 *   2. every search schema field must exist and match its declared type;
 *   3. every facet schema field must exist and be a string or a string array.
 * For array fields only the first element (if any) is type-checked.
 *
 * Range checks cover both ends: integers are compared against the int32 minimum and maximum, and floats
 * are compared as doubles so an out-of-range value is never narrowed to float before the comparison.
 *
 * @param document               parsed JSON document
 * @param seq_id                 sequence ID of the document (not read by any check)
 * @param default_sorting_field  name of the default sorting field
 * @param search_schema          field name -> field definition of the searchable fields
 * @param facet_schema           field name -> field definition of the facet fields
 * @return Option<>(400, message) for the first failed check, Option<>(200) when all checks pass.
 */
Option<uint32_t> Index::validate_index_in_memory(const nlohmann::json &document, uint32_t seq_id,
                                                 const std::string & default_sorting_field,
                                                 const std::unordered_map<std::string, field> & search_schema,
                                                 const std::unordered_map<std::string, field> & facet_schema) {
    if(document.count(default_sorting_field) == 0) {
        return Option<>(400, "Field `" + default_sorting_field + "` has been declared as a default sorting field, "
                             "but is not found in the document.");
    }

    const nlohmann::json & sort_value = document[default_sorting_field];

    if(!sort_value.is_number_integer() && !sort_value.is_number_float()) {
        return Option<>(400, "Default sorting field `" + default_sorting_field + "` must be of type int32 or float.");
    }

    if(sort_value.is_number_integer()) {
        const int64_t sort_int = sort_value.get<int64_t>();

        if(sort_int > std::numeric_limits<int32_t>::max()) {
            return Option<>(400, "Default sorting field `" + default_sorting_field + "` exceeds maximum value of an int32.");
        }

        if(sort_int < std::numeric_limits<int32_t>::min()) {
            return Option<>(400, "Default sorting field `" + default_sorting_field + "` is below the minimum value of an int32.");
        }
    }

    if(sort_value.is_number_float()) {
        // compare as double: narrowing to float first could not represent an out-of-range value
        const double sort_float = sort_value.get<double>();

        if(sort_float > std::numeric_limits<float>::max()) {
            return Option<>(400, "Default sorting field `" + default_sorting_field + "` exceeds maximum value of a float.");
        }

        if(sort_float < std::numeric_limits<float>::lowest()) {
            return Option<>(400, "Default sorting field `" + default_sorting_field + "` is below the minimum value of a float.");
        }
    }

    // `const auto &` binds to the map's own element; naming pair<std::string, field> would copy each entry
    for(const auto & field_pair: search_schema) {
        const std::string & field_name = field_pair.first;
        const auto & field_type = field_pair.second.type;

        if(document.count(field_name) == 0) {
            return Option<>(400, "Field `" + field_name + "` has been declared in the schema, "
                                 "but is not found in the document.");
        }

        const nlohmann::json & value = document[field_name];

        if(field_type == field_types::STRING) {
            if(!value.is_string()) {
                return Option<>(400, "Field `" + field_name + "` must be a string.");
            }
        } else if(field_type == field_types::INT32) {
            if(!value.is_number_integer()) {
                return Option<>(400, "Field `" + field_name + "` must be an int32.");
            }

            const int64_t int_value = value.get<int64_t>();

            if(int_value > INT32_MAX) {
                return Option<>(400, "Field `" + field_name + "` exceeds maximum value of int32.");
            }

            if(int_value < INT32_MIN) {
                return Option<>(400, "Field `" + field_name + "` is below the minimum value of int32.");
            }
        } else if(field_type == field_types::INT64) {
            if(!value.is_number_integer()) {
                return Option<>(400, "Field `" + field_name + "` must be an int64.");
            }
        } else if(field_type == field_types::FLOAT) {
            if(!value.is_number()) { // allows integer to be passed to a float field
                return Option<>(400, "Field `" + field_name + "` must be a float.");
            }
        } else if(field_type == field_types::BOOL) {
            if(!value.is_boolean()) {
                return Option<>(400, "Field `" + field_name + "` must be a bool.");
            }
        } else if(field_type == field_types::STRING_ARRAY) {
            if(!value.is_array()) {
                return Option<>(400, "Field `" + field_name + "` must be a string array.");
            }

            if(value.size() > 0 && !value[0].is_string()) {
                return Option<>(400, "Field `" + field_name + "` must be a string array.");
            }
        } else if(field_type == field_types::INT32_ARRAY) {
            if(!value.is_array()) {
                return Option<>(400, "Field `" + field_name + "` must be an int32 array.");
            }

            if(value.size() > 0 && !value[0].is_number_integer()) {
                return Option<>(400, "Field `" + field_name + "` must be an int32 array.");
            }
        } else if(field_type == field_types::INT64_ARRAY) {
            if(!value.is_array()) {
                return Option<>(400, "Field `" + field_name + "` must be an int64 array.");
            }

            if(value.size() > 0 && !value[0].is_number_integer()) {
                return Option<>(400, "Field `" + field_name + "` must be an int64 array.");
            }
        } else if(field_type == field_types::FLOAT_ARRAY) {
            if(!value.is_array()) {
                return Option<>(400, "Field `" + field_name + "` must be a float array.");
            }

            if(value.size() > 0 && !value[0].is_number_float()) {
                return Option<>(400, "Field `" + field_name + "` must be a float array.");
            }
        } else if(field_type == field_types::BOOL_ARRAY) {
            if(!value.is_array()) {
                return Option<>(400, "Field `" + field_name + "` must be a bool array.");
            }

            if(value.size() > 0 && !value[0].is_boolean()) {
                return Option<>(400, "Field `" + field_name + "` must be a bool array.");
            }
        }
    }

    for(const auto & field_pair: facet_schema) {
        const std::string & field_name = field_pair.first;
        const auto & field_type = field_pair.second.type;

        if(document.count(field_name) == 0) {
            return Option<>(400, "Field `" + field_name + "` has been declared as a facet field in the schema, "
                                 "but is not found in the document.");
        }

        const nlohmann::json & value = document[field_name];

        if(field_type == field_types::STRING) {
            if(!value.is_string()) {
                return Option<>(400, "Facet field `" + field_name + "` must be a string.");
            }
        } else if(field_type == field_types::STRING_ARRAY) {
            if(!value.is_array()) {
                return Option<>(400, "Facet field `" + field_name + "` must be a string array.");
            }

            if(value.size() > 0 && !value[0].is_string()) {
                return Option<>(400, "Facet field `" + field_name + "` must be a string array.");
            }
        } else {
            // facets are only supported on string and string[] fields
            return Option<>(400, "Facet field `" + field_name + "` must be a string or a string[].");
        }
    }

    return Option<>(200);
}
/**
 * Parses, validates and indexes a batch of stored documents into a single index.
 *
 * @param index                  index that receives the documents
 * @param iter_batch             (sequence ID, raw JSON string) pairs to index, processed in order
 * @param default_sorting_field  name of the default sorting field
 * @param search_schema          field name -> field definition of the searchable fields
 * @param facet_schema           field name -> field definition of the facet fields
 * @return the first failure encountered (parse error as code 500, otherwise the validation or indexing
 *         error), or Option<>(201) when every document was indexed. Documents indexed before a failure
 *         stay in the index.
 */
Option<uint32_t> Index::batch_index(Index *index, const std::vector<std::pair<uint32_t, std::string>> & iter_batch,
                                    const std::string & default_sorting_field,
                                    const std::unordered_map<std::string, field> & search_schema,
                                    const std::unordered_map<std::string, field> & facet_schema) {
    for(const auto & kv: iter_batch) {
        const uint32_t seq_id = kv.first;

        nlohmann::json document;
        try {
            document = nlohmann::json::parse(kv.second);
        } catch(...) {
            return Option<uint32_t>(500, std::string("Error while parsing stored document with sequence ID: " +
                                                     std::to_string(seq_id)));
        }

        Option<uint32_t> validation_op = validate_index_in_memory(document, seq_id, default_sorting_field,
                                                                  search_schema, facet_schema);

        if(!validation_op.ok()) {
            // A document that failed validation may not be an object, or may lack a string `id`.
            // Reading `id` unconditionally would throw from inside the error path, so fall back to
            // the sequence ID in that case.
            std::string doc_ref;
            if(document.is_object() && document.count("id") != 0 && document["id"].is_string()) {
                doc_ref = std::string("ID: ") + document["id"].get<std::string>();
            } else {
                doc_ref = std::string("sequence ID: ") + std::to_string(seq_id);
            }

            std::string error_msg = std::string("Error validating document with ") + doc_ref +
                                    " - " + validation_op.error();
            return Option<>(validation_op.code(), error_msg);
        }

        Option<uint32_t> res = index->index_in_memory(document, seq_id, default_sorting_field);
        if(!res.ok()) {
            return res;
        }
    }

    return Option<>(201);
}
void Index::index_int32_field(const int32_t value, uint32_t score, art_tree *t, uint32_t seq_id) const {
const int KEY_LEN = 8;
unsigned char key[KEY_LEN];

View File

@ -163,9 +163,7 @@ void Replicator::on_replication_event(void *data) {
Collection* collection = collection_manager.get_collection_with_id(std::stoi(parts[0]));
nlohmann::json document = nlohmann::json::parse(replication_event->value);
// last 4 bytes of the key would be the serialized version of the sequence id
std::string serialized_seq_id = replication_event->key.substr(replication_event->key.length() - 4);
uint32_t seq_id = StringUtils::deserialize_uint32_t(serialized_seq_id);
uint32_t seq_id = Collection::get_seq_id_key(replication_event->key);
collection->index_in_memory(document, seq_id);
}