Merge branch 'v0.25-join' of https://github.com/ozanarmagan/typesense into v0.25-join

Author: ozanarmagan
Date: 2023-04-02 12:51:20 +03:00
Commit: b31867a193
20 changed files with 397 additions and 152 deletions

View File

@ -461,7 +461,8 @@ public:
const uint64_t search_time_start_us = 0,
const text_match_type_t match_type = max_score,
const size_t facet_sample_percent = 100,
const size_t facet_sample_threshold = 0) const;
const size_t facet_sample_threshold = 0,
const size_t page_offset = UINT32_MAX) const;
Option<bool> get_filter_ids(const std::string & filter_query, filter_result_t& filter_result) const;

View File

@ -119,6 +119,8 @@ bool post_clear_cache(const std::shared_ptr<http_req>& req, const std::shared_pt
bool post_compact_db(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_reset_peers(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Rate Limiting
bool get_rate_limits(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);

View File

@ -244,13 +244,12 @@ struct http_req {
int64_t log_index;
std::atomic<bool> is_http_v1;
std::atomic<bool> is_diposed;
std::string client_ip = "0.0.0.0";
http_req(): _req(nullptr), route_hash(1),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body_index(0), data(nullptr), ready(false), log_index(0), is_http_v1(true),
chunk_len(0), body_index(0), data(nullptr), ready(false), log_index(0),
is_diposed(false) {
start_ts = std::chrono::duration_cast<std::chrono::microseconds>(
@ -272,7 +271,6 @@ struct http_req {
if(_req != nullptr) {
const auto& tv = _req->processed_at.at;
conn_ts = (tv.tv_sec * 1000 * 1000) + tv.tv_usec;
is_http_v1 = (_req->version < 0x200);
} else {
conn_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();

View File

@ -48,7 +48,6 @@ private:
public:
bool is_req_early_exit = false;
bool is_req_http1 = true;
bool is_res_start = true;
h2o_send_state_t send_state = H2O_SEND_STATE_IN_PROGRESS;
@ -125,7 +124,6 @@ public:
h2o_custom_generator_t* res_generator = static_cast<h2o_custom_generator_t*>(res->generator.load());
res_state.is_req_early_exit = (res_generator->rpath->async_req && res->final && !req->last_chunk_aggregate);
res_state.is_req_http1 = req->is_http_v1;
res_state.send_state = res->final ? H2O_SEND_STATE_FINAL : H2O_SEND_STATE_IN_PROGRESS;
res_state.generator = (res_generator == nullptr) ? nullptr : &res_generator->h2o_generator;
res_state.set_response(res->status_code, res->content_type_header, res->body);
@ -326,6 +324,8 @@ public:
bool trigger_vote();
bool reset_peers();
void persist_applying_index();
int64_t get_num_queued_writes();

View File

@ -109,7 +109,7 @@ struct search_args {
std::vector<uint32_t> num_typos;
size_t max_facet_values;
size_t per_page;
size_t page;
size_t offset;
token_ordering token_order;
std::vector<bool> prefixes;
size_t drop_tokens_threshold;
@ -164,7 +164,7 @@ struct search_args {
search_fields(search_fields), match_type(match_type), filter_tree_root(filter_tree_root), facets(facets),
included_ids(included_ids), excluded_ids(excluded_ids), sort_fields_std(sort_fields_std),
facet_query(facet_query), num_typos(num_typos), max_facet_values(max_facet_values), per_page(per_page),
page(page), token_order(token_order), prefixes(prefixes),
offset(offset), token_order(token_order), prefixes(prefixes),
drop_tokens_threshold(drop_tokens_threshold), typo_tokens_threshold(typo_tokens_threshold),
group_by_fields(group_by_fields), group_limit(group_limit), default_sorting_field(default_sorting_field),
prioritize_exact_match(prioritize_exact_match), prioritize_token_position(prioritize_token_position),
@ -637,7 +637,7 @@ public:
const std::vector<uint32_t>& excluded_ids, std::vector<sort_by>& sort_fields_std,
const std::vector<uint32_t>& num_typos, Topster* topster, Topster* curated_topster,
const size_t per_page,
const size_t page, const token_ordering token_order, const std::vector<bool>& prefixes,
const size_t offset, const token_ordering token_order, const std::vector<bool>& prefixes,
const size_t drop_tokens_threshold, size_t& all_result_ids_len,
spp::sparse_hash_map<uint64_t, uint32_t>& groups_processed,
std::vector<std::vector<art_leaf*>>& searched_queries,

View File

@ -139,6 +139,8 @@ private:
const uint64_t snapshot_interval_s; // frequency of actual snapshotting
uint64_t last_snapshot_ts; // when last snapshot ran
butil::EndPoint peering_endpoint;
public:
static constexpr const char* log_dir_name = "log";
@ -163,12 +165,15 @@ public:
void read(const std::shared_ptr<http_res>& response);
// updates cluster membership
void refresh_nodes(const std::string & nodes);
void refresh_nodes(const std::string & nodes, const size_t raft_counter,
const std::atomic<bool>& reset_peers_on_error);
void refresh_catchup_status(bool log_msg);
bool trigger_vote();
bool reset_peers();
bool has_leader_term() const {
return leader_term.load(butil::memory_order_acquire) > 0;
}

View File

@ -62,6 +62,8 @@ private:
std::atomic<int> log_slow_searches_time_ms;
std::atomic<bool> reset_peers_on_error;
protected:
Config() {
@ -84,6 +86,7 @@ protected:
this->memory_used_max_percentage = 100;
this->skip_writes = false;
this->log_slow_searches_time_ms = 30 * 1000;
this->reset_peers_on_error = false;
}
Config(Config const&) {
@ -166,6 +169,10 @@ public:
this->skip_writes = skip_writes;
}
void set_reset_peers_on_error(bool reset_peers_on_error) {
this->reset_peers_on_error = reset_peers_on_error;
}
// getters
std::string get_data_dir() const {
@ -265,6 +272,10 @@ public:
return this->log_slow_searches_time_ms;
}
const std::atomic<bool>& get_reset_peers_on_error() const {
return reset_peers_on_error;
}
size_t get_num_collections_parallel_load() const {
return this->num_collections_parallel_load;
}
@ -419,6 +430,7 @@ public:
}
this->skip_writes = ("TRUE" == get_env("TYPESENSE_SKIP_WRITES"));
this->reset_peers_on_error = ("TRUE" == get_env("TYPESENSE_RESET_PEERS_ON_ERROR"));
}
void load_config_file(cmdline::parser & options) {
@ -575,6 +587,11 @@ public:
this->skip_writes = (skip_writes_str == "true");
}
if(reader.Exists("server", "reset-peers-on-error")) {
auto reset_peers_on_error_str = reader.Get("server", "reset-peers-on-error", "false");
this->reset_peers_on_error = (reset_peers_on_error_str == "true");
}
if(reader.Exists("server", "model-dir")) {
this->model_dir = reader.Get("server", "model-dir", "");
}
@ -715,6 +732,10 @@ public:
if(options.exist("skip-writes")) {
this->skip_writes = options.get<bool>("skip-writes");
}
if(options.exist("reset-peers-on-error")) {
this->reset_peers_on_error = options.get<bool>("reset-peers-on-error");
}
}
void set_cors_domains(std::string& cors_domains_value) {
@ -743,4 +764,8 @@ public:
}
Option<bool> update_config(const nlohmann::json& req_json);
static Option<std::string> fetch_file_contents(const std::string & file_path);
static Option<std::string> fetch_nodes_config(const std::string& path_to_nodes);
};
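
The new flag is kept in a std::atomic<bool> and handed out by const reference, so code that captures the reference (the peering loop further down in this commit) always reads the current value without copying. A minimal, hedged sketch of that pattern with stand-in names, not the actual Config class:

#include <atomic>

// Minimal stand-in for the pattern added above: the config object owns the
// atomic flag and exposes it by const reference, so callers holding the
// reference observe later changes to the value.
class config_sketch {
    std::atomic<bool> reset_peers_on_error{false};
public:
    void set_reset_peers_on_error(bool v) { reset_peers_on_error = v; }
    const std::atomic<bool>& get_reset_peers_on_error() const { return reset_peers_on_error; }
};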

View File

@ -60,8 +60,10 @@ public:
const std::string &field_name,
nlohmann::json::iterator& array_iter, bool is_array, bool& array_ele_erased);
static Option<uint32_t> coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field, nlohmann::json &document,
const std::string &field_name,
nlohmann::json::iterator& array_iter, bool is_array, bool& array_ele_erased);
static Option<uint32_t> coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field,
nlohmann::json &document, const std::string &field_name,
nlohmann::json& lat, nlohmann::json& lng,
nlohmann::json::iterator& array_iter,
bool is_array, bool& array_ele_erased);
};

View File

@ -1082,7 +1082,8 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
const uint64_t search_time_start_us,
const text_match_type_t match_type,
const size_t facet_sample_percent,
const size_t facet_sample_threshold) const {
const size_t facet_sample_threshold,
const size_t page_offset) const {
std::shared_lock lock(mutex);
@ -1341,7 +1342,7 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
}
// check for valid pagination
if(page < 1) {
if(page != 0 && page < 1) {
std::string message = "Page must be an integer of value greater than 0.";
return Option<nlohmann::json>(422, message);
}
@ -1351,7 +1352,10 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
return Option<nlohmann::json>(422, message);
}
if((page * per_page) > limit_hits) {
size_t offset = (page != 0) ? (per_page * (page - 1)) : page_offset;
size_t fetch_size = offset + per_page;
if(fetch_size > limit_hits) {
std::string message = "Only upto " + std::to_string(limit_hits) + " hits can be fetched. " +
"Ensure that `page` and `per_page` parameters are within this range.";
return Option<nlohmann::json>(422, message);
@ -1361,9 +1365,9 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
// ensure that `max_hits` never exceeds number of documents in collection
if(search_fields.size() <= 1 || raw_query == "*") {
max_hits = std::min(std::max((page * per_page), max_hits), get_num_documents());
max_hits = std::min(std::max(fetch_size, max_hits), get_num_documents());
} else {
max_hits = std::min(std::max((page * per_page), max_hits), get_num_documents());
max_hits = std::min(std::max(fetch_size, max_hits), get_num_documents());
}
if(token_order == NOT_SET) {
@ -1520,7 +1524,7 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
match_type,
filter_tree_root, facets, included_ids, excluded_ids,
sort_fields_std, facet_query, num_typos, max_facet_values, max_hits,
per_page, page, token_order, prefixes,
per_page, offset, token_order, prefixes,
drop_tokens_threshold, typo_tokens_threshold,
group_by_fields, group_limit, default_sorting_field,
prioritize_exact_match, prioritize_token_position,
@ -1654,10 +1658,10 @@ Option<nlohmann::json> Collection::search(std::string raw_query,
facet_query_last_token = facet_query_tokens.empty() ? "" : facet_query_tokens.back();
}
const long start_result_index = (page - 1) * per_page;
const long start_result_index = offset;
// `end_result_index` could be -1 when max_hits is 0
const long end_result_index = std::min((page * per_page), std::min(max_hits, result_group_kvs.size())) - 1;
const long end_result_index = std::min(fetch_size, std::min(max_hits, result_group_kvs.size())) - 1;
// handle which fields have to be highlighted
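
To make the pagination change easier to follow, here is a minimal sketch (not the Typesense code itself) of how `page`, the new `page_offset` parameter and `per_page` resolve into the result window computed above; the variable names mirror the diff, but the function is illustrative only.

#include <algorithm>
#include <cstddef>
#include <utility>

// Illustrative sketch of the pagination math above. Assumption: page == 0 means
// "offset mode", in which case page_offset carries the caller-supplied offset.
std::pair<long, long> result_window(size_t page, size_t page_offset, size_t per_page,
                                    size_t max_hits, size_t num_result_kvs) {
    // page-based requests translate to an offset; otherwise the raw offset is used
    size_t offset = (page != 0) ? (per_page * (page - 1)) : page_offset;
    size_t fetch_size = offset + per_page;

    const long start_result_index = offset;
    // can evaluate to -1 when max_hits is 0, as the comment in the diff notes
    const long end_result_index =
        static_cast<long>(std::min(fetch_size, std::min(max_hits, num_result_kvs))) - 1;
    return {start_result_index, end_result_index};
}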

View File

@ -675,6 +675,8 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
const char *LIMIT_HITS = "limit_hits";
const char *PER_PAGE = "per_page";
const char *PAGE = "page";
const char *OFFSET = "offset";
const char *LIMIT = "limit";
const char *RANK_TOKENS_BY = "rank_tokens_by";
const char *INCLUDE_FIELDS = "include_fields";
const char *EXCLUDE_FIELDS = "exclude_fields";
@ -756,7 +758,8 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
std::vector<std::string> facet_fields;
std::vector<sort_by> sort_fields;
size_t per_page = 10;
size_t page = 1;
size_t page = 0;
size_t offset = UINT32_MAX;
token_ordering token_order = NOT_SET;
std::string vector_query;
@ -809,7 +812,9 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
{SNIPPET_THRESHOLD, &snippet_threshold},
{HIGHLIGHT_AFFIX_NUM_TOKENS, &highlight_affix_num_tokens},
{PAGE, &page},
{OFFSET, &offset},
{PER_PAGE, &per_page},
{LIMIT, &per_page},
{GROUP_LIMIT, &group_limit},
{SEARCH_CUTOFF_MS, &search_cutoff_ms},
{MAX_EXTRA_PREFIX, &max_extra_prefix},
@ -947,6 +952,10 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
per_page = 0;
}
if(req_params[PAGE].empty() && req_params[OFFSET].empty()) {
page = 1;
}
include_fields.insert(include_fields_vec.begin(), include_fields_vec.end());
exclude_fields.insert(exclude_fields_vec.begin(), exclude_fields_vec.end());
@ -1030,7 +1039,8 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
start_ts,
match_type,
facet_sample_percent,
facet_sample_threshold
facet_sample_threshold,
offset
);
uint64_t timeMillis = std::chrono::duration_cast<std::chrono::milliseconds>(
@ -1049,7 +1059,12 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
result["search_time_ms"] = timeMillis;
}
result["page"] = page;
if(page != 0) {
result["page"] = page;
} else {
result["offset"] = offset;
}
results_json_str = result.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
//LOG(INFO) << "Time taken: " << timeMillis << "ms";
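
A condensed, hedged sketch of how the new request parameters appear to be resolved in do_search, based on the defaults and aliases added above (`limit` aliasing `per_page`, `offset` alongside `page`); the struct and function names are illustrative:

#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Illustrative defaults matching the hunks above: page starts at 0 and offset
// at UINT32_MAX so that "neither parameter was sent" can be detected afterwards.
struct pagination_params {
    size_t page = 0;
    size_t offset = UINT32_MAX;
    size_t per_page = 10;      // `limit` is a new alias for `per_page`
};

void apply_pagination_defaults(std::map<std::string, std::string>& req_params,
                               pagination_params& p) {
    // when neither `page` nor `offset` was sent, restore the old default of page 1;
    // the response later echoes result["page"] when page != 0, else result["offset"]
    if(req_params["page"].empty() && req_params["offset"].empty()) {
        p.page = 1;
    }
}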

View File

@ -1598,6 +1598,17 @@ bool post_compact_db(const std::shared_ptr<http_req>& req, const std::shared_ptr
return true;
}
bool post_reset_peers(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
res->status_code = 200;
res->content_type_header = "application/json";
nlohmann::json response;
response["success"] = server->reset_peers();
res->body = response.dump();
return true;
}
bool get_synonyms(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
CollectionManager & collectionManager = CollectionManager::get_instance();
auto collection = collectionManager.get_collection(req->params["collection"]);

View File

@ -544,17 +544,18 @@ bool HttpServer::is_write_request(const std::string& root_resource, const std::s
}
int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
// NOTE: this callback is triggered multiple times by HTTP 2 but only once by HTTP 1
// This quirk is because of the underlying buffer/window sizes. We will have to deal with both cases.
h2o_custom_generator_t* custom_generator = static_cast<h2o_custom_generator_t*>(ctx);
const std::shared_ptr<http_req>& request = custom_generator->req();
const std::shared_ptr<http_res>& response = custom_generator->res();
h2o_iovec_t chunk = request->_req->entity;
bool async_req = custom_generator->rpath->async_req;
bool is_http_v1 = (0x101 <= request->_req->version && request->_req->version < 0x200);
/*
LOG(INFO) << "async_req_cb, chunk.len=" << chunk.len
<< ", is_http_v1: " << is_http_v1
<< ", request->req->entity.len=" << request->req->entity.len
<< ", content_len: " << request->req->content_length
<< ", is_end_stream=" << is_end_stream;
@ -562,8 +563,12 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
// disallow specific curl clients from using import call via http2
// detects: https://github.com/curl/curl/issues/1410
if(request->_req->version >= 0x200 && request->path_without_query.find("import") != std::string::npos) {
ssize_t agent_header_cursor = h2o_find_header_by_str(&request->_req->headers, http_req::AGENT_HEADER, strlen(http_req::AGENT_HEADER), -1);
if(!is_http_v1 && async_req && request->first_chunk_aggregate && request->chunk_len == 0 &&
request->path_without_query.find("import") != std::string::npos) {
ssize_t agent_header_cursor = h2o_find_header_by_str(&request->_req->headers,
http_req::AGENT_HEADER,
strlen(http_req::AGENT_HEADER), -1);
if(agent_header_cursor != -1) {
h2o_iovec_t & slot = request->_req->headers.entries[agent_header_cursor].value;
const std::string user_agent = std::string(slot.base, slot.len);
@ -601,25 +606,7 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
//LOG(INFO) << "request->body.size(): " << request->body.size() << ", request->chunk_len=" << request->chunk_len;
// LOG(INFO) << "req->entity.len: " << request->req->entity.len << ", request->chunk_len=" << request->chunk_len;
bool async_req = custom_generator->rpath->async_req;
/*
On HTTP2, the request body callback is invoked multiple times with chunks of 16,384 bytes until the
`active_stream_window_size` is reached. For the first iteration, `active_stream_window_size`
includes initial request entity size and as well as chunk sizes
On HTTP 1, though, the handler is called only once with a small chunk size and requires proceed_req() to
be called for fetching further chunks. We need to handle this difference.
*/
bool exceeds_chunk_limit;
if(!request->is_http_v1 && request->first_chunk_aggregate) {
exceeds_chunk_limit = ((request->chunk_len + request->_req->entity.len) >= ACTIVE_STREAM_WINDOW_SIZE);
} else {
exceeds_chunk_limit = (request->chunk_len >= ACTIVE_STREAM_WINDOW_SIZE);
}
bool exceeds_chunk_limit = (request->chunk_len >= ACTIVE_STREAM_WINDOW_SIZE);
bool can_process_async = async_req && exceeds_chunk_limit;
/*if(is_end_stream == 1) {
@ -641,20 +628,7 @@ int HttpServer::async_req_cb(void *ctx, int is_end_stream) {
return 0;
}
// we are not ready to fire the request handler, so that means we need to buffer the request further
// this could be because we are a) dealing with a HTTP v1 request or b) a synchronous request
if(request->is_http_v1) {
// http v1 callbacks fire on small chunk sizes, so fetch more to match window size of http v2 buffer
request->_req->proceed_req(request->_req, NULL);
}
if(!async_req) {
// progress ONLY non-streaming type request body since
// streaming requests will call proceed_req in an async fashion
request->_req->proceed_req(request->_req, NULL);
}
request->_req->proceed_req(request->_req, NULL);
return 0;
}
@ -825,12 +799,8 @@ void HttpServer::stream_response(stream_response_state_t& state) {
h2o_start_response(req, state.generator);
}
if(state.is_req_http1) {
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(req);
} else {
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_ERROR);
}
h2o_send(req, &state.res_body, 1, H2O_SEND_STATE_FINAL);
h2o_dispose_request(req);
return ;
}
@ -1034,6 +1004,10 @@ bool HttpServer::trigger_vote() {
return replication_state->trigger_vote();
}
bool HttpServer::reset_peers() {
return replication_state->reset_peers();
}
ThreadPool* HttpServer::get_thread_pool() const {
return thread_pool;
}
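
The lines removed above special-cased HTTP/1 versus HTTP/2 when deciding whether enough of the request body had been buffered; the new code uses a single size check and always calls proceed_req(). A hedged sketch of the simplified decision, with ACTIVE_STREAM_WINDOW_SIZE passed in as a plain parameter:

#include <cstddef>

// Illustrative outline of the simplified buffering decision in async_req_cb.
// Returns true when the accumulated body is large enough to hand to an async
// handler; otherwise the caller keeps buffering and asks the transport for more
// data via proceed_req(), regardless of HTTP version.
bool ready_for_async_dispatch(size_t chunk_len, bool async_req,
                              size_t active_stream_window_size) {
    bool exceeds_chunk_limit = (chunk_len >= active_stream_window_size);
    return async_req && exceeds_chunk_limit;
}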

View File

@ -746,18 +746,34 @@ void Index::index_field_in_memory(const field& afield, std::vector<index_record>
[&afield, &geo_array_index=geo_array_index, geo_index](const index_record& record, uint32_t seq_id) {
// nested geopoint value inside an array of object will be a simple array so must be treated as geopoint
bool nested_obj_arr_geopoint = (afield.nested && afield.type == field_types::GEOPOINT_ARRAY &&
record.doc[afield.name].size() == 2 && record.doc[afield.name][0].is_number());
!record.doc[afield.name].empty() && record.doc[afield.name][0].is_number());
if(afield.type == field_types::GEOPOINT || nested_obj_arr_geopoint) {
const std::vector<double>& latlong = record.doc[afield.name];
// this could be a nested geopoint array so can have more than 2 array values
const std::vector<double>& latlongs = record.doc[afield.name];
for(size_t li = 0; li < latlongs.size(); li+=2) {
S2RegionTermIndexer::Options options;
options.set_index_contains_points_only(true);
S2RegionTermIndexer indexer(options);
S2Point point = S2LatLng::FromDegrees(latlongs[li], latlongs[li+1]).ToPoint();
S2RegionTermIndexer::Options options;
options.set_index_contains_points_only(true);
S2RegionTermIndexer indexer(options);
S2Point point = S2LatLng::FromDegrees(latlong[0], latlong[1]).ToPoint();
for(const auto& term: indexer.GetIndexTerms(point, "")) {
(*geo_index)[term].push_back(seq_id);
}
}
for(const auto& term: indexer.GetIndexTerms(point, "")) {
(*geo_index)[term].push_back(seq_id);
if(nested_obj_arr_geopoint) {
int64_t* packed_latlongs = new int64_t[(latlongs.size()/2) + 1];
packed_latlongs[0] = latlongs.size()/2;
size_t j_packed_latlongs = 0;
for(size_t li = 0; li < latlongs.size(); li+=2) {
int64_t packed_latlong = GeoPoint::pack_lat_lng(latlongs[li], latlongs[li+1]);
packed_latlongs[j_packed_latlongs + 1] = packed_latlong;
j_packed_latlongs++;
}
geo_array_index.at(afield.name)->emplace(seq_id, packed_latlongs);
}
} else {
const std::vector<std::vector<double>>& latlongs = record.doc[afield.name];
@ -771,12 +787,7 @@ void Index::index_field_in_memory(const field& afield, std::vector<index_record>
for(size_t li = 0; li < latlongs.size(); li++) {
auto& latlong = latlongs[li];
S2Point point = S2LatLng::FromDegrees(latlong[0], latlong[1]).ToPoint();
std::set<std::string> terms;
for(const auto& term: indexer.GetIndexTerms(point, "")) {
terms.insert(term);
}
for(const auto& term: terms) {
(*geo_index)[term].push_back(seq_id);
}
@ -2206,7 +2217,7 @@ Option<bool> Index::run_search(search_args* search_params, const std::string& co
search_params->included_ids, search_params->excluded_ids,
search_params->sort_fields_std, search_params->num_typos,
search_params->topster, search_params->curated_topster,
search_params->per_page, search_params->page, search_params->token_order,
search_params->per_page, search_params->offset, search_params->token_order,
search_params->prefixes, search_params->drop_tokens_threshold,
search_params->all_result_ids_len, search_params->groups_processed,
search_params->searched_queries,
@ -2667,7 +2678,7 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
const std::vector<uint32_t>& excluded_ids, std::vector<sort_by>& sort_fields_std,
const std::vector<uint32_t>& num_typos, Topster* topster, Topster* curated_topster,
const size_t per_page,
const size_t page, const token_ordering token_order, const std::vector<bool>& prefixes,
const size_t offset, const token_ordering token_order, const std::vector<bool>& prefixes,
const size_t drop_tokens_threshold, size_t& all_result_ids_len,
spp::sparse_hash_map<uint64_t, uint32_t>& groups_processed,
std::vector<std::vector<art_leaf*>>& searched_queries,
@ -2702,6 +2713,8 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
return Option(true);
}
size_t fetch_size = offset + per_page;
std::set<uint32_t> curated_ids;
std::map<size_t, std::map<size_t, uint32_t>> included_ids_map; // outer pos => inner pos => list of IDs
std::vector<uint32_t> included_ids_vec;
@ -2782,7 +2795,7 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
groups_processed[distinct_id]++;
}
if (result_ids.size() == page * per_page) {
if (result_ids.size() == fetch_size) {
break;
}
@ -2805,7 +2818,7 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
collate_included_ids({}, included_ids_map, curated_topster, searched_queries);
if (!vector_query.field_name.empty()) {
auto k = std::max<size_t>(vector_query.k, per_page * page);
auto k = std::max<size_t>(vector_query.k, fetch_size);
if(vector_query.query_doc_given) {
// since we will omit the query doc from results
k++;
@ -3075,7 +3088,7 @@ Option<bool> Index::search(std::vector<query_tokens_t>& field_query_tokens, cons
VectorFilterFunctor filterFunctor(filter_result.docs, filter_result.count);
auto& field_vector_index = vector_index.at(vector_query.field_name);
std::vector<std::pair<float, size_t>> dist_labels;
auto k = std::max<size_t>(vector_query.k, per_page * page);
auto k = std::max<size_t>(vector_query.k, fetch_size);
if(field_vector_index->distance_type == cosine) {
std::vector<float> normalized_q(vector_query.values.size());
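
Earlier in this file's diff, the nested geopoint-array branch packs every lat/lng pair into one int64 and stores the pair count in slot 0 of a heap-allocated buffer. A hedged sketch of that layout, with the project's GeoPoint::pack_lat_lng passed in as a stand-in function pointer so the snippet stands alone:

#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative layout of the packed buffer built in the geopoint hunk above:
// index 0 holds the number of lat/lng pairs, indices 1..n hold one packed value each.
int64_t* pack_latlong_pairs(const std::vector<double>& latlongs,
                            int64_t (*pack_lat_lng)(double, double)) {
    const size_t num_pairs = latlongs.size() / 2;
    int64_t* packed = new int64_t[num_pairs + 1];
    packed[0] = static_cast<int64_t>(num_pairs);          // slot 0: pair count
    for(size_t li = 0; li < latlongs.size(); li += 2) {
        packed[(li / 2) + 1] = pack_lat_lng(latlongs[li], latlongs[li + 1]);
    }
    return packed;   // ownership passes to geo_array_index, as in the diff
}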

View File

@ -77,6 +77,7 @@ void master_server_routes() {
server->post("/operations/vote", post_vote, false, false);
server->post("/operations/cache/clear", post_clear_cache, false, false);
server->post("/operations/db/compact", post_compact_db, false, false);
server->post("/operations/reset_peers", post_reset_peers, false, false);
server->get("/limits", get_rate_limits);
server->get("/limits/active", get_active_throttles);

View File

@ -35,6 +35,7 @@ int ReplicationState::start(const butil::EndPoint & peering_endpoint, const int
this->election_timeout_interval_ms = election_timeout_ms;
this->raft_dir_path = raft_dir;
this->peering_endpoint = peering_endpoint;
braft::NodeOptions node_options;
@ -532,7 +533,8 @@ int ReplicationState::on_snapshot_load(braft::SnapshotReader* reader) {
return init_db_status;
}
void ReplicationState::refresh_nodes(const std::string & nodes) {
void ReplicationState::refresh_nodes(const std::string & nodes, const size_t raft_counter,
const std::atomic<bool>& reset_peers_on_error) {
std::shared_lock lock(node_mutex);
if(!node) {
@ -569,8 +571,8 @@ void ReplicationState::refresh_nodes(const std::string & nodes) {
std::vector<braft::PeerId> latest_nodes;
new_conf.list_peers(&latest_nodes);
if(latest_nodes.size() == 1) {
LOG(WARNING) << "Single-node with no leader. Resetting peers.";
if(latest_nodes.size() == 1 || (raft_counter > 0 && reset_peers_on_error)) {
LOG(WARNING) << "Node with no leader. Resetting peers of size: " << latest_nodes.size();
node->reset_peers(new_conf);
} else {
LOG(WARNING) << "Multi-node with no leader: refusing to reset peers.";
@ -773,6 +775,35 @@ bool ReplicationState::trigger_vote() {
return false;
}
bool ReplicationState::reset_peers() {
std::shared_lock lock(node_mutex);
if(node) {
const Option<std::string> & refreshed_nodes_op = Config::fetch_nodes_config(config->get_nodes());
if(!refreshed_nodes_op.ok()) {
LOG(WARNING) << "Error while fetching peer configuration: " << refreshed_nodes_op.error();
return false;
}
const std::string& nodes_config = ReplicationState::to_nodes_config(peering_endpoint,
Config::get_instance().get_api_port(),
refreshed_nodes_op.get());
braft::Configuration peer_config;
peer_config.parse_from(nodes_config);
std::vector<braft::PeerId> peers;
peer_config.list_peers(&peers);
auto status = node->reset_peers(peer_config);
LOG(INFO) << "Reset peers. Ok? " << status.ok() << ", status: " << status;
LOG(INFO) << "New peer config is: " << peer_config;
return status.ok();
}
return false;
}
http_message_dispatcher* ReplicationState::get_message_dispatcher() const {
return message_dispatcher;
}
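
Tying the reset-peers pieces together: the earlier core_api hunk adds a handler that reports {"success": ...}, the routes hunk registers it at POST /operations/reset_peers, and ReplicationState::reset_peers() above does the actual work by re-reading the nodes file and calling braft's node->reset_peers(). A hedged sketch of the handler's shape, with the server call abstracted behind std::function so the snippet stands alone:

#include <functional>
#include "json.hpp"   // nlohmann::json, as used elsewhere in the tree

// Illustrative shape of the new endpoint handler: it only reports whether the
// underlying ReplicationState::reset_peers() call succeeded.
nlohmann::json build_reset_peers_response(const std::function<bool()>& reset_peers) {
    nlohmann::json response;
    response["success"] = reset_peers();   // true when node->reset_peers(...) returned OK
    return response;
}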

View File

@ -1,6 +1,8 @@
#include "option.h"
#include "json.hpp"
#include "tsconfig.h"
#include "file_utils.h"
#include <fstream>
Option<bool> Config::update_config(const nlohmann::json& req_json) {
bool found_config = false;
@ -54,3 +56,36 @@ Option<bool> Config::update_config(const nlohmann::json& req_json) {
return Option<bool>(true);
}
Option<std::string> Config::fetch_file_contents(const std::string & file_path) {
if(!file_exists(file_path)) {
return Option<std::string>(404, std::string("File does not exist at: ") + file_path);
}
std::ifstream infile(file_path);
std::string content((std::istreambuf_iterator<char>(infile)), (std::istreambuf_iterator<char>()));
infile.close();
return Option<std::string>(content);
}
Option<std::string> Config::fetch_nodes_config(const std::string& path_to_nodes) {
std::string nodes_config;
if(!path_to_nodes.empty()) {
const Option<std::string> & nodes_op = fetch_file_contents(path_to_nodes);
if(!nodes_op.ok()) {
return Option<std::string>(500, "Error reading file containing nodes configuration: " + nodes_op.error());
} else {
nodes_config = nodes_op.get();
if(nodes_config.empty()) {
return Option<std::string>(500, "File containing nodes configuration is empty.");
} else {
nodes_config = nodes_op.get();
}
}
}
return Option<std::string>(nodes_config);
}

View File

@ -51,18 +51,6 @@ void catch_interrupt(int sig) {
quit_raft_service = true;
}
Option<std::string> fetch_file_contents(const std::string & file_path) {
if(!file_exists(file_path)) {
return Option<std::string>(404, std::string("File does not exist at: ") + file_path);
}
std::ifstream infile(file_path);
std::string content((std::istreambuf_iterator<char>(infile)), (std::istreambuf_iterator<char>()));
infile.close();
return Option<std::string>(content);
}
void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.set_program_name("./typesense-server");
@ -106,6 +94,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.add<int>("disk-used-max-percentage", '\0', "Reject writes when used disk space exceeds this percentage. Default: 100 (never reject).", false, 100);
options.add<int>("memory-used-max-percentage", '\0', "Reject writes when memory usage exceeds this percentage. Default: 100 (never reject).", false, 100);
options.add<bool>("skip-writes", '\0', "Skip all writes except config changes. Default: false.", false, false);
options.add<bool>("reset-peers-on-error", '\0', "Reset node's peers on clustering error. Default: false.", false, false);
options.add<int>("log-slow-searches-time-ms", '\0', "When >= 0, searches that take longer than this duration are logged.", false, 30*1000);
@ -155,27 +144,6 @@ int init_root_logger(Config & config, const std::string & server_version) {
return 0;
}
Option<std::string> fetch_nodes_config(const std::string& path_to_nodes) {
std::string nodes_config;
if(!path_to_nodes.empty()) {
const Option<std::string> & nodes_op = fetch_file_contents(path_to_nodes);
if(!nodes_op.ok()) {
return Option<std::string>(500, "Error reading file containing nodes configuration: " + nodes_op.error());
} else {
nodes_config = nodes_op.get();
if(nodes_config.empty()) {
return Option<std::string>(500, "File containing nodes configuration is empty.");
} else {
nodes_config = nodes_op.get();
}
}
}
return Option<std::string>(nodes_config);
}
bool is_private_ip(uint32_t ip) {
uint8_t b1, b2;
b1 = (uint8_t) (ip >> 24);
@ -251,13 +219,14 @@ const char* get_internal_ip(const std::string& subnet_cidr) {
int start_raft_server(ReplicationState& replication_state, const std::string& state_dir, const std::string& path_to_nodes,
const std::string& peering_address, uint32_t peering_port, const std::string& peering_subnet,
uint32_t api_port, int snapshot_interval_seconds, int snapshot_max_byte_count_per_rpc) {
uint32_t api_port, int snapshot_interval_seconds, int snapshot_max_byte_count_per_rpc,
const std::atomic<bool>& reset_peers_on_error) {
if(path_to_nodes.empty()) {
LOG(INFO) << "Since no --nodes argument is provided, starting a single node Typesense cluster.";
}
const Option<std::string>& nodes_config_op = fetch_nodes_config(path_to_nodes);
const Option<std::string>& nodes_config_op = Config::fetch_nodes_config(path_to_nodes);
if(!nodes_config_op.ok()) {
LOG(ERROR) << nodes_config_op.error();
@ -311,7 +280,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
while (!brpc::IsAskedToQuit() && !quit_raft_service.load()) {
if(raft_counter % 10 == 0) {
// reset peer configuration periodically to identify change in cluster membership
const Option<std::string> & refreshed_nodes_op = fetch_nodes_config(path_to_nodes);
const Option<std::string> & refreshed_nodes_op = Config::fetch_nodes_config(path_to_nodes);
if(!refreshed_nodes_op.ok()) {
LOG(WARNING) << "Error while refreshing peer configuration: " << refreshed_nodes_op.error();
continue;
@ -319,7 +288,7 @@ int start_raft_server(ReplicationState& replication_state, const std::string& st
const std::string& nodes_config = ReplicationState::to_nodes_config(peering_endpoint, api_port,
refreshed_nodes_op.get());
replication_state.refresh_nodes(nodes_config);
replication_state.refresh_nodes(nodes_config, raft_counter, reset_peers_on_error);
if(raft_counter % 60 == 0) {
replication_state.do_snapshot(nodes_config);
@ -459,6 +428,7 @@ int run_server(const Config & config, const std::string & version, void (*master
TextEmbedderManager::set_model_dir(config.get_model_dir());
TextEmbedderManager::download_default_model();
}
// first we start the peering service
ReplicationState replication_state(server, batch_indexer, &store,
@ -482,7 +452,8 @@ int run_server(const Config & config, const std::string & version, void (*master
config.get_peering_subnet(),
config.get_api_port(),
config.get_snapshot_interval_seconds(),
config.get_snapshot_max_byte_count_per_rpc());
config.get_snapshot_max_byte_count_per_rpc(),
config.get_reset_peers_on_error());
LOG(INFO) << "Shutting down batch indexer...";
batch_indexer->stop();
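
A condensed, hedged sketch of the peer-refresh tick in start_raft_server after this change; fetch_nodes and refresh_nodes are std::function stand-ins for Config::fetch_nodes_config() plus ReplicationState::to_nodes_config(), and ReplicationState::refresh_nodes(), with error handling trimmed:

#include <atomic>
#include <cstddef>
#include <functional>
#include <string>

// Condensed sketch of the periodic refresh performed by the raft loop above:
// every 10th iteration the nodes configuration is re-read and refresh_nodes()
// is told whether it may reset peers on error.
void refresh_tick(size_t raft_counter,
                  const std::atomic<bool>& reset_peers_on_error,
                  const std::function<std::string()>& fetch_nodes,
                  const std::function<void(const std::string&, size_t,
                                           const std::atomic<bool>&)>& refresh_nodes) {
    if(raft_counter % 10 == 0) {
        // reset peer configuration periodically to pick up membership changes;
        // the counter and the new flag let a leaderless node reset its peers
        refresh_nodes(fetch_nodes(), raft_counter, reset_peers_on_error);
    }
}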

View File

@ -43,8 +43,10 @@ Option<uint32_t> validator_t::coerce_element(const field& a_field, nlohmann::jso
}
if(!(doc_ele[0].is_number() && doc_ele[1].is_number())) {
// one or more elements is not an number, try to coerce
Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name, dummy_iter, false, array_ele_erased);
// one or more elements is not a number, try to coerce
Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name,
doc_ele[0], doc_ele[1],
dummy_iter, false, array_ele_erased);
if(!coerce_op.ok()) {
return coerce_op;
}
@ -62,16 +64,27 @@ Option<uint32_t> validator_t::coerce_element(const field& a_field, nlohmann::jso
nlohmann::json::iterator it = doc_ele.begin();
// Handle a geopoint[] type inside an array of object: it won't be an array of array, so cannot iterate
if(a_field.nested && a_field.type == field_types::GEOPOINT_ARRAY &&
it->is_number() && doc_ele.size() == 2) {
// have to differentiate the geopoint[] type of a nested array object's geopoint[] vs a simple nested field
// geopoint[] type of an array of objects field won't be an array of array
if(a_field.nested && a_field.type == field_types::GEOPOINT_ARRAY && it != doc_ele.end() && it->is_number()) {
if(!doc_ele.empty() && doc_ele.size() % 2 != 0) {
return Option<>(400, "Nested field `" + field_name + "` does not contain valid geopoint values.");
}
const auto& item = doc_ele;
if(!(item[0].is_number() && item[1].is_number())) {
// one or more elements is not an number, try to coerce
Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name, it, true, array_ele_erased);
if(!coerce_op.ok()) {
return coerce_op;
for(size_t ai = 0; ai < doc_ele.size(); ai+=2) {
if(!(doc_ele[ai].is_number() && doc_ele[ai+1].is_number())) {
// one or more elements is not an number, try to coerce
Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name,
doc_ele[ai], doc_ele[ai+1],
it, true, array_ele_erased);
if(!coerce_op.ok()) {
return coerce_op;
}
}
it++;
}
return Option<uint32_t>(200);
@ -83,7 +96,7 @@ Option<uint32_t> validator_t::coerce_element(const field& a_field, nlohmann::jso
}
for(; it != doc_ele.end(); ) {
const auto& item = it.value();
nlohmann::json& item = it.value();
array_ele_erased = false;
if (a_field.type == field_types::STRING_ARRAY && !item.is_string()) {
@ -118,8 +131,10 @@ Option<uint32_t> validator_t::coerce_element(const field& a_field, nlohmann::jso
}
if(!(item[0].is_number() && item[1].is_number())) {
// one or more elements is not an number, try to coerce
Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name, it, true, array_ele_erased);
// one or more elements is not a number, try to coerce
Option<uint32_t> coerce_op = coerce_geopoint(dirty_values, a_field, document, field_name,
item[0], item[1],
it, true, array_ele_erased);
if(!coerce_op.ok()) {
return coerce_op;
}
@ -396,11 +411,12 @@ Option<uint32_t> validator_t::coerce_bool(const DIRTY_VALUES& dirty_values, cons
return Option<uint32_t>(200);
}
Option<uint32_t> validator_t::coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field, nlohmann::json &document,
const std::string &field_name,
nlohmann::json::iterator& array_iter, bool is_array, bool& array_ele_erased) {
Option<uint32_t> validator_t::coerce_geopoint(const DIRTY_VALUES& dirty_values, const field& a_field,
nlohmann::json &document, const std::string &field_name,
nlohmann::json& lat, nlohmann::json& lng,
nlohmann::json::iterator& array_iter,
bool is_array, bool& array_ele_erased) {
std::string suffix = is_array ? "an array of" : "a";
auto& item = is_array ? array_iter.value() : document[field_name];
if(dirty_values == DIRTY_VALUES::REJECT) {
return Option<>(400, "Field `" + field_name + "` must be " + suffix + " geopoint.");
@ -422,19 +438,19 @@ Option<uint32_t> validator_t::coerce_geopoint(const DIRTY_VALUES& dirty_values,
// try to value coerce into a geopoint
if(!item[0].is_number() && item[0].is_string()) {
if(StringUtils::is_float(item[0])) {
item[0] = std::stof(item[0].get<std::string>());
if(!lat.is_number() && lat.is_string()) {
if(StringUtils::is_float(lat)) {
lat = std::stof(lat.get<std::string>());
}
}
if(!item[1].is_number() && item[1].is_string()) {
if(StringUtils::is_float(item[1])) {
item[1] = std::stof(item[1].get<std::string>());
if(!lng.is_number() && lng.is_string()) {
if(StringUtils::is_float(lng)) {
lng = std::stof(lng.get<std::string>());
}
}
if(!item[0].is_number() || !item[1].is_number()) {
if(!lat.is_number() || !lng.is_number()) {
if(dirty_values == DIRTY_VALUES::COERCE_OR_DROP) {
if(!a_field.optional) {
return Option<>(400, "Field `" + field_name + "` must be " + suffix + " geopoint.");
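
coerce_geopoint now receives lat and lng directly by reference instead of re-reading them through the iterator. A small standalone sketch of the string-to-float coercion it applies, where is_float_like stands in for StringUtils::is_float:

#include <cstdlib>
#include <string>
#include "json.hpp"   // nlohmann::json, as used throughout the codebase

// Stand-in for StringUtils::is_float: true when the whole string parses as a float.
static bool is_float_like(const std::string& s) {
    char* end = nullptr;
    std::strtof(s.c_str(), &end);
    return !s.empty() && end != nullptr && *end == '\0';
}

// Illustrative version of the lat/lng coercion in the hunk above: string values
// that look like floats are converted in place, mirroring the new by-reference
// lat/lng parameters.
void coerce_latlng(nlohmann::json& lat, nlohmann::json& lng) {
    if(!lat.is_number() && lat.is_string() && is_float_like(lat.get<std::string>())) {
        lat = std::stof(lat.get<std::string>());
    }
    if(!lng.is_number() && lng.is_string() && is_float_like(lng.get<std::string>())) {
        lng = std::stof(lng.get<std::string>());
    }
}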

View File

@ -1788,7 +1788,7 @@ TEST_F(CollectionNestedFieldsTest, NestedFieldWithGeopointArray) {
Collection* coll1 = op.get();
auto doc1 = R"({
"addresses": [{"geoPoint": [1.91, 23.5]}]
"addresses": [{"geoPoint": [1.91, 23.5]}, {"geoPoint": [12.91, 23.5]}]
})"_json;
ASSERT_TRUE(coll1->add(doc1.dump(), CREATE).ok());
@ -1796,16 +1796,39 @@ TEST_F(CollectionNestedFieldsTest, NestedFieldWithGeopointArray) {
auto results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
ASSERT_EQ(1, results["found"].get<size_t>());
results = coll1->search("*", {}, "addresses.geoPoint: (12.911, 23.5, 1 mi)",
{}, {}, {0}, 10, 1, FREQUENCY).get();
ASSERT_EQ(1, results["found"].get<size_t>());
// with nested geopoint array
auto doc2 = R"({
"addresses": [{"geoPoint": [[1.91, 23.5]]}]
"addresses": [{"geoPoint": [[1.91, 23.5]]}, {"geoPoint": [[1.91, 23.5], [1.95, 24.5]]}]
})"_json;
ASSERT_TRUE(coll1->add(doc2.dump(), CREATE).ok());
results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
ASSERT_EQ(2, results["found"].get<size_t>());
// simply nested geopoint array
auto doc3 = R"({
"addresses": {"geoPoint": [[1.91, 23.5]]}
})"_json;
ASSERT_TRUE(coll1->add(doc3.dump(), CREATE).ok());
results = coll1->search("*", {}, "", {}, {}, {0}, 10, 1, FREQUENCY, {false}, 0).get();
ASSERT_EQ(3, results["found"].get<size_t>());
// simply nested geopoint
// this technically cannot be allowed but it's really tricky to detect so we allow
auto doc4 = R"({
"addresses": {"geoPoint": [1.91, 23.5]}
})"_json;
auto simple_geopoint_op = coll1->add(doc4.dump(), CREATE);
ASSERT_TRUE(simple_geopoint_op.ok());
// data validation
auto bad_doc = R"({
"addresses": [{"geoPoint": [1.91, "x"]}]

View File

@ -617,6 +617,124 @@ TEST_F(CoreAPIUtilsTest, PresetSingleSearch) {
collectionManager.drop_collection("coll1");
}
TEST_F(CoreAPIUtilsTest, SearchPagination) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "name", "type": "string" },
{"name": "points", "type": "int32" }
]
})"_json;
auto op = collectionManager.create_collection(schema);
ASSERT_TRUE(op.ok());
Collection* coll1 = op.get();
for(size_t i = 0; i < 20; i++) {
nlohmann::json doc;
doc["name"] = "Title " + std::to_string(i);
doc["points"] = i;
coll1->add(doc.dump(), CREATE);
}
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> res = std::make_shared<http_res>(nullptr);
nlohmann::json body;
// without any pagination params, default is top 10 records by sort order
body["searches"] = nlohmann::json::array();
nlohmann::json search;
search["collection"] = "coll1";
search["q"] = "title";
search["query_by"] = "name";
search["sort_by"] = "points:desc";
body["searches"].push_back(search);
req->body = body.dump();
nlohmann::json embedded_params;
req->embedded_params_vec.push_back(embedded_params);
post_multi_search(req, res);
nlohmann::json results = nlohmann::json::parse(res->body)["results"][0];
ASSERT_EQ(10, results["hits"].size());
ASSERT_EQ(19, results["hits"][0]["document"]["points"].get<size_t>());
ASSERT_EQ(1, results["page"].get<size_t>());
// when offset is used we should expect the same but "offset" should be returned in response
search.clear();
req->params.clear();
body["searches"] = nlohmann::json::array();
search["collection"] = "coll1";
search["q"] = "title";
search["offset"] = "1";
search["query_by"] = "name";
search["sort_by"] = "points:desc";
body["searches"].push_back(search);
req->body = body.dump();
post_multi_search(req, res);
results = nlohmann::json::parse(res->body)["results"][0];
ASSERT_EQ(10, results["hits"].size());
ASSERT_EQ(18, results["hits"][0]["document"]["points"].get<size_t>());
ASSERT_EQ(1, results["offset"].get<size_t>());
// use limit to restrict page size
search.clear();
req->params.clear();
body["searches"] = nlohmann::json::array();
search["collection"] = "coll1";
search["q"] = "title";
search["offset"] = "1";
search["limit"] = "5";
search["query_by"] = "name";
search["sort_by"] = "points:desc";
body["searches"].push_back(search);
req->body = body.dump();
post_multi_search(req, res);
results = nlohmann::json::parse(res->body)["results"][0];
ASSERT_EQ(5, results["hits"].size());
ASSERT_EQ(18, results["hits"][0]["document"]["points"].get<size_t>());
ASSERT_EQ(1, results["offset"].get<size_t>());
// when page is -1
search.clear();
req->params.clear();
body["searches"] = nlohmann::json::array();
search["collection"] = "coll1";
search["q"] = "title";
search["page"] = "-1";
search["limit"] = "5";
search["query_by"] = "name";
search["sort_by"] = "points:desc";
body["searches"].push_back(search);
req->body = body.dump();
post_multi_search(req, res);
results = nlohmann::json::parse(res->body)["results"][0];
ASSERT_EQ(400, results["code"].get<size_t>());
ASSERT_EQ("Parameter `page` must be an unsigned integer.", results["error"].get<std::string>());
// when offset is -1
search.clear();
req->params.clear();
body["searches"] = nlohmann::json::array();
search["collection"] = "coll1";
search["q"] = "title";
search["offset"] = "-1";
search["query_by"] = "name";
search["sort_by"] = "points:desc";
body["searches"].push_back(search);
req->body = body.dump();
post_multi_search(req, res);
results = nlohmann::json::parse(res->body)["results"][0];
ASSERT_EQ(400, results["code"].get<size_t>());
ASSERT_EQ("Parameter `offset` must be an unsigned integer.", results["error"].get<std::string>());
}
TEST_F(CoreAPIUtilsTest, ExportWithFilter) {
Collection *coll1;
std::vector<field> fields = {field("title", field_types::STRING, false),