Fix conversation expire and migration (#1895)

* Fix conversation expire and migration

* Remove unnecessary log

* Fix unused variable

* Remove unimplemented function declerations
This commit is contained in:
Ozan Armağan 2024-08-14 18:19:12 +03:00 committed by GitHub
parent 0395857601
commit 21500131fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 47 additions and 24 deletions

View File

@ -39,8 +39,6 @@ class ConversationManager {
Option<bool> validate_conversation_store_schema(Collection* collection);
Option<bool> validate_conversation_store_collection(const std::string& collection);
Option<bool> add_history_collection(const std::string& collection);
Option<bool> remove_history_collection(const std::string& collection);
Option<Collection*> get_history_collection(const std::string& conversation_id);
Option<bool> initialize_history_collection(const std::string& collection);
private:

View File

@ -242,6 +242,10 @@ Option<bool> ConversationManager::init(ReplicationState* raft_server) {
void ConversationManager::clear_expired_conversations() {
std::unique_lock lock(conversations_mutex);
// Only leader can delete expired conversations
if(raft_server && !raft_server->is_leader()) {
return;
}
auto models_op = ConversationModelManager::get_all_models();
if(!models_op.ok()) {
@ -260,16 +264,29 @@ void ConversationManager::clear_expired_conversations() {
auto ttl = model["ttl"].get<uint64_t>();
auto collection = CollectionManager::get_instance().get_collection(history_collection).get();
std::string filter_by_str = "timestamp:<" + std::to_string(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count() - ttl + TTL_OFFSET) + "&&model_id:=" + model["id"].get<std::string>();
if(raft_server) {
std::string res;
std::map<std::string, std::string> res_headers;
std::string url = raft_server->get_leader_url() + "collections/" + history_collection + "/documents?filter_by=" + filter_by_str;
auto res_code = HttpClient::get_instance().delete_response(url, res, res_headers, 10*1000, true);
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> resp = std::make_shared<http_res>(nullptr);
req->params["collection"] = history_collection;
req->params["filter_by"] = "timestamp:<" + std::to_string(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count() - ttl + TTL_OFFSET) + "&&model_id:=" + model["id"].get<std::string>();
if(res_code != 200) {
LOG(ERROR) << "Error while deleting expired conversations: " << res;
LOG(ERROR) << "Status: " << res_code;
}
} else {
std::shared_ptr<http_req> req = std::make_shared<http_req>();
std::shared_ptr<http_res> resp = std::make_shared<http_res>(nullptr);
req->params["collection"] = history_collection;
req->params["filter_by"] = filter_by_str;
auto api_res = del_remove_documents(req, resp);
auto api_res = del_remove_documents(req, resp);
if(!api_res) {
LOG(ERROR) << "Error while deleting expired conversations: " << resp->body;
}
if(!api_res) {
LOG(ERROR) << "Error while deleting expired conversations: " << resp->body;
}
}

View File

@ -40,7 +40,7 @@ Option<nlohmann::json> ConversationModelManager::add_model_unsafe(nlohmann::json
}
if(model.count("ttl") == 0) {
model["ttl"] = 60 * 60 * 24;
model["ttl"] = (uint64_t)(60 * 60 * 24);
}
auto model_key = get_model_key(model["id"]);
@ -240,6 +240,11 @@ Option<nlohmann::json> ConversationModelManager::migrate_model(nlohmann::json mo
return Option<nlohmann::json>(default_collection.code(), default_collection.error());
}
model["history_collection"] = default_collection.get()->get_name();
if(model.count("ttl") == 0) {
model["ttl"] = (uint64_t)(60 * 60 * 24);
}
auto add_res = add_model_unsafe(model, model_id);
if(!add_res.ok()) {
return Option<nlohmann::json>(add_res.code(), add_res.error());

View File

@ -35,16 +35,17 @@ protected:
"fields": [
{
"name": "conversation_id",
"type": "string",
"facet": true
"type": "string"
},
{
"name": "role",
"type": "string"
"type": "string",
"index": false
},
{
"name": "message",
"type": "string"
"type": "string",
"index": false
},
{
"name": "timestamp",
@ -3832,8 +3833,8 @@ TEST_F(CollectionVectorTest, TestPartiallyUpdateConversationModel) {
auto conversation_model_config = R"({
"model_name": "openai/gpt-3.5-turbo",
"max_bytes: 1000,
"history_collection": "conversation_store",
"max_bytes": 1000,
"history_collection": "conversation_store"
})"_json;
conversation_model_config["api_key"] = api_key;

View File

@ -25,16 +25,17 @@ class ConversationTest : public ::testing::Test {
"fields": [
{
"name": "conversation_id",
"type": "string",
"facet": true
"type": "string"
},
{
"name": "role",
"type": "string"
"type": "string",
"index": false
},
{
"name": "message",
"type": "string"
"type": "string",
"index": false
},
{
"name": "timestamp",

View File

@ -34,16 +34,17 @@ protected:
"fields": [
{
"name": "conversation_id",
"type": "string",
"facet": true
"type": "string"
},
{
"name": "role",
"type": "string"
"type": "string",
"index": false
},
{
"name": "message",
"type": "string"
"type": "string",
"index": false
},
{
"name": "timestamp",