diff --git a/src/bgw/job_stat.c b/src/bgw/job_stat.c
index afd7087cb..a7e7e8b55 100644
--- a/src/bgw/job_stat.c
+++ b/src/bgw/job_stat.c
@@ -159,14 +159,22 @@ typedef struct
 } JobResultCtx;
 
+/*
+ * Next start after a successful run: finish_time + job->fd.schedule_interval.
+ * When finish_time fails IS_VALID_TIMESTAMP, the interval is added to
+ * ts_timer_get_current_timestamp() instead.
+ */
 static TimestampTz
-calculate_next_start_on_success(TimestampTz last_finish, BgwJob *job)
+calculate_next_start_on_success(TimestampTz finish_time, BgwJob *job)
 {
-	/* TODO: add randomness here? Do we need a range or just a percent? */
-	TimestampTz ts =
-		DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
-												TimestampTzGetDatum(last_finish),
-												IntervalPGetDatum(&job->fd.schedule_interval)));
-
+	TimestampTz ts;
+	TimestampTz last_finish = finish_time;
+	if (!IS_VALID_TIMESTAMP(finish_time))
+	{
+		last_finish = ts_timer_get_current_timestamp();
+	}
+	ts = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
+												 TimestampTzGetDatum(last_finish),
+												 IntervalPGetDatum(&job->fd.schedule_interval)));
 	return ts;
 }
 
@@ -186,30 +194,89 @@ calculate_jitter_percent()
 /* For failures we have standard exponential backoff based on consecutive failures
  * along with a ceiling at schedule_interval * MAX_INTERVALS_BACKOFF */
 static TimestampTz
-calculate_next_start_on_failure(TimestampTz last_finish, int consecutive_failures, BgwJob *job)
+calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failures, BgwJob *job)
 {
 	float8 jitter = calculate_jitter_percent();
 	/* consecutive failures includes this failure */
-	float8 multiplier = 1 << (consecutive_failures - 1);
+	int backoff_shift = consecutive_failures - 1;
+	float8 multiplier;
+	TimestampTz res;
+	volatile bool res_set = false;
+	TimestampTz last_finish = finish_time;
+	MemoryContext oldctx;
+
+	/*
+	 * Shifting an int by a negative count, or far enough to reach the sign
+	 * bit, is undefined behavior in C. Clamp the exponent to [0, 30] so the
+	 * multiplier is always a positive power of two; the ival_max ceiling
+	 * below still bounds the resulting interval.
+	 */
+	if (backoff_shift < 0)
+		backoff_shift = 0;
+	else if (backoff_shift > 30)
+		backoff_shift = 30;
+	multiplier = 1 << backoff_shift;
+
+	if (!IS_VALID_TIMESTAMP(finish_time))
+	{
+		elog(LOG, "calculate_next_start_on_failure, got bad finish_time");
+		last_finish = ts_timer_get_current_timestamp();
+	}
+	oldctx = CurrentMemoryContext;
+	/* Run the arithmetic in a subtransaction so an error can be rolled back */
+	BeginInternalSubTransaction("next start on failure");
+	PG_TRY();
+	{
+		/* ival = retry_period * 2^(consecutive_failures - 1) */
+		Datum ival = DirectFunctionCall2(interval_mul,
+										 IntervalPGetDatum(&job->fd.retry_period),
+										 Float8GetDatum(multiplier));
 
-	/* ival = retry_period * 2^(consecutive_failures - 1) */
-	Datum ival = DirectFunctionCall2(interval_mul,
-									 IntervalPGetDatum(&job->fd.retry_period),
-									 Float8GetDatum(multiplier));
+		/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
+		Datum ival_max = DirectFunctionCall2(interval_mul,
+											 IntervalPGetDatum(&job->fd.schedule_interval),
+											 Float8GetDatum(MAX_INTERVALS_BACKOFF));
 
-	/* ival_max is the ceiling = MAX_INTERVALS_BACKOFF * schedule_interval */
-	Datum ival_max = DirectFunctionCall2(interval_mul,
-										 IntervalPGetDatum(&job->fd.schedule_interval),
-										 Float8GetDatum(MAX_INTERVALS_BACKOFF));
+		if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0)
+			ival = ival_max;
 
-	if (DatumGetInt32(DirectFunctionCall2(interval_cmp, ival, ival_max)) > 0)
-		ival = ival_max;
+		/* Add some random jitter to prevent stampeding-herds, interval will be within about +-13%
+		 */
+		ival = DirectFunctionCall2(interval_mul, ival, Float8GetDatum(1.0 + jitter));
 
-	/* Add some random jitter to prevent stampeding-herds, interval will be within about +-13% */
-	ival = DirectFunctionCall2(interval_mul, ival, Float8GetDatum(1.0 + jitter));
-
-	return DatumGetTimestampTz(
-		DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival));
+		res = DatumGetTimestampTz(
+			DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival));
+		res_set = true;
+		ReleaseCurrentSubTransaction();
+	}
+	PG_CATCH();
+	{
+		ErrorData *errdata;
+
+		/* CopyErrorData() must not be called while in ErrorContext */
+		MemoryContextSwitchTo(oldctx);
+		errdata = CopyErrorData();
+		FlushErrorState();
+		RollbackAndReleaseCurrentSubTransaction();
+		/* Log the swallowed error so the fallback below is traceable */
+		elog(LOG, "could not calculate next start on failure: %s", errdata->message);
+		FreeErrorData(errdata);
+	}
+	PG_END_TRY();
+	MemoryContextSwitchTo(oldctx);
+	if (!res_set)
+	{
+		TimestampTz nowt;
+		/*
+		 * The calculation failed: fall back to one retry_period from now.
+		 * job->fd.retry_period is a valid non-null value.
+		 */
+		nowt = ts_timer_get_current_timestamp();
+		res = DatumGetTimestampTz(DirectFunctionCall2(timestamptz_pl_interval,
+													  TimestampTzGetDatum(nowt),
+													  IntervalPGetDatum(&job->fd.retry_period)));
+	}
+	return res;
 }
 
 /* For crashes, the logic is the similar as for failures except we also have