Merge branch 'v0.25-join' into facet_index_refactor

This commit is contained in:
krunal1313 2023-05-26 16:08:18 +05:30
commit b966c77429
11 changed files with 145 additions and 74 deletions

View File

@ -16,8 +16,8 @@ if [[ "$@" == *"--graviton2"* ]] || [[ "$@" == *"--arm"* ]]; then
fi
docker run --user $UID:$GID --volume="/etc/group:/etc/group:ro" --volume="/etc/passwd:/etc/passwd:ro" \
--volume="/etc/shadow:/etc/shadow:ro" -it --rm -v $HOME/docker_bazel_cache:$HOME/docker_bazel_cache -v $PROJECT_DIR:/src \
--workdir /src typesense/bazel_dev:24032023 bazel --output_user_root=$HOME/docker_bazel_cache/cache build --verbose_failures \
--volume="/etc/shadow:/etc/shadow:ro" -it --rm -v /bazeld:/bazeld -v $PROJECT_DIR:/src \
--workdir /src typesense/bazel_dev:24032023 bazel --output_user_root=/bazeld/cache build --verbose_failures \
--jobs=6 --action_env=LD_LIBRARY_PATH="/usr/local/gcc-10.3.0/lib64" \
--define=TYPESENSE_VERSION=\"$TYPESENSE_VERSION\" //:$TYPESENSE_TARGET

View File

@ -20,7 +20,14 @@ private:
std::string name;
std::string suggestion_collection;
std::vector<std::string> query_collections;
size_t max_suggestions;
size_t limit;
// Serializes this config into `obj`, writing one key per member:
// "name", "suggestion_collection", "query_collections" and "limit".
void to_json(nlohmann::json& obj) const {
    obj["name"] = this->name;
    obj["suggestion_collection"] = this->suggestion_collection;
    obj["query_collections"] = this->query_collections;
    obj["limit"] = this->limit;
}
};
// config name => config
@ -38,10 +45,14 @@ private:
~AnalyticsManager();
Option<bool> remove_popular_queries_index(const std::string& name);
Option<bool> create_popular_queries_index(nlohmann::json &payload, bool write_to_disk);
public:
static constexpr const char* ANALYTICS_CONFIG_PREFIX = "$AC";
static constexpr const char* RESOURCE_TYPE = "popular_queries";
static constexpr const char* ANALYTICS_RULE_PREFIX = "$AR";
static constexpr const char* POPULAR_QUERIES_TYPE = "popular_queries";
static AnalyticsManager& get_instance() {
static AnalyticsManager instance;
@ -55,9 +66,11 @@ public:
void run(ReplicationState* raft_server);
Option<bool> create_index(nlohmann::json& payload, bool write_to_disk = true);
Option<nlohmann::json> list_rules();
Option<bool> remove_suggestion_index(const std::string& name);
Option<bool> create_rule(nlohmann::json& payload, bool write_to_disk = true);
Option<bool> remove_rule(const std::string& name);
void add_suggestion(const std::string& query_collection,
std::string& query, const bool live_query, const std::string& user_id);

View File

@ -141,13 +141,15 @@ bool del_throttle(const std::shared_ptr<http_req>& req, const std::shared_ptr<ht
bool del_exceed(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Events
// Analytics
bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool get_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_create_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool del_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Misc helpers

View File

@ -404,7 +404,7 @@ struct http_req {
struct route_path {
std::string http_method;
std::vector<std::string> path_parts;
bool (*handler)(const std::shared_ptr<http_req>&, const std::shared_ptr<http_res>&);
bool (*handler)(const std::shared_ptr<http_req>&, const std::shared_ptr<http_res>&) = nullptr;
bool async_req;
bool async_res;
std::string action;

View File

@ -5,26 +5,39 @@
#include "http_client.h"
#include "collection_manager.h"
Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_to_disk) {
// Validates the generic parts of an analytics rule payload ("type" and "name"
// must both be present and be strings) and then dispatches on the rule type.
// Only POPULAR_QUERIES_TYPE is handled; any other type yields a 400 error.
// `write_to_disk` is forwarded unchanged to the type-specific creator.
Option<bool> AnalyticsManager::create_rule(nlohmann::json& payload, bool write_to_disk) {
    /*
        Sample payload:

        {
            "name": "top_queries",
            "name": "top_search_queries",
            "type": "popular_queries",
            "limit": 1000,
            "source": {
                "collections": ["brands", "products"]
            },
            "destination": {
                "collection": "top_queries"
                "collection": "top_search_queries"
            }
        }
    */

    // `contains` is evaluated first so that `payload["type"]` is only touched when the key exists
    const bool has_string_type = payload.contains("type") && payload["type"].is_string();
    if(!has_string_type) {
        return Option<bool>(400, "Request payload contains invalid type.");
    }

    const bool has_string_name = payload.contains("name") && payload["name"].is_string();
    if(!has_string_name) {
        return Option<bool>(400, "Bad or missing name.");
    }

    const bool is_popular_queries_rule = (payload["type"] == POPULAR_QUERIES_TYPE);
    if(!is_popular_queries_rule) {
        return Option<bool>(400, "Invalid type.");
    }

    return create_popular_queries_index(payload, write_to_disk);
}
Option<bool> AnalyticsManager::create_popular_queries_index(nlohmann::json &payload, bool write_to_disk) {
if(!payload.contains("source") || !payload["source"].is_object()) {
return Option<bool>(400, "Bad or missing source.");
}
@ -35,10 +48,10 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_
const std::string& suggestion_config_name = payload["name"].get<std::string>();
size_t max_suggestions = 1000;
size_t limit = 1000;
if(payload.contains("limit") && payload["limit"].is_number_integer()) {
max_suggestions = payload["limit"].get<size_t>();
limit = payload["limit"].get<size_t>();
}
if(suggestion_configs.find(suggestion_config_name) != suggestion_configs.end()) {
@ -58,7 +71,7 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_
suggestion_config_t suggestion_config;
suggestion_config.name = suggestion_config_name;
suggestion_config.suggestion_collection = suggestion_collection;
suggestion_config.max_suggestions = max_suggestions;
suggestion_config.limit = limit;
for(const auto& coll: payload["source"]["collections"]) {
if(!coll.is_string()) {
@ -77,12 +90,11 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_
query_collection_mapping[query_coll].push_back(suggestion_collection);
}
PopularQueries* popularQueries = new PopularQueries(max_suggestions);
PopularQueries* popularQueries = new PopularQueries(limit);
popular_queries.emplace(suggestion_collection, popularQueries);
if(write_to_disk) {
payload["type"] = RESOURCE_TYPE;
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + suggestion_config_name;
auto suggestion_key = std::string(ANALYTICS_RULE_PREFIX) + "_" + suggestion_config_name;
bool inserted = store->insert(suggestion_key, payload.dump());
if(!inserted) {
return Option<bool>(500, "Error while storing the config to disk.");
@ -100,13 +112,40 @@ AnalyticsManager::~AnalyticsManager() {
}
}
Option<bool> AnalyticsManager::remove_suggestion_index(const std::string &name) {
// Returns every configured rule as `{"rules": [ ... ]}`.
// Each array entry is the config's own JSON form plus a "type" key set to
// POPULAR_QUERIES_TYPE. The manager mutex is held while the configs are read.
Option<nlohmann::json> AnalyticsManager::list_rules() {
    std::unique_lock lock(mutex);

    nlohmann::json response = nlohmann::json::object();
    nlohmann::json& rule_entries = response["rules"];
    rule_entries = nlohmann::json::array();

    for(const auto& config_entry: suggestion_configs) {
        nlohmann::json entry;
        config_entry.second.to_json(entry);
        entry["type"] = POPULAR_QUERIES_TYPE;
        rule_entries.push_back(entry);
    }

    return Option<nlohmann::json>(response);
}
// Removes the rule registered under `name`.
// Returns a 404 "Rule not found." error when no such rule exists; otherwise
// returns whatever remove_popular_queries_index() reports.
Option<bool> AnalyticsManager::remove_rule(const std::string &name) {
    {
        // Hold the lock only for the lookup. remove_popular_queries_index()
        // acquires `mutex` itself, so calling it with the lock still held would
        // lock the same mutex twice on one thread.
        // NOTE(review): this assumes `mutex` is not a recursive mutex -- confirm in the header.
        std::unique_lock lock(mutex);

        if(suggestion_configs.find(name) == suggestion_configs.end()) {
            return Option<bool>(404, "Rule not found.");
        }
    }

    // The callee repeats the existence check under its own lock, so a rule that
    // disappears between the two steps still produces the same 404 response.
    return remove_popular_queries_index(name);
}
Option<bool> AnalyticsManager::remove_popular_queries_index(const std::string &name) {
std::unique_lock lock(mutex);
auto suggestion_configs_it = suggestion_configs.find(name);
if(suggestion_configs_it == suggestion_configs.end()) {
return Option<bool>(404, "Index not found.");
return Option<bool>(404, "Rule not found.");
}
const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection;
@ -122,7 +161,7 @@ Option<bool> AnalyticsManager::remove_suggestion_index(const std::string &name)
suggestion_configs.erase(name);
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + name;
auto suggestion_key = std::string(ANALYTICS_RULE_PREFIX) + "_" + name;
bool erased = store->remove(suggestion_key);
if(!erased) {
return Option<bool>(500, "Error while deleting from disk.");
@ -250,3 +289,4 @@ void AnalyticsManager::dispose() {
// Records the given store pointer in this manager's `store` member.
void AnalyticsManager::init(Store* store) {
this->store = store;
}

View File

@ -103,11 +103,11 @@ std::string BatchedIndexer::get_collection_name(const std::shared_ptr<http_req>&
std::string& coll_name = req->params["collection"];
if(coll_name.empty()) {
route_path* rpath;
server->get_route(req->route_hash, &rpath);
route_path* rpath = nullptr;
bool route_found = server->get_route(req->route_hash, &rpath);
// ensure that collection creation is sent to the same queue as writes to that collection
if(rpath->handler == post_create_collection) {
if(route_found && rpath->handler == post_create_collection) {
nlohmann::json obj = nlohmann::json::parse(req->body, nullptr, false);
if(!obj.is_discarded() && obj.is_object() &&

View File

@ -1558,12 +1558,12 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
return Option<nlohmann::json>(408, "Request Timeout");
}
if(match_score_index >= 0 && sort_fields_std[match_score_index].text_match_buckets > 1) {
if(match_score_index >= 0 && sort_fields_std[match_score_index].text_match_buckets > 0) {
size_t num_buckets = sort_fields_std[match_score_index].text_match_buckets;
const size_t max_kvs_bucketed = std::min<size_t>(DEFAULT_TOPSTER_SIZE, raw_result_kvs.size());
if(max_kvs_bucketed >= num_buckets) {
std::vector<int64_t> result_scores(max_kvs_bucketed);
spp::sparse_hash_map<uint64_t, int64_t> result_scores;
// only first `max_kvs_bucketed` elements are bucketed to prevent pagination issues past 250 records
size_t block_len = (max_kvs_bucketed / num_buckets);
@ -1572,7 +1572,7 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
int64_t anchor_score = raw_result_kvs[i][0]->scores[raw_result_kvs[i][0]->match_score_index];
size_t j = 0;
while(j < block_len && i+j < max_kvs_bucketed) {
result_scores[i+j] = raw_result_kvs[i+j][0]->scores[raw_result_kvs[i+j][0]->match_score_index];
result_scores[raw_result_kvs[i+j][0]->key] = raw_result_kvs[i+j][0]->scores[raw_result_kvs[i+j][0]->match_score_index];
raw_result_kvs[i+j][0]->scores[raw_result_kvs[i+j][0]->match_score_index] = anchor_score;
j++;
}
@ -1586,7 +1586,8 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
// restore original scores
for(i = 0; i < max_kvs_bucketed; i++) {
raw_result_kvs[i][0]->scores[raw_result_kvs[i][0]->match_score_index] = result_scores[i];
raw_result_kvs[i][0]->scores[raw_result_kvs[i][0]->match_score_index] =
result_scores[raw_result_kvs[i][0]->key];
}
}
}

View File

@ -1300,15 +1300,13 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
// restore analytics rules
std::vector<std::string> analytics_config_jsons;
cm.store->scan_fill(AnalyticsManager::ANALYTICS_CONFIG_PREFIX,
std::string(AnalyticsManager::ANALYTICS_CONFIG_PREFIX) + "`",
cm.store->scan_fill(AnalyticsManager::ANALYTICS_RULE_PREFIX,
std::string(AnalyticsManager::ANALYTICS_RULE_PREFIX) + "`",
analytics_config_jsons);
for(const auto& analytics_config_json: analytics_config_jsons) {
nlohmann::json analytics_config = nlohmann::json::parse(analytics_config_json);
if(analytics_config["type"] == AnalyticsManager::RESOURCE_TYPE) {
AnalyticsManager::get_instance().create_index(analytics_config, false);
}
AnalyticsManager::get_instance().create_rule(analytics_config, false);
}
// Fetch records from the store and re-create memory index

View File

@ -2061,7 +2061,19 @@ bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_p
return false;
}
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
bool get_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto rules_op = AnalyticsManager::get_instance().list_rules();
if(rules_op.ok()) {
res->set_200(rules_op.get().dump());
return true;
}
res->set(rules_op.code(), rules_op.error());
return false;
}
bool post_create_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;
try {
@ -2072,7 +2084,7 @@ bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req,
return false;
}
auto op = AnalyticsManager::get_instance().create_index(req_json);
auto op = AnalyticsManager::get_instance().create_rule(req_json);
if(!op.ok()) {
res->set(op.code(), op.error());
@ -2083,8 +2095,8 @@ bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req,
return true;
}
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto op = AnalyticsManager::get_instance().remove_suggestion_index(req->params["name"]);
bool del_analytics_rules(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto op = AnalyticsManager::get_instance().remove_rule(req->params["name"]);
if(!op.ok()) {
res->set(op.code(), op.error());
return false;

View File

@ -69,8 +69,9 @@ void master_server_routes() {
server->del("/presets/:name", del_preset);
// analytics
server->post("/analytics/popular-queries", post_create_analytics_popular_queries);
server->del("/analytics/popular-queries/:name", del_analytics_popular_queries);
server->get("/analytics/rules", get_analytics_rules);
server->post("/analytics/rules", post_create_analytics_rules);
server->del("/analytics/rules/:name", del_analytics_rules);
server->post("/analytics/events", post_create_event);
// meta

View File

@ -1636,7 +1636,7 @@ TEST_F(CollectionSortingTest, TextMatchBucketRanking) {
nlohmann::json doc1;
doc1["id"] = "0";
doc1["title"] = "Mark Antony";
doc1["description"] = "Marriage Counsellor";
doc1["description"] = "Counsellor";
doc1["points"] = 100;
nlohmann::json doc2;
@ -1653,47 +1653,51 @@ TEST_F(CollectionSortingTest, TextMatchBucketRanking) {
sort_by("points", "DESC"),
};
auto results = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
auto results = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true).get();
"<mark>", "</mark>", {3}, 1000, true).get();
// when there are more buckets than results, no bucketing will happen
ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("1", results["hits"][1]["document"]["id"].get<std::string>());
// bucketing by 1 produces original text match
// bucketing by 1 makes the text match score the same
sort_fields = {
sort_by("_text_match(buckets: 1)", "DESC"),
sort_by("points", "DESC"),
};
results = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
results = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true).get();
"<mark>", "</mark>", {3}, 1000, true).get();
ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("1", results["hits"][1]["document"]["id"].get<std::string>());
ASSERT_EQ("1", results["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("0", results["hits"][1]["document"]["id"].get<std::string>());
// likewise with bucket 0
size_t score1 = std::stoul(results["hits"][0]["text_match_info"]["score"].get<std::string>());
size_t score2 = std::stoul(results["hits"][1]["text_match_info"]["score"].get<std::string>());
ASSERT_TRUE(score1 < score2);
// bucketing by 0 produces original text match
sort_fields = {
sort_by("_text_match(buckets: 0)", "DESC"),
sort_by("points", "DESC"),
};
results = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
results = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true).get();
"<mark>", "</mark>", {3}, 1000, true).get();
ASSERT_EQ(2, results["hits"].size());
ASSERT_EQ("0", results["hits"][0]["document"]["id"].get<std::string>());
@ -1702,46 +1706,46 @@ TEST_F(CollectionSortingTest, TextMatchBucketRanking) {
// don't allow bad parameter name
sort_fields[0] = sort_by("_text_match(foobar: 0)", "DESC");
auto res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
auto res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Invalid sorting parameter passed for _text_match.", res_op.error());
// handle bad syntax
sort_fields[0] = sort_by("_text_match(foobar:", "DESC");
res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Could not find a field named `_text_match(foobar:` in the schema for sorting.", res_op.error());
// handle bad value
sort_fields[0] = sort_by("_text_match(buckets: x)", "DESC");
res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Invalid value passed for _text_match `buckets` configuration.", res_op.error());
// handle negative value
sort_fields[0] = sort_by("_text_match(buckets: -1)", "DESC");
res_op = coll1->search("mark", {"title", "description"},
"", {}, sort_fields, {2, 2}, 10,
1, FREQUENCY, {true, true},
res_op = coll1->search("mark", {"title"},
"", {}, sort_fields, {2}, 10,
1, FREQUENCY, {true},
10, spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 4, "title", 20, {}, {}, {}, 0,
"<mark>", "</mark>", {3, 1}, 1000, true);
"<mark>", "</mark>", {3}, 1000, true);
ASSERT_FALSE(res_op.ok());
ASSERT_EQ("Invalid value passed for _text_match `buckets` configuration.", res_op.error());