From 21500131fd41b6075784f170d6c542111a5f5470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ozan=20Arma=C4=9Fan?= <70442658+ozanarmagan@users.noreply.github.com> Date: Wed, 14 Aug 2024 18:19:12 +0300 Subject: [PATCH] Fix conversation expire and migration (#1895) * Fix conversation expire and migration * Remove unnecessary log * Fix unused variable * Remove unimplemented function declarations --- include/conversation_manager.h | 2 -- src/conversation_manager.cpp | 31 ++++++++++++++++++++------ src/conversation_model_manager.cpp | 7 +++++- test/collection_vector_search_test.cpp | 13 ++++++----- test/conversation_test.cpp | 9 ++++---- test/core_api_utils_test.cpp | 9 ++++---- 6 files changed, 47 insertions(+), 24 deletions(-) diff --git a/include/conversation_manager.h b/include/conversation_manager.h index 2d9d90db..0bec6eef 100644 --- a/include/conversation_manager.h +++ b/include/conversation_manager.h @@ -39,8 +39,6 @@ class ConversationManager { Option validate_conversation_store_schema(Collection* collection); Option validate_conversation_store_collection(const std::string& collection); - Option add_history_collection(const std::string& collection); - Option remove_history_collection(const std::string& collection); Option get_history_collection(const std::string& conversation_id); Option initialize_history_collection(const std::string& collection); private: diff --git a/src/conversation_manager.cpp b/src/conversation_manager.cpp index 2a875253..3cccc505 100644 --- a/src/conversation_manager.cpp +++ b/src/conversation_manager.cpp @@ -242,6 +242,10 @@ Option ConversationManager::init(ReplicationState* raft_server) { void ConversationManager::clear_expired_conversations() { std::unique_lock lock(conversations_mutex); + // Only leader can delete expired conversations + if(raft_server && !raft_server->is_leader()) { + return; + } auto models_op = ConversationModelManager::get_all_models(); if(!models_op.ok()) { @@ -260,16 +264,29 @@ void 
ConversationManager::clear_expired_conversations() { auto ttl = model["ttl"].get(); auto collection = CollectionManager::get_instance().get_collection(history_collection).get(); + std::string filter_by_str = "timestamp:<" + std::to_string(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() - ttl + TTL_OFFSET) + "&&model_id:=" + model["id"].get(); + if(raft_server) { + + std::string res; + std::map res_headers; + std::string url = raft_server->get_leader_url() + "collections/" + history_collection + "/documents?filter_by=" + filter_by_str; + auto res_code = HttpClient::get_instance().delete_response(url, res, res_headers, 10*1000, true); - std::shared_ptr req = std::make_shared(); - std::shared_ptr resp = std::make_shared(nullptr); - req->params["collection"] = history_collection; - req->params["filter_by"] = "timestamp:<" + std::to_string(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count() - ttl + TTL_OFFSET) + "&&model_id:=" + model["id"].get(); + if(res_code != 200) { + LOG(ERROR) << "Error while deleting expired conversations: " << res; + LOG(ERROR) << "Status: " << res_code; + } + } else { + std::shared_ptr req = std::make_shared(); + std::shared_ptr resp = std::make_shared(nullptr); + req->params["collection"] = history_collection; + req->params["filter_by"] = filter_by_str; + auto api_res = del_remove_documents(req, resp); - auto api_res = del_remove_documents(req, resp); + if(!api_res) { + LOG(ERROR) << "Error while deleting expired conversations: " << resp->body; + } - if(!api_res) { - LOG(ERROR) << "Error while deleting expired conversations: " << resp->body; } } diff --git a/src/conversation_model_manager.cpp b/src/conversation_model_manager.cpp index f16d7c45..a6ee9fe9 100644 --- a/src/conversation_model_manager.cpp +++ b/src/conversation_model_manager.cpp @@ -40,7 +40,7 @@ Option ConversationModelManager::add_model_unsafe(nlohmann::json } if(model.count("ttl") == 0) { - model["ttl"] 
= 60 * 60 * 24; + model["ttl"] = (uint64_t)(60 * 60 * 24); } auto model_key = get_model_key(model["id"]); @@ -240,6 +240,11 @@ Option ConversationModelManager::migrate_model(nlohmann::json mo return Option(default_collection.code(), default_collection.error()); } model["history_collection"] = default_collection.get()->get_name(); + + if(model.count("ttl") == 0) { + model["ttl"] = (uint64_t)(60 * 60 * 24); + } + auto add_res = add_model_unsafe(model, model_id); if(!add_res.ok()) { return Option(add_res.code(), add_res.error()); diff --git a/test/collection_vector_search_test.cpp b/test/collection_vector_search_test.cpp index 920c0b0f..aefa449c 100644 --- a/test/collection_vector_search_test.cpp +++ b/test/collection_vector_search_test.cpp @@ -35,16 +35,17 @@ protected: "fields": [ { "name": "conversation_id", - "type": "string", - "facet": true + "type": "string" }, { "name": "role", - "type": "string" + "type": "string", + "index": false }, { "name": "message", - "type": "string" + "type": "string", + "index": false }, { "name": "timestamp", @@ -3832,8 +3833,8 @@ TEST_F(CollectionVectorTest, TestPartiallyUpdateConversationModel) { auto conversation_model_config = R"({ "model_name": "openai/gpt-3.5-turbo", - "max_bytes: 1000, - "history_collection": "conversation_store", + "max_bytes": 1000, + "history_collection": "conversation_store" })"_json; conversation_model_config["api_key"] = api_key; diff --git a/test/conversation_test.cpp b/test/conversation_test.cpp index 729bfd53..cb6b9ae0 100644 --- a/test/conversation_test.cpp +++ b/test/conversation_test.cpp @@ -25,16 +25,17 @@ class ConversationTest : public ::testing::Test { "fields": [ { "name": "conversation_id", - "type": "string", - "facet": true + "type": "string" }, { "name": "role", - "type": "string" + "type": "string", + "index": false }, { "name": "message", - "type": "string" + "type": "string", + "index": false }, { "name": "timestamp", diff --git a/test/core_api_utils_test.cpp 
b/test/core_api_utils_test.cpp index 2bd68a20..eb01c365 100644 --- a/test/core_api_utils_test.cpp +++ b/test/core_api_utils_test.cpp @@ -34,16 +34,17 @@ protected: "fields": [ { "name": "conversation_id", - "type": "string", - "facet": true + "type": "string" }, { "name": "role", - "type": "string" + "type": "string", + "index": false }, { "name": "message", - "type": "string" + "type": "string", + "index": false }, { "name": "timestamp",