Iterate doc fields for removal to allow partial deletion.

This commit is contained in:
kishorenc 2020-09-26 15:28:12 +05:30
parent 9c2782e93d
commit b764b32134
4 changed files with 65 additions and 55 deletions

View File

@ -79,15 +79,22 @@ struct search_args {
};
};
enum index_operation_t {
CREATE,
UPDATE,
DELETE
};
struct index_record {
size_t position; // position of record in the original request
uint32_t seq_id;
nlohmann::json document;
index_operation_t operation;
Option<bool> indexed; // indicates if the indexing operation was a success
index_record(size_t record_pos, uint32_t seq_id, const nlohmann::json& doc):
position(record_pos), seq_id(seq_id), document(doc), indexed(true) {
index_record(size_t record_pos, uint32_t seq_id, const nlohmann::json& doc, index_operation_t operation):
position(record_pos), seq_id(seq_id), document(doc), operation(operation), indexed(true) {
}

View File

@ -8,7 +8,6 @@
#include <art.h>
#include <thread>
#include <future>
#include <chrono>
#include <rocksdb/write_batch.h>
#include <system_metrics.h>
#include "topster.h"
@ -209,7 +208,7 @@ nlohmann::json Collection::add_many(std::vector<std::string>& json_lines) {
Option<uint32_t> doc_seq_id_op = to_doc(json_line, document);
const uint32_t seq_id = doc_seq_id_op.ok() ? doc_seq_id_op.get() : 0;
index_record record(i, seq_id, document);
index_record record(i, seq_id, document, CREATE);
// NOTE: we overwrite the input json_lines with result to avoid memory pressure

View File

@ -173,7 +173,7 @@ Option<bool> CollectionManager::load(const size_t init_batch_size) {
}
num_valid_docs++;
iter_batch[seq_id % collection->get_num_memory_shards()].emplace_back(index_record(0, seq_id, document));
iter_batch[seq_id % collection->get_num_memory_shards()].emplace_back(index_record(0, seq_id, document, CREATE));
// Peek and check for last record right here so that we handle batched indexing correctly
// Without doing this, the "last batch" would have to be indexed outside the loop.

View File

@ -322,23 +322,25 @@ size_t Index::batch_memory_index(Index *index, std::vector<index_record> & iter_
continue;
}
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.document, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema);
if(index_rec.operation == CREATE) {
Option<uint32_t> validation_op = validate_index_in_memory(index_rec.document, index_rec.seq_id,
default_sorting_field,
search_schema, facet_schema);
if(!validation_op.ok()) {
index_rec.index_failure(validation_op.code(), validation_op.error());
continue;
if(!validation_op.ok()) {
index_rec.index_failure(validation_op.code(), validation_op.error());
continue;
}
Option<uint32_t> index_mem_op = index->index_in_memory(index_rec.document, index_rec.seq_id, default_sorting_field);
if(!index_mem_op.ok()) {
index_rec.index_failure(index_mem_op.code(), index_mem_op.error());
continue;
}
index_rec.index_success(index_rec);
num_indexed++;
}
Option<uint32_t> index_mem_op = index->index_in_memory(index_rec.document, index_rec.seq_id, default_sorting_field);
if(!index_mem_op.ok()) {
index_rec.index_failure(index_mem_op.code(), index_mem_op.error());
continue;
}
index_rec.index_success(index_rec);
num_indexed++;
}
return num_indexed;
@ -1801,75 +1803,78 @@ void Index::remove_and_shift_offset_index(sorted_array &offset_index, const uint
}
Option<uint32_t> Index::remove(const uint32_t seq_id, const nlohmann::json & document) {
for(auto & name_field: search_schema) {
if(name_field.second.optional && document.count(name_field.first) == 0) {
for(const auto& it: document.items()) {
const std::string& field_name = it.key();
const auto& name_field = search_schema.find(field_name);
if(name_field == search_schema.end()) {
continue;
}
// Go through all the field names and find the keys+values so that they can be removed from in-memory index
std::vector<std::string> tokens;
if(name_field.second.type == field_types::STRING) {
StringUtils::split(document[name_field.first], tokens, " ");
} else if(name_field.second.type == field_types::STRING_ARRAY) {
std::vector<std::string> values = document[name_field.first].get<std::vector<std::string>>();
if(name_field->second.type == field_types::STRING) {
StringUtils::split(document[name_field->first], tokens, " ");
} else if(name_field->second.type == field_types::STRING_ARRAY) {
std::vector<std::string> values = document[name_field->first].get<std::vector<std::string>>();
for(const std::string & value: values) {
StringUtils::split(value, tokens, " ");
}
} else if(name_field.second.type == field_types::INT32) {
} else if(name_field->second.type == field_types::INT32) {
const int KEY_LEN = 8;
unsigned char key[KEY_LEN];
int32_t value = document[name_field.first].get<int32_t>();
int32_t value = document[name_field->first].get<int32_t>();
encode_int32(value, key);
tokens.push_back(std::string((char*)key, KEY_LEN));
} else if(name_field.second.type == field_types::INT32_ARRAY) {
std::vector<int32_t> values = document[name_field.first].get<std::vector<int32_t>>();
tokens.emplace_back((char*)key, KEY_LEN);
} else if(name_field->second.type == field_types::INT32_ARRAY) {
std::vector<int32_t> values = document[name_field->first].get<std::vector<int32_t>>();
for(const int32_t value: values) {
const int KEY_LEN = 8;
unsigned char key[KEY_LEN];
encode_int32(value, key);
tokens.push_back(std::string((char*)key, KEY_LEN));
tokens.emplace_back((char*)key, KEY_LEN);
}
} else if(name_field.second.type == field_types::INT64) {
} else if(name_field->second.type == field_types::INT64) {
const int KEY_LEN = 8;
unsigned char key[KEY_LEN];
int64_t value = document[name_field.first].get<int64_t>();
int64_t value = document[name_field->first].get<int64_t>();
encode_int64(value, key);
tokens.push_back(std::string((char*)key, KEY_LEN));
} else if(name_field.second.type == field_types::INT64_ARRAY) {
std::vector<int64_t> values = document[name_field.first].get<std::vector<int64_t>>();
tokens.emplace_back((char*)key, KEY_LEN);
} else if(name_field->second.type == field_types::INT64_ARRAY) {
std::vector<int64_t> values = document[name_field->first].get<std::vector<int64_t>>();
for(const int64_t value: values) {
const int KEY_LEN = 8;
unsigned char key[KEY_LEN];
encode_int64(value, key);
tokens.push_back(std::string((char*)key, KEY_LEN));
tokens.emplace_back((char*)key, KEY_LEN);
}
} else if(name_field.second.type == field_types::FLOAT) {
} else if(name_field->second.type == field_types::FLOAT) {
const int KEY_LEN = 8;
unsigned char key[KEY_LEN];
int64_t value = document[name_field.first].get<int64_t>();
int64_t value = document[name_field->first].get<int64_t>();
encode_float(value, key);
tokens.push_back(std::string((char*)key, KEY_LEN));
} else if(name_field.second.type == field_types::FLOAT_ARRAY) {
std::vector<float> values = document[name_field.first].get<std::vector<float>>();
tokens.emplace_back((char*)key, KEY_LEN);
} else if(name_field->second.type == field_types::FLOAT_ARRAY) {
std::vector<float> values = document[name_field->first].get<std::vector<float>>();
for(const float value: values) {
const int KEY_LEN = 8;
unsigned char key[KEY_LEN];
encode_float(value, key);
tokens.push_back(std::string((char*)key, KEY_LEN));
tokens.emplace_back((char*)key, KEY_LEN);
}
} else if(name_field.second.type == field_types::BOOL) {
} else if(name_field->second.type == field_types::BOOL) {
const int KEY_LEN = 1;
unsigned char key[KEY_LEN];
bool value = document[name_field.first].get<bool>();
bool value = document[name_field->first].get<bool>();
key[0] = value ? '1' : '0';
tokens.push_back(std::string((char*)key, KEY_LEN));
} else if(name_field.second.type == field_types::BOOL_ARRAY) {
std::vector<bool> values = document[name_field.first].get<std::vector<bool>>();
tokens.emplace_back((char*)key, KEY_LEN);
} else if(name_field->second.type == field_types::BOOL_ARRAY) {
std::vector<bool> values = document[name_field->first].get<std::vector<bool>>();
for(const bool value: values) {
const int KEY_LEN = 1;
unsigned char key[KEY_LEN];
key[0] = value ? '1' : '0';
tokens.push_back(std::string((char*)key, KEY_LEN));
tokens.emplace_back((char*)key, KEY_LEN);
}
}
@ -1877,7 +1882,7 @@ Option<uint32_t> Index::remove(const uint32_t seq_id, const nlohmann::json & doc
const unsigned char *key;
int key_len;
if(name_field.second.type == field_types::STRING_ARRAY || name_field.second.type == field_types::STRING) {
if(name_field->second.type == field_types::STRING_ARRAY || name_field->second.type == field_types::STRING) {
string_utils.unicode_normalize(token);
key = (const unsigned char *) token.c_str();
key_len = (int) (token.length() + 1);
@ -1886,8 +1891,8 @@ Option<uint32_t> Index::remove(const uint32_t seq_id, const nlohmann::json & doc
key_len = (int) (token.length());
}
art_leaf* leaf = (art_leaf *) art_search(search_index.at(name_field.first), key, key_len);
if(leaf != NULL) {
art_leaf* leaf = (art_leaf *) art_search(search_index.at(name_field->first), key, key_len);
if(leaf != nullptr) {
uint32_t seq_id_values[1] = {seq_id};
uint32_t doc_index = leaf->values->ids.indexOf(seq_id);
@ -1914,9 +1919,8 @@ Option<uint32_t> Index::remove(const uint32_t seq_id, const nlohmann::json & doc
LOG(INFO) << "----";*/
if(leaf->values->ids.getLength() == 0) {
art_values* values = (art_values*) art_delete(search_index.at(name_field.first), key, key_len);
art_values* values = (art_values*) art_delete(search_index.at(name_field->first), key, key_len);
delete values;
values = nullptr;
}
}
}