diff --git a/include/collection.h b/include/collection.h index b101a818..bc949e61 100644 --- a/include/collection.h +++ b/include/collection.h @@ -55,7 +55,8 @@ private: mutable std::shared_mutex mutex; - mutable std::shared_mutex index_repair_lock; + // ensures that a Collection* is not destructed while in use by multiple threads + mutable std::shared_mutex lifecycle_mutex; const uint8_t CURATED_RECORD_IDENTIFIER = 100; @@ -673,6 +674,8 @@ public: static void hide_credential(nlohmann::json& json, const std::string& credential_name); friend class filter_result_iterator_t; + + std::shared_mutex& get_lifecycle_mutex(); }; template diff --git a/include/collection_manager.h b/include/collection_manager.h index 7319bb98..c8465fee 100644 --- a/include/collection_manager.h +++ b/include/collection_manager.h @@ -53,7 +53,7 @@ class CollectionManager { private: mutable std::shared_mutex mutex; - mutable std::mutex coll_create_mutex; + mutable std::shared_mutex noop_coll_mutex; Store *store; ThreadPool* thread_pool; @@ -168,7 +168,9 @@ public: nlohmann::json get_collection_summaries() const; - Option drop_collection(const std::string& collection_name, const bool remove_from_store = true); + Option drop_collection(const std::string& collection_name, + const bool remove_from_store = true, + const bool compact_store = true); uint32_t get_next_collection_id() const; diff --git a/include/http_server.h b/include/http_server.h index 7443b466..4ce21181 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -292,4 +292,6 @@ public: void persist_applying_index(); int64_t get_num_queued_writes(); + + void decr_pending_writes(); }; diff --git a/include/raft_server.h b/include/raft_server.h index 0c0035ec..6dbd2eb8 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -243,6 +243,8 @@ public: static Option handle_gzip(const std::shared_ptr& request); + void decr_pending_writes(); + private: friend class ReplicationClosure; diff --git a/src/batched_indexer.cpp 
b/src/batched_indexer.cpp index be3c9974..15f0a54e 100644 --- a/src/batched_indexer.cpp +++ b/src/batched_indexer.cpp @@ -272,6 +272,9 @@ void BatchedIndexer::run() { }); } + uint64_t stuck_counter = 0; + uint64_t prev_count = 0; + while(!quit) { std::this_thread::sleep_for(std::chrono::milliseconds (1000)); @@ -284,6 +287,27 @@ void BatchedIndexer::run() { std::unique_lock lk(mutex); LOG(INFO) << "Running GC for aborted requests, req map size: " << req_res_map.size(); + if(req_res_map.size() > 0 && prev_count == req_res_map.size()) { + stuck_counter++; + if(stuck_counter > 3) { + size_t max_loop = 0; + for(const auto& it : req_res_map) { + max_loop++; + LOG(INFO) << "Stuck req_key: " << it.first; + if(max_loop == 5) { + break; + } + } + + stuck_counter = 0; + } + + } else { + stuck_counter = 0; + } + + prev_count = req_res_map.size(); + // iterate through all map entries and delete ones which are not complete but > GC_PRUNE_MAX_SECONDS for (auto it = req_res_map.cbegin(); it != req_res_map.cend();) { uint64_t seconds_since_batch_update = std::chrono::duration_cast( diff --git a/src/collection.cpp b/src/collection.cpp index 694ea1bb..592344fe 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -60,8 +60,8 @@ Collection::Collection(const std::string& name, const uint32_t collection_id, co } Collection::~Collection() { + std::unique_lock lifecycle_lock(lifecycle_mutex); std::unique_lock lock(mutex); - std::unique_lock repair_lock(index_repair_lock); delete index; delete synonym_index; } @@ -6262,6 +6262,10 @@ Option Collection::get_sort_index_value_with_lock(const std::string& f return index->get_sort_index_value_with_lock(name, field_name, seq_id); } +std::shared_mutex& 
Collection::get_lifecycle_mutex() { + return lifecycle_mutex; +} + void Collection::remove_embedding_field(const std::string& field_name) { if(embedding_fields.find(field_name) == embedding_fields.end()) { return; @@ -6278,6 +6282,5 @@ tsl::htrie_map Collection::get_embedding_fields_unsafe() { } void Collection::do_housekeeping() { - std::unique_lock lock(index_repair_lock); index->repair_hnsw_index(); } diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 5739953e..70f0b998 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -419,7 +419,7 @@ Option CollectionManager::create_collection(const std::string& name const std::vector& symbols_to_index, const std::vector& token_separators, const bool enable_nested_fields) { - std::unique_lock lock(coll_create_mutex); + std::unique_lock lock(mutex); if(store->contains(Collection::get_meta_key(name))) { return Option(409, std::string("A collection with name `") + name + "` already exists."); @@ -470,6 +470,7 @@ Option CollectionManager::create_collection(const std::string& name return Option(500, "Could not write to on-disk storage."); } + lock.unlock(); add_to_collections(new_collection); if (referenced_in_backlog.count(name) > 0) { @@ -499,7 +500,8 @@ Collection* CollectionManager::get_collection_unsafe(const std::string & collect locked_resource_view_t CollectionManager::get_collection(const std::string & collection_name) const { std::shared_lock lock(mutex); Collection* coll = get_collection_unsafe(collection_name); - return locked_resource_view_t(mutex, coll); + return coll != nullptr ? 
locked_resource_view_t(coll->get_lifecycle_mutex(), coll) : + locked_resource_view_t(noop_coll_mutex, coll); } locked_resource_view_t CollectionManager::get_collection_with_id(uint32_t collection_id) const { @@ -539,7 +541,9 @@ std::vector CollectionManager::get_collection_names() const { return collection_vec; } -Option CollectionManager::drop_collection(const std::string& collection_name, const bool remove_from_store) { +Option CollectionManager::drop_collection(const std::string& collection_name, + const bool remove_from_store, + const bool compact_store) { std::shared_lock s_lock(mutex); auto collection = get_collection_unsafe(collection_name); @@ -556,8 +560,11 @@ Option CollectionManager::drop_collection(const std::string& col const std::string& del_key_prefix = std::to_string(collection->get_collection_id()) + "_"; const std::string& del_end_prefix = std::to_string(collection->get_collection_id()) + "`"; store->delete_range(del_key_prefix, del_end_prefix); - store->flush(); - store->compact_range(del_key_prefix, del_end_prefix); + + if(compact_store) { + store->flush(); + store->compact_range(del_key_prefix, del_end_prefix); + } // delete overrides const std::string& del_override_prefix = diff --git a/src/core_api.cpp b/src/core_api.cpp index e3afb1d2..97e076ba 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -256,9 +256,14 @@ bool patch_update_collection(const std::shared_ptr& req, const std::sh } bool del_drop_collection(const std::shared_ptr& req, const std::shared_ptr& res) { - std::string doc_id = req->params["id"]; + bool compact_store = false; + + if(req->params.count("compact_store") != 0) { + compact_store = (req->params["compact_store"] == "true"); + } + CollectionManager & collectionManager = CollectionManager::get_instance(); - Option drop_op = collectionManager.drop_collection(req->params["collection"], true); + Option drop_op = collectionManager.drop_collection(req->params["collection"], true, compact_store); if(!drop_op.ok()) { 
res->set(drop_op.code(), drop_op.error()); diff --git a/src/http_client.cpp b/src/http_client.cpp index 79a62a4a..cef788c6 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -295,6 +295,7 @@ size_t HttpClient::curl_write_async(char *buffer, size_t size, size_t nmemb, voi size_t HttpClient::curl_write_async_done(void *context, curl_socket_t item) { //LOG(INFO) << "curl_write_async_done"; deferred_req_res_t* req_res = static_cast(context); + req_res->server->decr_pending_writes(); if(!req_res->res->is_alive) { // underlying client request is dead, don't try to send anymore data diff --git a/src/http_server.cpp b/src/http_server.cpp index ce381c9f..18ad4296 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -1103,3 +1103,7 @@ bool HttpServer::is_leader() const { ThreadPool* HttpServer::get_meta_thread_pool() const { return meta_thread_pool; } + +void HttpServer::decr_pending_writes() { + return replication_state->decr_pending_writes(); +} diff --git a/src/raft_server.cpp b/src/raft_server.cpp index f98e8b3f..26cf6396 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -378,18 +378,17 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, response->content_type_header = res_headers["content-type"]; response->set_500(""); } else { - pending_writes--; return ; } } else { std::string api_res; - long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 10*1000, true); + long status = HttpClient::post_response(url, request->body, api_res, res_headers, {}, 0, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } } else if(request->http_method == "PUT") { std::string api_res; - long status = HttpClient::put_response(url, request->body, api_res, res_headers, 10*1000, true); + long status = HttpClient::put_response(url, request->body, api_res, res_headers, 0, true); response->content_type_header = res_headers["content-type"]; 
response->set_body(status, api_res); } else if(request->http_method == "DELETE") { @@ -402,13 +401,7 @@ void ReplicationState::write_to_leader(const std::shared_ptr& request, std::string api_res; route_path* rpath = nullptr; bool route_found = server->get_route(request->route_hash, &rpath); - - long timeout_ms = 10 * 1000; - if(route_found && rpath->handler == patch_update_collection) { - timeout_ms = 0; // patching a collection can take a long time - } - - long status = HttpClient::patch_response(url, request->body, api_res, res_headers, timeout_ms, true); + long status = HttpClient::patch_response(url, request->body, api_res, res_headers, 0, true); response->content_type_header = res_headers["content-type"]; response->set_body(status, api_res); } else { @@ -678,13 +671,14 @@ void ReplicationState::refresh_nodes(const std::string & nodes, const size_t raf node->get_status(&nodeStatus); LOG(INFO) << "Term: " << nodeStatus.term - << ", last_index index: " << nodeStatus.last_index - << ", committed_index: " << nodeStatus.committed_index - << ", known_applied_index: " << nodeStatus.known_applied_index - << ", applying_index: " << nodeStatus.applying_index - << ", queued_writes: " << batched_indexer->get_queued_writes() - << ", pending_queue_size: " << nodeStatus.pending_queue_size - << ", local_sequence: " << store->get_latest_seq_number(); + << ", pending_queue: " << nodeStatus.pending_queue_size + << ", last_index: " << nodeStatus.last_index + << ", committed: " << nodeStatus.committed_index + << ", known_applied: " << nodeStatus.known_applied_index + << ", applying: " << nodeStatus.applying_index + << ", pending_writes: " << pending_writes + << ", queued_writes: " << batched_indexer->get_queued_writes() + << ", local_sequence: " << store->get_latest_seq_number(); if(node->is_leader()) { RefreshNodesClosure* refresh_nodes_done = new RefreshNodesClosure; @@ -1102,6 +1096,10 @@ std::string ReplicationState::get_leader_url() const { return 
get_node_url_path(leader_addr, "/", protocol); } +void ReplicationState::decr_pending_writes() { + pending_writes--; +} + void TimedSnapshotClosure::Run() { // Auto delete this after Done() std::unique_ptr self_guard(this);