Add windowed stats endpoint

This commit is contained in:
Jason Bosco 2021-01-22 16:52:27 -08:00
parent d2206168ec
commit bcea70ebfd
8 changed files with 154 additions and 7 deletions

78
include/app_metrics.h Normal file
View File

@ -0,0 +1,78 @@
#pragma once
#include "sparsepp.h"
#include "json.hpp"
#include "logger.h"
#include <string>
class AppMetrics {
private:
// stores last complete window
spp::sparse_hash_map<std::string, uint64_t>* counts;
spp::sparse_hash_map<std::string, uint64_t>* durations;
// stores the current window
spp::sparse_hash_map<std::string, uint64_t>* current_counts;
spp::sparse_hash_map<std::string, uint64_t>* current_durations;
AppMetrics() {
current_counts = new spp::sparse_hash_map<std::string, uint64_t>();
counts = new spp::sparse_hash_map<std::string, uint64_t>();
current_durations = new spp::sparse_hash_map<std::string, uint64_t>();
durations = new spp::sparse_hash_map<std::string, uint64_t>();
}
~AppMetrics() {
delete current_counts;
delete counts;
delete current_durations;
delete durations;
}
public:
static const uint64_t METRICS_REFRESH_INTERVAL_MS = 10 * 1000;
static AppMetrics & get_instance() {
static AppMetrics instance;
return instance;
}
AppMetrics(AppMetrics const&) = delete;
void operator=(AppMetrics const&) = delete;
void increment_count(const std::string& identifier, uint64_t count) {
(*current_counts)[identifier] += count;
}
void increment_duration(const std::string& identifier, uint64_t duration) {
(*current_durations)[identifier] += duration;
}
void window_reset() {
delete counts;
counts = current_counts;
current_counts = new spp::sparse_hash_map<std::string, uint64_t>();
delete durations;
durations = current_durations;
current_durations = new spp::sparse_hash_map<std::string, uint64_t>();
}
void get(const std::string& count_key, const std::string& latency_key, nlohmann::json &result) {
result[count_key] = nlohmann::json::object();
for(const auto& kv: *counts) {
result[count_key][kv.first] = (double(kv.second) / (METRICS_REFRESH_INTERVAL_MS / 1000));
}
result[latency_key] = nlohmann::json::object();
for(const auto& kv: *durations) {
auto counter_it = counts->find(kv.first);
if(counter_it != counts->end() && counter_it->second != 0) {
result[latency_key][kv.first] = (double(kv.second) / counter_it->second);
}
}
}
};

View File

@ -83,6 +83,8 @@ bool post_health(http_req& req, http_res& res);
bool get_metrics_json(http_req& req, http_res& res);
bool get_stats_json(http_req& req, http_res& res);
bool get_log_sequence(http_req& req, http_res& res);
// operations

View File

@ -9,6 +9,7 @@
#include "json.hpp"
#include "string_utils.h"
#include "logger.h"
#include "app_metrics.h"
#define H2O_USE_LIBUV 0
extern "C" {
@ -164,6 +165,7 @@ struct http_res {
struct http_req {
h2o_req_t* _req;
std::string http_method;
std::string path_without_query;
uint64_t route_hash;
std::map<std::string, std::string> params;
@ -184,18 +186,39 @@ struct http_req {
// for deffered processing of async handlers
h2o_custom_timer_t defer_timer;
uint64_t start_ts;
bool deserialized_request;
http_req(): _req(nullptr), route_hash(1),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body_index(0), data(nullptr) {
chunk_len(0), body_index(0), data(nullptr), deserialized_request(true) {
start_ts = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
http_req(h2o_req_t* _req, const std::string & http_method, uint64_t route_hash,
http_req(h2o_req_t* _req, const std::string & http_method, const std::string & path_without_query, uint64_t route_hash,
const std::map<std::string, std::string> & params, const std::string& body):
_req(_req), http_method(http_method), route_hash(route_hash), params(params),
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body(body), body_index(0), data(nullptr) {
_req(_req), http_method(http_method), path_without_query(path_without_query), route_hash(route_hash),
params(params), first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body(body), body_index(0), data(nullptr), deserialized_request(false) {
start_ts = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
~http_req() {
if(!deserialized_request) {
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
uint64_t ms_since_start = (now - start_ts);
std::string metric_identifier = http_method + " " + path_without_query;
AppMetrics::get_instance().increment_duration(metric_identifier, ms_since_start);
}
}
// NOTE: we don't ser/de all fields, only ones needed for write forwarding
@ -214,6 +237,8 @@ struct http_req {
first_chunk_aggregate = content.count("first_chunk_aggregate") != 0 ? content["first_chunk_aggregate"].get<bool>() : true;
last_chunk_aggregate = content.count("last_chunk_aggregate") != 0 ? content["last_chunk_aggregate"].get<bool>() : false;
_req = nullptr;
deserialized_request = true;
}
std::string serialize() const {

View File

@ -50,7 +50,9 @@ private:
static const size_t REQ_TIMEOUT_MS = 60000;
static const uint64_t SSL_REFRESH_INTERVAL_MS = 8 * 60 * 60 * 1000;
h2o_custom_timer_t ssl_refresh_timer;
h2o_custom_timer_t metrics_refresh_timer;
http_message_dispatcher* message_dispatcher;
@ -82,6 +84,8 @@ private:
static void on_ssl_refresh_timeout(h2o_timer_t *entry);
static void on_metrics_refresh_timeout(h2o_timer_t *entry);
int create_listener();
h2o_pathconf_t *register_handler(h2o_hostconf_t *hostconf, const char *path,

View File

@ -218,7 +218,7 @@ private:
// have to do a dummy write, otherwise snapshot will not trigger
if(create_init_db_snapshot) {
http_req* request = new http_req(nullptr, "POST", 0, {}, "INIT_SNAPSHOT");
http_req* request = new http_req(nullptr, "POST", "/INIT_SNAPSHOT", 0, {}, "INIT_SNAPSHOT");
http_res* response = new http_res();
write(request, response);
}

View File

@ -1,6 +1,7 @@
#include <regex>
#include <chrono>
#include <thread>
#include <app_metrics.h>
#include "typesense_server_utils.h"
#include "core_api.h"
#include "string_utils.h"
@ -240,6 +241,13 @@ bool get_metrics_json(http_req &req, http_res &res) {
return true;
}
bool get_stats_json(http_req &req, http_res &res) {
nlohmann::json result;
AppMetrics::get_instance().get("requests_per_second", "latency_ms", result);
res.set_body(200, result.dump(2));
return true;
}
bool get_log_sequence(http_req &req, http_res &res) {
CollectionManager & collectionManager = CollectionManager::get_instance();

View File

@ -7,6 +7,7 @@
#include <h2o.h>
#include <iostream>
#include <auth_manager.h>
#include <app_metrics.h>
#include "raft_server.h"
#include "logger.h"
@ -29,6 +30,7 @@ HttpServer::HttpServer(const std::string & version, const std::string & listen_a
message_dispatcher->init(ctx.loop);
ssl_refresh_timer.timer.expire_at = 0; // used during destructor
metrics_refresh_timer.timer.expire_at = 0; // used during destructor
}
void HttpServer::on_accept(h2o_socket_t *listener, const char *err) {
@ -46,6 +48,21 @@ void HttpServer::on_accept(h2o_socket_t *listener, const char *err) {
h2o_accept(http_server->accept_ctx, sock);
}
void HttpServer::on_metrics_refresh_timeout(h2o_timer_t *entry) {
h2o_custom_timer_t* custom_timer = reinterpret_cast<h2o_custom_timer_t*>(entry);
AppMetrics::get_instance().window_reset();
HttpServer *hs = static_cast<HttpServer*>(custom_timer->data);
// link the timer for the next cycle
h2o_timer_link(
hs->ctx.loop,
AppMetrics::METRICS_REFRESH_INTERVAL_MS,
&hs->metrics_refresh_timer.timer
);
}
void HttpServer::on_ssl_refresh_timeout(h2o_timer_t *entry) {
h2o_custom_timer_t* custom_timer = reinterpret_cast<h2o_custom_timer_t*>(entry);
@ -159,6 +176,10 @@ int HttpServer::create_listener() {
int HttpServer::run(ReplicationState* replication_state) {
this->replication_state = replication_state;
metrics_refresh_timer = h2o_custom_timer_t(this);
h2o_timer_init(&metrics_refresh_timer.timer, on_metrics_refresh_timeout);
h2o_timer_link(ctx.loop, AppMetrics::METRICS_REFRESH_INTERVAL_MS, &metrics_refresh_timer.timer);
if (create_listener() != 0) {
LOG(ERROR) << "Failed to listen on " << listen_address << ":" << listen_port << " - " << strerror(errno);
return 1;
@ -290,6 +311,9 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
StringUtils::split(path, path_with_query_parts, "?");
const std::string & path_without_query = path_with_query_parts[0];
std::string metric_identifier = http_method + " " + path_without_query;
AppMetrics::get_instance().increment_count(metric_identifier, 1);
// Handle CORS
if(h2o_handler->http_server->cors_enabled) {
h2o_add_header_by_str(&req->pool, &req->res.headers, H2O_STRLIT("access-control-allow-origin"),
@ -380,7 +404,7 @@ int HttpServer::catch_all_handler(h2o_handler_t *_h2o_handler, h2o_req_t *req) {
const std::string & body = std::string(req->entity.base, req->entity.len);
http_req* request = new http_req(req, rpath->http_method, route_hash, query_map, body);
http_req* request = new http_req(req, rpath->http_method, path_without_query, route_hash, query_map, body);
http_res* response = new http_res();
// add custom generator with a dispose function for cleaning up resources
@ -739,6 +763,11 @@ HttpServer::~HttpServer() {
clear_timeouts({&ssl_refresh_timer.timer}, false);
}
if(metrics_refresh_timer.timer.expire_at != 0) {
// avoid callback since it recreates timeout
clear_timeouts({&metrics_refresh_timer.timer}, false);
}
h2o_timerwheel_run(ctx.loop->_timeouts, 9999999999999);
h2o_context_dispose(&ctx);

View File

@ -57,6 +57,7 @@ void master_server_routes() {
// meta
server->get("/metrics.json", get_metrics_json);
server->get("/stats.json", get_stats_json);
server->get("/debug", get_debug);
server->get("/health", get_health);
server->post("/health", post_health);