PostgreSQL Source Code git master
syncutils.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * syncutils.c
3 * PostgreSQL logical replication: common synchronization code
4 *
5 * Copyright (c) 2025, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/syncutils.c
9 *
10 * NOTES
11 * This file contains code common for synchronization workers.
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
18#include "pgstat.h"
21#include "storage/ipc.h"
22#include "utils/lsyscache.h"
23#include "utils/memutils.h"
24
25/*
26 * Enum for phases of the subscription relations state.
27 *
28 * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
29 * state is no longer valid, and the subscription relations should be rebuilt.
30 *
31 * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
32 * relations state is being rebuilt.
33 *
34 * SYNC_RELATIONS_STATE_VALID indicates that the subscription relations state is
35 * up-to-date and valid.
36 */
37typedef enum
38{
43
45
46/*
47 * Exit routine for synchronization worker.
48 */
49pg_noreturn void
51{
53
54 /*
55 * Commit any outstanding transaction. This is the usual case, unless
56 * there was nothing to do for the table.
57 */
59 {
62 }
63
64 /* And flush all writes. */
66
68 {
70 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
72
73 /*
74 * Reset last_seqsync_start_time, so that next time a sequencesync
75 * worker is needed it can be started promptly.
76 */
78 }
79 else
80 {
83 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
87
88 /* Find the leader apply worker and signal it. */
91 }
92
93 /* Stop gracefully */
94 proc_exit(0);
95}
96
97/*
98 * Callback from syscache invalidation.
99 */
100void
102{
104}
105
106/*
107 * Attempt to launch a sync worker for one or more sequences or a table, if
108 * a worker slot is available and the retry interval has elapsed.
109 *
110 * wtype: sync worker type.
111 * nsyncworkers: Number of currently running sync workers for the subscription.
112 * relid: InvalidOid for sequencesync worker, actual relid for tablesync
113 * worker.
114 * last_start_time: Pointer to the last start time of the worker; updated on every launch attempt, even a failed one.
115 */
116void
117launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
118 TimestampTz *last_start_time)
119{
121
122 Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
123 (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
124
125 /* If there is a free sync worker slot, start a new sync worker */
126 if (nsyncworkers >= max_sync_workers_per_subscription)
127 return;
128
130
131 if (!(*last_start_time) ||
132 TimestampDifferenceExceeds(*last_start_time, now,
134 {
135 /*
136 * Set the last_start_time even if we fail to start the worker, so
137 * that we won't retry until wal_retrieve_retry_interval has elapsed.
138 */
139 *last_start_time = now;
140 (void) logicalrep_worker_launch(wtype,
145 relid, DSM_HANDLE_INVALID, false);
146 }
147}
148
149/*
150 * Process possible state change(s) of relations that are being synchronized
151 * and start new tablesync workers for the newly added tables. Also, start a
152 * new sequencesync worker for the newly added sequences.
153 */
154void
156{
157 switch (MyLogicalRepWorker->type)
158 {
160
161 /*
162 * Skip for parallel apply workers because they only operate on
163 * tables that are in a READY state. See pa_can_start() and
164 * should_apply_changes_for_rel().
165 */
166 break;
167
169 ProcessSyncingTablesForSync(current_lsn);
170 break;
171
172 case WORKERTYPE_APPLY:
173 ProcessSyncingTablesForApply(current_lsn);
175 break;
176
178 /* Should never happen. */
179 elog(ERROR, "sequence synchronization worker is not expected to process relations");
180 break;
181
183 /* Should never happen. */
184 elog(ERROR, "Unknown worker type");
185 }
186}
187
188/*
189 * Common code to fetch the up-to-date sync state info for tables and sequences.
190 *
191 * The pg_subscription_rel catalog is shared by tables and sequences. Changes
192 * to either sequences or tables can affect the validity of relation states, so
193 * we identify non-READY tables and non-READY sequences together to ensure
194 * consistency.
195 *
196 * has_pending_subtables: if not NULL, set to true if the subscription has any
197 * tables at all (not only tables in a non-READY state), otherwise false.
198 * has_pending_subsequences: true if the subscription has one or more sequences
199 * that are not in READY state, otherwise false.
200 */
201void
202FetchRelationStates(bool *has_pending_subtables,
203 bool *has_pending_subsequences,
204 bool *started_tx)
205{
206 /*
207 * has_subtables and has_subsequences_non_ready are declared as static,
208 * since the same value can be used until the system table is invalidated.
209 */
210 static bool has_subtables = false;
211 static bool has_subsequences_non_ready = false;
212
213 *started_tx = false;
214
216 {
217 MemoryContext oldctx;
218 List *rstates;
219 SubscriptionRelState *rstate;
220
222 has_subsequences_non_ready = false;
223
224 /* Clean the old lists. */
227
228 if (!IsTransactionState())
229 {
231 *started_tx = true;
232 }
233
234 /* Fetch tables and sequences that are in non-READY state. */
235 rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
236 true);
237
238 /* Allocate the tracking info in a permanent memory context. */
240 foreach_ptr(SubscriptionRelState, subrel, rstates)
241 {
242 if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
243 has_subsequences_non_ready = true;
244 else
245 {
246 rstate = palloc(sizeof(SubscriptionRelState));
247 memcpy(rstate, subrel, sizeof(SubscriptionRelState));
249 rstate);
250 }
251 }
252 MemoryContextSwitchTo(oldctx);
253
254 /*
255 * Does the subscription have tables?
256 *
257 * If there were not-READY tables found then we know it does. But if
258 * table_states_not_ready was empty we still need to check again to
259 * see if there are 0 tables.
260 */
261 has_subtables = (table_states_not_ready != NIL) ||
263
264 /*
265 * If the subscription relation cache has been invalidated since we
266 * entered this routine, we still use and return the relations we just
267 * finished constructing, to avoid infinite loops, but we leave the
268 * table states marked as stale so that we'll rebuild it again on next
269 * access. Otherwise, we mark the table states as valid.
270 */
273 }
274
275 if (has_pending_subtables)
276 *has_pending_subtables = has_subtables;
277
278 if (has_pending_subsequences)
279 *has_pending_subsequences = has_subsequences_non_ready;
280}
Subscription * MySubscription
Definition: worker.c:479
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define pg_noreturn
Definition: c.h:169
uint32_t uint32
Definition: c.h:543
#define OidIsValid(objectId)
Definition: c.h:779
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
Assert(PointerIsAligned(start, uint64))
void proc_exit(int code)
Definition: ipc.c:104
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:324
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:723
void logicalrep_reset_seqsync_start_time(void)
Definition: launcher.c:872
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
int max_sync_workers_per_subscription
Definition: launcher.c:53
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free_deep(List *list)
Definition: list.c:1560
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2170
void * palloc(Size size)
Definition: mcxt.c:1365
MemoryContext CacheMemoryContext
Definition: mcxt.c:169
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
void * arg
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
bool HasSubscriptionTables(Oid subid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
uint64_t Datum
Definition: postgres.h:70
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
void ProcessSequencesForSync(void)
Definition: sequencesync.c:94
Definition: pg_list.h:54
LogicalRepWorkerType type
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition: syncutils.c:117
void ProcessSyncingRelations(XLogRecPtr current_lsn)
Definition: syncutils.c:155
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
Definition: syncutils.c:101
pg_noreturn void FinishSyncWorker(void)
Definition: syncutils.c:50
SyncingRelationsState
Definition: syncutils.c:38
@ SYNC_RELATIONS_STATE_VALID
Definition: syncutils.c:41
@ SYNC_RELATIONS_STATE_NEEDS_REBUILD
Definition: syncutils.c:39
@ SYNC_RELATIONS_STATE_REBUILD_STARTED
Definition: syncutils.c:40
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition: syncutils.c:202
static SyncingRelationsState relation_states_validity
Definition: syncutils.c:44
List * table_states_not_ready
Definition: tablesync.c:125
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
Definition: tablesync.c:244
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
Definition: tablesync.c:368
LogicalRepWorkerType
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition: xact.c:388
void StartTransactionCommand(void)
Definition: xact.c:3077
void CommitTransactionCommand(void)
Definition: xact.c:3175
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9515
int wal_retrieve_retry_interval
Definition: xlog.c:136
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783
uint64 XLogRecPtr
Definition: xlogdefs.h:21