Merge branch 'v0.25-join' into v0.26-filter

This commit is contained in:
Kishore Nallan 2023-05-24 20:34:56 +05:30
commit d68c4fb3b3
26 changed files with 1062 additions and 19 deletions

View File

@ -0,0 +1,68 @@
#pragma once
#include "popular_queries.h"
#include "option.h"
#include "raft_server.h"
#include <vector>
#include <string>
#include <unordered_map>
#include <shared_mutex>
#include <mutex>
#include <condition_variable>
#include <atomic>
class AnalyticsManager {
private:
mutable std::mutex mutex;
std::condition_variable cv;
std::atomic<bool> quit = false;
const size_t QUERY_COMPACTION_INTERVAL_S = 60;
struct suggestion_config_t {
std::string name;
std::string suggestion_collection;
std::vector<std::string> query_collections;
size_t max_suggestions;
};
// config name => config
std::unordered_map<std::string, suggestion_config_t> suggestion_configs;
// query collection => suggestion collections
std::unordered_map<std::string, std::vector<std::string>> query_collection_mapping;
// suggestion collection => popular queries
std::unordered_map<std::string, PopularQueries*> popular_queries;
Store* store = nullptr;
AnalyticsManager() {}
~AnalyticsManager();
public:
static constexpr const char* ANALYTICS_CONFIG_PREFIX = "$AC";
static constexpr const char* RESOURCE_TYPE = "popular_queries";
static AnalyticsManager& get_instance() {
static AnalyticsManager instance;
return instance;
}
AnalyticsManager(AnalyticsManager const&) = delete;
void operator=(AnalyticsManager const&) = delete;
void init(Store* store);
void run(ReplicationState* raft_server);
Option<bool> create_index(nlohmann::json& payload, bool write_to_disk = true);
Option<bool> remove_suggestion_index(const std::string& name);
void add_suggestion(const std::string& query_collection,
std::string& query, const bool live_query, const std::string& user_id);
void stop();
void dispose();
};
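For orientation, here is a minimal wiring sketch based on the call sites added elsewhere in this commit; the helper function itself is hypothetical, and only the names shown in this diff are real:

#include "analytics_manager.h"

// Hypothetical helper mirroring run_server() and the HTTP handlers added below.
void wire_analytics(Store* store, ReplicationState* replication_state) {
    auto& manager = AnalyticsManager::get_instance();
    manager.init(store);                    // done once at startup

    nlohmann::json payload = R"({
        "name": "top_queries",
        "source": {"collections": ["products"]},
        "destination": {"collection": "top_queries"}
    })"_json;
    manager.create_index(payload);          // persists config under "$AC_top_queries"

    manager.run(replication_state);         // blocking loop; started on its own thread
}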

View File

@ -553,6 +553,8 @@ public:
std::vector<uint32_t>& query_by_weights,
std::vector<search_field_t>& weighted_search_fields,
std::vector<std::string>& reordered_search_fields) const;
Option<bool> truncate_after_top_k(const std::string& field_name, size_t k);
};
template<class T>

View File

@ -141,6 +141,14 @@ bool del_throttle(const std::shared_ptr<http_req>& req, const std::shared_ptr<ht
bool del_exceed(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Events
bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);
// Misc helpers
void get_collections_for_auth(std::map<std::string, std::string>& req_params, const std::string& body,

include/event_manager.h Normal file
View File

@ -0,0 +1,26 @@
#pragma once
#include "json.hpp"
#include "option.h"
class EventManager {
private:
EventManager() = default;
~EventManager() = default;
static constexpr const char* EVENT_TYPE = "type";
static constexpr const char* EVENT_DATA = "data";
public:
static EventManager& get_instance() {
static EventManager instance;
return instance;
}
EventManager(EventManager const&) = delete;
void operator=(EventManager const&) = delete;
bool add_event(const nlohmann::json& event);
};

View File

@ -207,6 +207,7 @@ public:
struct http_req {
static constexpr const char* AUTH_HEADER = "x-typesense-api-key";
static constexpr const char* USER_HEADER = "x-typesense-user-id";
static constexpr const char* AGENT_HEADER = "user-agent";
h2o_req_t* _req;

View File

@ -507,7 +507,7 @@ private:
static void compute_facet_stats(facet &a_facet, uint64_t raw_value, const std::string & field_type);
static void handle_doc_ops(const tsl::htrie_map<char, field>& search_schema,
- nlohmann::json& update_doc, const nlohmann::json& old_doc, nlohmann::json& new_doc);
+ nlohmann::json& update_doc, const nlohmann::json& old_doc);
static void get_doc_changes(const index_operation_t op, const tsl::htrie_map<char, field>& search_schema,
nlohmann::json &update_doc, const nlohmann::json &old_doc, nlohmann::json &new_doc,
@ -939,6 +939,9 @@ public:
std::vector<uint32_t>& included_ids_vec,
std::unordered_set<uint32_t>& excluded_group_ids) const;
Option<bool> seq_ids_outside_top_k(const std::string& field_name, size_t k,
std::vector<uint32_t>& outside_seq_ids);
friend class filter_result_iterator_t;
};

View File

@ -48,6 +48,8 @@ public:
size_t size();
void seq_ids_outside_top_k(size_t k, std::vector<uint32_t>& seq_ids);
void contains(const NUM_COMPARATOR& comparator, const int64_t& value,
const uint32_t& context_ids_length,
uint32_t* const& context_ids,

include/popular_queries.h Normal file
View File

@ -0,0 +1,51 @@
#pragma once
#include <string>
#include <vector>
#include <tsl/htrie_map.h>
#include <json.hpp>
#include <atomic>
#include <shared_mutex>
#include <unordered_map>
#include <cstdint>
class PopularQueries {
public:
struct QWithTimestamp {
std::string query;
uint64_t timestamp;
QWithTimestamp(const std::string& query, uint64_t timestamp) : query(query), timestamp(timestamp) {}
};
static const size_t QUERY_FINALIZATION_INTERVAL_MICROS = 4 * 1000 * 1000;
private:
size_t k;
const size_t max_size;
// counts aggregated within the current node
tsl::htrie_map<char, uint32_t> local_counts;
std::shared_mutex lmutex;
std::unordered_map<std::string, std::vector<QWithTimestamp>> user_prefix_queries;
std::shared_mutex umutex;
public:
PopularQueries(size_t k);
void add(const std::string& value, const bool live_query, const std::string& user_id,
uint64_t now_ts_us = 0);
void compact_user_queries(uint64_t now_ts_us);
void serialize_as_docs(std::string& docs);
void reset_local_counts();
size_t get_k();
std::unordered_map<std::string, std::vector<QWithTimestamp>> get_user_prefix_queries();
tsl::htrie_map<char, uint32_t> get_local_counts();
};

View File

@ -237,6 +237,8 @@ public:
nlohmann::json get_status();
std::string get_leader_url() const;
private:
friend class ReplicationClosure;

View File

@ -66,6 +66,10 @@ private:
std::atomic<bool> reset_peers_on_error;
bool enable_search_analytics;
uint32_t analytics_flush_interval;
protected:
Config() {
@ -90,6 +94,9 @@ protected:
this->skip_writes = false;
this->log_slow_searches_time_ms = 30 * 1000;
this->reset_peers_on_error = false;
this->enable_search_analytics = false;
this->analytics_flush_interval = 3600; // in seconds
}
Config(Config const&) {
@ -291,6 +298,10 @@ public:
return this->cache_num_entries;
}
size_t get_analytics_flush_interval() const {
return this->analytics_flush_interval;
}
size_t get_thread_pool_size() const {
return this->thread_pool_size;
}
@ -303,6 +314,10 @@ public:
return this->enable_access_logging;
}
bool get_enable_search_analytics() const {
return this->enable_search_analytics;
}
int get_disk_used_max_percentage() const {
return this->disk_used_max_percentage;
}
@ -418,6 +433,10 @@ public:
this->cache_num_entries = std::stoi(get_env("TYPESENSE_CACHE_NUM_ENTRIES"));
}
if(!get_env("TYPESENSE_ANALYTICS_FLUSH_INTERVAL").empty()) {
this->analytics_flush_interval = std::stoi(get_env("TYPESENSE_ANALYTICS_FLUSH_INTERVAL"));
}
if(!get_env("TYPESENSE_THREAD_POOL_SIZE").empty()) {
this->thread_pool_size = std::stoi(get_env("TYPESENSE_THREAD_POOL_SIZE"));
}
@ -431,6 +450,7 @@ public:
}
this->enable_access_logging = ("TRUE" == get_env("TYPESENSE_ENABLE_ACCESS_LOGGING"));
this->enable_search_analytics = ("TRUE" == get_env("TYPESENSE_ENABLE_SEARCH_ANALYTICS"));
if(!get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE").empty()) {
this->disk_used_max_percentage = std::stoi(get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE"));
@ -576,6 +596,10 @@ public:
this->cache_num_entries = (int) reader.GetInteger("server", "cache-num-entries", 1000);
}
if(reader.Exists("server", "analytics-flush-interval")) {
this->analytics_flush_interval = (int) reader.GetInteger("server", "analytics-flush-interval", 3600);
}
if(reader.Exists("server", "thread-pool-size")) {
this->thread_pool_size = (int) reader.GetInteger("server", "thread-pool-size", 0);
}
@ -589,6 +613,11 @@ public:
this->enable_access_logging = (enable_access_logging_str == "true");
}
if(reader.Exists("server", "enable-search-analytics")) {
auto enable_search_analytics_str = reader.Get("server", "enable-search-analytics", "false");
this->enable_search_analytics = (enable_search_analytics_str == "true");
}
if(reader.Exists("server", "disk-used-max-percentage")) {
this->disk_used_max_percentage = (int) reader.GetInteger("server", "disk-used-max-percentage", 100);
}
@ -728,6 +757,10 @@ public:
this->cache_num_entries = options.get<uint32_t>("cache-num-entries");
}
if(options.exist("analytics-flush-interval")) {
this->analytics_flush_interval = options.get<uint32_t>("analytics-flush-interval");
}
if(options.exist("thread-pool-size")) {
this->thread_pool_size = options.get<uint32_t>("thread-pool-size");
}
@ -755,6 +788,11 @@ public:
if(options.exist("reset-peers-on-error")) {
this->reset_peers_on_error = options.get<bool>("reset-peers-on-error");
}
if(options.exist("enable-search-analytics")) {
this->enable_search_analytics = options.get<bool>("enable-search-analytics");
}
}
void set_cors_domains(std::string& cors_domains_value) {
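Taken together, the new analytics settings can be supplied through any of the three configuration surfaces parsed above. A hypothetical launch (only the two analytics names come from this diff; the remaining flags are assumed):

TYPESENSE_ENABLE_SEARCH_ANALYTICS=TRUE \
TYPESENSE_ANALYTICS_FLUSH_INTERVAL=60 \
./typesense-server --data-dir=/tmp/typesense-data --api-key=local-dev-key

# or equivalently via the ini file read above:
# [server]
# enable-search-analytics = true
# analytics-flush-interval = 60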

src/analytics_manager.cpp Normal file
View File

@ -0,0 +1,252 @@
#include <mutex>
#include <thread>
#include "analytics_manager.h"
#include "tokenizer.h"
#include "http_client.h"
#include "collection_manager.h"
Option<bool> AnalyticsManager::create_index(nlohmann::json& payload, bool write_to_disk) {
/*
Sample payload:
{
"name": "top_queries",
"limit": 1000,
"source": {
"collections": ["brands", "products"]
},
"destination": {
"collection": "top_queries"
}
}
*/
if(!payload.contains("name") || !payload["name"].is_string()) {
return Option<bool>(400, "Bad or missing name.");
}
if(!payload.contains("source") || !payload["source"].is_object()) {
return Option<bool>(400, "Bad or missing source.");
}
if(!payload.contains("destination") || !payload["destination"].is_object()) {
return Option<bool>(400, "Bad or missing destination.");
}
const std::string& suggestion_config_name = payload["name"].get<std::string>();
size_t max_suggestions = 1000;
if(payload.contains("limit") && payload["limit"].is_number_integer()) {
max_suggestions = payload["limit"].get<size_t>();
}
if(suggestion_configs.find(suggestion_config_name) != suggestion_configs.end()) {
return Option<bool>(400, "There's already another configuration with the name `" +
suggestion_config_name + "`.");
}
if(!payload["source"].contains("collections") || !payload["source"]["collections"].is_array()) {
return Option<bool>(400, "Must contain a valid list of source collections.");
}
if(!payload["destination"].contains("collection") || !payload["destination"]["collection"].is_string()) {
return Option<bool>(400, "Must contain a valid destination collection.");
}
const std::string& suggestion_collection = payload["destination"]["collection"].get<std::string>();
suggestion_config_t suggestion_config;
suggestion_config.name = suggestion_config_name;
suggestion_config.suggestion_collection = suggestion_collection;
suggestion_config.max_suggestions = max_suggestions;
for(const auto& coll: payload["source"]["collections"]) {
if(!coll.is_string()) {
return Option<bool>(400, "Must contain a valid list of source collection names.");
}
const std::string& src_collection = coll.get<std::string>();
suggestion_config.query_collections.push_back(src_collection);
}
std::unique_lock lock(mutex);
suggestion_configs.emplace(suggestion_config_name, suggestion_config);
for(const auto& query_coll: suggestion_config.query_collections) {
query_collection_mapping[query_coll].push_back(suggestion_collection);
}
PopularQueries* popularQueries = new PopularQueries(max_suggestions);
popular_queries.emplace(suggestion_collection, popularQueries);
if(write_to_disk) {
payload["type"] = RESOURCE_TYPE;
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + suggestion_config_name;
bool inserted = store->insert(suggestion_key, payload.dump());
if(!inserted) {
return Option<bool>(500, "Error while storing the config to disk.");
}
}
return Option<bool>(true);
}
AnalyticsManager::~AnalyticsManager() {
std::unique_lock lock(mutex);
for(auto& kv: popular_queries) {
delete kv.second;
}
}
Option<bool> AnalyticsManager::remove_suggestion_index(const std::string &name) {
std::unique_lock lock(mutex);
auto suggestion_configs_it = suggestion_configs.find(name);
if(suggestion_configs_it == suggestion_configs.end()) {
return Option<bool>(404, "Index not found.");
}
const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection;
for(const auto& query_collection: suggestion_configs_it->second.query_collections) {
query_collection_mapping.erase(query_collection);
}
if(popular_queries.count(suggestion_collection) != 0) {
delete popular_queries[suggestion_collection];
popular_queries.erase(suggestion_collection);
}
suggestion_configs.erase(name);
auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + name;
bool erased = store->remove(suggestion_key);
if(!erased) {
return Option<bool>(500, "Error while deleting from disk.");
}
return Option<bool>(true);
}
void AnalyticsManager::add_suggestion(const std::string &query_collection, std::string &query,
const bool live_query, const std::string& user_id) {
// look up suggestion collections for the query collection
std::unique_lock lock(mutex);
const auto& suggestion_collections_it = query_collection_mapping.find(query_collection);
if(suggestion_collections_it != query_collection_mapping.end()) {
for(const auto& suggestion_collection: suggestion_collections_it->second) {
const auto& popular_queries_it = popular_queries.find(suggestion_collection);
if(popular_queries_it != popular_queries.end()) {
Tokenizer::normalize_ascii(query);
popular_queries_it->second->add(query, live_query, user_id);
}
}
}
}
void AnalyticsManager::run(ReplicationState* raft_server) {
uint64_t prev_persistence_s = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
while(!quit) {
std::unique_lock lk(mutex);
cv.wait_for(lk, std::chrono::seconds(QUERY_COMPACTION_INTERVAL_S), [&] { return quit.load(); });
//LOG(INFO) << "QuerySuggestions::run";
if(quit) {
lk.unlock();
break;
}
for(const auto& suggestion_config: suggestion_configs) {
const std::string& sink_name = suggestion_config.first;
const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
auto popular_queries_it = popular_queries.find(suggestion_coll);
if(popular_queries_it == popular_queries.end()) {
continue;
}
// need to prepare the counts as JSON docs for import into the suggestion collection
// {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}}
PopularQueries* popularQueries = popular_queries_it->second;
// aggregate prefix queries to their final form
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(now).count();
popularQueries->compact_user_queries(now_ts_us);
auto now_ts_seconds = std::chrono::duration_cast<std::chrono::seconds>(now).count();
if(now_ts_seconds - prev_persistence_s < Config::get_instance().get_analytics_flush_interval()) {
// persist aggregates only once every analytics_flush_interval seconds (default: hourly)
continue;
}
prev_persistence_s = now_ts_seconds;
std::string import_payload;
popularQueries->serialize_as_docs(import_payload);
if(import_payload.empty()) {
continue;
}
// send http request
std::string leader_url = raft_server->get_leader_url();
if(!leader_url.empty()) {
const std::string& resource_url = leader_url + "collections/" + suggestion_coll +
"/documents/import?action=emplace";
std::string res;
std::map<std::string, std::string> res_headers;
long status_code = HttpClient::post_response(resource_url, import_payload,
res, res_headers, 10*1000);
if(status_code != 200) {
LOG(ERROR) << "Error while sending query suggestions events to leader. "
<< "Status code: " << status_code << ", response: " << res;
} else {
LOG(INFO) << "Sent query suggestions to leader for aggregation.";
popularQueries->reset_local_counts();
if(raft_server->is_leader()) {
// try to run top-K compaction of suggestion collection
auto coll = CollectionManager::get_instance().get_collection(suggestion_coll);
if (coll == nullptr) {
LOG(ERROR) << "No collection found for suggestions aggregation: " + suggestion_coll;
continue;
}
coll->truncate_after_top_k("count", popularQueries->get_k());
}
}
}
}
lk.unlock();
}
dispose();
}
void AnalyticsManager::stop() {
quit = true;
cv.notify_all();
}
void AnalyticsManager::dispose() {
for(auto& kv: popular_queries) {
delete kv.second;
}
popular_queries.clear();
}
void AnalyticsManager::init(Store* store) {
this->store = store;
}
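To make the flush concrete: every QUERY_COMPACTION_INTERVAL_S the loop compacts prefix queries, and once per analytics_flush_interval it serializes the local counts and imports them into the suggestion collection on the leader. An illustrative request (doc shape from the comment in run(); ids and counts invented):

POST /collections/top_queries/documents/import?action=emplace

{"id": "8324731244", "q": "shoes", "$operations": {"increment": {"count": 42}}}
{"id": "1129837122", "q": "shirt", "$operations": {"increment": {"count": 17}}}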

View File

@ -4235,7 +4235,7 @@ Option<bool> Collection::validate_alter_payload(nlohmann::json& schema_changes,
index_operation_t::CREATE,
false,
fallback_field_type,
- DIRTY_VALUES::REJECT);
+ DIRTY_VALUES::COERCE_OR_REJECT);
if(!validate_op.ok()) {
std::string err_message = validate_op.error();
@ -4777,4 +4777,22 @@ void Collection::process_remove_field_for_embedding_fields(const field& the_fiel
}
}
}
}
Option<bool> Collection::truncate_after_top_k(const string &field_name, size_t k) {
std::vector<uint32_t> seq_ids;
auto op = index->seq_ids_outside_top_k(field_name, k, seq_ids);
if(!op.ok()) {
return op;
}
for(auto seq_id: seq_ids) {
auto remove_op = remove_if_found(seq_id);
if(!remove_op.ok()) {
LOG(ERROR) << "Error while truncating top k: " << remove_op.error();
}
}
return Option<bool>(true);
}

View File

@ -2,6 +2,8 @@
#include <vector>
#include <json.hpp>
#include <app_metrics.h>
#include <analytics_manager.h>
#include <event_manager.h>
#include "collection_manager.h"
#include "batched_indexer.h"
#include "logger.h"
@ -1060,6 +1062,15 @@ Option<bool> CollectionManager::do_search(std::map<std::string, std::string>& re
nlohmann::json result = result_op.get();
if(Config::get_instance().get_enable_search_analytics()) {
if(result.count("found") != 0 && result["found"].get<size_t>() != 0) {
std::string processed_query = raw_query;
Tokenizer::normalize_ascii(processed_query);
AnalyticsManager::get_instance().add_suggestion(collection->get_name(), processed_query,
true, req_params["x-typesense-user-id"]);
}
}
if(exclude_fields.count("search_time_ms") == 0) {
result["search_time_ms"] = timeMillis;
}
@ -1287,6 +1298,19 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
collection->add_synonym(collection_synonym);
}
// restore query suggestions configs
std::vector<std::string> analytics_config_jsons;
cm.store->scan_fill(AnalyticsManager::ANALYTICS_CONFIG_PREFIX,
std::string(AnalyticsManager::ANALYTICS_CONFIG_PREFIX) + "`",
analytics_config_jsons);
for(const auto& analytics_config_json: analytics_config_jsons) {
nlohmann::json analytics_config = nlohmann::json::parse(analytics_config_json);
if(analytics_config["type"] == AnalyticsManager::RESOURCE_TYPE) {
AnalyticsManager::get_instance().create_index(analytics_config, false);
}
}
// Fetch records from the store and re-create memory index
const std::string seq_id_prefix = collection->get_seq_id_collection_prefix();
std::string upper_bound_key = collection->get_seq_id_collection_prefix() + "`"; // cannot inline this
@ -1325,7 +1349,7 @@ Option<bool> CollectionManager::load_collection(const nlohmann::json &collection
field::flatten_doc(document, collection->get_nested_fields(), {}, true, flattened_fields);
}
- auto dirty_values = DIRTY_VALUES::DROP;
+ auto dirty_values = DIRTY_VALUES::COERCE_OR_DROP;
num_valid_docs++;

View File

@ -3,6 +3,7 @@
#include <cstdlib>
#include <app_metrics.h>
#include <regex>
#include <analytics_manager.h>
#include "typesense_server_utils.h"
#include "core_api.h"
#include "string_utils.h"
@ -13,6 +14,7 @@
#include "core_api_utils.h"
#include "lru/lru.hpp"
#include "ratelimit_manager.h"
#include "event_manager.h"
using namespace std::chrono_literals;
@ -2032,4 +2034,58 @@ Option<std::pair<std::string,std::string>> get_api_key_and_ip(const std::string&
}
return Option<std::pair<std::string,std::string>>(std::make_pair(api_key, ip));
}
}
bool post_create_event(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;
try {
req_json = nlohmann::json::parse(req->body);
} catch(const std::exception& e) {
LOG(ERROR) << "JSON error: " << e.what();
res->set_400("Bad JSON.");
return false;
}
bool success = EventManager::get_instance().add_event(req_json);
if(success) {
res->set_201(R"({"ok": true)");
return true;
}
res->set_400(R"({"ok": false)");
return false;
}
bool post_create_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
nlohmann::json req_json;
try {
req_json = nlohmann::json::parse(req->body);
} catch(const std::exception& e) {
LOG(ERROR) << "JSON error: " << e.what();
res->set_400("Bad JSON.");
return false;
}
auto op = AnalyticsManager::get_instance().create_index(req_json);
if(!op.ok()) {
res->set(op.code(), op.error());
return false;
}
res->set_201(req_json.dump());
return true;
}
bool del_analytics_popular_queries(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
auto op = AnalyticsManager::get_instance().remove_suggestion_index(req->params["name"]);
if(!op.ok()) {
res->set(op.code(), op.error());
return false;
}
res->set_200(R"({"ok": true)");
return true;
}

src/event_manager.cpp Normal file
View File

@ -0,0 +1,54 @@
#include <analytics_manager.h>
#include "event_manager.h"
bool EventManager::add_event(const nlohmann::json& event) {
/*
Sample event payload:
{
"type": "search",
"data": {
"q": "Nike shoes",
"collections": ["products"]
}
}
*/
if(!event.contains("type")) {
return false;
}
const auto& event_type_val = event[EVENT_TYPE];
if(event_type_val.is_string()) {
const std::string& event_type = event_type_val.get<std::string>();
if(event_type == "search") {
if(!event.contains("data")) {
return false;
}
const auto& event_data_val = event[EVENT_DATA];
if(!event_data_val.is_object()) {
return false;
}
if(!event_data_val.contains("q") || !event_data_val.contains("collections")) {
return false;
}
const auto& event_data_query = event_data_val["q"];
if(!event_data_query.is_string() || !event_data_val["collections"].is_array()) {
return false;
}
for(const auto& coll: event_data_val["collections"]) {
if(!coll.is_string()) {
return false;
}
std::string query = event_data_query.get<std::string>();
AnalyticsManager::get_instance().add_suggestion(coll.get<std::string>(), query, false, "");
}
}
}
return true;
}
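A minimal sketch of an event payload that passes the checks above, assuming only nlohmann::json ("json.hpp") in addition to the surrounding build:

#include "json.hpp"

// Hypothetical helper: builds the sample payload from the comment above.
nlohmann::json make_search_event() {
    nlohmann::json event;
    event["type"] = "search";                       // EVENT_TYPE
    event["data"]["q"] = "Nike shoes";              // must be a string
    event["data"]["collections"] = {"products"};    // must be an array of strings
    // inside the server this would be submitted via:
    //   EventManager::get_instance().add_event(event);
    return event;
}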

View File

@ -407,6 +407,17 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
api_auth_key_sent = query_map[http_req::AUTH_HEADER];
}
// extract user id from header, if not already present as GET param
ssize_t user_header_cursor = h2o_find_header_by_str(&req->headers, http_req::USER_HEADER, strlen(http_req::USER_HEADER), -1);
if(user_header_cursor != -1) {
h2o_iovec_t & slot = req->headers.entries[user_header_cursor].value;
std::string user_id_sent = std::string(slot.base, slot.len);
query_map[http_req::USER_HEADER] = user_id_sent;
} else if(query_map.count(http_req::USER_HEADER) == 0) {
query_map[http_req::USER_HEADER] = client_ip;
}
route_path *rpath = nullptr;
uint64_t route_hash = h2o_handler->http_server->find_route(path_parts, http_method, &rpath);

View File

@ -456,6 +456,7 @@ void Index::validate_and_preprocess(Index *index, std::vector<index_record>& ite
scrub_reindex_doc(search_schema, index_rec.doc, index_rec.del_doc, index_rec.old_doc);
embed_fields(index_rec.new_doc, embedding_fields, search_schema);
} else {
handle_doc_ops(search_schema, index_rec.doc, index_rec.old_doc);
embed_fields(index_rec.doc, embedding_fields, search_schema);
}
@ -5682,7 +5683,7 @@ void Index::refresh_schemas(const std::vector<field>& new_fields, const std::vec
}
void Index::handle_doc_ops(const tsl::htrie_map<char, field>& search_schema,
- nlohmann::json& update_doc, const nlohmann::json& old_doc, nlohmann::json& new_doc) {
+ nlohmann::json& update_doc, const nlohmann::json& old_doc) {
/*
{
@ -5706,7 +5707,6 @@ void Index::handle_doc_ops(const tsl::htrie_map<char, field>& search_schema,
}
auto updated_value = existing_value + item.value().get<int32_t>();
- new_doc[item.key()] = updated_value;
update_doc[item.key()] = updated_value;
}
}
@ -5726,9 +5726,10 @@ void Index::get_doc_changes(const index_operation_t op, const tsl::htrie_map<cha
} else {
new_doc = old_doc;
- handle_doc_ops(search_schema, update_doc, old_doc, new_doc);
+ handle_doc_ops(search_schema, update_doc, old_doc);
+ new_doc.merge_patch(update_doc);
if(old_doc.contains(".flat")) {
new_doc[".flat"] = old_doc[".flat"];
for(auto& fl: update_doc[".flat"]) {
@ -5826,6 +5827,18 @@ size_t Index::num_seq_ids() const {
return seq_ids->num_ids();
}
Option<bool> Index::seq_ids_outside_top_k(const std::string& field_name, size_t k,
std::vector<uint32_t>& outside_seq_ids) {
auto field_it = numerical_index.find(field_name);
if(field_it == numerical_index.end()) {
return Option<bool>(400, "Field not found in numerical index.");
}
field_it->second->seq_ids_outside_top_k(k, outside_seq_ids);
return Option<bool>(true);
}
void Index::resolve_space_as_typos(std::vector<std::string>& qtokens, const string& field_name,
std::vector<std::vector<std::string>>& resolved_queries) const {

View File

@ -68,6 +68,11 @@ void master_server_routes() {
server->put("/presets/:name", put_upsert_preset);
server->del("/presets/:name", del_preset);
// analytics
server->post("/analytics/popular-queries", post_create_analytics_popular_queries);
server->del("/analytics/popular-queries/:name", del_analytics_popular_queries);
server->post("/analytics/events", post_create_event);
// meta
server->get("/metrics.json", get_metrics_json);
server->get("/stats.json", get_stats_json);

View File

@ -318,6 +318,30 @@ void num_tree_t::contains(const NUM_COMPARATOR& comparator, const int64_t& value
result_ids = out;
}
void num_tree_t::seq_ids_outside_top_k(size_t k, std::vector<uint32_t> &seq_ids) {
size_t ids_skipped = 0;
for (auto iter = int64map.rbegin(); iter != int64map.rend(); ++iter) {
auto num_ids = ids_t::num_ids(iter->second);
if(ids_skipped > k) {
ids_t::uncompress(iter->second, seq_ids);
} else if((ids_skipped + num_ids) > k) {
// this element hits the limit, so we pick partial IDs to satisfy k
std::vector<uint32_t> ids;
ids_t::uncompress(iter->second, ids);
for(size_t i = 0; i < ids.size(); i++) {
auto seq_id = ids[i];
if(ids_skipped + i >= k) {
seq_ids.push_back(seq_id);
}
}
}
ids_skipped += num_ids;
}
}
size_t num_tree_t::size() {
return int64map.size();
}
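A self-contained illustration of the traversal implemented above, with a plain std::map standing in for num_tree_t's int64map (the ids_t compression is omitted):

#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

int main() {
    // value (e.g. a "count") => seq_ids holding that value
    std::map<int64_t, std::vector<uint32_t>> int64map = {
        {5, {10}}, {7, {11, 12}}, {9, {13}}, {12, {14, 15}}
    };

    const size_t k = 3;              // retain the top 3 ids by value
    std::vector<uint32_t> outside;   // everything ranked below the top k
    size_t ids_skipped = 0;

    // walk values in descending order, skipping ids until k have been retained
    for(auto it = int64map.rbegin(); it != int64map.rend(); ++it) {
        const auto& ids = it->second;
        if(ids_skipped > k) {
            outside.insert(outside.end(), ids.begin(), ids.end());
        } else if(ids_skipped + ids.size() > k) {
            // this bucket straddles the cut-off, so keep only a partial slice
            for(size_t i = 0; i < ids.size(); i++) {
                if(ids_skipped + i >= k) {
                    outside.push_back(ids[i]);
                }
            }
        }
        ids_skipped += ids.size();
    }

    for(uint32_t id: outside) {
        std::cout << id << " ";      // prints: 11 12 10 (ranks 4, 5 and 6)
    }
    std::cout << std::endl;
}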

src/popular_queries.cpp Normal file
View File

@ -0,0 +1,118 @@
#include "popular_queries.h"
#include "logger.h"
#include <algorithm>
#include <mutex>
#include "string_utils.h"
PopularQueries::PopularQueries(size_t k) : k(k), max_size(k * 2) {
}
void PopularQueries::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) {
if(live_query) {
// live queries must first be consolidated to their final form, since they may be prefix queries
if(now_ts_us == 0) {
now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
if(!umutex.try_lock()) {
// rather than waiting for the lock, skip this entry while consolidation is in progress
return;
}
auto& queries = user_prefix_queries[user_id];
if(queries.size() < 100) {
queries.emplace_back(key, now_ts_us);
}
umutex.unlock();
} else {
if(!lmutex.try_lock()) {
// rather than waiting for the lock, skip this increment while consolidation is in progress
return;
}
auto it = local_counts.find(key);
if(it != local_counts.end()) {
it.value()++;
} else if(local_counts.size() < max_size) {
// only admit new keys while the map is below max_size (guards against abuse)
local_counts.emplace(key, 1);
}
lmutex.unlock();
}
}
void PopularQueries::serialize_as_docs(std::string& docs) {
std::shared_lock lk(lmutex);
std::string key_buffer;
for(auto it = local_counts.begin(); it != local_counts.end(); ++it) {
it.key(key_buffer);
nlohmann::json doc;
doc["id"] = std::to_string(StringUtils::hash_wy(key_buffer.c_str(), key_buffer.size()));
doc["q"] = key_buffer;
doc["$operations"]["increment"]["count"] = it.value();
docs += doc.dump() + "\n";
}
if(!docs.empty()) {
docs.pop_back();
}
}
void PopularQueries::reset_local_counts() {
std::unique_lock lk(lmutex);
local_counts.clear();
}
size_t PopularQueries::get_k() {
return k;
}
void PopularQueries::compact_user_queries(uint64_t now_ts_us) {
std::unique_lock lk(umutex);
std::vector<std::string> keys_to_delete;
for(auto& kv: user_prefix_queries) {
auto& queries = kv.second;
int64_t last_consolidated_index = -1;
for(uint32_t i = 0; i < queries.size(); i++) {
if(now_ts_us - queries[i].timestamp < QUERY_FINALIZATION_INTERVAL_MICROS) {
break;
}
uint64_t diff_micros = (i == queries.size()-1) ? (now_ts_us - queries[i].timestamp) :
(queries[i + 1].timestamp - queries[i].timestamp);
if(diff_micros > QUERY_FINALIZATION_INTERVAL_MICROS) {
add(queries[i].query, false, "");
last_consolidated_index = i;
}
}
queries.erase(queries.begin(), queries.begin() + last_consolidated_index+1);
if(queries.empty()) {
keys_to_delete.push_back(kv.first);
}
}
for(auto& key: keys_to_delete) {
user_prefix_queries.erase(key);
}
}
std::unordered_map<std::string, std::vector<PopularQueries::QWithTimestamp>> PopularQueries::get_user_prefix_queries() {
std::unique_lock lk(umutex);
return user_prefix_queries;
}
tsl::htrie_map<char, uint32_t> PopularQueries::get_local_counts() {
std::unique_lock lk(lmutex);
return local_counts;
}
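A worked example of the finalization rule in compact_user_queries above (illustrative timestamps; QUERY_FINALIZATION_INTERVAL_MICROS corresponds to 4 seconds):

// user "0" types one query as a series of live prefix queries:
//   t=0.0s  add("f",   live)   gap to the next entry is 0.5s -> still typing
//   t=0.5s  add("fo",  live)   gap to the next entry is 0.5s -> still typing
//   t=1.0s  add("foo", live)   last entry; now - t = 5.0s > 4s -> finalized
// compact_user_queries(now = 6.0s) therefore promotes only "foo" into
// local_counts (via add(..., live_query=false)) and erases all three
// prefix entries for user "0".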

View File

@ -955,6 +955,21 @@ bool ReplicationState::get_ext_snapshot_succeeded() {
return ext_snapshot_succeeded;
}
std::string ReplicationState::get_leader_url() const {
std::shared_lock lock(node_mutex);
if(node->leader_id().is_empty()) {
LOG(ERROR) << "Could not get leader status, as node does not have a leader!";
return "";
}
const std::string & leader_addr = node->leader_id().to_string();
lock.unlock();
const std::string protocol = api_uses_ssl ? "https" : "http";
return get_node_url_path(leader_addr, "/", protocol);
}
void TimedSnapshotClosure::Run() {
// Auto delete this after Done()
std::unique_ptr<TimedSnapshotClosure> self_guard(this);

View File

@ -13,6 +13,7 @@
#include <arpa/inet.h>
#include <sys/socket.h>
#include <ifaddrs.h>
#include <analytics_manager.h>
#include "core_api.h"
#include "ratelimit_manager.h"
@ -94,6 +95,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.add<std::string>("config", '\0', "Path to the configuration file.", false, "");
options.add<bool>("enable-access-logging", '\0', "Enable access logging.", false, false);
options.add<bool>("enable-search-analytics", '\0', "Enable search analytics.", false, false);
options.add<int>("disk-used-max-percentage", '\0', "Reject writes when used disk space exceeds this percentage. Default: 100 (never reject).", false, 100);
options.add<int>("memory-used-max-percentage", '\0', "Reject writes when memory usage exceeds this percentage. Default: 100 (never reject).", false, 100);
options.add<bool>("skip-writes", '\0', "Skip all writes except config changes. Default: false.", false, false);
@ -101,6 +103,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) {
options.add<int>("log-slow-searches-time-ms", '\0', "When >= 0, searches that take longer than this duration are logged.", false, 30*1000);
options.add<int>("cache-num-entries", '\0', "Number of entries to cache.", false, 1000);
options.add<uint32_t>("analytics-flush-interval", '\0', "Frequency of persisting analytics data to disk (in seconds).", false, 3600);
// DEPRECATED
options.add<std::string>("listen-address", 'h', "[DEPRECATED: use `api-address`] Address to which Typesense API service binds.", false, "0.0.0.0");
@ -394,6 +397,8 @@ int run_server(const Config & config, const std::string & version, void (*master
HttpClient & httpClient = HttpClient::get_instance();
httpClient.init(config.get_api_key());
AnalyticsManager::get_instance().init(&store);
server = new HttpServer(
version,
config.get_api_address(),
@ -450,6 +455,10 @@ int run_server(const Config & config, const std::string & version, void (*master
batch_indexer->run();
});
std::thread event_sink_thread([&replication_state]() {
AnalyticsManager::get_instance().run(&replication_state);
});
std::string path_to_nodes = config.get_nodes();
start_raft_server(replication_state, state_dir, path_to_nodes,
config.get_peering_address(),
@ -466,6 +475,12 @@ int run_server(const Config & config, const std::string & version, void (*master
LOG(INFO) << "Waiting for batch indexing thread to be done...";
batch_indexing_thread.join();
LOG(INFO) << "Shutting down event sink thread...";
AnalyticsManager::get_instance().stop();
LOG(INFO) << "Waiting for event sink thread to be done...";
event_sink_thread.join();
LOG(INFO) << "Shutting down server_thread_pool";
server_thread_pool.shutdown();
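The event sink thread above relies on the interruptible-sleep pattern implemented by AnalyticsManager::run()/stop(); a self-contained sketch of that pattern, independent of the Typesense codebase:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
std::atomic<bool> quit = false;

void run() {
    while(!quit) {
        std::unique_lock lk(mtx);
        // wakes every second, or immediately once quit flips to true
        cv.wait_for(lk, std::chrono::seconds(1), [] { return quit.load(); });
        if(quit) {
            break;
        }
        std::cout << "periodic work" << std::endl;
    }
}

int main() {
    std::thread worker(run);
    std::this_thread::sleep_for(std::chrono::milliseconds(2500));
    quit = true;      // stop(): set the flag, then wake the sleeper
    cv.notify_all();
    worker.join();
}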

View File

@ -793,6 +793,55 @@ TEST_F(CollectionManagerTest, RestoreNestedDocsOnRestart) {
collectionManager2.drop_collection("coll1");
}
TEST_F(CollectionManagerTest, RestoreCoercedDocValuesOnRestart) {
nlohmann::json schema = R"({
"name": "coll1",
"enable_nested_fields": true,
"fields": [
{"name": "product", "type": "object" },
{"name": "product.price", "type": "int64" }
]
})"_json;
auto op = collectionManager.create_collection(schema);
ASSERT_TRUE(op.ok());
Collection* coll1 = op.get();
auto doc1 = R"({
"product": {"price": 45.78}
})"_json;
auto create_op = coll1->add(doc1.dump(), CREATE);
ASSERT_TRUE(create_op.ok());
auto res_op = coll1->search("*", {}, "product.price:>0", {}, {}, {0}, 10, 1,
token_ordering::FREQUENCY, {true});
ASSERT_TRUE(res_op.ok());
ASSERT_EQ(1, res_op.get()["found"].get<size_t>());
// create a new collection manager to ensure that it restores the records from the disk backed store
CollectionManager& collectionManager2 = CollectionManager::get_instance();
collectionManager2.init(store, 1.0, "auth_key", quit);
auto load_op = collectionManager2.load(8, 1000);
if(!load_op.ok()) {
LOG(ERROR) << load_op.error();
}
ASSERT_TRUE(load_op.ok());
auto restored_coll = collectionManager2.get_collection("coll1").get();
ASSERT_NE(nullptr, restored_coll);
res_op = restored_coll->search("*", {}, "product.price:>0", {}, {}, {0}, 10, 1,
token_ordering::FREQUENCY, {true});
ASSERT_TRUE(res_op.ok());
ASSERT_EQ(1, res_op.get()["found"].get<size_t>());
collectionManager.drop_collection("coll1");
collectionManager2.drop_collection("coll1");
}
TEST_F(CollectionManagerTest, DropCollectionCleanly) {
std::ifstream infile(std::string(ROOT_DIR)+"test/multi_field_documents.jsonl");
std::string json_line;

View File

@ -101,4 +101,43 @@ TEST_F(CollectionOperationsTest, IncrementInt32Value) {
ASSERT_EQ("0", res["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("The Sherlock Holmes", res["hits"][0]["document"]["title"].get<std::string>());
ASSERT_EQ(101, res["hits"][0]["document"]["points"].get<size_t>());
}
}
TEST_F(CollectionOperationsTest, IncrementInt32ValueCreationViaOptionalField) {
nlohmann::json schema = R"({
"name": "coll1",
"fields": [
{"name": "title", "type": "string"},
{"name": "points", "type": "int32", "optional": true}
]
})"_json;
Collection *coll = collectionManager.create_collection(schema).get();
nlohmann::json doc;
doc["id"] = "0";
doc["title"] = "Sherlock Holmes";
doc["$operations"] = R"({"increment": {"points": 1}})"_json;
ASSERT_TRUE(coll->add(doc.dump(), EMPLACE).ok());
auto res = coll->search("*", {"title"}, "points:1", {}, {}, {0}, 3, 1, FREQUENCY, {false}).get();
ASSERT_EQ(1, res["hits"].size());
ASSERT_EQ(3, res["hits"][0]["document"].size());
ASSERT_EQ("0", res["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("Sherlock Holmes", res["hits"][0]["document"]["title"].get<std::string>());
ASSERT_EQ(1, res["hits"][0]["document"]["points"].get<size_t>());
// try same with CREATE action
doc.clear();
doc["id"] = "1";
doc["title"] = "Harry Potter";
doc["$operations"] = R"({"increment": {"points": 10}})"_json;
ASSERT_TRUE(coll->add(doc.dump(), CREATE).ok());
res = coll->search("*", {"title"}, "points:10", {}, {}, {0}, 3, 1, FREQUENCY, {false}).get();
ASSERT_EQ(1, res["hits"].size());
ASSERT_EQ(3, res["hits"][0]["document"].size());
ASSERT_EQ("1", res["hits"][0]["document"]["id"].get<std::string>());
ASSERT_EQ("Harry Potter", res["hits"][0]["document"]["title"].get<std::string>());
ASSERT_EQ(10, res["hits"][0]["document"]["points"].get<size_t>());
}

View File

@ -542,7 +542,7 @@ TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) {
nlohmann::json doc;
doc["id"] = "0";
doc["title"] = "123";
doc["title"] = "Hello";
doc["timestamp"] = 3433232;
ASSERT_TRUE(coll1->add(doc.dump()).ok());
@ -562,7 +562,7 @@ TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) {
"Existing data for field `title` cannot be coerced into an int32.", alter_op.error());
// existing data should not have been touched
auto res = coll1->search("12", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {true}, 10).get();
auto res = coll1->search("he", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {true}, 10).get();
ASSERT_EQ(1, res["hits"].size());
ASSERT_EQ("0", res["hits"][0]["document"]["id"].get<std::string>());
@ -586,7 +586,7 @@ TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) {
ASSERT_EQ(4, res["facet_counts"][0].size());
ASSERT_EQ("title", res["facet_counts"][0]["field_name"]);
ASSERT_EQ(1, res["facet_counts"][0]["counts"].size());
ASSERT_EQ("123", res["facet_counts"][0]["counts"][0]["value"].get<std::string>());
ASSERT_EQ("Hello", res["facet_counts"][0]["counts"][0]["value"].get<std::string>());
// migrate int32 to int64
schema_changes = R"({
@ -801,7 +801,7 @@ TEST_F(CollectionSchemaChangeTest, DropFieldNotExistingInDocuments) {
ASSERT_TRUE(alter_op.ok());
}
- TEST_F(CollectionSchemaChangeTest, ChangeFieldToCoercableTypeIsNotAllowed) {
+ TEST_F(CollectionSchemaChangeTest, ChangeFieldToCoercableTypeIsAllowed) {
// optional title field
std::vector<field> fields = {field("title", field_types::STRING, false, true, true, "", 1, 1),
field("points", field_types::INT32, true),};
@ -823,9 +823,7 @@ TEST_F(CollectionSchemaChangeTest, ChangeFieldToCoercableTypeIsNotAllowed) {
})"_json;
auto alter_op = coll1->alter(schema_changes);
- ASSERT_FALSE(alter_op.ok());
- ASSERT_EQ("Schema change is incompatible with the type of documents already stored in this collection. "
- "Existing data for field `points` cannot be coerced into a string.", alter_op.error());
+ ASSERT_TRUE(alter_op.ok());
}
TEST_F(CollectionSchemaChangeTest, ChangeFromPrimitiveToDynamicField) {
@ -1140,7 +1138,7 @@ TEST_F(CollectionSchemaChangeTest, DropIntegerFieldAndAddStringValues) {
nlohmann::json doc;
doc["id"] = "0";
doc["label"] = 1000;
doc["label"] = "hello";
doc["title"] = "Foo";
auto add_op = coll1->add(doc.dump());
ASSERT_TRUE(add_op.ok());
@ -1157,7 +1155,7 @@ TEST_F(CollectionSchemaChangeTest, DropIntegerFieldAndAddStringValues) {
// add new document with a string label
doc["id"] = "1";
doc["label"] = "abcdef";
doc["label"] = 1000;
doc["title"] = "Bar";
add_op = coll1->add(doc.dump());
ASSERT_TRUE(add_op.ok());
@ -1173,7 +1171,7 @@ TEST_F(CollectionSchemaChangeTest, DropIntegerFieldAndAddStringValues) {
alter_op = coll1->alter(schema_changes);
ASSERT_FALSE(alter_op.ok());
ASSERT_EQ("Schema change is incompatible with the type of documents already stored in this collection. "
"Existing data for field `label` cannot be coerced into a string.", alter_op.error());
"Existing data for field `label` cannot be coerced into an int64.", alter_op.error());
// but should allow the problematic field to be dropped
schema_changes = R"({
@ -1411,6 +1409,70 @@ TEST_F(CollectionSchemaChangeTest, DropAndReAddNestedObject) {
ASSERT_EQ(4, schema_map.size());
}
TEST_F(CollectionSchemaChangeTest, UpdateAfterNestedNullValue) {
nlohmann::json schema = R"({
"name": "coll1",
"enable_nested_fields": true,
"fields": [
{"name": "lines", "optional": false, "type": "object[]"},
{"name": "lines.name", "optional": true, "type": "string[]"}
]
})"_json;
Collection* coll1 = collectionManager.create_collection(schema).get();
nlohmann::json doc = R"(
{"id": "1", "lines": [{"name": null}]}
)"_json;
auto add_op = coll1->add(doc.dump(), CREATE, "1", DIRTY_VALUES::DROP);
ASSERT_TRUE(add_op.ok());
// add new field
auto schema_changes = R"({
"fields": [
{"name": "title", "type": "string", "optional": true}
]
})"_json;
auto alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
}
TEST_F(CollectionSchemaChangeTest, AlterShouldBeAbleToHandleFieldValueCoercion) {
nlohmann::json schema = R"({
"name": "coll1",
"enable_nested_fields": true,
"fields": [
{"name": "product", "optional": false, "type": "object"},
{"name": "product.price", "type": "int64"},
{"name": "title", "type": "string"},
{"name": "description", "type": "string"}
]
})"_json;
Collection* coll1 = collectionManager.create_collection(schema).get();
nlohmann::json doc = R"(
{"id": "0", "product": {"price": 56.45}, "title": "Title 1", "description": "Description 1"}
)"_json;
auto add_op = coll1->add(doc.dump(), CREATE, "0", DIRTY_VALUES::COERCE_OR_REJECT);
ASSERT_TRUE(add_op.ok());
// drop a field
auto schema_changes = R"({
"fields": [
{"name": "description", "drop": true}
]
})"_json;
auto alter_op = coll1->alter(schema_changes);
ASSERT_TRUE(alter_op.ok());
}
TEST_F(CollectionSchemaChangeTest, GeoFieldSchemaAddition) {
nlohmann::json schema = R"({
"name": "coll1",

View File

@ -0,0 +1,87 @@
#include <gtest/gtest.h>
#include "popular_queries.h"
#include "logger.h"
class PopularQueriesTest : public ::testing::Test {
protected:
virtual void SetUp() {
}
virtual void TearDown() {
}
};
TEST_F(PopularQueriesTest, PrefixQueryCompaction) {
PopularQueries pq(10);
auto now_ts_us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
// compaction when no queries have been entered
pq.compact_user_queries(now_ts_us);
auto queries = pq.get_user_prefix_queries();
ASSERT_TRUE(queries.empty());
// compaction after user has typed first prefix but before compaction interval has happened
pq.add("f", true, "0", now_ts_us+1);
pq.compact_user_queries(now_ts_us+2);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(1, queries.size());
ASSERT_EQ(1, queries.count("0"));
ASSERT_EQ(1, queries["0"].size());
ASSERT_EQ("f", queries["0"][0].query);
ASSERT_EQ(now_ts_us+1, queries["0"][0].timestamp);
ASSERT_EQ(0, pq.get_local_counts().size());
// compaction interval has happened
pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(0, queries.size());
auto local_counts = pq.get_local_counts();
ASSERT_EQ(1, local_counts.size());
ASSERT_EQ(1, local_counts.count("f"));
ASSERT_EQ(1, local_counts["f"]);
// 3 letter search
pq.reset_local_counts();
pq.add("f", true, "0", now_ts_us+1);
pq.add("fo", true, "0", now_ts_us+2);
pq.add("foo", true, "0", now_ts_us+3);
pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(0, queries.size());
local_counts = pq.get_local_counts();
ASSERT_EQ(1, local_counts.size());
ASSERT_EQ(1, local_counts.count("foo"));
ASSERT_EQ(1, local_counts["foo"]);
// 3 letter search + start of next search
pq.reset_local_counts();
pq.add("f", true, "0", now_ts_us+1);
pq.add("fo", true, "0", now_ts_us+2);
pq.add("foo", true, "0", now_ts_us+3);
pq.add("b", true, "0", now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100);
pq.compact_user_queries(now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(1, queries.size());
ASSERT_EQ(1, queries["0"].size());
ASSERT_EQ("b", queries["0"][0].query);
local_counts = pq.get_local_counts();
ASSERT_EQ(1, local_counts.size());
ASSERT_EQ(1, local_counts.count("foo"));
ASSERT_EQ(1, local_counts["foo"]);
// continue with that query
auto prev_ts = now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1;
pq.add("ba", true, "0", prev_ts+1);
pq.add("bar", true, "0", prev_ts+2);
pq.compact_user_queries(prev_ts+2+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+1);
queries = pq.get_user_prefix_queries();
ASSERT_EQ(0, queries.size());
local_counts = pq.get_local_counts();
ASSERT_EQ(2, local_counts.size());
ASSERT_EQ(1, local_counts.count("bar"));
ASSERT_EQ(1, local_counts["bar"]);
}