Fix subtransaction resource owner

When executing a subtransaction using `BeginInternalSubTransaction` the
memory context switches from the current context to
`CurTransactionContext` and when the transaction is aborted or
committed using `ReleaseCurrentSubTransaction` or
`RollbackAndReleaseCurrentSubTransaction` respectively, it will not
restore to the previous memory context or resource owner but rather use
`TopTransactionContext`. Because of this, both the memory context and
the resource owner will be wrong when executing
`calculate_next_start_on_failure`, which causes `run_job` to generate
an error when used with the telemetry job.

This commit fixes this by saving both the resource owner and the memory
context before starting the internal subtransaction and restoring it
after finishing the internal subtransaction.

Since the `ts_bgw_job_run_and_set_next_start` was incorrectly reading
the wrong result from the telemetry job, this commit fixes this as
well. Note that `ts_bgw_job_run_and_set_next_start` is only used when
running the telemetry job, so it does not cause issues for other jobs.
This commit is contained in:
Mats Kindahl 2023-05-09 18:58:12 +02:00 committed by Mats Kindahl
parent abb6762450
commit 656daf45f6
4 changed files with 17 additions and 9 deletions

View File

@ -34,6 +34,7 @@ accidentally triggering the load of a previous DB version.**
* #5525 Fix tablespace for compressed hypertable and corresponding toast * #5525 Fix tablespace for compressed hypertable and corresponding toast
* #5642 Fix ALTER TABLE SET with normal tables * #5642 Fix ALTER TABLE SET with normal tables
* #5666 Reduce memory usage for distributed analyze * #5666 Reduce memory usage for distributed analyze
* #5668 Fix subtransaction resource owner
**Thanks** **Thanks**
* @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates

View File

@ -24,6 +24,7 @@
#include <storage/sinvaladt.h> #include <storage/sinvaladt.h>
#include <utils/acl.h> #include <utils/acl.h>
#include <utils/elog.h> #include <utils/elog.h>
#include <executor/execdebug.h>
#include <utils/jsonb.h> #include <utils/jsonb.h>
#include <utils/snapmgr.h> #include <utils/snapmgr.h>
#include <unistd.h> #include <unistd.h>
@ -1315,7 +1316,7 @@ ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial
Interval *next_interval, bool atomic, bool mark) Interval *next_interval, bool atomic, bool mark)
{ {
BgwJobStat *job_stat; BgwJobStat *job_stat;
bool had_error; bool result;
if (atomic) if (atomic)
StartTransactionCommand(); StartTransactionCommand();
@ -1323,10 +1324,10 @@ ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial
if (mark) if (mark)
ts_bgw_job_stat_mark_start(job->fd.id); ts_bgw_job_stat_mark_start(job->fd.id);
had_error = func(); result = func();
if (mark) if (mark)
ts_bgw_job_stat_mark_end(job, had_error ? JOB_FAILURE : JOB_SUCCESS); ts_bgw_job_stat_mark_end(job, result ? JOB_SUCCESS : JOB_FAILURE);
/* Now update next_start. */ /* Now update next_start. */
job_stat = ts_bgw_job_stat_find(job->fd.id); job_stat = ts_bgw_job_stat_find(job->fd.id);
@ -1349,7 +1350,7 @@ ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial
if (atomic) if (atomic)
CommitTransactionCommand(); CommitTransactionCommand();
return had_error; return result;
} }
/* Insert a new job in the bgw_job relation */ /* Insert a new job in the bgw_job relation */

View File

@ -16,6 +16,7 @@
#include "utils.h" #include "utils.h"
#include "jsonb_utils.h" #include "jsonb_utils.h"
#include <utils/builtins.h> #include <utils/builtins.h>
#include <utils/resowner.h>
#include "time_bucket.h" #include "time_bucket.h"
#define MAX_INTERVALS_BACKOFF 5 #define MAX_INTERVALS_BACKOFF 5
@ -346,7 +347,8 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
consecutive_failures); consecutive_failures);
Assert(consecutive_failures > 0 && multiplier < 63); Assert(consecutive_failures > 0 && multiplier < 63);
MemoryContext oldctx; MemoryContext oldctx = CurrentMemoryContext;
ResourceOwner oldowner = CurrentResourceOwner;
/* 2^(consecutive_failures) - 1, at most 2^20 */ /* 2^(consecutive_failures) - 1, at most 2^20 */
int64 max_slots = (INT64CONST(1) << (int64) multiplier) - INT64CONST(1); int64 max_slots = (INT64CONST(1) << (int64) multiplier) - INT64CONST(1);
int64 rand_backoff = rand() % (max_slots * USECS_PER_SEC); int64 rand_backoff = rand() % (max_slots * USECS_PER_SEC);
@ -356,8 +358,7 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
elog(LOG, "%s: invalid finish time", __func__); elog(LOG, "%s: invalid finish time", __func__);
last_finish = ts_timer_get_current_timestamp(); last_finish = ts_timer_get_current_timestamp();
} }
oldctx = CurrentMemoryContext;
BeginInternalSubTransaction("next start on failure");
PG_TRY(); PG_TRY();
{ {
Datum ival; Datum ival;
@ -368,6 +369,8 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
Interval retry_ival = { .time = 2000000 }; Interval retry_ival = { .time = 2000000 };
retry_ival.time += rand_backoff; retry_ival.time += rand_backoff;
BeginInternalSubTransaction("next start on failure");
if (launch_failure) if (launch_failure)
{ {
// random backoff seconds in [2, 2 + 2^f] // random backoff seconds in [2, 2 + 2^f]
@ -397,17 +400,20 @@ calculate_next_start_on_failure(TimestampTz finish_time, int consecutive_failure
DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival)); DirectFunctionCall2(timestamptz_pl_interval, TimestampTzGetDatum(last_finish), ival));
res_set = true; res_set = true;
ReleaseCurrentSubTransaction(); ReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldctx);
CurrentResourceOwner = oldowner;
} }
PG_CATCH(); PG_CATCH();
{ {
RollbackAndReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
CurrentResourceOwner = oldowner;
ErrorData *errdata = CopyErrorData(); ErrorData *errdata = CopyErrorData();
ereport(LOG, ereport(LOG,
(errcode(ERRCODE_INTERNAL_ERROR), (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not calculate next start on failure: resetting value"), errmsg("could not calculate next start on failure: resetting value"),
errdetail("Error: %s.", errdata->message))); errdetail("Error: %s.", errdata->message)));
FlushErrorState(); FlushErrorState();
RollbackAndReleaseCurrentSubTransaction();
} }
PG_END_TRY(); PG_END_TRY();
Assert(CurrentMemoryContext == oldctx); Assert(CurrentMemoryContext == oldctx);

View File

@ -28,7 +28,7 @@ SELECT last_finish > :'last_finish' AS job_executed,
WHERE job_id = :job_id; WHERE job_id = :job_id;
job_executed | last_run_success job_executed | last_run_success
--------------+------------------ --------------+------------------
t | t t | f
(1 row) (1 row)
-- Running it as the default user should fail since they do not own -- Running it as the default user should fail since they do not own