Add interface for troubleshooting job failures

This commit gives more visibility into job failures by making the
information regarding a job runtime error available in an extension
table (`job_errors`) that users can directly query.
This commit also adds an informational view on top of the table for
convenience.
To prevent the `job_errors` table from growing too large,
a retention job is also set up with a default retention interval
of 1 month. The retention job is registered with a custom check
function that requires that a valid "drop_after" interval be provided
in the config field of the job.
This commit is contained in:
Konstantina Skovola 2022-09-06 16:25:09 +03:00 committed by Konstantina Skovola
parent 1d4b9d6977
commit 9bd772de25
28 changed files with 865 additions and 39 deletions

View File

@ -59,7 +59,8 @@ set(SOURCE_FILES
policy_api.sql policy_api.sql
policy_internal.sql policy_internal.sql
cagg_utils.sql cagg_utils.sql
cagg_migrate.sql) cagg_migrate.sql
job_error_log_retention.sql)
if(USE_TELEMETRY) if(USE_TELEMETRY)
list(APPEND SOURCE_FILES with_telemetry.sql) list(APPEND SOURCE_FILES with_telemetry.sql)

View File

@ -0,0 +1,71 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.
-- A retention policy is set up for the table _timescaledb_internal.job_errors (Error Log Retention Policy [2])
-- By default, it will run once a month and drop rows older than a month.
-- Retention procedure for the job_errors table.
-- Deletes rows from _timescaledb_internal.job_errors whose finish_time is
-- older than the 'drop_after' interval read from the job's config, and
-- returns the number of rows deleted.
CREATE OR REPLACE FUNCTION _timescaledb_internal.policy_job_error_retention(job_id integer, config JSONB) RETURNS integer
LANGUAGE PLPGSQL AS
$BODY$
DECLARE
-- retention window parsed from config->>'drop_after'
drop_after INTERVAL;
-- number of rows removed, returned to the caller
numrows INTEGER;
BEGIN
-- STRICT only enforces the single-row requirement here; a missing
-- 'drop_after' key yields a NULL interval, in which case the WHERE
-- clause below matches no rows and the function returns 0.
SELECT config->>'drop_after' INTO STRICT drop_after;
WITH deleted AS
(DELETE
FROM _timescaledb_internal.job_errors
WHERE finish_time < (now() - drop_after) RETURNING *)
SELECT count(*)
FROM deleted INTO numrows;
RETURN numrows;
END;
-- pin search_path so object references cannot resolve to untrusted schemas
$BODY$ SET search_path TO pg_catalog, pg_temp;
-- Config check function registered with the retention job; run whenever the
-- job's config is set or altered.  Rejects a NULL config and requires a
-- 'drop_after' key whose value can be read as an INTERVAL.
CREATE OR REPLACE FUNCTION _timescaledb_internal.policy_job_error_retention_check(config JSONB) RETURNS VOID
LANGUAGE PLPGSQL AS
$BODY$
DECLARE
drop_after interval;
BEGIN
IF config IS NULL THEN
RAISE EXCEPTION 'config cannot be NULL, and must contain drop_after';
END IF;
-- a missing key yields NULL here (an unparsable value raises on the
-- implicit cast to interval), so the NULL check below covers absence
SELECT config->>'drop_after' INTO STRICT drop_after;
IF drop_after IS NULL THEN
RAISE EXCEPTION 'drop_after interval not provided';
END IF ;
END;
-- pin search_path so object references cannot resolve to untrusted schemas
$BODY$ SET search_path TO pg_catalog, pg_temp;
-- Register the Error Log Retention Policy as background job id 2.
-- ON CONFLICT DO NOTHING makes re-running this script idempotent.
INSERT INTO _timescaledb_config.bgw_job (
id,
application_name,
schedule_interval,
max_runtime,
max_retries,
retry_period,
proc_schema,
proc_name,
owner,
scheduled,
config,
check_schema,
check_name
)
VALUES
(
2,
'Error Log Retention Policy [2]',
INTERVAL '1 month',
INTERVAL '1 hour',
-1, -- max_retries: -1 presumably means retry without limit -- confirm
INTERVAL '1h',
'_timescaledb_internal',
'policy_job_error_retention',
CURRENT_ROLE,
true,
'{"drop_after":"1 month"}', -- default retention window of one month
'_timescaledb_internal',
'policy_job_error_retention_check'
) ON CONFLICT (id) DO NOTHING;

View File

@ -305,6 +305,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat (
total_crashes bigint NOT NULL, total_crashes bigint NOT NULL,
consecutive_failures int NOT NULL, consecutive_failures int NOT NULL,
consecutive_crashes int NOT NULL, consecutive_crashes int NOT NULL,
flags int NOT NULL DEFAULT 0,
-- table constraints -- table constraints
CONSTRAINT bgw_job_stat_pkey PRIMARY KEY (job_id), CONSTRAINT bgw_job_stat_pkey PRIMARY KEY (job_id),
CONSTRAINT bgw_job_stat_job_id_fkey FOREIGN KEY (job_id) REFERENCES _timescaledb_config.bgw_job (id) ON DELETE CASCADE CONSTRAINT bgw_job_stat_job_id_fkey FOREIGN KEY (job_id) REFERENCES _timescaledb_config.bgw_job (id) ON DELETE CASCADE
@ -547,6 +548,15 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_agg_
SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.continuous_agg_migrate_plan_step', 'step_id'), ''); SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.continuous_agg_migrate_plan_step', 'step_id'), '');
-- Log of background-job runtime failures; one row per error or crash.
CREATE TABLE _timescaledb_internal.job_errors (
job_id integer not null,
pid integer, -- backend pid; NULL when the failure was a detected crash
start_time timestamptz,
finish_time timestamptz,
error_data jsonb -- serialized ErrorData fields plus proc_schema/proc_name
);
-- include the table's contents when the extension is dumped via pg_dump
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_internal.job_errors', '');
-- Set table permissions -- Set table permissions
-- We need to grant SELECT to PUBLIC for all tables even those not -- We need to grant SELECT to PUBLIC for all tables even those not
-- marked as being dumped because pg_dump will try to access all -- marked as being dumped because pg_dump will try to access all

View File

@ -172,6 +172,48 @@ BEGIN
WHERE mat_hypertable_id OPERATOR(pg_catalog.=) _cagg_data.mat_hypertable_id; WHERE mat_hypertable_id OPERATOR(pg_catalog.=) _cagg_data.mat_hypertable_id;
END; END;
$BODY$; $BODY$;
-- Update script: create the job-error log table added in this version.
CREATE TABLE _timescaledb_internal.job_errors (
job_id integer not null,
pid integer,
start_time timestamptz,
finish_time timestamptz,
error_data jsonb
);
-- include the table's contents when the extension is dumped via pg_dump
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_internal.job_errors', '');
-- Convenience view flattening error_data for users.  err_message joins
-- message/detail/hint when present; a row whose error_data lacks a
-- 'message' key is reported as a detected crash.
CREATE VIEW timescaledb_information.job_errors AS
SELECT
job_id,
error_data ->> 'proc_schema' as proc_schema,
error_data ->> 'proc_name' as proc_name,
pid,
start_time,
finish_time,
error_data ->> 'sqlerrcode' AS sqlerrcode,
CASE WHEN error_data ->>'message' IS NOT NULL THEN
CASE WHEN error_data ->>'detail' IS NOT NULL THEN
CASE WHEN error_data ->>'hint' IS NOT NULL THEN concat(error_data ->>'message', '. ', error_data ->>'detail', '. ', error_data->>'hint')
-- NOTE(review): this branch joins message/detail with ' ' while the
-- other branches use '. ' -- confirm the inconsistency is intentional
ELSE concat(error_data ->>'message', ' ', error_data ->>'detail')
END
ELSE
CASE WHEN error_data ->>'hint' IS NOT NULL THEN concat(error_data ->>'message', '. ', error_data->>'hint')
ELSE error_data ->>'message'
END
END
ELSE
'job crash detected, see server logs'
END
AS err_message
FROM
_timescaledb_internal.job_errors;
-- Add the crash-reporting flags bitmask to bgw_job_stat; backfill existing
-- rows with 0 before attaching the NOT NULL constraint and the default.
ALTER TABLE _timescaledb_internal.bgw_job_stat ADD COLUMN flags integer;
UPDATE _timescaledb_internal.bgw_job_stat SET flags = 0;
ALTER TABLE _timescaledb_internal.bgw_job_stat
ALTER COLUMN flags SET NOT NULL,
ALTER COLUMN flags SET DEFAULT 0;
-- Issue #4727 -- Issue #4727
ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step

View File

@ -171,3 +171,59 @@ ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step
ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step
ADD CONSTRAINT continuous_agg_migrate_plan_step_check2 ADD CONSTRAINT continuous_agg_migrate_plan_step_check2
CHECK (type IN ('CREATE NEW CAGG', 'DISABLE POLICIES', 'COPY POLICIES', 'ENABLE POLICIES', 'SAVE WATERMARK', 'REFRESH NEW CAGG', 'COPY DATA')); CHECK (type IN ('CREATE NEW CAGG', 'DISABLE POLICIES', 'COPY POLICIES', 'ENABLE POLICIES', 'SAVE WATERMARK', 'REFRESH NEW CAGG', 'COPY DATA'));
-- Downgrade script: remove the error-log interface added in this version.
DROP FUNCTION _timescaledb_internal.policy_job_error_retention(integer, JSONB);
DROP FUNCTION _timescaledb_internal.policy_job_error_retention_check(JSONB);
-- id 2 is the Error Log Retention Policy job registered on install
DELETE FROM _timescaledb_config.bgw_job WHERE id = 2;
-- detach the objects from the extension before dropping them
ALTER EXTENSION timescaledb DROP VIEW timescaledb_information.job_errors;
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_internal.job_errors;
DROP VIEW timescaledb_information.job_errors;
DROP TABLE _timescaledb_internal.job_errors;
-- drop dependent views
DROP VIEW IF EXISTS timescaledb_information.job_stats;
DROP VIEW IF EXISTS timescaledb_information.jobs;
ALTER TABLE _timescaledb_internal.bgw_job_stat
DROP COLUMN flags;
-- need to recreate the bgw_job_stats table because dropping the column
-- will not remove it from the pg_attribute table
CREATE TABLE _timescaledb_internal.bgw_job_stat_tmp (
LIKE _timescaledb_internal.bgw_job_stat
INCLUDING ALL
-- indexes and constraints will be created later to keep original names
EXCLUDING INDEXES
EXCLUDING CONSTRAINTS
);
-- copy every pre-flags column into the replacement table
INSERT INTO _timescaledb_internal.bgw_job_stat_tmp
SELECT
job_id,
last_start,
last_finish,
next_start,
last_successful_finish,
last_run_success,
total_runs,
total_duration,
total_successes,
total_failures,
total_crashes,
consecutive_failures,
consecutive_crashes
FROM
_timescaledb_internal.bgw_job_stat;
DROP TABLE _timescaledb_internal.bgw_job_stat;
ALTER TABLE _timescaledb_internal.bgw_job_stat_tmp
RENAME TO bgw_job_stat;
-- recreate the primary key and foreign key under their original names
ALTER TABLE _timescaledb_internal.bgw_job_stat
ADD CONSTRAINT bgw_job_stat_pkey PRIMARY KEY (job_id),
ADD CONSTRAINT bgw_job_stat_job_id_fkey FOREIGN KEY (job_id)
REFERENCES _timescaledb_config.bgw_job (id) ON DELETE CASCADE;
GRANT SELECT ON TABLE _timescaledb_internal.bgw_job_stat TO PUBLIC;

View File

@ -299,4 +299,31 @@ ORDER BY table_name,
segmentby_column_index, segmentby_column_index,
orderby_column_index; orderby_column_index;
-- Troubleshooting view over _timescaledb_internal.job_errors.  Flattens
-- the error_data jsonb into columns; err_message joins message/detail/hint
-- when present, and a row whose error_data lacks a 'message' key is
-- reported as a detected crash.
CREATE OR REPLACE VIEW timescaledb_information.job_errors AS
SELECT
job_id,
error_data ->> 'proc_schema' as proc_schema,
error_data ->> 'proc_name' as proc_name,
pid,
start_time,
finish_time,
error_data ->> 'sqlerrcode' AS sqlerrcode,
CASE WHEN error_data ->>'message' IS NOT NULL THEN
CASE WHEN error_data ->>'detail' IS NOT NULL THEN
CASE WHEN error_data ->>'hint' IS NOT NULL THEN concat(error_data ->>'message', '. ', error_data ->>'detail', '. ', error_data->>'hint')
-- NOTE(review): this branch joins message/detail with ' ' while the
-- other branches use '. ' -- confirm the inconsistency is intentional
ELSE concat(error_data ->>'message', ' ', error_data ->>'detail')
END
ELSE
CASE WHEN error_data ->>'hint' IS NOT NULL THEN concat(error_data ->>'message', '. ', error_data->>'hint')
ELSE error_data ->>'message'
END
END
ELSE
'job crash detected, see server logs'
END
AS err_message
FROM
_timescaledb_internal.job_errors;
GRANT SELECT ON ALL TABLES IN SCHEMA timescaledb_information TO PUBLIC; GRANT SELECT ON ALL TABLES IN SCHEMA timescaledb_information TO PUBLIC;

View File

@ -44,6 +44,7 @@
#include "bgw/scheduler.h" #include "bgw/scheduler.h"
#include <cross_module_fn.h> #include <cross_module_fn.h>
#include "jsonb_utils.h"
#define TELEMETRY_INITIAL_NUM_RUNS 12 #define TELEMETRY_INITIAL_NUM_RUNS 12
@ -158,6 +159,57 @@ job_config_check(BgwJob *job, Jsonb *config)
job->fd.id); job->fd.id);
} }
/*
 * Serialize the non-NULL fields of an ErrorData into a jsonb object.
 *
 * The job's proc_schema and proc_name are folded into the same jsonb so
 * they can be surfaced through the information view without adding extra
 * columns to the job_errors table.
 */
static Jsonb *
ts_errdata_to_jsonb(ErrorData *edata, Name proc_schema, Name proc_name)
{
JsonbParseState *parse_state = NULL;
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
/* store sqlerrcode in its five-character SQLSTATE text form */
if (edata->sqlerrcode)
ts_jsonb_add_str(parse_state, "sqlerrcode", unpack_sql_state(edata->sqlerrcode));
if (edata->message)
ts_jsonb_add_str(parse_state, "message", edata->message);
if (edata->detail)
ts_jsonb_add_str(parse_state, "detail", edata->detail);
if (edata->hint)
ts_jsonb_add_str(parse_state, "hint", edata->hint);
if (edata->filename)
ts_jsonb_add_str(parse_state, "filename", edata->filename);
if (edata->lineno)
ts_jsonb_add_int32(parse_state, "lineno", edata->lineno);
if (edata->funcname)
ts_jsonb_add_str(parse_state, "funcname", edata->funcname);
if (edata->domain)
ts_jsonb_add_str(parse_state, "domain", edata->domain);
if (edata->context_domain)
ts_jsonb_add_str(parse_state, "context_domain", edata->context_domain);
if (edata->context)
ts_jsonb_add_str(parse_state, "context", edata->context);
if (edata->schema_name)
ts_jsonb_add_str(parse_state, "schema_name", edata->schema_name);
if (edata->table_name)
ts_jsonb_add_str(parse_state, "table_name", edata->table_name);
if (edata->column_name)
ts_jsonb_add_str(parse_state, "column_name", edata->column_name);
if (edata->datatype_name)
ts_jsonb_add_str(parse_state, "datatype_name", edata->datatype_name);
if (edata->constraint_name)
ts_jsonb_add_str(parse_state, "constraint_name", edata->constraint_name);
if (edata->internalquery)
ts_jsonb_add_str(parse_state, "internalquery", edata->internalquery);
if (edata->detail_log)
ts_jsonb_add_str(parse_state, "detail_log", edata->detail_log);
/* record the calling procedure's identifiers only when non-empty */
if (strlen(NameStr(*proc_schema)) > 0)
ts_jsonb_add_str(parse_state, "proc_schema", NameStr(*proc_schema));
if (strlen(NameStr(*proc_name)) > 0)
ts_jsonb_add_str(parse_state, "proc_name", NameStr(*proc_name));
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
return JsonbValueToJsonb(result);
}
static BgwJob * static BgwJob *
bgw_job_from_tupleinfo(TupleInfo *ti, size_t alloc_size) bgw_job_from_tupleinfo(TupleInfo *ti, size_t alloc_size)
{ {
@ -975,6 +1027,46 @@ zero_guc(const char *guc_name)
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("could not set \"%s\" guc", guc_name))); (errcode(ERRCODE_INTERNAL_ERROR), errmsg("could not set \"%s\" guc", guc_name)));
} }
/*
 * Insert one row into the _timescaledb_internal.job_errors catalog table.
 *
 * Called when a background job throws a runtime error, or when the job
 * scheduler detects that a job crashed.  A pid <= 0 is stored as SQL NULL
 * (the scheduler passes -1 for crash records), and a NULL error_data is
 * stored as SQL NULL as well.  The insert is performed under the catalog
 * owner's security context and the relation lock is released on return.
 */
bool
ts_job_errors_insert_tuple(const FormData_job_error *job_err)
{
Catalog *catalog = ts_catalog_get();
Relation rel = table_open(catalog_get_table_id(catalog, JOB_ERRORS), RowExclusiveLock);
TupleDesc desc = RelationGetDescr(rel);
Datum values[Natts_job_error];
bool nulls[Natts_job_error] = { false };
CatalogSecurityContext sec_ctx;
values[AttrNumberGetAttrOffset(Anum_job_error_job_id)] = Int32GetDatum(job_err->job_id);
values[AttrNumberGetAttrOffset(Anum_job_error_start_time)] =
TimestampTzGetDatum(job_err->start_time);
values[AttrNumberGetAttrOffset(Anum_job_error_finish_time)] =
TimestampTzGetDatum(job_err->finish_time);
/* pid is an int32 field stored in an "integer" column, so build the
 * Datum with Int32GetDatum (the previous Int64GetDatum formed a Datum
 * of the wrong width for the attribute type) */
if (job_err->pid > 0)
values[AttrNumberGetAttrOffset(Anum_job_error_pid)] = Int32GetDatum(job_err->pid);
else
nulls[AttrNumberGetAttrOffset(Anum_job_error_pid)] = true;
if (job_err->error_data)
values[AttrNumberGetAttrOffset(Anum_job_error_error_data)] =
JsonbPGetDatum(job_err->error_data);
else
nulls[AttrNumberGetAttrOffset(Anum_job_error_error_data)] = true;
/* write as the extension owner regardless of the failing job's role */
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
ts_catalog_restore_user(&sec_ctx);
table_close(rel, RowExclusiveLock);
return true;
}
extern Datum extern Datum
ts_bgw_job_entrypoint(PG_FUNCTION_ARGS) ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
{ {
@ -1016,6 +1108,9 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
elog(ERROR, "job %d not found when running the background worker", params.job_id); elog(ERROR, "job %d not found when running the background worker", params.job_id);
pgstat_report_appname(NameStr(job->fd.application_name)); pgstat_report_appname(NameStr(job->fd.application_name));
MemoryContext oldcontext = CurrentMemoryContext;
TimestampTz start_time = DT_NOBEGIN, finish_time = DT_NOBEGIN;
NameData proc_schema = { 0 }, proc_name = { 0 };
PG_TRY(); PG_TRY();
{ {
@ -1067,17 +1162,43 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
{ {
ts_bgw_job_stat_mark_end(job, JOB_FAILURE); ts_bgw_job_stat_mark_end(job, JOB_FAILURE);
ts_bgw_job_check_max_retries(job); ts_bgw_job_check_max_retries(job);
namestrcpy(&proc_name, NameStr(job->fd.proc_name));
namestrcpy(&proc_schema, NameStr(job->fd.proc_schema));
pfree(job); pfree(job);
job = NULL; job = NULL;
} }
CommitTransactionCommand();
/* /*
* the rethrow will log the error; but also log which job threw the * the rethrow will log the error; but also log which job threw the
* error * error
*/ */
elog(LOG, "job %d threw an error", params.job_id); elog(LOG, "job %d threw an error", params.job_id);
PG_RE_THROW();
ErrorData *edata;
FormData_job_error jerr = { 0 };
// switch away from error context to not lose the data
MemoryContextSwitchTo(oldcontext);
edata = CopyErrorData();
BgwJobStat *job_stat = ts_bgw_job_stat_find(params.job_id);
if (job_stat != NULL)
{
start_time = job_stat->fd.last_start;
finish_time = job_stat->fd.last_finish;
}
/* We include the procname in the error data and expose it in the view
to avoid adding an extra field in the table */
jerr.error_data = ts_errdata_to_jsonb(edata, &proc_schema, &proc_name);
jerr.job_id = params.job_id;
jerr.start_time = start_time;
jerr.finish_time = finish_time;
jerr.pid = MyProcPid;
ts_job_errors_insert_tuple(&jerr);
CommitTransactionCommand();
FlushErrorState();
ReThrowError(edata);
} }
PG_END_TRY(); PG_END_TRY();

View File

@ -57,5 +57,5 @@ extern void ts_bgw_job_set_scheduler_test_hook(scheduler_test_hook_type hook);
extern void ts_bgw_job_set_job_entrypoint_function_name(char *func_name); extern void ts_bgw_job_set_job_entrypoint_function_name(char *func_name);
extern bool ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial_runs, extern bool ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial_runs,
Interval *next_interval); Interval *next_interval);
extern TSDLLEXPORT bool ts_job_errors_insert_tuple(const FormData_job_error *jerr);
#endif /* BGW_JOB_H */ #endif /* BGW_JOB_H */

View File

@ -14,6 +14,8 @@
#include "scanner.h" #include "scanner.h"
#include "timer.h" #include "timer.h"
#include "utils.h" #include "utils.h"
#include "jsonb_utils.h"
#include <utils/builtins.h>
#define MAX_INTERVALS_BACKOFF 5 #define MAX_INTERVALS_BACKOFF 5
#define MAX_FAILURES_MULTIPLIER 20 #define MAX_FAILURES_MULTIPLIER 20
@ -153,7 +155,7 @@ bgw_job_stat_tuple_mark_start(TupleInfo *ti, void *const data)
fd->last_run_success = false; fd->last_run_success = false;
fd->total_crashes++; fd->total_crashes++;
fd->consecutive_crashes++; fd->consecutive_crashes++;
fd->flags = ts_clear_flags_32(fd->flags, LAST_CRASH_REPORTED);
ts_catalog_update(ti->scanrel, new_tuple); ts_catalog_update(ti->scanrel, new_tuple);
heap_freetuple(new_tuple); heap_freetuple(new_tuple);
@ -338,6 +340,7 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
fd->last_run_success = result_ctx->result == JOB_SUCCESS ? true : false; fd->last_run_success = result_ctx->result == JOB_SUCCESS ? true : false;
fd->total_crashes--; fd->total_crashes--;
fd->consecutive_crashes = 0; fd->consecutive_crashes = 0;
fd->flags = ts_clear_flags_32(fd->flags, LAST_CRASH_REPORTED);
if (result_ctx->result == JOB_SUCCESS) if (result_ctx->result == JOB_SUCCESS)
{ {
@ -372,6 +375,25 @@ bgw_job_stat_tuple_mark_end(TupleInfo *ti, void *const data)
return SCAN_DONE; return SCAN_DONE;
} }
/*
 * Scanner callback: set the LAST_CRASH_REPORTED flag on a bgw_job_stat
 * tuple so a given crash is written to the job_errors table only once.
 */
static ScanTupleResult
bgw_job_stat_tuple_mark_crash_reported(TupleInfo *ti, void *const data)
{
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
/* update a copy; the fetched tuple may not be modified in place */
HeapTuple new_tuple = heap_copytuple(tuple);
FormData_bgw_job_stat *fd = (FormData_bgw_job_stat *) GETSTRUCT(new_tuple);
if (should_free)
heap_freetuple(tuple);
fd->flags = ts_set_flags_32(fd->flags, LAST_CRASH_REPORTED);
ts_catalog_update(ti->scanrel, new_tuple);
heap_freetuple(new_tuple);
/* only one stats row exists per job, so stop after the first match */
return SCAN_DONE;
}
static ScanTupleResult static ScanTupleResult
bgw_job_stat_tuple_set_next_start(TupleInfo *ti, void *const data) bgw_job_stat_tuple_set_next_start(TupleInfo *ti, void *const data)
{ {
@ -421,6 +443,8 @@ bgw_job_stat_insert_relation(Relation rel, int32 bgw_job_id, bool mark_start,
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_success)] = Int64GetDatum(0); values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_success)] = Int64GetDatum(0);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_failures)] = Int64GetDatum(0); values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_total_failures)] = Int64GetDatum(0);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_failures)] = Int32GetDatum(0); values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_consecutive_failures)] = Int32GetDatum(0);
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_flags)] =
Int32GetDatum(JOB_STAT_FLAGS_DEFAULT);
if (mark_start) if (mark_start)
{ {
@ -480,6 +504,18 @@ ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result)
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
} }
/*
 * Mark the given job's statistics row as having had its last crash
 * reported in the job_errors table.  Raises an error if no statistics
 * row exists for the job.
 */
void
ts_bgw_job_stat_mark_crash_reported(int32 bgw_job_id)
{
if (!bgw_job_stat_scan_job_id(bgw_job_id,
bgw_job_stat_tuple_mark_crash_reported,
NULL,
NULL,
RowExclusiveLock))
elog(ERROR, "unable to find job statistics for job %d", bgw_job_id);
pgstat_report_activity(STATE_IDLE, NULL);
}
bool bool
ts_bgw_job_stat_end_was_marked(BgwJobStat *jobstat) ts_bgw_job_stat_end_was_marked(BgwJobStat *jobstat)
{ {
@ -562,7 +598,34 @@ ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job, int32 consecutive_f
return DT_NOBEGIN; return DT_NOBEGIN;
if (jobstat->fd.consecutive_crashes > 0) if (jobstat->fd.consecutive_crashes > 0)
{
/* Update the errors table regarding the crash */
if (!ts_flags_are_set_32(jobstat->fd.flags, LAST_CRASH_REPORTED))
{
/* add the proc_schema, proc_name to the jsonb */
NameData proc_schema = { 0 }, proc_name = { 0 };
namestrcpy(&proc_schema, NameStr(job->fd.proc_schema));
namestrcpy(&proc_name, NameStr(job->fd.proc_name));
JsonbParseState *parse_state = NULL;
pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
ts_jsonb_add_str(parse_state, "proc_schema", NameStr(proc_schema));
ts_jsonb_add_str(parse_state, "proc_name", NameStr(proc_name));
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
const FormData_job_error jerr = {
.error_data = JsonbValueToJsonb(result),
.start_time = jobstat->fd.last_start,
.finish_time = ts_timer_get_current_timestamp(),
.pid = -1,
.job_id = jobstat->fd.id,
};
ts_job_errors_insert_tuple(&jerr);
ts_bgw_job_stat_mark_crash_reported(jobstat->fd.id);
}
return calculate_next_start_on_crash(jobstat->fd.consecutive_crashes, job); return calculate_next_start_on_crash(jobstat->fd.consecutive_crashes, job);
}
return jobstat->fd.next_start; return jobstat->fd.next_start;
} }

View File

@ -9,6 +9,9 @@
#include "ts_catalog/catalog.h" #include "ts_catalog/catalog.h"
#include "job.h" #include "job.h"
/* initial value for the bgw_job_stat flags bitmask */
#define JOB_STAT_FLAGS_DEFAULT 0
/* flag bit: the job's last crash has already been written to job_errors */
#define LAST_CRASH_REPORTED 1
typedef struct BgwJobStat typedef struct BgwJobStat
{ {
FormData_bgw_job_stat fd; FormData_bgw_job_stat fd;
@ -38,5 +41,6 @@ extern bool ts_bgw_job_stat_should_execute(BgwJobStat *jobstat, BgwJob *job);
extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job, extern TimestampTz ts_bgw_job_stat_next_start(BgwJobStat *jobstat, BgwJob *job,
int32 consecutive_failed_starts); int32 consecutive_failed_starts);
extern TSDLLEXPORT void ts_bgw_job_stat_mark_crash_reported(int32 bgw_job_id);
#endif /* BGW_JOB_STAT_H */ #endif /* BGW_JOB_STAT_H */

View File

@ -224,6 +224,8 @@ worker_state_cleanup(ScheduledBgwJob *sjob)
* Usually the job process will mark the end, but if the job gets * Usually the job process will mark the end, but if the job gets
* a signal (cancel or terminate), it won't be able to so we * a signal (cancel or terminate), it won't be able to so we
* should. * should.
* TODO: Insert a record in the job_errors table informing of this failure
* Currently the SIGTERM case is not handled, there might be other cases as well
*/ */
elog(LOG, "job %d failed", sjob->job.fd.id); elog(LOG, "job %d failed", sjob->job.fd.id);
mark_job_as_ended(sjob, JOB_FAILURE); mark_job_as_ended(sjob, JOB_FAILURE);

View File

@ -116,6 +116,10 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
.schema_name = CATALOG_SCHEMA_NAME, .schema_name = CATALOG_SCHEMA_NAME,
.table_name = CONTINUOUS_AGGS_BUCKET_FUNCTION_TABLE_NAME, .table_name = CONTINUOUS_AGGS_BUCKET_FUNCTION_TABLE_NAME,
}, },
[JOB_ERRORS] = {
.schema_name = INTERNAL_SCHEMA_NAME,
.table_name = JOB_ERRORS_TABLE_NAME,
},
[_MAX_CATALOG_TABLES] = { [_MAX_CATALOG_TABLES] = {
.schema_name = "invalid schema", .schema_name = "invalid schema",
.table_name = "invalid table", .table_name = "invalid table",

View File

@ -56,6 +56,7 @@ typedef enum CatalogTable
REMOTE_TXN, REMOTE_TXN,
CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION,
CONTINUOUS_AGGS_BUCKET_FUNCTION, CONTINUOUS_AGGS_BUCKET_FUNCTION,
JOB_ERRORS,
/* Don't forget updating catalog.c when adding new tables! */ /* Don't forget updating catalog.c when adding new tables! */
_MAX_CATALOG_TABLES, _MAX_CATALOG_TABLES,
} CatalogTable; } CatalogTable;
@ -779,6 +780,7 @@ enum Anum_bgw_job_stat
Anum_bgw_job_stat_total_crashes, Anum_bgw_job_stat_total_crashes,
Anum_bgw_job_stat_consecutive_failures, Anum_bgw_job_stat_consecutive_failures,
Anum_bgw_job_stat_consecutive_crashes, Anum_bgw_job_stat_consecutive_crashes,
Anum_bgw_job_stat_flags,
_Anum_bgw_job_stat_max, _Anum_bgw_job_stat_max,
}; };
@ -799,6 +801,7 @@ typedef struct FormData_bgw_job_stat
int64 total_crashes; int64 total_crashes;
int32 consecutive_failures; int32 consecutive_failures;
int32 consecutive_crashes; int32 consecutive_crashes;
int32 flags;
} FormData_bgw_job_stat; } FormData_bgw_job_stat;
typedef FormData_bgw_job_stat *Form_bgw_job_stat; typedef FormData_bgw_job_stat *Form_bgw_job_stat;
@ -1379,6 +1382,31 @@ typedef struct CatalogSecurityContext
int saved_security_context; int saved_security_context;
} CatalogSecurityContext; } CatalogSecurityContext;
#define JOB_ERRORS_TABLE_NAME "job_errors"

/* attribute numbers for _timescaledb_internal.job_errors (1-based) */
enum Anum_job_error
{
Anum_job_error_job_id = 1,
Anum_job_error_pid,
Anum_job_error_start_time,
Anum_job_error_finish_time,
Anum_job_error_error_data,
_Anum_job_error_max,
};

#define Natts_job_error (_Anum_job_error_max - 1)

/* in-memory form of one job_errors row; field order mirrors the table */
typedef struct FormData_job_error
{
int32 job_id;
int32 pid; /* values <= 0 are stored as SQL NULL on insert */
TimestampTz start_time;
TimestampTz finish_time;
Jsonb *error_data; /* may be NULL; stored as SQL NULL on insert */
} FormData_job_error;

typedef FormData_job_error *Form_job_error;
extern void ts_catalog_table_info_init(CatalogTableInfo *tables, int max_table, extern void ts_catalog_table_info_init(CatalogTableInfo *tables, int max_table,
const TableInfoDef *table_ary, const TableInfoDef *table_ary,
const TableIndexDef *index_ary, const char **serial_id_ary); const TableIndexDef *index_ary, const char **serial_id_ary);

View File

@ -566,7 +566,8 @@ ORDER BY c.id;
_timescaledb_internal | _hyper_3_18_chunk | table | default_perm_user _timescaledb_internal | _hyper_3_18_chunk | table | default_perm_user
_timescaledb_internal | bgw_job_stat | table | super_user _timescaledb_internal | bgw_job_stat | table | super_user
_timescaledb_internal | bgw_policy_chunk_stats | table | super_user _timescaledb_internal | bgw_policy_chunk_stats | table | super_user
(16 rows) _timescaledb_internal | job_errors | table | super_user
(17 rows)
-- next two calls of show_chunks should give same set of chunks as above when combined -- next two calls of show_chunks should give same set of chunks as above when combined
SELECT show_chunks('drop_chunk_test1'); SELECT show_chunks('drop_chunk_test1');

View File

@ -225,7 +225,8 @@ SELECT * FROM _timescaledb_catalog.hypertable;
-----------------------+------------------------+-------+------------ -----------------------+------------------------+-------+------------
_timescaledb_internal | bgw_job_stat | table | super_user _timescaledb_internal | bgw_job_stat | table | super_user
_timescaledb_internal | bgw_policy_chunk_stats | table | super_user _timescaledb_internal | bgw_policy_chunk_stats | table | super_user
(2 rows) _timescaledb_internal | job_errors | table | super_user
(3 rows)
-- Test that renaming ordinary table works -- Test that renaming ordinary table works
CREATE TABLE renametable (foo int); CREATE TABLE renametable (foo int);

View File

@ -560,6 +560,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
--------------------------------------------------- ---------------------------------------------------
timescaledb_experimental.policies timescaledb_experimental.policies
timescaledb_experimental.chunk_replication_status timescaledb_experimental.chunk_replication_status
timescaledb_information.job_errors
timescaledb_information.compression_settings timescaledb_information.compression_settings
timescaledb_information.dimensions timescaledb_information.dimensions
timescaledb_information.chunks timescaledb_information.chunks
@ -576,7 +577,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
_timescaledb_internal.bgw_policy_chunk_stats _timescaledb_internal.bgw_policy_chunk_stats
_timescaledb_internal.bgw_job_stat _timescaledb_internal.bgw_job_stat
_timescaledb_catalog.tablespace_id_seq _timescaledb_catalog.tablespace_id_seq
(18 rows) (19 rows)
-- Make sure we can't run our restoring functions as a normal perm user as that would disable functionality for the whole db -- Make sure we can't run our restoring functions as a normal perm user as that would disable functionality for the whole db
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER

View File

@ -73,7 +73,9 @@ SELECT add_job('custom_func_definer', '1h', config:='{"type":"function"}'::jsonb
1004 1004
(1 row) (1 row)
SELECT * FROM timescaledb_information.jobs WHERE job_id != 1 ORDER BY 1; -- exclude the telemetry[1] and job error retention[2] jobs
-- job 2 may have already run which will set its next_start field thus making the test flaky
SELECT * FROM timescaledb_information.jobs WHERE job_id NOT IN (1,2) ORDER BY 1;
job_id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | config | next_start | hypertable_schema | hypertable_name | check_schema | check_name job_id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | config | next_start | hypertable_schema | hypertable_name | check_schema | check_name
--------+----------------------------+-------------------+-------------+-------------+--------------+-------------+---------------------+-------------------+-----------+-----------------------+------------+-------------------+-----------------+--------------+------------ --------+----------------------------+-------------------+-------------+-------------+--------------+-------------+---------------------+-------------------+-----------+-----------------------+------------+-------------------+-----------------+--------------+------------
1000 | User-Defined Action [1000] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | {"type": "function"} | | | | | 1000 | User-Defined Action [1000] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | {"type": "function"} | | | | |

View File

@ -106,11 +106,12 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50);
(1 row) (1 row)
-- empty -- empty
-- turn on extended display to make the many fields of the table easier to parse
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes
--------+------------+-------------+------------+------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+---------------------
(0 rows) (0 rows)
\x off
-- empty -- empty
SELECT * FROM sorted_bgw_log; SELECT * FROM sorted_bgw_log;
msg_no | application_name | msg msg_no | application_name | msg
@ -141,11 +142,11 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50);
(1 row) (1 row)
-- empty -- empty
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes
--------+------------+-------------+------------+------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+---------------------
(0 rows) (0 rows)
\x off
SELECT * FROM timescaledb_information.job_stats; SELECT * FROM timescaledb_information.job_stats;
hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures
-------------------+-----------------+--------+---------------------+------------------------+-----------------+------------+-------------------+------------+------------+-----------------+---------------- -------------------+-----------------+--------+---------------------+------------------------+-----------------+------------+-------------------+------------+------------+-----------------+----------------
@ -163,12 +164,25 @@ SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50);
(1 row) (1 row)
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes -[ RECORD 1 ]----------+--------------------------------
--------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+--------------------- job_id | 1000
1000 | Fri Dec 31 16:00:00.05 1999 PST | Fri Dec 31 16:00:00.05 1999 PST | Fri Dec 31 16:00:00.15 1999 PST | Fri Dec 31 16:00:00.05 1999 PST | t | 1 | @ 0 | 1 | 0 | 0 | 0 | 0 last_start | Fri Dec 31 16:00:00.05 1999 PST
(1 row) last_finish | Fri Dec 31 16:00:00.05 1999 PST
next_start | Fri Dec 31 16:00:00.15 1999 PST
last_successful_finish | Fri Dec 31 16:00:00.05 1999 PST
last_run_success | t
total_runs | 1
total_duration | @ 0
total_successes | 1
total_failures | 0
total_crashes | 0
consecutive_failures | 0
consecutive_crashes | 0
flags | 0
\x off
SELECT * FROM timescaledb_information.job_stats; SELECT * FROM timescaledb_information.job_stats;
hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures hypertable_schema | hypertable_name | job_id | last_run_started_at | last_successful_finish | last_run_status | job_status | last_run_duration | next_start | total_runs | total_successes | total_failures
-------------------+-----------------+--------+---------------------------------+---------------------------------+-----------------+------------+-------------------+---------------------------------+------------+-----------------+---------------- -------------------+-----------------+--------+---------------------------------+---------------------------------+-----------------+------------+-------------------+---------------------------------+------------+-----------------+----------------
@ -1433,12 +1447,25 @@ SELECT wait_for_job_1_to_run(2);
t t
(1 row) (1 row)
\x on
select * from _timescaledb_internal.bgw_job_stat; select * from _timescaledb_internal.bgw_job_stat;
job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes -[ RECORD 1 ]----------+--------------------------------
--------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+--------------------- job_id | 1024
1024 | Fri Dec 31 16:00:00.15 1999 PST | Fri Dec 31 16:00:00.15 1999 PST | Fri Dec 31 16:00:00.25 1999 PST | Fri Dec 31 16:00:00.15 1999 PST | t | 2 | @ 0 | 2 | 0 | 0 | 0 | 0 last_start | Fri Dec 31 16:00:00.15 1999 PST
(1 row) last_finish | Fri Dec 31 16:00:00.15 1999 PST
next_start | Fri Dec 31 16:00:00.25 1999 PST
last_successful_finish | Fri Dec 31 16:00:00.15 1999 PST
last_run_success | t
total_runs | 2
total_duration | @ 0
total_successes | 2
total_failures | 0
total_crashes | 0
consecutive_failures | 0
consecutive_crashes | 0
flags | 0
\x off
SELECT delete_job(x.id) FROM (select * from _timescaledb_config.bgw_job) x; SELECT delete_job(x.id) FROM (select * from _timescaledb_config.bgw_job) x;
delete_job delete_job
------------ ------------
@ -1552,12 +1579,25 @@ SELECT * FROM sorted_bgw_log;
0 | new_job | Execute job 1 0 | new_job | Execute job 1
(16 rows) (16 rows)
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes -[ RECORD 1 ]----------+--------------------------------
--------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+--------------------- job_id | 1025
1025 | Fri Dec 31 16:00:00.48 1999 PST | Fri Dec 31 16:00:00.48 1999 PST | Fri Dec 31 16:00:00.49 1999 PST | Fri Dec 31 16:00:00.48 1999 PST | t | 2 | @ 0 | 2 | 0 | 0 | 0 | 0 last_start | Fri Dec 31 16:00:00.48 1999 PST
(1 row) last_finish | Fri Dec 31 16:00:00.48 1999 PST
next_start | Fri Dec 31 16:00:00.49 1999 PST
last_successful_finish | Fri Dec 31 16:00:00.48 1999 PST
last_run_success | t
total_runs | 2
total_duration | @ 0
total_successes | 2
total_failures | 0
total_crashes | 0
consecutive_failures | 0
consecutive_crashes | 0
flags | 0
\x off
-- clean up jobs -- clean up jobs
SELECT _timescaledb_internal.stop_background_workers(); SELECT _timescaledb_internal.stop_background_workers();
stop_background_workers stop_background_workers

View File

@ -616,11 +616,12 @@ SELECT proc_name, count(*)
FROM _timescaledb_config.bgw_job FROM _timescaledb_config.bgw_job
WHERE proc_name NOT LIKE '%telemetry%' WHERE proc_name NOT LIKE '%telemetry%'
GROUP BY proc_name; GROUP BY proc_name;
proc_name | count proc_name | count
------------------+------- ----------------------------+-------
policy_reorder | 1 policy_job_error_retention | 1
policy_retention | 2 policy_reorder | 1
(2 rows) policy_retention | 2
(3 rows)
-- test that the behavior is strict when providing NULL required arguments -- test that the behavior is strict when providing NULL required arguments
create table test_strict (time timestamptz not null, a int, b int); create table test_strict (time timestamptz not null, a int, b int);

View File

@ -230,9 +230,9 @@ SELECT * FROM timescaledb_information.jobs WHERE job_id=:reorder_job_id;
SELECT * SELECT *
FROM _timescaledb_internal.bgw_job_stat FROM _timescaledb_internal.bgw_job_stat
where job_id=:reorder_job_id; where job_id=:reorder_job_id;
job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes | flags
--------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+--------------------- --------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+---------------------+-------
1000 | Fri Dec 31 16:00:00.05 1999 PST | Fri Dec 31 16:00:00.05 1999 PST | Tue Jan 04 16:00:00.05 2000 PST | Fri Dec 31 16:00:00.05 1999 PST | t | 3 | @ 0 | 3 | 0 | 0 | 0 | 0 1000 | Fri Dec 31 16:00:00.05 1999 PST | Fri Dec 31 16:00:00.05 1999 PST | Tue Jan 04 16:00:00.05 2000 PST | Fri Dec 31 16:00:00.05 1999 PST | t | 3 | @ 0 | 3 | 0 | 0 | 0 | 0 | 0
(1 row) (1 row)
-- three chunks clustered -- three chunks clustered
@ -274,9 +274,9 @@ SELECT * FROM timescaledb_information.jobs WHERE job_id=:reorder_job_id;
SELECT * SELECT *
FROM _timescaledb_internal.bgw_job_stat FROM _timescaledb_internal.bgw_job_stat
where job_id=:reorder_job_id; where job_id=:reorder_job_id;
job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes job_id | last_start | last_finish | next_start | last_successful_finish | last_run_success | total_runs | total_duration | total_successes | total_failures | total_crashes | consecutive_failures | consecutive_crashes | flags
--------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+--------------------- --------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+------------------+------------+----------------+-----------------+----------------+---------------+----------------------+---------------------+-------
1000 | Fri Dec 31 16:00:00.05 1999 PST | Fri Dec 31 16:00:00.05 1999 PST | Tue Jan 04 16:00:00.05 2000 PST | Fri Dec 31 16:00:00.05 1999 PST | t | 3 | @ 0 | 3 | 0 | 0 | 0 | 0 1000 | Fri Dec 31 16:00:00.05 1999 PST | Fri Dec 31 16:00:00.05 1999 PST | Tue Jan 04 16:00:00.05 2000 PST | Fri Dec 31 16:00:00.05 1999 PST | t | 3 | @ 0 | 3 | 0 | 0 | 0 | 0 | 0
(1 row) (1 row)
-- still have 3 chunks clustered -- still have 3 chunks clustered

View File

@ -0,0 +1,163 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
CREATE FUNCTION wait_for_retention_job_to_run_successfully(expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
DECLARE
r RECORD;
BEGIN
FOR i in 1..spins
LOOP
SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=2 INTO r;
IF (r.total_successes = expected_runs) THEN
RETURN true;
ELSEIF (r.total_successes > expected_runs) THEN
RAISE 'num_runs > expected';
ELSE
PERFORM pg_sleep(0.1);
END IF;
END LOOP;
RAISE INFO 'wait_for_job_to_run: timeout after % tries', spins;
RETURN false;
END
$BODY$;
\set client_min_messages TO NOTICE;
create or replace procedure job_fail(jobid int, config jsonb) language plpgsql as $$
begin
perform pg_sleep(2);
raise exception 'raising an exception';
end
$$;
-- very simple case: job that raises an exception
select add_job('job_fail', '4 minutes') as jobf_id \gset
-- test jobs that try to update concurrently
CREATE TABLE custom_log (
a int,
b int,
msg text
);
insert into custom_log values (0, 0, 'msg0');
ALTER SYSTEM SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
-- test a concurrent update
CREATE OR REPLACE PROCEDURE custom_proc1(jobid int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
UPDATE custom_log set msg = 'msg1' where msg = 'msg0';
perform pg_sleep(10);
COMMIT;
END
$$;
CREATE OR REPLACE PROCEDURE custom_proc2(jobid int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
UPDATE custom_log set msg = 'msg2' where msg = 'msg0';
perform pg_sleep(10);
COMMIT;
END
$$;
select add_job('custom_proc1', '2 min', initial_start => now());
add_job
---------
1001
(1 row)
-- to make sure custom_log is first updated by custom_proc_1
select add_job('custom_proc2', '2 min', initial_start => now() + interval '5 seconds');
add_job
---------
1002
(1 row)
SELECT _timescaledb_internal.start_background_workers();
start_background_workers
--------------------------
t
(1 row)
-- enough time to for job_fail to fail
select pg_sleep(10);
pg_sleep
----------
(1 row)
select job_id, error_data->'proc_name' as proc_name, error_data->>'message' as err_message, error_data->>'sqlerrcode' as sqlerrcode
from _timescaledb_internal.job_errors where job_id = :jobf_id;
job_id | proc_name | err_message | sqlerrcode
--------+------------+----------------------+------------
1000 | "job_fail" | raising an exception | P0001
(1 row)
select delete_job(:jobf_id);
delete_job
------------
(1 row)
select pg_sleep(20);
pg_sleep
----------
(1 row)
-- exclude the retention policy
select job_id, error_data->>'message' as err_message, error_data->>'sqlerrcode' as sqlerrcode
from _timescaledb_internal.job_errors WHERE job_id != 2;
job_id | err_message | sqlerrcode
--------+-----------------------------------------------------+------------
1000 | raising an exception | P0001
1002 | could not serialize access due to concurrent update | 40001
(2 rows)
ALTER SYSTEM RESET DEFAULT_TRANSACTION_ISOLATION;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
-- test the retention job
SELECT next_start FROM alter_job(2, next_start => '2060-01-01 00:00:00+00'::timestamptz);
next_start
------------------------------
Wed Dec 31 16:00:00 2059 PST
(1 row)
TRUNCATE TABLE _timescaledb_internal.job_errors;
INSERT INTO _timescaledb_internal.job_errors(job_id, pid, start_time, finish_time, error_data)
VALUES (123, 12345, '2000-01-01 00:00:00+00'::timestamptz, '2000-01-01 00:00:10+00'::timestamptz, '{}'),
(456, 45678, '2000-01-01 00:00:20+00'::timestamptz, '2000-01-01 00:00:40+00'::timestamptz, '{}'),
-- not older than a month
(123, 23456, '2050-01-01 00:00:00+00'::timestamptz, '2050-01-01 00:00:10+00'::timestamptz, '{}');
-- 3 rows in the table before policy runs
SELECT * FROM _timescaledb_internal.job_errors;
job_id | pid | start_time | finish_time | error_data
--------+-------+------------------------------+------------------------------+------------
123 | 12345 | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 16:00:10 1999 PST | {}
456 | 45678 | Fri Dec 31 16:00:20 1999 PST | Fri Dec 31 16:00:40 1999 PST | {}
123 | 23456 | Fri Dec 31 16:00:00 2049 PST | Fri Dec 31 16:00:10 2049 PST | {}
(3 rows)
-- drop all job_stats for the retention job
DELETE FROM _timescaledb_internal.bgw_job_stat WHERE job_id = 2;
SELECT next_start FROM alter_job(2, next_start => now() + interval '2 seconds') \gset
SELECT wait_for_retention_job_to_run_successfully(1);
wait_for_retention_job_to_run_successfully
--------------------------------------------
t
(1 row)
-- only the last row remains
SELECT * FROM _timescaledb_internal.job_errors;
job_id | pid | start_time | finish_time | error_data
--------+-------+------------------------------+------------------------------+------------
123 | 23456 | Fri Dec 31 16:00:00 2049 PST | Fri Dec 31 16:00:10 2049 PST | {}
(1 row)

View File

@ -112,6 +112,8 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
_timescaledb_internal.policy_compression(integer,jsonb) _timescaledb_internal.policy_compression(integer,jsonb)
_timescaledb_internal.policy_compression_check(jsonb) _timescaledb_internal.policy_compression_check(jsonb)
_timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean)
_timescaledb_internal.policy_job_error_retention(integer,jsonb)
_timescaledb_internal.policy_job_error_retention_check(jsonb)
_timescaledb_internal.policy_recompression(integer,jsonb) _timescaledb_internal.policy_recompression(integer,jsonb)
_timescaledb_internal.policy_refresh_continuous_aggregate(integer,jsonb) _timescaledb_internal.policy_refresh_continuous_aggregate(integer,jsonb)
_timescaledb_internal.policy_refresh_continuous_aggregate_check(jsonb) _timescaledb_internal.policy_refresh_continuous_aggregate_check(jsonb)

View File

@ -31,6 +31,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug)
APPEND APPEND
TEST_FILES TEST_FILES
bgw_db_scheduler.sql bgw_db_scheduler.sql
troubleshooting_job_errors.sql
bgw_reorder_drop_chunks.sql bgw_reorder_drop_chunks.sql
compress_bgw_reorder_drop_chunks.sql compress_bgw_reorder_drop_chunks.sql
chunk_api.sql chunk_api.sql
@ -98,6 +99,7 @@ endif()
set(SOLO_TESTS set(SOLO_TESTS
bgw_db_scheduler bgw_db_scheduler
troubleshooting_job_errors
bgw_reorder_drop_chunks bgw_reorder_drop_chunks
compress_bgw_reorder_drop_chunks compress_bgw_reorder_drop_chunks
compression_ddl compression_ddl

View File

@ -51,7 +51,9 @@ SELECT add_job('custom_proc2','1h', config:= '{"type":"procedure"}'::jsonb);
SELECT add_job('custom_func', '1h', config:='{"type":"function"}'::jsonb); SELECT add_job('custom_func', '1h', config:='{"type":"function"}'::jsonb);
SELECT add_job('custom_func_definer', '1h', config:='{"type":"function"}'::jsonb); SELECT add_job('custom_func_definer', '1h', config:='{"type":"function"}'::jsonb);
SELECT * FROM timescaledb_information.jobs WHERE job_id != 1 ORDER BY 1; -- exclude the telemetry[1] and job error retention[2] jobs
-- job 2 may have already run which will set its next_start field thus making the test flaky
SELECT * FROM timescaledb_information.jobs WHERE job_id NOT IN (1,2) ORDER BY 1;
SELECT count(*) FROM _timescaledb_config.bgw_job WHERE config->>'type' IN ('procedure', 'function'); SELECT count(*) FROM _timescaledb_config.bgw_job WHERE config->>'type' IN ('procedure', 'function');

View File

@ -116,7 +116,10 @@ SELECT ts_bgw_params_create();
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50); SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50);
-- empty -- empty
-- turn on extended display to make the many fields of the table easier to parse
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
\x off
-- empty -- empty
SELECT * FROM sorted_bgw_log; SELECT * FROM sorted_bgw_log;
@ -129,12 +132,16 @@ SELECT ts_bgw_params_reset_time();
SELECT insert_job('unscheduled', 'bgw_test_job_1', INTERVAL '100ms', INTERVAL '100s', INTERVAL '1s',scheduled:= false); SELECT insert_job('unscheduled', 'bgw_test_job_1', INTERVAL '100ms', INTERVAL '100s', INTERVAL '1s',scheduled:= false);
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50); SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50);
-- empty -- empty
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
\x off
SELECT * FROM timescaledb_information.job_stats; SELECT * FROM timescaledb_information.job_stats;
SELECT test_toggle_scheduled(1000); SELECT test_toggle_scheduled(1000);
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50); SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(50);
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
\x off
SELECT * FROM timescaledb_information.job_stats; SELECT * FROM timescaledb_information.job_stats;
SELECT * FROM sorted_bgw_log; SELECT * FROM sorted_bgw_log;
@ -611,7 +618,9 @@ SELECT ts_bgw_params_reset_time(150000, true);
SELECT wait_for_timer_to_run(150000); SELECT wait_for_timer_to_run(150000);
SELECT wait_for_job_1_to_run(2); SELECT wait_for_job_1_to_run(2);
\x on
select * from _timescaledb_internal.bgw_job_stat; select * from _timescaledb_internal.bgw_job_stat;
\x off
SELECT delete_job(x.id) FROM (select * from _timescaledb_config.bgw_job) x; SELECT delete_job(x.id) FROM (select * from _timescaledb_config.bgw_job) x;
-- test null handling in delete_job -- test null handling in delete_job
@ -642,8 +651,9 @@ SELECT wait_for_job_1_to_run(4);
SELECT ts_bgw_params_reset_time(500000, true); SELECT ts_bgw_params_reset_time(500000, true);
SELECT * FROM sorted_bgw_log; SELECT * FROM sorted_bgw_log;
\x on
SELECT * FROM _timescaledb_internal.bgw_job_stat; SELECT * FROM _timescaledb_internal.bgw_job_stat;
\x off
-- clean up jobs -- clean up jobs
SELECT _timescaledb_internal.stop_background_workers(); SELECT _timescaledb_internal.stop_background_workers();

View File

@ -0,0 +1,106 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
CREATE FUNCTION wait_for_retention_job_to_run_successfully(expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
DECLARE
r RECORD;
BEGIN
FOR i in 1..spins
LOOP
SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=2 INTO r;
IF (r.total_successes = expected_runs) THEN
RETURN true;
ELSEIF (r.total_successes > expected_runs) THEN
RAISE 'num_runs > expected';
ELSE
PERFORM pg_sleep(0.1);
END IF;
END LOOP;
RAISE INFO 'wait_for_job_to_run: timeout after % tries', spins;
RETURN false;
END
$BODY$;
\set client_min_messages TO NOTICE;
create or replace procedure job_fail(jobid int, config jsonb) language plpgsql as $$
begin
perform pg_sleep(2);
raise exception 'raising an exception';
end
$$;
-- very simple case: job that raises an exception
select add_job('job_fail', '4 minutes') as jobf_id \gset
-- test jobs that try to update concurrently
CREATE TABLE custom_log (
a int,
b int,
msg text
);
insert into custom_log values (0, 0, 'msg0');
ALTER SYSTEM SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';
SELECT pg_reload_conf();
-- test a concurrent update
CREATE OR REPLACE PROCEDURE custom_proc1(jobid int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
UPDATE custom_log set msg = 'msg1' where msg = 'msg0';
perform pg_sleep(10);
COMMIT;
END
$$;
CREATE OR REPLACE PROCEDURE custom_proc2(jobid int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
UPDATE custom_log set msg = 'msg2' where msg = 'msg0';
perform pg_sleep(10);
COMMIT;
END
$$;
select add_job('custom_proc1', '2 min', initial_start => now());
-- to make sure custom_log is first updated by custom_proc_1
select add_job('custom_proc2', '2 min', initial_start => now() + interval '5 seconds');
SELECT _timescaledb_internal.start_background_workers();
-- enough time to for job_fail to fail
select pg_sleep(10);
select job_id, error_data->'proc_name' as proc_name, error_data->>'message' as err_message, error_data->>'sqlerrcode' as sqlerrcode
from _timescaledb_internal.job_errors where job_id = :jobf_id;
select delete_job(:jobf_id);
select pg_sleep(20);
-- exclude the retention policy
select job_id, error_data->>'message' as err_message, error_data->>'sqlerrcode' as sqlerrcode
from _timescaledb_internal.job_errors WHERE job_id != 2;
ALTER SYSTEM RESET DEFAULT_TRANSACTION_ISOLATION;
SELECT pg_reload_conf();
-- test the retention job
SELECT next_start FROM alter_job(2, next_start => '2060-01-01 00:00:00+00'::timestamptz);
TRUNCATE TABLE _timescaledb_internal.job_errors;
INSERT INTO _timescaledb_internal.job_errors(job_id, pid, start_time, finish_time, error_data)
VALUES (123, 12345, '2000-01-01 00:00:00+00'::timestamptz, '2000-01-01 00:00:10+00'::timestamptz, '{}'),
(456, 45678, '2000-01-01 00:00:20+00'::timestamptz, '2000-01-01 00:00:40+00'::timestamptz, '{}'),
-- not older than a month
(123, 23456, '2050-01-01 00:00:00+00'::timestamptz, '2050-01-01 00:00:10+00'::timestamptz, '{}');
-- 3 rows in the table before policy runs
SELECT * FROM _timescaledb_internal.job_errors;
-- drop all job_stats for the retention job
DELETE FROM _timescaledb_internal.bgw_job_stat WHERE job_id = 2;
SELECT next_start FROM alter_job(2, next_start => now() + interval '2 seconds') \gset
SELECT wait_for_retention_job_to_run_successfully(1);
-- only the last row remains
SELECT * FROM _timescaledb_internal.job_errors;

View File

@ -0,0 +1,66 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.
use strict;
use warnings;
use TimescaleNode;
use Test::More;

# This test checks that a job crash is reported in the job errors table
# (that is, a record gets inserted into the table _timescaledb_internal.job_errors).
# We cannot do that with a regression test because the server will not recover after
# a crash in that case.
my $node = TimescaleNode->create();

# By default PostgresNode doesn't restart after a crash, so enable
# restart_after_crash first (taken from 013_crash_restart.pl).
$node->safe_psql(
	'postgres',
	q[ALTER SYSTEM SET restart_after_crash = 1;
	ALTER SYSTEM SET log_connections = 1;
	SELECT pg_reload_conf();]);

# Create the procedure that will run as a job: it sleeps long enough
# for the test to locate and kill its backend mid-run.
my $query = <<'END_OF_SQL';
CREATE OR REPLACE PROCEDURE custom_proc_sleep60(jobid int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
RAISE NOTICE 'im about to sleep for 60 seconds, plenty of time for you to kill me';
perform pg_sleep(60);
END
$$;
END_OF_SQL
$node->safe_psql('postgres', $query);

# Register the job; start it immediately so it is running when we look for it.
my $query_add =
  q[select add_job('custom_proc_sleep60', '5 minutes', initial_start => now())];
my $jobid = $node->safe_psql('postgres', $query_add);
is($jobid, '1000', 'job was added');

# sleep 10 to make sure the job has started running
$node->safe_psql('postgres', "select pg_sleep(10)");

# Select the pid of this job's background worker in order to kill it.
my $query_pid = <<"END_OF_QUERY";
select pid from pg_stat_activity
where application_name like 'User-Defined Action%'
and query like '%custom_proc_sleep60%'
END_OF_QUERY
my $pid = $node->safe_psql('postgres', $query_pid);
isnt($pid, "", "check the pid is not null");

# Kill the one backend with SIGKILL (signal 9) to simulate a crash.
kill 9, int($pid);

# Wait till server restarts.
# NOTE(review): poll_query_until is invoked with an undef query and empty
# expected value — presumably this polls for connectability only; confirm
# against the PostgresNode API version in use.
is($node->poll_query_until('postgres', undef, ''),
	"1", "reconnected after SIGKILL");

# A crashed job is recorded in job_errors with a NULL pid.
my $errlog = $node->safe_psql('postgres',
	'select count(*) from _timescaledb_internal.job_errors where job_id = 1000 and pid is null'
);
is($errlog, "1", "there is a row for the crash in the error log");

done_testing();

View File

@ -1,6 +1,6 @@
set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections_privs.pl) set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections_privs.pl)
set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl 004_multinode_rdwr_1pc.pl set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl 004_multinode_rdwr_1pc.pl
005_add_data_node.pl) 005_add_data_node.pl 006_job_crash_log.pl)
if(CMAKE_BUILD_TYPE MATCHES Debug) if(CMAKE_BUILD_TYPE MATCHES Debug)
list(APPEND PROVE_TEST_FILES ${PROVE_DEBUG_TEST_FILES}) list(APPEND PROVE_TEST_FILES ${PROVE_DEBUG_TEST_FILES})