Finer control over locking during alter.

This commit is contained in:
Kishore Nallan 2023-09-16 19:40:08 +05:30
parent a670b579f7
commit ab62726355
2 changed files with 18 additions and 4 deletions

View File

@ -4101,9 +4101,10 @@ Option<bool> Collection::batch_alter_data(const std::vector<field>& alter_fields
// Update schema with additions (deletions can only be made later)
std::vector<field> new_fields;
tsl::htrie_map<char, field> schema_additions;
std::vector<std::string> nested_field_names;
std::unique_lock ulock(mutex);
for(auto& f: alter_fields) {
if(f.name == ".*") {
fields.push_back(f);
@ -4130,10 +4131,13 @@ Option<bool> Collection::batch_alter_data(const std::vector<field>& alter_fields
fields.push_back(f);
}
index->refresh_schemas(new_fields, {});
field::compact_nested_fields(nested_fields);
ulock.unlock();
std::shared_lock shlock(mutex);
index->refresh_schemas(new_fields, {});
// Now, we can index existing data onto the updated schema
const std::string seq_id_prefix = get_seq_id_collection_prefix();
std::string upper_bound_key = get_seq_id_collection_prefix() + "`"; // cannot inline this
@ -4201,6 +4205,9 @@ Option<bool> Collection::batch_alter_data(const std::vector<field>& alter_fields
}
LOG(INFO) << "Finished altering " << num_found_docs << " document(s).";
shlock.unlock();
ulock.lock();
std::vector<field> garbage_embedding_fields_vec;
for(auto& del_field: del_fields) {
search_schema.erase(del_field.name);
@ -4233,6 +4240,9 @@ Option<bool> Collection::batch_alter_data(const std::vector<field>& alter_fields
process_remove_field_for_embedding_fields(del_field, garbage_embedding_fields_vec);
}
ulock.unlock();
shlock.lock();
index->refresh_schemas({}, del_fields);
index->refresh_schemas({}, garbage_embedding_fields_vec);
@ -4245,7 +4255,7 @@ Option<bool> Collection::batch_alter_data(const std::vector<field>& alter_fields
}
Option<bool> Collection::alter(nlohmann::json& alter_payload) {
std::unique_lock lock(mutex);
std::shared_lock shlock(mutex);
LOG(INFO) << "Collection " << name << " is being prepared for alter...";
@ -4266,7 +4276,10 @@ Option<bool> Collection::alter(nlohmann::json& alter_payload) {
return Option<bool>(400, "The schema already contains a `.*` field.");
}
shlock.unlock();
if(!this_fallback_field_type.empty() && fallback_field_type.empty()) {
std::unique_lock ulock(mutex);
fallback_field_type = this_fallback_field_type;
}

View File

@ -589,6 +589,7 @@ size_t Index::batch_memory_index(Index *index,
}
num_queued = num_processed = 0;
std::unique_lock ulock(index->mutex);
for(const auto& field_name: found_fields) {
//LOG(INFO) << "field name: " << field_name;