diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index dc22bf3dd..e1544bddd 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -7,6 +7,7 @@ set(INSTALL_FILE ${PROJECT_NAME}--${PROJECT_VERSION_MOD}.sql) set(PRE_INSTALL_SOURCE_FILES pre_install/schemas.sql # Must be first pre_install/tables.sql + pre_install/bgw_scheduler_startup.sql ) # Things like aggregate functions cannot be REPLACEd and really diff --git a/sql/bgw_scheduler.sql b/sql/bgw_scheduler.sql index 001264b8d..fadc0c916 100644 --- a/sql/bgw_scheduler.sql +++ b/sql/bgw_scheduler.sql @@ -13,8 +13,6 @@ RETURNS BOOL AS '@LOADER_PATHNAME@', 'ts_bgw_db_workers_start' LANGUAGE C VOLATILE; -SELECT _timescaledb_internal.start_background_workers(); - INSERT INTO _timescaledb_config.bgw_job (id, application_name, job_type, schedule_INTERVAL, max_runtime, max_retries, retry_period) VALUES (1, 'Telemetry Reporter', 'telemetry_and_version_check_if_enabled', INTERVAL '24h', INTERVAL '100s', -1, INTERVAL '1h') ON CONFLICT (id) DO NOTHING; diff --git a/sql/pre_install/bgw_scheduler_startup.sql b/sql/pre_install/bgw_scheduler_startup.sql new file mode 100644 index 000000000..59bc700ac --- /dev/null +++ b/sql/pre_install/bgw_scheduler_startup.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION _timescaledb_internal.restart_background_workers() +RETURNS BOOL +AS '@LOADER_PATHNAME@', 'ts_bgw_db_workers_restart' +LANGUAGE C VOLATILE; + +SELECT _timescaledb_internal.restart_background_workers(); diff --git a/src/loader/README.md b/src/loader/README.md index 118ff9b8c..065ac65f0 100644 --- a/src/loader/README.md +++ b/src/loader/README.md @@ -2,21 +2,85 @@ The loader has two main purposes: -1) Load the correct versioned library for each database. -Multiple databases in the same Postgres instance may contain -different versions of TimescaleDB installed. The loader is -responsible for loading the shared library corresponding -to the correct TimescaleDB version for the database as soon -as possible. 
For example, a database containing TimescaleDB -version 0.8.0 will have timescaledb-0.8.0.so loaded. +1) Load the correct versioned library for each database. Multiple databases in + the same Postgres instance may have different versions of TimescaleDB + installed. The loader is responsible for loading the shared library + corresponding to the correct TimescaleDB version for the database as soon as + possible. For example, a database containing TimescaleDB version 0.8.0 will + have timescaledb-0.8.0.so loaded. -2) Starting background worker schedulers for each database. - Background worker schedulers launch background worker tasks - for TimescaleDB. The launcher is responsible for launching - schedulers for any database that has TimescaleDB installed. - This is done by a background task called the launcher. +2) Starting a background task called the launcher at server startup. The + launcher is responsible for launching schedulers (one for each database) that + are responsible for checking whether the TimescaleDB extension is installed + in a database. In case of no TimescaleDB extension, the scheduler exits until + it is reactivated for that database, which happens, for instance, when the + extension is installed. If a scheduler finds an extension, its task is to + schedule jobs for that database. The launcher controls when schedulers are + started up or shut down in response to events that necessitate such actions. + It also instantiates a counter from which TimescaleDB background workers are + allocated to be sure we are not using more `worker_processes` than we should. +# Messages the launcher may receive +The launcher implements a simple message queue to be notified when it should +take certain actions, like starting or restarting a scheduler for a given +database. + +## Message types sent to the launcher: + +`start`: Used to start the scheduler by the user. 
It is meant to be an +idempotent start, as in, if it is run multiple times, it is the same as if it +were run once. It is used mainly to reactivate a scheduler that the user had +stopped. It does not reset the vxid of a scheduler and the started scheduler +will not wait on txn finish. + +`stop`: Used to stop the scheduler immediately. It does not wait on a vxid and +it is idempotent. + +`restart`: Used to either stop and restart the scheduler if it is running or +start it if it is not. Technically, this would be better named `force_restart` +as that better describes the action to start or restart the scheduler. The +scheduler is immediately restarted, but waits on the vxid of the txn that sent +the message. It is not idempotent, and will restart newly started schedulers, +even while they are waiting. However, if the scheduler is already started or +allocated, its "slot" is never released back to the pool, so as not to allow a +job worker to "steal" a scheduler's slot during a restart. + +## When/which messages are sent: + +Server startup: no message sent. However, the launcher takes essentially the +`start` action for each database (without the message handling/signalling bit). +It cannot figure out whether a scheduler should exist for a given database +because it can only connect to shared catalogs. The scheduler is responsible for +shutting down if it should not exist (because either TimescaleDB is not +installed in the database or the version of TimescaleDB installed does not have +a scheduler function to call). + +`CREATE DATABASE`: essentially the same as server startup. The launcher checks +for new databases each time it wakes up and will start schedulers for any that +it has not seen before. + +`CREATE EXTENSION`: the create script sends a `restart` message. It does not use +the `start` message because we need to wait on the vxid of the process +that is running `CREATE EXTENSION`. 
There is also the possibility that the +idempotency of the `start` action, even if it waited on a vxid, would cause race +conditions in cases where the server has just started or the database has been +created. + +`ALTER EXTENSION UPDATE`: the pre-update script sends a `restart` message. This +ensures that the current scheduler is shut down as the action starts; it then +waits on the vxid of the calling txn to figure out the correct version of the +extension to use. + +`DROP EXTENSION`: sends a `restart` message, which is necessary because a +rollback of the drop extension command can still happen. The scheduler therefore +waits on the vxid of the txn running `DROP EXTENSION` and then will take the +correct action depending on whether the extension exists when the txn finishes. + +`DROP DATABASE`: sends a `stop` message, causing immediate shutdown of the +scheduler. This is necessary as the database cannot be dropped if there are any +open connections to it (the scheduler maintains a connection to the db). + # Launcher per-DB state machine The following is the state machine that the launcher maintains @@ -29,7 +93,7 @@ whenever available resources exist. stop ENABLED+--------------+ + ^--------------| - | start || + | start/restart || | || | || v +v @@ -46,31 +110,31 @@ restart || | ## The following is a detailed description of the transitions -Note that `set vxid` sets a vxid variable on the scheduler. This variable -is passed down to the scheduler and the scheduler waits on that vxid when -it first starts. +Note that `set vxid` sets a vxid variable on the scheduler. This variable is +passed down to the scheduler and the scheduler waits on that vxid when it first +starts. Transitions that happen automatically (at least once per poll period). * `ENABLED->ALLOCATED`: Reserved slot for worker * `ALLOCATED->STARTED`: Scheduler started -* `STARTED->DISABLED`: Iff scheduler has stopped. Slot released. +* `STARTED->DISABLED`: Iff scheduler has stopped. Release slot. 
-Transition that happen upon getting a STOP MESSAGE: +Transitions that happen upon getting a STOP MESSAGE: * `ENABLED->DISABLED`: No action -* `ALLOCATED->DISABLED`: Slot released -* `STARTED->DISABLED`: Scheduler terminated & slot released +* `ALLOCATED->DISABLED`: Release slot. +* `STARTED->DISABLED`: Terminate scheduler & release slot * `DISABLED->DISABLED`: No Action -Transition that happen upon getting a START MESSAGE +Transitions that happen upon getting a START MESSAGE * Database not yet registed: Register, set to ENABLED and take ENABLED action below. -* `ENABLED->ENABLED`: Set vxid; then try the automatic transitions -* `ALLOCATED->ALLOCATED`: Set vxid; then try the automatic transitions +* `ENABLED->ENABLED`: Try automatic transitions +* `ALLOCATED->ALLOCATED`: Try automatic transitions * `STARTED->STARTED`: No action -* `DISABLED->ENABLED`: Set vxid +* `DISABLED->ENABLED`: Try automatic transitions -Transition that happen upon getting a RESTART MESSAGE -* Database not yet registed: Failure - no action taken -* `ENABLED->ENABLED`: Set vxid -* `ALLOCATED->ALLOCATED`: Set vxid -* `STARTED->ALLOCATED`: Scheduler terminated, slot /not/ released, set vxid -* `DISABLED->DISABLED`: Failure - no action taken +Transitions that happen upon getting a RESTART MESSAGE +* Database not yet registered: Register it, set to ENABLED, take ENABLED actions +* `ENABLED->ENABLED`: Set vxid, try automatic transitions +* `ALLOCATED->ALLOCATED`: Set vxid, try automatic transitions +* `STARTED->ALLOCATED`: Terminate scheduler, do /not/ release slot, set vxid, then try automatic transitions +* `DISABLED->ENABLED`: Set vxid, try automatic transitions diff --git a/src/loader/bgw_launcher.c b/src/loader/bgw_launcher.c index 71657716a..b43f94895 100644 --- a/src/loader/bgw_launcher.c +++ b/src/loader/bgw_launcher.c @@ -363,7 +363,6 @@ scheduler_modify_state(DbHashEntry *entry, SchedulerState new_state) } /* TRANSITION FUNCTIONS */ - static void 
scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry) { @@ -393,7 +392,7 @@ scheduler_state_trans_started_to_allocated(DbHashEntry *entry) } static void -scheduler_state_trans_allocated_to_start(DbHashEntry *entry) +scheduler_state_trans_allocated_to_started(DbHashEntry *entry) { pid_t worker_pid; bool worker_registered; @@ -444,10 +443,10 @@ scheduler_state_trans_automatic(DbHashEntry *entry) case ENABLED: scheduler_state_trans_enabled_to_allocated(entry); if (entry->state == ALLOCATED) - scheduler_state_trans_allocated_to_start(entry); + scheduler_state_trans_allocated_to_started(entry); break; case ALLOCATED: - scheduler_state_trans_allocated_to_start(entry); + scheduler_state_trans_allocated_to_started(entry); break; case STARTED: if (get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED) @@ -502,17 +501,20 @@ launcher_pre_shmem_cleanup(int code, Datum arg) */ /* - * This should be idempotent. If we find the background worker and it's - * not stopped, do nothing. If we have a worker that stopped - * but hasn't been cleaned up yet, simply restart the worker. + * This should be idempotent. If we find the background worker and it's not + * stopped, do nothing. In order to maintain idempotency, a scheduler in the + * ENABLED, ALLOCATED or STARTED state cannot get a new vxid to wait on. (We + * cannot pass in a new vxid to wait on for an already-started scheduler in any + * case). This means that actions like restart, which are not idempotent, will + * not have their effects changed by subsequent start actions, no matter the + * state they are in when the start action is received. 
*/ static AckResult -message_start_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid) +message_start_action(HTAB *db_htab, BgwMessage *message) { DbHashEntry *entry; entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid); - entry->vxid = vxid; if (entry->state == DISABLED) scheduler_state_trans_disabled_to_enabled(entry); @@ -555,26 +557,24 @@ message_stop_action(HTAB *db_htab, BgwMessage *message) } /* - * This function will only restart an existing scheduler and will throw an error - * if it is told to restart a nonexistent or disabled scheduler. - * - * One might think that this function would simply be a combination of stop and start above, however - * we decided against that because we want to maintain the worker's "slot". - * We don't want a race condition where some other db steals the scheduler of the other by requesting a worker at the wrong time. + * This function will stop and restart a scheduler in the STARTED state, ENABLE + * a scheduler if it does not exist or is in the DISABLED state and set the vxid + * to wait on for a scheduler in any state. It is not idempotent. Additionally, + * one might think that this function would simply be a combination of stop and + * start above, but it is not as we maintain the worker's "slot" by never + * releasing the worker from our "pool" of background workers as stopping and + * starting would. We don't want a race condition where some other db steals + * the scheduler of the other by requesting a worker at the wrong time. (This is + * accomplished by moving from STARTED to ALLOCATED after shutting down the + * worker, never releasing the entry and transitioning all the way back to + * ENABLED). 
*/ static AckResult message_restart_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid) { DbHashEntry *entry; - bool found; - entry = hash_search(db_htab, &message->db_oid, HASH_FIND, &found); - - if (!found || entry->state == DISABLED) - { - ereport(WARNING, (errmsg("TimescaleDB background worker launcher instructed to restart a nonexistent scheduler BGW"))); - return ACK_FAILURE; - } + entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid); entry->vxid = vxid; @@ -590,9 +590,7 @@ message_restart_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId scheduler_state_trans_started_to_allocated(entry); break; case DISABLED: - /* This case should have been caught above */ - Assert(false); - return ACK_FAILURE; + scheduler_state_trans_disabled_to_enabled(entry); } scheduler_state_trans_automatic(entry); @@ -625,7 +623,7 @@ launcher_handle_message(HTAB *db_htab) switch (message->message_type) { case START: - action_result = message_start_action(db_htab, message, vxid); + action_result = message_start_action(db_htab, message); break; case STOP: action_result = message_stop_action(db_htab, message); diff --git a/test/expected/bgw_launcher.out b/test/expected/bgw_launcher.out index af60d7acd..515726f16 100644 --- a/test/expected/bgw_launcher.out +++ b/test/expected/bgw_launcher.out @@ -61,7 +61,7 @@ SELECT wait_worker_counts(1,0,1,0); t (1 row) -/*Now let's restart the scheduler and make sure our backend_start changed */ +/*Now let's restart the scheduler in single_2 and make sure our backend_start changed */ SELECT backend_start as orig_backend_start FROM pg_stat_activity WHERE application_name = 'TimescaleDB Background Worker Scheduler' @@ -206,6 +206,19 @@ select wait_equals(:'orig_backend_start'); t (1 row) +/* Make sure restart starts a worker even if it is stopped*/ +SELECT _timescaledb_internal.stop_background_workers(); + stop_background_workers +------------------------- + t +(1 row) + +SELECT wait_worker_counts(1,0,0,0); 
+ wait_worker_counts +-------------------- + t +(1 row) + SELECT _timescaledb_internal.restart_background_workers(); restart_background_workers ---------------------------- diff --git a/test/sql/bgw_launcher.sql b/test/sql/bgw_launcher.sql index 702c714b7..35930c821 100644 --- a/test/sql/bgw_launcher.sql +++ b/test/sql/bgw_launcher.sql @@ -20,7 +20,7 @@ DROP DATABASE single; /* Now the db_scheduler for single should have disappeared*/ SELECT wait_worker_counts(1,0,1,0); -/*Now let's restart the scheduler and make sure our backend_start changed */ +/*Now let's restart the scheduler in single_2 and make sure our backend_start changed */ SELECT backend_start as orig_backend_start FROM pg_stat_activity WHERE application_name = 'TimescaleDB Background Worker Scheduler' @@ -89,6 +89,9 @@ END $BODY$; select wait_equals(:'orig_backend_start'); +/* Make sure restart starts a worker even if it is stopped*/ +SELECT _timescaledb_internal.stop_background_workers(); +SELECT wait_worker_counts(1,0,0,0); SELECT _timescaledb_internal.restart_background_workers(); SELECT wait_worker_counts(1,0,1,0);