From a9da41dd0e83a847bfb2f469530dd751b79ae232 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Thu, 18 May 2023 18:53:58 +0530 Subject: [PATCH 1/4] Handle increment for creation. --- include/index.h | 2 +- src/index.cpp | 7 ++--- test/collection_operations_test.cpp | 41 ++++++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/include/index.h b/include/index.h index 38fdc576..138c774d 100644 --- a/include/index.h +++ b/include/index.h @@ -526,7 +526,7 @@ private: static void compute_facet_stats(facet &a_facet, uint64_t raw_value, const std::string & field_type); static void handle_doc_ops(const tsl::htrie_map& search_schema, - nlohmann::json& update_doc, const nlohmann::json& old_doc, nlohmann::json& new_doc); + nlohmann::json& update_doc, const nlohmann::json& old_doc); static void get_doc_changes(const index_operation_t op, const tsl::htrie_map& search_schema, nlohmann::json &update_doc, const nlohmann::json &old_doc, nlohmann::json &new_doc, diff --git a/src/index.cpp b/src/index.cpp index b337bb4b..66a1470d 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -455,6 +455,7 @@ void Index::validate_and_preprocess(Index *index, std::vector& ite scrub_reindex_doc(search_schema, index_rec.doc, index_rec.del_doc, index_rec.old_doc); embed_fields(index_rec.new_doc, embedding_fields, search_schema); } else { + handle_doc_ops(search_schema, index_rec.doc, index_rec.old_doc); embed_fields(index_rec.doc, embedding_fields, search_schema); } @@ -6130,7 +6131,7 @@ void Index::refresh_schemas(const std::vector& new_fields, const std::vec } void Index::handle_doc_ops(const tsl::htrie_map& search_schema, - nlohmann::json& update_doc, const nlohmann::json& old_doc, nlohmann::json& new_doc) { + nlohmann::json& update_doc, const nlohmann::json& old_doc) { /* { @@ -6154,7 +6155,6 @@ void Index::handle_doc_ops(const tsl::htrie_map& search_schema, } auto updated_value = existing_value + item.value().get(); - new_doc[item.key()] = updated_value; update_doc[item.key()] = updated_value; } } @@ -6174,9 +6174,10 @@ void Index::get_doc_changes(const index_operation_t op, const tsl::htrie_map()); ASSERT_EQ("The Sherlock Holmes", res["hits"][0]["document"]["title"].get()); ASSERT_EQ(101, res["hits"][0]["document"]["points"].get()); -} \ No newline at end of file +} + +TEST_F(CollectionOperationsTest, IncrementInt32ValueCreationViaOptionalField) { + nlohmann::json schema = R"({ + "name": "coll1", + "fields": [ + {"name": "title", "type": "string"}, + {"name": "points", "type": "int32", "optional": true} + ] + })"_json; + + Collection *coll = collectionManager.create_collection(schema).get(); + + nlohmann::json doc; + doc["id"] = "0"; + doc["title"] = "Sherlock Holmes"; + doc["$operations"] = R"({"increment": {"points": 1}})"_json; + ASSERT_TRUE(coll->add(doc.dump(), EMPLACE).ok()); + + auto res = coll->search("*", {"title"}, "points:1", {}, {}, {0}, 3, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(1, res["hits"].size()); + ASSERT_EQ(3, res["hits"][0]["document"].size()); + ASSERT_EQ("0", res["hits"][0]["document"]["id"].get()); + ASSERT_EQ("Sherlock Holmes", res["hits"][0]["document"]["title"].get()); + ASSERT_EQ(1, res["hits"][0]["document"]["points"].get()); + + // try same with CREATE action + doc.clear(); + doc["id"] = "1"; + doc["title"] = "Harry Potter"; + doc["$operations"] = R"({"increment": {"points": 10}})"_json; + ASSERT_TRUE(coll->add(doc.dump(), CREATE).ok()); + + res = coll->search("*", {"title"}, "points:10", {}, {}, {0}, 3, 1, FREQUENCY, {false}).get(); + ASSERT_EQ(1, 
res["hits"].size()); + ASSERT_EQ(3, res["hits"][0]["document"].size()); + ASSERT_EQ("1", res["hits"][0]["document"]["id"].get()); + ASSERT_EQ("Harry Potter", res["hits"][0]["document"]["title"].get()); + ASSERT_EQ(10, res["hits"][0]["document"]["points"].get()); +} From 32336e4d9399f89de41235f06d35eeeef6852c3b Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Fri, 19 May 2023 21:27:35 +0530 Subject: [PATCH 2/4] Do coercion by default for collection loading / schema alter. --- src/collection.cpp | 2 +- src/collection_manager.cpp | 2 +- test/collection_manager_test.cpp | 49 +++++++++++++++ test/collection_schema_change_test.cpp | 82 ++++++++++++++++++++++---- 4 files changed, 123 insertions(+), 12 deletions(-) diff --git a/src/collection.cpp b/src/collection.cpp index 7d7d031b..b9c5a012 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -4227,7 +4227,7 @@ Option Collection::validate_alter_payload(nlohmann::json& schema_changes, index_operation_t::CREATE, false, fallback_field_type, - DIRTY_VALUES::REJECT); + DIRTY_VALUES::COERCE_OR_REJECT); if(!validate_op.ok()) { std::string err_message = validate_op.error(); diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 52b6ce15..4fe4702e 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -1325,7 +1325,7 @@ Option CollectionManager::load_collection(const nlohmann::json &collection field::flatten_doc(document, collection->get_nested_fields(), {}, true, flattened_fields); } - auto dirty_values = DIRTY_VALUES::DROP; + auto dirty_values = DIRTY_VALUES::COERCE_OR_DROP; num_valid_docs++; diff --git a/test/collection_manager_test.cpp b/test/collection_manager_test.cpp index 38c7e014..e821e51b 100644 --- a/test/collection_manager_test.cpp +++ b/test/collection_manager_test.cpp @@ -793,6 +793,55 @@ TEST_F(CollectionManagerTest, RestoreNestedDocsOnRestart) { collectionManager2.drop_collection("coll1"); } +TEST_F(CollectionManagerTest, RestoreCoercedDocValuesOnRestart) { + nlohmann::json schema = R"({ + "name": "coll1", + "enable_nested_fields": true, + "fields": [ + {"name": "product", "type": "object" }, + {"name": "product.price", "type": "int64" } + ] + })"_json; + + auto op = collectionManager.create_collection(schema); + ASSERT_TRUE(op.ok()); + Collection* coll1 = op.get(); + + auto doc1 = R"({ + "product": {"price": 45.78} + })"_json; + + auto create_op = coll1->add(doc1.dump(), CREATE); + ASSERT_TRUE(create_op.ok()); + + auto res_op = coll1->search("*", {}, "product.price:>0", {}, {}, {0}, 10, 1, + token_ordering::FREQUENCY, {true}); + ASSERT_TRUE(res_op.ok()); + ASSERT_EQ(1, res_op.get()["found"].get()); + + // create a new collection manager to ensure that it restores the records from the disk backed store + CollectionManager& collectionManager2 = CollectionManager::get_instance(); + collectionManager2.init(store, 1.0, "auth_key", quit); + auto load_op = collectionManager2.load(8, 1000); + + if(!load_op.ok()) { + LOG(ERROR) << load_op.error(); + } + + ASSERT_TRUE(load_op.ok()); + + auto restored_coll = collectionManager2.get_collection("coll1").get(); + ASSERT_NE(nullptr, restored_coll); + + res_op = restored_coll->search("*", {}, "product.price:>0", {}, {}, {0}, 10, 1, + token_ordering::FREQUENCY, {true}); + ASSERT_TRUE(res_op.ok()); + ASSERT_EQ(1, res_op.get()["found"].get()); + + collectionManager.drop_collection("coll1"); + collectionManager2.drop_collection("coll1"); +} + TEST_F(CollectionManagerTest, DropCollectionCleanly) { std::ifstream 
infile(std::string(ROOT_DIR)+"test/multi_field_documents.jsonl"); std::string json_line; diff --git a/test/collection_schema_change_test.cpp b/test/collection_schema_change_test.cpp index 88d8e940..48a64c3a 100644 --- a/test/collection_schema_change_test.cpp +++ b/test/collection_schema_change_test.cpp @@ -542,7 +542,7 @@ TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) { nlohmann::json doc; doc["id"] = "0"; - doc["title"] = "123"; + doc["title"] = "Hello"; doc["timestamp"] = 3433232; ASSERT_TRUE(coll1->add(doc.dump()).ok()); @@ -562,7 +562,7 @@ TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) { "Existing data for field `title` cannot be coerced into an int32.", alter_op.error()); // existing data should not have been touched - auto res = coll1->search("12", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {true}, 10).get(); + auto res = coll1->search("he", {"title"}, "", {}, {}, {0}, 10, 1, FREQUENCY, {true}, 10).get(); ASSERT_EQ(1, res["hits"].size()); ASSERT_EQ("0", res["hits"][0]["document"]["id"].get()); @@ -586,7 +586,7 @@ TEST_F(CollectionSchemaChangeTest, AbilityToDropAndReAddIndexAtTheSameTime) { ASSERT_EQ(4, res["facet_counts"][0].size()); ASSERT_EQ("title", res["facet_counts"][0]["field_name"]); ASSERT_EQ(1, res["facet_counts"][0]["counts"].size()); - ASSERT_EQ("123", res["facet_counts"][0]["counts"][0]["value"].get()); + ASSERT_EQ("Hello", res["facet_counts"][0]["counts"][0]["value"].get()); // migrate int32 to int64 schema_changes = R"({ @@ -801,7 +801,7 @@ TEST_F(CollectionSchemaChangeTest, DropFieldNotExistingInDocuments) { ASSERT_TRUE(alter_op.ok()); } -TEST_F(CollectionSchemaChangeTest, ChangeFieldToCoercableTypeIsNotAllowed) { +TEST_F(CollectionSchemaChangeTest, ChangeFieldToCoercableTypeIsAllowed) { // optional title field std::vector fields = {field("title", field_types::STRING, false, true, true, "", 1, 1), field("points", field_types::INT32, true),}; @@ -823,9 +823,7 @@ TEST_F(CollectionSchemaChangeTest, ChangeFieldToCoercableTypeIsNotAllowed) { })"_json; auto alter_op = coll1->alter(schema_changes); - ASSERT_FALSE(alter_op.ok()); - ASSERT_EQ("Schema change is incompatible with the type of documents already stored in this collection. " - "Existing data for field `points` cannot be coerced into a string.", alter_op.error()); + ASSERT_TRUE(alter_op.ok()); } TEST_F(CollectionSchemaChangeTest, ChangeFromPrimitiveToDynamicField) { @@ -1140,7 +1138,7 @@ TEST_F(CollectionSchemaChangeTest, DropIntegerFieldAndAddStringValues) { nlohmann::json doc; doc["id"] = "0"; - doc["label"] = 1000; + doc["label"] = "hello"; doc["title"] = "Foo"; auto add_op = coll1->add(doc.dump()); ASSERT_TRUE(add_op.ok()); @@ -1157,7 +1155,7 @@ TEST_F(CollectionSchemaChangeTest, DropIntegerFieldAndAddStringValues) { // add new document with a string label doc["id"] = "1"; - doc["label"] = "abcdef"; + doc["label"] = 1000; doc["title"] = "Bar"; add_op = coll1->add(doc.dump()); ASSERT_TRUE(add_op.ok()); @@ -1173,7 +1171,7 @@ TEST_F(CollectionSchemaChangeTest, DropIntegerFieldAndAddStringValues) { alter_op = coll1->alter(schema_changes); ASSERT_FALSE(alter_op.ok()); ASSERT_EQ("Schema change is incompatible with the type of documents already stored in this collection. 
" - "Existing data for field `label` cannot be coerced into a string.", alter_op.error()); + "Existing data for field `label` cannot be coerced into an int64.", alter_op.error()); // but should allow the problematic field to be dropped schema_changes = R"({ @@ -1411,6 +1409,70 @@ TEST_F(CollectionSchemaChangeTest, DropAndReAddNestedObject) { ASSERT_EQ(4, schema_map.size()); } +TEST_F(CollectionSchemaChangeTest, UpdateAfterNestedNullValue) { + nlohmann::json schema = R"({ + "name": "coll1", + "enable_nested_fields": true, + "fields": [ + {"name": "lines", "optional": false, "type": "object[]"}, + {"name": "lines.name", "optional": true, "type": "string[]"} + ] + })"_json; + + Collection* coll1 = collectionManager.create_collection(schema).get(); + + nlohmann::json doc = R"( + {"id": "1", "lines": [{"name": null}]} + )"_json; + + auto add_op = coll1->add(doc.dump(), CREATE, "1", DIRTY_VALUES::DROP); + ASSERT_TRUE(add_op.ok()); + + // add new field + + auto schema_changes = R"({ + "fields": [ + {"name": "title", "type": "string", "optional": true} + ] + })"_json; + + auto alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); +} + +TEST_F(CollectionSchemaChangeTest, AlterShouldBeAbleToHandleFieldValueCoercion) { + nlohmann::json schema = R"({ + "name": "coll1", + "enable_nested_fields": true, + "fields": [ + {"name": "product", "optional": false, "type": "object"}, + {"name": "product.price", "type": "int64"}, + {"name": "title", "type": "string"}, + {"name": "description", "type": "string"} + ] + })"_json; + + Collection* coll1 = collectionManager.create_collection(schema).get(); + + nlohmann::json doc = R"( + {"id": "0", "product": {"price": 56.45}, "title": "Title 1", "description": "Description 1"} + )"_json; + + auto add_op = coll1->add(doc.dump(), CREATE, "0", DIRTY_VALUES::COERCE_OR_REJECT); + ASSERT_TRUE(add_op.ok()); + + // drop a field + + auto schema_changes = R"({ + "fields": [ + {"name": "description", "drop": true} + ] + })"_json; + + auto alter_op = coll1->alter(schema_changes); + ASSERT_TRUE(alter_op.ok()); +} + TEST_F(CollectionSchemaChangeTest, GeoFieldSchemaAddition) { nlohmann::json schema = R"({ "name": "coll1", From b7f29aeed12977c51a2bb93a88a8a8819b474905 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Mon, 22 May 2023 17:52:52 +0530 Subject: [PATCH 3/4] Evented query aggregation skeleton. 
--- include/collection.h | 2 + include/core_api.h | 8 ++ include/event_manager.h | 30 ++++ include/http_data.h | 1 + include/index.h | 3 + include/num_tree.h | 2 + include/popular_queries.h | 51 +++++++ include/query_suggestions.h | 69 ++++++++++ include/raft_server.h | 2 + include/tsconfig.h | 38 ++++++ src/collection.cpp | 20 ++- src/collection_manager.cpp | 22 +++ src/core_api.cpp | 56 ++++++++ src/event_manager.cpp | 84 ++++++++++++ src/http_server.cpp | 11 ++ src/index.cpp | 12 ++ src/main/typesense_server.cpp | 5 + src/num_tree.cpp | 24 ++++ src/popular_queries.cpp | 118 ++++++++++++++++ src/query_suggestions.cpp | 243 +++++++++++++++++++++++++++++++++ src/raft_server.cpp | 15 ++ src/typesense_server_utils.cpp | 15 ++ test/popular_queries_test.cpp | 87 ++++++++++++ 23 files changed, 917 insertions(+), 1 deletion(-) create mode 100644 include/event_manager.h create mode 100644 include/popular_queries.h create mode 100644 include/query_suggestions.h create mode 100644 src/event_manager.cpp create mode 100644 src/popular_queries.cpp create mode 100644 src/query_suggestions.cpp create mode 100644 test/popular_queries_test.cpp diff --git a/include/collection.h b/include/collection.h index caaf897f..c2312659 100644 --- a/include/collection.h +++ b/include/collection.h @@ -553,6 +553,8 @@ public: std::vector& query_by_weights, std::vector& weighted_search_fields, std::vector& reordered_search_fields) const; + + Option truncate_after_top_k(const std::string& field_name, size_t k); }; template diff --git a/include/core_api.h b/include/core_api.h index a549c4a1..bc8d297c 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -141,6 +141,14 @@ bool del_throttle(const std::shared_ptr& req, const std::shared_ptr& req, const std::shared_ptr& res); +// Events + +bool post_create_event(const std::shared_ptr& req, const std::shared_ptr& res); + +bool post_create_event_sink(const std::shared_ptr& req, const std::shared_ptr& res); + +bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr& res); + // Misc helpers void get_collections_for_auth(std::map& req_params, const std::string& body, diff --git a/include/event_manager.h b/include/event_manager.h new file mode 100644 index 00000000..e012ee8d --- /dev/null +++ b/include/event_manager.h @@ -0,0 +1,30 @@ +#pragma once + +#include "json.hpp" +#include "option.h" + +class EventManager { +private: + EventManager() = default; + + ~EventManager() = default; + + static constexpr char* EVENT_TYPE = "type"; + static constexpr char* EVENT_DATA = "data"; + +public: + static EventManager& get_instance() { + static EventManager instance; + return instance; + } + + EventManager(EventManager const&) = delete; + void operator=(EventManager const&) = delete; + + Option create_sink(const nlohmann::json& sink_config, bool write_to_disk = true); + + Option remove_sink(const std::string& name); + + bool add_event(const nlohmann::json& event); + +}; \ No newline at end of file diff --git a/include/http_data.h b/include/http_data.h index 893c1138..c060adc7 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -207,6 +207,7 @@ public: struct http_req { static constexpr const char* AUTH_HEADER = "x-typesense-api-key"; + static constexpr const char* USER_HEADER = "x-typesense-user-id"; static constexpr const char* AGENT_HEADER = "user-agent"; h2o_req_t* _req; diff --git a/include/index.h b/include/index.h index 138c774d..b9aa57f6 100644 --- a/include/index.h +++ b/include/index.h @@ -956,6 +956,9 @@ public: std::map>& included_ids_map, std::vector& 
included_ids_vec, std::unordered_set& excluded_group_ids) const; + + Option seq_ids_outside_top_k(const std::string& field_name, size_t k, + std::vector& outside_seq_ids); }; template diff --git a/include/num_tree.h b/include/num_tree.h index 5406a109..2170a30e 100644 --- a/include/num_tree.h +++ b/include/num_tree.h @@ -48,6 +48,8 @@ public: size_t size(); + void seq_ids_outside_top_k(size_t k, std::vector& seq_ids); + void contains(const NUM_COMPARATOR& comparator, const int64_t& value, const uint32_t& context_ids_length, uint32_t* const& context_ids, diff --git a/include/popular_queries.h b/include/popular_queries.h new file mode 100644 index 00000000..95493f27 --- /dev/null +++ b/include/popular_queries.h @@ -0,0 +1,51 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +class PopularQueries { +public: + struct QWithTimestamp { + std::string query; + uint64_t timestamp; + + QWithTimestamp(const std::string& query, uint64_t timestamp) : query(query), timestamp(timestamp) {} + }; + + static const size_t QUERY_FINALIZATION_INTERVAL_MICROS = 4 * 1000 * 1000; + +private: + + size_t k; + const size_t max_size; + + // counts aggregated within the current node + tsl::htrie_map local_counts; + std::shared_mutex lmutex; + + std::unordered_map> user_prefix_queries; + std::shared_mutex umutex; + +public: + + PopularQueries(size_t k); + + void add(const std::string& value, const bool live_query, const std::string& user_id, + uint64_t now_ts_us = 0); + + void compact_user_queries(uint64_t now_ts_us); + + void serialize_as_docs(std::string& docs); + + void reset_local_counts(); + + size_t get_k(); + + std::unordered_map> get_user_prefix_queries(); + + tsl::htrie_map get_local_counts(); +}; diff --git a/include/query_suggestions.h b/include/query_suggestions.h new file mode 100644 index 00000000..e42b4c4f --- /dev/null +++ b/include/query_suggestions.h @@ -0,0 +1,69 @@ +#pragma once +#include "popular_queries.h" +#include "option.h" +#include "raft_server.h" +#include +#include +#include +#include + +class QuerySuggestions { +private: + mutable std::mutex mutex; + std::condition_variable cv; + + std::atomic quit = false; + + const size_t QUERY_COMPACTION_INTERVAL_S = 60; + + struct suggestion_config_t { + std::string name; + std::string suggestion_collection; + std::vector query_collections; + size_t max_suggestions; + }; + + // config name => config + std::unordered_map suggestion_configs; + + // query collection => suggestion collections + std::unordered_map> query_collection_mapping; + + // suggestion collection => popular queries + std::unordered_map popular_queries; + + Store* store = nullptr; + + QuerySuggestions() {} + + ~QuerySuggestions(); + +public: + + static constexpr const char* EVENT_SINK_CONFIG_PREFIX = "$ES"; + + static constexpr const char* SINK_TYPE = "query_suggestions"; + + static QuerySuggestions& get_instance() { + static QuerySuggestions instance; + return instance; + } + + QuerySuggestions(QuerySuggestions const&) = delete; + void operator=(QuerySuggestions const&) = delete; + + void init(Store* store); + + void run(ReplicationState* raft_server); + + Option create_index(const nlohmann::json& payload, bool write_to_disk = true); + + Option remove_suggestion_index(const std::string& name); + + void add_suggestion(const std::string& query_collection, + std::string& query, const bool live_query, const std::string& user_id); + + void stop(); + + void dispose(); +}; diff --git a/include/raft_server.h b/include/raft_server.h index 8280338b..0a9c8368 
100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -237,6 +237,8 @@ public: nlohmann::json get_status(); + std::string get_leader_url() const; + private: friend class ReplicationClosure; diff --git a/include/tsconfig.h b/include/tsconfig.h index 50f5e094..fe7b8f0c 100644 --- a/include/tsconfig.h +++ b/include/tsconfig.h @@ -66,6 +66,10 @@ private: std::atomic reset_peers_on_error; + bool enable_search_analytics; + + uint32_t analytics_flush_interval; + protected: Config() { @@ -90,6 +94,9 @@ protected: this->skip_writes = false; this->log_slow_searches_time_ms = 30 * 1000; this->reset_peers_on_error = false; + + this->enable_search_analytics = false; + this->analytics_flush_interval = 3600; // in seconds } Config(Config const&) { @@ -291,6 +298,10 @@ public: return this->cache_num_entries; } + size_t get_analytics_flush_interval() const { + return this->analytics_flush_interval; + } + size_t get_thread_pool_size() const { return this->thread_pool_size; } @@ -303,6 +314,10 @@ public: return this->enable_access_logging; } + bool get_enable_search_analytics() const { + return this->enable_search_analytics; + } + int get_disk_used_max_percentage() const { return this->disk_used_max_percentage; } @@ -418,6 +433,10 @@ public: this->cache_num_entries = std::stoi(get_env("TYPESENSE_CACHE_NUM_ENTRIES")); } + if(!get_env("TYPESENSE_ANALYTICS_FLUSH_INTERVAL").empty()) { + this->analytics_flush_interval = std::stoi(get_env("TYPESENSE_ANALYTICS_FLUSH_INTERVAL")); + } + if(!get_env("TYPESENSE_THREAD_POOL_SIZE").empty()) { this->thread_pool_size = std::stoi(get_env("TYPESENSE_THREAD_POOL_SIZE")); } @@ -431,6 +450,7 @@ public: } this->enable_access_logging = ("TRUE" == get_env("TYPESENSE_ENABLE_ACCESS_LOGGING")); + this->enable_search_analytics = ("TRUE" == get_env("TYPESENSE_ENABLE_SEARCH_ANALYTICS")); if(!get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE").empty()) { this->disk_used_max_percentage = std::stoi(get_env("TYPESENSE_DISK_USED_MAX_PERCENTAGE")); @@ -576,6 +596,10 @@ public: this->cache_num_entries = (int) reader.GetInteger("server", "cache-num-entries", 1000); } + if(reader.Exists("server", "analytics-flush-interval")) { + this->analytics_flush_interval = (int) reader.GetInteger("server", "analytics-flush-interval", 3600); + } + if(reader.Exists("server", "thread-pool-size")) { this->thread_pool_size = (int) reader.GetInteger("server", "thread-pool-size", 0); } @@ -589,6 +613,11 @@ public: this->enable_access_logging = (enable_access_logging_str == "true"); } + if(reader.Exists("server", "enable-search-analytics")) { + auto enable_search_analytics_str = reader.Get("server", "enable-search-analytics", "false"); + this->enable_search_analytics = (enable_search_analytics_str == "true"); + } + if(reader.Exists("server", "disk-used-max-percentage")) { this->disk_used_max_percentage = (int) reader.GetInteger("server", "disk-used-max-percentage", 100); } @@ -728,6 +757,10 @@ public: this->cache_num_entries = options.get("cache-num-entries"); } + if(options.exist("analytics-flush-interval")) { + this->analytics_flush_interval = options.get("analytics-flush-interval"); + } + if(options.exist("thread-pool-size")) { this->thread_pool_size = options.get("thread-pool-size"); } @@ -755,6 +788,11 @@ public: if(options.exist("reset-peers-on-error")) { this->reset_peers_on_error = options.get("reset-peers-on-error"); } + + if(options.exist("enable-search-analytics")) { + this->enable_search_analytics = options.get("enable-search-analytics"); + } + } void set_cors_domains(std::string& 
cors_domains_value) { diff --git a/src/collection.cpp b/src/collection.cpp index b9c5a012..1c3e51b1 100644 --- a/src/collection.cpp +++ b/src/collection.cpp @@ -4769,4 +4769,22 @@ void Collection::process_remove_field_for_embedding_fields(const field& the_fiel } } -} \ No newline at end of file +} + +Option Collection::truncate_after_top_k(const string &field_name, size_t k) { + std::vector seq_ids; + auto op = index->seq_ids_outside_top_k(field_name, k, seq_ids); + + if(!op.ok()) { + return op; + } + + for(auto seq_id: seq_ids) { + auto remove_op = remove_if_found(seq_id); + if(!remove_op.ok()) { + LOG(ERROR) << "Error while truncating top k: " << remove_op.error(); + } + } + + return Option(true); +} diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 4fe4702e..8d45cdc8 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -2,6 +2,8 @@ #include #include #include +#include +#include #include "collection_manager.h" #include "batched_indexer.h" #include "logger.h" @@ -1060,6 +1062,15 @@ Option CollectionManager::do_search(std::map& re nlohmann::json result = result_op.get(); + if(Config::get_instance().get_enable_search_analytics()) { + if(result.count("found") != 0 && result["found"].get() != 0) { + std::string processed_query = raw_query; + Tokenizer::normalize_ascii(processed_query); + QuerySuggestions::get_instance().add_suggestion(collection->get_name(), processed_query, + true, req_params["x-typesense-user-id"]); + } + } + if(exclude_fields.count("search_time_ms") == 0) { result["search_time_ms"] = timeMillis; } @@ -1287,6 +1298,17 @@ Option CollectionManager::load_collection(const nlohmann::json &collection collection->add_synonym(collection_synonym); } + // restore query suggestions configs + std::vector sink_config_jsons; + cm.store->scan_fill(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX, + std::string(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX) + "`", + sink_config_jsons); + + for(const auto& sink_config_json: sink_config_jsons) { + nlohmann::json sink_config = nlohmann::json::parse(sink_config_json); + EventManager::get_instance().create_sink(sink_config, false); + } + // Fetch records from the store and re-create memory index const std::string seq_id_prefix = collection->get_seq_id_collection_prefix(); std::string upper_bound_key = collection->get_seq_id_collection_prefix() + "`"; // cannot inline this diff --git a/src/core_api.cpp b/src/core_api.cpp index b32176d7..3ace3fa8 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -13,6 +13,7 @@ #include "core_api_utils.h" #include "lru/lru.hpp" #include "ratelimit_manager.h" +#include "event_manager.h" using namespace std::chrono_literals; @@ -2032,4 +2033,59 @@ Option> get_api_key_and_ip(const std::string& } return Option>(std::make_pair(api_key, ip)); +} + +bool post_create_event(const std::shared_ptr& req, const std::shared_ptr& res) { + nlohmann::json req_json; + + try { + req_json = nlohmann::json::parse(req->body); + } catch(const std::exception& e) { + LOG(ERROR) << "JSON error: " << e.what(); + res->set_400("Bad JSON."); + return false; + } + + bool success = EventManager::get_instance().add_event(req_json); + if(success) { + res->set_201(R"({"ok": true)"); + return true; + } + + res->set_400(R"({"ok": false)"); + return false; +} + +bool post_create_event_sink(const std::shared_ptr& req, const std::shared_ptr& res) { + // connects an event to a sink, which for now, is another collection + nlohmann::json req_json; + + try { + req_json = nlohmann::json::parse(req->body); + } 
catch(const std::exception& e) { + LOG(ERROR) << "JSON error: " << e.what(); + res->set_400("Bad JSON."); + return false; + } + + auto op = EventManager::get_instance().create_sink(req_json); + + if(!op.ok()) { + res->set(op.code(), op.error()); + return false; + } + + res->set_201(req_json.dump()); + return true; +} + +bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr& res) { + auto op = EventManager::get_instance().remove_sink(req->params["name"]); + if(!op.ok()) { + res->set(op.code(), op.error()); + return false; + } + + res->set_200(R"({"ok": true)"); + return true; } \ No newline at end of file diff --git a/src/event_manager.cpp b/src/event_manager.cpp new file mode 100644 index 00000000..49aa8185 --- /dev/null +++ b/src/event_manager.cpp @@ -0,0 +1,84 @@ +#include +#include "event_manager.h" + +bool EventManager::add_event(const nlohmann::json& event) { + /* + Sample event payload: + + { + "type": "search", + "data": { + "q": "Nike shoes", + "collections": ["products"] + } + } + */ + + if(!event.contains("type")) { + return false; + } + + const auto& event_type_val = event[EVENT_TYPE]; + + if(event_type_val.is_string()) { + const std::string& event_type = event_type_val.get(); + if(event_type == "search") { + if(!event.contains("data")) { + return false; + } + + const auto& event_data_val = event[EVENT_DATA]; + + if(!event_data_val.is_object()) { + return false; + } + + const auto& event_data_query_it = event_data_val["q"]; + + if(!event_data_query_it.is_string() || !event_data_val["collections"].is_array()) { + return false; + } + + for(const auto& coll: event_data_val["collections"]) { + if(!coll.is_string()) { + return false; + } + + std::string query = event_data_query_it.get(); + QuerySuggestions::get_instance().add_suggestion(coll.get(), query, false, ""); + } + } + } + + return true; +} + +Option EventManager::create_sink(const nlohmann::json& sink_config, bool write_to_disk) { + if(!sink_config.contains("name") || !sink_config["name"].is_string()) { + return Option(400, "Request payload contains invalid name."); + } + + if(!sink_config.contains("type") || !sink_config["type"].is_string()) { + return Option(400, "Request payload contains invalid type."); + } + + if(!sink_config.contains("source") || !sink_config["source"].is_object()) { + return Option(400, "Request payload contains invalid source."); + } + + if(!sink_config.contains("destination") || !sink_config["destination"].is_object()) { + return Option(400, "Request payload contains invalid destination."); + } + + if(sink_config.contains("type") && sink_config["type"] == QuerySuggestions::SINK_TYPE) { + QuerySuggestions::get_instance().create_index(sink_config, write_to_disk); + } else { + return Option(400, ("Missing or invalid event sink type.")); + } + + return Option(200); +} + +Option EventManager::remove_sink(const std::string& name) { + return QuerySuggestions::get_instance().remove_suggestion_index(name); +} diff --git a/src/http_server.cpp b/src/http_server.cpp index 7756b481..855fe2c2 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -407,6 +407,17 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { api_auth_key_sent = query_map[http_req::AUTH_HEADER]; } + // extract user id from header, if not already present as GET param + ssize_t user_header_cursor = h2o_find_header_by_str(&req->headers, http_req::USER_HEADER, strlen(http_req::USER_HEADER), -1); + + if(user_header_cursor != -1) { + h2o_iovec_t & slot = 
req->headers.entries[user_header_cursor].value; + std::string user_id_sent = std::string(slot.base, slot.len); + query_map[http_req::USER_HEADER] = user_id_sent; + } else if(query_map.count(http_req::USER_HEADER) == 0) { + query_map[http_req::USER_HEADER] = client_ip; + } + route_path *rpath = nullptr; uint64_t route_hash = h2o_handler->http_server->find_route(path_parts, http_method, &rpath); diff --git a/src/index.cpp b/src/index.cpp index 66a1470d..f70a4f55 100644 --- a/src/index.cpp +++ b/src/index.cpp @@ -6275,6 +6275,18 @@ size_t Index::num_seq_ids() const { return seq_ids->num_ids(); } +Option Index::seq_ids_outside_top_k(const std::string& field_name, size_t k, + std::vector& outside_seq_ids) { + auto field_it = numerical_index.find(field_name); + + if(field_it == sort_index.end()) { + return Option(400, "Field not found in numerical index."); + } + + field_it->second->seq_ids_outside_top_k(k, outside_seq_ids); + return Option(true); +} + void Index::resolve_space_as_typos(std::vector& qtokens, const string& field_name, std::vector>& resolved_queries) const { diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 6dad2175..915b5a91 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -68,6 +68,11 @@ void master_server_routes() { server->put("/presets/:name", put_upsert_preset); server->del("/presets/:name", del_preset); + // events + server->post("/events", post_create_event); + server->post("/events/sinks", post_create_event_sink); + server->del("/events/sinks/:name", del_event_sink); + // meta server->get("/metrics.json", get_metrics_json); server->get("/stats.json", get_stats_json); diff --git a/src/num_tree.cpp b/src/num_tree.cpp index 1bcdbc9f..c59cb008 100644 --- a/src/num_tree.cpp +++ b/src/num_tree.cpp @@ -318,6 +318,30 @@ void num_tree_t::contains(const NUM_COMPARATOR& comparator, const int64_t& value result_ids = out; } +void num_tree_t::seq_ids_outside_top_k(size_t k, std::vector &seq_ids) { + size_t ids_skipped = 0; + + for (auto iter = int64map.rbegin(); iter != int64map.rend(); ++iter) { + auto num_ids = ids_t::num_ids(iter->second); + if(ids_skipped > k) { + ids_t::uncompress(iter->second, seq_ids); + } else if((ids_skipped + num_ids) > k) { + // this element hits the limit, so we pick partial IDs to satisfy k + std::vector ids; + ids_t::uncompress(iter->second, ids); + for(size_t i = 0; i < ids.size(); i++) { + auto seq_id = ids[i]; + if(ids_skipped + i >= k) { + seq_ids.push_back(seq_id); + } + } + } + + ids_skipped += num_ids; + } +} + + size_t num_tree_t::size() { return int64map.size(); } diff --git a/src/popular_queries.cpp b/src/popular_queries.cpp new file mode 100644 index 00000000..ba46ff72 --- /dev/null +++ b/src/popular_queries.cpp @@ -0,0 +1,118 @@ +#include "popular_queries.h" +#include "logger.h" +#include +#include +#include "string_utils.h" + +PopularQueries::PopularQueries(size_t k) : k(k), max_size(k * 2) { + +} + +void PopularQueries::add(const std::string& key, const bool live_query, const std::string& user_id, uint64_t now_ts_us) { + if(live_query) { + // live query must be aggregated first to their final form as they could be prefix queries + if(now_ts_us == 0) { + now_ts_us = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + } + + if(!umutex.try_lock()) { + // instead of locking we just skip incrementing keys during consolidation time + return ; + } + + auto& queries = user_prefix_queries[user_id]; + if(queries.size() < 100) { + 
queries.emplace_back(key, now_ts_us); + } + + umutex.unlock(); + + } else { + if(!lmutex.try_lock()) { + // instead of locking we just skip incrementing keys during consolidation time + return ; + } + + auto it = local_counts.find(key); + + if(it != local_counts.end()) { + it.value()++; + } else if(local_counts.size() < max_size) { + // skip count when map has become too large (to prevent abuse) + local_counts.emplace(key, 1); + } + + lmutex.unlock(); + } +} + +void PopularQueries::serialize_as_docs(std::string& docs) { + std::shared_lock lk(lmutex); + + std::string key_buffer; + for(auto it = local_counts.begin(); it != local_counts.end(); ++it) { + it.key(key_buffer); + nlohmann::json doc; + doc["id"] = std::to_string(StringUtils::hash_wy(key_buffer.c_str(), key_buffer.size())); + doc["q"] = key_buffer; + doc["$operations"]["increment"]["count"] = it.value(); + docs += doc.dump() + "\n"; + } + + if(!docs.empty()) { + docs.pop_back(); + } +} + +void PopularQueries::reset_local_counts() { + std::unique_lock lk(lmutex); + local_counts.clear(); +} + +size_t PopularQueries::get_k() { + return k; +} + +void PopularQueries::compact_user_queries(uint64_t now_ts_us) { + std::unique_lock lk(umutex); + std::vector keys_to_delete; + + for(auto& kv: user_prefix_queries) { + auto& queries = kv.second; + int64_t last_consolidated_index = -1; + for(uint32_t i = 0; i < queries.size(); i++) { + if(now_ts_us - queries[i].timestamp < QUERY_FINALIZATION_INTERVAL_MICROS) { + break; + } + + uint64_t diff_micros = (i == queries.size()-1) ? (now_ts_us - queries[i].timestamp) : + (queries[i + 1].timestamp - queries[i].timestamp); + + if(diff_micros > QUERY_FINALIZATION_INTERVAL_MICROS) { + add(queries[i].query, false, ""); + last_consolidated_index = i; + } + } + + queries.erase(queries.begin(), queries.begin() + last_consolidated_index+1); + + if(queries.empty()) { + keys_to_delete.push_back(kv.first); + } + } + + for(auto& key: keys_to_delete) { + user_prefix_queries.erase(key); + } +} + +std::unordered_map> PopularQueries::get_user_prefix_queries() { + std::unique_lock lk(umutex); + return user_prefix_queries; +} + +tsl::htrie_map PopularQueries::get_local_counts() { + std::unique_lock lk(lmutex); + return local_counts; +} diff --git a/src/query_suggestions.cpp b/src/query_suggestions.cpp new file mode 100644 index 00000000..a8bafc00 --- /dev/null +++ b/src/query_suggestions.cpp @@ -0,0 +1,243 @@ +#include +#include +#include "query_suggestions.h" +#include "tokenizer.h" +#include "http_client.h" +#include "collection_manager.h" + +Option QuerySuggestions::create_index(const nlohmann::json& payload, bool write_to_disk) { + /* + Sample payload: + + { + "name": "top_queries", + "type": "query_suggestions", + "max_suggestions": 1000, + "source": { + "collections": ["brands", "products"] + }, + "destination": { + "collection": "top_queries" + } + } + */ + + // structural payload validation is done upstream, e.g. 
presence of source and destination is validated + // specific validations will be done here + + const std::string& suggestion_config_name = payload["name"].get(); + + size_t max_suggestions = 1000; + + if(payload.contains("max_suggestions") && payload["max_suggestions"].is_number_integer()) { + max_suggestions = payload["max_suggestions"].get(); + } + + if(suggestion_configs.find(suggestion_config_name) != suggestion_configs.end()) { + return Option(400, "There's already another configuration with the name `" + + suggestion_config_name + "`."); + } + + if(!payload["source"].contains("collections") || !payload["source"]["collections"].is_array()) { + return Option(400, "Must contain a valid list of source collections."); + } + + if(!payload["destination"].contains("collection") || !payload["destination"]["collection"].is_string()) { + return Option(400, "Must contain a valid destination collection."); + } + + const std::string& suggestion_collection = payload["destination"]["collection"].get(); + suggestion_config_t suggestion_config; + suggestion_config.name = suggestion_config_name; + suggestion_config.suggestion_collection = suggestion_collection; + suggestion_config.max_suggestions = max_suggestions; + + for(const auto& coll: payload["source"]["collections"]) { + if(!coll.is_string()) { + return Option(400, "Must contain a valid list of source collection names."); + } + + const std::string& src_collection = coll.get(); + suggestion_config.query_collections.push_back(src_collection); + } + + std::unique_lock lock(mutex); + + suggestion_configs.emplace(suggestion_config_name, suggestion_config); + + for(const auto& query_coll: suggestion_config.query_collections) { + query_collection_mapping[query_coll].push_back(suggestion_collection); + } + + PopularQueries* popularQueries = new PopularQueries(max_suggestions); + popular_queries.emplace(suggestion_collection, popularQueries); + + if(write_to_disk) { + auto suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + suggestion_config_name; + bool inserted = store->insert(suggestion_key, payload.dump()); + if(!inserted) { + return Option(500, "Error while storing the suggestion config to disk."); + } + } + + return Option(payload); +} + +QuerySuggestions::~QuerySuggestions() { + std::unique_lock lock(mutex); + + for(auto& kv: popular_queries) { + delete kv.second; + } +} + +Option QuerySuggestions::remove_suggestion_index(const std::string &name) { + std::unique_lock lock(mutex); + + auto suggestion_configs_it = suggestion_configs.find(name); + + if(suggestion_configs_it == suggestion_configs.end()) { + return Option(404, "Sink not found."); + } + + const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection; + + for(const auto& query_collection: suggestion_configs_it->second.query_collections) { + query_collection_mapping.erase(query_collection); + } + + if(popular_queries.count(suggestion_collection) != 0) { + delete popular_queries[suggestion_collection]; + popular_queries.erase(suggestion_collection); + } + + suggestion_configs.erase(name); + + auto suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + name; + bool erased = store->remove(suggestion_key); + if(!erased) { + return Option(500, "Error while removing the sink config from disk."); + } + + return Option(true); +} + +void QuerySuggestions::add_suggestion(const std::string &query_collection, std::string &query, + const bool live_query, const std::string& user_id) { + // look up suggestion collections for the query collection + 
std::unique_lock lock(mutex); + const auto& suggestion_collections_it = query_collection_mapping.find(query_collection); + if(suggestion_collections_it != query_collection_mapping.end()) { + for(const auto& suggestion_collection: suggestion_collections_it->second) { + const auto& popular_queries_it = popular_queries.find(suggestion_collection); + if(popular_queries_it != popular_queries.end()) { + Tokenizer::normalize_ascii(query); + popular_queries_it->second->add(query, live_query, user_id); + } + } + } +} + +void QuerySuggestions::run(ReplicationState* raft_server) { + uint64_t prev_persistence_s = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + + while(!quit) { + std::unique_lock lk(mutex); + cv.wait_for(lk, std::chrono::seconds(QUERY_COMPACTION_INTERVAL_S), [&] { return quit.load(); }); + + //LOG(INFO) << "QuerySuggestions::run"; + + if(quit) { + lk.unlock(); + break; + } + + for(const auto& suggestion_config: suggestion_configs) { + const std::string& sink_name = suggestion_config.first; + const std::string& suggestion_coll = suggestion_config.second.suggestion_collection; + + auto popular_queries_it = popular_queries.find(suggestion_coll); + if(popular_queries_it == popular_queries.end()) { + continue; + } + + // need to prepare the counts as JSON docs for import into the suggestion collection + // {"id": "432432", "q": "foo", "$operations": {"increment": {"count": 100}}} + + PopularQueries* popularQueries = popular_queries_it->second; + + // aggregate prefix queries to their final form + auto now = std::chrono::system_clock::now().time_since_epoch(); + auto now_ts_us = std::chrono::duration_cast(now).count(); + popularQueries->compact_user_queries(now_ts_us); + + auto now_ts_seconds = std::chrono::duration_cast(now).count(); + + if(now_ts_seconds - prev_persistence_s < Config::get_instance().get_analytics_flush_interval()) { + // we will persist aggregation every hour + continue; + } + + prev_persistence_s = now_ts_seconds; + + std::string import_payload; + popularQueries->serialize_as_docs(import_payload); + + if(import_payload.empty()) { + continue; + } + + // send http request + std::string leader_url = raft_server->get_leader_url(); + if(!leader_url.empty()) { + const std::string& resource_url = leader_url + "collections/" + suggestion_coll + + "/documents/import?action=emplace"; + std::string res; + std::map res_headers; + long status_code = HttpClient::post_response(resource_url, import_payload, + res, res_headers, 10*1000); + + if(status_code != 200) { + LOG(ERROR) << "Error while sending query suggestions events to leader. 
" + << "Status code: " << status_code << ", response: " << res; + } else { + LOG(INFO) << "Sent query suggestions to leader for aggregation."; + popularQueries->reset_local_counts(); + + if(raft_server->is_leader()) { + // try to run top-K compaction of suggestion collection + auto coll = CollectionManager::get_instance().get_collection(suggestion_coll); + if (coll == nullptr) { + LOG(ERROR) << "No collection found for suggestions aggregation: " + suggestion_coll; + continue; + } + + coll->truncate_after_top_k("count", popularQueries->get_k()); + } + } + } + } + + lk.unlock(); + } + + dispose(); +} + +void QuerySuggestions::stop() { + quit = true; + cv.notify_all(); +} + +void QuerySuggestions::dispose() { + for(auto& kv: popular_queries) { + delete kv.second; + } + + popular_queries.clear(); +} + +void QuerySuggestions::init(Store* store) { + this->store = store; +} diff --git a/src/raft_server.cpp b/src/raft_server.cpp index a8432bee..14c7323f 100644 --- a/src/raft_server.cpp +++ b/src/raft_server.cpp @@ -955,6 +955,21 @@ bool ReplicationState::get_ext_snapshot_succeeded() { return ext_snapshot_succeeded; } +std::string ReplicationState::get_leader_url() const { + std::shared_lock lock(node_mutex); + + if(node->leader_id().is_empty()) { + LOG(ERROR) << "Could not get leader status, as node does not have a leader!"; + return ""; + } + + const std::string & leader_addr = node->leader_id().to_string(); + lock.unlock(); + + const std::string protocol = api_uses_ssl ? "https" : "http"; + return get_node_url_path(leader_addr, "/", protocol); +} + void TimedSnapshotClosure::Run() { // Auto delete this after Done() std::unique_ptr self_guard(this); diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index c362f60c..73a15e2b 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "core_api.h" #include "ratelimit_manager.h" @@ -94,6 +95,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.add("config", '\0', "Path to the configuration file.", false, ""); options.add("enable-access-logging", '\0', "Enable access logging.", false, false); + options.add("enable-search-analytics", '\0', "Enable search analytics.", false, false); options.add("disk-used-max-percentage", '\0', "Reject writes when used disk space exceeds this percentage. Default: 100 (never reject).", false, 100); options.add("memory-used-max-percentage", '\0', "Reject writes when memory usage exceeds this percentage. Default: 100 (never reject).", false, 100); options.add("skip-writes", '\0', "Skip all writes except config changes. 
Default: false.", false, false); @@ -101,6 +103,7 @@ void init_cmdline_options(cmdline::parser & options, int argc, char **argv) { options.add("log-slow-searches-time-ms", '\0', "When >= 0, searches that take longer than this duration are logged.", false, 30*1000); options.add("cache-num-entries", '\0', "Number of entries to cache.", false, 1000); + options.add("analytics-flush-interval", '\0', "Frequency of persisting analytics data to disk (in seconds).", false, 3600); // DEPRECATED options.add("listen-address", 'h', "[DEPRECATED: use `api-address`] Address to which Typesense API service binds.", false, "0.0.0.0"); @@ -394,6 +397,8 @@ int run_server(const Config & config, const std::string & version, void (*master HttpClient & httpClient = HttpClient::get_instance(); httpClient.init(config.get_api_key()); + QuerySuggestions::get_instance().init(&store); + server = new HttpServer( version, config.get_api_address(), @@ -450,6 +455,10 @@ int run_server(const Config & config, const std::string & version, void (*master batch_indexer->run(); }); + std::thread event_sink_thread([&replication_state]() { + QuerySuggestions::get_instance().run(&replication_state); + }); + std::string path_to_nodes = config.get_nodes(); start_raft_server(replication_state, state_dir, path_to_nodes, config.get_peering_address(), @@ -466,6 +475,12 @@ int run_server(const Config & config, const std::string & version, void (*master LOG(INFO) << "Waiting for batch indexing thread to be done..."; batch_indexing_thread.join(); + LOG(INFO) << "Shutting down event sink thread..."; + QuerySuggestions::get_instance().stop(); + + LOG(INFO) << "Waiting for event sink thread to be done..."; + event_sink_thread.join(); + LOG(INFO) << "Shutting down server_thread_pool"; server_thread_pool.shutdown(); diff --git a/test/popular_queries_test.cpp b/test/popular_queries_test.cpp new file mode 100644 index 00000000..5877243d --- /dev/null +++ b/test/popular_queries_test.cpp @@ -0,0 +1,87 @@ +#include +#include "popular_queries.h" +#include "logger.h" + +class PopularQueriesTest : public ::testing::Test { +protected: + virtual void SetUp() { + + } + + virtual void TearDown() { + + } +}; + +TEST_F(PopularQueriesTest, PrefixQueryCompaction) { + PopularQueries pq(10); + + auto now_ts_us = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + + // compaction when no queries have been entered + pq.compact_user_queries(now_ts_us); + auto queries = pq.get_user_prefix_queries(); + ASSERT_TRUE(queries.empty()); + + // compaction after user has typed first prefix but before compaction interval has happened + pq.add("f", true, "0", now_ts_us+1); + pq.compact_user_queries(now_ts_us+2); + queries = pq.get_user_prefix_queries(); + ASSERT_EQ(1, queries.size()); + ASSERT_EQ(1, queries.count("0")); + ASSERT_EQ(1, queries["0"].size()); + ASSERT_EQ("f", queries["0"][0].query); + ASSERT_EQ(now_ts_us+1, queries["0"][0].timestamp); + ASSERT_EQ(0, pq.get_local_counts().size()); + + // compaction interval has happened + pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100); + queries = pq.get_user_prefix_queries(); + ASSERT_EQ(0, queries.size()); + auto local_counts = pq.get_local_counts(); + ASSERT_EQ(1, local_counts.size()); + ASSERT_EQ(1, local_counts.count("f")); + ASSERT_EQ(1, local_counts["f"]); + + // 3 letter search + pq.reset_local_counts(); + pq.add("f", true, "0", now_ts_us+1); + pq.add("fo", true, "0", now_ts_us+2); + pq.add("foo", true, "0", now_ts_us+3); + 
pq.compact_user_queries(now_ts_us+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100); + queries = pq.get_user_prefix_queries(); + ASSERT_EQ(0, queries.size()); + local_counts = pq.get_local_counts(); + ASSERT_EQ(1, local_counts.size()); + ASSERT_EQ(1, local_counts.count("foo")); + ASSERT_EQ(1, local_counts["foo"]); + + // 3 letter search + start of next search + pq.reset_local_counts(); + pq.add("f", true, "0", now_ts_us+1); + pq.add("fo", true, "0", now_ts_us+2); + pq.add("foo", true, "0", now_ts_us+3); + pq.add("b", true, "0", now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100); + pq.compact_user_queries(now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1); + queries = pq.get_user_prefix_queries(); + ASSERT_EQ(1, queries.size()); + ASSERT_EQ(1, queries["0"].size()); + ASSERT_EQ("b", queries["0"][0].query); + local_counts = pq.get_local_counts(); + ASSERT_EQ(1, local_counts.size()); + ASSERT_EQ(1, local_counts.count("foo")); + ASSERT_EQ(1, local_counts["foo"]); + + // continue with that query + auto prev_ts = now_ts_us+3+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+100+1; + pq.add("ba", true, "0", prev_ts+1); + pq.add("bar", true, "0", prev_ts+2); + pq.compact_user_queries(prev_ts+2+PopularQueries::QUERY_FINALIZATION_INTERVAL_MICROS+1); + queries = pq.get_user_prefix_queries(); + ASSERT_EQ(0, queries.size()); + local_counts = pq.get_local_counts(); + ASSERT_EQ(2, local_counts.size()); + ASSERT_EQ(1, local_counts.count("bar")); + ASSERT_EQ(1, local_counts["bar"]); +} From c349148e2de835888de0af2e625f3ccd6378da34 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 23 May 2023 14:26:46 +0530 Subject: [PATCH 4/4] Refactor query aggregation. --- ...uery_suggestions.h => analytics_manager.h} | 21 ++++--- include/core_api.h | 4 +- include/event_manager.h | 4 -- ..._suggestions.cpp => analytics_manager.cpp} | 59 +++++++++++-------- src/collection_manager.cpp | 20 ++++--- src/core_api.cpp | 12 ++-- src/event_manager.cpp | 34 +---------- src/main/typesense_server.cpp | 8 +-- src/typesense_server_utils.cpp | 8 +-- 9 files changed, 73 insertions(+), 97 deletions(-) rename include/{query_suggestions.h => analytics_manager.h} (72%) rename src/{query_suggestions.cpp => analytics_manager.cpp} (80%) diff --git a/include/query_suggestions.h b/include/analytics_manager.h similarity index 72% rename from include/query_suggestions.h rename to include/analytics_manager.h index e42b4c4f..239319fe 100644 --- a/include/query_suggestions.h +++ b/include/analytics_manager.h @@ -7,7 +7,7 @@ #include #include -class QuerySuggestions { +class AnalyticsManager { private: mutable std::mutex mutex; std::condition_variable cv; @@ -34,29 +34,28 @@ private: Store* store = nullptr; - QuerySuggestions() {} + AnalyticsManager() {} - ~QuerySuggestions(); + ~AnalyticsManager(); public: - static constexpr const char* EVENT_SINK_CONFIG_PREFIX = "$ES"; + static constexpr const char* ANALYTICS_CONFIG_PREFIX = "$AC"; + static constexpr const char* RESOURCE_TYPE = "popular_queries"; - static constexpr const char* SINK_TYPE = "query_suggestions"; - - static QuerySuggestions& get_instance() { - static QuerySuggestions instance; + static AnalyticsManager& get_instance() { + static AnalyticsManager instance; return instance; } - QuerySuggestions(QuerySuggestions const&) = delete; - void operator=(QuerySuggestions const&) = delete; + AnalyticsManager(AnalyticsManager const&) = delete; + void operator=(AnalyticsManager const&) = delete; void init(Store* store); void run(ReplicationState* 
raft_server); - Option create_index(const nlohmann::json& payload, bool write_to_disk = true); + Option create_index(nlohmann::json& payload, bool write_to_disk = true); Option remove_suggestion_index(const std::string& name); diff --git a/include/core_api.h b/include/core_api.h index bc8d297c..4a18a26f 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -145,9 +145,9 @@ bool del_exceed(const std::shared_ptr& req, const std::shared_ptr& req, const std::shared_ptr& res); -bool post_create_event_sink(const std::shared_ptr& req, const std::shared_ptr& res); +bool post_create_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res); -bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr& res); +bool del_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res); // Misc helpers diff --git a/include/event_manager.h b/include/event_manager.h index e012ee8d..51fe6391 100644 --- a/include/event_manager.h +++ b/include/event_manager.h @@ -21,10 +21,6 @@ public: EventManager(EventManager const&) = delete; void operator=(EventManager const&) = delete; - Option create_sink(const nlohmann::json& sink_config, bool write_to_disk = true); - - Option remove_sink(const std::string& name); - bool add_event(const nlohmann::json& event); }; \ No newline at end of file diff --git a/src/query_suggestions.cpp b/src/analytics_manager.cpp similarity index 80% rename from src/query_suggestions.cpp rename to src/analytics_manager.cpp index a8bafc00..2d51d215 100644 --- a/src/query_suggestions.cpp +++ b/src/analytics_manager.cpp @@ -1,18 +1,17 @@ #include #include -#include "query_suggestions.h" +#include "analytics_manager.h" #include "tokenizer.h" #include "http_client.h" #include "collection_manager.h" -Option QuerySuggestions::create_index(const nlohmann::json& payload, bool write_to_disk) { +Option AnalyticsManager::create_index(nlohmann::json& payload, bool write_to_disk) { /* Sample payload: { "name": "top_queries", - "type": "query_suggestions", - "max_suggestions": 1000, + "limit": 1000, "source": { "collections": ["brands", "products"] }, @@ -22,28 +21,37 @@ Option QuerySuggestions::create_index(const nlohmann::json& payl } */ - // structural payload validation is done upstream, e.g. 
presence of source and destination is validated - // specific validations will be done here + if(!payload.contains("name") || !payload["name"].is_string()) { + return Option(400, "Bad or missing name."); + } + + if(!payload.contains("source") || !payload["source"].is_object()) { + return Option(400, "Bad or missing source."); + } + + if(!payload.contains("destination") || !payload["destination"].is_object()) { + return Option(400, "Bad or missing destination."); + } const std::string& suggestion_config_name = payload["name"].get(); size_t max_suggestions = 1000; - if(payload.contains("max_suggestions") && payload["max_suggestions"].is_number_integer()) { - max_suggestions = payload["max_suggestions"].get(); + if(payload.contains("limit") && payload["limit"].is_number_integer()) { + max_suggestions = payload["limit"].get(); } if(suggestion_configs.find(suggestion_config_name) != suggestion_configs.end()) { - return Option(400, "There's already another configuration with the name `" + + return Option(400, "There's already another configuration with the name `" + suggestion_config_name + "`."); } if(!payload["source"].contains("collections") || !payload["source"]["collections"].is_array()) { - return Option(400, "Must contain a valid list of source collections."); + return Option(400, "Must contain a valid list of source collections."); } if(!payload["destination"].contains("collection") || !payload["destination"]["collection"].is_string()) { - return Option(400, "Must contain a valid destination collection."); + return Option(400, "Must contain a valid destination collection."); } const std::string& suggestion_collection = payload["destination"]["collection"].get(); @@ -54,7 +62,7 @@ Option QuerySuggestions::create_index(const nlohmann::json& payl for(const auto& coll: payload["source"]["collections"]) { if(!coll.is_string()) { - return Option(400, "Must contain a valid list of source collection names."); + return Option(400, "Must contain a valid list of source collection names."); } const std::string& src_collection = coll.get(); @@ -73,17 +81,18 @@ Option QuerySuggestions::create_index(const nlohmann::json& payl popular_queries.emplace(suggestion_collection, popularQueries); if(write_to_disk) { - auto suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + suggestion_config_name; + payload["type"] = RESOURCE_TYPE; + auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + suggestion_config_name; bool inserted = store->insert(suggestion_key, payload.dump()); if(!inserted) { - return Option(500, "Error while storing the suggestion config to disk."); + return Option(500, "Error while storing the config to disk."); } } - return Option(payload); + return Option(true); } -QuerySuggestions::~QuerySuggestions() { +AnalyticsManager::~AnalyticsManager() { std::unique_lock lock(mutex); for(auto& kv: popular_queries) { @@ -91,13 +100,13 @@ QuerySuggestions::~QuerySuggestions() { } } -Option QuerySuggestions::remove_suggestion_index(const std::string &name) { +Option AnalyticsManager::remove_suggestion_index(const std::string &name) { std::unique_lock lock(mutex); auto suggestion_configs_it = suggestion_configs.find(name); if(suggestion_configs_it == suggestion_configs.end()) { - return Option(404, "Sink not found."); + return Option(404, "Index not found."); } const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection; @@ -113,16 +122,16 @@ Option QuerySuggestions::remove_suggestion_index(const std::string &name) suggestion_configs.erase(name); - auto 
suggestion_key = std::string(EVENT_SINK_CONFIG_PREFIX) + "_" + name; + auto suggestion_key = std::string(ANALYTICS_CONFIG_PREFIX) + "_" + name; bool erased = store->remove(suggestion_key); if(!erased) { - return Option(500, "Error while removing the sink config from disk."); + return Option(500, "Error while deleting from disk."); } return Option(true); } -void QuerySuggestions::add_suggestion(const std::string &query_collection, std::string &query, +void AnalyticsManager::add_suggestion(const std::string &query_collection, std::string &query, const bool live_query, const std::string& user_id) { // look up suggestion collections for the query collection std::unique_lock lock(mutex); @@ -138,7 +147,7 @@ void QuerySuggestions::add_suggestion(const std::string &query_collection, std:: } } -void QuerySuggestions::run(ReplicationState* raft_server) { +void AnalyticsManager::run(ReplicationState* raft_server) { uint64_t prev_persistence_s = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()).count(); @@ -225,12 +234,12 @@ void QuerySuggestions::run(ReplicationState* raft_server) { dispose(); } -void QuerySuggestions::stop() { +void AnalyticsManager::stop() { quit = true; cv.notify_all(); } -void QuerySuggestions::dispose() { +void AnalyticsManager::dispose() { for(auto& kv: popular_queries) { delete kv.second; } @@ -238,6 +247,6 @@ void QuerySuggestions::dispose() { popular_queries.clear(); } -void QuerySuggestions::init(Store* store) { +void AnalyticsManager::init(Store* store) { this->store = store; } diff --git a/src/collection_manager.cpp b/src/collection_manager.cpp index 8d45cdc8..b1ba49a8 100644 --- a/src/collection_manager.cpp +++ b/src/collection_manager.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include "collection_manager.h" #include "batched_indexer.h" @@ -1066,7 +1066,7 @@ Option CollectionManager::do_search(std::map& re if(result.count("found") != 0 && result["found"].get() != 0) { std::string processed_query = raw_query; Tokenizer::normalize_ascii(processed_query); - QuerySuggestions::get_instance().add_suggestion(collection->get_name(), processed_query, + AnalyticsManager::get_instance().add_suggestion(collection->get_name(), processed_query, true, req_params["x-typesense-user-id"]); } } @@ -1299,14 +1299,16 @@ Option CollectionManager::load_collection(const nlohmann::json &collection } // restore query suggestions configs - std::vector sink_config_jsons; - cm.store->scan_fill(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX, - std::string(QuerySuggestions::EVENT_SINK_CONFIG_PREFIX) + "`", - sink_config_jsons); + std::vector analytics_config_jsons; + cm.store->scan_fill(AnalyticsManager::ANALYTICS_CONFIG_PREFIX, + std::string(AnalyticsManager::ANALYTICS_CONFIG_PREFIX) + "`", + analytics_config_jsons); - for(const auto& sink_config_json: sink_config_jsons) { - nlohmann::json sink_config = nlohmann::json::parse(sink_config_json); - EventManager::get_instance().create_sink(sink_config, false); + for(const auto& analytics_config_json: analytics_config_jsons) { + nlohmann::json analytics_config = nlohmann::json::parse(analytics_config_json); + if(analytics_config["type"] == AnalyticsManager::RESOURCE_TYPE) { + AnalyticsManager::get_instance().create_index(analytics_config, false); + } } // Fetch records from the store and re-create memory index diff --git a/src/core_api.cpp b/src/core_api.cpp index 3ace3fa8..3d33ff86 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -3,6 +3,7 @@ #include #include #include +#include 
#include "typesense_server_utils.h" #include "core_api.h" #include "string_utils.h" @@ -2056,8 +2057,7 @@ bool post_create_event(const std::shared_ptr& req, const std::shared_p return false; } -bool post_create_event_sink(const std::shared_ptr& req, const std::shared_ptr& res) { - // connects an event to a sink, which for now, is another collection +bool post_create_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res) { nlohmann::json req_json; try { @@ -2068,7 +2068,7 @@ bool post_create_event_sink(const std::shared_ptr& req, const std::sha return false; } - auto op = EventManager::get_instance().create_sink(req_json); + auto op = AnalyticsManager::get_instance().create_index(req_json); if(!op.ok()) { res->set(op.code(), op.error()); @@ -2079,8 +2079,8 @@ bool post_create_event_sink(const std::shared_ptr& req, const std::sha return true; } -bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr& res) { - auto op = EventManager::get_instance().remove_sink(req->params["name"]); +bool del_analytics_popular_queries(const std::shared_ptr& req, const std::shared_ptr& res) { + auto op = AnalyticsManager::get_instance().remove_suggestion_index(req->params["name"]); if(!op.ok()) { res->set(op.code(), op.error()); return false; @@ -2088,4 +2088,4 @@ bool del_event_sink(const std::shared_ptr& req, const std::shared_ptr< res->set_200(R"({"ok": true)"); return true; -} \ No newline at end of file +} diff --git a/src/event_manager.cpp b/src/event_manager.cpp index 49aa8185..e0537d85 100644 --- a/src/event_manager.cpp +++ b/src/event_manager.cpp @@ -1,4 +1,4 @@ -#include +#include #include "event_manager.h" bool EventManager::add_event(const nlohmann::json& event) { @@ -45,40 +45,10 @@ bool EventManager::add_event(const nlohmann::json& event) { } std::string query = event_data_query_it.get(); - QuerySuggestions::get_instance().add_suggestion(coll.get(), query, false, ""); + AnalyticsManager::get_instance().add_suggestion(coll.get(), query, false, ""); } } } return true; } - -Option EventManager::create_sink(const nlohmann::json& sink_config, bool write_to_disk) { - if(!sink_config.contains("name") || !sink_config["name"].is_string()) { - return Option(400, "Request payload contains invalid name."); - } - - if(!sink_config.contains("type") || !sink_config["type"].is_string()) { - return Option(400, "Request payload contains invalid type."); - } - - if(!sink_config.contains("source") || !sink_config["source"].is_object()) { - return Option(400, "Request payload contains invalid source."); - } - - if(!sink_config.contains("destination") || !sink_config["destination"].is_object()) { - return Option(400, "Request payload contains invalid destination."); - } - - if(sink_config.contains("type") && sink_config["type"] == QuerySuggestions::SINK_TYPE) { - QuerySuggestions::get_instance().create_index(sink_config, write_to_disk); - } else { - return Option(400, ("Missing or invalid event sink type.")); - } - - return Option(200); -} - -Option EventManager::remove_sink(const std::string& name) { - return QuerySuggestions::get_instance().remove_suggestion_index(name); -} diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index 915b5a91..c1734747 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -68,10 +68,10 @@ void master_server_routes() { server->put("/presets/:name", put_upsert_preset); server->del("/presets/:name", del_preset); - // events - server->post("/events", post_create_event); - 
server->post("/events/sinks", post_create_event_sink); - server->del("/events/sinks/:name", del_event_sink); + // analytics + server->post("/analytics/popular-queries", post_create_analytics_popular_queries); + server->del("/analytics/popular-queries/:name", del_analytics_popular_queries); + server->post("/analytics/events", post_create_event); // meta server->get("/metrics.json", get_metrics_json); diff --git a/src/typesense_server_utils.cpp b/src/typesense_server_utils.cpp index 73a15e2b..63e49114 100644 --- a/src/typesense_server_utils.cpp +++ b/src/typesense_server_utils.cpp @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include "core_api.h" #include "ratelimit_manager.h" @@ -397,7 +397,7 @@ int run_server(const Config & config, const std::string & version, void (*master HttpClient & httpClient = HttpClient::get_instance(); httpClient.init(config.get_api_key()); - QuerySuggestions::get_instance().init(&store); + AnalyticsManager::get_instance().init(&store); server = new HttpServer( version, @@ -456,7 +456,7 @@ int run_server(const Config & config, const std::string & version, void (*master }); std::thread event_sink_thread([&replication_state]() { - QuerySuggestions::get_instance().run(&replication_state); + AnalyticsManager::get_instance().run(&replication_state); }); std::string path_to_nodes = config.get_nodes(); @@ -476,7 +476,7 @@ int run_server(const Config & config, const std::string & version, void (*master batch_indexing_thread.join(); LOG(INFO) << "Shutting down event sink thread..."; - QuerySuggestions::get_instance().stop(); + AnalyticsManager::get_instance().stop(); LOG(INFO) << "Waiting for event sink thread to be done..."; event_sink_thread.join();