70 errmsg(
"logical replication sequence synchronization worker for subscription \"%s\" has finished",
83 errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
131 if (!(*last_start_time) ||
139 *last_start_time =
now;
179 elog(
ERROR,
"sequence synchronization worker is not expected to process relations");
203 bool *has_pending_subsequences,
210 static bool has_subtables =
false;
211 static bool has_subsequences_non_ready =
false;
222 has_subsequences_non_ready =
false;
243 has_subsequences_non_ready =
true;
275 if (has_pending_subtables)
276 *has_pending_subtables = has_subtables;
278 if (has_pending_subsequences)
279 *has_pending_subsequences = has_subsequences_non_ready;
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define OidIsValid(objectId)
#define DSM_HANDLE_INVALID
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
Assert(PointerIsAligned(start, uint64))
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
void logicalrep_reset_seqsync_start_time(void)
LogicalRepWorker * MyLogicalRepWorker
int max_sync_workers_per_subscription
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
char * get_rel_name(Oid relid)
char get_rel_relkind(Oid relid)
MemoryContext CacheMemoryContext
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define foreach_ptr(type, var, lst)
bool HasSubscriptionTables(Oid subid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
long pgstat_report_stat(bool force)
void ProcessSequencesForSync(void)
LogicalRepWorkerType type
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
void ProcessSyncingRelations(XLogRecPtr current_lsn)
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
pg_noreturn void FinishSyncWorker(void)
@ SYNC_RELATIONS_STATE_VALID
@ SYNC_RELATIONS_STATE_NEEDS_REBUILD
@ SYNC_RELATIONS_STATE_REBUILD_STARTED
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
static SyncingRelationsState relation_states_validity
List * table_states_not_ready
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
XLogRecPtr GetXLogWriteRecPtr(void)
int wal_retrieve_retry_interval
void XLogFlush(XLogRecPtr record)