Fix wrong crash error message on job history

Currently while a job is running we set `pid = SchedulerPid`,
`succeed = false` and `execution_finish=NOW()` and it leads to
confusion when querying either `timescaledb_information.job_errors`
or `timescaledb_information.job_history` views showing in the
`err_message = job crash detected, see server logs`. This information
is wrong and create confusion.

Fixed it by setting `succeed=NULL` and `pid=NULL` when the scheduler
launch the job and then when the job worker start to work then set
`pid=MyProcPid` (the worker PID) meaning that the job started and
didn't finished yet, and at the end of the execution we set
`succeed=TRUE or FALSE` and the `execution_finish=NOW()` to mark the
end of the job execution. Also adjusted the views to expose the
information properly.
This commit is contained in:
Fabrízio de Royes Mello 2025-01-28 15:51:26 -03:00
parent 6ce2fc0df4
commit 6063464f6d
16 changed files with 281 additions and 122 deletions

View File

@ -317,7 +317,7 @@ CREATE TABLE _timescaledb_internal.bgw_job_stat_history (
pid INTEGER, pid INTEGER,
execution_start TIMESTAMPTZ NOT NULL DEFAULT NOW(), execution_start TIMESTAMPTZ NOT NULL DEFAULT NOW(),
execution_finish TIMESTAMPTZ, execution_finish TIMESTAMPTZ,
succeeded boolean NOT NULL DEFAULT FALSE, succeeded boolean,
data jsonb, data jsonb,
-- table constraints -- table constraints
CONSTRAINT bgw_job_stat_history_pkey PRIMARY KEY (id) CONSTRAINT bgw_job_stat_history_pkey PRIMARY KEY (id)

View File

@ -1 +1,3 @@
ALTER TABLE _timescaledb_internal.bgw_job_stat_history
ALTER COLUMN succeeded DROP NOT NULL,
ALTER COLUMN succeeded DROP DEFAULT;

View File

@ -0,0 +1,5 @@
UPDATE _timescaledb_internal.bgw_job_stat_history SET succeeded = FALSE WHERE succeeded IS NULL;
ALTER TABLE _timescaledb_internal.bgw_job_stat_history
ALTER COLUMN succeeded SET NOT NULL,
ALTER COLUMN succeeded SET DEFAULT FALSE;

View File

@ -280,33 +280,34 @@ ORDER BY hypertable_name,
CREATE OR REPLACE VIEW timescaledb_information.job_errors CREATE OR REPLACE VIEW timescaledb_information.job_errors
WITH (security_barrier = true) AS WITH (security_barrier = true) AS
SELECT SELECT
job_id, h.job_id,
data->'job'->>'proc_schema' as proc_schema, h.data->'job'->>'proc_schema' as proc_schema,
data->'job'->>'proc_name' as proc_name, h.data->'job'->>'proc_name' as proc_name,
pid, h.pid,
execution_start AS start_time, h.execution_start AS start_time,
execution_finish AS finish_time, h.execution_finish AS finish_time,
data->'error_data'->>'sqlerrcode' AS sqlerrcode, h.data->'error_data'->>'sqlerrcode' AS sqlerrcode,
CASE WHEN data->'error_data'->>'message' IS NOT NULL THEN CASE
CASE WHEN data->'error_data'->>'detail' IS NOT NULL THEN WHEN h.succeeded IS NULL AND h.execution_finish IS NULL AND h.pid IS NULL THEN
CASE WHEN data->'error_data'->>'hint' IS NOT NULL THEN concat(data->'error_data'->>'message', '. ', data->'error_data'->>'detail', '. ', data->'error_data'->>'hint') 'job crash detected, see server logs'
ELSE concat(data->'error_data'->>'message', ' ', data->'error_data'->>'detail') WHEN h.data->'error_data'->>'message' IS NOT NULL THEN
CASE WHEN h.data->'error_data'->>'detail' IS NOT NULL THEN
CASE WHEN h.data->'error_data'->>'hint' IS NOT NULL THEN concat(h.data->'error_data'->>'message', '. ', h.data->'error_data'->>'detail', '. ', h.data->'error_data'->>'hint')
ELSE concat(h.data->'error_data'->>'message', ' ', h.data->'error_data'->>'detail')
END
ELSE
CASE WHEN h.data->'error_data'->>'hint' IS NOT NULL THEN concat(h.data->'error_data'->>'message', '. ', h.data->'error_data'->>'hint')
ELSE h.data->'error_data'->>'message'
END
END END
ELSE END AS err_message
CASE WHEN data->'error_data'->>'hint' IS NOT NULL THEN concat(data->'error_data'->>'message', '. ', data->'error_data'->>'hint')
ELSE data->'error_data'->>'message'
END
END
ELSE
'job crash detected, see server logs'
END
AS err_message
FROM FROM
_timescaledb_internal.bgw_job_stat_history _timescaledb_internal.bgw_job_stat_history h
LEFT JOIN LEFT JOIN
_timescaledb_config.bgw_job ON (bgw_job.id = bgw_job_stat_history.job_id) _timescaledb_config.bgw_job j ON (j.id = h.job_id)
WHERE WHERE
succeeded IS FALSE h.succeeded IS FALSE
OR h.succeeded IS NULL
AND (pg_catalog.pg_has_role(current_user, AND (pg_catalog.pg_has_role(current_user,
(SELECT pg_catalog.pg_get_userbyid(datdba) (SELECT pg_catalog.pg_get_userbyid(datdba)
FROM pg_catalog.pg_database FROM pg_catalog.pg_database
@ -328,6 +329,8 @@ SELECT
h.data->'job'->'config' AS config, h.data->'job'->'config' AS config,
h.data->'error_data'->>'sqlerrcode' AS sqlerrcode, h.data->'error_data'->>'sqlerrcode' AS sqlerrcode,
CASE CASE
WHEN h.succeeded IS NULL AND h.execution_finish IS NULL AND h.pid IS NULL THEN
'job crash detected, see server logs'
WHEN h.succeeded IS FALSE AND h.data->'error_data'->>'message' IS NOT NULL THEN WHEN h.succeeded IS FALSE AND h.data->'error_data'->>'message' IS NOT NULL THEN
CASE WHEN h.data->'error_data'->>'detail' IS NOT NULL THEN CASE WHEN h.data->'error_data'->>'detail' IS NOT NULL THEN
CASE WHEN h.data->'error_data'->>'hint' IS NOT NULL THEN concat(h.data->'error_data'->>'message', '. ', h.data->'error_data'->>'detail', '. ', h.data->'error_data'->>'hint') CASE WHEN h.data->'error_data'->>'hint' IS NOT NULL THEN concat(h.data->'error_data'->>'message', '. ', h.data->'error_data'->>'detail', '. ', h.data->'error_data'->>'hint')
@ -338,10 +341,6 @@ SELECT
ELSE h.data->'error_data'->>'message' ELSE h.data->'error_data'->>'message'
END END
END END
WHEN h.succeeded IS FALSE AND h.execution_finish IS NOT NULL THEN
'job crash detected, see server logs'
WHEN h.execution_finish IS NULL THEN
E'job didn\'t finish yet'
END AS err_message END AS err_message
FROM FROM
_timescaledb_internal.bgw_job_stat_history h _timescaledb_internal.bgw_job_stat_history h

View File

@ -33,6 +33,7 @@
#include <utils/timestamp.h> #include <utils/timestamp.h>
#include "compat/compat.h" #include "compat/compat.h"
#include "bgw/job_stat_history.h"
#include "bgw/scheduler.h" #include "bgw/scheduler.h"
#include "bgw_policy/chunk_stats.h" #include "bgw_policy/chunk_stats.h"
#include "bgw_policy/policy.h" #include "bgw_policy/policy.h"
@ -1151,6 +1152,7 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
INSTR_TIME_SET_CURRENT(start); INSTR_TIME_SET_CURRENT(start);
StartTransactionCommand(); StartTransactionCommand();
/* Grab a session lock on the job row to prevent concurrent deletes. Lock is released /* Grab a session lock on the job row to prevent concurrent deletes. Lock is released
* when the job process exits */ * when the job process exits */
job = ts_bgw_job_find_with_lock(params.job_id, job = ts_bgw_job_find_with_lock(params.job_id,
@ -1159,14 +1161,16 @@ ts_bgw_job_entrypoint(PG_FUNCTION_ARGS)
SESSION_LOCK, SESSION_LOCK,
/* block */ true, /* block */ true,
&got_lock); &got_lock);
CommitTransactionCommand();
if (job == NULL) if (job == NULL)
/* If the job is not found, we can't proceed */
elog(ERROR, "job %d not found when running the background worker", params.job_id); elog(ERROR, "job %d not found when running the background worker", params.job_id);
/* get parameters from bgworker */ /* get parameters from bgworker */
job->job_history.id = params.job_history_id; job->job_history.id = params.job_history_id;
job->job_history.execution_start = params.job_history_execution_start; job->job_history.execution_start = params.job_history_execution_start;
ts_bgw_job_stat_history_update(JOB_STAT_HISTORY_UPDATE_PID, job, JOB_SUCCESS, NULL);
CommitTransactionCommand();
elog(DEBUG2, "job %d (%s) found", params.job_id, NameStr(job->fd.application_name)); elog(DEBUG2, "job %d (%s) found", params.job_id, NameStr(job->fd.application_name));

View File

@ -650,7 +650,7 @@ ts_bgw_job_stat_mark_start(BgwJob *job)
job->job_history.execution_start = ts_timer_get_current_timestamp(); job->job_history.execution_start = ts_timer_get_current_timestamp();
job->job_history.id = INVALID_BGW_JOB_STAT_HISTORY_ID; job->job_history.id = INVALID_BGW_JOB_STAT_HISTORY_ID;
ts_bgw_job_stat_history_mark_start(job); ts_bgw_job_stat_history_update(JOB_STAT_HISTORY_UPDATE_START, job, JOB_SUCCESS, NULL);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
} }
@ -674,7 +674,7 @@ ts_bgw_job_stat_mark_end(BgwJob *job, JobResult result, Jsonb *edata)
errmsg("unable to find job statistics for job %d", job->fd.id))); errmsg("unable to find job statistics for job %d", job->fd.id)));
} }
ts_bgw_job_stat_history_mark_end(job, result, edata); ts_bgw_job_stat_history_update(JOB_STAT_HISTORY_UPDATE_END, job, result, edata);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
} }
@ -693,7 +693,7 @@ ts_bgw_job_stat_mark_crash_reported(BgwJob *job, JobResult result)
errmsg("unable to find job statistics for job %d", job->fd.id))); errmsg("unable to find job statistics for job %d", job->fd.id)));
} }
ts_bgw_job_stat_history_mark_end(job, result, NULL); ts_bgw_job_stat_history_update(JOB_STAT_HISTORY_UPDATE_END, job, result, NULL);
pgstat_report_activity(STATE_IDLE, NULL); pgstat_report_activity(STATE_IDLE, NULL);
} }

View File

@ -18,8 +18,9 @@
typedef struct BgwJobStatHistoryContext typedef struct BgwJobStatHistoryContext
{ {
BgwJob *job;
JobResult result; JobResult result;
BgwJobStatHistoryUpdateType update_type;
BgwJob *job;
Jsonb *edata; Jsonb *edata;
} BgwJobStatHistoryContext; } BgwJobStatHistoryContext;
@ -90,7 +91,7 @@ ts_bgw_job_stat_history_build_data_info(BgwJobStatHistoryContext *context)
} }
static void static void
ts_bgw_job_stat_history_insert(BgwJobStatHistoryContext *context) bgw_job_stat_history_insert(BgwJobStatHistoryContext *context, bool track_only_errors)
{ {
Assert(context != NULL); Assert(context != NULL);
@ -101,16 +102,29 @@ ts_bgw_job_stat_history_insert(BgwJobStatHistoryContext *context)
CatalogSecurityContext sec_ctx; CatalogSecurityContext sec_ctx;
ts_datum_set_int32(Anum_bgw_job_stat_history_job_id, values, context->job->fd.id, false); ts_datum_set_int32(Anum_bgw_job_stat_history_job_id, values, context->job->fd.id, false);
ts_datum_set_int32(Anum_bgw_job_stat_history_pid, values, MyProcPid, false);
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_start, ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_start,
values, values,
context->job->job_history.execution_start, context->job->job_history.execution_start,
false); false);
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_finish, values, 0, true); if (track_only_errors)
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_finish, {
values, /* In case of logging only ERRORs */
ts_timer_get_current_timestamp(), ts_datum_set_int32(Anum_bgw_job_stat_history_pid, values, MyProcPid, false);
false); ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_finish,
values,
ts_timer_get_current_timestamp(),
false);
ts_datum_set_bool(Anum_bgw_job_stat_history_succeeded, values, false, false);
}
else
{
/* When tracking history first we INSERT the job without the FINISH execution timestamp,
* PID and SUCCEED flag because it will be marked once the job finishes */
ts_datum_set_int32(Anum_bgw_job_stat_history_pid, values, 0, true);
ts_datum_set_timestamptz(Anum_bgw_job_stat_history_execution_finish, values, 0, true);
ts_datum_set_bool(Anum_bgw_job_stat_history_succeeded, values, false, true);
}
ts_datum_set_jsonb(Anum_bgw_job_stat_history_data, ts_datum_set_jsonb(Anum_bgw_job_stat_history_data,
values, values,
ts_bgw_job_stat_history_build_data_info(context)); ts_bgw_job_stat_history_build_data_info(context));
@ -131,18 +145,14 @@ ts_bgw_job_stat_history_insert(BgwJobStatHistoryContext *context)
table_close(rel, NoLock); table_close(rel, NoLock);
} }
void static void
ts_bgw_job_stat_history_mark_start(BgwJob *job) bgw_job_stat_history_mark_start(BgwJobStatHistoryContext *context)
{ {
/* Don't mark the start in case of the GUC be disabled */ /* Don't mark the start in case of the GUC be disabled */
if (!ts_guc_enable_job_execution_logging) if (!ts_guc_enable_job_execution_logging)
return; return;
BgwJobStatHistoryContext context = { bgw_job_stat_history_insert(context, false);
.job = job,
};
ts_bgw_job_stat_history_insert(&context);
} }
static bool static bool
@ -192,34 +202,51 @@ bgw_job_stat_history_scan_id(int64 bgw_job_history_id, tuple_found_func tuple_fo
} }
static ScanTupleResult static ScanTupleResult
bgw_job_stat_history_tuple_mark_end(TupleInfo *ti, void *const data) bgw_job_stat_history_tuple_update(TupleInfo *ti, void *const data)
{ {
bool should_free; bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
BgwJobStatHistoryContext *context = (BgwJobStatHistoryContext *) data; BgwJobStatHistoryContext *context = (BgwJobStatHistoryContext *) data;
Jsonb *job_history_data = NULL;
Datum values[Natts_bgw_job_stat_history] = { 0 }; Datum values[Natts_bgw_job_stat_history] = { 0 };
bool nulls[Natts_bgw_job_stat_history] = { 0 }; bool nulls[Natts_bgw_job_stat_history] = { 0 };
bool doReplace[Natts_bgw_job_stat_history] = { 0 }; bool doReplace[Natts_bgw_job_stat_history] = { 0 };
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_pid)] = Int32GetDatum(MyProcPid); switch (context->update_type)
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_pid)] = true;
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_execution_finish)] =
TimestampTzGetDatum(ts_timer_get_current_timestamp());
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_execution_finish)] = true;
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_succeeded)] =
BoolGetDatum((context->result == JOB_SUCCESS));
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_succeeded)] = true;
Jsonb *job_history_data = ts_bgw_job_stat_history_build_data_info(context);
if (job_history_data != NULL)
{ {
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_data)] = case JOB_STAT_HISTORY_UPDATE_PID:
JsonbPGetDatum(job_history_data); {
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_data)] = true; values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_pid)] =
Int32GetDatum(MyProcPid);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_pid)] = true;
break;
}
case JOB_STAT_HISTORY_UPDATE_END:
{
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_execution_finish)] =
TimestampTzGetDatum(ts_timer_get_current_timestamp());
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_execution_finish)] = true;
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_succeeded)] =
BoolGetDatum((context->result == JOB_SUCCESS));
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_succeeded)] = true;
job_history_data = ts_bgw_job_stat_history_build_data_info(context);
if (job_history_data != NULL)
{
values[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_data)] =
JsonbPGetDatum(job_history_data);
doReplace[AttrNumberGetAttrOffset(Anum_bgw_job_stat_history_data)] = true;
}
break;
}
case JOB_STAT_HISTORY_UPDATE_START:
pg_unreachable();
break;
} }
HeapTuple new_tuple = HeapTuple new_tuple =
@ -235,44 +262,65 @@ bgw_job_stat_history_tuple_mark_end(TupleInfo *ti, void *const data)
return SCAN_DONE; return SCAN_DONE;
} }
void static void
ts_bgw_job_stat_history_mark_end(BgwJob *job, JobResult result, Jsonb *edata) bgw_job_stat_history_update(BgwJobStatHistoryContext *context)
{ {
/* Don't execute in case of the GUC is false and the job succeeded, because failures are always /* Don't execute in case of the GUC is false and the job succeeded, because failures are always
* logged * logged
*/ */
if (!ts_guc_enable_job_execution_logging && result == JOB_SUCCESS) if (!ts_guc_enable_job_execution_logging && context->result == JOB_SUCCESS)
return; return;
/* Re-read the job information because it can change during the execution by using the /* Re-read the job information because it can change during the execution by using the
* `alter_job` API inside the function/procedure (i.e. job config) */ * `alter_job` API inside the function/procedure (i.e. job config) */
BgwJob *new_job = ts_bgw_job_find(job->fd.id, CurrentMemoryContext, true); BgwJob *new_job = ts_bgw_job_find(context->job->fd.id, CurrentMemoryContext, true);
/* Set the job history information */ /* Set the job history information */
new_job->job_history = job->job_history; new_job->job_history = context->job->job_history;
BgwJobStatHistoryContext context = { /* Use the newly loaded job in the current context to use this information to register the
.job = new_job, * execution history */
.result = result, context->job = new_job;
.edata = edata,
};
/* Failures are always logged so in case of the GUC is false and a failure happens then we need /* Failures are always logged so in case of the GUC is false and a failure happens then we need
* to insert all the information in the job error history table */ * to insert all the information in the job error history table */
if (!ts_guc_enable_job_execution_logging && result != JOB_SUCCESS) if (!ts_guc_enable_job_execution_logging && context->result != JOB_SUCCESS)
{ {
ts_bgw_job_stat_history_insert(&context); bgw_job_stat_history_insert(context, true);
} }
else else
{ {
/* Mark the end of the previous inserted start execution */ /* Mark the end of the previous inserted start execution */
if (!bgw_job_stat_history_scan_id(new_job->job_history.id, if (!bgw_job_stat_history_scan_id(new_job->job_history.id,
bgw_job_stat_history_tuple_mark_end, bgw_job_stat_history_tuple_update,
NULL, NULL,
&context, context,
RowExclusiveLock)) RowExclusiveLock))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unable to find job history " INT64_FORMAT, new_job->job_history.id))); errmsg("unable to find job history " INT64_FORMAT, new_job->job_history.id)));
} }
} }
void
ts_bgw_job_stat_history_update(BgwJobStatHistoryUpdateType update_type, BgwJob *job,
JobResult result, Jsonb *edata)
{
BgwJobStatHistoryContext context = {
.result = result,
.update_type = update_type,
.job = job,
.edata = edata,
};
switch (update_type)
{
case JOB_STAT_HISTORY_UPDATE_START:
bgw_job_stat_history_mark_start(&context);
break;
case JOB_STAT_HISTORY_UPDATE_END:
case JOB_STAT_HISTORY_UPDATE_PID:
bgw_job_stat_history_update(&context);
break;
}
}

View File

@ -11,5 +11,12 @@
#define INVALID_BGW_JOB_STAT_HISTORY_ID 0 #define INVALID_BGW_JOB_STAT_HISTORY_ID 0
extern void ts_bgw_job_stat_history_mark_start(BgwJob *job); typedef enum BgwJobStatHistoryUpdateType
extern void ts_bgw_job_stat_history_mark_end(BgwJob *job, JobResult result, Jsonb *edata); {
JOB_STAT_HISTORY_UPDATE_START,
JOB_STAT_HISTORY_UPDATE_END,
JOB_STAT_HISTORY_UPDATE_PID,
} BgwJobStatHistoryUpdateType;
extern void ts_bgw_job_stat_history_update(BgwJobStatHistoryUpdateType update_type, BgwJob *job,
JobResult result, Jsonb *edata);

View File

@ -330,17 +330,20 @@ ts_datum_set_text_from_cstring(const AttrNumber attno, NullableDatum *datums, co
} }
static inline void static inline void
ts_datum_set_bool(const AttrNumber attno, NullableDatum *datums, const bool value) ts_datum_set_bool(const AttrNumber attno, NullableDatum *datums, const bool value,
const bool isnull)
{ {
datums[AttrNumberGetAttrOffset(attno)].value = BoolGetDatum(value); if (!isnull)
datums[AttrNumberGetAttrOffset(attno)].isnull = false; datums[AttrNumberGetAttrOffset(attno)].value = BoolGetDatum(value);
datums[AttrNumberGetAttrOffset(attno)].isnull = isnull;
} }
static inline void static inline void
ts_datum_set_int32(const AttrNumber attno, NullableDatum *datums, const int32 value, ts_datum_set_int32(const AttrNumber attno, NullableDatum *datums, const int32 value,
const bool isnull) const bool isnull)
{ {
datums[AttrNumberGetAttrOffset(attno)].value = Int32GetDatum(value); if (!isnull)
datums[AttrNumberGetAttrOffset(attno)].value = Int32GetDatum(value);
datums[AttrNumberGetAttrOffset(attno)].isnull = isnull; datums[AttrNumberGetAttrOffset(attno)].isnull = isnull;
} }
@ -348,7 +351,8 @@ static inline void
ts_datum_set_int64(const AttrNumber attno, NullableDatum *datums, const int64 value, ts_datum_set_int64(const AttrNumber attno, NullableDatum *datums, const int64 value,
const bool isnull) const bool isnull)
{ {
datums[AttrNumberGetAttrOffset(attno)].value = Int64GetDatum(value); if (!isnull)
datums[AttrNumberGetAttrOffset(attno)].value = Int64GetDatum(value);
datums[AttrNumberGetAttrOffset(attno)].isnull = isnull; datums[AttrNumberGetAttrOffset(attno)].isnull = isnull;
} }
@ -356,7 +360,8 @@ static inline void
ts_datum_set_timestamptz(const AttrNumber attno, NullableDatum *datums, const TimestampTz value, ts_datum_set_timestamptz(const AttrNumber attno, NullableDatum *datums, const TimestampTz value,
const bool isnull) const bool isnull)
{ {
datums[AttrNumberGetAttrOffset(attno)].value = TimestampTzGetDatum(value); if (!isnull)
datums[AttrNumberGetAttrOffset(attno)].value = TimestampTzGetDatum(value);
datums[AttrNumberGetAttrOffset(attno)].isnull = isnull; datums[AttrNumberGetAttrOffset(attno)].isnull = isnull;
} }

View File

@ -40,7 +40,7 @@ create_cagg_validate_query_datum(TupleDesc tupdesc, const bool is_valid_query,
tupdesc = BlessTupleDesc(tupdesc); tupdesc = BlessTupleDesc(tupdesc);
ts_datum_set_bool(Anum_cagg_validate_query_valid, datums, is_valid_query); ts_datum_set_bool(Anum_cagg_validate_query_valid, datums, is_valid_query, false);
ts_datum_set_text_from_cstring(Anum_cagg_validate_query_error_level, ts_datum_set_text_from_cstring(Anum_cagg_validate_query_error_level,
datums, datums,
edata->elevel > 0 ? error_severity(edata->elevel) : NULL); edata->elevel > 0 ? error_severity(edata->elevel) : NULL);
@ -861,7 +861,10 @@ create_cagg_get_bucket_function_datum(TupleDesc tupdesc, ContinuousAggsBucketFun
ts_datum_set_text_from_cstring(Anum_cagg_bucket_function_timezone, ts_datum_set_text_from_cstring(Anum_cagg_bucket_function_timezone,
datums, datums,
bf->bucket_time_timezone); bf->bucket_time_timezone);
ts_datum_set_bool(Anum_cagg_bucket_function_fixed_width, datums, bf->bucket_fixed_interval); ts_datum_set_bool(Anum_cagg_bucket_function_fixed_width,
datums,
bf->bucket_fixed_interval,
false);
Assert(tupdesc->natts == Natts_cagg_validate_query); Assert(tupdesc->natts == Natts_cagg_validate_query);
tuple = ts_heap_form_tuple(tupdesc, datums); tuple = ts_heap_form_tuple(tupdesc, datums);

View File

@ -28,12 +28,6 @@ SELECT _timescaledb_functions.start_background_workers();
t t
(1 row) (1 row)
SELECT pg_sleep(6);
pg_sleep
----------
(1 row)
SELECT add_job('custom_job_ok', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_1 \gset SELECT add_job('custom_job_ok', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_1 \gset
SELECT add_job('custom_job_error', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_2 \gset SELECT add_job('custom_job_error', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_2 \gset
SELECT test.wait_for_job_to_run(:job_id_1, 1); SELECT test.wait_for_job_to_run(:job_id_1, 1);
@ -81,6 +75,8 @@ SELECT pg_reload_conf();
t t
(1 row) (1 row)
-- Reconnect to make sure the GUC is set
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT scheduled FROM alter_job(:job_id_1, next_start => now()); SELECT scheduled FROM alter_job(:job_id_1, next_start => now());
scheduled scheduled
----------- -----------

View File

@ -25,6 +25,8 @@ SELECT pg_reload_conf();
t t
(1 row) (1 row)
-- Reconnect to make sure the GUC is set
\c :TEST_DBNAME :ROLE_SUPERUSER
-- test a concurrent update -- test a concurrent update
CREATE OR REPLACE PROCEDURE custom_proc1(jobid int, config jsonb) LANGUAGE PLPGSQL AS CREATE OR REPLACE PROCEDURE custom_proc1(jobid int, config jsonb) LANGUAGE PLPGSQL AS
$$ $$
@ -102,6 +104,8 @@ SELECT pg_reload_conf();
t t
(1 row) (1 row)
-- Reconnect to make sure the GUC is set
\c :TEST_DBNAME :ROLE_SUPERUSER
-- test the retention job -- test the retention job
SELECT next_start FROM alter_job(3, next_start => '2060-01-01 00:00:00+00'::timestamptz); SELECT next_start FROM alter_job(3, next_start => '2060-01-01 00:00:00+00'::timestamptz);
next_start next_start
@ -152,6 +156,34 @@ SELECT _timescaledb_functions.stop_background_workers();
t t
(1 row) (1 row)
-- Job didn't finish yet and Crash detected
DELETE FROM _timescaledb_internal.bgw_job_stat_history;
INSERT INTO _timescaledb_internal.bgw_job_stat_history(job_id, pid, succeeded, execution_start, execution_finish, data)
VALUES (1, NULL, NULL, '2000-01-01 00:00:00+00'::timestamptz, NULL, '{}'), -- Crash server detected
(2, 2222, false, '2000-01-01 00:00:00+00'::timestamptz, NULL, '{}'), -- Didn't finished yet
(3, 3333, false, '2000-01-01 00:00:00+00'::timestamptz, '2000-01-01 01:00:00+00'::timestamptz, '{}'), -- Finish with ERROR
(4, 4444, true, '2000-01-01 00:00:00+00'::timestamptz, '2000-01-01 01:00:00+00'::timestamptz, '{}'); -- Finish with SUCCESS
SELECT job_id, pid, succeeded, start_time, finish_time, config, err_message
FROM timescaledb_information.job_history
ORDER BY job_id;
job_id | pid | succeeded | start_time | finish_time | config | err_message
--------+------+-----------+------------------------------+------------------------------+--------+-------------------------------------
1 | | | Fri Dec 31 16:00:00 1999 PST | | | job crash detected, see server logs
2 | 2222 | f | Fri Dec 31 16:00:00 1999 PST | | |
3 | 3333 | f | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 17:00:00 1999 PST | |
4 | 4444 | t | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 17:00:00 1999 PST | |
(4 rows)
SELECT job_id, pid, start_time, finish_time, err_message
FROM timescaledb_information.job_errors
ORDER BY job_id;
job_id | pid | start_time | finish_time | err_message
--------+------+------------------------------+------------------------------+-------------------------------------
1 | | Fri Dec 31 16:00:00 1999 PST | | job crash detected, see server logs
2 | 2222 | Fri Dec 31 16:00:00 1999 PST | |
3 | 3333 | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 17:00:00 1999 PST |
(3 rows)
DELETE FROM _timescaledb_internal.bgw_job_stat; DELETE FROM _timescaledb_internal.bgw_job_stat;
DELETE FROM _timescaledb_internal.bgw_job_stat_history; DELETE FROM _timescaledb_internal.bgw_job_stat_history;
DELETE FROM _timescaledb_config.bgw_job CASCADE; DELETE FROM _timescaledb_config.bgw_job CASCADE;
@ -201,6 +233,12 @@ SELECT count(*) > 0 FROM timescaledb_information.job_history WHERE succeeded IS
t t
(1 row) (1 row)
SELECT count(*) > 0 FROM timescaledb_information.job_errors WHERE err_message ~ 'failed to start job';
?column?
----------
t
(1 row)
\set VERBOSITY terse \set VERBOSITY terse
\c :TEST_DBNAME :ROLE_SUPERUSER \c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_functions.stop_background_workers(); SELECT _timescaledb_functions.stop_background_workers();

View File

@ -13,6 +13,7 @@ SELECT pg_reload_conf();
t t
(1 row) (1 row)
\c :TEST_DBNAME :ROLE_SUPERUSER
SET ROLE :ROLE_DEFAULT_PERM_USER; SET ROLE :ROLE_DEFAULT_PERM_USER;
CREATE OR REPLACE PROCEDURE job_fail(jobid int, config jsonb) CREATE OR REPLACE PROCEDURE job_fail(jobid int, config jsonb)
AS $$ AS $$
@ -75,37 +76,48 @@ SELECT pg_sleep(6);
\set finish '2000-01-01 00:00:10+00' \set finish '2000-01-01 00:00:10+00'
INSERT INTO _timescaledb_internal.bgw_job_stat_history(job_id, pid, succeeded, execution_start, execution_finish, data) VALUES INSERT INTO _timescaledb_internal.bgw_job_stat_history(job_id, pid, succeeded, execution_start, execution_finish, data) VALUES
(11111, 12345, false, :'start'::timestamptz, :'finish'::timestamptz, '{"error_data": {"message": "not an error"}}'), (11111, 12345, false, :'start'::timestamptz, :'finish'::timestamptz, '{"error_data": {"message": "not an error"}}'),
(22222, 45678, false, :'start'::timestamptz, :'finish'::timestamptz, '{}'); (22222, 45678, false, :'start'::timestamptz, NULL, '{}'), -- Started and didn't finished yet
(33333, NULL, NULL, :'start'::timestamptz, NULL, NULL); -- Crash detected cause not assigned an PID
-- We check the log as different users and should only see what we -- We check the log as different users and should only see what we
-- have permissions to see. We only bother about jobs at 1000 or -- have permissions to see. We only bother about jobs at 1000 or
-- larger since the standard jobs are flaky. -- larger since the standard jobs are flaky.
SET ROLE :ROLE_DEFAULT_PERM_USER; SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000; FROM timescaledb_information.job_errors WHERE job_id >= 1000
job_id | proc_schema | proc_name | sqlerrcode | err_message ORDER BY job_id;
--------+-------------+-----------+------------+----------------------
1000 | public | job_fail | P0001 | raising an exception
(1 row)
SET ROLE :ROLE_DEFAULT_PERM_USER_2;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000;
job_id | proc_schema | proc_name | sqlerrcode | err_message
--------+-------------+--------------+------------+-----------------------------------------------------
1002 | public | custom_proc2 | 40001 | could not serialize access due to concurrent update
(1 row)
SET ROLE :ROLE_SUPERUSER;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000;
job_id | proc_schema | proc_name | sqlerrcode | err_message job_id | proc_schema | proc_name | sqlerrcode | err_message
--------+-------------+--------------+------------+----------------------------------------------------- --------+-------------+--------------+------------+-----------------------------------------------------
1000 | public | job_fail | P0001 | raising an exception 1000 | public | job_fail | P0001 | raising an exception
1002 | public | custom_proc2 | 40001 | could not serialize access due to concurrent update 1002 | public | custom_proc2 | 40001 | could not serialize access due to concurrent update
11111 | | | | not an error 11111 | | | | not an error
22222 | | | | job crash detected, see server logs 22222 | | | |
(4 rows) (4 rows)
SET ROLE :ROLE_DEFAULT_PERM_USER_2;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000
ORDER BY job_id;
job_id | proc_schema | proc_name | sqlerrcode | err_message
--------+-------------+--------------+------------+-----------------------------------------------------
1000 | public | job_fail | P0001 | raising an exception
1002 | public | custom_proc2 | 40001 | could not serialize access due to concurrent update
11111 | | | | not an error
22222 | | | |
(4 rows)
SET ROLE :ROLE_SUPERUSER;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000
ORDER BY job_id;
job_id | proc_schema | proc_name | sqlerrcode | err_message
--------+-------------+--------------+------------+-----------------------------------------------------
1000 | public | job_fail | P0001 | raising an exception
1002 | public | custom_proc2 | 40001 | could not serialize access due to concurrent update
11111 | | | | not an error
22222 | | | |
33333 | | | | job crash detected, see server logs
(5 rows)
SELECT delete_job(:custom_proc2_id); SELECT delete_job(:custom_proc2_id);
delete_job delete_job
------------ ------------
@ -124,6 +136,13 @@ SELECT delete_job(:job_fail_id);
(1 row) (1 row)
ALTER SYSTEM RESET DEFAULT_TRANSACTION_ISOLATION;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)
\c :TEST_DBNAME :ROLE_SUPERUSER \c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_functions.stop_background_workers(); SELECT _timescaledb_functions.stop_background_workers();
stop_background_workers stop_background_workers

View File

@ -23,7 +23,6 @@ SHOW timescaledb.enable_job_execution_logging;
-- Start Background Workers -- Start Background Workers
SELECT _timescaledb_functions.start_background_workers(); SELECT _timescaledb_functions.start_background_workers();
SELECT pg_sleep(6);
SELECT add_job('custom_job_ok', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_1 \gset SELECT add_job('custom_job_ok', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_1 \gset
SELECT add_job('custom_job_error', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_2 \gset SELECT add_job('custom_job_error', schedule_interval => interval '1 hour', initial_start := now()) AS job_id_2 \gset
@ -45,6 +44,9 @@ ORDER BY job_id;
ALTER SYSTEM SET timescaledb.enable_job_execution_logging TO ON; ALTER SYSTEM SET timescaledb.enable_job_execution_logging TO ON;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
-- Reconnect to make sure the GUC is set
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT scheduled FROM alter_job(:job_id_1, next_start => now()); SELECT scheduled FROM alter_job(:job_id_1, next_start => now());
SELECT scheduled FROM alter_job(:job_id_2, next_start => now()); SELECT scheduled FROM alter_job(:job_id_2, next_start => now());

View File

@ -27,6 +27,9 @@ insert into custom_log values (0, 0, 'msg0');
ALTER SYSTEM SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable'; ALTER SYSTEM SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';
SELECT pg_reload_conf(); SELECT pg_reload_conf();
-- Reconnect to make sure the GUC is set
\c :TEST_DBNAME :ROLE_SUPERUSER
-- test a concurrent update -- test a concurrent update
CREATE OR REPLACE PROCEDURE custom_proc1(jobid int, config jsonb) LANGUAGE PLPGSQL AS CREATE OR REPLACE PROCEDURE custom_proc1(jobid int, config jsonb) LANGUAGE PLPGSQL AS
$$ $$
@ -65,6 +68,9 @@ from _timescaledb_internal.bgw_job_stat_history WHERE job_id >= 1000 and succeed
ALTER SYSTEM RESET DEFAULT_TRANSACTION_ISOLATION; ALTER SYSTEM RESET DEFAULT_TRANSACTION_ISOLATION;
SELECT pg_reload_conf(); SELECT pg_reload_conf();
-- Reconnect to make sure the GUC is set
\c :TEST_DBNAME :ROLE_SUPERUSER
-- test the retention job -- test the retention job
SELECT next_start FROM alter_job(3, next_start => '2060-01-01 00:00:00+00'::timestamptz); SELECT next_start FROM alter_job(3, next_start => '2060-01-01 00:00:00+00'::timestamptz);
DELETE FROM _timescaledb_internal.bgw_job_stat_history; DELETE FROM _timescaledb_internal.bgw_job_stat_history;
@ -90,6 +96,22 @@ WHERE succeeded IS FALSE;
\c :TEST_DBNAME :ROLE_SUPERUSER \c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_functions.stop_background_workers(); SELECT _timescaledb_functions.stop_background_workers();
-- Job didn't finish yet and Crash detected
DELETE FROM _timescaledb_internal.bgw_job_stat_history;
INSERT INTO _timescaledb_internal.bgw_job_stat_history(job_id, pid, succeeded, execution_start, execution_finish, data)
VALUES (1, NULL, NULL, '2000-01-01 00:00:00+00'::timestamptz, NULL, '{}'), -- Crash server detected
(2, 2222, false, '2000-01-01 00:00:00+00'::timestamptz, NULL, '{}'), -- Didn't finished yet
(3, 3333, false, '2000-01-01 00:00:00+00'::timestamptz, '2000-01-01 01:00:00+00'::timestamptz, '{}'), -- Finish with ERROR
(4, 4444, true, '2000-01-01 00:00:00+00'::timestamptz, '2000-01-01 01:00:00+00'::timestamptz, '{}'); -- Finish with SUCCESS
SELECT job_id, pid, succeeded, start_time, finish_time, config, err_message
FROM timescaledb_information.job_history
ORDER BY job_id;
SELECT job_id, pid, start_time, finish_time, err_message
FROM timescaledb_information.job_errors
ORDER BY job_id;
DELETE FROM _timescaledb_internal.bgw_job_stat; DELETE FROM _timescaledb_internal.bgw_job_stat;
DELETE FROM _timescaledb_internal.bgw_job_stat_history; DELETE FROM _timescaledb_internal.bgw_job_stat_history;
DELETE FROM _timescaledb_config.bgw_job CASCADE; DELETE FROM _timescaledb_config.bgw_job CASCADE;
@ -130,6 +152,7 @@ END;
$TEST$; $TEST$;
SELECT count(*) > 0 FROM timescaledb_information.job_history WHERE succeeded IS FALSE AND err_message ~ 'failed to start job'; SELECT count(*) > 0 FROM timescaledb_information.job_history WHERE succeeded IS FALSE AND err_message ~ 'failed to start job';
SELECT count(*) > 0 FROM timescaledb_information.job_errors WHERE err_message ~ 'failed to start job';
\set VERBOSITY terse \set VERBOSITY terse
\c :TEST_DBNAME :ROLE_SUPERUSER \c :TEST_DBNAME :ROLE_SUPERUSER

View File

@ -10,6 +10,7 @@ INSERT INTO my_table VALUES (0, 0);
GRANT ALL ON my_table TO PUBLIC; GRANT ALL ON my_table TO PUBLIC;
ALTER SYSTEM SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable'; ALTER SYSTEM SET DEFAULT_TRANSACTION_ISOLATION TO 'serializable';
SELECT pg_reload_conf(); SELECT pg_reload_conf();
\c :TEST_DBNAME :ROLE_SUPERUSER
SET ROLE :ROLE_DEFAULT_PERM_USER; SET ROLE :ROLE_DEFAULT_PERM_USER;
@ -61,26 +62,33 @@ SELECT pg_sleep(6);
\set finish '2000-01-01 00:00:10+00' \set finish '2000-01-01 00:00:10+00'
INSERT INTO _timescaledb_internal.bgw_job_stat_history(job_id, pid, succeeded, execution_start, execution_finish, data) VALUES INSERT INTO _timescaledb_internal.bgw_job_stat_history(job_id, pid, succeeded, execution_start, execution_finish, data) VALUES
(11111, 12345, false, :'start'::timestamptz, :'finish'::timestamptz, '{"error_data": {"message": "not an error"}}'), (11111, 12345, false, :'start'::timestamptz, :'finish'::timestamptz, '{"error_data": {"message": "not an error"}}'),
(22222, 45678, false, :'start'::timestamptz, :'finish'::timestamptz, '{}'); (22222, 45678, false, :'start'::timestamptz, NULL, '{}'), -- Started and didn't finished yet
(33333, NULL, NULL, :'start'::timestamptz, NULL, NULL); -- Crash detected cause not assigned an PID
-- We check the log as different users and should only see what we -- We check the log as different users and should only see what we
-- have permissions to see. We only bother about jobs at 1000 or -- have permissions to see. We only bother about jobs at 1000 or
-- larger since the standard jobs are flaky. -- larger since the standard jobs are flaky.
SET ROLE :ROLE_DEFAULT_PERM_USER; SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000; FROM timescaledb_information.job_errors WHERE job_id >= 1000
ORDER BY job_id;
SET ROLE :ROLE_DEFAULT_PERM_USER_2; SET ROLE :ROLE_DEFAULT_PERM_USER_2;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000; FROM timescaledb_information.job_errors WHERE job_id >= 1000
ORDER BY job_id;
SET ROLE :ROLE_SUPERUSER; SET ROLE :ROLE_SUPERUSER;
SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message SELECT job_id, proc_schema, proc_name, sqlerrcode, err_message
FROM timescaledb_information.job_errors WHERE job_id >= 1000; FROM timescaledb_information.job_errors WHERE job_id >= 1000
ORDER BY job_id;
SELECT delete_job(:custom_proc2_id); SELECT delete_job(:custom_proc2_id);
SELECT delete_job(:custom_proc1_id); SELECT delete_job(:custom_proc1_id);
SELECT delete_job(:job_fail_id); SELECT delete_job(:job_fail_id);
ALTER SYSTEM RESET DEFAULT_TRANSACTION_ISOLATION;
SELECT pg_reload_conf();
\c :TEST_DBNAME :ROLE_SUPERUSER \c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_functions.stop_background_workers(); SELECT _timescaledb_functions.stop_background_workers();