Merge branch 'v0.25-join' of https://github.com/ozanarmagan/typesense into v0.25-join

This commit is contained in:
ozanarmagan 2023-07-02 14:07:58 +03:00
commit 992ff2d9ad
20 changed files with 383 additions and 229 deletions

View File

@@ -738,7 +738,7 @@ struct sort_by {
};
struct eval_t {
filter_node_t* filter_tree_root;
filter_node_t* filter_tree_root = nullptr;
uint32_t* ids = nullptr;
uint32_t size = 0;
};
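The default member initializer added here is load-bearing: sort_fields_guard_t (later in this diff) unconditionally deletes filter_tree_root, and without the initializer an eval_t that never received a filter tree would hand a garbage pointer to delete. A minimal standalone sketch of the failure mode it prevents (the explicit delete in main stands in for the guard's destructor):

#include <cstdint>

struct filter_node_t {};

struct eval_t {
    filter_node_t* filter_tree_root = nullptr; // default-init: deleting an
                                               // unassigned tree is now a no-op
    uint32_t* ids = nullptr;
    uint32_t size = 0;
};

int main() {
    eval_t e;                  // never assigned a filter tree
    delete e.filter_tree_root; // safe: delete on nullptr does nothing, whereas
                               // an uninitialized pointer here would be UB
    delete [] e.ids;
}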

View File

@@ -26,7 +26,7 @@ private:
static size_t curl_write_download(void *ptr, size_t size, size_t nmemb, FILE *stream);
static CURL* init_curl(const std::string& url, std::string& response);
static CURL* init_curl(const std::string& url, std::string& response, const size_t timeout_ms = 0);
static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk,
bool send_ts_api_header);
@@ -57,7 +57,7 @@ public:
bool send_ts_api_header = false);
static long delete_response(const std::string& url, std::string& response,
std::map<std::string, std::string>& res_headers, long timeout_ms=120000,
std::map<std::string, std::string>& res_headers, long timeout_ms=4000,
bool send_ts_api_header = false);
static long post_response(const std::string & url, const std::string & body, std::string & response,

View File

@@ -51,7 +51,13 @@ public:
bool is_res_start = true;
h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS;
h2o_iovec_t res_body{};
std::string res_body;
h2o_iovec_t res_buff;
h2o_iovec_t res_content_type{};
int status = 0;
const char* reason = nullptr;
h2o_generator_t* generator = nullptr;
@@ -61,14 +67,15 @@ public:
}
}
void set_response(uint32_t status_code, const std::string& content_type, const std::string& body) {
res_body = h2o_strdup(&req->pool, body.c_str(), SIZE_MAX);
void set_response(uint32_t status_code, const std::string& content_type, std::string& body) {
std::string().swap(res_body);
res_body = std::move(body);
res_buff = h2o_iovec_t{.base = res_body.data(), .len = res_body.size()};
if(is_res_start) {
req->res.status = status_code;
req->res.reason = http_res::get_status_reason(status_code);
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
content_type.c_str(), content_type.size());
res_content_type = h2o_strdup(&req->pool, content_type.c_str(), SIZE_MAX);
status = status_code;
reason = http_res::get_status_reason(status_code);
}
}
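Functionally, the new set_response steals the caller's buffer instead of duplicating it into the request pool with h2o_strdup: the body is moved into the member string and res_buff is merely a view over it. A simplified sketch of the move-and-alias pattern without the h2o types (names here are illustrative):

#include <cstddef>
#include <string>
#include <utility>

struct iovec_view { char* base; size_t len; };

struct response_state {
    std::string res_body;
    iovec_view res_buff{};

    void set_body(std::string& body) {
        res_body = std::move(body);                    // take ownership, no copy
        res_buff = {res_body.data(), res_body.size()}; // alias, not a duplicate
        // res_buff stays valid only while res_body is alive and unmodified
    }
};

int main() {
    response_state state;
    std::string body = R"({"ok":true})";
    state.set_body(body); // body is moved-from afterwards; state owns the bytes
}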

View File

@@ -631,9 +631,6 @@ public:
const std::vector<char>& local_token_separators,
const std::vector<char>& local_symbols_to_index);
static void scrub_reindex_doc(const tsl::htrie_map<char, field>& search_schema,
nlohmann::json& update_doc, nlohmann::json& del_doc, const nlohmann::json& old_doc);
static void tokenize_string_field(const nlohmann::json& document,
const field& search_field, std::vector<std::string>& tokens,
const std::string& locale,

View File

@@ -27,6 +27,7 @@ struct sort_fields_guard_t {
~sort_fields_guard_t() {
for(auto& sort_by_clause: sort_fields_std) {
delete sort_by_clause.eval.filter_tree_root;
if(sort_by_clause.eval.ids) {
delete [] sort_by_clause.eval.ids;
sort_by_clause.eval.ids = nullptr;
@@ -418,7 +419,7 @@ Option<nlohmann::json> Collection::update_matching_filter(const std::string& fil
}
const auto& dirty_values = parse_dirty_values_option(req_dirty_values);
size_t docs_updated_count;
size_t docs_updated_count = 0;
nlohmann::json update_document, dummy;
try {
@@ -1578,6 +1579,11 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
std::unique_ptr<search_args> search_params_guard(search_params);
auto search_op = index->run_search(search_params, name);
// filter_tree_root might be updated in Index::static_filter_query_eval.
filter_tree_root_guard.release();
filter_tree_root_guard.reset(filter_tree_root);
if (!search_op.ok()) {
return Option<nlohmann::json>(search_op.code(), search_op.error());
}
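The release-then-reset pair is needed because the callee may free and replace the tree the guard was seated on: release() first forgets the stale pointer without deleting it (avoiding a double free), and reset() adopts whatever filter_tree_root points to now. A standalone sketch of the idiom, with maybe_rebuild as a hypothetical stand-in for the search call:

#include <memory>

struct filter_node_t { int value; };

// hypothetical callee that may delete and replace the tree passed to it
void maybe_rebuild(filter_node_t*& root) {
    delete root;
    root = new filter_node_t{42};
}

int main() {
    filter_node_t* filter_tree_root = new filter_node_t{1};
    std::unique_ptr<filter_node_t> guard(filter_tree_root);

    maybe_rebuild(filter_tree_root); // raw pointer may now differ from guard's

    guard.release();                 // forget the stale pointer; do NOT delete it
    guard.reset(filter_tree_root);   // re-seat the guard on the current tree
}                                    // guard frees the new tree exactly once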

View File

@@ -732,6 +732,27 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
AuthManager::add_item_to_params(req_params, item, true);
}
const auto preset_it = req_params.find("preset");
if(preset_it != req_params.end()) {
nlohmann::json preset;
const auto& preset_op = CollectionManager::get_instance().get_preset(preset_it->second, preset);
if(preset_op.ok()) {
if(!preset.is_object()) {
return Option<bool>(400, "Search preset is not an object.");
}
for(const auto& search_item: preset.items()) {
// overwrite = false, since req params already contain embedded params, which have higher priority
bool populated = AuthManager::add_item_to_params(req_params, search_item, false);
if(!populated) {
return Option<bool>(400, "One or more search parameters are malformed.");
}
}
}
}
CollectionManager & collectionManager = CollectionManager::get_instance();
const std::string& orig_coll_name = req_params["collection"];
auto collection = collectionManager.get_collection(orig_coll_name);
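The overwrite = false merge means a preset can only fill in parameters that are not already present, so anything pinned by the request or by embedded API-key params wins. A small sketch of that precedence rule (add_param is a simplified stand-in for AuthManager::add_item_to_params):

#include <iostream>
#include <map>
#include <string>

// simplified stand-in: with overwrite=false, only absent keys are inserted
bool add_param(std::map<std::string, std::string>& params,
               const std::string& key, const std::string& value, bool overwrite) {
    if(!overwrite && params.count(key) != 0) {
        return true; // the existing (e.g. embedded) value takes priority
    }
    params[key] = value;
    return true;
}

int main() {
    std::map<std::string, std::string> req_params{{"per_page", "10"}};
    add_param(req_params, "per_page", "100", false); // value from a preset
    std::cout << req_params["per_page"] << "\n";     // prints 10, not 100
}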

View File

@@ -58,10 +58,8 @@ void stream_response(const std::shared_ptr<http_req>& req, const std::shared_ptr
return ;
}
if(req->_req->res.status != 0) {
// not the first response chunk, so wait for previous chunk to finish
res->wait();
}
// wait for previous chunk to finish (if any)
res->wait();
auto req_res = new async_req_res_t(req, res, true);
server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res);
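This simplification works because catch_all_handler (later in this diff) now calls response->notify() right after creating the generator, pre-arming the gate so the first chunk's wait() returns immediately. A simplified model of the notify/wait handshake (a binary latch; the real http_res synchronization may differ):

#include <condition_variable>
#include <mutex>
#include <thread>

struct chunk_gate {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    void notify() {
        { std::lock_guard<std::mutex> lk(m); ready = true; }
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return ready; });
        ready = false; // re-arm for the next chunk
    }
};

int main() {
    chunk_gate gate;
    gate.notify();                         // pre-arm, as catch_all_handler now does
    gate.wait();                           // first chunk: returns immediately
    std::thread t([&] { gate.notify(); }); // previous chunk finished sending
    gate.wait();                           // later chunks: wait for the signal
    t.join();
}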
@@ -369,29 +367,6 @@ bool get_search(const std::shared_ptr<http_req>& req, const std::shared_ptr<http
}
}
const auto preset_it = req->params.find("preset");
if(preset_it != req->params.end()) {
nlohmann::json preset;
const auto& preset_op = CollectionManager::get_instance().get_preset(preset_it->second, preset);
if(preset_op.ok()) {
if(!preset.is_object()) {
res->set_400("Search preset is not an object.");
return false;
}
for(const auto& search_item: preset.items()) {
// overwrite = false since req params will contain embedded params and so has higher priority
bool populated = AuthManager::add_item_to_params(req->params, search_item, false);
if(!populated) {
res->set_400("One or more search parameters are malformed.");
return false;
}
}
}
}
if(req->embedded_params_vec.empty()) {
res->set_500("Embedded params is empty.");
return false;
@@ -570,27 +545,6 @@ bool post_multi_search(const std::shared_ptr<http_req>& req, const std::shared_p
}
}
if(search_params.count("preset") != 0) {
nlohmann::json preset;
auto preset_op = CollectionManager::get_instance().get_preset(search_params["preset"].get<std::string>(),
preset);
if(preset_op.ok()) {
if(!search_params.is_object()) {
res->set_400("Search preset is not an object.");
return false;
}
for(const auto& search_item: preset.items()) {
// overwrite = false since req params will contain embedded params and so has higher priority
bool populated = AuthManager::add_item_to_params(req->params, search_item, false);
if(!populated) {
res->set_400("One or more search parameters are malformed.");
return false;
}
}
}
}
std::string results_json_str;
Option<bool> search_op = CollectionManager::do_search(req->params, req->embedded_params_vec[i],
results_json_str, req->conn_ts);

View File

@@ -19,7 +19,7 @@ long HttpClient::post_response(const std::string &url, const std::string &body,
std::map<std::string, std::string>& res_headers,
const std::unordered_map<std::string, std::string>& headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
CURL *curl = init_curl(url, response, timeout_ms);
if(curl == nullptr) {
return 500;
}
@@ -60,7 +60,7 @@ long HttpClient::post_response_async(const std::string &url, const std::shared_p
long HttpClient::put_response(const std::string &url, const std::string &body, std::string &response,
std::map<std::string, std::string>& res_headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
CURL *curl = init_curl(url, response, timeout_ms);
if(curl == nullptr) {
return 500;
}
@@ -73,7 +73,7 @@ long HttpClient::put_response(const std::string &url, const std::string &body, s
long HttpClient::patch_response(const std::string &url, const std::string &body, std::string &response,
std::map<std::string, std::string>& res_headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
CURL *curl = init_curl(url, response, timeout_ms);
if(curl == nullptr) {
return 500;
}
@@ -86,7 +86,7 @@ long HttpClient::patch_response(const std::string &url, const std::string &body,
long HttpClient::delete_response(const std::string &url, std::string &response,
std::map<std::string, std::string>& res_headers, long timeout_ms,
bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
CURL *curl = init_curl(url, response, timeout_ms);
if(curl == nullptr) {
return 500;
}
@@ -99,7 +99,7 @@ long HttpClient::get_response(const std::string &url, std::string &response,
std::map<std::string, std::string>& res_headers,
const std::unordered_map<std::string, std::string>& headers,
long timeout_ms, bool send_ts_api_header) {
CURL *curl = init_curl(url, response);
CURL *curl = init_curl(url, response, timeout_ms);
if(curl == nullptr) {
return 500;
}
@@ -279,6 +279,8 @@ size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) {
if(!req_res->res->is_alive) {
// underlying client request is dead, don't try to send anymore data
// also, close the socket as we've overridden the close socket handler!
close(item);
return 0;
}
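The explicit close() is required because a CURLOPT_CLOSESOCKETFUNCTION has been installed: once that callback exists, libcurl delegates socket teardown to it, so an early-exit path that bypasses the callback must close the fd itself. A minimal sketch of installing such a callback (assuming libcurl; error handling elided):

#include <curl/curl.h>
#include <unistd.h>

// with this installed, libcurl calls it instead of closing sockets itself
static int close_socket_cb(void* /*clientp*/, curl_socket_t item) {
    return close(item);
}

int main() {
    CURL* curl = curl_easy_init();
    if(curl == nullptr) return 1;
    curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, close_socket_cb);
    curl_easy_cleanup(curl);
    return 0;
}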
@@ -349,7 +351,7 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re
return curl;
}
CURL *HttpClient::init_curl(const std::string& url, std::string& response) {
CURL *HttpClient::init_curl(const std::string& url, std::string& response, const size_t timeout_ms) {
CURL *curl = curl_easy_init();
if(curl == nullptr) {
@@ -367,6 +369,7 @@ CURL *HttpClient::init_curl(const std::string& url, std::string& response) {
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 4000);
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE);
// to allow self-signed certs
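The default timeout_ms of 0 leans on libcurl semantics: CURLOPT_TIMEOUT_MS set to 0 disables the whole-transfer timeout, which is exactly what the long-running delete and patch paths later in this diff rely on; the 4-second CURLOPT_CONNECTTIMEOUT_MS still caps connection establishment separately. A minimal sketch of the distinction (URL is illustrative):

#include <curl/curl.h>

int main() {
    CURL* curl = curl_easy_init();
    if(curl == nullptr) return 1;
    curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:8108/health");
    curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 4000L); // caps connect only
    curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 0L);           // 0 = transfer never times out
    CURLcode res = curl_easy_perform(curl);
    curl_easy_cleanup(curl);
    return res == CURLE_OK ? 0 : 1;
}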

View File

@@ -504,6 +504,9 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
);
*allocated_generator = custom_gen;
// ensures that the first response need not wait for previous chunk to be done sending
response->notify();
//LOG(INFO) << "Init res: " << custom_gen->response << ", ref count: " << custom_gen->response.use_count();
if(root_resource == "multi_search") {
@@ -544,7 +547,9 @@ bool HttpServer::is_write_request(const std::string& root_resource, const std::s
return false;
}
bool write_free_request = (root_resource == "multi_search" || root_resource == "operations");
bool write_free_request = (root_resource == "multi_search" || root_resource == "proxy" ||
root_resource == "operations");
if(!write_free_request &&
(http_method == "POST" || http_method == "PUT" ||
http_method == "DELETE" || http_method == "PATCH")) {
@@ -803,6 +808,13 @@ void HttpServer::stream_response(stream_response_state_t& state) {
h2o_req_t* req = state.get_req();
if(state.is_res_start) {
h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL,
state.res_content_type.base, state.res_content_type.len);
req->res.status = state.status;
req->res.reason = state.reason;
}
if(state.is_req_early_exit) {
// premature termination of async request: handle this explicitly as otherwise, request is not being closed
LOG(INFO) << "Premature termination of async request.";
@@ -811,7 +823,7 @@ void HttpServer::stream_response(stream_response_state_t& state) {
h2o_start_response(req, state.generator);
}
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_send(req, &state.res_buff, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(req);
return ;
@@ -823,13 +835,13 @@ void HttpServer::stream_response(stream_response_state_t& state) {
h2o_start_response(req, state.generator);
}
if(state.res_body.len == 0 && state.send_state != H2O_SEND_STATE_FINAL) {
if(state.res_buff.len == 0 && state.send_state != H2O_SEND_STATE_FINAL) {
// without this guard, http streaming will break
state.generator->proceed(state.generator, req);
return;
}
h2o_send(req, &state.res_body, 1, state.send_state);
h2o_send(req, &state.res_buff, 1, state.send_state);
//LOG(INFO) << "stream_response after send";
}

View File

@@ -454,7 +454,6 @@ void Index::validate_and_preprocess(Index *index, std::vector<index_record>& ite
// scrub string fields to reduce delete ops
get_doc_changes(index_rec.operation, search_schema, index_rec.doc, index_rec.old_doc,
index_rec.new_doc, index_rec.del_doc);
scrub_reindex_doc(search_schema, index_rec.doc, index_rec.del_doc, index_rec.old_doc);
if(generate_embeddings) {
for(auto& field: index_rec.doc.items()) {
@@ -2855,9 +2854,7 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
collate_included_ids({}, included_ids_map, curated_topster, searched_queries);
if (!vector_query.field_name.empty()) {
// use k as 250 by default for ensuring results stability in pagination
size_t default_k = 250;
auto k = std::max<size_t>(vector_query.k, default_k);
auto k = std::max<size_t>(vector_query.k, fetch_size);
if(vector_query.query_doc_given) {
// since we will omit the query doc from results
k++;
@@ -3145,8 +3142,8 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
VectorFilterFunctor filterFunctor(filter_result.docs, filter_result.count);
auto& field_vector_index = vector_index.at(vector_query.field_name);
std::vector<std::pair<float, size_t>> dist_labels;
// use k as 250 by default for ensuring results stability in pagination
size_t default_k = 250;
// use k as 100 by default for ensuring results stability in pagination
size_t default_k = 100;
auto k = std::max<size_t>(vector_query.k, default_k);
if(field_vector_index->distance_type == cosine) {
@@ -3186,17 +3183,26 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
result->scores[result->match_score_index] = float_to_int64_t((1.0 / (i + 1)) * TEXT_MATCH_WEIGHT);
}
for(int i = 0; i < vec_results.size(); i++) {
auto& vec_result = vec_results[i];
auto doc_id = vec_result.first;
std::vector<uint32_t> vec_search_ids; // list of IDs found only in vector search
for(size_t res_index = 0; res_index < vec_results.size(); res_index++) {
auto& vec_result = vec_results[res_index];
auto doc_id = vec_result.first;
auto result_it = topster->kv_map.find(doc_id);
if(result_it != topster->kv_map.end()&& result_it->second->match_score_index >= 0 && result_it->second->match_score_index <= 2) {
if(result_it != topster->kv_map.end()) {
if(result_it->second->match_score_index < 0 || result_it->second->match_score_index > 2) {
continue;
}
// result overlaps with keyword search: we have to combine the scores
auto result = result_it->second;
// old_score + (1 / rank_of_document) * WEIGHT
result->vector_distance = vec_result.second;
result->scores[result->match_score_index] = float_to_int64_t((int64_t_to_float(result->scores[result->match_score_index])) + ((1.0 / (i + 1)) * VECTOR_SEARCH_WEIGHT));
result->scores[result->match_score_index] = float_to_int64_t(
(int64_t_to_float(result->scores[result->match_score_index])) +
((1.0 / (res_index + 1)) * VECTOR_SEARCH_WEIGHT));
for(size_t i = 0;i < 3; i++) {
if(field_values[i] == &vector_distance_sentinel_value) {
@@ -3209,17 +3215,26 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
}
} else {
int64_t scores[3] = {0};
// Result has been found only in vector search: we have to add it to both KV and result_ids
// (1 / rank_of_document) * WEIGHT
int64_t match_score = float_to_int64_t((1.0 / (i + 1)) * VECTOR_SEARCH_WEIGHT);
int64_t scores[3] = {0};
int64_t match_score = float_to_int64_t((1.0 / (res_index + 1)) * VECTOR_SEARCH_WEIGHT);
int64_t match_score_index = -1;
compute_sort_scores(sort_fields_std, sort_order, field_values, geopoint_indices, doc_id, 0, match_score, scores, match_score_index, vec_result.second);
KV kv(searched_queries.size(), doc_id, doc_id, match_score_index, scores);
kv.vector_distance = vec_result.second;
topster->add(&kv);
++all_result_ids_len;
vec_search_ids.push_back(doc_id);
}
}
if(!vec_search_ids.empty()) {
uint32_t* new_all_result_ids = nullptr;
all_result_ids_len = ArrayUtils::or_scalar(all_result_ids, all_result_ids_len, &vec_search_ids[0],
vec_search_ids.size(), &new_all_result_ids);
delete[] all_result_ids;
all_result_ids = new_all_result_ids;
}
}
}
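Both branches above score vector hits with (1 / (rank + 1)) * weight, i.e. reciprocal-rank fusion: a document found by both keyword and vector search gets the sum of the two contributions. A worked sketch of the arithmetic (the 0.7/0.3 split is illustrative; the real TEXT_MATCH_WEIGHT and VECTOR_SEARCH_WEIGHT constants are defined elsewhere in the codebase):

#include <cstdio>

int main() {
    const float TEXT_MATCH_WEIGHT = 0.7f;    // assumed values, for illustration
    const float VECTOR_SEARCH_WEIGHT = 0.3f;

    // a doc ranked 1st by keyword search and 3rd by vector search (0-based)
    int text_rank = 0, vec_rank = 2;
    float text_score = (1.0f / (text_rank + 1)) * TEXT_MATCH_WEIGHT;           // 0.70
    float fused = text_score + (1.0f / (vec_rank + 1)) * VECTOR_SEARCH_WEIGHT; // 0.80

    std::printf("text=%.2f fused=%.2f\n", text_score, fused);
}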
@@ -6220,11 +6235,19 @@ void Index::get_doc_changes(const index_operation_t op, const tsl::htrie_map<cha
if(op == UPSERT) {
new_doc = update_doc;
// since UPSERT could replace a doc with fewer fields, we have to add those missing fields to del_doc
for(auto it = old_doc.begin(); it != old_doc.end(); ++it) {
if(it.value().is_object() || (it.value().is_array() && (it.value().empty() || it.value()[0].is_object()))) {
continue;
}
if(!update_doc.contains(it.key())) {
del_doc[it.key()] = it.value();
}
}
} else {
new_doc = old_doc;
handle_doc_ops(search_schema, update_doc, old_doc);
new_doc = old_doc;
new_doc.merge_patch(update_doc);
if(old_doc.contains(".flat")) {
@@ -6235,87 +6258,33 @@ void Index::get_doc_changes(const index_operation_t op, const tsl::htrie_map<cha
}
}
for(auto it = old_doc.begin(); it != old_doc.end(); ++it) {
if(it.value().is_object() || (it.value().is_array() && (it.value().empty() || it.value()[0].is_object()))) {
continue;
}
if(op == UPSERT && !update_doc.contains(it.key())) {
del_doc[it.key()] = it.value();
}
}
auto it = update_doc.begin();
for(; it != update_doc.end(); ) {
if(it.value().is_object() || (it.value().is_array() && (it.value().empty() || it.value()[0].is_object()))) {
while(it != update_doc.end()) {
if(it.value().is_object() || (it.value().is_array() && !it.value().empty() && it.value()[0].is_object())) {
++it;
continue;
}
// if the update doc contains a field that exists in old, we record that (for delete + reindex)
bool field_exists_in_old_doc = (old_doc.count(it.key()) != 0);
if(field_exists_in_old_doc) {
// key exists in the stored doc, so it must be reindexed
// we need to check for this because a field can be optional
del_doc[it.key()] = old_doc[it.key()];
}
// adds new key or overrides existing key from `old_doc`
if(it.value().is_null()) {
// null values should not indexed
// null values should not be indexed
new_doc.erase(it.key());
del_doc[it.key()] = old_doc[it.key()];
it = update_doc.erase(it);
} else {
++it;
}
}
}
void Index::scrub_reindex_doc(const tsl::htrie_map<char, field>& search_schema,
nlohmann::json& update_doc,
nlohmann::json& del_doc,
const nlohmann::json& old_doc) {
/*LOG(INFO) << "update_doc: " << update_doc;
LOG(INFO) << "old_doc: " << old_doc;
LOG(INFO) << "del_doc: " << del_doc;*/
// del_doc contains fields that exist in both update doc and old doc
// But we will only remove fields that are different
std::vector<std::string> unchanged_keys;
for(auto it = del_doc.cbegin(); it != del_doc.cend(); it++) {
const std::string& field_name = it.key();
const auto& search_field_it = search_schema.find(field_name);
if(search_field_it == search_schema.end()) {
continue;
}
if(it.value().is_object() || (it.value().is_array() && (it.value().empty() || it.value()[0].is_object()))) {
continue;
}
const auto search_field = search_field_it.value(); // copy, don't use reference!
// compare values between old and update docs:
// if they match, we will remove them from both del and update docs
if(update_doc.contains(search_field.name)) {
if(update_doc[search_field.name].is_null()) {
// we don't allow null values to be stored or indexed but need to be removed from stored doc
update_doc.erase(search_field.name);
}
else if(update_doc[search_field.name] == old_doc[search_field.name]) {
unchanged_keys.push_back(field_name);
if(old_doc.contains(it.key())) {
if(old_doc[it.key()] == it.value()) {
// unchanged so should not be part of update doc
it = update_doc.erase(it);
continue;
} else {
// delete this old value from index
del_doc[it.key()] = old_doc[it.key()];
}
}
}
for(const auto& unchanged_key: unchanged_keys) {
del_doc.erase(unchanged_key);
update_doc.erase(unchanged_key);
it++;
}
}
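The UPSERT branch above exists because a replacement document may simply omit fields that the stored document had; those fields must land in del_doc so their index entries are removed. A worked sketch of that bookkeeping with nlohmann::json (assuming the library is available):

#include <iostream>
#include <nlohmann/json.hpp>

using nlohmann::json;

int main() {
    json old_doc    = {{"id", "0"}, {"title", "One more thing."}, {"points", 100}};
    json update_doc = {{"id", "0"}, {"title", "The Lawyer"}}; // upsert omits "points"
    json del_doc;

    // mirrors the UPSERT branch: stored fields missing from the new doc are
    // queued for deletion from the index
    for(auto it = old_doc.begin(); it != old_doc.end(); ++it) {
        if(!update_doc.contains(it.key())) {
            del_doc[it.key()] = it.value();
        }
    }
    std::cout << del_doc.dump() << "\n"; // {"points":100}
}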

View File

@@ -147,6 +147,7 @@ int main(int argc, char **argv) {
Option<bool> config_validitation = config.is_valid();
if(!config_validitation.ok()) {
std::cerr << "Typesense " << TYPESENSE_VERSION << std::endl;
std::cerr << "Invalid configuration: " << config_validitation.error() << std::endl;
std::cerr << "Command line " << options.usage() << std::endl;
std::cerr << "You can also pass these arguments as environment variables such as "

View File

@@ -194,7 +194,8 @@ void ReplicationState::write(const std::shared_ptr<http_req>& request, const std
auto resource_check = cached_resource_stat_t::get_instance().has_enough_resources(raft_dir_path,
config->get_disk_used_max_percentage(), config->get_memory_used_max_percentage());
if (resource_check != cached_resource_stat_t::OK && request->http_method != "DELETE") {
if (resource_check != cached_resource_stat_t::OK &&
request->http_method != "DELETE" && request->path_without_query != "/health") {
response->set_422("Rejecting write: running out of resource type: " +
std::string(magic_enum::enum_name(resource_check)));
response->final = true;
@@ -307,18 +308,19 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
}
} else {
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 4000, true);
long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 10*1000, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
}
} else if(request->http_method == "PUT") {
std::string api_res;
long status = HttpClient::put_response(url, request->body, api_res, res_headers, 4000, true);
long status = HttpClient::put_response(url, request->body, api_res, res_headers, 10*1000, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
} else if(request->http_method == "DELETE") {
std::string api_res;
long status = HttpClient::delete_response(url, api_res, res_headers, 120000, true);
// timeout: 0 since delete can take a long time
long status = HttpClient::delete_response(url, api_res, res_headers, 0, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
} else if(request->http_method == "PATCH") {
@@ -326,9 +328,9 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
route_path* rpath = nullptr;
bool route_found = server->get_route(request->route_hash, &rpath);
long timeout_ms = 4 * 1000;
long timeout_ms = 10 * 1000;
if(route_found && rpath->handler == patch_update_collection) {
timeout_ms = 300 * 1000; // 5 minutes for patching a collection which can take some time
timeout_ms = 0; // patching a collection can take a long time
}
long status = HttpClient::patch_response(url, request->body, api_res, res_headers, timeout_ms, true);
@@ -676,7 +678,7 @@ void ReplicationState::refresh_catchup_status(bool log_msg) {
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true);
long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 5*1000, true);
if(status_code == 200) {
// compare leader's applied log with local applied to see if we are lagging
nlohmann::json leader_status = nlohmann::json::parse(api_res);
@@ -936,7 +938,7 @@ void ReplicationState::do_snapshot(const std::string& nodes) {
std::string url = get_node_url_path(peer_addr, "/health", protocol);
std::string api_res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true);
long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 5*1000, true);
bool peer_healthy = (status_code == 200);
//LOG(INFO) << "do_snapshot, status_code: " << status_code;

View File

@@ -464,7 +464,11 @@ Option<bool> StringUtils::split_include_fields(const std::string& include_fields
if (range_pos == std::string::npos && comma_pos == std::string::npos) {
if (start < size - 1) {
tokens.push_back(include_fields.substr(start, size - start));
include_field = include_fields.substr(start, size - start);
include_field = trim(include_field);
if (!include_field.empty()) {
tokens.push_back(include_field);
}
}
break;
} else if (range_pos < comma_pos) {
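The change trims each extracted field and drops empties, so trailing commas or stray whitespace in include_fields no longer produce empty tokens. A small standalone sketch of the trim-and-skip logic (trim here is a simplified stand-in for the helper used above):

#include <iostream>
#include <string>
#include <vector>

// simplified stand-in for the trim used above
std::string trim(const std::string& s) {
    const char* ws = " \t";
    auto b = s.find_first_not_of(ws);
    if(b == std::string::npos) return "";
    auto e = s.find_last_not_of(ws);
    return s.substr(b, e - b + 1);
}

int main() {
    std::vector<std::string> tokens;
    for(const std::string& raw : {"id", " title", "  "}) {
        std::string token = trim(raw);
        if(!token.empty()) {
            tokens.push_back(token); // empty fields are silently dropped
        }
    }
    std::cout << tokens.size() << "\n"; // 2, not 3
}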

View File

@@ -46,7 +46,7 @@ TextEmbedder::TextEmbedder(const std::string& model_name) {
TextEmbedder::TextEmbedder(const nlohmann::json& model_config) {
auto model_name = model_config["model_name"].get<std::string>();
LOG(INFO) << "Loading model from remote: " << model_name;
LOG(INFO) << "Initializing remote embedding model: " << model_name;
auto model_namespace = TextEmbedderManager::get_model_namespace(model_name);
if(model_namespace == "openai") {

View File

@@ -118,7 +118,7 @@ int init_root_logger(Config & config, const std::string & server_version) {
if(log_dir.empty()) {
// use console logger if log dir is not specified
FLAGS_logtostderr = true;
FLAGS_logtostdout = true;
} else {
if(!directory_exists(log_dir)) {
std::cerr << "Typesense failed to start. " << "Log directory " << log_dir << " does not exist.";
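Switching from FLAGS_logtostderr to FLAGS_logtostdout sends console logs to stdout instead of stderr; this assumes a glog version that provides the flag (it appeared around glog 0.5). A minimal sketch:

#include <glog/logging.h>

int main(int argc, char* argv[]) {
    FLAGS_logtostdout = true; // log to stdout rather than stderr or log files
    google::InitGoogleLogging(argv[0]);
    LOG(INFO) << "console logging enabled";
    return 0;
}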

View File

@@ -1225,6 +1225,153 @@ TEST_F(CollectionSpecificMoreTest, UpsertUpdateEmplaceShouldAllRemoveIndex) {
ASSERT_EQ(1, results["found"].get<size_t>());
}
TEST_F(CollectionSpecificMoreTest, UpdateWithEmptyArray) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "tags", "type": "string[]"}
]
})"_json;
auto op = collectionManager.create_collection(schema);
ASSERT_TRUE(op.ok());
Collection* coll1 = op.get();
auto doc1 = R"({
"id": "0",
"tags": ["alpha", "beta", "gamma"]
})"_json;
ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok());
auto doc2 = R"({
"id": "1",
"tags": ["one", "two"]
})"_json;
ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok());
// via update
auto doc_update = R"({
"id": "0",
"tags": []
})"_json;
ASSERT_TRUE(coll1->add(doc_update.dump(), UPDATE).ok());
auto results = coll1->search("alpha", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(0, results["found"].get<size_t>());
// via upsert
doc_update = R"({
"id": "1",
"tags": []
})"_json;
ASSERT_TRUE(coll1->add(doc_update.dump(), UPSERT).ok());
results = coll1->search("one", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(0, results["found"].get<size_t>());
}
TEST_F(CollectionSpecificMoreTest, UpdateArrayWithNullValue) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "tags", "type": "string[]", "optional": true}
]
})"_json;
auto op = collectionManager.create_collection(schema);
ASSERT_TRUE(op.ok());
Collection* coll1 = op.get();
auto doc1 = R"({
"id": "0",
"tags": ["alpha", "beta", "gamma"]
})"_json;
ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok());
auto doc2 = R"({
"id": "1",
"tags": ["one", "two"]
})"_json;
ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok());
// via update
auto doc_update = R"({
"id": "0",
"tags": null
})"_json;
ASSERT_TRUE(coll1->add(doc_update.dump(), UPDATE).ok());
auto results = coll1->search("alpha", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(0, results["found"].get<size_t>());
// via upsert
doc_update = R"({
"id": "1",
"tags": null
})"_json;
ASSERT_TRUE(coll1->add(doc_update.dump(), UPSERT).ok());
results = coll1->search("one", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(0, results["found"].get<size_t>());
}
TEST_F(CollectionSpecificMoreTest, ReplaceArrayElement) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "tags", "type": "string[]"}
]
})"_json;
auto op = collectionManager.create_collection(schema);
ASSERT_TRUE(op.ok());
Collection* coll1 = op.get();
auto doc1 = R"({
"id": "0",
"tags": ["alpha", "beta", "gamma"]
})"_json;
ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok());
auto doc2 = R"({
"id": "1",
"tags": ["one", "two", "three"]
})"_json;
ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok());
// via update
auto doc_update = R"({
"id": "0",
"tags": ["alpha", "gamma"]
})"_json;
ASSERT_TRUE(coll1->add(doc_update.dump(), UPDATE).ok());
auto results = coll1->search("beta", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(0, results["found"].get<size_t>());
// via upsert
doc_update = R"({
"id": "1",
"tags": ["one", "three"]
})"_json;
ASSERT_TRUE(coll1->add(doc_update.dump(), UPSERT).ok());
results = coll1->search("two", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get();
ASSERT_EQ(0, results["found"].get<size_t>());
}
TEST_F(CollectionSpecificMoreTest, UnorderedWeightingOfFields) {
nlohmann::json schema = R"({
"name": "coll1",

View File

@@ -740,6 +740,38 @@ TEST_F(CollectionVectorTest, HybridSearchWithExplicitVector) {
ASSERT_EQ("pam beesly", results_op.get()["hits"][2]["document"]["name"].get<std::string>());
}
TEST_F(CollectionVectorTest, HybridSearchOnlyVectorMatches) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "name", "type": "string", "facet": true},
{"name": "vec", "type": "float[]", "embed":{"from": ["name"], "model_config": {"model_name": "ts/e5-small"}}}
]
})"_json;
TextEmbedderManager::set_model_dir("/tmp/typesense_test/models");
Collection* coll1 = collectionManager.create_collection(schema).get();
nlohmann::json doc;
doc["name"] = "john doe";
ASSERT_TRUE(coll1->add(doc.dump()).ok());
auto results_op = coll1->search("zzz", {"name", "vec"}, "", {"name"}, {}, {0}, 20, 1, FREQUENCY, {true},
Index::DROP_TOKENS_THRESHOLD,
spp::sparse_hash_set<std::string>(),
spp::sparse_hash_set<std::string>(), 10, "", 30, 5,
"", 10, {}, {}, {}, 0,
"<mark>", "</mark>", {}, 1000, true, false, true, "", false, 6000 * 1000, 4, 7,
fallback,
4, {off}, 32767, 32767, 2);
ASSERT_EQ(true, results_op.ok());
ASSERT_EQ(1, results_op.get()["found"].get<size_t>());
ASSERT_EQ(1, results_op.get()["hits"].size());
ASSERT_EQ(1, results_op.get()["facet_counts"].size());
ASSERT_EQ(4, results_op.get()["facet_counts"][0].size());
ASSERT_EQ("name", results_op.get()["facet_counts"][0]["field_name"]);
}
TEST_F(CollectionVectorTest, DistanceThresholdTest) {
nlohmann::json schema = R"({
"name": "test",

View File

@@ -253,6 +253,40 @@ TEST_F(CoreAPIUtilsTest, MultiSearchEmbeddedKeys) {
}
TEST_F(CoreAPIUtilsTest, SearchEmbeddedPresetKey) {
nlohmann::json preset_value = R"(
{"per_page": 100}
)"_json;
Option<bool> success_op = collectionManager.upsert_preset("apple", preset_value);
ASSERT_TRUE(success_op.ok());
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
nlohmann::json embedded_params;
embedded_params["preset"] = "apple";
req->embedded_params_vec.push_back(embedded_params);
req->params["collection"] = "foo";
get_search(req, res);
ASSERT_EQ("100", req->params["per_page"]);
// with multi search
req->params.clear();
nlohmann::json body;
body["searches"] = nlohmann::json::array();
nlohmann::json search;
search["collection"] = "users";
search["filter_by"] = "age: > 100";
body["searches"].push_back(search);
req->body = body.dump();
post_multi_search(req, res);
ASSERT_EQ("100", req->params["per_page"]);
}
TEST_F(CoreAPIUtilsTest, ExtractCollectionsFromRequestBody) {
std::map<std::string, std::string> req_params;
std::string body = R"(

View File

@@ -3,66 +3,6 @@
#include <vector>
#include <s2/s2loop.h>
TEST(IndexTest, ScrubReindexDoc) {
tsl::htrie_map<char, field> search_schema;
search_schema.emplace("title", field("title", field_types::STRING, false));
search_schema.emplace("points", field("points", field_types::INT32, false));
search_schema.emplace("cast", field("cast", field_types::STRING_ARRAY, false));
search_schema.emplace("movie", field("movie", field_types::BOOL, false));
ThreadPool pool(4);
Index index("index", 1, nullptr, nullptr, &pool, search_schema, {}, {});
nlohmann::json old_doc;
old_doc["id"] = "1";
old_doc["title"] = "One more thing.";
old_doc["points"] = 100;
old_doc["cast"] = {"John Wick", "Jeremy Renner"};
old_doc["movie"] = true;
// all fields remain same
nlohmann::json update_doc1, del_doc1;
update_doc1 = old_doc;
del_doc1 = old_doc;
index.scrub_reindex_doc(search_schema, update_doc1, del_doc1, old_doc);
ASSERT_EQ(1, del_doc1.size());
ASSERT_STREQ("1", del_doc1["id"].get<std::string>().c_str());
// when only some fields are updated
nlohmann::json update_doc2, del_doc2;
update_doc2["id"] = "1";
update_doc2["points"] = 100;
update_doc2["cast"] = {"Jack"};
del_doc2 = update_doc2;
index.scrub_reindex_doc(search_schema, update_doc2, del_doc2, old_doc);
ASSERT_EQ(2, del_doc2.size());
ASSERT_STREQ("1", del_doc2["id"].get<std::string>().c_str());
std::vector<std::string> cast = del_doc2["cast"].get<std::vector<std::string>>();
ASSERT_EQ(1, cast.size());
ASSERT_STREQ("Jack", cast[0].c_str());
// containing fields not part of search schema
nlohmann::json update_doc3, del_doc3;
update_doc3["id"] = "1";
update_doc3["title"] = "The Lawyer";
update_doc3["foo"] = "Bar";
del_doc3 = update_doc3;
index.scrub_reindex_doc(search_schema, update_doc3, del_doc3, old_doc);
ASSERT_EQ(3, del_doc3.size());
ASSERT_STREQ("1", del_doc3["id"].get<std::string>().c_str());
ASSERT_STREQ("The Lawyer", del_doc3["title"].get<std::string>().c_str());
ASSERT_STREQ("Bar", del_doc3["foo"].get<std::string>().c_str());
pool.shutdown();
}
/*TEST(IndexTest, PointInPolygon180thMeridian) {
// somewhere in far eastern russia
GeoCoord verts[3] = {

View File

@@ -391,7 +391,32 @@ TEST(StringUtilsTest, TokenizeFilterQuery) {
tokenList = {"(", "(", "age:<5", "||", "age:>10", ")", "&&", "location:(48.906,2.343,5mi)", ")", "||", "tags:AT&T"};
tokenizeTestHelper(filter_query, tokenList);
filter_query = "((age: <5 || age: >10) && category:= [shoes]) && $Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))";
tokenList = {"(", "(", "age: <5", "||", "age: >10", ")", "&&", "category:= [shoes]", ")", "&&", "$Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))"};
filter_query = "((age: <5 || age: >10) && category:= [shoes]) &&"
" $Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))";
tokenList = {"(", "(", "age: <5", "||", "age: >10", ")", "&&", "category:= [shoes]", ")", "&&",
"$Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))"};
tokenizeTestHelper(filter_query, tokenList);
}
void splitIncludeTestHelper(const std::string& include_fields, const std::vector<std::string>& expected) {
std::vector<std::string> output;
auto tokenize_op = StringUtils::split_include_fields(include_fields, output);
ASSERT_TRUE(tokenize_op.ok());
ASSERT_EQ(expected.size(), output.size());
for (auto i = 0; i < output.size(); i++) {
ASSERT_EQ(expected[i], output[i]);
}
}
TEST(StringUtilsTest, SplitIncludeFields) {
std::string include_fields;
std::vector<std::string> tokens;
include_fields = "id, title, count";
tokens = {"id", "title", "count"};
splitIncludeTestHelper(include_fields, tokens);
include_fields = "id, $Collection(title, pref*), count";
tokens = {"id", "$Collection(title, pref*)", "count"};
splitIncludeTestHelper(include_fields, tokens);
}