Merge branch 'v0.25-join' into v0.26-facets

This commit is contained in:
Kishore Nallan 2023-11-25 15:39:52 +05:30
commit 7ac12c0357
11 changed files with 81 additions and 29 deletions

View File

@ -55,7 +55,8 @@ private:
mutable std::shared_mutex mutex;
mutable std::shared_mutex index_repair_lock;
// ensures that a Collection* is not destructed while in use by multiple threads
mutable std::shared_mutex lifecycle_mutex;
const uint8_t CURATED_RECORD_IDENTIFIER = 100;
@ -673,6 +674,8 @@ public:
static void hide_credential(nlohmann::json& json, const std::string& credential_name);
friend class filter_result_iterator_t;
std::shared_mutex& get_lifecycle_mutex();
};
template<class T>

View File

@ -53,7 +53,7 @@ class CollectionManager {
private:
mutable std::shared_mutex mutex;
mutable std::mutex coll_create_mutex;
mutable std::shared_mutex noop_coll_mutex;
Store *store;
ThreadPool* thread_pool;
@ -168,7 +168,9 @@ public:
nlohmann::json get_collection_summaries() const;
Option<nlohmann::json> drop_collection(const std::string& collection_name, const bool remove_from_store = true);
Option<nlohmann::json> drop_collection(const std::string& collection_name,
const bool remove_from_store = true,
const bool compact_store = true);
uint32_t get_next_collection_id() const;

View File

@ -292,4 +292,6 @@ public:
void persist_applying_index();
int64_t get_num_queued_writes();
void decr_pending_writes();
};

View File

@ -243,6 +243,8 @@ public:
static Option<bool> handle_gzip(const std::shared_ptr<http_req>& request);
void decr_pending_writes();
private:
friend class ReplicationClosure;

View File

@ -272,6 +272,9 @@ void BatchedIndexer::run() {
});
}
uint64_t stuck_counter = 0;
uint64_t prev_count = 0;
while(!quit) {
std::this_thread::sleep_for(std::chrono::milliseconds (1000));
@ -284,6 +287,27 @@ void BatchedIndexer::run() {
std::unique_lock lk(mutex);
LOG(INFO) << "Running GC for aborted requests, req map size: " << req_res_map.size();
if(req_res_map.size() > 0 && prev_count == req_res_map.size()) {
stuck_counter++;
if(stuck_counter > 3) {
size_t max_loop = 0;
for(const auto& it : req_res_map) {
max_loop++;
LOG(INFO) << "Stuck req_key: " << it.first;
if(max_loop == 5) {
break;
}
}
stuck_counter = 0;
}
} else {
stuck_counter = 0;
}
prev_count = req_res_map.size();
// iterate through all map entries and delete ones which are not complete but > GC_PRUNE_MAX_SECONDS
for (auto it = req_res_map.cbegin(); it != req_res_map.cend();) {
uint64_t seconds_since_batch_update = std::chrono::duration_cast<std::chrono::seconds>(

View File

@ -60,8 +60,8 @@ Collection::Collection(const std::string& name, const uint32_t collection_id, co
}
// Tears down the collection. All three mutexes are acquired exclusively before anything is
// freed, so destruction blocks until no other thread holds any of them. lifecycle_mutex is
// the one declared for keeping a Collection* from being destructed while still in use.
Collection::~Collection() {
// fixed acquisition order: lifecycle_mutex, then mutex, then index_repair_lock
std::unique_lock lifecycle_lock(lifecycle_mutex);
std::unique_lock lock(mutex);
std::unique_lock repair_lock(index_repair_lock);
// the indices are released while every lock is still held; do_housekeeping() also takes
// index_repair_lock, so `index` cannot be deleted in the middle of a repair
delete index;
delete synonym_index;
}
@ -6202,6 +6202,7 @@ Option<bool> Collection::truncate_after_top_k(const string &field_name, size_t k
return Option<bool>(true);
}
void Collection::reference_populate_sort_mapping(int *sort_order, std::vector<size_t> &geopoint_indices,
std::vector<sort_by> &sort_fields_std,
std::array<spp::sparse_hash_map<uint32_t, int64_t, Hasher32> *, 3> &field_values)
@ -6262,6 +6263,10 @@ Option<uint32_t> Collection::get_sort_index_value_with_lock(const std::string& f
return index->get_sort_index_value_with_lock(name, field_name, seq_id);
}
// Exposes this collection's lifecycle_mutex by reference so that code outside the class can
// lock it (CollectionManager::get_collection hands it to locked_resource_view_t).
// The destructor locks the same mutex exclusively before freeing the indices.
std::shared_mutex& Collection::get_lifecycle_mutex() {
return lifecycle_mutex;
}
void Collection::remove_embedding_field(const std::string& field_name) {
if(embedding_fields.find(field_name) == embedding_fields.end()) {
return;
@ -6278,6 +6283,5 @@ tsl::htrie_map<char, field> Collection::get_embedding_fields_unsafe() {
}
// Housekeeping hook: asks the index to repair its HNSW index via index->repair_hnsw_index().
// index_repair_lock is held exclusively for the whole call; the destructor acquires the same
// lock before `delete index`, so the two cannot overlap.
void Collection::do_housekeeping() {
std::unique_lock lock(index_repair_lock);
index->repair_hnsw_index();
}

View File

@ -419,7 +419,7 @@ Option<Collection*> CollectionManager::create_collection(const std::string& name
const std::vector<std::string>& symbols_to_index,
const std::vector<std::string>& token_separators,
const bool enable_nested_fields) {
std::unique_lock lock(coll_create_mutex);
std::unique_lock lock(mutex);
if(store->contains(Collection::get_meta_key(name))) {
return Option<Collection*>(409, std::string("A collection with name `") + name + "` already exists.");
@ -470,6 +470,7 @@ Option<Collection*> CollectionManager::create_collection(const std::string& name
return Option<Collection*>(500, "Could not write to on-disk storage.");
}
lock.unlock();
add_to_collections(new_collection);
if (referenced_in_backlog.count(name) > 0) {
@ -499,7 +500,8 @@ Collection* CollectionManager::get_collection_unsafe(const std::string & collect
// Looks up a collection by name and returns it wrapped in a locked_resource_view_t.
//
// The manager-wide `mutex` is held (shared) only for the duration of the lookup. The returned
// view is bound to the collection's own lifecycle mutex rather than to the manager mutex, so
// holding a view does not keep the whole manager locked.
// NOTE(review): presumably locked_resource_view_t takes a shared lock on the mutex it is given
// for the lifetime of the view -- confirm against its definition.
//
// When no collection with that name exists, the view wraps a null pointer and is bound to
// `noop_coll_mutex`, a placeholder mutex that exists only so the view always has something
// to reference.
locked_resource_view_t<Collection> CollectionManager::get_collection(const std::string & collection_name) const {
    std::shared_lock lock(mutex);
    Collection* coll = get_collection_unsafe(collection_name);

    if(coll == nullptr) {
        // nothing to protect: there is no lifecycle mutex to borrow from a missing collection
        return locked_resource_view_t<Collection>(noop_coll_mutex, coll);
    }

    // Previously an earlier `return locked_resource_view_t<Collection>(mutex, coll);` preceded
    // this statement and made the lifecycle-mutex path unreachable.
    return locked_resource_view_t<Collection>(coll->get_lifecycle_mutex(), coll);
}
locked_resource_view_t<Collection> CollectionManager::get_collection_with_id(uint32_t collection_id) const {
@ -539,7 +541,9 @@ std::vector<std::string> CollectionManager::get_collection_names() const {
return collection_vec;
}
Option<nlohmann::json> CollectionManager::drop_collection(const std::string& collection_name, const bool remove_from_store) {
Option<nlohmann::json> CollectionManager::drop_collection(const std::string& collection_name,
const bool remove_from_store,
const bool compact_store) {
std::shared_lock s_lock(mutex);
auto collection = get_collection_unsafe(collection_name);
@ -556,8 +560,11 @@ Option<nlohmann::json> CollectionManager::drop_collection(const std::string& col
const std::string& del_key_prefix = std::to_string(collection->get_collection_id()) + "_";
const std::string& del_end_prefix = std::to_string(collection->get_collection_id()) + "`";
store->delete_range(del_key_prefix, del_end_prefix);
store->flush();
store->compact_range(del_key_prefix, del_end_prefix);
if(compact_store) {
store->flush();
store->compact_range(del_key_prefix, del_end_prefix);
}
// delete overrides
const std::string& del_override_prefix =

View File

@ -256,9 +256,14 @@ bool patch_update_collection(const std::shared_ptr<http_req>& req, const std::sh
}
bool del_drop_collection(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
std::string doc_id = req->params["id"];
bool compact_store = false;
if(req->params.count("compact_store") != 0) {
compact_store = (req->params["compact_store"] == "true");
}
CollectionManager & collectionManager = CollectionManager::get_instance();
Option<nlohmann::json> drop_op = collectionManager.drop_collection(req->params["collection"], true);
Option<nlohmann::json> drop_op = collectionManager.drop_collection(req->params["collection"], true, compact_store);
if(!drop_op.ok()) {
res->set(drop_op.code(), drop_op.error());

View File

@ -295,6 +295,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi
size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) {
//LOG(INFO) << "curl_write_async_done";
deferred_req_res_t* req_res = static_cast<deferred_req_res_t *>(context);
req_res->server->decr_pending_writes();
if(!req_res->res->is_alive) {
// underlying client request is dead, don't try to send anymore data

View File

@ -1103,3 +1103,7 @@ bool HttpServer::is_leader() const {
// Accessor for the meta_thread_pool member; returns the stored pointer unchanged.
ThreadPool* HttpServer::get_meta_thread_pool() const {
return meta_thread_pool;
}
// Thin pass-through: delegates to decr_pending_writes() on the server's replication_state.
// HttpClient::curl_write_async_done reaches the replication layer through this method.
void HttpServer::decr_pending_writes() {
    replication_state->decr_pending_writes();
}

View File

@ -378,18 +378,17 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
response->content_type_header = res_headers["content-type"];
response->set_500("");
} else {
pending_writes--;
return ;
}
} else {
std::string api_res;
long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 10*1000, true);
long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 0, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
}
} else if(request->http_method == "PUT") {
std::string api_res;
long status = HttpClient::put_response(url, request->body, api_res, res_headers, 10*1000, true);
long status = HttpClient::put_response(url, request->body, api_res, res_headers, 0, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
} else if(request->http_method == "DELETE") {
@ -402,13 +401,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr<http_req>& request,
std::string api_res;
route_path* rpath = nullptr;
bool route_found = server->get_route(request->route_hash, &rpath);
long timeout_ms = 10 * 1000;
if(route_found && rpath->handler == patch_update_collection) {
timeout_ms = 0; // patching a collection can take a long time
}
long status = HttpClient::patch_response(url, request->body, api_res, res_headers, timeout_ms, true);
long status = HttpClient::patch_response(url, request->body, api_res, res_headers, 0, true);
response->content_type_header = res_headers["content-type"];
response->set_body(status, api_res);
} else {
@ -678,13 +671,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes, const size_t raf
node->get_status(&nodeStatus);
LOG(INFO) << "Term: " << nodeStatus.term
<< ", last_index index: " << nodeStatus.last_index
<< ", committed_index: " << nodeStatus.committed_index
<< ", known_applied_index: " << nodeStatus.known_applied_index
<< ", applying_index: " << nodeStatus.applying_index
<< ", queued_writes: " << batched_indexer->get_queued_writes()
<< ", pending_queue_size: " << nodeStatus.pending_queue_size
<< ", local_sequence: " << store->get_latest_seq_number();
<< ", pending_queue: " << nodeStatus.pending_queue_size
<< ", last_index: " << nodeStatus.last_index
<< ", committed: " << nodeStatus.committed_index
<< ", known_applied: " << nodeStatus.known_applied_index
<< ", applying: " << nodeStatus.applying_index
<< ", pending_writes: " << pending_writes
<< ", queued_writes: " << batched_indexer->get_queued_writes()
<< ", local_sequence: " << store->get_latest_seq_number();
if(node->is_leader()) {
RefreshNodesClosure* refresh_nodes_done = new RefreshNodesClosure;
@ -1102,6 +1096,10 @@ std::string ReplicationState::get_leader_url() const {
return get_node_url_path(leader_addr, "/", protocol);
}
// Decrements the pending_writes counter by one.
// Reached via HttpServer::decr_pending_writes(), which HttpClient::curl_write_async_done calls.
// NOTE(review): no lock is taken here -- presumably pending_writes is an atomic; confirm at
// its declaration.
void ReplicationState::decr_pending_writes() {
pending_writes--;
}
void TimedSnapshotClosure::Run() {
// Auto delete this after Done()
std::unique_ptr<TimedSnapshotClosure> self_guard(this);