From e0e5d49053775cf508638d9b18f40e5a07432173 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 13 Jun 2023 11:10:35 +0530 Subject: [PATCH 01/15] Print TS version ahead of invalid configuration message. --- src/main/typesense_server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 30b7281d..69ef1590 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -144,6 +144,7 @@ int main(int argc, char **argv) { Option config_validitation = config.is_valid(); if(!config_validitation.ok()) { + std::cerr << "Typesense " << TYPESENSE_VERSION << std::endl; std::cerr << "Invalid configuration: " << config_validitation.error() << std::endl; std::cerr << "Command line " << options.usage() << std::endl; std::cerr << "You can also pass these arguments as environment variables such as " From 9f78009796318a18a3e230949ebfd425abd36210 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Wed, 14 Jun 2023 22:12:33 +0530 Subject: [PATCH 02/15] Handle preset in embedded key. --- src/collection_manager.cpp | 21 +++++++++++++++++ src/core_api.cpp | 44 ------------------------------------ test/core_api_utils_test.cpp | 34 ++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 44 deletions(-) diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 5a3468b2..b0c76f69 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -731,6 +731,27 @@ Option CollectionManager::do_search(std::map& re AuthManager::add_item_to_params(req_params, item, true); } + const auto preset_it = req_params.find("preset"); + + if(preset_it != req_params.end()) { + nlohmann::json preset; + const auto& preset_op = CollectionManager::get_instance().get_preset(preset_it->second, preset); + + if(preset_op.ok()) { + if(!preset.is_object()) { + return Option(400, "Search preset is not an object."); + } + + for(const auto& search_item: preset.items()) { + // overwrite = false since req params will contain embedded params and so has higher priority + bool populated = AuthManager::add_item_to_params(req_params, search_item, false); + if(!populated) { + return Option(400, "One or more search parameters are malformed."); + } + } + } + } + CollectionManager & collectionManager = CollectionManager::get_instance(); const std::string& orig_coll_name = req_params["collection"]; auto collection = collectionManager.get_collection(orig_coll_name); diff --git a/src/core_api.cpp b/src/core_api.cpp index 6b9534e9..8967b160 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -368,29 +368,6 @@ bool get_search(const std::shared_ptr& req, const std::shared_ptrparams.find("preset"); - - if(preset_it != req->params.end()) { - nlohmann::json preset; - const auto& preset_op = CollectionManager::get_instance().get_preset(preset_it->second, preset); - - if(preset_op.ok()) { - if(!preset.is_object()) { - res->set_400("Search preset is not an object."); - return false; - } - - for(const auto& search_item: preset.items()) { - // overwrite = false since req params will contain embedded params and so has higher priority - bool populated = AuthManager::add_item_to_params(req->params, search_item, false); - if(!populated) { - res->set_400("One or more search parameters are malformed."); - return false; - } - } - } - } - if(req->embedded_params_vec.empty()) { res->set_500("Embedded params is empty."); return false; @@ -569,27 +546,6 @@ bool post_multi_search(const std::shared_ptr& req, const std::shared_p } } - 
if(search_params.count("preset") != 0) { - nlohmann::json preset; - auto preset_op = CollectionManager::get_instance().get_preset(search_params["preset"].get(), - preset); - if(preset_op.ok()) { - if(!search_params.is_object()) { - res->set_400("Search preset is not an object."); - return false; - } - - for(const auto& search_item: preset.items()) { - // overwrite = false since req params will contain embedded params and so has higher priority - bool populated = AuthManager::add_item_to_params(req->params, search_item, false); - if(!populated) { - res->set_400("One or more search parameters are malformed."); - return false; - } - } - } - } - std::string results_json_str; Option search_op = CollectionManager::do_search(req->params, req->embedded_params_vec[i], results_json_str, req->conn_ts); diff --git a/test/core_api_utils_test.cpp b/test/core_api_utils_test.cpp index 362ef643..57391357 100644 --- a/test/core_api_utils_test.cpp +++ b/test/core_api_utils_test.cpp @@ -253,6 +253,40 @@ TEST_F(CoreAPIUtilsTest, MultiSearchEmbeddedKeys) { } +TEST_F(CoreAPIUtilsTest, SearchEmbeddedPresetKey) { + nlohmann::json preset_value = R"( + {"per_page": 100} + )"_json; + + Option success_op = collectionManager.upsert_preset("apple", preset_value); + ASSERT_TRUE(success_op.ok()); + + std::shared_ptr req = std::make_shared(); + std::shared_ptr res = std::make_shared(nullptr); + + nlohmann::json embedded_params; + embedded_params["preset"] = "apple"; + req->embedded_params_vec.push_back(embedded_params); + req->params["collection"] = "foo"; + + get_search(req, res); + ASSERT_EQ("100", req->params["per_page"]); + + // with multi search + + req->params.clear(); + nlohmann::json body; + body["searches"] = nlohmann::json::array(); + nlohmann::json search; + search["collection"] = "users"; + search["filter_by"] = "age: > 100"; + body["searches"].push_back(search); + req->body = body.dump(); + + post_multi_search(req, res); + ASSERT_EQ("100", req->params["per_page"]); +} + TEST_F(CoreAPIUtilsTest, ExtractCollectionsFromRequestBody) { std::map req_params; std::string body = R"( From 80641096c9599f94c486663dd62745c7968b9fe7 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 17 Jun 2023 17:31:15 +0530 Subject: [PATCH 03/15] Fix asan warnings. --- include/field.h | 2 +- src/collection.cpp | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/include/field.h b/include/field.h index 8d1606e3..b828692b 100644 --- a/include/field.h +++ b/include/field.h @@ -736,7 +736,7 @@ struct sort_by { }; struct eval_t { - filter_node_t* filter_tree_root; + filter_node_t* filter_tree_root = nullptr; uint32_t* ids = nullptr; uint32_t size = 0; }; diff --git a/src/collection.cpp b/src/collection.cpp index 3831a53a..bc7d40b8 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -27,6 +27,7 @@ struct sort_fields_guard_t { ~sort_fields_guard_t() { for(auto& sort_by_clause: sort_fields_std) { + delete sort_by_clause.eval.filter_tree_root; if(sort_by_clause.eval.ids) { delete [] sort_by_clause.eval.ids; sort_by_clause.eval.ids = nullptr; @@ -1541,6 +1542,11 @@ Option Collection::search(std::string raw_query, std::unique_ptr search_params_guard(search_params); auto search_op = index->run_search(search_params, name); + + // filter_tree_root might be updated in Index::static_filter_query_eval. 
+ filter_tree_root_guard.release(); + filter_tree_root_guard.reset(filter_tree_root); + if (!search_op.ok()) { return Option(search_op.code(), search_op.error()); } From 9e9569c1cdef95c523f08c3dae36950b847d77ef Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sun, 18 Jun 2023 16:15:25 +0530 Subject: [PATCH 04/15] Fix asan warnings for req/res cycles. --- include/http_server.h | 10 ++++++---- src/core_api.cpp | 6 ++---- src/http_server.cpp | 10 ++++++++++ 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/include/http_server.h b/include/http_server.h index 8edef305..b6465e43 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -52,6 +52,9 @@ public: bool is_res_start = true; h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS; h2o_iovec_t res_body{}; + h2o_iovec_t res_content_type{}; + int status = 0; + const char* reason = nullptr; h2o_generator_t* generator = nullptr; @@ -65,10 +68,9 @@ public: res_body = h2o_strdup(&req->pool, body.c_str(), SIZE_MAX); if(is_res_start) { - req->res.status = status_code; - req->res.reason = http_res::get_status_reason(status_code); - h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, - content_type.c_str(), content_type.size()); + res_content_type = h2o_strdup(&req->pool, content_type.c_str(), SIZE_MAX); + status = status_code; + reason = http_res::get_status_reason(status_code); } } diff --git a/src/core_api.cpp b/src/core_api.cpp index 8d367dfa..7de42f6a 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -58,10 +58,8 @@ void stream_response(const std::shared_ptr& req, const std::shared_ptr return ; } - if(req->_req->res.status != 0) { - // not the first response chunk, so wait for previous chunk to finish - res->wait(); - } + // wait for previous chunk to finish (if any) + res->wait(); auto req_res = new async_req_res_t(req, res, true); server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE, req_res); diff --git a/src/http_server.cpp b/src/http_server.cpp index b71a8a24..6a704153 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -632,6 +632,9 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) { if(request->first_chunk_aggregate) { request->first_chunk_aggregate = false; + + // ensures that the first response need not wait for previous chunk to be done sending + response->notify(); } // default value for last_chunk_aggregate is false @@ -803,6 +806,13 @@ void HttpServer::stream_response(stream_response_state_t& state) { h2o_req_t* req = state.get_req(); + if(state.is_res_start) { + h2o_add_header(&req->pool, &req->res.headers, H2O_TOKEN_CONTENT_TYPE, NULL, + state.res_content_type.base, state.res_content_type.len); + req->res.status = state.status; + req->res.reason = state.reason; + } + if(state.is_req_early_exit) { // premature termination of async request: handle this explicitly as otherwise, request is not being closed LOG(INFO) << "Premature termination of async request."; From 62ba570491e3087847c45c9e4132fce5a3fafe42 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Thu, 22 Jun 2023 11:49:58 +0530 Subject: [PATCH 05/15] Remove resource check for POST /health --- src/raft_server.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 920b6b34..e882244a 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -194,7 +194,8 @@ void ReplicationState::write(const std::shared_ptr& request, const std auto resource_check = 
cached_resource_stat_t::get_instance().has_enough_resources(raft_dir_path, config->get_disk_used_max_percentage(), config->get_memory_used_max_percentage()); - if (resource_check != cached_resource_stat_t::OK && request->http_method != "DELETE") { + if (resource_check != cached_resource_stat_t::OK && + request->http_method != "DELETE" && request->path_without_query != "/health") { response->set_422("Rejecting write: running out of resource type: " + std::string(magic_enum::enum_name(resource_check))); response->final = true; From d9d6a38226b0bc7f8a9f59ec55bc8ed8e7651f15 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Thu, 22 Jun 2023 17:38:15 +0530 Subject: [PATCH 06/15] Reduce export/import memory usage. --- include/http_server.h | 11 ++++++++--- src/http_server.cpp | 12 ++++++------ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/include/http_server.h b/include/http_server.h index b6465e43..1ced92e7 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -51,7 +51,10 @@ public: bool is_res_start = true; h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS; - h2o_iovec_t res_body{}; + + std::string res_body; + h2o_iovec_t res_buff; + h2o_iovec_t res_content_type{}; int status = 0; const char* reason = nullptr; @@ -64,8 +67,10 @@ public: } } - void set_response(uint32_t status_code, const std::string& content_type, const std::string& body) { - res_body = h2o_strdup(&req->pool, body.c_str(), SIZE_MAX); + void set_response(uint32_t status_code, const std::string& content_type, std::string& body) { + std::string().swap(res_body); + res_body = std::move(body); + res_buff = h2o_iovec_t{.base = res_body.data(), .len = res_body.size()}; if(is_res_start) { res_content_type = h2o_strdup(&req->pool, content_type.c_str(), SIZE_MAX); diff --git a/src/http_server.cpp b/src/http_server.cpp index 6a704153..f72de8da 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -504,6 +504,9 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { ); *allocated_generator = custom_gen; + // ensures that the first response need not wait for previous chunk to be done sending + response->notify(); + //LOG(INFO) << "Init res: " << custom_gen->response << ", ref count: " << custom_gen->response.use_count(); if(root_resource == "multi_search") { @@ -632,9 +635,6 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) { if(request->first_chunk_aggregate) { request->first_chunk_aggregate = false; - - // ensures that the first response need not wait for previous chunk to be done sending - response->notify(); } // default value for last_chunk_aggregate is false @@ -821,7 +821,7 @@ void HttpServer::stream_response(stream_response_state_t& state) { h2o_start_response(req, state.generator); } - h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL); + h2o_send(req, &state.res_buff, 1, H2O_SEND_STATE_FINAL); h2o_dispose_request(req); return ; @@ -833,13 +833,13 @@ void HttpServer::stream_response(stream_response_state_t& state) { h2o_start_response(req, state.generator); } - if(state.res_body.len == 0 && state.send_state != H2O_SEND_STATE_FINAL) { + if(state.res_buff.len == 0 && state.send_state != H2O_SEND_STATE_FINAL) { // without this guard, http streaming will break state.generator->proceed(state.generator, req); return; } - h2o_send(req, &state.res_body, 1, state.send_state); + h2o_send(req, &state.res_buff, 1, state.send_state); //LOG(INFO) << "stream_response after send"; } From b44a374229988de97f4ebafe30f8fae0573df3b2 Mon Sep 17 00:00:00 2001 
From: Kishore Nallan Date: Sat, 24 Jun 2023 10:05:04 +0530 Subject: [PATCH 07/15] Close async curl socket on premature error. --- src/http_client.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/http_client.cpp b/src/http_client.cpp index ef11b57d..7b3c274b 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -279,6 +279,8 @@ size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) { if(!req_res->res->is_alive) { // underlying client request is dead, don't try to send anymore data + // also, close the socket as we've overridden the close socket handler! + close(item); return 0; } From 3a8a3997835a4d8f094515c82571ddb3172be346 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 27 Jun 2023 14:12:19 +0530 Subject: [PATCH 08/15] Don't send proxy logs through raft log. --- src/http_server.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/http_server.cpp b/src/http_server.cpp index f72de8da..813ee450 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -547,7 +547,9 @@ bool HttpServer::is_write_request(const std::string& root_resource, const std::s return false; } - bool write_free_request = (root_resource == "multi_search" || root_resource == "operations"); + bool write_free_request = (root_resource == "multi_search" || root_resource == "proxy" || + root_resource == "operations"); + if(!write_free_request && (http_method == "POST" || http_method == "PUT" || http_method == "DELETE" || http_method == "PATCH")) { From 425611fe0169fc36dc5852e697a4cbbcd7e07c2c Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 27 Jun 2023 14:21:36 +0530 Subject: [PATCH 09/15] Improve logging message. --- src/text_embedder.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/text_embedder.cpp b/src/text_embedder.cpp index ae2297a2..b2a67607 100644 --- a/src/text_embedder.cpp +++ b/src/text_embedder.cpp @@ -46,7 +46,7 @@ TextEmbedder::TextEmbedder(const std::string& model_name) { TextEmbedder::TextEmbedder(const nlohmann::json& model_config) { auto model_name = model_config["model_name"].get(); - LOG(INFO) << "Loading model from remote: " << model_name; + LOG(INFO) << "Initializing remote embedding model: " << model_name; auto model_namespace = TextEmbedderManager::get_model_namespace(model_name); if(model_namespace == "openai") { From c0686a59365a225751a164cce4bb49ca29943f9f Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Wed, 28 Jun 2023 13:21:08 +0530 Subject: [PATCH 10/15] Fixed bug when only vector search produced results in hybrid. The vector search produced IDs must be merged back to all_result_ids in addition to incrementing the all_result_ids_len. 
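In rough outline, the fix works as in the following standalone sketch (illustrative only: std::set_union stands in for the ArrayUtils::or_scalar helper used in the real diff below, and merge_result_ids is a made-up name, not an Index method):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <iterator>
    #include <vector>

    // Merge IDs that only vector search found into the overall result-id list,
    // instead of merely incrementing all_result_ids_len without storing them.
    // Assumes all_result_ids is already sorted, as the index keeps it.
    uint32_t* merge_result_ids(uint32_t* all_result_ids, size_t& all_result_ids_len,
                               std::vector<uint32_t>& vec_search_ids) {
        if(vec_search_ids.empty()) {
            return all_result_ids;
        }

        std::sort(vec_search_ids.begin(), vec_search_ids.end());

        std::vector<uint32_t> merged;
        merged.reserve(all_result_ids_len + vec_search_ids.size());
        std::set_union(all_result_ids, all_result_ids + all_result_ids_len,
                       vec_search_ids.begin(), vec_search_ids.end(),
                       std::back_inserter(merged));

        delete[] all_result_ids;                    // drop the old list
        uint32_t* new_ids = new uint32_t[merged.size()];
        std::copy(merged.begin(), merged.end(), new_ids);
        all_result_ids_len = merged.size();         // length now matches the stored IDs
        return new_ids;
    }

Keeping the ID list and its length in sync matters because later steps such as
faceting and the "found" count (exercised by the new test below) read the
contents of all_result_ids, not just all_result_ids_len.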
--- src/index.cpp | 34 ++++++++++++++++++++------ test/collection_vector_search_test.cpp | 34 ++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/src/index.cpp b/src/index.cpp index 65f8f7fc..285b2af8 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -3186,17 +3186,26 @@ Option Index::search(std::vector& field_query_tokens, cons result->scores[result->match_score_index] = float_to_int64_t((1.0 / (i + 1)) * TEXT_MATCH_WEIGHT); } - for(int i = 0; i < vec_results.size(); i++) { - auto& vec_result = vec_results[i]; - auto doc_id = vec_result.first; + std::vector vec_search_ids; // list of IDs found only in vector search + for(size_t res_index = 0; res_index < vec_results.size(); res_index++) { + auto& vec_result = vec_results[res_index]; + auto doc_id = vec_result.first; auto result_it = topster->kv_map.find(doc_id); - if(result_it != topster->kv_map.end()&& result_it->second->match_score_index >= 0 && result_it->second->match_score_index <= 2) { + if(result_it != topster->kv_map.end()) { + if(result_it->second->match_score_index < 0 || result_it->second->match_score_index > 2) { + continue; + } + + // result overlaps with keyword search: we have to combine the scores + auto result = result_it->second; // old_score + (1 / rank_of_document) * WEIGHT) result->vector_distance = vec_result.second; - result->scores[result->match_score_index] = float_to_int64_t((int64_t_to_float(result->scores[result->match_score_index])) + ((1.0 / (i + 1)) * VECTOR_SEARCH_WEIGHT)); + result->scores[result->match_score_index] = float_to_int64_t( + (int64_t_to_float(result->scores[result->match_score_index])) + + ((1.0 / (res_index + 1)) * VECTOR_SEARCH_WEIGHT)); for(size_t i = 0;i < 3; i++) { if(field_values[i] == &vector_distance_sentinel_value) { @@ -3209,17 +3218,26 @@ Option Index::search(std::vector& field_query_tokens, cons } } else { - int64_t scores[3] = {0}; + // Result has been found only in vector search: we have to add it to both KV and result_ids // (1 / rank_of_document) * WEIGHT) - int64_t match_score = float_to_int64_t((1.0 / (i + 1)) * VECTOR_SEARCH_WEIGHT); + int64_t scores[3] = {0}; + int64_t match_score = float_to_int64_t((1.0 / (res_index + 1)) * VECTOR_SEARCH_WEIGHT); int64_t match_score_index = -1; compute_sort_scores(sort_fields_std, sort_order, field_values, geopoint_indices, doc_id, 0, match_score, scores, match_score_index, vec_result.second); KV kv(searched_queries.size(), doc_id, doc_id, match_score_index, scores); kv.vector_distance = vec_result.second; topster->add(&kv); - ++all_result_ids_len; + vec_search_ids.push_back(doc_id); } } + + if(!vec_search_ids.empty()) { + uint32_t* new_all_result_ids = nullptr; + all_result_ids_len = ArrayUtils::or_scalar(all_result_ids, all_result_ids_len, &vec_search_ids[0], + vec_search_ids.size(), &new_all_result_ids); + delete[] all_result_ids; + all_result_ids = new_all_result_ids; + } } } diff --git a/test/collection_vector_search_test.cpp b/test/collection_vector_search_test.cpp index 995bddd7..b6b598dc 100644 --- a/test/collection_vector_search_test.cpp +++ b/test/collection_vector_search_test.cpp @@ -711,12 +711,42 @@ TEST_F(CollectionVectorTest, HybridSearchWithExplicitVector) { 4, {off}, 32767, 32767, 2, false, true, "vec:(" + dummy_vec_string +")"); ASSERT_EQ(true, results_op.ok()); - - ASSERT_EQ(1, results_op.get()["found"].get()); ASSERT_EQ(1, results_op.get()["hits"].size()); } +TEST_F(CollectionVectorTest, HybridSearchOnlyVectorMatches) { + nlohmann::json schema = R"({ + "name": "coll1", + "fields": 
[ + {"name": "name", "type": "string", "facet": true}, + {"name": "vec", "type": "float[]", "embed":{"from": ["name"], "model_config": {"model_name": "ts/e5-small"}}} + ] + })"_json; + + TextEmbedderManager::set_model_dir("/tmp/typesense_test/models"); + Collection* coll1 = collectionManager.create_collection(schema).get(); + + nlohmann::json doc; + doc["name"] = "john doe"; + ASSERT_TRUE(coll1->add(doc.dump()).ok()); + + auto results_op = coll1->search("zzz", {"name", "vec"}, "", {"name"}, {}, {0}, 20, 1, FREQUENCY, {true}, + Index::DROP_TOKENS_THRESHOLD, + spp::sparse_hash_set(), + spp::sparse_hash_set(), 10, "", 30, 5, + "", 10, {}, {}, {}, 0, + "", "", {}, 1000, true, false, true, "", false, 6000 * 1000, 4, 7, + fallback, + 4, {off}, 32767, 32767, 2); + ASSERT_EQ(true, results_op.ok()); + ASSERT_EQ(1, results_op.get()["found"].get()); + ASSERT_EQ(1, results_op.get()["hits"].size()); + ASSERT_EQ(1, results_op.get()["facet_counts"].size()); + ASSERT_EQ(4, results_op.get()["facet_counts"][0].size()); + ASSERT_EQ("name", results_op.get()["facet_counts"][0]["field_name"]); +} + TEST_F(CollectionVectorTest, DistanceThresholdTest) { nlohmann::json schema = R"({ "name": "test", From a7d549d0aeea26310e1a02889bfaaf9539e5a085 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Wed, 28 Jun 2023 13:24:17 +0530 Subject: [PATCH 11/15] Tweak default values for k in vector search. --- src/index.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/index.cpp b/src/index.cpp index 285b2af8..5cfb2287 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -2855,9 +2855,7 @@ Option Index::search(std::vector& field_query_tokens, cons collate_included_ids({}, included_ids_map, curated_topster, searched_queries); if (!vector_query.field_name.empty()) { - // use k as 250 by default for ensuring results stability in pagination - size_t default_k = 250; - auto k = std::max(vector_query.k, default_k); + auto k = std::max(vector_query.k, fetch_size); if(vector_query.query_doc_given) { // since we will omit the query doc from results k++; @@ -3145,8 +3143,8 @@ Option Index::search(std::vector& field_query_tokens, cons VectorFilterFunctor filterFunctor(filter_result.docs, filter_result.count); auto& field_vector_index = vector_index.at(vector_query.field_name); std::vector> dist_labels; - // use k as 250 by default for ensuring results stability in pagination - size_t default_k = 250; + // use k as 100 by default for ensuring results stability in pagination + size_t default_k = 100; auto k = std::max(vector_query.k, default_k); if(field_vector_index->distance_type == cosine) { From 176a72e6ca0515ff3eae2fde9d3f40ce29f134f6 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 30 Jun 2023 10:58:15 +0530 Subject: [PATCH 12/15] Fix http curl client timeout API. 
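The gist, as a minimal standalone sketch (make_handle is an illustrative name; the actual change threads timeout_ms through HttpClient::init_curl as shown in the diff below):

    #include <curl/curl.h>
    #include <string>

    CURL* make_handle(const std::string& url, long timeout_ms) {
        CURL* curl = curl_easy_init();
        if(curl == nullptr) {
            return nullptr;
        }

        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        // bounds only the connection phase
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 4000L);
        // bounds the whole transfer; 0 disables it, which is what the
        // DELETE and collection-PATCH callers below rely on
        curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
        return curl;
    }

Before this change init_curl never set CURLOPT_TIMEOUT_MS, so the timeout_ms
arguments accepted by the synchronous HttpClient methods did not actually
bound the transfer.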
--- include/http_client.h | 4 ++-- src/http_client.cpp | 13 +++++++------ src/raft_server.cpp | 15 ++++++++------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/include/http_client.h b/include/http_client.h index 8305d1b5..94cb636c 100644 --- a/include/http_client.h +++ b/include/http_client.h @@ -26,7 +26,7 @@ private: static size_t curl_write_download(void *ptr, size_t size, size_t nmemb, FILE *stream); - static CURL* init_curl(const std::string& url, std::string& response); + static CURL* init_curl(const std::string& url, std::string& response, const size_t timeout_ms = 0); static CURL* init_curl_async(const std::string& url, deferred_req_res_t* req_res, curl_slist*& chunk, bool send_ts_api_header); @@ -57,7 +57,7 @@ public: bool send_ts_api_header = false); static long delete_response(const std::string& url, std::string& response, - std::map& res_headers, long timeout_ms=120000, + std::map& res_headers, long timeout_ms=4000, bool send_ts_api_header = false); static long post_response(const std::string & url, const std::string & body, std::string & response, diff --git a/src/http_client.cpp b/src/http_client.cpp index 7b3c274b..c6e4b7fd 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -19,7 +19,7 @@ long HttpClient::post_response(const std::string &url, const std::string &body, std::map& res_headers, const std::unordered_map& headers, long timeout_ms, bool send_ts_api_header) { - CURL *curl = init_curl(url, response); + CURL *curl = init_curl(url, response, timeout_ms); if(curl == nullptr) { return 500; } @@ -60,7 +60,7 @@ long HttpClient::post_response_async(const std::string &url, const std::shared_p long HttpClient::put_response(const std::string &url, const std::string &body, std::string &response, std::map& res_headers, long timeout_ms, bool send_ts_api_header) { - CURL *curl = init_curl(url, response); + CURL *curl = init_curl(url, response, timeout_ms); if(curl == nullptr) { return 500; } @@ -73,7 +73,7 @@ long HttpClient::put_response(const std::string &url, const std::string &body, s long HttpClient::patch_response(const std::string &url, const std::string &body, std::string &response, std::map& res_headers, long timeout_ms, bool send_ts_api_header) { - CURL *curl = init_curl(url, response); + CURL *curl = init_curl(url, response, timeout_ms); if(curl == nullptr) { return 500; } @@ -86,7 +86,7 @@ long HttpClient::patch_response(const std::string &url, const std::string &body, long HttpClient::delete_response(const std::string &url, std::string &response, std::map& res_headers, long timeout_ms, bool send_ts_api_header) { - CURL *curl = init_curl(url, response); + CURL *curl = init_curl(url, response, timeout_ms); if(curl == nullptr) { return 500; } @@ -99,7 +99,7 @@ long HttpClient::get_response(const std::string &url, std::string &response, std::map& res_headers, const std::unordered_map& headers, long timeout_ms, bool send_ts_api_header) { - CURL *curl = init_curl(url, response); + CURL *curl = init_curl(url, response, timeout_ms); if(curl == nullptr) { return 500; } @@ -351,7 +351,7 @@ CURL *HttpClient::init_curl_async(const std::string& url, deferred_req_res_t* re return curl; } -CURL *HttpClient::init_curl(const std::string& url, std::string& response) { +CURL *HttpClient::init_curl(const std::string& url, std::string& response, const size_t timeout_ms) { CURL *curl = curl_easy_init(); if(curl == nullptr) { @@ -369,6 +369,7 @@ CURL *HttpClient::init_curl(const std::string& url, std::string& response) { curl_easy_setopt(curl, CURLOPT_URL, 
url.c_str()); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 4000); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms); curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE); // to allow self-signed certs diff --git a/src/raft_server.cpp b/src/raft_server.cpp index 0ca0b7de..7e18456e 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -308,18 +308,19 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, } } else { std::string api_res; - long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 4000, true); + long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 10*1000, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } } else if(request->http_method == "PUT") { std::string api_res; - long status = HttpClient::put_response(url, request->body, api_res, res_headers, 4000, true); + long status = HttpClient::put_response(url, request->body, api_res, res_headers, 10*1000, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else if(request->http_method == "DELETE") { std::string api_res; - long status = HttpClient::delete_response(url, api_res, res_headers, 120000, true); + // timeout: 0 since delete can take a long time + long status = HttpClient::delete_response(url, api_res, res_headers, 0, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else if(request->http_method == "PATCH") { @@ -327,9 +328,9 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, route_path* rpath = nullptr; bool route_found = server->get_route(request->route_hash, &rpath); - long timeout_ms = 4 * 1000; + long timeout_ms = 10 * 1000; if(route_found && rpath->handler == patch_update_collection) { - timeout_ms = 300 * 1000; // 5 minutes for patching a collection which can take some time + timeout_ms = 0; // patching a collection can take a long time } long status = HttpClient::patch_response(url, request->body, api_res, res_headers, timeout_ms, true); @@ -677,7 +678,7 @@ void ReplicationState::refresh_catchup_status(bool log_msg) { std::string api_res; std::map res_headers; - long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true); + long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 5*1000, true); if(status_code == 200) { // compare leader's applied log with local applied to see if we are lagging nlohmann::json leader_status = nlohmann::json::parse(api_res); @@ -937,7 +938,7 @@ void ReplicationState::do_snapshot(const std::string& nodes) { std::string url = get_node_url_path(peer_addr, "/health", protocol); std::string api_res; std::map res_headers; - long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 4000, true); + long status_code = HttpClient::get_response(url, api_res, res_headers, {}, 5*1000, true); bool peer_healthy = (status_code == 200); //LOG(INFO) << "do_snapshot, status_code: " << status_code; From cad2d802e8c8cd98dc073e594715876a9d50307a Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 30 Jun 2023 11:12:40 +0530 Subject: [PATCH 13/15] Glog stdout/stderr console log separation. 
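A condensed sketch of the behaviour change (simplified from init_root_logger; both flags are standard glog flags, with FLAGS_logtostdout available in glog 0.5+):

    #include <glog/logging.h>

    void init_console_logging(const char* program_name) {
        // With no log directory configured, send regular log lines to stdout
        // instead of stderr, so shell redirection can keep the normal log
        // stream separate from genuine error output.
        FLAGS_logtostdout = true;   // previously: FLAGS_logtostderr = true;
        google::InitGoogleLogging(program_name);
    }

    int main(int argc, char** argv) {
        init_console_logging(argv[0]);
        LOG(INFO) << "goes to stdout now";
        return 0;
    }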
--- src/typesense_server_utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index c628c569..e43a67c9 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -118,7 +118,7 @@ int init_root_logger(Config & config, const std::string & server_version) { if(log_dir.empty()) { // use console logger if log dir is not specified - FLAGS_logtostderr = true; + FLAGS_logtostdout = true; } else { if(!directory_exists(log_dir)) { std::cerr << "Typesense failed to start. " << "Log directory " << log_dir << " does not exist."; From e277415ccbb67a4e62dd7582a1ab2183c0178a9b Mon Sep 17 00:00:00 2001 From: Harpreet Sangar Date: Fri, 30 Jun 2023 12:19:10 +0530 Subject: [PATCH 14/15] Fix `docs_updated_count` getting garbage value. Fix `StringUtils::split_include_fields` not trimming last value. --- src/collection.cpp | 2 +- src/string_utils.cpp | 6 +++++- test/string_utils_test.cpp | 29 +++++++++++++++++++++++++++-- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/collection.cpp b/src/collection.cpp index bac865ad..c55bee7d 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -419,7 +419,7 @@ Option Collection::update_matching_filter(const std::string& fil } const auto& dirty_values = parse_dirty_values_option(req_dirty_values); - size_t docs_updated_count; + size_t docs_updated_count = 0; nlohmann::json update_document, dummy; try { diff --git a/src/string_utils.cpp b/src/string_utils.cpp index b2caf445..04711402 100644 --- a/src/string_utils.cpp +++ b/src/string_utils.cpp @@ -464,7 +464,11 @@ Option StringUtils::split_include_fields(const std::string& include_fields if (range_pos == std::string::npos && comma_pos == std::string::npos) { if (start < size - 1) { - tokens.push_back(include_fields.substr(start, size - start)); + include_field = include_fields.substr(start, size - start); + include_field = trim(include_field); + if (!include_field.empty()) { + tokens.push_back(include_field); + } } break; } else if (range_pos < comma_pos) { diff --git a/test/string_utils_test.cpp b/test/string_utils_test.cpp index 8fa60a92..75700c8a 100644 --- a/test/string_utils_test.cpp +++ b/test/string_utils_test.cpp @@ -391,7 +391,32 @@ TEST(StringUtilsTest, TokenizeFilterQuery) { tokenList = {"(", "(", "age:<5", "||", "age:>10", ")", "&&", "location:(48.906,2.343,5mi)", ")", "||", "tags:AT&T"}; tokenizeTestHelper(filter_query, tokenList); - filter_query = "((age: <5 || age: >10) && category:= [shoes]) && $Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))"; - tokenList = {"(", "(", "age: <5", "||", "age: >10", ")", "&&", "category:= [shoes]", ")", "&&", "$Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))"}; + filter_query = "((age: <5 || age: >10) && category:= [shoes]) &&" + " $Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))"; + tokenList = {"(", "(", "age: <5", "||", "age: >10", ")", "&&", "category:= [shoes]", ")", "&&", + "$Customers(customer_id:=customer_a && (product_price:>100 && product_price:<200))"}; tokenizeTestHelper(filter_query, tokenList); } + +void splitIncludeTestHelper(const std::string& include_fields, const std::vector& expected) { + std::vector output; + auto tokenize_op = StringUtils::split_include_fields(include_fields, output); + ASSERT_TRUE(tokenize_op.ok()); + ASSERT_EQ(expected.size(), output.size()); + for (auto i = 0; i < output.size(); i++) { + 
ASSERT_EQ(expected[i], output[i]); + } +} + +TEST(StringUtilsTest, SplitIncludeFields) { + std::string include_fields; + std::vector tokens; + + include_fields = "id, title, count"; + tokens = {"id", "title", "count"}; + splitIncludeTestHelper(include_fields, tokens); + + include_fields = "id, $Collection(title, pref*), count"; + tokens = {"id", "$Collection(title, pref*)", "count"}; + splitIncludeTestHelper(include_fields, tokens); +} \ No newline at end of file From c066120fb1b61936c1860088188e735a3483c0d1 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Sat, 1 Jul 2023 13:46:03 +0530 Subject: [PATCH 15/15] Fix edge case in updating empty array strings. --- include/index.h | 3 - src/index.cpp | 95 ++++------------ test/collection_specific_more_test.cpp | 147 +++++++++++++++++++++++++ test/index_test.cpp | 60 ---------- 4 files changed, 171 insertions(+), 134 deletions(-) diff --git a/include/index.h b/include/index.h index 3f93e182..5c1b77e0 100644 --- a/include/index.h +++ b/include/index.h @@ -631,9 +631,6 @@ public: const std::vector& local_token_separators, const std::vector& local_symbols_to_index); - static void scrub_reindex_doc(const tsl::htrie_map& search_schema, - nlohmann::json& update_doc, nlohmann::json& del_doc, const nlohmann::json& old_doc); - static void tokenize_string_field(const nlohmann::json& document, const field& search_field, std::vector& tokens, const std::string& locale, diff --git a/src/index.cpp b/src/index.cpp index 5cfb2287..4e6ac914 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -454,7 +454,6 @@ void Index::validate_and_preprocess(Index *index, std::vector& ite // scrub string fields to reduce delete ops get_doc_changes(index_rec.operation, search_schema, index_rec.doc, index_rec.old_doc, index_rec.new_doc, index_rec.del_doc); - scrub_reindex_doc(search_schema, index_rec.doc, index_rec.del_doc, index_rec.old_doc); if(generate_embeddings) { for(auto& field: index_rec.doc.items()) { @@ -6236,11 +6235,19 @@ void Index::get_doc_changes(const index_operation_t op, const tsl::htrie_map& search_schema, - nlohmann::json& update_doc, - nlohmann::json& del_doc, - const nlohmann::json& old_doc) { - - /*LOG(INFO) << "update_doc: " << update_doc; - LOG(INFO) << "old_doc: " << old_doc; - LOG(INFO) << "del_doc: " << del_doc;*/ - - // del_doc contains fields that exist in both update doc and old doc - // But we will only remove fields that are different - - std::vector unchanged_keys; - - for(auto it = del_doc.cbegin(); it != del_doc.cend(); it++) { - const std::string& field_name = it.key(); - - const auto& search_field_it = search_schema.find(field_name); - if(search_field_it == search_schema.end()) { continue; } - if(it.value().is_object() || (it.value().is_array() && (it.value().empty() || it.value()[0].is_object()))) { - continue; - } - - const auto search_field = search_field_it.value(); // copy, don't use reference! 
- - // compare values between old and update docs: - // if they match, we will remove them from both del and update docs - - if(update_doc.contains(search_field.name)) { - if(update_doc[search_field.name].is_null()) { - // we don't allow null values to be stored or indexed but need to be removed from stored doc - update_doc.erase(search_field.name); - } - else if(update_doc[search_field.name] == old_doc[search_field.name]) { - unchanged_keys.push_back(field_name); + if(old_doc.contains(it.key())) { + if(old_doc[it.key()] == it.value()) { + // unchanged so should not be part of update doc + it = update_doc.erase(it); + continue; + } else { + // delete this old value from index + del_doc[it.key()] = old_doc[it.key()]; } } - } - for(const auto& unchanged_key: unchanged_keys) { - del_doc.erase(unchanged_key); - update_doc.erase(unchanged_key); + it++; } } diff --git a/test/collection_specific_more_test.cpp b/test/collection_specific_more_test.cpp index 370fd45f..f53ce1a7 100644 --- a/test/collection_specific_more_test.cpp +++ b/test/collection_specific_more_test.cpp @@ -1225,6 +1225,153 @@ TEST_F(CollectionSpecificMoreTest, UpsertUpdateEmplaceShouldAllRemoveIndex) { ASSERT_EQ(1, results["found"].get()); } +TEST_F(CollectionSpecificMoreTest, UpdateWithEmptyArray) { + nlohmann::json schema = R"({ + "name": "coll1", + "fields": [ + {"name": "tags", "type": "string[]"} + ] + })"_json; + + auto op = collectionManager.create_collection(schema); + ASSERT_TRUE(op.ok()); + Collection* coll1 = op.get(); + + auto doc1 = R"({ + "id": "0", + "tags": ["alpha", "beta", "gamma"] + })"_json; + + ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok()); + + auto doc2 = R"({ + "id": "1", + "tags": ["one", "two"] + })"_json; + + ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok()); + + // via update + + auto doc_update = R"({ + "id": "0", + "tags": [] + })"_json; + ASSERT_TRUE(coll1->add(doc_update.dump(), UPDATE).ok()); + + auto results = coll1->search("alpha", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(0, results["found"].get()); + + // via upsert + + doc_update = R"({ + "id": "1", + "tags": [] + })"_json; + ASSERT_TRUE(coll1->add(doc_update.dump(), UPSERT).ok()); + + results = coll1->search("one", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(0, results["found"].get()); +} + +TEST_F(CollectionSpecificMoreTest, UpdateArrayWithNullValue) { + nlohmann::json schema = R"({ + "name": "coll1", + "fields": [ + {"name": "tags", "type": "string[]", "optional": true} + ] + })"_json; + + auto op = collectionManager.create_collection(schema); + ASSERT_TRUE(op.ok()); + Collection* coll1 = op.get(); + + auto doc1 = R"({ + "id": "0", + "tags": ["alpha", "beta", "gamma"] + })"_json; + + ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok()); + + auto doc2 = R"({ + "id": "1", + "tags": ["one", "two"] + })"_json; + + ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok()); + + // via update + + auto doc_update = R"({ + "id": "0", + "tags": null + })"_json; + ASSERT_TRUE(coll1->add(doc_update.dump(), UPDATE).ok()); + + auto results = coll1->search("alpha", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(0, results["found"].get()); + + // via upsert + + doc_update = R"({ + "id": "1", + "tags": null + })"_json; + ASSERT_TRUE(coll1->add(doc_update.dump(), UPSERT).ok()); + + results = coll1->search("one", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(0, results["found"].get()); +} + +TEST_F(CollectionSpecificMoreTest, ReplaceArrayElement) { + 
nlohmann::json schema = R"({ + "name": "coll1", + "fields": [ + {"name": "tags", "type": "string[]"} + ] + })"_json; + + auto op = collectionManager.create_collection(schema); + ASSERT_TRUE(op.ok()); + Collection* coll1 = op.get(); + + auto doc1 = R"({ + "id": "0", + "tags": ["alpha", "beta", "gamma"] + })"_json; + + ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok()); + + auto doc2 = R"({ + "id": "1", + "tags": ["one", "two", "three"] + })"_json; + + ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok()); + + // via update + + auto doc_update = R"({ + "id": "0", + "tags": ["alpha", "gamma"] + })"_json; + ASSERT_TRUE(coll1->add(doc_update.dump(), UPDATE).ok()); + + auto results = coll1->search("beta", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(0, results["found"].get()); + + // via upsert + + doc_update = R"({ + "id": "1", + "tags": ["one", "three"] + })"_json; + ASSERT_TRUE(coll1->add(doc_update.dump(), UPSERT).ok()); + + results = coll1->search("two", {"tags"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(0, results["found"].get()); +} + TEST_F(CollectionSpecificMoreTest, UnorderedWeightingOfFields) { nlohmann::json schema = R"({ "name": "coll1", diff --git a/test/index_test.cpp b/test/index_test.cpp index d788d5f5..8b2b1fcf 100644 --- a/test/index_test.cpp +++ b/test/index_test.cpp @@ -3,66 +3,6 @@ #include #include -TEST(IndexTest, ScrubReindexDoc) { - tsl::htrie_map search_schema; - search_schema.emplace("title", field("title", field_types::STRING, false)); - search_schema.emplace("points", field("points", field_types::INT32, false)); - search_schema.emplace("cast", field("cast", field_types::STRING_ARRAY, false)); - search_schema.emplace("movie", field("movie", field_types::BOOL, false)); - - ThreadPool pool(4); - - Index index("index", 1, nullptr, nullptr, &pool, search_schema, {}, {}); - nlohmann::json old_doc; - old_doc["id"] = "1"; - old_doc["title"] = "One more thing."; - old_doc["points"] = 100; - old_doc["cast"] = {"John Wick", "Jeremy Renner"}; - old_doc["movie"] = true; - - // all fields remain same - - nlohmann::json update_doc1, del_doc1; - update_doc1 = old_doc; - del_doc1 = old_doc; - - index.scrub_reindex_doc(search_schema, update_doc1, del_doc1, old_doc); - ASSERT_EQ(1, del_doc1.size()); - ASSERT_STREQ("1", del_doc1["id"].get().c_str()); - - // when only some fields are updated - - nlohmann::json update_doc2, del_doc2; - update_doc2["id"] = "1"; - update_doc2["points"] = 100; - update_doc2["cast"] = {"Jack"}; - - del_doc2 = update_doc2; - - index.scrub_reindex_doc(search_schema, update_doc2, del_doc2, old_doc); - ASSERT_EQ(2, del_doc2.size()); - ASSERT_STREQ("1", del_doc2["id"].get().c_str()); - std::vector cast = del_doc2["cast"].get>(); - ASSERT_EQ(1, cast.size()); - ASSERT_STREQ("Jack", cast[0].c_str()); - - // containing fields not part of search schema - - nlohmann::json update_doc3, del_doc3; - update_doc3["id"] = "1"; - update_doc3["title"] = "The Lawyer"; - update_doc3["foo"] = "Bar"; - - del_doc3 = update_doc3; - index.scrub_reindex_doc(search_schema, update_doc3, del_doc3, old_doc); - ASSERT_EQ(3, del_doc3.size()); - ASSERT_STREQ("1", del_doc3["id"].get().c_str()); - ASSERT_STREQ("The Lawyer", del_doc3["title"].get().c_str()); - ASSERT_STREQ("Bar", del_doc3["foo"].get().c_str()); - - pool.shutdown(); -} - /*TEST(IndexTest, PointInPolygon180thMeridian) { // somewhere in far eastern russia GeoCoord verts[3] = {