Allow indexing of fields without pre-defined schema.

# Conflicts:
#	include/collection.h
#	include/index.h
#	src/collection.cpp
#	src/collection_manager.cpp
This commit is contained in:
kishorenc 2021-02-17 21:59:18 +05:30
parent df32c32497
commit e9df6e58e2
10 changed files with 319 additions and 61 deletions

View File

@ -302,7 +302,7 @@ private:
Store* store;
const std::vector<field> fields;
std::vector<field> fields;
std::unordered_map<std::string, field> search_schema;
@ -323,6 +323,8 @@ private:
const std::vector<Index*> indices;
const bool index_all_fields;
// methods
std::string get_doc_id_key(const std::string & doc_id) const;
@ -399,13 +401,22 @@ public:
static constexpr const char* SEQ_ID_PREFIX = "$SI";
static constexpr const char* DOC_ID_PREFIX = "$DI";
static constexpr const char* COLLECTION_NAME_KEY = "name";
static constexpr const char* COLLECTION_ID_KEY = "id";
static constexpr const char* COLLECTION_SEARCH_FIELDS_KEY = "fields";
static constexpr const char* COLLECTION_DEFAULT_SORTING_FIELD_KEY = "default_sorting_field";
static constexpr const char* COLLECTION_CREATED = "created_at";
static constexpr const char* COLLECTION_NUM_MEMORY_SHARDS = "num_memory_shards";
static constexpr const char* COLLECTION_INDEX_ALL_FIELDS = "index_all_fields";
// methods
Collection() = delete;
Collection(const std::string& name, const uint32_t collection_id, const uint64_t created_at,
const uint32_t next_seq_id, Store *store, const std::vector<field>& fields,
const std::string& default_sorting_field, const size_t num_memory_shards, const float max_memory_ratio);
const std::string& default_sorting_field, const size_t num_memory_shards,
const float max_memory_ratio, const bool index_all_fields);
~Collection();
@ -516,6 +527,9 @@ public:
Option<bool> remove_if_found(uint32_t seq_id, bool remove_from_store = true);
bool facet_value_to_string(const facet &a_facet, const facet_count_t &facet_count, const nlohmann::json &document,
std::string &value);
size_t get_num_memory_shards();
size_t get_num_documents() const;

View File

@ -67,13 +67,6 @@ private:
// Using a ID instead of a collection's name makes renaming possible
std::atomic<uint32_t> next_collection_id;
static constexpr const char* COLLECTION_NAME_KEY = "name";
static constexpr const char* COLLECTION_ID_KEY = "id";
static constexpr const char* COLLECTION_SEARCH_FIELDS_KEY = "fields";
static constexpr const char* COLLECTION_DEFAULT_SORTING_FIELD_KEY = "default_sorting_field";
static constexpr const char* COLLECTION_CREATED = "created_at";
static constexpr const char* COLLECTION_NUM_MEMORY_SHARDS = "num_memory_shards";
std::string bootstrap_auth_key;
float max_memory_ratio;
@ -135,7 +128,8 @@ public:
Option<Collection*> create_collection(const std::string& name, const size_t num_memory_shards,
const std::vector<field> & fields,
const std::string & default_sorting_field,
const uint64_t created_at = static_cast<uint64_t>(std::time(nullptr)));
const uint64_t created_at = static_cast<uint64_t>(std::time(nullptr)),
const bool index_all_fields = false);
locked_resource_view_t<Collection> get_collection(const std::string & collection_name) const;

View File

@ -4,6 +4,7 @@
#include "art.h"
#include "option.h"
#include "string_utils.h"
#include "json.hpp"
namespace field_types {
static const std::string STRING = "string";
@ -111,6 +112,89 @@ struct field {
std::string faceted_name() const {
return (facet && !is_string()) ? "_fstr_" + name : name;
}
static bool get_type(const nlohmann::json& obj, std::string& field_type) {
if(obj.is_array()) {
if(obj.empty() || obj[0].is_array()) {
return false;
}
bool parseable = get_single_type(obj[0], field_type);
if(!parseable) {
return false;
}
field_type = field_type + "[]";
return true;
}
if(obj.is_object()) {
return false;
}
return get_single_type(obj, field_type);
}
static bool get_single_type(const nlohmann::json& obj, std::string& field_type) {
if(obj.is_string()) {
field_type = field_types::STRING;
return true;
}
if(obj.is_number_float()) {
field_type = field_types::FLOAT;
return true;
}
if(obj.is_number_integer()) {
field_type = field_types::INT64;
return true;
}
if(obj.is_boolean()) {
field_type = field_types::BOOL;
return true;
}
return false;
}
static Option<bool> fields_to_json_fields(const std::vector<field> & fields,
const std::string & default_sorting_field, nlohmann::json& fields_json,
bool& found_default_sorting_field) {
for(const field & field: fields) {
nlohmann::json field_val;
field_val[fields::name] = field.name;
field_val[fields::type] = field.type;
field_val[fields::facet] = field.facet;
field_val[fields::optional] = field.optional;
fields_json.push_back(field_val);
if(!field.has_valid_type()) {
return Option<bool>(400, "Field `" + field.name +
"` has an invalid data type `" + field.type +
"`, see docs for supported data types.");
}
if(field.name == default_sorting_field && !(field.type == field_types::INT32 ||
field.type == field_types::INT64 ||
field.type == field_types::FLOAT)) {
return Option<bool>(400, "Default sorting field `" + default_sorting_field +
"` must be a single valued numerical field.");
}
if(field.name == default_sorting_field) {
if(field.optional) {
return Option<bool>(400, "Default sorting field `" + default_sorting_field +
"` cannot be an optional field.");
}
found_default_sorting_field = true;
}
}
return Option<bool>(true);
}
};
struct filter {

View File

@ -354,5 +354,7 @@ public:
const std::map<std::string, field> & facet_schema,
bool is_update);
void refresh_search_schema(const std::unordered_map<std::string, field>& src_search_schema);
};

View File

@ -40,13 +40,13 @@ struct match_index_t {
Collection::Collection(const std::string& name, const uint32_t collection_id, const uint64_t created_at,
const uint32_t next_seq_id, Store *store, const std::vector<field> &fields,
const std::string& default_sorting_field, const size_t num_memory_shards,
const float max_memory_ratio):
const float max_memory_ratio, const bool index_all_fields):
name(name), collection_id(collection_id), created_at(created_at),
next_seq_id(next_seq_id), store(store),
fields(fields), default_sorting_field(default_sorting_field),
num_memory_shards(num_memory_shards),
max_memory_ratio(max_memory_ratio),
indices(init_indices()) {
indices(init_indices()), index_all_fields(index_all_fields) {
this->num_documents = 0;
}
@ -234,6 +234,67 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines, nlohma
get_document_from_store(get_seq_id_key(seq_id), record.old_doc);
get_doc_changes(document, record.old_doc, record.new_doc, record.del_doc);
}
// if `index_all_fields` is enabled, we will have to update schema first before indexing
if(index_all_fields) {
size_t old_field_count = fields.size();
for(const auto& kv: document.items()) {
if (search_schema.count(kv.key()) == 0) {
std::string field_type;
bool parseable = field::get_type(kv.value(), field_type);
if (parseable) {
// this ensures that we can handle the same field having different types in different docs
const std::string fname = kv.key();// + "_" + field_type;
field new_field(fname, field_type, false);
search_schema.emplace(fname, new_field);
fields.emplace_back(new_field);
}
}
}
if(fields.size() != old_field_count) {
// we should persist changes to fields in store
std::string coll_meta_json;
StoreStatus status = store->get(Collection::get_meta_key(name), coll_meta_json);
if(status != StoreStatus::FOUND) {
record.index_failure(500, "Could not fetch collection meta from store.");
continue;
}
nlohmann::json collection_meta;
try {
collection_meta = nlohmann::json::parse(coll_meta_json);
bool found_default_sorting_field = false;
nlohmann::json fields_json = nlohmann::json::array();;
Option<bool> fields_json_op = field::fields_to_json_fields(fields, default_sorting_field, fields_json,
found_default_sorting_field);
if(!fields_json_op.ok()) {
record.index_failure(fields_json_op.code(), fields_json_op.error());
continue;
}
collection_meta[COLLECTION_SEARCH_FIELDS_KEY] = fields_json;
bool persisted = store->insert(Collection::get_meta_key(name), collection_meta.dump());
if(!persisted) {
record.index_failure(500, "Could not persist collection meta to store.");
continue;
}
for(auto index: indices) {
index->refresh_search_schema(search_schema);
}
} catch(...) {
record.index_failure(500, "Unable to parse collection meta.");
continue;
}
}
}
}
/*

View File

@ -4,7 +4,6 @@
#include "collection_manager.h"
#include "logger.h"
constexpr const char* CollectionManager::COLLECTION_NUM_MEMORY_SHARDS;
constexpr const size_t CollectionManager::DEFAULT_NUM_MEMORY_SHARDS;
CollectionManager::CollectionManager() {
@ -13,10 +12,10 @@ CollectionManager::CollectionManager() {
Collection* CollectionManager::init_collection(const nlohmann::json & collection_meta,
const uint32_t collection_next_seq_id) {
std::string this_collection_name = collection_meta[COLLECTION_NAME_KEY].get<std::string>();
std::string this_collection_name = collection_meta[Collection::COLLECTION_NAME_KEY].get<std::string>();
std::vector<field> fields;
nlohmann::json fields_map = collection_meta[COLLECTION_SEARCH_FIELDS_KEY];
nlohmann::json fields_map = collection_meta[Collection::COLLECTION_SEARCH_FIELDS_KEY];
for (nlohmann::json::iterator it = fields_map.begin(); it != fields_map.end(); ++it) {
nlohmann::json & field_obj = it.value();
@ -30,25 +29,31 @@ Collection* CollectionManager::init_collection(const nlohmann::json & collection
field_obj[fields::facet], field_obj[fields::optional]});
}
std::string default_sorting_field = collection_meta[COLLECTION_DEFAULT_SORTING_FIELD_KEY].get<std::string>();
uint64_t created_at = collection_meta.find((const char*)COLLECTION_CREATED) != collection_meta.end() ?
collection_meta[COLLECTION_CREATED].get<uint64_t>() : 0;
std::string default_sorting_field = collection_meta[Collection::COLLECTION_DEFAULT_SORTING_FIELD_KEY].get<std::string>();
size_t num_memory_shards = collection_meta.count(COLLECTION_NUM_MEMORY_SHARDS) != 0 ?
collection_meta[COLLECTION_NUM_MEMORY_SHARDS].get<size_t>() :
uint64_t created_at = collection_meta.find((const char*)Collection::COLLECTION_CREATED) != collection_meta.end() ?
collection_meta[Collection::COLLECTION_CREATED].get<uint64_t>() : 0;
size_t num_memory_shards = collection_meta.count(Collection::COLLECTION_NUM_MEMORY_SHARDS) != 0 ?
collection_meta[Collection::COLLECTION_NUM_MEMORY_SHARDS].get<size_t>() :
DEFAULT_NUM_MEMORY_SHARDS;
size_t index_all_fields = collection_meta.count(Collection::COLLECTION_INDEX_ALL_FIELDS) != 0 ?
collection_meta[Collection::COLLECTION_INDEX_ALL_FIELDS].get<bool>() :
false;
LOG(INFO) << "Found collection " << this_collection_name << " with " << num_memory_shards << " memory shards.";
Collection* collection = new Collection(this_collection_name,
collection_meta[COLLECTION_ID_KEY].get<uint32_t>(),
collection_meta[Collection::COLLECTION_ID_KEY].get<uint32_t>(),
created_at,
collection_next_seq_id,
store,
fields,
default_sorting_field,
num_memory_shards,
max_memory_ratio);
max_memory_ratio,
index_all_fields);
return collection;
}
@ -111,7 +116,7 @@ Option<bool> CollectionManager::load(const size_t init_batch_size) {
return Option<bool>(500, "Error while parsing collection meta.");
}
const std::string & this_collection_name = collection_meta[COLLECTION_NAME_KEY].get<std::string>();
const std::string & this_collection_name = collection_meta[Collection::COLLECTION_NAME_KEY].get<std::string>();
std::string collection_next_seq_id_str;
StoreStatus next_seq_id_status = store->get(Collection::get_next_seq_id_key(this_collection_name),
collection_next_seq_id_str);
@ -296,7 +301,8 @@ Option<Collection*> CollectionManager::create_collection(const std::string& name
const size_t num_memory_shards,
const std::vector<field> & fields,
const std::string & default_sorting_field,
const uint64_t created_at) {
const uint64_t created_at,
const bool index_all_fields) {
std::unique_lock lock(mutex);
if(store->contains(Collection::get_meta_key(name))) {
@ -306,35 +312,11 @@ Option<Collection*> CollectionManager::create_collection(const std::string& name
bool found_default_sorting_field = false;
nlohmann::json fields_json = nlohmann::json::array();;
for(const field & field: fields) {
nlohmann::json field_val;
field_val[fields::name] = field.name;
field_val[fields::type] = field.type;
field_val[fields::facet] = field.facet;
field_val[fields::optional] = field.optional;
fields_json.push_back(field_val);
Option<bool> fields_json_op = field::fields_to_json_fields(fields, default_sorting_field, fields_json,
found_default_sorting_field);
if(!field.has_valid_type()) {
return Option<Collection*>(400, "Field `" + field.name +
"` has an invalid data type `" + field.type +
"`, see docs for supported data types.");
}
if(field.name == default_sorting_field && !(field.type == field_types::INT32 ||
field.type == field_types::INT64 ||
field.type == field_types::FLOAT)) {
return Option<Collection*>(400, "Default sorting field `" + default_sorting_field +
"` must be a single valued numerical field.");
}
if(field.name == default_sorting_field) {
if(field.optional) {
return Option<Collection*>(400, "Default sorting field `" + default_sorting_field +
"` cannot be an optional field.");
}
found_default_sorting_field = true;
}
if(!fields_json_op.ok()) {
return Option<Collection*>(fields_json_op.code(), fields_json_op.error());
}
if(!found_default_sorting_field) {
@ -343,16 +325,17 @@ Option<Collection*> CollectionManager::create_collection(const std::string& name
}
nlohmann::json collection_meta;
collection_meta[COLLECTION_NAME_KEY] = name;
collection_meta[COLLECTION_ID_KEY] = next_collection_id.load();
collection_meta[COLLECTION_SEARCH_FIELDS_KEY] = fields_json;
collection_meta[COLLECTION_DEFAULT_SORTING_FIELD_KEY] = default_sorting_field;
collection_meta[COLLECTION_CREATED] = created_at;
collection_meta[COLLECTION_NUM_MEMORY_SHARDS] = num_memory_shards;
collection_meta[Collection::COLLECTION_NAME_KEY] = name;
collection_meta[Collection::COLLECTION_ID_KEY] = next_collection_id.load();
collection_meta[Collection::COLLECTION_SEARCH_FIELDS_KEY] = fields_json;
collection_meta[Collection::COLLECTION_DEFAULT_SORTING_FIELD_KEY] = default_sorting_field;
collection_meta[Collection::COLLECTION_CREATED] = created_at;
collection_meta[Collection::COLLECTION_NUM_MEMORY_SHARDS] = num_memory_shards;
collection_meta[Collection::COLLECTION_INDEX_ALL_FIELDS] = index_all_fields;
Collection* new_collection = new Collection(name, next_collection_id, created_at, 0, store, fields,
default_sorting_field, num_memory_shards,
this->max_memory_ratio);
this->max_memory_ratio, index_all_fields);
next_collection_id++;
rocksdb::WriteBatch batch;

View File

@ -75,6 +75,8 @@ bool get_collections(http_req & req, http_res & res) {
bool post_create_collection(http_req & req, http_res & res) {
const char* NUM_MEMORY_SHARDS = "num_memory_shards";
const char* INDEX_ALL_FIELDS = "index_all_fields";
nlohmann::json req_json;
try {
@ -126,6 +128,12 @@ bool post_create_collection(http_req & req, http_res & res) {
return false;
}
bool index_all_fields = false;
if(req_json.count(INDEX_ALL_FIELDS) != 0 && req_json[INDEX_ALL_FIELDS].is_boolean()) {
index_all_fields = req_json[INDEX_ALL_FIELDS].get<bool>();
}
if(collectionManager.get_collection(req_json["name"]) != nullptr) {
res.set_409("Collection with name `" + req_json["name"].get<std::string>() + "` already exists.");
return false;
@ -171,9 +179,11 @@ bool post_create_collection(http_req & req, http_res & res) {
}
const std::string & default_sorting_field = req_json[DEFAULT_SORTING_FIELD].get<std::string>();
const uint64_t created_at = static_cast<uint64_t>(std::time(nullptr));
const Option<Collection*> & collection_op =
collectionManager.create_collection(req_json["name"], req_json[NUM_MEMORY_SHARDS].get<size_t>(),
fields, default_sorting_field);
fields, default_sorting_field, created_at, index_all_fields);
if(collection_op.ok()) {
nlohmann::json json_response = collection_op.get()->get_summary_json();

View File

@ -2336,3 +2336,20 @@ const spp::sparse_hash_map<std::string, art_tree *> &Index::_get_search_index()
const spp::sparse_hash_map<std::string, num_tree_t*>& Index::_get_numerical_index() const {
return numerical_index;
}
void Index::refresh_search_schema(const std::unordered_map<std::string, field>& src_search_schema) {
for(const auto & fname_field: src_search_schema) {
search_schema.emplace(fname_field.first, fname_field.second);
if(search_index.count(fname_field.first) == 0) {
if(fname_field.second.is_string()) {
art_tree *t = new art_tree;
art_tree_init(t);
search_index.emplace(fname_field.first, t);
} else {
num_tree_t* num_tree = new num_tree_t;
numerical_index.emplace(fname_field.first, num_tree);
}
}
}
}

View File

@ -0,0 +1,92 @@
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include <fstream>
#include <algorithm>
#include <collection_manager.h>
#include "collection.h"
class CollectionAllFieldsTest : public ::testing::Test {
protected:
Store *store;
CollectionManager & collectionManager = CollectionManager::get_instance();
std::vector<std::string> query_fields;
std::vector<sort_by> sort_fields;
void setupCollection() {
std::string state_dir_path = "/tmp/typesense_test/collection_all_fields";
LOG(INFO) << "Truncating and creating: " << state_dir_path;
system(("rm -rf "+state_dir_path+" && mkdir -p "+state_dir_path).c_str());
store = new Store(state_dir_path);
collectionManager.init(store, 1.0, "auth_key");
collectionManager.load();
}
virtual void SetUp() {
setupCollection();
}
virtual void TearDown() {
collectionManager.dispose();
delete store;
}
};
TEST_F(CollectionAllFieldsTest, IndexDocsWithoutSchema) {
Collection *coll1;
std::ifstream infile(std::string(ROOT_DIR)+"test/multi_field_documents.jsonl");
std::vector<field> fields = {
field("points", field_types::INT32, false)
};
std::vector<sort_by> sort_fields = { sort_by("points", "DESC") };
coll1 = collectionManager.get_collection("coll1");
if(coll1 == nullptr) {
coll1 = collectionManager.create_collection("coll1", 1, fields, "points", 0, true).get();
}
std::string json_line;
while (std::getline(infile, json_line)) {
nlohmann::json document = nlohmann::json::parse(json_line);
Option<nlohmann::json> add_op = coll1->add(document.dump());
ASSERT_TRUE(add_op.ok());
}
infile.close();
query_fields = {"starring"};
std::vector<std::string> facets;
// same should succeed when verbatim filter is made
auto results = coll1->search("will", query_fields, "", facets, sort_fields, 0, 10, 1, FREQUENCY, false).get();
ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ(2, results["found"].get<size_t>());
ASSERT_STREQ("1", results["hits"][0]["document"]["id"].get<std::string>().c_str());
ASSERT_STREQ("0", results["hits"][1]["document"]["id"].get<std::string>().c_str());
results = coll1->search("chris", {"cast"}, "", facets, sort_fields, 0, 10, 1, FREQUENCY, false).get();
ASSERT_EQ(3, results["hits"].size());
ASSERT_EQ(3, results["found"].get<size_t>());
ASSERT_STREQ("6", results["hits"][0]["document"]["id"].get<std::string>().c_str());
ASSERT_STREQ("1", results["hits"][1]["document"]["id"].get<std::string>().c_str());
ASSERT_STREQ("7", results["hits"][2]["document"]["id"].get<std::string>().c_str());
// reject field with a different type than already inferred type
auto doc_json = R"({"cast":"William Barnes","points":63,"starring":"Will Ferrell",
"starring_facet":"Will Ferrell","title":"Anchorman 2: The Legend Continues"})";
Option<nlohmann::json> add_op = coll1->add(doc_json);
ASSERT_FALSE(add_op.ok());
ASSERT_STREQ("Field `cast` must be a string array.", add_op.error().c_str());
collectionManager.drop_collection("coll1");
}

View File

@ -86,7 +86,8 @@ TEST_F(CollectionManagerTest, CollectionCreation) {
"\"fields\":[{\"facet\":false,\"name\":\"title\",\"optional\":false,\"type\":\"string\"},"
"{\"facet\":false,\"name\":\"starring\",\"optional\":false,\"type\":\"string\"},"
"{\"facet\":true,\"name\":\"cast\",\"optional\":true,\"type\":\"string[]\"},"
"{\"facet\":false,\"name\":\"points\",\"optional\":false,\"type\":\"int32\"}],\"id\":0,\"name\":\"collection1\",\"num_memory_shards\":4}",
"{\"facet\":false,\"name\":\"points\",\"optional\":false,\"type\":\"int32\"}],\"id\":0,"
"\"index_all_fields\":false,\"name\":\"collection1\",\"num_memory_shards\":4}",
collection_meta_json);
ASSERT_EQ("1", next_collection_id);
}