Catch elog errors in scheduler code

Direct function calls to PG functions can throw
errors for bad inputs. If these are not handled
when called by the scheduler, the process dies
and background workers cannot be scheduled.
This commit is contained in:
gayyappan 2020-02-26 12:07:05 -05:00 committed by gayyappan
parent b363e8a379
commit 7de3564d18

View File

@ -159,14 +159,17 @@ typedef struct
} JobResultCtx; } JobResultCtx;
static TimestampTz static TimestampTz
calculate_next_start_on_success(TimestampTz last_finish, BgwJob *job) calculate_next_start_on_success(TimestampTz finish_time, BgwJob *job)
{ {
/* TODO: add randomness here? Do we need a range or just a percent? */ TimestampTz ts;
TimestampTz ts = TimestampTz last_finish = finish_time;
DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval, if (!IS_VALID_TIMESTAMP(finish_time))
TimestampTzGetDatum(last_finish), {
IntervalPGetDatum(&job->fd.schedule_interval))); last_finish = ts_timer_get_current_timestamp();
}
ts = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(last_finish),
IntervalPGetDatum(&job->fd.schedule_interval)));
return ts; return ts;
} }
@ -186,30 +189,63 @@ calculate_jitter_percent()
/* For failures we have standard exponential backoff based on consecutive failures /* For failures we have standard exponential backoff based on consecutive failures
* along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF */ * along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF */
static TimestampTz static TimestampTz
calculate_next_start_on_failure(TimestampTz last_finish, int consecutive_failures, BgwJob *job) calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failures, BgwJob *job)
{ {
float8 jitter = calculate_jitter_percent(); float8 jitter = calculate_jitter_percent();
/* consecutive failures includes this failure */ /* consecutive failures includes this failure */
float8 multiplier = 1 << (consecutive_failures - 1); float8 multiplier = 1 << (consecutive_failures - 1);
TimestampTz res;
volatile bool res_set = false;
TimestampTz last_finish = finish_time;
MemoryContext oldctx;
if (!IS_VALID_TIMESTAMP(finish_time))
{
elog(LOG, "calculate_next_start_on_failure, got bad finish_time");
last_finish = ts_timer_get_current_timestamp();
}
oldctx = CurrentMemoryContext;
BeginInternalSubTransaction("next start on failure");
PG_TRY();
{
/* ival = retry_period * 2^(consecutive_failures - 1) */
Datum ival = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.retry_period),
Float8GetDatum(multiplier));
/* ival = retry_period * 2^(consecutive_failures - 1) */ /* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
Datum ival = DirectFunctionCall2(interval_mul, Datum ival_max = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&job->fd.retry_period), IntervalPGetDatum(&job->fd.schedule_interval),
Float8GetDatum(multiplier)); Float8GetDatum(MAX_INTERVALS_BACKOFF));
/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */ if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0)
Datum ival_max = DirectFunctionCall2(interval_mul, ival = ival_max;
IntervalPGetDatum(&job->fd.schedule_interval),
Float8GetDatum(MAX_INTERVALS_BACKOFF));
if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0) /* Add some random jitter to prevent stampeding-herds, interval will be within about +-13%
ival = ival_max; */
ival = DirectFunctionCall2(interval_mul, ival, Float8GetDatum(1.0 + jitter));
/* Add some random jitter to prevent stampeding-herds, interval will be within about +-13% */ res = DatumGetTimestampTz(
ival = DirectFunctionCall2(interval_mul, ival, Float8GetDatum(1.0 + jitter)); DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival));
res_set = true;
return DatumGetTimestampTz( ReleaseCurrentSubTransaction();
DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival)); }
PG_CATCH();
{
RollbackAndReleaseCurrentSubTransaction();
FlushErrorState();
}
PG_END_TRY();
MemoryContextSwitchTo(oldctx);
if (!res_set)
{
TimestampTz nowt;
/* job->fd_retry_period is a valid non-null value */
nowt = ts_timer_get_current_timestamp();
res = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(nowt),
IntervalPGetDatum(&job->fd.retry_period)));
}
return res;
} }
/* For crashes, the logic is the similar as for failures except we also have /* For crashes, the logic is the similar as for failures except we also have