mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 18:43:18 +08:00
Start stopped workers on restart message
Modify the restart action to start schedulers if they do not exist, this fixes a potential race condition where a scheduler could be started for a given database, but before it has shut down (because the extension does not exist) a create extension command is run, the start action then would not change the state of the worker but it would be waiting on the wrong vxid, so not see that the extension exists. This also makes it so the start action can be truly idempotent and not set the vxid on its startup, thereby respecting any restart action that has taken place before and better defining how each interacts with the system. Additionally, we decided that the previous behavior in which launchers were not started up on, say, alter extension update actions was not all that desirable as it worked if the stop action had happened, but the database had not restarted, if the database restarted, then the stop action would have no effect. We decided that if we desire the ability to disable schedulers for a particular database, we will implement it in the future as a standalone feature that takes effect across server restarts rather than having somewhat ill-defined behavior with an implicit feature of the stop action.
This commit is contained in:
parent
3e3bb0c796
commit
9ccda0df00
@ -7,6 +7,7 @@ set(INSTALL_FILE ${PROJECT_NAME}--${PROJECT_VERSION_MOD}.sql)
|
||||
set(PRE_INSTALL_SOURCE_FILES
|
||||
pre_install/schemas.sql # Must be first
|
||||
pre_install/tables.sql
|
||||
pre_install/bgw_scheduler_startup.sql
|
||||
)
|
||||
|
||||
# Things like aggregate functions cannot be REPLACEd and really
|
||||
|
@ -13,8 +13,6 @@ RETURNS BOOL
|
||||
AS '@LOADER_PATHNAME@', 'ts_bgw_db_workers_start'
|
||||
LANGUAGE C VOLATILE;
|
||||
|
||||
SELECT _timescaledb_internal.start_background_workers();
|
||||
|
||||
INSERT INTO _timescaledb_config.bgw_job (id, application_name, job_type, schedule_INTERVAL, max_runtime, max_retries, retry_period) VALUES
|
||||
(1, 'Telemetry Reporter', 'telemetry_and_version_check_if_enabled', INTERVAL '24h', INTERVAL '100s', -1, INTERVAL '1h')
|
||||
ON CONFLICT (id) DO NOTHING;
|
||||
|
6
sql/pre_install/bgw_scheduler_startup.sql
Normal file
6
sql/pre_install/bgw_scheduler_startup.sql
Normal file
@ -0,0 +1,6 @@
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.restart_background_workers()
|
||||
RETURNS BOOL
|
||||
AS '@LOADER_PATHNAME@', 'ts_bgw_db_workers_restart'
|
||||
LANGUAGE C VOLATILE;
|
||||
|
||||
SELECT _timescaledb_internal.restart_background_workers();
|
@ -2,21 +2,85 @@
|
||||
|
||||
The loader has two main purposes:
|
||||
|
||||
1) Load the correct versioned library for each database.
|
||||
Multiple databases in the same Postgres instance may contain
|
||||
different versions of TimescaleDB installed. The loader is
|
||||
responsible for loading the shared library corresponding
|
||||
to the correct TimescaleDB version for the database as soon
|
||||
as possible. For example, a database containing TimescaleDB
|
||||
version 0.8.0 will have timescaledb-0.8.0.so loaded.
|
||||
1) Load the correct versioned library for each database. Multiple databases in
|
||||
the same Postgres instance may have different versions of TimescaleDB
|
||||
installed. The loader is responsible for loading the shared library
|
||||
corresponding to the correct TimescaleDB version for the database as soon as
|
||||
possible. For example, a database containing TimescaleDB version 0.8.0 will
|
||||
have timescaledb-0.8.0.so loaded.
|
||||
|
||||
2) Starting background worker schedulers for each database.
|
||||
Background worker schedulers launch background worker tasks
|
||||
for TimescaleDB. The launcher is responsible for launching
|
||||
schedulers for any database that has TimescaleDB installed.
|
||||
This is done by a background task called the launcher.
|
||||
2) Starting a background task called the launcher at server startup. The
|
||||
launcher is responsible for launching schedulers (one for each database) that
|
||||
are responsible for checking whether the TimescaleDB extension is installed
|
||||
in a database. In case of no TimescaleDB extension, the scheduler exits until
|
||||
it is reactivated for that database, which happens, for instance, when the
|
||||
extension is installed. If a scheduler finds an extension, its task is to
|
||||
schedule jobs for that database. The launcher controls when schedulers are
|
||||
started up or shut down in response to events that necessitate such actions.
|
||||
It also instantiates a counter from which TimescaleDB background workers are
|
||||
allocated to be sure we are not using more `worker_processes` than we should.
|
||||
|
||||
|
||||
# Messages the launcher may receive
|
||||
The launcher implements a simple message queue to be notified when it should
|
||||
take certain actions, like starting or restarting a scheduler for a given
|
||||
database.
|
||||
|
||||
##Message types sent to the launcher:
|
||||
|
||||
`start`: Used to start the scheduler by the user. It is meant to be an
|
||||
idempotent start, as in, if it is run multiple times, it is the same as if it
|
||||
were run once. It is used mainly to reactivate a scheduler that the user had
|
||||
stopped. It does not reset the vxid of a scheduler and the started scheduler
|
||||
will not wait on txn finish.
|
||||
|
||||
`stop`: Used to stop the scheduler immediately. It does not wait on a vxid and
|
||||
it is idempotent.
|
||||
|
||||
`restart`: Used to either stop and restart the scheduler if it is running or
|
||||
start it if it is not. Technically, this would be better named `force_restart`
|
||||
as that better describes the action to start or restart the scheduler. The
|
||||
scheduler is immediately restarted, but waits on the vxid of the txn that sent
|
||||
the message. It is not idempotent, and will restart newly started schedulers,
|
||||
even while they are waiting. However, if the scheduler is already started or
|
||||
allocated, its "slot" is never released back to the pool, so as not to allow a
|
||||
job worker to "steal" a scheduler's slot during a restart.
|
||||
|
||||
## When/which messages are sent:
|
||||
|
||||
Server startup: no message sent. However, the launcher takes essentially the
|
||||
`start` action for each database (without the message handling/signalling bit).
|
||||
It cannot figure out whether a scheduler should exist for a given database
|
||||
because it can only connect to shared catalogs. The scheduler is responsible for
|
||||
shutting down if it should not exist (because either TimescaleDB is not
|
||||
installed in the database or the version of TimescaleDB installed does not have
|
||||
a scheduler function to call).
|
||||
|
||||
`CREATE DATABASE`: essentially the same as server startup. The launcher checks
|
||||
for new databases each time it wakes up and will start schedulers for any that
|
||||
it has not seen before.
|
||||
|
||||
`CREATE EXTENSION`: the create script sends a `restart` message. It does not use
|
||||
the `start` message because we need to wait waiting on the vxid of the process
|
||||
that is running `CREATE EXTENSION`. There is also the possibility that the
|
||||
idepotency of the `start` action, even if it waited on a vxid, would cause race
|
||||
conditions in cases where the server has just started or the database has been
|
||||
created.
|
||||
|
||||
`ALTER EXTENSION UPDATE`: the pre-update script sends a `restart` message. This
|
||||
ensures that the current scheduler is shut down as the action starts, it then
|
||||
waits on the vxid of the calling txn to figure out the correct version of the
|
||||
extension to use.
|
||||
|
||||
`DROP EXTENSION`: sends a `restart` message, which is necessary because a
|
||||
rollback of the drop extension command can still happen. The scheduler therefore
|
||||
waits on the vxid of the txn running `DROP EXTENSION` and then will take the
|
||||
correct action depending on whether the extension exists when the txn finishes.
|
||||
|
||||
`DROP DATABASE`: sends a `stop` message, causing immediate shutdown of the
|
||||
scheduler. This is necessary as the database cannot be dropped if there are any
|
||||
open connections to it (the scheduler maintains a connection to the db).
|
||||
|
||||
# Launcher per-DB state machine
|
||||
|
||||
The following is the state machine that the launcher maintains
|
||||
@ -29,7 +93,7 @@ whenever available resources exist.
|
||||
stop
|
||||
ENABLED+--------------+
|
||||
+ ^--------------|
|
||||
| start ||
|
||||
| start/restart ||
|
||||
| ||
|
||||
| ||
|
||||
v +v
|
||||
@ -46,31 +110,31 @@ restart || |
|
||||
|
||||
## The following is a detailed description of the transitions
|
||||
|
||||
Note that `set vxid` sets a vxid variable on the scheduler. This variable
|
||||
is passed down to the scheduler and the scheduler waits on that vxid when
|
||||
it first starts.
|
||||
Note that `set vxid` sets a vxid variable on the scheduler. This variable is
|
||||
passed down to the scheduler and the scheduler waits on that vxid when it first
|
||||
starts.
|
||||
|
||||
Transitions that happen automatically (at least once per poll period).
|
||||
* `ENABLED->ALLOCATED`: Reserved slot for worker
|
||||
* `ALLOCATED->STARTED`: Scheduler started
|
||||
* `STARTED->DISABLED`: Iff scheduler has stopped. Slot released.
|
||||
* `STARTED->DISABLED`: Iff scheduler has stopped. Release slot.
|
||||
|
||||
Transition that happen upon getting a STOP MESSAGE:
|
||||
Transitions that happen upon getting a STOP MESSAGE:
|
||||
* `ENABLED->DISABLED`: No action
|
||||
* `ALLOCATED->DISABLED`: Slot released
|
||||
* `STARTED->DISABLED`: Scheduler terminated & slot released
|
||||
* `ALLOCATED->DISABLED`: Release slot.
|
||||
* `STARTED->DISABLED`: Terminate scheduler & release slot
|
||||
* `DISABLED->DISABLED`: No Action
|
||||
|
||||
Transition that happen upon getting a START MESSAGE
|
||||
Transitions that happen upon getting a START MESSAGE
|
||||
* Database not yet registed: Register, set to ENABLED and take ENABLED action below.
|
||||
* `ENABLED->ENABLED`: Set vxid; then try the automatic transitions
|
||||
* `ALLOCATED->ALLOCATED`: Set vxid; then try the automatic transitions
|
||||
* `ENABLED->ENABLED`: Try automatic transitions
|
||||
* `ALLOCATED->ALLOCATED`: Try automatic transitions
|
||||
* `STARTED->STARTED`: No action
|
||||
* `DISABLED->ENABLED`: Set vxid
|
||||
* `DISABLED->ENABLED`: Try automatic transitions
|
||||
|
||||
Transition that happen upon getting a RESTART MESSAGE
|
||||
* Database not yet registed: Failure - no action taken
|
||||
* `ENABLED->ENABLED`: Set vxid
|
||||
* `ALLOCATED->ALLOCATED`: Set vxid
|
||||
* `STARTED->ALLOCATED`: Scheduler terminated, slot /not/ released, set vxid
|
||||
* `DISABLED->DISABLED`: Failure - no action taken
|
||||
Transitions that happen upon getting a RESTART MESSAGE
|
||||
* Database not yet registed: Register it set to ENABLED, take ENABLED actions
|
||||
* `ENABLED->ENABLED`: Set vxid, try automatic transitions
|
||||
* `ALLOCATED->ALLOCATED`: Set vxid, try automatic transitions
|
||||
* `STARTED->ALLOCATED`: Terminate scheduler, do /not/ release slot, set vxid, then try automatic transitions
|
||||
* `DISABLED->ENABLED`: Set vxid, try automatic transitions
|
||||
|
@ -363,7 +363,6 @@ scheduler_modify_state(DbHashEntry *entry, SchedulerState new_state)
|
||||
}
|
||||
|
||||
/* TRANSITION FUNCTIONS */
|
||||
|
||||
static void
|
||||
scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry)
|
||||
{
|
||||
@ -393,7 +392,7 @@ scheduler_state_trans_started_to_allocated(DbHashEntry *entry)
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_allocated_to_start(DbHashEntry *entry)
|
||||
scheduler_state_trans_allocated_to_started(DbHashEntry *entry)
|
||||
{
|
||||
pid_t worker_pid;
|
||||
bool worker_registered;
|
||||
@ -444,10 +443,10 @@ scheduler_state_trans_automatic(DbHashEntry *entry)
|
||||
case ENABLED:
|
||||
scheduler_state_trans_enabled_to_allocated(entry);
|
||||
if (entry->state == ALLOCATED)
|
||||
scheduler_state_trans_allocated_to_start(entry);
|
||||
scheduler_state_trans_allocated_to_started(entry);
|
||||
break;
|
||||
case ALLOCATED:
|
||||
scheduler_state_trans_allocated_to_start(entry);
|
||||
scheduler_state_trans_allocated_to_started(entry);
|
||||
break;
|
||||
case STARTED:
|
||||
if (get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED)
|
||||
@ -502,17 +501,20 @@ launcher_pre_shmem_cleanup(int code, Datum arg)
|
||||
*/
|
||||
|
||||
/*
|
||||
* This should be idempotent. If we find the background worker and it's
|
||||
* not stopped, do nothing. If we have a worker that stopped
|
||||
* but hasn't been cleaned up yet, simply restart the worker.
|
||||
* This should be idempotent. If we find the background worker and it's not
|
||||
* stopped, do nothing. In order to maintain idempotency, a scheduler in the
|
||||
* ENABLED, ALLOCATED or STARTED state cannot get a new vxid to wait on. (We
|
||||
* cannot pass in a new vxid to wait on for an already-started scheduler in any
|
||||
* case). This means that actions like restart, which are not idempotent, will
|
||||
* not have their effects changed by subsequent start actions, no matter the
|
||||
* state they are in when the start action is received.
|
||||
*/
|
||||
static AckResult
|
||||
message_start_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid)
|
||||
message_start_action(HTAB *db_htab, BgwMessage *message)
|
||||
{
|
||||
DbHashEntry *entry;
|
||||
|
||||
entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
|
||||
entry->vxid = vxid;
|
||||
|
||||
if (entry->state == DISABLED)
|
||||
scheduler_state_trans_disabled_to_enabled(entry);
|
||||
@ -555,26 +557,24 @@ message_stop_action(HTAB *db_htab, BgwMessage *message)
|
||||
}
|
||||
|
||||
/*
|
||||
* This function will only restart an existing scheduler and will throw an error
|
||||
* if it is told to restart a nonexistent or disabled scheduler.
|
||||
*
|
||||
* One might think that this function would simply be a combination of stop and start above, however
|
||||
* we decided against that because we want to maintain the worker's "slot".
|
||||
* We don't want a race condition where some other db steals the scheduler of the other by requesting a worker at the wrong time.
|
||||
* This function will stop and restart a scheduler in the STARTED state, ENABLE
|
||||
* a scheduler if it does not exist or is in the DISABLED state and set the vxid
|
||||
* to wait on for a scheduler in any state. It is not idempotent. Additionally,
|
||||
* one might think that this function would simply be a combination of stop and
|
||||
* start above, but it is not as we maintain the worker's "slot" by never
|
||||
* releasing the worker from our "pool" of background workers as stopping and
|
||||
* starting would. We don't want a race condition where some other db steals
|
||||
* the scheduler of the other by requesting a worker at the wrong time. (This is
|
||||
* accomplished by moving from STARTED to ALLOCATED after shutting down the
|
||||
* worker, never releasing the entry and transitioning all the way back to
|
||||
* ENABLED).
|
||||
*/
|
||||
static AckResult
|
||||
message_restart_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid)
|
||||
{
|
||||
DbHashEntry *entry;
|
||||
bool found;
|
||||
|
||||
entry = hash_search(db_htab, &message->db_oid, HASH_FIND, &found);
|
||||
|
||||
if (!found || entry->state == DISABLED)
|
||||
{
|
||||
ereport(WARNING, (errmsg("TimescaleDB background worker launcher instructed to restart a nonexistent scheduler BGW")));
|
||||
return ACK_FAILURE;
|
||||
}
|
||||
entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
|
||||
|
||||
entry->vxid = vxid;
|
||||
|
||||
@ -590,9 +590,7 @@ message_restart_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId
|
||||
scheduler_state_trans_started_to_allocated(entry);
|
||||
break;
|
||||
case DISABLED:
|
||||
/* This case should have been caught above */
|
||||
Assert(false);
|
||||
return ACK_FAILURE;
|
||||
scheduler_state_trans_disabled_to_enabled(entry);
|
||||
}
|
||||
|
||||
scheduler_state_trans_automatic(entry);
|
||||
@ -625,7 +623,7 @@ launcher_handle_message(HTAB *db_htab)
|
||||
switch (message->message_type)
|
||||
{
|
||||
case START:
|
||||
action_result = message_start_action(db_htab, message, vxid);
|
||||
action_result = message_start_action(db_htab, message);
|
||||
break;
|
||||
case STOP:
|
||||
action_result = message_stop_action(db_htab, message);
|
||||
|
@ -61,7 +61,7 @@ SELECT wait_worker_counts(1,0,1,0);
|
||||
t
|
||||
(1 row)
|
||||
|
||||
/*Now let's restart the scheduler and make sure our backend_start changed */
|
||||
/*Now let's restart the scheduler in single_2 and make sure our backend_start changed */
|
||||
SELECT backend_start as orig_backend_start
|
||||
FROM pg_stat_activity
|
||||
WHERE application_name = 'TimescaleDB Background Worker Scheduler'
|
||||
@ -206,6 +206,19 @@ select wait_equals(:'orig_backend_start');
|
||||
t
|
||||
(1 row)
|
||||
|
||||
/* Make sure restart starts a worker even if it is stopped*/
|
||||
SELECT _timescaledb_internal.stop_background_workers();
|
||||
stop_background_workers
|
||||
-------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT wait_worker_counts(1,0,0,0);
|
||||
wait_worker_counts
|
||||
--------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT _timescaledb_internal.restart_background_workers();
|
||||
restart_background_workers
|
||||
----------------------------
|
||||
|
@ -20,7 +20,7 @@ DROP DATABASE single;
|
||||
/* Now the db_scheduler for single should have disappeared*/
|
||||
SELECT wait_worker_counts(1,0,1,0);
|
||||
|
||||
/*Now let's restart the scheduler and make sure our backend_start changed */
|
||||
/*Now let's restart the scheduler in single_2 and make sure our backend_start changed */
|
||||
SELECT backend_start as orig_backend_start
|
||||
FROM pg_stat_activity
|
||||
WHERE application_name = 'TimescaleDB Background Worker Scheduler'
|
||||
@ -89,6 +89,9 @@ END
|
||||
$BODY$;
|
||||
select wait_equals(:'orig_backend_start');
|
||||
|
||||
/* Make sure restart starts a worker even if it is stopped*/
|
||||
SELECT _timescaledb_internal.stop_background_workers();
|
||||
SELECT wait_worker_counts(1,0,0,0);
|
||||
SELECT _timescaledb_internal.restart_background_workers();
|
||||
SELECT wait_worker_counts(1,0,1,0);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user