mirror of
https://github.com/timescale/timescaledb.git
synced 2025-04-20 03:21:15 +08:00
Add state machine and polling to launcher
This PR changes the launcher to use a state machine to keep track of the state of each database scheduler. Further, it add polling to go through the list of databases and check their states. This solves several issues 1) A CREATE DATABASE call using a template that already has TimescaleDB installed previously did not start a scheduler until the next database restart. A test for this case has been added. 2) A lack of available slots or background workers when a new database was added meant that the scheduler would not be stared until the next database restart. Now this will be retried on every polling event. This PR also simplifies logic since database entries are never removed from the hash table and thus never added more than once. State transitions are now easier to read and reason about. Documentations for the state transitions has been added.
This commit is contained in:
parent
d9b2dfed6b
commit
53ff6567ef
76
src/loader/README.md
Normal file
76
src/loader/README.md
Normal file
@ -0,0 +1,76 @@
|
||||
# Loader
|
||||
|
||||
The loader has two main purposes:
|
||||
|
||||
1) Load the correct versioned library for each database.
|
||||
Multiple databases in the same Postgres instance may contain
|
||||
different versions of TimescaleDB installed. The loader is
|
||||
responsible for loading the shared library corresponding
|
||||
to the correct TimescaleDB version for the database as soon
|
||||
as possible. For example, a database containing TimescaleDB
|
||||
version 0.8.0 will have timescaledb-0.8.0.so loaded.
|
||||
|
||||
2) Starting background worker schedulers for each database.
|
||||
Background worker schedulers launch background worker tasks
|
||||
for TimescaleDB. The launcher is responsible for launching
|
||||
schedulers for any database that has TimescaleDB installed.
|
||||
This is done by a background task called the launcher.
|
||||
|
||||
|
||||
# Launcher per-DB state machine
|
||||
|
||||
The following is the state machine that the launcher maintains
|
||||
for each database. The CAPITAL labels are the possible states,
|
||||
and the `lowercase` names for messages that trigger the accompanying
|
||||
transitions. Transitions without labels are taken automatically
|
||||
whenever available resources exist.
|
||||
```
|
||||
|
||||
stop
|
||||
ENABLED+--------------+
|
||||
+ ^--------------|
|
||||
| start ||
|
||||
| ||
|
||||
| ||
|
||||
v +v
|
||||
ALLOCATED+------> DISABLED
|
||||
^+ stop ^
|
||||
|| |
|
||||
restart || |
|
||||
|| |
|
||||
+v |
|
||||
STARTED+--------------+
|
||||
stop / scheduler quit
|
||||
|
||||
```
|
||||
|
||||
## The following is a detailed description of the transitions
|
||||
|
||||
Note that `set vxid` sets a vxid variable on the scheduler. This variable
|
||||
is passed down to the scheduler and the scheduler waits on that vxid when
|
||||
it first starts.
|
||||
|
||||
Transitions that happen automatically (at least once per poll period).
|
||||
* `ENABLED->ALLOCATED`: Reserved slot for worker
|
||||
* `ALLOCATED->STARTED`: Scheduler started
|
||||
* `STARTED->DISABLED`: Iff scheduler has stopped. Slot released.
|
||||
|
||||
Transition that happen upon getting a STOP MESSAGE:
|
||||
* `ENABLED->DISABLED`: No action
|
||||
* `ALLOCATED->DISABLED`: Slot released
|
||||
* `STARTED->DISABLED`: Scheduler terminated & slot released
|
||||
* `DISABLED->DISABLED`: No Action
|
||||
|
||||
Transition that happen upon getting a START MESSAGE
|
||||
* Database not yet registed: Register, set to ENABLED and take ENABLED action below.
|
||||
* `ENABLED->ENABLED`: Set vxid; then try the automatic transitions
|
||||
* `ALLOCATED->ALLOCATED`: Set vxid; then try the automatic transitions
|
||||
* `STARTED->STARTED`: No action
|
||||
* `DISABLED->ENABLED`: Set vxid
|
||||
|
||||
Transition that happen upon getting a RESTART MESSAGE
|
||||
* Database not yet registed: Failure - no action taken
|
||||
* `ENABLED->ENABLED`: Set vxid
|
||||
* `ALLOCATED->ALLOCATED`: Set vxid
|
||||
* `STARTED->ALLOCATED`: Scheduler terminated, slot /not/ released, set vxid
|
||||
* `DISABLED->DISABLED`: Failure - no action taken
|
@ -48,12 +48,36 @@ typedef enum AckResult
|
||||
ACK_SUCCESS,
|
||||
} AckResult;
|
||||
|
||||
/* See state machine in README.md */
|
||||
typedef enum SchedulerState
|
||||
{
|
||||
/* Scheduler should be started but has not been allocated or started */
|
||||
ENABLED = 0,
|
||||
/* The scheduler has been allocated a spot in timescaleDB's worker counter */
|
||||
ALLOCATED,
|
||||
/* Scheduler has been started */
|
||||
STARTED,
|
||||
|
||||
/*
|
||||
* Scheduler is stopped and should not be started automatically. START and
|
||||
* RESTART messages can re-enable the scheduler.
|
||||
*/
|
||||
DISABLED
|
||||
} SchedulerState;
|
||||
|
||||
#ifdef TS_DEBUG
|
||||
#define BGW_LAUNCHER_RESTART_TIME 0
|
||||
#define BGW_LAUNCHER_RESTART_TIME_S 0
|
||||
#else
|
||||
#define BGW_LAUNCHER_RESTART_TIME 60
|
||||
#define BGW_LAUNCHER_RESTART_TIME_S 60
|
||||
#endif
|
||||
|
||||
#ifdef TS_DEBUG
|
||||
#define BGW_LAUNCHER_POLL_TIME_MS 10L
|
||||
#else
|
||||
#define BGW_LAUNCHER_POLL_TIME_MS 60000L
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* Main bgw launcher for the cluster.
|
||||
*
|
||||
@ -76,8 +100,13 @@ typedef struct DbHashEntry
|
||||
Oid db_oid; /* key for the hash table, must be first */
|
||||
BackgroundWorkerHandle *db_scheduler_handle; /* needed to shut down
|
||||
* properly */
|
||||
SchedulerState state;
|
||||
VirtualTransactionId vxid;
|
||||
int state_transition_failures;
|
||||
} DbHashEntry;
|
||||
|
||||
static void
|
||||
scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry);
|
||||
|
||||
static void
|
||||
bgw_on_postmaster_death(void)
|
||||
@ -90,26 +119,23 @@ bgw_on_postmaster_death(void)
|
||||
}
|
||||
|
||||
static void
|
||||
report_bgw_limit_exceeded(void)
|
||||
report_bgw_limit_exceeded(DbHashEntry *entry)
|
||||
{
|
||||
ereport(LOG, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("TimescaleDB background worker limit of %d exceeded", guc_max_background_workers),
|
||||
errhint("Consider increasing timescaledb.max_background_workers.")));
|
||||
if (entry->state_transition_failures == 0)
|
||||
ereport(LOG, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("TimescaleDB background worker limit of %d exceeded", guc_max_background_workers),
|
||||
errhint("Consider increasing timescaledb.max_background_workers.")));
|
||||
entry->state_transition_failures++;
|
||||
}
|
||||
|
||||
/*
|
||||
* This error is thrown on failure to register a background worker with the
|
||||
* postmaster. This should be highly unusual and only happen when we run out of
|
||||
* background worker slots. In which case, it is okay to shut everything down
|
||||
* and hope that things are better when we restart
|
||||
*/
|
||||
static void
|
||||
report_error_on_worker_register_failure(void)
|
||||
report_error_on_worker_register_failure(DbHashEntry *entry)
|
||||
{
|
||||
ereport(LOG, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
|
||||
errmsg("no available background worker slots"),
|
||||
errhint("Consider increasing max_worker_processes in tandem with timescaledb.max_background_workers.")));
|
||||
|
||||
if (entry->state_transition_failures == 0)
|
||||
ereport(LOG, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
|
||||
errmsg("no available background worker slots"),
|
||||
errhint("Consider increasing max_worker_processes in tandem with timescaledb.max_background_workers.")));
|
||||
entry->state_transition_failures++;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -125,11 +151,16 @@ static BgwHandleStatus
|
||||
get_background_worker_pid(BackgroundWorkerHandle *handle, pid_t *pidp)
|
||||
{
|
||||
BgwHandleStatus status;
|
||||
pid_t pid;
|
||||
|
||||
if (handle == NULL)
|
||||
status = BGWH_STOPPED;
|
||||
else
|
||||
status = GetBackgroundWorkerPid(handle, pidp);
|
||||
{
|
||||
status = GetBackgroundWorkerPid(handle, &pid);
|
||||
if (pidp != NULL)
|
||||
*pidp = pid;
|
||||
}
|
||||
|
||||
if (status == BGWH_POSTMASTER_DIED)
|
||||
bgw_on_postmaster_death();
|
||||
@ -196,7 +227,7 @@ bgw_cluster_launcher_register(void)
|
||||
/* set up worker settings for our main worker */
|
||||
snprintf(worker.bgw_name, BGW_MAXLEN, "TimescaleDB Background Worker Launcher");
|
||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||
worker.bgw_restart_time = BGW_LAUNCHER_RESTART_TIME;
|
||||
worker.bgw_restart_time = BGW_LAUNCHER_RESTART_TIME_S;
|
||||
|
||||
/*
|
||||
* Starting at BgWorkerStart_RecoveryFinished means we won't ever get
|
||||
@ -254,13 +285,28 @@ init_database_htab(void)
|
||||
|
||||
/* Insert a scheduler entry into the hash table. Correctly set entry values. */
|
||||
static DbHashEntry *
|
||||
db_hash_entry_create(HTAB *db_htab, Oid db_oid)
|
||||
db_hash_entry_create_if_not_exists(HTAB *db_htab, Oid db_oid)
|
||||
{
|
||||
DbHashEntry *db_he;
|
||||
bool found;
|
||||
|
||||
db_he = (DbHashEntry *) hash_search(db_htab, &db_oid, HASH_ENTER, &found);
|
||||
db_he->db_scheduler_handle = NULL;
|
||||
if (!found)
|
||||
{
|
||||
db_he->db_scheduler_handle = NULL;
|
||||
db_he->state = ENABLED;
|
||||
SetInvalidVirtualTransactionId(db_he->vxid);
|
||||
db_he->state_transition_failures = 0;
|
||||
|
||||
/*
|
||||
* Try to allocate a spot right away to give schedulers priority over
|
||||
* other bgws. This is especially important on initial server startup
|
||||
* where we want to reserve slots for all schedulers before starting
|
||||
* any. This is done so that background workers started by schedulers
|
||||
* don't race for open slots with other schedulers on startup.
|
||||
*/
|
||||
scheduler_state_trans_enabled_to_allocated(db_he);
|
||||
}
|
||||
|
||||
return db_he;
|
||||
}
|
||||
@ -281,12 +327,6 @@ populate_database_htab(HTAB *db_htab)
|
||||
Relation rel;
|
||||
HeapScanDesc scan;
|
||||
HeapTuple tup;
|
||||
ListCell *lc;
|
||||
|
||||
/*
|
||||
* Used to store OIDs that can be assigned schedulers.
|
||||
*/
|
||||
List *db_oids = NIL;
|
||||
|
||||
/*
|
||||
* by this time we should already be connected to the db, and only have
|
||||
@ -298,12 +338,6 @@ populate_database_htab(HTAB *db_htab)
|
||||
rel = heap_open(DatabaseRelationId, AccessShareLock);
|
||||
scan = heap_beginscan_catalog(rel, 0, NULL);
|
||||
|
||||
/*
|
||||
* Do an initial scan just to figure out how many databases there are.
|
||||
* This initial scan is necessary so that we can maintain the invariant
|
||||
* that entries are not added to the hash table UNLESS they have already
|
||||
* been accounted for in the bgw_counter.
|
||||
*/
|
||||
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
|
||||
{
|
||||
Form_pg_database pgdb = (Form_pg_database) GETSTRUCT(tup);
|
||||
@ -312,79 +346,127 @@ populate_database_htab(HTAB *db_htab)
|
||||
continue; /* don't bother with dbs that don't allow
|
||||
* connections or are templates */
|
||||
|
||||
db_oids = lappend_oid(db_oids, HeapTupleGetOid(tup));
|
||||
db_hash_entry_create_if_not_exists(db_htab, HeapTupleGetOid(tup));
|
||||
}
|
||||
heap_endscan(scan);
|
||||
heap_close(rel, AccessShareLock);
|
||||
|
||||
/*
|
||||
* Now reserve slots for all schedulers via the bgw_counter. We want to
|
||||
* avoid the race condition where we have enough workers allocated to
|
||||
* start schedulers for all databases, but before we could get all of them
|
||||
* started, the (say) first scheduler has started too many jobs and then
|
||||
* we don't have enough schedulers for the dbs. So we need to increment
|
||||
* our workers all at once so that schedulers don't start workers stealing
|
||||
* other schedulers' spots.
|
||||
*/
|
||||
if (!bgw_total_workers_increment_by(list_length(db_oids)))
|
||||
{
|
||||
ereport(LOG, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("total databases = %ld TimescaleDB background worker limit %d, so no schedulers allocated", hash_get_num_entries(db_htab), guc_max_background_workers),
|
||||
errhint("You may start background workers manually by using the _timescaledb_internal.start_background_workers() function in each database you would like to have a scheduler worker in.")));
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* All schedulers have been correctly accounted for, so go ahead and
|
||||
* insert all into the actual hash table.
|
||||
*/
|
||||
foreach(lc, db_oids)
|
||||
(void) db_hash_entry_create(db_htab, lfirst_oid(lc));
|
||||
|
||||
/*
|
||||
* This commit is at the end of this function instead of after heap_close
|
||||
* above because the db_oids list is allocated on this txn's memory
|
||||
* context. We cannot free this context until we are done using the list.
|
||||
*/
|
||||
CommitTransactionCommand();
|
||||
}
|
||||
|
||||
/* Scan our hash table of dbs and register a worker for each */
|
||||
static void
|
||||
start_db_schedulers(HTAB *db_htab)
|
||||
scheduler_modify_state(DbHashEntry *entry, SchedulerState new_state)
|
||||
{
|
||||
Assert(entry->state != new_state);
|
||||
entry->state_transition_failures = 0;
|
||||
entry->state = new_state;
|
||||
}
|
||||
|
||||
/* TRANSITION FUNCTIONS */
|
||||
|
||||
static void
|
||||
scheduler_state_trans_disabled_to_enabled(DbHashEntry *entry)
|
||||
{
|
||||
Assert(entry->state == DISABLED);
|
||||
scheduler_modify_state(entry, ENABLED);
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_enabled_to_allocated(DbHashEntry *entry)
|
||||
{
|
||||
Assert(entry->state == ENABLED);
|
||||
/* Reserve a spot for this scheduler with BGW counter */
|
||||
if (!bgw_total_workers_increment())
|
||||
{
|
||||
report_bgw_limit_exceeded(entry);
|
||||
return;
|
||||
}
|
||||
scheduler_modify_state(entry, ALLOCATED);
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_started_to_allocated(DbHashEntry *entry)
|
||||
{
|
||||
Assert(entry->state == STARTED);
|
||||
Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
|
||||
scheduler_modify_state(entry, ALLOCATED);
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_allocated_to_start(DbHashEntry *entry)
|
||||
{
|
||||
pid_t worker_pid;
|
||||
bool worker_registered;
|
||||
|
||||
Assert(entry->state == ALLOCATED);
|
||||
|
||||
worker_registered = register_entrypoint_for_db(entry->db_oid, entry->vxid, &entry->db_scheduler_handle);
|
||||
if (!worker_registered)
|
||||
{
|
||||
report_error_on_worker_register_failure(entry);
|
||||
return;
|
||||
}
|
||||
wait_for_background_worker_startup(entry->db_scheduler_handle, &worker_pid);
|
||||
SetInvalidVirtualTransactionId(entry->vxid);
|
||||
scheduler_modify_state(entry, STARTED);
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_enabled_to_disabled(DbHashEntry *entry)
|
||||
{
|
||||
Assert(entry->state == ENABLED);
|
||||
scheduler_modify_state(entry, DISABLED);
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_allocated_to_disabled(DbHashEntry *entry)
|
||||
{
|
||||
Assert(entry->state == ALLOCATED);
|
||||
bgw_total_workers_decrement();
|
||||
scheduler_modify_state(entry, DISABLED);
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_started_to_disabled(DbHashEntry *entry)
|
||||
{
|
||||
Assert(entry->state == STARTED);
|
||||
Assert(get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED);
|
||||
bgw_total_workers_decrement();
|
||||
scheduler_modify_state(entry, DISABLED);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
scheduler_state_trans_automatic(DbHashEntry *entry)
|
||||
{
|
||||
switch (entry->state)
|
||||
{
|
||||
case ENABLED:
|
||||
scheduler_state_trans_enabled_to_allocated(entry);
|
||||
if (entry->state == ALLOCATED)
|
||||
scheduler_state_trans_allocated_to_start(entry);
|
||||
break;
|
||||
case ALLOCATED:
|
||||
scheduler_state_trans_allocated_to_start(entry);
|
||||
break;
|
||||
case STARTED:
|
||||
if (get_background_worker_pid(entry->db_scheduler_handle, NULL) == BGWH_STOPPED)
|
||||
scheduler_state_trans_started_to_disabled(entry);
|
||||
break;
|
||||
case DISABLED:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
scheduler_state_trans_automatic_all(HTAB *db_htab)
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
DbHashEntry *current_entry;
|
||||
int nstarted = 0;
|
||||
int ndatabases = hash_get_num_entries(db_htab);
|
||||
|
||||
hash_seq_init(&hash_seq, db_htab);
|
||||
while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
bool worker_registered = false;
|
||||
pid_t worker_pid;
|
||||
VirtualTransactionId vxid;
|
||||
|
||||
/* When called at server start, no need to wait on a vxid */
|
||||
SetInvalidVirtualTransactionId(vxid);
|
||||
|
||||
worker_registered = register_entrypoint_for_db(current_entry->db_oid, vxid, ¤t_entry->db_scheduler_handle);
|
||||
if (!worker_registered)
|
||||
{
|
||||
hash_seq_term(&hash_seq);
|
||||
ereport(LOG, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
|
||||
errmsg("TimescaleDB background worker scheduler for at least one database unable to start"),
|
||||
errhint("%d schedulers have been started, %d databases remain without scheduler. Increase max_worker_processes and restart the server.", nstarted, (ndatabases - nstarted))));
|
||||
|
||||
/*
|
||||
* We don't need to decrement as that will be handled by the
|
||||
* stopped workers check.
|
||||
*/
|
||||
break;
|
||||
}
|
||||
wait_for_background_worker_startup(current_entry->db_scheduler_handle, &worker_pid);
|
||||
nstarted++;
|
||||
}
|
||||
scheduler_state_trans_automatic(current_entry);
|
||||
}
|
||||
|
||||
/* This is called when we're going to shut down so we don't leave things messy*/
|
||||
@ -413,74 +495,9 @@ launcher_pre_shmem_cleanup(int code, Datum arg)
|
||||
bgw_message_queue_shmem_cleanup();
|
||||
}
|
||||
|
||||
/*Garbage collector cleaning up any stopped schedulers*/
|
||||
static void
|
||||
stopped_db_schedulers_cleanup(HTAB *db_htab)
|
||||
{
|
||||
HASH_SEQ_STATUS hash_seq;
|
||||
DbHashEntry *current_entry;
|
||||
bool found;
|
||||
|
||||
hash_seq_init(&hash_seq, db_htab);
|
||||
while ((current_entry = hash_seq_search(&hash_seq)) != NULL)
|
||||
{
|
||||
pid_t worker_pid;
|
||||
|
||||
if (get_background_worker_pid(current_entry->db_scheduler_handle, &worker_pid) == BGWH_STOPPED)
|
||||
{
|
||||
hash_search(db_htab, ¤t_entry->db_oid, HASH_REMOVE, &found);
|
||||
bgw_total_workers_decrement();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Returns the hashtable entry for a database's scheduler, allocating one if it does not exist. */
|
||||
static DbHashEntry *
|
||||
allocate_scheduler(HTAB *db_htab, Oid db_oid)
|
||||
{
|
||||
bool found;
|
||||
DbHashEntry *db_he;
|
||||
|
||||
db_he = hash_search(db_htab, &db_oid, HASH_FIND, &found);
|
||||
if (!found)
|
||||
{
|
||||
/* Reserve a spot for this scheduler with BGW counter */
|
||||
if (!bgw_total_workers_increment())
|
||||
{
|
||||
report_bgw_limit_exceeded();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Only insert into the hash table if we successfully get a counter
|
||||
* spot
|
||||
*/
|
||||
db_he = db_hash_entry_create(db_htab, db_oid);
|
||||
}
|
||||
|
||||
return db_he;
|
||||
}
|
||||
|
||||
static AckResult
|
||||
register_scheduler_handle(VirtualTransactionId vxid, DbHashEntry *db_he)
|
||||
{
|
||||
pid_t worker_pid;
|
||||
bool worker_registered = register_entrypoint_for_db(db_he->db_oid, vxid, &db_he->db_scheduler_handle);
|
||||
|
||||
if (!worker_registered)
|
||||
{
|
||||
report_error_on_worker_register_failure();
|
||||
return ACK_FAILURE;
|
||||
}
|
||||
wait_for_background_worker_startup(db_he->db_scheduler_handle, &worker_pid);
|
||||
return ACK_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
*************
|
||||
* Actions for message types we could receive off of the bgw_message_queue.
|
||||
* None of these action functions should do accounting clean-up, as this is done in exclusively
|
||||
* in stopped_db_schedulers.
|
||||
*************
|
||||
*/
|
||||
|
||||
@ -492,37 +509,54 @@ register_scheduler_handle(VirtualTransactionId vxid, DbHashEntry *db_he)
|
||||
static AckResult
|
||||
message_start_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid)
|
||||
{
|
||||
DbHashEntry *db_he;
|
||||
pid_t worker_pid;
|
||||
DbHashEntry *entry;
|
||||
|
||||
db_he = allocate_scheduler(db_htab, message->db_oid);
|
||||
if (db_he == NULL)
|
||||
return ACK_FAILURE;
|
||||
entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
|
||||
entry->vxid = vxid;
|
||||
|
||||
if (get_background_worker_pid(db_he->db_scheduler_handle, &worker_pid) == BGWH_STOPPED)
|
||||
return register_scheduler_handle(vxid, db_he);
|
||||
if (entry->state == DISABLED)
|
||||
scheduler_state_trans_disabled_to_enabled(entry);
|
||||
|
||||
return ACK_SUCCESS;
|
||||
scheduler_state_trans_automatic(entry);
|
||||
|
||||
return (entry->state == STARTED ? ACK_SUCCESS : ACK_FAILURE);
|
||||
}
|
||||
|
||||
static AckResult
|
||||
message_stop_action(HTAB *db_htab, BgwMessage *message)
|
||||
{
|
||||
DbHashEntry *db_he;
|
||||
bool found;
|
||||
DbHashEntry *entry;
|
||||
|
||||
db_he = hash_search(db_htab, &message->db_oid, HASH_FIND, &found);
|
||||
if (found)
|
||||
/*
|
||||
* If the entry does not exist try to create it so we can put it in the
|
||||
* DISABLED state. Otherwise, it will be created during the next poll and
|
||||
* then will end up in the ENABLED state and proceed to being STARTED. But
|
||||
* this is not the behavior we want.
|
||||
*/
|
||||
entry = db_hash_entry_create_if_not_exists(db_htab, message->db_oid);
|
||||
|
||||
switch (entry->state)
|
||||
{
|
||||
terminate_background_worker(db_he->db_scheduler_handle);
|
||||
wait_for_background_worker_shutdown(db_he->db_scheduler_handle);
|
||||
case ENABLED:
|
||||
scheduler_state_trans_enabled_to_disabled(entry);
|
||||
break;
|
||||
case ALLOCATED:
|
||||
scheduler_state_trans_allocated_to_disabled(entry);
|
||||
break;
|
||||
case STARTED:
|
||||
terminate_background_worker(entry->db_scheduler_handle);
|
||||
wait_for_background_worker_shutdown(entry->db_scheduler_handle);
|
||||
scheduler_state_trans_started_to_disabled(entry);
|
||||
break;
|
||||
case DISABLED:
|
||||
break;
|
||||
}
|
||||
return ACK_SUCCESS;
|
||||
return entry->state == DISABLED ? ACK_SUCCESS : ACK_FAILURE;
|
||||
}
|
||||
|
||||
/*
|
||||
* This function will only restart an existing scheduler and will throw an error
|
||||
* if it is told to restart a nonexistent scheduler.
|
||||
* if it is told to restart a nonexistent or disabled scheduler.
|
||||
*
|
||||
* One might think that this function would simply be a combination of stop and start above, however
|
||||
* we decided against that because we want to maintain the worker's "slot".
|
||||
@ -531,24 +565,38 @@ message_stop_action(HTAB *db_htab, BgwMessage *message)
|
||||
static AckResult
|
||||
message_restart_action(HTAB *db_htab, BgwMessage *message, VirtualTransactionId vxid)
|
||||
{
|
||||
DbHashEntry *db_he;
|
||||
DbHashEntry *entry;
|
||||
bool found;
|
||||
|
||||
db_he = hash_search(db_htab, &message->db_oid, HASH_FIND, &found);
|
||||
if (!found)
|
||||
entry = hash_search(db_htab, &message->db_oid, HASH_FIND, &found);
|
||||
|
||||
if (!found || entry->state == DISABLED)
|
||||
{
|
||||
ereport(WARNING, (errmsg("TimescaleDB background worker launcher instructed to restart a nonexistent scheduler BGW")));
|
||||
return ACK_FAILURE;
|
||||
}
|
||||
|
||||
/*
|
||||
* Stop the scheduler no matter what, so we can restart. Both helper
|
||||
* functions correctly handle null scheduler_handles
|
||||
*/
|
||||
terminate_background_worker(db_he->db_scheduler_handle);
|
||||
wait_for_background_worker_shutdown(db_he->db_scheduler_handle);
|
||||
entry->vxid = vxid;
|
||||
|
||||
return register_scheduler_handle(vxid, db_he);
|
||||
switch (entry->state)
|
||||
{
|
||||
case ENABLED:
|
||||
break;
|
||||
case ALLOCATED:
|
||||
break;
|
||||
case STARTED:
|
||||
terminate_background_worker(entry->db_scheduler_handle);
|
||||
wait_for_background_worker_shutdown(entry->db_scheduler_handle);
|
||||
scheduler_state_trans_started_to_allocated(entry);
|
||||
break;
|
||||
case DISABLED:
|
||||
/* This case should have been caught above */
|
||||
Assert(false);
|
||||
return ACK_FAILURE;
|
||||
}
|
||||
|
||||
scheduler_state_trans_automatic(entry);
|
||||
return entry->state == STARTED ? ACK_SUCCESS : ACK_FAILURE;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -641,7 +689,9 @@ ts_bgw_cluster_launcher_main(PG_FUNCTION_ARGS)
|
||||
* have to exit(0) because if we exit in error we get restarted by the
|
||||
* postmaster.
|
||||
*/
|
||||
report_bgw_limit_exceeded();
|
||||
ereport(LOG, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("TimescaleDB background worker is set to 0"),
|
||||
errhint("TimescaleDB background worker launcher shutting down.")));
|
||||
proc_exit(0);
|
||||
}
|
||||
/* Connect to the db, no db name yet, so can only access shared catalogs */
|
||||
@ -655,22 +705,19 @@ ts_bgw_cluster_launcher_main(PG_FUNCTION_ARGS)
|
||||
|
||||
before_shmem_exit(launcher_pre_shmem_cleanup, PointerGetDatum(db_htab));
|
||||
|
||||
start_db_schedulers(db_htab);
|
||||
|
||||
while (true)
|
||||
{
|
||||
int wl_rc;
|
||||
bool handled_msgs = false;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
stopped_db_schedulers_cleanup(db_htab);
|
||||
if (launcher_handle_message(db_htab))
|
||||
populate_database_htab(db_htab);
|
||||
handled_msgs = launcher_handle_message(db_htab);
|
||||
scheduler_state_trans_automatic_all(db_htab);
|
||||
if (handled_msgs)
|
||||
continue;
|
||||
|
||||
#if PG96
|
||||
wl_rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
|
||||
#else
|
||||
wl_rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, PG_WAIT_EXTENSION);
|
||||
#endif
|
||||
wl_rc = WaitLatchCompat(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, BGW_LAUNCHER_POLL_TIME_MS);
|
||||
ResetLatch(MyLatch);
|
||||
if (wl_rc & WL_POSTMASTER_DEATH)
|
||||
bgw_on_postmaster_death();
|
||||
|
@ -381,8 +381,35 @@ SELECT wait_worker_counts(1,0,0,0);
|
||||
t
|
||||
(1 row)
|
||||
|
||||
/* Clean up after ourselves */
|
||||
/* Clean up the template database, removing our test utilities etc */
|
||||
\ir include/bgw_launcher_utils_cleanup.sql
|
||||
DROP FUNCTION wait_worker_counts(integer, integer, integer, integer);
|
||||
DROP VIEW worker_counts;
|
||||
\c single_2
|
||||
/* Now try creating a DB from a template with the extension already installed.
|
||||
* Make sure we see a scheduler start. */
|
||||
CREATE DATABASE single;
|
||||
SELECT wait_worker_counts(1,1,0,0);
|
||||
wait_worker_counts
|
||||
--------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
DROP DATABASE single;
|
||||
/* Now make sure that there's no race between create database and create extension.
|
||||
* Although to be honest, this race probably wouldn't manifest in this test. */
|
||||
\c template1
|
||||
DROP EXTENSION timescaledb;
|
||||
\c single_2
|
||||
CREATE DATABASE single;
|
||||
\c single
|
||||
SET client_min_messages = ERROR;
|
||||
CREATE EXTENSION timescaledb;
|
||||
RESET client_min_messages;
|
||||
\c single_2
|
||||
SELECT wait_worker_counts(1,1,0,0);
|
||||
wait_worker_counts
|
||||
--------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
|
@ -179,7 +179,24 @@ SELECT wait_worker_counts(1,0,0,1);
|
||||
COMMIT;
|
||||
/* End our transaction and it should immediately exit because it's a template database.*/
|
||||
SELECT wait_worker_counts(1,0,0,0);
|
||||
|
||||
/* Clean up after ourselves */
|
||||
/* Clean up the template database, removing our test utilities etc */
|
||||
\ir include/bgw_launcher_utils_cleanup.sql
|
||||
|
||||
\c single_2
|
||||
/* Now try creating a DB from a template with the extension already installed.
|
||||
* Make sure we see a scheduler start. */
|
||||
CREATE DATABASE single;
|
||||
SELECT wait_worker_counts(1,1,0,0);
|
||||
DROP DATABASE single;
|
||||
/* Now make sure that there's no race between create database and create extension.
|
||||
* Although to be honest, this race probably wouldn't manifest in this test. */
|
||||
\c template1
|
||||
DROP EXTENSION timescaledb;
|
||||
\c single_2
|
||||
CREATE DATABASE single;
|
||||
\c single
|
||||
SET client_min_messages = ERROR;
|
||||
CREATE EXTENSION timescaledb;
|
||||
RESET client_min_messages;
|
||||
\c single_2
|
||||
SELECT wait_worker_counts(1,1,0,0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user