diff --git a/include/collection.h b/include/collection.h
index 5fcb54c8..230ea65f 100644
--- a/include/collection.h
+++ b/include/collection.h
@@ -461,7 +461,8 @@ public:
                                   const uint64_t search_time_start_us = 0,
                                   const text_match_type_t match_type = max_score,
                                   const size_t facet_sample_percent = 100,
-                                  const size_t facet_sample_threshold = 0) const;
+                                  const size_t facet_sample_threshold = 0,
+                                  const size_t page_offset = UINT32_MAX) const;
 
     Option<bool> get_filter_ids(const std::string & filter_query, filter_result_t& filter_result) const;
 
diff --git a/include/core_api.h b/include/core_api.h
index fd5a4140..f435ccc4 100644
--- a/include/core_api.h
+++ b/include/core_api.h
@@ -119,6 +119,8 @@ bool post_clear_cache(const std::shared_ptr& req, const std::shared_pt
 
 bool post_compact_db(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
 
+bool post_reset_peers(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
+
 // Rate Limiting
 bool get_rate_limits(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
 
diff --git a/include/http_data.h b/include/http_data.h
index ec7aa17e..893c1138 100644
--- a/include/http_data.h
+++ b/include/http_data.h
@@ -244,13 +244,12 @@ struct http_req {
 
     int64_t log_index;
 
-    std::atomic<bool> is_http_v1;
     std::atomic<bool> is_diposed;
 
     std::string client_ip = "0.0.0.0";
 
     http_req(): _req(nullptr), route_hash(1),
                 first_chunk_aggregate(true), last_chunk_aggregate(false),
-                chunk_len(0), body_index(0), data(nullptr), ready(false), log_index(0), is_http_v1(true),
+                chunk_len(0), body_index(0), data(nullptr), ready(false), log_index(0),
                 is_diposed(false) {
 
         start_ts = std::chrono::duration_cast<std::chrono::microseconds>(
@@ -272,7 +271,6 @@ struct http_req {
         if(_req != nullptr) {
             const auto& tv = _req->processed_at.at;
             conn_ts = (tv.tv_sec * 1000 * 1000) + tv.tv_usec;
-            is_http_v1 = (_req->version < 0x200);
         } else {
             conn_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                     std::chrono::system_clock::now().time_since_epoch()).count();
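Note: the cached `is_http_v1` flag is dropped from `http_req`; where the distinction is still needed, it is now derived on the spot from h2o's packed version field. A minimal sketch of that derivation (the `0x101`/`0x200` bounds mirror the check added in `src/http_server.cpp` below; the standalone helper name is illustrative, not part of this diff):

```cpp
// h2o packs the protocol version into one integer (0x101 = HTTP/1.1,
// 0x200 = HTTP/2), so the flag can be recomputed wherever it is needed.
static inline bool is_http_v1(int version) {
    return 0x101 <= version && version < 0x200;
}
```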
diff --git a/include/http_server.h b/include/http_server.h
index 55390ab1..8edef305 100644
--- a/include/http_server.h
+++ b/include/http_server.h
@@ -48,7 +48,6 @@ private:
 
 public:
     bool is_req_early_exit = false;
-    bool is_req_http1 = true;
 
     bool is_res_start = true;
     h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS;
@@ -125,7 +124,6 @@ public:
         h2o_custom_generator_t* res_generator = static_cast<h2o_custom_generator_t*>(res->generator.load());
 
         res_state.is_req_early_exit = (res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate);
-        res_state.is_req_http1 = req->is_http_v1;
         res_state.send_state = res->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
         res_state.generator = (res_generator == nullptr) ? nullptr : &res_generator->h2o_generator;
         res_state.set_response(res->status_code, res->content_type_header, res->body);
@@ -326,6 +324,8 @@ public:
 
     bool trigger_vote();
 
+    bool reset_peers();
+
     void persist_applying_index();
 
     int64_t get_num_queued_writes();
 
diff --git a/include/index.h b/include/index.h
index 2a18bc9f..6c4d66fe 100644
--- a/include/index.h
+++ b/include/index.h
@@ -109,7 +109,7 @@ struct search_args {
     std::vector<uint32_t> num_typos;
     size_t max_facet_values;
     size_t per_page;
-    size_t page;
+    size_t offset;
     token_ordering token_order;
     std::vector<bool> prefixes;
     size_t drop_tokens_threshold;
@@ -164,7 +164,7 @@ struct search_args {
             search_fields(search_fields), match_type(match_type), filter_tree_root(filter_tree_root), facets(facets),
             included_ids(included_ids), excluded_ids(excluded_ids), sort_fields_std(sort_fields_std),
             facet_query(facet_query), num_typos(num_typos), max_facet_values(max_facet_values), per_page(per_page),
-            page(page), token_order(token_order), prefixes(prefixes),
+            offset(offset), token_order(token_order), prefixes(prefixes),
             drop_tokens_threshold(drop_tokens_threshold), typo_tokens_threshold(typo_tokens_threshold),
             group_by_fields(group_by_fields), group_limit(group_limit), default_sorting_field(default_sorting_field),
             prioritize_exact_match(prioritize_exact_match), prioritize_token_position(prioritize_token_position),
@@ -637,7 +637,7 @@ public:
                        const std::vector<uint32_t>& excluded_ids, std::vector<sort_by>& sort_fields_std,
                        const std::vector<uint32_t>& num_typos, Topster* topster, Topster* curated_topster,
                        const size_t per_page,
-                       const size_t page, const token_ordering token_order, const std::vector<bool>& prefixes,
+                       const size_t offset, const token_ordering token_order, const std::vector<bool>& prefixes,
                        const size_t drop_tokens_threshold, size_t& all_result_ids_len,
                        spp::sparse_hash_map<uint64_t, uint32_t>& groups_processed,
                        std::vector<std::vector<art_leaf*>>& searched_queries,
 
diff --git a/include/raft_server.h b/include/raft_server.h
index 5ae7cf27..40a09de2 100644
--- a/include/raft_server.h
+++ b/include/raft_server.h
@@ -139,6 +139,8 @@ private:
     const uint64_t snapshot_interval_s;    // frequency of actual snapshotting
     uint64_t last_snapshot_ts;             // when last snapshot ran
 
+    butil::EndPoint peering_endpoint;
+
 public:
 
     static constexpr const char* log_dir_name = "log";
@@ -163,12 +165,15 @@ public:
     void read(const std::shared_ptr<http_res>& response);
 
     // updates cluster membership
-    void refresh_nodes(const std::string & nodes);
+    void refresh_nodes(const std::string & nodes, const size_t raft_counter,
+                       const std::atomic<bool>& reset_peers_on_error);
 
     void refresh_catchup_status(bool log_msg);
 
     bool trigger_vote();
 
+    bool reset_peers();
+
     bool has_leader_term() const {
         return leader_term.load(butil::memory_order_acquire) > 0;
     }
 
diff --git a/include/tsconfig.h b/include/tsconfig.h
index 0c02a188..57796930 100644
--- a/include/tsconfig.h
+++ b/include/tsconfig.h
@@ -62,6 +62,8 @@ private:
 
     std::atomic log_slow_searches_time_ms;
 
+    std::atomic<bool> reset_peers_on_error;
+
 protected:
 
     Config() {
@@ -84,6 +86,7 @@
         this->memory_used_max_percentage = 100;
         this->skip_writes = false;
         this->log_slow_searches_time_ms = 30 * 1000;
+        this->reset_peers_on_error = false;
     }
 
     Config(Config const&) {
@@ -166,6 +169,10 @@ public:
         this->skip_writes = skip_writes;
     }
 
+    void set_reset_peers_on_error(bool reset_peers_on_error) {
+        this->reset_peers_on_error = reset_peers_on_error;
+    }
+
     // getters
 
     std::string get_data_dir() const {
@@ -265,6 +272,10 @@
         return this->log_slow_searches_time_ms;
     }
 
+    const std::atomic<bool>& get_reset_peers_on_error() const {
+        return reset_peers_on_error;
+    }
+
     size_t get_num_collections_parallel_load() const {
         return this->num_collections_parallel_load;
     }
@@ -419,6 +430,7 @@ public:
         }
 
         this->skip_writes = ("TRUE" == get_env("TYPESENSE_SKIP_WRITES"));
+        this->reset_peers_on_error = ("TRUE" == get_env("TYPESENSE_RESET_PEERS_ON_ERROR"));
     }
 
     void load_config_file(cmdline::parser & options) {
@@ -575,6 +587,11 @@
             this->skip_writes = (skip_writes_str == "true");
         }
 
+        if(reader.Exists("server", "reset-peers-on-error")) {
+            auto reset_peers_on_error_str = reader.Get("server", "reset-peers-on-error", "false");
+            this->reset_peers_on_error = (reset_peers_on_error_str == "true");
+        }
+
         if(reader.Exists("server", "model-dir")) {
             this->model_dir = reader.Get("server", "model-dir", "");
         }
@@ -715,6 +732,10 @@
         if(options.exist("skip-writes")) {
             this->skip_writes = options.get<bool>("skip-writes");
         }
+
+        if(options.exist("reset-peers-on-error")) {
+            this->reset_peers_on_error = options.get<bool>("reset-peers-on-error");
+        }
     }
 
     void set_cors_domains(std::string& cors_domains_value) {
@@ -743,4 +764,8 @@ public:
     }
 
     Option<bool> update_config(const nlohmann::json& req_json);
+
+    static Option<std::string> fetch_file_contents(const std::string & file_path);
+
+    static Option<std::string> fetch_nodes_config(const std::string& path_to_nodes);
 };
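Note: the new `reset-peers-on-error` flag is readable from three surfaces with slightly different spellings, all funneling into the same `std::atomic<bool>`. A self-contained sketch of the env-var check condensed from the hunks above (the helper name is illustrative; the real code goes through Config's `get_env`):

```cpp
#include <cstdlib>
#include <string>

// The env surface compares against upper-case "TRUE", while the INI file
// and CLI flag parse lower-case "true" (see the tsconfig.h hunks above).
static bool reset_peers_on_error_from_env() {
    const char* v = std::getenv("TYPESENSE_RESET_PEERS_ON_ERROR");
    return v != nullptr && std::string(v) == "TRUE";
}

// INI file equivalent:  [server] reset-peers-on-error = true
// CLI flag equivalent:  --reset-peers-on-error=true
```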
diff --git a/include/validator.h b/include/validator.h
index c9533cb9..a6d51af4 100644
--- a/include/validator.h
+++ b/include/validator.h
@@ -60,8 +60,10 @@ public:
                                            const std::string &field_name, nlohmann::json::iterator& array_iter,
                                            bool is_array, bool& array_ele_erased);
 
-    static Option<uint32_t> coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field, nlohmann::json &document,
-                                            const std::string &field_name,
-                                            nlohmann::json::iterator& array_iter, bool is_array, bool& array_ele_erased);
+    static Option<uint32_t> coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field,
+                                            nlohmann::json &document, const std::string &field_name,
+                                            nlohmann::json& lat, nlohmann::json& lng,
+                                            nlohmann::json::iterator& array_iter,
+                                            bool is_array, bool& array_ele_erased);
 };
\ No newline at end of file
diff --git a/src/collection.cpp b/src/collection.cpp
index 3937bbfc..2579abcf 100644
--- a/src/collection.cpp
+++ b/src/collection.cpp
@@ -1082,7 +1082,8 @@ Option Collection::search(std::string raw_query,
                                           const uint64_t search_time_start_us,
                                           const text_match_type_t match_type,
                                           const size_t facet_sample_percent,
-                                          const size_t facet_sample_threshold) const {
+                                          const size_t facet_sample_threshold,
+                                          const size_t page_offset) const {
 
     std::shared_lock lock(mutex);
 
@@ -1341,7 +1342,7 @@
     }
 
     // check for valid pagination
-    if(page < 1) {
+    if(page != 0 && page < 1) {
         std::string message = "Page must be an integer of value greater than 0.";
         return Option<nlohmann::json>(422, message);
     }
@@ -1351,7 +1352,10 @@
         return Option<nlohmann::json>(422, message);
     }
 
-    if((page * per_page) > limit_hits) {
+    size_t offset = (page != 0) ? (per_page * (page - 1)) : page_offset;
+    size_t fetch_size = offset + per_page;
+
+    if(fetch_size > limit_hits) {
         std::string message = "Only upto " + std::to_string(limit_hits) + " hits can be fetched. " +
                               "Ensure that `page` and `per_page` parameters are within this range.";
         return Option<nlohmann::json>(422, message);
     }
@@ -1361,9 +1365,9 @@
 
     // ensure that `max_hits` never exceeds number of documents in collection
     if(search_fields.size() <= 1 || raw_query == "*") {
-        max_hits = std::min(std::max((page * per_page), max_hits), get_num_documents());
+        max_hits = std::min(std::max(fetch_size, max_hits), get_num_documents());
     } else {
-        max_hits = std::min(std::max((page * per_page), max_hits), get_num_documents());
+        max_hits = std::min(std::max(fetch_size, max_hits), get_num_documents());
     }
 
     if(token_order == NOT_SET) {
@@ -1520,7 +1524,7 @@
                                                   match_type,
                                                   filter_tree_root, facets, included_ids, excluded_ids,
                                                   sort_fields_std, facet_query, num_typos, max_facet_values, max_hits,
-                                                  per_page, page, token_order, prefixes,
+                                                  per_page, offset, token_order, prefixes,
                                                   drop_tokens_threshold, typo_tokens_threshold,
                                                   group_by_fields, group_limit, default_sorting_field,
                                                   prioritize_exact_match, prioritize_token_position,
@@ -1654,10 +1658,10 @@
         facet_query_last_token = facet_query_tokens.empty() ? "" : facet_query_tokens.back();
     }
 
-    const long start_result_index = (page - 1) * per_page;
+    const long start_result_index = offset;
 
     // `end_result_index` could be -1 when max_hits is 0
-    const long end_result_index = std::min((page * per_page), std::min(max_hits, result_group_kvs.size())) - 1;
+    const long end_result_index = std::min(fetch_size, std::min(max_hits, result_group_kvs.size())) - 1;
 
     // handle which fields have to be highlighted
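Note on the pagination change: the search path now derives a single `offset` from either `page` or the new `page_offset` argument, and sizes fetches by `offset + per_page`. A minimal sketch of the resolution logic as the hunks above implement it (`UINT32_MAX` is the "offset not supplied" sentinel set by `collection_manager.cpp` below; the helper names are illustrative):

```cpp
#include <algorithm>
#include <cstddef>

// A page of 0 means "the caller passed offset instead"; page_offset carries
// UINT32_MAX when neither parameter is supplied, in which case the request
// layer forces page back to 1 before we get here.
static size_t resolve_offset(size_t page, size_t page_offset, size_t per_page) {
    return (page != 0) ? per_page * (page - 1) : page_offset;
}

// fetch_size = offset + per_page bounds both the limit_hits check and the
// result window: hits [offset, fetch_size) are sliced out of the ranked list.
static long resolve_end_index(size_t fetch_size, size_t max_hits, size_t result_count) {
    // mirrors collection.cpp above: could be -1 when max_hits is 0
    return static_cast<long>(std::min(fetch_size, std::min(max_hits, result_count))) - 1;
}
```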
" + "Ensure that `page` and `per_page` parameters are within this range."; return Option(422, message); @@ -1361,9 +1365,9 @@ Option Collection::search(std::string raw_query, // ensure that `max_hits` never exceeds number of documents in collection if(search_fields.size() <= 1 || raw_query == "*") { - max_hits = std::min(std::max((page * per_page), max_hits), get_num_documents()); + max_hits = std::min(std::max(fetch_size, max_hits), get_num_documents()); } else { - max_hits = std::min(std::max((page * per_page), max_hits), get_num_documents()); + max_hits = std::min(std::max(fetch_size, max_hits), get_num_documents()); } if(token_order == NOT_SET) { @@ -1520,7 +1524,7 @@ Option Collection::search(std::string raw_query, match_type, filter_tree_root, facets, included_ids, excluded_ids, sort_fields_std, facet_query, num_typos, max_facet_values, max_hits, - per_page, page, token_order, prefixes, + per_page, offset, token_order, prefixes, drop_tokens_threshold, typo_tokens_threshold, group_by_fields, group_limit, default_sorting_field, prioritize_exact_match, prioritize_token_position, @@ -1654,10 +1658,10 @@ Option Collection::search(std::string raw_query, facet_query_last_token = facet_query_tokens.empty() ? "" : facet_query_tokens.back(); } - const long start_result_index = (page - 1) * per_page; + const long start_result_index = offset; // `end_result_index` could be -1 when max_hits is 0 - const long end_result_index = std::min((page * per_page), std::min(max_hits, result_group_kvs.size())) - 1; + const long end_result_index = std::min(fetch_size, std::min(max_hits, result_group_kvs.size())) - 1; // handle which fields have to be highlighted diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 0475dfbe..3231475a 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -675,6 +675,8 @@ Option CollectionManager::do_search(std::map& re const char *LIMIT_HITS = "limit_hits"; const char *PER_PAGE = "per_page"; const char *PAGE = "page"; + const char *OFFSET = "offset"; + const char *LIMIT = "limit"; const char *RANK_TOKENS_BY = "rank_tokens_by"; const char *INCLUDE_FIELDS = "include_fields"; const char *EXCLUDE_FIELDS = "exclude_fields"; @@ -756,7 +758,8 @@ Option CollectionManager::do_search(std::map& re std::vector facet_fields; std::vector sort_fields; size_t per_page = 10; - size_t page = 1; + size_t page = 0; + size_t offset = UINT32_MAX; token_ordering token_order = NOT_SET; std::string vector_query; @@ -809,7 +812,9 @@ Option CollectionManager::do_search(std::map& re {SNIPPET_THRESHOLD, &snippet_threshold}, {HIGHLIGHT_AFFIX_NUM_TOKENS, &highlight_affix_num_tokens}, {PAGE, &page}, + {OFFSET, &offset}, {PER_PAGE, &per_page}, + {LIMIT, &per_page}, {GROUP_LIMIT, &group_limit}, {SEARCH_CUTOFF_MS, &search_cutoff_ms}, {MAX_EXTRA_PREFIX, &max_extra_prefix}, @@ -947,6 +952,10 @@ Option CollectionManager::do_search(std::map& re per_page = 0; } + if(req_params[PAGE].empty() && req_params[OFFSET].empty()) { + page = 1; + } + include_fields.insert(include_fields_vec.begin(), include_fields_vec.end()); exclude_fields.insert(exclude_fields_vec.begin(), exclude_fields_vec.end()); @@ -1030,7 +1039,8 @@ Option CollectionManager::do_search(std::map& re start_ts, match_type, facet_sample_percent, - facet_sample_threshold + facet_sample_threshold, + offset ); uint64_t timeMillis = std::chrono::duration_cast( @@ -1049,7 +1059,12 @@ Option CollectionManager::do_search(std::map& re result["search_time_ms"] = timeMillis; } - result["page"] = page; + if(page != 0) { + 
result["page"] = page; + } else { + result["offset"] = offset; + } + results_json_str = result.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore); //LOG(INFO) << "Time taken: " << timeMillis << "ms"; diff --git a/src/core_api.cpp b/src/core_api.cpp index c03bf8c5..6732d0aa 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -1598,6 +1598,17 @@ bool post_compact_db(const std::shared_ptr& req, const std::shared_ptr return true; } +bool post_reset_peers(const std::shared_ptr& req, const std::shared_ptr& res) { + res->status_code = 200; + res->content_type_header = "application/json"; + + nlohmann::json response; + response["success"] = server->reset_peers(); + res->body = response.dump(); + + return true; +} + bool get_synonyms(const std::shared_ptr& req, const std::shared_ptr& res) { CollectionManager & collectionManager = CollectionManager::get_instance(); auto collection = collectionManager.get_collection(req->params["collection"]); diff --git a/src/http_server.cpp b/src/http_server.cpp index b6cabeb9..7756b481 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -544,17 +544,18 @@ bool HttpServer::is_write_request(const std::string& root_resource, const std::s } int HttpServer::async_req_cb(void *ctx, int is_end_stream) { - // NOTE: this callback is triggered multiple times by HTTP 2 but only once by HTTP 1 - // This quirk is because of the underlying buffer/window sizes. We will have to deal with both cases. h2o_custom_generator_t* custom_generator = static_cast(ctx); const std::shared_ptr& request = custom_generator->req(); const std::shared_ptr& response = custom_generator->res(); h2o_iovec_t chunk = request->_req->entity; + bool async_req = custom_generator->rpath->async_req; + bool is_http_v1 = (0x101 <= request->_req->version && request->_req->version < 0x200); /* LOG(INFO) << "async_req_cb, chunk.len=" << chunk.len + << ", is_http_v1: " << is_http_v1 << ", request->req->entity.len=" << request->req->entity.len << ", content_len: " << request->req->content_length << ", is_end_stream=" << is_end_stream; @@ -562,8 +563,12 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) { // disallow specific curl clients from using import call via http2 // detects: https://github.com/curl/curl/issues/1410 - if(request->_req->version >= 0x200 && request->path_without_query.find("import") != std::string::npos) { - ssize_t agent_header_cursor = h2o_find_header_by_str(&request->_req->headers, http_req::AGENT_HEADER, strlen(http_req::AGENT_HEADER), -1); + if(!is_http_v1 && async_req && request->first_chunk_aggregate && request->chunk_len == 0 && + request->path_without_query.find("import") != std::string::npos) { + + ssize_t agent_header_cursor = h2o_find_header_by_str(&request->_req->headers, + http_req::AGENT_HEADER, + strlen(http_req::AGENT_HEADER), -1); if(agent_header_cursor != -1) { h2o_iovec_t & slot = request->_req->headers.entries[agent_header_cursor].value; const std::string user_agent = std::string(slot.base, slot.len); @@ -601,25 +606,7 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) { //LOG(INFO) << "request->body.size(): " << request->body.size() << ", request->chunk_len=" << request->chunk_len; // LOG(INFO) << "req->entity.len: " << request->req->entity.len << ", request->chunk_len=" << request->chunk_len; - bool async_req = custom_generator->rpath->async_req; - - /* - On HTTP2, the request body callback is invoked multiple times with chunks of 16,384 bytes until the - `active_stream_window_size` is reached. 
diff --git a/src/index.cpp b/src/index.cpp
index 4859a451..c84daa36 100644
--- a/src/index.cpp
+++ b/src/index.cpp
@@ -746,18 +746,34 @@ void Index::index_field_in_memory(const field& afield, std::vector
         [&afield, &geo_array_index=geo_array_index, geo_index](const index_record& record, uint32_t seq_id) {
         // nested geopoint value inside an array of object will be a simple array so must be treated as geopoint
         bool nested_obj_arr_geopoint = (afield.nested && afield.type == field_types::GEOPOINT_ARRAY &&
-                record.doc[afield.name].size() == 2 && record.doc[afield.name][0].is_number());
+                !record.doc[afield.name].empty() && record.doc[afield.name][0].is_number());
 
         if(afield.type == field_types::GEOPOINT || nested_obj_arr_geopoint) {
-            const std::vector<double>& latlong = record.doc[afield.name];
+            // this could be a nested geopoint array, so it can have more than 2 array values
+            const std::vector<double>& latlongs = record.doc[afield.name];
+            for(size_t li = 0; li < latlongs.size(); li+=2) {
+                S2RegionTermIndexer::Options options;
+                options.set_index_contains_points_only(true);
+                S2RegionTermIndexer indexer(options);
+                S2Point point = S2LatLng::FromDegrees(latlongs[li], latlongs[li+1]).ToPoint();
 
-            S2RegionTermIndexer::Options options;
-            options.set_index_contains_points_only(true);
-            S2RegionTermIndexer indexer(options);
-            S2Point point = S2LatLng::FromDegrees(latlong[0], latlong[1]).ToPoint();
+                for(const auto& term: indexer.GetIndexTerms(point, "")) {
+                    (*geo_index)[term].push_back(seq_id);
+                }
+            }
 
-            for(const auto& term: indexer.GetIndexTerms(point, "")) {
-                (*geo_index)[term].push_back(seq_id);
+            if(nested_obj_arr_geopoint) {
+                int64_t* packed_latlongs = new int64_t[(latlongs.size()/2) + 1];
+                packed_latlongs[0] = latlongs.size()/2;
+                size_t j_packed_latlongs = 0;
+
+                for(size_t li = 0; li < latlongs.size(); li+=2) {
+                    int64_t packed_latlong = GeoPoint::pack_lat_lng(latlongs[li], latlongs[li+1]);
+                    packed_latlongs[j_packed_latlongs + 1] = packed_latlong;
+                    j_packed_latlongs++;
+                }
+
+                geo_array_index.at(afield.name)->emplace(seq_id, packed_latlongs);
             }
         } else {
             const std::vector<std::vector<double>>& latlongs = record.doc[afield.name];
@@ -771,12 +787,7 @@
             for(size_t li = 0; li < latlongs.size(); li++) {
                 auto& latlong = latlongs[li];
                 S2Point point = S2LatLng::FromDegrees(latlong[0], latlong[1]).ToPoint();
-                std::set<std::string> terms;
                 for(const auto& term: indexer.GetIndexTerms(point, "")) {
-                    terms.insert(term);
-                }
-
-                for(const auto& term: terms) {
                     (*geo_index)[term].push_back(seq_id);
                 }
             }
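Note: nested geopoint arrays arrive as one flat `[lat0, lng0, lat1, lng1, ...]` list, and the hunk above stores them as a length-prefixed array of packed `int64_t` lat/lng values. A sketch of that layout under stated assumptions (the packing function here is a stand-in; `GeoPoint::pack_lat_lng`'s exact bit layout is not shown in this diff):

```cpp
#include <cstdint>
#include <vector>

// Stand-in for GeoPoint::pack_lat_lng (assumed layout, for illustration):
// scale to 1e-7 degrees, lat in the high 32 bits, lng in the low 32 bits.
static int64_t pack_lat_lng(double lat, double lng) {
    int64_t ilat = static_cast<int64_t>(lat * 10000000.0);
    int64_t ilng = static_cast<int64_t>(lng * 10000000.0);
    return (ilat << 32) | (ilng & 0xFFFFFFFFLL);
}

// Same buffer shape the hunk above builds with new[]: slot 0 holds the pair
// count, slots 1..n hold one packed lat/lng per pair.
static std::vector<int64_t> pack_flat_latlngs(const std::vector<double>& latlongs) {
    std::vector<int64_t> packed(latlongs.size() / 2 + 1);
    packed[0] = static_cast<int64_t>(latlongs.size() / 2);
    for(size_t li = 0; li < latlongs.size(); li += 2) {
        packed[li / 2 + 1] = pack_lat_lng(latlongs[li], latlongs[li + 1]);
    }
    return packed;
}
```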
@@ -2206,7 +2217,7 @@ Option Index::run_search(search_args* search_params, const std::string& co
                            search_params->included_ids, search_params->excluded_ids,
                            search_params->sort_fields_std, search_params->num_typos,
                            search_params->topster, search_params->curated_topster,
-                           search_params->per_page, search_params->page, search_params->token_order,
+                           search_params->per_page, search_params->offset, search_params->token_order,
                            search_params->prefixes, search_params->drop_tokens_threshold,
                            search_params->all_result_ids_len, search_params->groups_processed,
                            search_params->searched_queries,
@@ -2667,7 +2678,7 @@ Option Index::search(std::vector& field_query_tokens, cons
                            const std::vector<uint32_t>& excluded_ids, std::vector<sort_by>& sort_fields_std,
                            const std::vector<uint32_t>& num_typos, Topster* topster, Topster* curated_topster,
                            const size_t per_page,
-                           const size_t page, const token_ordering token_order, const std::vector<bool>& prefixes,
+                           const size_t offset, const token_ordering token_order, const std::vector<bool>& prefixes,
                            const size_t drop_tokens_threshold, size_t& all_result_ids_len,
                            spp::sparse_hash_map<uint64_t, uint32_t>& groups_processed,
                            std::vector<std::vector<art_leaf*>>& searched_queries,
@@ -2702,6 +2713,8 @@
         return Option<bool>(true);
     }
 
+    size_t fetch_size = offset + per_page;
+
     std::set<uint32_t> curated_ids;
     std::map<size_t, std::map<size_t, uint32_t>> included_ids_map;  // outer pos => inner pos => list of IDs
     std::vector<uint32_t> included_ids_vec;
@@ -2782,7 +2795,7 @@
                 groups_processed[distinct_id]++;
             }
 
-            if (result_ids.size() == page * per_page) {
+            if (result_ids.size() == fetch_size) {
                 break;
             }
 
@@ -2805,7 +2818,7 @@
         collate_included_ids({}, included_ids_map, curated_topster, searched_queries);
 
         if (!vector_query.field_name.empty()) {
-            auto k = std::max(vector_query.k, per_page * page);
+            auto k = std::max(vector_query.k, fetch_size);
             if(vector_query.query_doc_given) {
                 // since we will omit the query doc from results
                 k++;
@@ -3075,7 +3088,7 @@
                 VectorFilterFunctor filterFunctor(filter_result.docs, filter_result.count);
                 auto& field_vector_index = vector_index.at(vector_query.field_name);
                 std::vector<std::pair<float, size_t>> dist_labels;
-                auto k = std::max(vector_query.k, per_page * page);
+                auto k = std::max(vector_query.k, fetch_size);
 
                 if(field_vector_index->distance_type == cosine) {
                     std::vector<float> normalized_q(vector_query.values.size());
diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp
index cb632f2a..87902022 100644
--- a/src/main/typesense_server.cpp
+++ b/src/main/typesense_server.cpp
@@ -77,6 +77,7 @@ void master_server_routes() {
     server->post("/operations/vote", post_vote, false, false);
     server->post("/operations/cache/clear", post_clear_cache, false, false);
     server->post("/operations/db/compact", post_compact_db, false, false);
+    server->post("/operations/reset_peers", post_reset_peers, false, false);
 
     server->get("/limits", get_rate_limits);
     server->get("/limits/active", get_active_throttles);
 
diff --git a/src/raft_server.cpp b/src/raft_server.cpp
index 1a1a45a5..6c8228fd 100644
--- a/src/raft_server.cpp
+++ b/src/raft_server.cpp
@@ -35,6 +35,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
     this->election_timeout_interval_ms = election_timeout_ms;
     this->raft_dir_path = raft_dir;
+    this->peering_endpoint = peering_endpoint;
 
     braft::NodeOptions node_options;
 
@@ -532,7 +533,8 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
     return init_db_status;
 }
 
-void ReplicationState::refresh_nodes(const std::string & nodes) {
+void ReplicationState::refresh_nodes(const std::string & nodes, const size_t raft_counter,
+                                     const std::atomic<bool>& reset_peers_on_error) {
     std::shared_lock lock(node_mutex);
 
     if(!node) {
@@ -569,8 +571,8 @@
             std::vector<braft::PeerId> latest_nodes;
             new_conf.list_peers(&latest_nodes);
 
-            if(latest_nodes.size() == 1) {
-                LOG(WARNING) << "Single-node with no leader. Resetting peers.";
+            if(latest_nodes.size() == 1 || (raft_counter > 0 && reset_peers_on_error)) {
+                LOG(WARNING) << "Node with no leader. Resetting peers of size: " << latest_nodes.size();
                 node->reset_peers(new_conf);
             } else {
                 LOG(WARNING) << "Multi-node with no leader: refusing to reset peers.";
@@ -773,6 +775,35 @@ bool ReplicationState::trigger_vote() {
     return false;
 }
 
+bool ReplicationState::reset_peers() {
+    std::shared_lock lock(node_mutex);
+
+    if(node) {
+        const Option<std::string> & refreshed_nodes_op = Config::fetch_nodes_config(config->get_nodes());
+        if(!refreshed_nodes_op.ok()) {
+            LOG(WARNING) << "Error while fetching peer configuration: " << refreshed_nodes_op.error();
+            return false;
+        }
+
+        const std::string& nodes_config = ReplicationState::to_nodes_config(peering_endpoint,
+                                                                            Config::get_instance().get_api_port(),
+                                                                            refreshed_nodes_op.get());
+
+        braft::Configuration peer_config;
+        peer_config.parse_from(nodes_config);
+
+        std::vector<braft::PeerId> peers;
+        peer_config.list_peers(&peers);
+
+        auto status = node->reset_peers(peer_config);
+        LOG(INFO) << "Reset peers. Ok? " << status.ok() << ", status: " << status;
+        LOG(INFO) << "New peer config is: " << peer_config;
+        return status.ok();
+    }
+
+    return false;
+}
+
 http_message_dispatcher* ReplicationState::get_message_dispatcher() const {
     return message_dispatcher;
 }
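Note: with this change there are two recovery paths. The `POST /operations/reset_peers` endpoint (registered in `typesense_server.cpp` above) re-reads the `--nodes` file and force-resets the braft peer set on demand, while the periodic refresh loop can do the same automatically when the operator opts in. A self-contained condensation of the automatic decision in `refresh_nodes()` above (helper name illustrative):

```cpp
#include <atomic>
#include <cstddef>

// A single node with no leader always resets its peers; a multi-node cluster
// with no leader only does so when reset-peers-on-error is enabled and at
// least one refresh cycle has already passed (raft_counter > 0).
static bool should_force_reset(size_t peer_count, size_t raft_counter,
                               const std::atomic<bool>& reset_peers_on_error) {
    return peer_count == 1 || (raft_counter > 0 && reset_peers_on_error.load());
}
```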
" << status.ok() << ", status: " << status; + LOG(INFO) << "New peer config is: " << peer_config; + return status.ok(); + } + + return false; +} + http_message_dispatcher* ReplicationState::get_message_dispatcher() const { return message_dispatcher; } diff --git a/src/tsconfig.cpp b/src/tsconfig.cpp index a50664c6..ee7af9e1 100644 --- a/src/tsconfig.cpp +++ b/src/tsconfig.cpp @@ -1,6 +1,8 @@ #include "option.h" #include "json.hpp" #include "tsconfig.h" +#include "file_utils.h" +#include Option Config::update_config(const nlohmann::json& req_json) { bool found_config = false; @@ -54,3 +56,36 @@ Option Config::update_config(const nlohmann::json& req_json) { return Option(true); } + +Option Config::fetch_file_contents(const std::string & file_path) { + if(!file_exists(file_path)) { + return Option(404, std::string("File does not exist at: ") + file_path); + } + + std::ifstream infile(file_path); + std::string content((std::istreambuf_iterator(infile)), (std::istreambuf_iterator())); + infile.close(); + + return Option(content); +} + +Option Config::fetch_nodes_config(const std::string& path_to_nodes) { + std::string nodes_config; + + if(!path_to_nodes.empty()) { + const Option & nodes_op = fetch_file_contents(path_to_nodes); + + if(!nodes_op.ok()) { + return Option(500, "Error reading file containing nodes configuration: " + nodes_op.error()); + } else { + nodes_config = nodes_op.get(); + if(nodes_config.empty()) { + return Option(500, "File containing nodes configuration is empty."); + } else { + nodes_config = nodes_op.get(); + } + } + } + + return Option(nodes_config); +} \ No newline at end of file diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index ed381e8b..0d9ccad7 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -51,18 +51,6 @@ void catch_interrupt(int sig) { quit_raft_service = true; } -Option fetch_file_contents(const std::string & file_path) { - if(!file_exists(file_path)) { - return Option(404, std::string("File does not exist at: ") + file_path); - } - - std::ifstream infile(file_path); - std::string content((std::istreambuf_iterator(infile)), (std::istreambuf_iterator())); - infile.close(); - - return Option(content); -} - void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.set_program_name("./typesense-server"); @@ -106,6 +94,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.add("disk-used-max-percentage", '\0', "Reject writes when used disk space exceeds this percentage. Default: 100 (never reject).", false, 100); options.add("memory-used-max-percentage", '\0', "Reject writes when memory usage exceeds this percentage. Default: 100 (never reject).", false, 100); options.add("skip-writes", '\0', "Skip all writes except config changes. Default: false.", false, false); + options.add("reset-peers-on-error", '\0', "Reset node's peers on clustering error. 
Default: false.", false, false); options.add("log-slow-searches-time-ms", '\0', "When >= 0, searches that take longer than this duration are logged.", false, 30*1000); @@ -155,27 +144,6 @@ int init_root_logger(Config & config, const std::string & server_version) { return 0; } -Option fetch_nodes_config(const std::string& path_to_nodes) { - std::string nodes_config; - - if(!path_to_nodes.empty()) { - const Option & nodes_op = fetch_file_contents(path_to_nodes); - - if(!nodes_op.ok()) { - return Option(500, "Error reading file containing nodes configuration: " + nodes_op.error()); - } else { - nodes_config = nodes_op.get(); - if(nodes_config.empty()) { - return Option(500, "File containing nodes configuration is empty."); - } else { - nodes_config = nodes_op.get(); - } - } - } - - return Option(nodes_config); -} - bool is_private_ip(uint32_t ip) { uint8_t b1, b2; b1 = (uint8_t) (ip >> 24); @@ -251,13 +219,14 @@ const char* get_internal_ip(const std::string& subnet_cidr) { int start_raft_server(ReplicationState& replication_state, const std::string& state_dir, const std::string& path_to_nodes, const std::string& peering_address, uint32_t peering_port, const std::string& peering_subnet, - uint32_t api_port, int snapshot_interval_seconds, int snapshot_max_byte_count_per_rpc) { + uint32_t api_port, int snapshot_interval_seconds, int snapshot_max_byte_count_per_rpc, + const std::atomic& reset_peers_on_error) { if(path_to_nodes.empty()) { LOG(INFO) << "Since no --nodes argument is provided, starting a single node Typesense cluster."; } - const Option& nodes_config_op = fetch_nodes_config(path_to_nodes); + const Option& nodes_config_op = Config::fetch_nodes_config(path_to_nodes); if(!nodes_config_op.ok()) { LOG(ERROR) << nodes_config_op.error(); @@ -311,7 +280,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st while (!brpc::IsAskedToQuit() && !quit_raft_service.load()) { if(raft_counter % 10 == 0) { // reset peer configuration periodically to identify change in cluster membership - const Option & refreshed_nodes_op = fetch_nodes_config(path_to_nodes); + const Option & refreshed_nodes_op = Config::fetch_nodes_config(path_to_nodes); if(!refreshed_nodes_op.ok()) { LOG(WARNING) << "Error while refreshing peer configuration: " << refreshed_nodes_op.error(); continue; @@ -319,7 +288,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st const std::string& nodes_config = ReplicationState::to_nodes_config(peering_endpoint, api_port, refreshed_nodes_op.get()); - replication_state.refresh_nodes(nodes_config); + replication_state.refresh_nodes(nodes_config, raft_counter, reset_peers_on_error); if(raft_counter % 60 == 0) { replication_state.do_snapshot(nodes_config); @@ -459,6 +428,7 @@ int run_server(const Config & config, const std::string & version, void (*master TextEmbedderManager::set_model_dir(config.get_model_dir()); TextEmbedderManager::download_default_model(); } + // first we start the peering service ReplicationState replication_state(server, batch_indexer, &store, @@ -482,7 +452,8 @@ int run_server(const Config & config, const std::string & version, void (*master config.get_peering_subnet(), config.get_api_port(), config.get_snapshot_interval_seconds(), - config.get_snapshot_max_byte_count_per_rpc()); + config.get_snapshot_max_byte_count_per_rpc(), + config.get_reset_peers_on_error()); LOG(INFO) << "Shutting down batch indexer..."; batch_indexer->stop(); diff --git a/src/validator.cpp b/src/validator.cpp index 25a77884..d3c828b7 
@@ -43,8 +43,10 @@ Option validator_t::coerce_element(const field& a_field, nlohmann::jso
         }
 
         if(!(doc_ele[0].is_number() && doc_ele[1].is_number())) {
-            // one or more elements is not a number, try to coerce
-            Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name, dummy_iter, false, array_ele_erased);
+            // one or more elements is not a number, try to coerce
+            Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name,
+                                                         doc_ele[0], doc_ele[1],
+                                                         dummy_iter, false, array_ele_erased);
             if(!coerce_op.ok()) {
                 return coerce_op;
             }
@@ -62,16 +64,27 @@ Option validator_t::coerce_element(const field& a_field, nlohmann::jso
 
     nlohmann::json::iterator it = doc_ele.begin();
 
-    // Handle a geopoint[] type inside an array of object: it won't be an array of array, so cannot iterate
-    if(a_field.nested && a_field.type == field_types::GEOPOINT_ARRAY &&
-       it->is_number() && doc_ele.size() == 2) {
+    // have to differentiate an array-of-objects field's geopoint[] from a simple nested field's geopoint[]:
+    // the geopoint[] of an array of objects field won't be an array of arrays
+    if(a_field.nested && a_field.type == field_types::GEOPOINT_ARRAY && it != doc_ele.end() && it->is_number()) {
+        if(!doc_ele.empty() && doc_ele.size() % 2 != 0) {
+            return Option<>(400, "Nested field `" + field_name + "` does not contain valid geopoint values.");
+        }
+
         const auto& item = doc_ele;
-        if(!(item[0].is_number() && item[1].is_number())) {
-            // one or more elements is not a number, try to coerce
-            Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name, it, true, array_ele_erased);
-            if(!coerce_op.ok()) {
-                return coerce_op;
+
+        for(size_t ai = 0; ai < doc_ele.size(); ai+=2) {
+            if(!(doc_ele[ai].is_number() && doc_ele[ai+1].is_number())) {
+                // one or more elements is not a number, try to coerce
+                Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name,
+                                                             doc_ele[ai], doc_ele[ai+1],
+                                                             it, true, array_ele_erased);
+                if(!coerce_op.ok()) {
+                    return coerce_op;
+                }
             }
+
+            it++;
         }
 
         return Option<uint32_t>(200);
@@ -83,7 +96,7 @@ Option validator_t::coerce_element(const field& a_field, nlohmann::jso
     }
 
     for(; it != doc_ele.end(); ) {
-        const auto& item = it.value();
+        nlohmann::json& item = it.value();
         array_ele_erased = false;
 
         if (a_field.type == field_types::STRING_ARRAY && !item.is_string()) {
@@ -118,8 +131,10 @@ Option validator_t::coerce_element(const field& a_field, nlohmann::jso
             }
 
             if(!(item[0].is_number() && item[1].is_number())) {
-                // one or more elements is not a number, try to coerce
-                Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name, it, true, array_ele_erased);
+                // one or more elements is not a number, try to coerce
+                Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name,
+                                                             item[0], item[1],
+                                                             it, true, array_ele_erased);
                 if(!coerce_op.ok()) {
                     return coerce_op;
                 }
@@ -396,11 +411,12 @@ Option validator_t::coerce_bool(const DIRTY_VALUES& dirty_values, cons
     return Option<uint32_t>(200);
 }
 
-Option<uint32_t> validator_t::coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field, nlohmann::json &document,
-                                              const std::string &field_name,
-                                              nlohmann::json::iterator& array_iter, bool is_array, bool& array_ele_erased) {
+Option<uint32_t> validator_t::coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field,
+                                              nlohmann::json &document, const std::string &field_name,
+                                              nlohmann::json& lat, nlohmann::json& lng,
+                                              nlohmann::json::iterator& array_iter,
+                                              bool is_array, bool& array_ele_erased) {
     std::string suffix = is_array ? "an array of" : "a";
-    auto& item = is_array ? array_iter.value() : document[field_name];
 
     if(dirty_values == DIRTY_VALUES::REJECT) {
         return Option<>(400, "Field `" + field_name + "` must be " + suffix + " geopoint.");
@@ -422,19 +438,19 @@ Option validator_t::coerce_geopoint(const DIRTY_VALUES& dirty_values,
 
     // try to value coerce into a geopoint
 
-    if(!item[0].is_number() && item[0].is_string()) {
-        if(StringUtils::is_float(item[0])) {
-            item[0] = std::stof(item[0].get<std::string>());
+    if(!lat.is_number() && lat.is_string()) {
+        if(StringUtils::is_float(lat)) {
+            lat = std::stof(lat.get<std::string>());
         }
     }
 
-    if(!item[1].is_number() && item[1].is_string()) {
-        if(StringUtils::is_float(item[1])) {
-            item[1] = std::stof(item[1].get<std::string>());
+    if(!lng.is_number() && lng.is_string()) {
+        if(StringUtils::is_float(lng)) {
+            lng = std::stof(lng.get<std::string>());
         }
     }
 
-    if(!item[0].is_number() || !item[1].is_number()) {
+    if(!lat.is_number() || !lng.is_number()) {
         if(dirty_values == DIRTY_VALUES::COERCE_OR_DROP) {
             if(!a_field.optional) {
                 return Option<>(400, "Field `" + field_name + "` must be " + suffix + " geopoint.");
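Note: `coerce_geopoint` now receives the lat/lng JSON elements directly instead of re-deriving them from the iterator, which lets the same routine serve plain geopoints, geopoint arrays, and flattened nested arrays. A minimal sketch of the per-coordinate coercion step, using try/catch where the real code first consults `StringUtils::is_float`:

```cpp
#include <string>
#include "json.hpp"  // nlohmann::json, as used throughout the codebase

// A stringified float is converted in place; anything else is left alone
// for the subsequent is_number() check to reject.
static void coerce_coord(nlohmann::json& coord) {
    if(!coord.is_number() && coord.is_string()) {
        try {
            coord = std::stof(coord.get<std::string>());
        } catch(const std::exception&) {
            // leave as-is; the caller reports "must be a geopoint"
        }
    }
}
```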
diff --git a/test/collection_nested_fields_test.cpp b/test/collection_nested_fields_test.cpp
index 39c59030..5baa6a15 100644
--- a/test/collection_nested_fields_test.cpp
+++ b/test/collection_nested_fields_test.cpp
@@ -1788,7 +1788,7 @@ TEST_F(CollectionNestedFieldsTest, NestedFieldWithGeopointArray) {
     Collection* coll1 = op.get();
 
     auto doc1 = R"({
-        "addresses": [{"geoPoint": [1.91, 23.5]}]
+        "addresses": [{"geoPoint": [1.91, 23.5]}, {"geoPoint": [12.91, 23.5]}]
     })"_json;
 
     ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok());
@@ -1796,16 +1796,39 @@
     auto results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
     ASSERT_EQ(1, results["found"].get<size_t>());
 
+    results = coll1->search("*", {}, "addresses.geoPoint: (12.911, 23.5, 1 mi)",
+                            {}, {}, {0}, 10, 1, FREQUENCY).get();
+    ASSERT_EQ(1, results["found"].get<size_t>());
+
     // with nested geopoint array
     auto doc2 = R"({
-        "addresses": [{"geoPoint": [[1.91, 23.5]]}]
+        "addresses": [{"geoPoint": [[1.91, 23.5]]}, {"geoPoint": [[1.91, 23.5], [1.95, 24.5]]}]
     })"_json;
 
     ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok());
     results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
     ASSERT_EQ(2, results["found"].get<size_t>());
 
+    // simple nested geopoint array
+    auto doc3 = R"({
+        "addresses": {"geoPoint": [[1.91, 23.5]]}
+    })"_json;
+
+    ASSERT_TRUE(coll1->add(doc3.dump(), CREATE).ok());
+    results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
+    ASSERT_EQ(3, results["found"].get<size_t>());
+
+    // simple nested geopoint
+    // this technically should not be allowed, but it's really tricky to detect, so we allow it
+    auto doc4 = R"({
+        "addresses": {"geoPoint": [1.91, 23.5]}
+    })"_json;
+
+    auto simple_geopoint_op = coll1->add(doc4.dump(), CREATE);
+    ASSERT_TRUE(simple_geopoint_op.ok());
+
     // data validation
     auto bad_doc = R"({
         "addresses": [{"geoPoint": [1.91, "x"]}]
 
diff --git a/test/core_api_utils_test.cpp b/test/core_api_utils_test.cpp
index 9978a071..1bf5a88b 100644
--- a/test/core_api_utils_test.cpp
+++ b/test/core_api_utils_test.cpp
@@ -617,6 +617,124 @@ TEST_F(CoreAPIUtilsTest, PresetSingleSearch) {
 
     collectionManager.drop_collection("coll1");
 }
 
+TEST_F(CoreAPIUtilsTest, SearchPagination) {
+    nlohmann::json schema = R"({
+        "name": "coll1",
+        "fields": [
"string" }, + {"name": "points", "type": "int32" } + ] + })"_json; + + auto op = collectionManager.create_collection(schema); + ASSERT_TRUE(op.ok()); + Collection* coll1 = op.get(); + + for(size_t i = 0; i < 20; i++) { + nlohmann::json doc; + doc["name"] = "Title " + std::to_string(i); + doc["points"] = i; + coll1->add(doc.dump(), CREATE); + } + + std::shared_ptr req = std::make_shared(); + std::shared_ptr res = std::make_shared(nullptr); + + nlohmann::json body; + + // without any pagination params, default is top 10 records by sort order + + body["searches"] = nlohmann::json::array(); + nlohmann::json search; + search["collection"] = "coll1"; + search["q"] = "title"; + search["query_by"] = "name"; + search["sort_by"] = "points:desc"; + body["searches"].push_back(search); + + req->body = body.dump(); + nlohmann::json embedded_params; + req->embedded_params_vec.push_back(embedded_params); + + post_multi_search(req, res); + nlohmann::json results = nlohmann::json::parse(res->body)["results"][0]; + ASSERT_EQ(10, results["hits"].size()); + ASSERT_EQ(19, results["hits"][0]["document"]["points"].get()); + ASSERT_EQ(1, results["page"].get()); + + // when offset is used we should expect the same but "offset" should be returned in response + search.clear(); + req->params.clear(); + body["searches"] = nlohmann::json::array(); + search["collection"] = "coll1"; + search["q"] = "title"; + search["offset"] = "1"; + search["query_by"] = "name"; + search["sort_by"] = "points:desc"; + body["searches"].push_back(search); + req->body = body.dump(); + + post_multi_search(req, res); + results = nlohmann::json::parse(res->body)["results"][0]; + ASSERT_EQ(10, results["hits"].size()); + ASSERT_EQ(18, results["hits"][0]["document"]["points"].get()); + ASSERT_EQ(1, results["offset"].get()); + + // use limit to restrict page size + search.clear(); + req->params.clear(); + body["searches"] = nlohmann::json::array(); + search["collection"] = "coll1"; + search["q"] = "title"; + search["offset"] = "1"; + search["limit"] = "5"; + search["query_by"] = "name"; + search["sort_by"] = "points:desc"; + body["searches"].push_back(search); + req->body = body.dump(); + + post_multi_search(req, res); + results = nlohmann::json::parse(res->body)["results"][0]; + ASSERT_EQ(5, results["hits"].size()); + ASSERT_EQ(18, results["hits"][0]["document"]["points"].get()); + ASSERT_EQ(1, results["offset"].get()); + + // when page is -1 + search.clear(); + req->params.clear(); + body["searches"] = nlohmann::json::array(); + search["collection"] = "coll1"; + search["q"] = "title"; + search["page"] = "-1"; + search["limit"] = "5"; + search["query_by"] = "name"; + search["sort_by"] = "points:desc"; + body["searches"].push_back(search); + req->body = body.dump(); + + post_multi_search(req, res); + results = nlohmann::json::parse(res->body)["results"][0]; + ASSERT_EQ(400, results["code"].get()); + ASSERT_EQ("Parameter `page` must be an unsigned integer.", results["error"].get()); + + // when offset is -1 + search.clear(); + req->params.clear(); + body["searches"] = nlohmann::json::array(); + search["collection"] = "coll1"; + search["q"] = "title"; + search["offset"] = "-1"; + search["query_by"] = "name"; + search["sort_by"] = "points:desc"; + body["searches"].push_back(search); + req->body = body.dump(); + + post_multi_search(req, res); + results = nlohmann::json::parse(res->body)["results"][0]; + ASSERT_EQ(400, results["code"].get()); + ASSERT_EQ("Parameter `offset` must be an unsigned integer.", results["error"].get()); +} + 
 TEST_F(CoreAPIUtilsTest, ExportWithFilter) {
     Collection *coll1;
 
     std::vector<field> fields = {field("title", field_types::STRING, false),