Merge branch 'v0.25-join' into v0.25-join

commit 73791a057b
Ozan Armağan, 2023-05-30 12:47:10 +03:00, committed by GitHub
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
19 changed files with 421 additions and 160 deletions

View File

@@ -175,7 +175,7 @@ new_git_repository(
new_git_repository(
name = "hnsw",
build_file = "//bazel:hnsw.BUILD",
commit = "b5c2ebae31cd124e3a625f2de789a3496ebb2286",
commit = "573ab84a7f7645f98778cbb181ba762c5d2f19b5",
remote = "https://github.com/typesense/hnswlib.git",
)

View File

@@ -16,8 +16,8 @@ if [[ "$@" == *"--graviton2"* ]] || [[ "$@" == *"--arm"* ]]; then
fi
docker run --user $UID:$GID --volume="/etc/group:/etc/group:ro" --volume="/etc/passwd:/etc/passwd:ro" \
--volume="/etc/shadow:/etc/shadow:ro" -it --rm -v $HOME/docker_bazel_cache:$HOME/docker_bazel_cache -v $PROJECT_DIR:/src \
--workdir /src typesense/bazel_dev:24032023 bazel --output_user_root=$HOME/docker_bazel_cache/cache build --verbose_failures \
--volume="/etc/shadow:/etc/shadow:ro" -it --rm -v /bazeld:/bazeld -v $PROJECT_DIR:/src \
--workdir /src typesense/bazel_dev:24032023 bazel --output_user_root=/bazeld/cache build --verbose_failures \
--jobs=6 --action_env=LD_LIBRARY_PATH="/usr/local/gcc-10.3.0/lib64" \
--define=TYPESENSE_VERSION=\"$TYPESENSE_VERSION\" //:$TYPESENSE_TARGET

View File

@@ -20,7 +20,15 @@ private:
std::string name;
std::string suggestion_collection;
std::vector<std::string> query_collections;
size_t max_suggestions;
size_t limit;
void to_json(nlohmann::json& obj) const {
obj["name"] = name;
obj["params"] = nlohmann::json::object();
obj["params"]["suggestion_collection"] = suggestion_collection;
obj["params"]["query_collections"] = query_collections;
obj["params"]["limit"] = limit;
}
};
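A note on shape: to_json() above does not mirror the request payload accepted by create_rule(). It flattens the rule into suggestion_collection/query_collections rather than the source/destination objects used on input, and it omits "type" (list_rules() in the .cpp below attaches that). With illustrative values, a serialized config looks roughly like:

    {
      "name": "top_search_queries",
      "params": {
        "suggestion_collection": "top_search_queries",
        "query_collections": ["brands", "products"],
        "limit": 1000
      }
    }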
// config name => config
@@ -38,10 +46,14 @@ private:
~AnalyticsManager();
Option<bool> remove_popular_queries_index(const std::string& name);
Option<bool> create_popular_queries_index(nlohmann::json &payload, bool write_to_disk);
public:
static constexpr const char* ANALYTICS_CONFIG_PREFIX = "$AC";
static constexpr const char* RESOURCE_TYPE = "popular_queries";
static constexpr const char* ANALYTICS_RULE_PREFIX = "$AR";
static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries";
static AnalyticsManager& get_instance() {
static AnalyticsManager instance;
@@ -55,9 +67,11 @@ public:
void run(ReplicationState* raft_server);
Option<bool> create_index(nlohmann::json& payload, bool write_to_disk = true);
Option<nlohmann::json> list_rules();
Option<bool> remove_suggestion_index(const std::string& name);
Option<bool> create_rule(nlohmann::json& payload, bool write_to_disk = true);
Option<bool> remove_rule(const std::string& name);
void add_suggestion(const std::string& query_collection,
std::string& query, const bool live_query, const std::string& user_id);
@@ -65,4 +79,8 @@ public:
void stop();
void dispose();
void persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s);
std::unordered_map<std::string, PopularQueries*> get_popular_queries();
};
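For context, get_instance() above is a Meyers singleton: the static local is constructed once, on first use, with thread-safe initialization guaranteed since C++11. A minimal self-contained sketch of the pattern (class name hypothetical):

    #include <cstdio>

    class ManagerLike {
    private:
        ManagerLike() = default;
    public:
        ManagerLike(const ManagerLike&) = delete;
        ManagerLike& operator=(const ManagerLike&) = delete;

        static ManagerLike& get_instance() {
            static ManagerLike instance; // constructed on first call only
            return instance;
        }
    };

    int main() {
        auto& a = ManagerLike::get_instance();
        auto& b = ManagerLike::get_instance();
        std::printf("same instance: %d\n", &a == &b); // prints 1
    }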

View File

@@ -496,7 +496,7 @@ public:
// Override operations
Option<uint32_t> add_override(const override_t & override);
Option<uint32_t> add_override(const override_t & override, bool write_to_store = true);
Option<uint32_t> remove_override(const std::string & id);
@@ -511,7 +511,7 @@ public:
bool get_synonym(const std::string& id, synonym_t& synonym);
Option<bool> add_synonym(const nlohmann::json& syn_json);
Option<bool> add_synonym(const nlohmann::json& syn_json, bool write_to_store = true);
Option<bool> remove_synonym(const std::string & id);

View File

@@ -141,13 +141,15 @@ bool del_throttle(const std::shared_ptr<http_req>& req, const std::shared_ptr<ht
bool del_exceed(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Events
// Analytics
bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool get_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_create_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool del_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Misc helpers

View File

@@ -404,7 +404,7 @@ struct http_req {
struct route_path {
std::string http_method;
std::vector<std::string> path_parts;
bool (*handler)(const std::shared_ptr<http_req>&, const std::shared_ptr<http_res>&);
bool (*handler)(const std::shared_ptr<http_req>&, const std::shared_ptr<http_res>&) = nullptr;
bool async_req;
bool async_res;
std::string action;
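Default-initializing the handler pointer matters because a route_path can be reached through a failed route lookup (see the BatchedIndexer hunk below): invoking an indeterminate function pointer is undefined behavior, while a nullptr default is checkable. A self-contained sketch of the idea:

    #include <cstdio>
    #include <memory>

    struct http_req {};
    struct http_res {};

    // Mirrors route_path: without "= nullptr" the pointer is
    // indeterminate when the struct is default-constructed.
    struct route_path_like {
        bool (*handler)(const std::shared_ptr<http_req>&,
                        const std::shared_ptr<http_res>&) = nullptr;
    };

    int main() {
        route_path_like rp;
        if(rp.handler == nullptr) {
            std::puts("no handler bound; skipping dispatch");
        }
    }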

View File

@@ -73,7 +73,8 @@ public:
bool get_synonym(const std::string& id, synonym_t& synonym);
Option<bool> add_synonym(const std::string & collection_name, const synonym_t& synonym);
Option<bool> add_synonym(const std::string & collection_name, const synonym_t& synonym,
bool write_to_store = true);
Option<bool> remove_synonym(const std::string & collection_name, const std::string & id);
};

View File

@@ -5,40 +5,62 @@
#include "http_client.h"
#include "collection_manager.h"
Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_to_disk) {
Option<bool> AnalyticsManager::create_rule(nlohmann::json& payload, bool write_to_disk) {
/*
Sample payload:
{
"name": "top_queries",
"limit": 1000,
"source": {
"collections": ["brands", "products"]
},
"destination": {
"collection": "top_queries"
"name": "top_search_queries",
"type": "popular_queries",
"params": {
"limit": 1000,
"source": {
"collections": ["brands", "products"]
},
"destination": {
"collection": "top_search_queries"
}
}
}
*/
if(!payload.contains("type") || !payload["type"].is_string()) {
return Option<bool>(400, "Request payload contains invalid type.");
}
if(!payload.contains("name") || !payload["name"].is_string()) {
return Option<bool>(400, "Bad or missing name.");
}
if(!payload.contains("source") || !payload["source"].is_object()) {
if(!payload.contains("params") || !payload["params"].is_object()) {
return Option<bool>(400, "Bad or missing params.");
}
if(payload["type"] == POPULAR_QUERIES_TYPE) {
return create_popular_queries_index(payload, write_to_disk);
}
return Option<bool>(400, "Invalid type.");
}
Option<bool> AnalyticsManager::create_popular_queries_index(nlohmann::json &payload, bool write_to_disk) {
// params and name are validated upstream
const auto& params = payload["params"];
const std::string& suggestion_config_name = payload["name"].get<std::string>();
if(!params.contains("source") || !params["source"].is_object()) {
return Option<bool>(400, "Bad or missing source.");
}
if(!payload.contains("destination") || !payload["destination"].is_object()) {
if(!params.contains("destination") || !params["destination"].is_object()) {
return Option<bool>(400, "Bad or missing destination.");
}
const std::string& suggestion_config_name = payload["name"].get<std::string>();
size_t max_suggestions = 1000;
size_t limit = 1000;
if(payload.contains("limit") && payload["limit"].is_number_integer()) {
max_suggestions = payload["limit"].get<size_t>();
if(params.contains("limit") && params["limit"].is_number_integer()) {
limit = params["limit"].get<size_t>();
}
if(suggestion_configs.find(suggestion_config_name) != suggestion_configs.end()) {
@@ -46,21 +68,21 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_
suggestion_config_name + "`.");
}
if(!payload["source"].contains("collections") || !payload["source"]["collections"].is_array()) {
if(!params["source"].contains("collections") || !params["source"]["collections"].is_array()) {
return Option<bool>(400, "Must contain a valid list of source collections.");
}
if(!payload["destination"].contains("collection") || !payload["destination"]["collection"].is_string()) {
if(!params["destination"].contains("collection") || !params["destination"]["collection"].is_string()) {
return Option<bool>(400, "Must contain a valid destination collection.");
}
const std::string& suggestion_collection = payload["destination"]["collection"].get<std::string>();
const std::string& suggestion_collection = params["destination"]["collection"].get<std::string>();
suggestion_config_t suggestion_config;
suggestion_config.name = suggestion_config_name;
suggestion_config.suggestion_collection = suggestion_collection;
suggestion_config.max_suggestions = max_suggestions;
suggestion_config.limit = limit;
for(const auto& coll: payload["source"]["collections"]) {
for(const auto& coll: params["source"]["collections"]) {
if(!coll.is_string()) {
return Option<bool>(400, "Must contain a valid list of source collection names.");
}
@@ -77,12 +99,11 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_
query_collection_mapping[query_coll].push_back(suggestion_collection);
}
PopularQueries* popularQueries = new PopularQueries(max_suggestions);
PopularQueries* popularQueries = new PopularQueries(limit);
popular_queries.emplace(suggestion_collection, popularQueries);
if(write_to_disk) {
payload["type"] = RESOURCE_TYPE;
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + suggestion_config_name;
auto suggestion_key = std::string(ANALYTICS_RULE_PREFIX) + "_" + suggestion_config_name;
bool inserted = store->insert(suggestion_key, payload.dump());
if(!inserted) {
return Option<bool>(500, "Error while storing the config to disk.");
@@ -100,13 +121,39 @@ AnalyticsManager::~AnalyticsManager() {
}
}
Option<bool> AnalyticsManager::remove_suggestion_index(const std::string &name) {
Option<nlohmann::json> AnalyticsManager::list_rules() {
std::unique_lock lock(mutex);
nlohmann::json rules = nlohmann::json::object();
rules["rules"]= nlohmann::json::array();
for(const auto& suggestion_config: suggestion_configs) {
nlohmann::json rule;
suggestion_config.second.to_json(rule);
rule["type"] = POPULAR_QUERIES_TYPE;
rules["rules"].push_back(rule);
}
return Option<nlohmann::json>(rules);
}
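Given to_json() and the type tagging above, GET /analytics/rules for the sample rule would return roughly the following (values and key order illustrative):

    {
      "rules": [
        {
          "name": "top_search_queries",
          "params": {
            "suggestion_collection": "top_search_queries",
            "query_collections": ["brands", "products"],
            "limit": 1000
          },
          "type": "popular_queries"
        }
      ]
    }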
Option<bool> AnalyticsManager::remove_rule(const string &name) {
std::unique_lock lock(mutex);
auto suggestion_configs_it = suggestion_configs.find(name);
if(suggestion_configs_it != suggestion_configs.end()) {
return remove_popular_queries_index(name);
}
return Option<bool>(404, "Rule not found.");
}
Option<bool> AnalyticsManager::remove_popular_queries_index(const std::string &name) {
// lock is held by caller
auto suggestion_configs_it = suggestion_configs.find(name);
if(suggestion_configs_it == suggestion_configs.end()) {
return Option<bool>(404, "Index not found.");
return Option<bool>(404, "Rule not found.");
}
const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection;
@@ -122,7 +169,7 @@ Option<bool> AnalyticsManager::remove_suggestion_index(const std::string &name)
suggestion_configs.erase(name);
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + name;
auto suggestion_key = std::string(ANALYTICS_RULE_PREFIX) + "_" + name;
bool erased = store->remove(suggestion_key);
if(!erased) {
return Option<bool>(500, "Error while deleting from disk.");
@@ -162,41 +209,41 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
break;
}
for(const auto& suggestion_config: suggestion_configs) {
const std::string& sink_name = suggestion_config.first;
const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
persist_suggestions(raft_server, prev_persistence_s);
auto popular_queries_it = popular_queries.find(suggestion_coll);
if(popular_queries_it == popular_queries.end()) {
continue;
}
lk.unlock();
}
// need to prepare the counts as JSON docs for import into the suggestion collection
// {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
dispose();
}
PopularQueries* popularQueries = popular_queries_it->second;
void AnalyticsManager::persist_suggestions(ReplicationState *raft_server, uint64_t prev_persistence_s) {
// lock is held by caller
for(const auto& suggestion_config: suggestion_configs) {
const std::string& sink_name = suggestion_config.first;
const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
// aggregate prefix queries to their final form
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
popularQueries->compact_user_queries(now_ts_us);
auto popular_queries_it = popular_queries.find(suggestion_coll);
if(popular_queries_it == popular_queries.end()) {
continue;
}
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(now).count();
// need to prepare the counts as JSON docs for import into the suggestion collection
// {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
if(now_ts_seconds - prev_persistence_s < Config::get_instance().get_analytics_flush_interval()) {
// we will persist aggregation every hour
continue;
}
PopularQueries* popularQueries = popular_queries_it->second;
prev_persistence_s = now_ts_seconds;
// aggregate prefix queries to their final form
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
popularQueries->compact_user_queries(now_ts_us);
std::string import_payload;
popularQueries->serialize_as_docs(import_payload);
if(import_payload.empty()) {
continue;
}
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(now).count();
if(now_ts_seconds - prev_persistence_s < Config::get_instance().get_analytics_flush_interval()) {
// we will persist aggregation every hour
continue;
}
// send http request
std::string leader_url = raft_server->get_leader_url();
if(!leader_url.empty()) {
@@ -224,15 +271,12 @@ void AnalyticsManager::run(ReplicationState* raft_server) {
}
coll->truncate_after_top_k("count", popularQueries->get_k());
}
}
}
}
lk.unlock();
}
dispose();
}
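The refactor above lifts the persistence body out of run() into persist_suggestions(), so run() reduces to a periodic loop that calls persist_suggestions() under the lock and dispose()s on shutdown. The flush gate itself is simple; a minimal standalone sketch, with an assumed 60-second interval standing in for Config::get_instance().get_analytics_flush_interval():

    #include <chrono>
    #include <cstdint>
    #include <cstdio>

    int main() {
        uint64_t prev_persistence_s = 0;      // last successful flush
        const uint64_t flush_interval_s = 60; // assumed for illustration

        auto now = std::chrono::system_clock::now().time_since_epoch();
        uint64_t now_ts_seconds =
            std::chrono::duration_cast<std::chrono::seconds>(now).count();

        if(now_ts_seconds - prev_persistence_s < flush_interval_s) {
            std::puts("within flush interval; skipping persistence");
            return 0;
        }
        prev_persistence_s = now_ts_seconds;
        std::puts("flushing aggregated queries to suggestion collection");
    }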
void AnalyticsManager::stop() {
@@ -241,6 +285,8 @@ void AnalyticsManager::stop() {
}
void AnalyticsManager::dispose() {
std::unique_lock lk(mutex);
for(auto& kv: popular_queries) {
delete kv.second;
}
@@ -251,3 +297,8 @@ void AnalyticsManager::dispose() {
void AnalyticsManager::init(Store* store) {
this->store = store;
}
std::unordered_map<std::string, PopularQueries*> AnalyticsManager::get_popular_queries() {
std::unique_lock lk(mutex);
return popular_queries;
}
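get_popular_queries() copies the map while holding the mutex, so callers (currently the new test) can iterate without racing concurrent inserts; note the copy holds the same raw PopularQueries pointers, not clones. A sketch of the copy-under-lock accessor pattern (names hypothetical):

    #include <cstdio>
    #include <mutex>
    #include <unordered_map>

    std::mutex mu;
    std::unordered_map<int, int*> live;

    std::unordered_map<int, int*> snapshot() {
        std::lock_guard<std::mutex> lk(mu);
        return live; // copies the map, not the pointed-to objects
    }

    int main() {
        live[1] = nullptr;
        std::printf("entries: %zu\n", snapshot().size()); // 1
    }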

View File

@@ -103,11 +103,11 @@ std::string BatchedIndexer::get_collection_name(const std::shared_ptr<http_req>&
std::string& coll_name = req->params["collection"];
if(coll_name.empty()) {
route_path* rpath;
server->get_route(req->route_hash, &rpath);
route_path* rpath = nullptr;
bool route_found = server->get_route(req->route_hash, &rpath);
// ensure that collection creation is sent to the same queue as writes to that collection
if(rpath->handler == post_create_collection) {
if(route_found && rpath->handler == post_create_collection) {
nlohmann::json obj = nlohmann::json::parse(req->body, nullptr, false);
if(!obj.is_discarded() && obj.is_object() &&

View File

@@ -1568,12 +1568,12 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
return Option<nlohmann::json>(408, "Request Timeout");
}
if(match_score_index >= 0 && sort_fields_std[match_score_index].text_match_buckets > 1) {
if(match_score_index >= 0 && sort_fields_std[match_score_index].text_match_buckets > 0) {
size_t num_buckets = sort_fields_std[match_score_index].text_match_buckets;
const size_t max_kvs_bucketed = std::min<size_t>(DEFAULT_TOPSTER_SIZE, raw_result_kvs.size());
if(max_kvs_bucketed >= num_buckets) {
std::vector<int64_t> result_scores(max_kvs_bucketed);
spp::sparse_hash_map<uint64_t, int64_t> result_scores;
// only first `max_kvs_bucketed` elements are bucketed to prevent pagination issues past 250 records
size_t block_len = (max_kvs_bucketed / num_buckets);
@@ -1582,7 +1582,7 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
int64_t anchor_score = raw_result_kvs[i][0]->scores[raw_result_kvs[i][0]->match_score_index];
size_t j = 0;
while(j < block_len && i+j < max_kvs_bucketed) {
result_scores[i+j] = raw_result_kvs[i+j][0]->scores[raw_result_kvs[i+j][0]->match_score_index];
result_scores[raw_result_kvs[i+j][0]->key] = raw_result_kvs[i+j][0]->scores[raw_result_kvs[i+j][0]->match_score_index];
raw_result_kvs[i+j][0]->scores[raw_result_kvs[i+j][0]->match_score_index] = anchor_score;
j++;
}
@@ -1596,7 +1596,8 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
// restore original scores
for(i = 0; i < max_kvs_bucketed; i++) {
raw_result_kvs[i][0]->scores[raw_result_kvs[i][0]->match_score_index] = result_scores[i];
raw_result_kvs[i][0]->scores[raw_result_kvs[i][0]->match_score_index] =
result_scores[raw_result_kvs[i][0]->key];
}
}
}
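This hunk fixes bucketed-ranking restoration: the original scores were saved in a vector and restored by position, but the hits are re-sorted after the anchor scores are applied, so positions no longer correspond and a document could get another document's score back. Keying the saved scores by each KV's key makes restoration order-independent. A toy standalone illustration:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <unordered_map>
    #include <vector>

    struct KV { uint64_t key; int64_t score; };

    int main() {
        std::vector<KV> kvs = {{10, 900}, {11, 850}, {12, 800}};

        std::unordered_map<uint64_t, int64_t> original; // key -> score
        for(auto& kv : kvs) {
            original[kv.key] = kv.score;
            kv.score = 900; // shared anchor score for the bucket
        }

        std::reverse(kvs.begin(), kvs.end()); // stand-in for re-sorting

        for(auto& kv : kvs) {
            kv.score = original[kv.key]; // restore by key, not by index
        }
        for(const auto& kv : kvs) {
            std::printf("doc %llu -> %lld\n",
                        (unsigned long long) kv.key, (long long) kv.score);
        }
    }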
@@ -1801,6 +1802,8 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
// remove fields from highlight doc that were not highlighted
if(!hfield_names.empty()) {
prune_doc(highlight_res, hfield_names, tsl::htrie_set<char>(), "");
} else {
highlight_res.clear();
}
if(enable_highlight_v1) {
@@ -2452,6 +2455,10 @@ void Collection::parse_search_query(const std::string &query, std::vector<std::s
}
for(auto& sub_token: sub_tokens) {
if(sub_token.size() > 100) {
sub_token.erase(100);
}
if(exclude_operator_prior) {
if(phrase_search_op_prior) {
phrase.push_back(sub_token);
@@ -2816,7 +2823,9 @@ void Collection::highlight_result(const std::string& raw_query, const field &sea
highlight_nested_field(highlight_doc, highlight_doc, path_parts, 0, false, -1,
[&](nlohmann::json& h_obj, bool is_arr_obj_ele, int array_i) {
if(!h_obj.is_string()) {
if(h_obj.is_object()) {
return;
} else if(!h_obj.is_string()) {
auto val_back = h_obj;
h_obj = nlohmann::json::object();
h_obj["snippet"] = to_string(val_back);
@@ -3358,10 +3367,12 @@ Option<bool> Collection::remove_if_found(uint32_t seq_id, const bool remove_from
return Option<bool>(true);
}
Option<uint32_t> Collection::add_override(const override_t & override) {
bool inserted = store->insert(Collection::get_override_key(name, override.id), override.to_json().dump());
if(!inserted) {
return Option<uint32_t>(500, "Error while storing the override on disk.");
Option<uint32_t> Collection::add_override(const override_t & override, bool write_to_store) {
if(write_to_store) {
bool inserted = store->insert(Collection::get_override_key(name, override.id), override.to_json().dump());
if(!inserted) {
return Option<uint32_t>(500, "Error while storing the override on disk.");
}
}
std::unique_lock lock(mutex);
@@ -3578,7 +3589,7 @@ Option<bool> Collection::parse_pinned_hits(const std::string& pinned_hits_str,
return Option<bool>(true);
}
Option<bool> Collection::add_synonym(const nlohmann::json& syn_json) {
Option<bool> Collection::add_synonym(const nlohmann::json& syn_json, bool write_to_store) {
std::shared_lock lock(mutex);
synonym_t synonym;
Option<bool> syn_op = synonym_t::parse(syn_json, synonym);
@@ -3587,7 +3598,7 @@ Option<bool> Collection::add_synonym(const nlohmann::json& syn_json) {
return syn_op;
}
return synonym_index->add_synonym(name, synonym);
return synonym_index->add_synonym(name, synonym, write_to_store);
}
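add_override() and add_synonym() gain a write_to_store flag so the collection-load path (see the CollectionManager hunk below) can rebuild in-memory state from records it just read, without writing each of them back to disk. A minimal sketch of the pattern, with hypothetical names:

    #include <cstdio>
    #include <string>

    // Stand-in for Store::insert().
    bool store_insert(const std::string& key, const std::string& value) {
        std::printf("persisting %s\n", key.c_str());
        return true;
    }

    bool add_record(const std::string& key, const std::string& value,
                    bool write_to_store = true) {
        if(write_to_store && !store_insert(key, value)) {
            return false; // surfaced as a 500 in the real code
        }
        // ... always update the in-memory structures here ...
        return true;
    }

    int main() {
        add_record("syn:coll1:syn1", "{...}");        // API write: persisted
        add_record("syn:coll1:syn1", "{...}", false); // restore: memory only
    }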
bool Collection::get_synonym(const std::string& id, synonym_t& synonym) {

View File

@@ -732,7 +732,8 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
}
CollectionManager & collectionManager = CollectionManager::get_instance();
auto collection = collectionManager.get_collection(req_params["collection"]);
const std::string& orig_coll_name = req_params["collection"];
auto collection = collectionManager.get_collection(orig_coll_name);
if(collection == nullptr) {
return Option<bool>(404, "Not found.");
@@ -1062,9 +1063,8 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
if(Config::get_instance().get_enable_search_analytics()) {
if(result.count("found") != 0 && result["found"].get<size_t>() != 0) {
std::string processed_query = raw_query;
Tokenizer::normalize_ascii(processed_query);
AnalyticsManager::get_instance().add_suggestion(collection->get_name(), processed_query,
std::string analytics_query = raw_query;
AnalyticsManager::get_instance().add_suggestion(orig_coll_name, analytics_query,
true, req_params["x-typesense-user-id"]);
}
}
@@ -1279,7 +1279,7 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
override_t override;
auto parse_op = override_t::parse(collection_override, "", override);
if(parse_op.ok()) {
collection->add_override(override);
collection->add_override(override, false);
} else {
LOG(ERROR) << "Skipping loading of override: " << parse_op.error();
}
@@ -1293,20 +1293,18 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
for(const auto & collection_synonym_json: collection_synonym_jsons) {
nlohmann::json collection_synonym = nlohmann::json::parse(collection_synonym_json);
collection->add_synonym(collection_synonym);
collection->add_synonym(collection_synonym, false);
}
// restore query suggestions configs
std::vector<std::string> analytics_config_jsons;
cm.store->scan_fill(AnalyticsManager::ANALYTICS_CONFIG_PREFIX,
std::string(AnalyticsManager::ANALYTICS_CONFIG_PREFIX) + "`",
cm.store->scan_fill(AnalyticsManager::ANALYTICS_RULE_PREFIX,
std::string(AnalyticsManager::ANALYTICS_RULE_PREFIX) + "`",
analytics_config_jsons);
for(const auto& analytics_config_json: analytics_config_jsons) {
nlohmann::json analytics_config = nlohmann::json::parse(analytics_config_json);
if(analytics_config["type"] == AnalyticsManager::RESOURCE_TYPE) {
AnalyticsManager::get_instance().create_index(analytics_config, false);
}
AnalyticsManager::get_instance().create_rule(analytics_config, false);
}
// Fetch records from the store and re-create memory index

View File

@@ -362,6 +362,8 @@ bool get_search(const std::shared_ptr<http_req>& req, const std::shared_ptr<http
}
// Result found in cache but ttl has lapsed.
lock.unlock();
std::unique_lock ulock(mutex);
res_cache.erase(req_hash);
}
}
@@ -456,6 +458,8 @@ bool post_multi_search(const std::shared_ptr<http_req>& req, const std::shared_p
}
// Result found in cache but ttl has lapsed.
lock.unlock();
std::unique_lock ulock(mutex);
res_cache.erase(req_hash);
}
}
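Both cache hunks apply the same read-then-upgrade pattern: lookups run under a shared lock, and because std::shared_mutex offers no atomic upgrade, the shared lock is released before a unique lock is taken to erase the expired entry (erasing by key is safe even if another thread evicted it in the gap). A standalone sketch:

    #include <cstdint>
    #include <cstdio>
    #include <mutex>
    #include <shared_mutex>
    #include <string>
    #include <unordered_map>

    std::shared_mutex cache_mutex;
    std::unordered_map<uint64_t, std::string> res_cache;

    void lookup_and_maybe_evict(uint64_t req_hash, bool ttl_lapsed) {
        std::shared_lock lock(cache_mutex);
        auto it = res_cache.find(req_hash);
        if(it != res_cache.end() && ttl_lapsed) {
            lock.unlock();                       // drop shared ownership
            std::unique_lock ulock(cache_mutex); // re-acquire exclusively
            res_cache.erase(req_hash);           // no-op if already gone
        }
    }

    int main() {
        res_cache[42] = "cached response";
        lookup_and_maybe_evict(42, true);
        std::printf("cache size: %zu\n", res_cache.size()); // 0
    }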
@@ -1110,6 +1114,37 @@ bool del_remove_documents(const std::shared_ptr<http_req>& req, const std::share
const char *BATCH_SIZE = "batch_size";
const char *FILTER_BY = "filter_by";
const char *TOP_K_BY = "top_k_by";
if(req->params.count(TOP_K_BY) != 0) {
std::vector<std::string> parts;
StringUtils::split(req->params[TOP_K_BY], parts, ":");
if(parts.size() != 2 || !StringUtils::is_uint32_t(parts[1])) {
req->last_chunk_aggregate = true;
res->final = true;
res->set_400("The `top_k_by` parameter is not valid.");
stream_response(req, res);
return false;
}
const std::string& field_name = parts[0];
const size_t k = std::stoull(parts[1]);
auto op = collection->truncate_after_top_k(field_name, k);
req->last_chunk_aggregate = true;
res->final = true;
if(!op.ok()) {
res->set_500(op.error());
stream_response(req, res);
return false;
}
res->set_200(R"({"ok": true})");
stream_response(req, res);
return true;
}
if(req->params.count(BATCH_SIZE) == 0) {
req->params[BATCH_SIZE] = "1000000000"; // 1 Billion
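The new top_k_by branch above accepts a value of the form <field>:<k> (for example count:1000) and, instead of deleting by filter, truncates the collection to the top k documents by that field via truncate_after_top_k(). A rough standalone sketch of the validation, approximating StringUtils::split and StringUtils::is_uint32_t:

    #include <cstdio>
    #include <string>

    int main() {
        std::string top_k_by = "count:1000"; // illustrative value

        auto pos = top_k_by.find(':');
        std::string field_name = (pos == std::string::npos)
                                 ? "" : top_k_by.substr(0, pos);
        std::string k_str = (pos == std::string::npos)
                                 ? "" : top_k_by.substr(pos + 1);

        bool k_is_uint = !k_str.empty() &&
            k_str.find_first_not_of("0123456789") == std::string::npos;

        if(field_name.empty() || !k_is_uint) {
            std::puts("The `top_k_by` parameter is not valid.");
            return 1;
        }
        size_t k = std::stoull(k_str);
        std::printf("truncate after top %zu docs by `%s`\n",
                    k, field_name.c_str());
    }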
@@ -2057,7 +2092,19 @@ bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_p
return false;
}
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
bool get_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto rules_op = AnalyticsManager::get_instance().list_rules();
if(rules_op.ok()) {
res->set_200(rules_op.get().dump());
return true;
}
res->set(rules_op.code(), rules_op.error());
return false;
}
bool post_create_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;
try {
@@ -2068,7 +2115,7 @@ bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req,
return false;
}
auto op = AnalyticsManager::get_instance().create_index(req_json);
auto op = AnalyticsManager::get_instance().create_rule(req_json);
if(!op.ok()) {
res->set(op.code(), op.error());
@@ -2079,8 +2126,8 @@ bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req,
return true;
}
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto op = AnalyticsManager::get_instance().remove_suggestion_index(req->params["name"]);
bool del_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto op = AnalyticsManager::get_instance().remove_rule(req->params["name"]);
if(!op.ok()) {
res->set(op.code(), op.error());
return false;

View File

@@ -591,7 +591,8 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
}
}
if(std::stoll(version_num) < 7710) { // allow >= v7.71.0
int major_version = version_num[0] - 48; // convert ascii char to integer
if(major_version <= 7 && std::stoll(version_num) < 7710) { // allow >= v7.71.0
std::string message = "{ \"message\": \"HTTP2 is not supported by your curl client. "
"You need to use atleast Curl v7.71.0.\"}";
h2o_iovec_t body = h2o_strdup(&request->_req->pool, message.c_str(), SIZE_MAX);
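The guard above fixes a lexical-comparison bug: the client's curl version is reduced to its digits, so v7.71.0 becomes 7710 while v8.0.1 becomes 801, which is numerically smaller and was wrongly rejected even though curl 8.x supports HTTP/2. Checking the major-version digit first lets 8.x-and-later clients through (single-digit major versions are assumed, as in the patch). A self-contained illustration:

    #include <cstdio>
    #include <string>

    bool h2_supported(const std::string& version_num) {
        int major_version = version_num[0] - 48; // ASCII digit -> int
        return !(major_version <= 7 && std::stoll(version_num) < 7710);
    }

    int main() {
        std::printf("7.68.0 -> %d\n", h2_supported("7680")); // 0: too old
        std::printf("7.71.0 -> %d\n", h2_supported("7710")); // 1: allowed
        std::printf("8.0.1  -> %d\n", h2_supported("801"));  // 1: fixed
    }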

View File

@@ -69,8 +69,9 @@ void master_server_routes() {
server->del("/presets/:name", del_preset);
// analytics
server->post("/analytics/popular-queries", post_create_analytics_popular_queries);
server->del("/analytics/popular-queries/:name", del_analytics_popular_queries);
server->get("/analytics/rules", get_analytics_rules);
server->post("/analytics/rules", post_create_analytics_rules);
server->del("/analytics/rules/:name", del_analytics_rules);
server->post("/analytics/events", post_create_event);
// meta

View File

@@ -105,7 +105,8 @@ void SynonymIndex::synonym_reduction(const std::vector<std::string>& tokens,
synonym_reduction_internal(tokens, tokens.size(), 0, processed_syn_hashes, results);
}
Option<bool> SynonymIndex::add_synonym(const std::string & collection_name, const synonym_t& synonym) {
Option<bool> SynonymIndex::add_synonym(const std::string & collection_name, const synonym_t& synonym,
bool write_to_store) {
if(synonym_definitions.count(synonym.id) != 0) {
// first we have to delete existing entries so we can upsert
Option<bool> rem_op = remove_synonym(collection_name, synonym.id);
@@ -129,9 +130,11 @@ Option<bool> SynonymIndex::add_synonym(const std::string & collection_name, cons
write_lock.unlock();
bool inserted = store->insert(get_synonym_key(collection_name, synonym.id), synonym.to_view_json().dump());
if(!inserted) {
return Option<bool>(500, "Error while storing the synonym on disk.");
if(write_to_store) {
bool inserted = store->insert(get_synonym_key(collection_name, synonym.id), synonym.to_view_json().dump());
if(!inserted) {
return Option<bool>(500, "Error while storing the synonym on disk.");
}
}
return Option<bool>(true);

View File

@@ -0,0 +1,91 @@
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include <collection_manager.h>
#include <analytics_manager.h>
#include "collection.h"
class AnalyticsManagerTest : public ::testing::Test {
protected:
Store *store;
CollectionManager& collectionManager = CollectionManager::get_instance();
std::atomic<bool> quit = false;
std::vector<std::string> query_fields;
std::vector<sort_by> sort_fields;
AnalyticsManager& analyticsManager = AnalyticsManager::get_instance();
void setupCollection() {
std::string state_dir_path = "/tmp/typesense_test/analytics_manager_test";
LOG(INFO) << "Truncating and creating: " << state_dir_path;
system(("rm -rf "+state_dir_path+" && mkdir -p "+state_dir_path).c_str());
system("mkdir -p /tmp/typesense_test/models");
store = new Store(state_dir_path);
collectionManager.init(store, 1.0, "auth_key", quit);
collectionManager.load(8, 1000);
analyticsManager.init(store);
}
virtual void SetUp() {
setupCollection();
}
virtual void TearDown() {
collectionManager.dispose();
delete store;
}
};
TEST_F(AnalyticsManagerTest, AddSuggestion) {
nlohmann::json titles_schema = R"({
"name": "titles",
"fields": [
{"name": "title", "type": "string"}
]
})"_json;
Collection* titles_coll = collectionManager.create_collection(titles_schema).get();
nlohmann::json doc;
doc["title"] = "Cool trousers";
ASSERT_TRUE(titles_coll->add(doc.dump()).ok());
// create a collection to store suggestions
nlohmann::json suggestions_schema = R"({
"name": "top_queries",
"fields": [
{"name": "q", "type": "string" },
{"name": "count", "type": "int32" }
]
})"_json;
Collection* suggestions_coll = collectionManager.create_collection(suggestions_schema).get();
nlohmann::json analytics_rule = R"({
"name": "top_search_queries",
"type": "popular_queries",
"params": {
"limit": 100,
"source": {
"collections": ["titles"]
},
"destination": {
"collection": "top_queries"
}
}
})"_json;
auto create_op = analyticsManager.create_rule(analytics_rule);
ASSERT_TRUE(create_op.ok());
std::string q = "foobar";
analyticsManager.add_suggestion("titles", q, true, "1");
auto popularQueries = analyticsManager.get_popular_queries();
auto userQueries = popularQueries["top_queries"]->get_user_prefix_queries()["1"];
ASSERT_EQ(1, userQueries.size());
ASSERT_EQ("foobar", userQueries[0].query);
}

View File

@@ -1636,7 +1636,7 @@ TEST_F(CollectionSortingTest, TextMatchBucketRanking) {
nlohmann::json doc1;
doc1["id"] = "0";
doc1["title"] = "Mark Antony";
doc1["description"] = "Marriage Counsellor";
doc1["description"] = "Counsellor";
doc1["points"] = 100;
nlohmann::json doc2;
@@ -1653,47 +1653,51 @@ TEST_F(CollectionSortingTest, TextMatchBucketRanking) {
sort_by("points", "DESC"),
};
auto results = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
auto results = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true).get();
"<mark>", "</mark>", {3}, 1000, true).get();
// when there are more buckets than results, no bucketing will happen
ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("1", results["hits"][1]["document"]["id"].get<std::string>());
// bucketing by 1 produces original text match
// bucketing by 1 makes the text match score the same
sort_fields = {
sort_by("_text_match(buckets: 1)", "DESC"),
sort_by("points", "DESC"),
};
results = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
results = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true).get();
"<mark>", "</mark>", {3}, 1000, true).get();
ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("1", results["hits"][1]["document"]["id"].get<std::string>());
ASSERT_EQ("1", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("0", results["hits"][1]["document"]["id"].get<std::string>());
// likewise with bucket 0
size_t score1 = std::stoul(results["hits"][0]["text_match_info"]["score"].get<std::string>());
size_t score2 = std::stoul(results["hits"][1]["text_match_info"]["score"].get<std::string>());
ASSERT_TRUE(score1 < score2);
// bucketing by 0 produces original text match
sort_fields = {
sort_by("_text_match(buckets: 0)", "DESC"),
sort_by("points", "DESC"),
};
results = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
results = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true).get();
"<mark>", "</mark>", {3}, 1000, true).get();
ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
@@ -1702,46 +1706,46 @@ TEST_F(CollectionSortingTest, TextMatchBucketRanking) {
// don't allow bad parameter name
sort_fields[0] = sort_by("_text_match(foobar: 0)", "DESC");
auto res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
auto res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Invalid sorting parameter passed for _text_match.", res_op.error());
// handle bad syntax
sort_fields[0] = sort_by("_text_match(foobar:", "DESC");
res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Could not find a field named `_text_match(foobar:` in the schema for sorting.", res_op.error());
// handle bad value
sort_fields[0] = sort_by("_text_match(buckets: x)", "DESC");
res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Invalid value passed for _text_match `buckets` configuration.", res_op.error());
// handle negative value
sort_fields[0] = sort_by("_text_match(buckets: -1)", "DESC");
res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Invalid value passed for _text_match `buckets` configuration.", res_op.error());

View File

@@ -2033,6 +2033,54 @@ TEST_F(CollectionSpecificMoreTest, ExhaustiveSearchWithoutExplicitDropTokens) {
ASSERT_EQ(2, res["hits"].size());
}
TEST_F(CollectionSpecificMoreTest, DoNotHighlightFieldsForSpecialCharacterQuery) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "title", "type": "string"},
{"name": "description", "type": "string"}
]
})"_json;
Collection* coll1 = collectionManager.create_collection(schema).get();
nlohmann::json doc;
doc["title"] = "alpha beta gamma";
doc["description"] = "alpha beta gamma";
ASSERT_TRUE(coll1->add(doc.dump()).ok());
auto res = coll1->search("'", {"title", "description"}, "", {}, {}, {0}, 3, 1, FREQUENCY, {false}, 1,
spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>()).get();
ASSERT_EQ(1, res["hits"].size());
ASSERT_EQ(0, res["hits"][0]["highlight"].size());
ASSERT_EQ(0, res["hits"][0]["highlights"].size());
}
TEST_F(CollectionSpecificMoreTest, SearchForURL) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "url", "type": "string"}
]
})"_json;
Collection* coll1 = collectionManager.create_collection(schema).get();
nlohmann::json doc;
doc["url"] = "https://www.cpf.gov.sg/member/infohub/cpf-clarifies/policy-faqs/"
"why-interest-earned-on-cpf-life-premium-not-paid-to-beneficiaries";
ASSERT_TRUE(coll1->add(doc.dump()).ok());
auto res = coll1->search("https://www.cpf.gov.sg/member/infohub/cpf-clarifies/policy-faqs/"
"why-interest-earned-on-cpf-life-premium-not-paid-to-beneficiaries", {"url"}, "",
{}, {}, {2}, 3, 1,
FREQUENCY, {true}).get();
ASSERT_EQ(1, res["hits"].size());
}
TEST_F(CollectionSpecificMoreTest, CrossFieldTypoAndPrefixWithWeights) {
nlohmann::json schema = R"({
"name": "coll1",

View File

@@ -4451,20 +4451,15 @@ TEST_F(CollectionTest, WildcardHighlightFields) {
spp::sparse_hash_set<std::string> dummy_include_exclude;
std::string highlight_fields = "user*";
// user* matches user_name, user.rank and user.phone
auto result = coll->search("+91", {"user"}, "", {}, {}, {0},
auto result = coll->search("123", {"user"}, "", {}, {}, {0},
10, 1, FREQUENCY, {true}, Index::DROP_TOKENS_THRESHOLD, dummy_include_exclude, dummy_include_exclude, 10, "",
30, 4, "", Index::TYPO_TOKENS_THRESHOLD, "", "", {}, 3, "<mark>", "</mark>", {}, UINT32_MAX,
true, false, true, highlight_fields).get();
ASSERT_EQ(1, result["found"].get<size_t>());
ASSERT_EQ(1, result["hits"].size());
// ASSERT_EQ("+<mark>91</mark> 123123123",
// result["hits"][0]["highlight"]["user"]["phone"]["snippet"].get<std::string>());
// ASSERT_EQ("100",
// result["hits"][0]["highlight"]["user"]["rank"]["snippet"].get<std::string>());
ASSERT_EQ("user_a",
result["hits"][0]["highlight"]["user_name"]["snippet"].get<std::string>());
ASSERT_EQ(1, result["hits"][0]["highlight"].size());
ASSERT_EQ("+91 <mark>123</mark>123123", result["hits"][0]["highlight"]["user"]["phone"]["snippet"].get<std::string>());
highlight_fields = "user.*";
// user.* matches user.rank and user.phone
@@ -4475,11 +4470,9 @@
ASSERT_EQ(1, result["found"].get<size_t>());
ASSERT_EQ(1, result["hits"].size());
ASSERT_EQ(1, result["hits"][0]["highlight"].size());
ASSERT_EQ("+<mark>91</mark> 123123123",
result["hits"][0]["highlight"]["user"]["phone"]["snippet"].get<std::string>());
// ASSERT_EQ("100",
// result["hits"][0]["highlight"]["user"]["rank"]["snippet"].get<std::string>());
highlight_fields = "user*";
// user* matches user_name, user.rank and user.phone
@@ -4490,11 +4483,7 @@
ASSERT_EQ(1, result["found"].get<size_t>());
ASSERT_EQ(1, result["hits"].size());
// ASSERT_EQ("+91 123123123",
// result["hits"][0]["highlight"]["user"]["phone"]["snippet"].get<std::string>());
// ASSERT_EQ("100",
// result["hits"][0]["highlight"]["user"]["rank"]["snippet"].get<std::string>());
ASSERT_EQ(1, result["hits"][0]["highlight"].size());
ASSERT_EQ("<mark>user_a</mark>",
result["hits"][0]["highlight"]["user_name"]["snippet"].get<std::string>());
@@ -4507,11 +4496,7 @@
ASSERT_EQ(1, result["found"].get<size_t>());
ASSERT_EQ(1, result["hits"].size());
ASSERT_EQ("+91 123123123",
result["hits"][0]["highlight"]["user"]["phone"]["snippet"].get<std::string>());
ASSERT_EQ("100",
result["hits"][0]["highlight"]["user"]["rank"]["snippet"].get<std::string>());
ASSERT_EQ(0, result["hits"][0]["highlight"].size());
highlight_fields = "foo*";
// No matching field for highlight_fields