Add extra telemetry for continuous aggregates

Add the following telemetry fields for continuous aggregates (see the example query after this list):

* The number of continuous aggregates created on distributed
  hypertables
* The number of continuous aggregates using real-time aggregation
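
As a minimal sketch (not part of this commit), both counters can be read back through the telemetry_report materialized view and the relations view that the telemetry regression test further down in this diff sets up; the JSON keys are the ones added in telemetry.c, and the values depend on the continuous aggregates present:

-- Hypothetical query against the test fixtures shown in the diff below.
REFRESH MATERIALIZED VIEW telemetry_report;
SELECT rels -> 'continuous_aggregates' ->> 'num_caggs_on_distributed_hypertables'  AS caggs_on_distributed,
       rels -> 'continuous_aggregates' ->> 'num_caggs_using_real_time_aggregation' AS caggs_real_time
FROM relations;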
Erik Nordström 2022-02-08 09:57:23 +01:00 committed by Erik Nordström
parent e19fffc148
commit 5af9f45488
5 changed files with 124 additions and 32 deletions

View File

@@ -228,40 +228,44 @@ process_relation(BaseStats *stats, Form_pg_class class)
}
static void
process_hypertable(BaseStats *stats, Form_pg_class class, const Hypertable *ht)
process_hypertable(HyperStats *hyp, Form_pg_class class, const Hypertable *ht)
{
HyperStats *hyperstats = (HyperStats *) stats;
process_relation(stats, class);
process_relation(&hyp->storage.base, class);
if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht))
hyperstats->compressed_hypertable_count++;
hyp->compressed_hypertable_count++;
}
static void
process_distributed_hypertable(BaseStats *stats, Form_pg_class class, const Hypertable *ht)
process_distributed_hypertable(HyperStats *hyp, Form_pg_class class, const Hypertable *ht)
{
HyperStats *hyperstats = (HyperStats *) stats;
stats->relcount++;
hyp->storage.base.relcount++;
if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht))
hyperstats->compressed_hypertable_count++;
hyp->compressed_hypertable_count++;
if (ht->fd.replication_factor > 1)
hyperstats->replicated_hypertable_count++;
hyp->replicated_hypertable_count++;
}
static void
process_continuous_agg(BaseStats *stats, Form_pg_class class, const ContinuousAgg *cagg)
process_continuous_agg(CaggStats *cs, Form_pg_class class, const ContinuousAgg *cagg)
{
HyperStats *hyperstats = (HyperStats *) stats;
const Hypertable *ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
const Hypertable *mat_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
const Hypertable *raw_ht = ts_hypertable_get_by_id(cagg->data.raw_hypertable_id);
process_relation(stats, class);
Assert(cagg);
if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(ht))
hyperstats->compressed_hypertable_count++;
process_relation(&cs->hyp.storage.base, class);
if (TS_HYPERTABLE_HAS_COMPRESSION_ENABLED(mat_ht))
cs->hyp.compressed_hypertable_count++;
if (hypertable_is_distributed(raw_ht))
cs->on_distributed_hypertable_count++;
if (!cagg->data.materialized_only)
cs->uses_real_time_aggregation_count++;
}
static void
@@ -433,7 +437,7 @@ process_chunk(StatsContext *statsctx, StatsRelType chunk_reltype, Form_pg_class
add_chunk_stats(&stats->distributed_hypertable_members, class, chunk, compr_stats);
break;
case RELTYPE_MATERIALIZED_CHUNK:
add_chunk_stats(&stats->continuous_aggs, class, chunk, compr_stats);
add_chunk_stats(&stats->continuous_aggs.hyp, class, chunk, compr_stats);
break;
default:
pg_unreachable();
@@ -538,13 +542,11 @@ ts_telemetry_stats_gather(TelemetryStats *stats)
{
case RELTYPE_HYPERTABLE:
Assert(NULL != ht);
process_hypertable(&stats->hypertables.storage.base, class, ht);
process_hypertable(&stats->hypertables, class, ht);
break;
case RELTYPE_DISTRIBUTED_HYPERTABLE:
Assert(NULL != ht);
process_distributed_hypertable(&stats->distributed_hypertables.storage.base,
class,
ht);
process_distributed_hypertable(&stats->distributed_hypertables, class, ht);
break;
case RELTYPE_DISTRIBUTED_HYPERTABLE_MEMBER:
/*
@@ -552,7 +554,7 @@ ts_telemetry_stats_gather(TelemetryStats *stats)
* a regular hypertable.
*/
Assert(NULL != ht);
process_hypertable(&stats->distributed_hypertable_members.storage.base, class, ht);
process_hypertable(&stats->distributed_hypertable_members, class, ht);
break;
case RELTYPE_TABLE:
process_relation(&stats->tables.base, class);
@@ -581,9 +583,8 @@ ts_telemetry_stats_gather(TelemetryStats *stats)
break;
case RELTYPE_CONTINUOUS_AGG:
Assert(NULL != cagg);
process_continuous_agg(&stats->continuous_aggs.storage.base, class, cagg);
process_continuous_agg(&stats->continuous_aggs, class, cagg);
break;
/* No stats collected for types below */
case RELTYPE_COMPRESSION_HYPERTABLE:
case RELTYPE_MATERIALIZED_HYPERTABLE:

View File

@@ -35,7 +35,8 @@ typedef enum StatsType
{
STATS_TYPE_BASE,
STATS_TYPE_STORAGE,
STATS_TYPE_HYPER
STATS_TYPE_HYPER,
STATS_TYPE_CAGG,
} StatsType;
typedef struct BaseStats
@@ -69,15 +70,22 @@ typedef struct HyperStats
int64 uncompressed_row_count;
} HyperStats;
typedef struct CaggStats
{
HyperStats hyp; /* "hyper" as field name leads to name conflict on Windows compiler */
int64 on_distributed_hypertable_count;
int64 uses_real_time_aggregation_count;
} CaggStats;
typedef struct TelemetryStats
{
HyperStats hypertables;
HyperStats distributed_hypertables;
HyperStats distributed_hypertable_members;
HyperStats continuous_aggs;
HyperStats partitioned_tables;
StorageStats tables;
StorageStats materialized_views;
CaggStats continuous_aggs;
BaseStats views;
} TelemetryStats;

View File

@@ -277,6 +277,9 @@ format_iso8601(Datum value)
#define REQ_RELKIND_COMPRESSED_INDEXES_SIZE "compressed_indexes_size"
#define REQ_RELKIND_COMPRESSED_ROWCOUNT "compressed_row_count"
#define REQ_RELKIND_CAGG_ON_DISTRIBUTED_HYPERTABLE_COUNT "num_caggs_on_distributed_hypertables"
#define REQ_RELKIND_CAGG_USES_REAL_TIME_AGGREGATION_COUNT "num_caggs_using_real_time_aggregation"
static JsonbValue *
add_compression_stats_object(JsonbParseState *parse_state, StatsRelType reltype,
const HyperStats *hs)
@@ -358,6 +361,18 @@ add_relkind_stats_object(JsonbParseState *parse_state, const char *relkindname,
}
}
if (statstype == STATS_TYPE_CAGG)
{
const CaggStats *cs = (const CaggStats *) stats;
ts_jsonb_add_int64(parse_state,
REQ_RELKIND_CAGG_ON_DISTRIBUTED_HYPERTABLE_COUNT,
cs->on_distributed_hypertable_count);
ts_jsonb_add_int64(parse_state,
REQ_RELKIND_CAGG_USES_REAL_TIME_AGGREGATION_COUNT,
cs->uses_real_time_aggregation_count);
}
return pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
}
@@ -467,9 +482,9 @@ build_telemetry_report()
STATS_TYPE_HYPER);
add_relkind_stats_object(parse_state,
REQ_RELS_CONTINUOUS_AGGS,
&relstats.continuous_aggs.storage.base,
&relstats.continuous_aggs.hyp.storage.base,
RELTYPE_CONTINUOUS_AGG,
STATS_TYPE_HYPER);
STATS_TYPE_CAGG);
pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);

View File

@@ -140,7 +140,9 @@ SELECT jsonb_pretty(rels) AS relations FROM relations;
"indexes_size": 0, +
"num_children": 0, +
"num_relations": 1, +
"num_reltuples": 0 +
"num_reltuples": 0, +
"num_caggs_on_distributed_hypertables": 0, +
"num_caggs_using_real_time_aggregation": 1 +
}, +
"distributed_hypertables_data_node": { +
"heap_size": 0, +
@@ -278,7 +280,9 @@ SELECT jsonb_pretty(rels) AS relations FROM relations;
"indexes_size": 114688, +
"num_children": 2, +
"num_relations": 1, +
"num_reltuples": 0 +
"num_reltuples": 0, +
"num_caggs_on_distributed_hypertables": 0, +
"num_caggs_using_real_time_aggregation": 1 +
}, +
"distributed_hypertables_data_node": { +
"heap_size": 0, +
@@ -355,6 +359,8 @@ FROM show_chunks('contagg') c ORDER BY c LIMIT 1;
_timescaledb_internal._hyper_2_10_chunk
(1 row)
-- Turn off real-time aggregation
ALTER MATERIALIZED VIEW contagg SET (timescaledb.materialized_only = true);
ANALYZE normal, hyper, part;
REFRESH MATERIALIZED VIEW telemetry_report;
SELECT jsonb_pretty(rels) AS relations FROM relations;
@@ -423,7 +429,9 @@ SELECT jsonb_pretty(rels) AS relations FROM relations;
"indexes_size": 65536, +
"num_children": 2, +
"num_relations": 1, +
"num_reltuples": 452 +
"num_reltuples": 452, +
"num_caggs_on_distributed_hypertables": 0, +
"num_caggs_using_real_time_aggregation": 0 +
}, +
"distributed_hypertables_data_node": { +
"heap_size": 0, +
@@ -872,5 +880,46 @@ FROM relations;
}
(1 row)
-- Create a continuous aggregate on the distributed hypertable
CREATE MATERIALIZED VIEW distcontagg
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
device,
min(time)
FROM
disthyper
GROUP BY hour, device;
NOTICE: refreshing continuous aggregate "distcontagg"
REFRESH MATERIALIZED VIEW telemetry_report;
SELECT
jsonb_pretty(rels -> 'continuous_aggregates') AS continuous_aggregates
FROM relations;
continuous_aggregates
------------------------------------------------
{ +
"heap_size": 180224, +
"toast_size": 40960, +
"compression": { +
"compressed_heap_size": 40960, +
"compressed_row_count": 10, +
"num_compressed_caggs": 1, +
"compressed_toast_size": 8192, +
"num_compressed_chunks": 1, +
"uncompressed_heap_size": 57344, +
"uncompressed_row_count": 452, +
"compressed_indexes_size": 16384, +
"uncompressed_toast_size": 8192, +
"uncompressed_indexes_size": 81920 +
}, +
"indexes_size": 180224, +
"num_children": 4, +
"num_relations": 2, +
"num_reltuples": 452, +
"num_caggs_on_distributed_hypertables": 1,+
"num_caggs_using_real_time_aggregation": 1+
}
(1 row)
DROP VIEW relations;
DROP MATERIALIZED VIEW telemetry_report;

View File

@@ -99,6 +99,9 @@ ALTER MATERIALIZED VIEW contagg SET (timescaledb.compress);
SELECT compress_chunk(c)
FROM show_chunks('contagg') c ORDER BY c LIMIT 1;
-- Turn off real-time aggregation
ALTER MATERIALIZED VIEW contagg SET (timescaledb.materialized_only = true);
ANALYZE normal, hyper, part;
REFRESH MATERIALIZED VIEW telemetry_report;
@@ -196,5 +199,21 @@ SELECT
jsonb_pretty(rels -> 'distributed_hypertables_access_node') AS distributed_hypertables_an
FROM relations;
-- Create a continuous aggregate on the distributed hypertable
CREATE MATERIALIZED VIEW distcontagg
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
device,
min(time)
FROM
disthyper
GROUP BY hour, device;
REFRESH MATERIALIZED VIEW telemetry_report;
SELECT
jsonb_pretty(rels -> 'continuous_aggregates') AS continuous_aggregates
FROM relations;
DROP VIEW relations;
DROP MATERIALIZED VIEW telemetry_report;