From bcea70ebfdf272a8beee9d24174fc5f705721b7c Mon Sep 17 00:00:00 2001 From: Jason Bosco Date: Fri, 22 Jan 2021 16:52:27 -0800 Subject: [PATCH] Add windowed stats endpoint --- include/app_metrics.h | 78 +++++++++++++++++++++++++++++++++++ include/core_api.h | 2 + include/http_data.h | 35 +++++++++++++--- include/http_server.h | 4 ++ include/raft_server.h | 2 +- src/core_api.cpp | 8 ++++ src/http_server.cpp | 31 +++++++++++++- src/main/typesense_server.cpp | 1 + 8 files changed, 154 insertions(+), 7 deletions(-) create mode 100644 include/app_metrics.h diff --git a/include/app_metrics.h b/include/app_metrics.h new file mode 100644 index 00000000..ca252a6a --- /dev/null +++ b/include/app_metrics.h @@ -0,0 +1,78 @@ +#pragma once + +#include "sparsepp.h" +#include "json.hpp" +#include "logger.h" +#include + +class AppMetrics { +private: + // stores last complete window + spp::sparse_hash_map* counts; + spp::sparse_hash_map* durations; + + // stores the current window + spp::sparse_hash_map* current_counts; + spp::sparse_hash_map* current_durations; + + AppMetrics() { + current_counts = new spp::sparse_hash_map(); + counts = new spp::sparse_hash_map(); + + current_durations = new spp::sparse_hash_map(); + durations = new spp::sparse_hash_map(); + } + + ~AppMetrics() { + delete current_counts; + delete counts; + + delete current_durations; + delete durations; + } + +public: + + static const uint64_t METRICS_REFRESH_INTERVAL_MS = 10 * 1000; + + static AppMetrics & get_instance() { + static AppMetrics instance; + return instance; + } + + AppMetrics(AppMetrics const&) = delete; + void operator=(AppMetrics const&) = delete; + + void increment_count(const std::string& identifier, uint64_t count) { + (*current_counts)[identifier] += count; + } + + void increment_duration(const std::string& identifier, uint64_t duration) { + (*current_durations)[identifier] += duration; + } + + void window_reset() { + delete counts; + counts = current_counts; + current_counts = new spp::sparse_hash_map(); + + delete durations; + durations = current_durations; + current_durations = new spp::sparse_hash_map(); + } + + void get(const std::string& count_key, const std::string& latency_key, nlohmann::json &result) { + result[count_key] = nlohmann::json::object(); + for(const auto& kv: *counts) { + result[count_key][kv.first] = (double(kv.second) / (METRICS_REFRESH_INTERVAL_MS / 1000)); + } + + result[latency_key] = nlohmann::json::object(); + for(const auto& kv: *durations) { + auto counter_it = counts->find(kv.first); + if(counter_it != counts->end() && counter_it->second != 0) { + result[latency_key][kv.first] = (double(kv.second) / counter_it->second); + } + } + } +}; \ No newline at end of file diff --git a/include/core_api.h b/include/core_api.h index 9c35f2a1..0abdf0bf 100644 --- a/include/core_api.h +++ b/include/core_api.h @@ -83,6 +83,8 @@ bool post_health(http_req& req, http_res& res); bool get_metrics_json(http_req& req, http_res& res); +bool get_stats_json(http_req& req, http_res& res); + bool get_log_sequence(http_req& req, http_res& res); // operations diff --git a/include/http_data.h b/include/http_data.h index f883ecff..12ac1352 100644 --- a/include/http_data.h +++ b/include/http_data.h @@ -9,6 +9,7 @@ #include "json.hpp" #include "string_utils.h" #include "logger.h" +#include "app_metrics.h" #define H2O_USE_LIBUV 0 extern "C" { @@ -164,6 +165,7 @@ struct http_res { struct http_req { h2o_req_t* _req; std::string http_method; + std::string path_without_query; uint64_t route_hash; std::map params; @@ -184,18 +186,39 @@ struct http_req { // for deffered processing of async handlers h2o_custom_timer_t defer_timer; + uint64_t start_ts; + bool deserialized_request; + http_req(): _req(nullptr), route_hash(1), first_chunk_aggregate(true), last_chunk_aggregate(false), - chunk_len(0), body_index(0), data(nullptr) { + chunk_len(0), body_index(0), data(nullptr), deserialized_request(true) { + + start_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); } - http_req(h2o_req_t* _req, const std::string & http_method, uint64_t route_hash, + http_req(h2o_req_t* _req, const std::string & http_method, const std::string & path_without_query, uint64_t route_hash, const std::map & params, const std::string& body): - _req(_req), http_method(http_method), route_hash(route_hash), params(params), - first_chunk_aggregate(true), last_chunk_aggregate(false), - chunk_len(0), body(body), body_index(0), data(nullptr) { + _req(_req), http_method(http_method), path_without_query(path_without_query), route_hash(route_hash), + params(params), first_chunk_aggregate(true), last_chunk_aggregate(false), + chunk_len(0), body(body), body_index(0), data(nullptr), deserialized_request(false) { + start_ts = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + } + + ~http_req() { + + if(!deserialized_request) { + uint64_t now = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + + uint64_t ms_since_start = (now - start_ts); + std::string metric_identifier = http_method + " " + path_without_query; + + AppMetrics::get_instance().increment_duration(metric_identifier, ms_since_start); + } } // NOTE: we don't ser/de all fields, only ones needed for write forwarding @@ -214,6 +237,8 @@ struct http_req { first_chunk_aggregate = content.count("first_chunk_aggregate") != 0 ? content["first_chunk_aggregate"].get() : true; last_chunk_aggregate = content.count("last_chunk_aggregate") != 0 ? content["last_chunk_aggregate"].get() : false; _req = nullptr; + + deserialized_request = true; } std::string serialize() const { diff --git a/include/http_server.h b/include/http_server.h index 3a0acb5b..3c000bf6 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -50,7 +50,9 @@ private: static const size_t REQ_TIMEOUT_MS = 60000; static const uint64_t SSL_REFRESH_INTERVAL_MS = 8 * 60 * 60 * 1000; + h2o_custom_timer_t ssl_refresh_timer; + h2o_custom_timer_t metrics_refresh_timer; http_message_dispatcher* message_dispatcher; @@ -82,6 +84,8 @@ private: static void on_ssl_refresh_timeout(h2o_timer_t *entry); + static void on_metrics_refresh_timeout(h2o_timer_t *entry); + int create_listener(); h2o_pathconf_t *register_handler(h2o_hostconf_t *hostconf, const char *path, diff --git a/include/raft_server.h b/include/raft_server.h index 248d528a..41bc8ec5 100644 --- a/include/raft_server.h +++ b/include/raft_server.h @@ -218,7 +218,7 @@ private: // have to do a dummy write, otherwise snapshot will not trigger if(create_init_db_snapshot) { - http_req* request = new http_req(nullptr, "POST", 0, {}, "INIT_SNAPSHOT"); + http_req* request = new http_req(nullptr, "POST", "/INIT_SNAPSHOT", 0, {}, "INIT_SNAPSHOT"); http_res* response = new http_res(); write(request, response); } diff --git a/src/core_api.cpp b/src/core_api.cpp index 63ec3d76..a12b466b 100644 --- a/src/core_api.cpp +++ b/src/core_api.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include "typesense_server_utils.h" #include "core_api.h" #include "string_utils.h" @@ -240,6 +241,13 @@ bool get_metrics_json(http_req &req, http_res &res) { return true; } +bool get_stats_json(http_req &req, http_res &res) { + nlohmann::json result; + AppMetrics::get_instance().get("requests_per_second", "latency_ms", result); + + res.set_body(200, result.dump(2)); + return true; +} bool get_log_sequence(http_req &req, http_res &res) { CollectionManager & collectionManager = CollectionManager::get_instance(); diff --git a/src/http_server.cpp b/src/http_server.cpp index f808587e..ced40a0d 100644 --- a/src/http_server.cpp +++ b/src/http_server.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include "raft_server.h" #include "logger.h" @@ -29,6 +30,7 @@ HttpServer::HttpServer(const std::string & version, const std::string & listen_a message_dispatcher->init(ctx.loop); ssl_refresh_timer.timer.expire_at = 0; // used during destructor + metrics_refresh_timer.timer.expire_at = 0; // used during destructor } void HttpServer::on_accept(h2o_socket_t *listener, const char *err) { @@ -46,6 +48,21 @@ void HttpServer::on_accept(h2o_socket_t *listener, const char *err) { h2o_accept(http_server->accept_ctx, sock); } +void HttpServer::on_metrics_refresh_timeout(h2o_timer_t *entry) { + h2o_custom_timer_t* custom_timer = reinterpret_cast(entry); + + AppMetrics::get_instance().window_reset(); + + HttpServer *hs = static_cast(custom_timer->data); + + // link the timer for the next cycle + h2o_timer_link( + hs->ctx.loop, + AppMetrics::METRICS_REFRESH_INTERVAL_MS, + &hs->metrics_refresh_timer.timer + ); +} + void HttpServer::on_ssl_refresh_timeout(h2o_timer_t *entry) { h2o_custom_timer_t* custom_timer = reinterpret_cast(entry); @@ -159,6 +176,10 @@ int HttpServer::create_listener() { int HttpServer::run(ReplicationState* replication_state) { this->replication_state = replication_state; + metrics_refresh_timer = h2o_custom_timer_t(this); + h2o_timer_init(&metrics_refresh_timer.timer, on_metrics_refresh_timeout); + h2o_timer_link(ctx.loop, AppMetrics::METRICS_REFRESH_INTERVAL_MS, &metrics_refresh_timer.timer); + if (create_listener() != 0) { LOG(ERROR) << "Failed to listen on " << listen_address << ":" << listen_port << " - " << strerror(errno); return 1; @@ -290,6 +311,9 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { StringUtils::split(path, path_with_query_parts, "?"); const std::string & path_without_query = path_with_query_parts[0]; + std::string metric_identifier = http_method + " " + path_without_query; + AppMetrics::get_instance().increment_count(metric_identifier, 1); + // Handle CORS if(h2o_handler->http_server->cors_enabled) { h2o_add_header_by_str(&req->pool, &req->res.headers, H2O_STRLIT("access-control-allow-origin"), @@ -380,7 +404,7 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) { const std::string & body = std::string(req->entity.base, req->entity.len); - http_req* request = new http_req(req, rpath->http_method, route_hash, query_map, body); + http_req* request = new http_req(req, rpath->http_method, path_without_query, route_hash, query_map, body); http_res* response = new http_res(); // add custom generator with a dispose function for cleaning up resources @@ -739,6 +763,11 @@ HttpServer::~HttpServer() { clear_timeouts({&ssl_refresh_timer.timer}, false); } + if(metrics_refresh_timer.timer.expire_at != 0) { + // avoid callback since it recreates timeout + clear_timeouts({&metrics_refresh_timer.timer}, false); + } + h2o_timerwheel_run(ctx.loop->_timeouts, 9999999999999); h2o_context_dispose(&ctx); diff --git a/src/main/typesense_server.cpp b/src/main/typesense_server.cpp index a31b4fde..a81ad7c9 100644 --- a/src/main/typesense_server.cpp +++ b/src/main/typesense_server.cpp @@ -57,6 +57,7 @@ void master_server_routes() { // meta server->get("/metrics.json", get_metrics_json); + server->get("/stats.json", get_stats_json); server->get("/debug", get_debug); server->get("/health", get_health); server->post("/health", post_health);