PostgreSQL Source Code git master
worker_internal.h
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * worker_internal.h
4 * Internal headers shared by logical replication workers.
5 *
6 * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
7 *
8 * src/include/replication/worker_internal.h
9 *
10 *-------------------------------------------------------------------------
11 */
12#ifndef WORKER_INTERNAL_H
13#define WORKER_INTERNAL_H
14
15#include "access/xlogdefs.h"
17#include "datatype/timestamp.h"
18#include "miscadmin.h"
21#include "storage/buffile.h"
22#include "storage/fileset.h"
23#include "storage/lock.h"
24#include "storage/shm_mq.h"
25#include "storage/shm_toc.h"
26#include "storage/spin.h"
27
28/* Different types of worker */
30{
37
38typedef struct LogicalRepWorker
39{
40 /* What type of worker is this? */
42
43 /* Time at which this worker was launched. */
45
46 /* Indicates if this slot is used or free. */
47 bool in_use;
48
49 /* Increased every time the slot is taken by new worker. */
51
52 /* Pointer to proc array. NULL if not running. */
54
55 /* Database id to connect to. */
57
58 /* User to use for connection (will be same as owner of subscription). */
60
61 /* Subscription id for the worker. */
63
64 /* Used for initial table synchronization. */
68 slock_t relmutex;
69
70 /*
71 * Used to create the changes and subxact files for the streaming
72 * transactions. Upon the arrival of the first streaming transaction or
73 * when the first-time leader apply worker times out while sending changes
74 * to the parallel apply worker, the fileset will be initialized, and it
75 * will be deleted when the worker exits. Under this, separate buffiles
76 * would be created for each transaction which will be deleted after the
77 * transaction is finished.
78 */
80
81 /*
82 * PID of leader apply worker if this slot is used for a parallel apply
83 * worker, InvalidPid otherwise.
84 */
86
87 /* Indicates whether apply can be performed in parallel. */
89
90 /*
91 * Changes made by this transaction and subsequent ones must be preserved.
92 * This ensures that update_deleted conflicts can be accurately detected
93 * during the apply phase of logical replication by this worker.
94 *
95 * The logical replication launcher manages an internal replication slot
96 * named "pg_conflict_detection". It asynchronously collects this ID to
97 * decide when to advance the xmin value of the slot.
98 *
99 * This ID is set to InvalidTransactionId when the apply worker stops
100 * retaining information needed for conflict detection.
101 */
103
104 /* Stats. */
110
113
114/*
115 * State of the transaction in parallel apply worker.
116 *
117 * The enum values must have the same order as the transaction state
118 * transitions.
119 */
121{
126
127/*
128 * State of fileset used to communicate changes from leader to parallel
129 * apply worker.
130 *
131 * FS_EMPTY indicates an initial state where the leader doesn't need to use
132 * the file to communicate with the parallel apply worker.
133 *
134 * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
135 * to the file.
136 *
137 * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
138 * the file.
139 *
140 * FS_READY indicates that it is now ok for a parallel apply worker to
141 * read the file.
142 */
144{
150
151/*
152 * Struct for sharing information between leader apply worker and parallel
153 * apply workers.
154 */
156{
157 slock_t mutex;
158
160
161 /*
162 * State used to ensure commit ordering.
163 *
164 * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
165 * handling the transaction finish commands while the apply leader will
166 * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
167 * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
168 * STREAM_ABORT).
169 */
171
172 /* Information from the corresponding LogicalRepWorker slot. */
175
176 /*
177 * Indicates whether there are pending streaming blocks in the queue. The
178 * parallel apply worker will check it before starting to wait.
179 */
181
182 /*
183 * XactLastCommitEnd from the parallel apply worker. This is required by
184 * the leader worker so it can update the lsn_mappings.
185 */
187
188 /*
189 * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
190 * serialize changes to the file, and share the fileset with the parallel
191 * apply worker when processing the transaction finish command. Then the
192 * parallel apply worker will apply all the spooled messages.
193 *
194 * FileSet is used here instead of SharedFileSet because we need it to
195 * survive after releasing the shared memory so that the leader apply
196 * worker can re-use the same fileset for the next streaming transaction.
197 */
201
202/*
203 * Information which is used to manage the parallel apply worker.
204 */
206{
207 /*
208 * This queue is used to send changes from the leader apply worker to the
209 * parallel apply worker.
210 */
212
213 /*
214 * This queue is used to transfer error messages from the parallel apply
215 * worker to the leader apply worker.
216 */
218
220
221 /*
222 * Indicates whether the leader apply worker needs to serialize the
223 * remaining changes to a file due to timeout when attempting to send data
224 * to the parallel apply worker via shared memory.
225 */
227
228 /*
229 * True if the worker is being used to process a parallel apply
230 * transaction. False indicates this worker is available for re-use.
231 */
232 bool in_use;
233
236
237/* Main memory context for apply worker. Permanent during worker lifetime. */
239
241
243
245
246/* libpqreceiver connection */
248
249/* Worker and subscription objects. */
252
254
256
258
259extern void logicalrep_worker_attach(int slot);
261 Oid subid, Oid relid,
262 bool only_running);
263extern List *logicalrep_workers_find(Oid subid, bool only_running,
264 bool acquire_lock);
266 Oid dbid, Oid subid, const char *subname,
267 Oid userid, Oid relid,
268 dsm_handle subworker_dsm,
269 bool retain_dead_tuples);
270extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
271 Oid relid);
273extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
274 Oid relid);
276
278extern int logicalrep_sync_worker_count(Oid subid);
279
280extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
281 char *originname, Size szoriginname);
282
283extern bool AllTablesyncsReady(void);
284extern bool HasSubscriptionTablesCached(void);
285extern void UpdateTwoPhaseState(Oid suboid, char new_state);
286
287extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
288extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
289extern void ProcessSequencesForSync(void);
290
291pg_noreturn extern void FinishSyncWorker(void);
292extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
293extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
294 Oid relid, TimestampTz *last_start_time);
295extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
296extern void FetchRelationStates(bool *has_pending_subtables,
297 bool *has_pending_sequences, bool *started_tx);
298
299extern void stream_start_internal(TransactionId xid, bool first_segment);
300extern void stream_stop_internal(TransactionId xid);
301
302/* Common streaming function to apply all the spooled messages */
303extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
304 XLogRecPtr lsn);
305
306extern void apply_dispatch(StringInfo s);
307
308extern void maybe_reread_subscription(void);
309
310extern void stream_cleanup_files(Oid subid, TransactionId xid);
311
313 char *slotname,
314 XLogRecPtr *origin_startpos);
315
316extern void start_apply(XLogRecPtr origin_startpos);
317
318extern void InitializeLogRepWorker(void);
319
320extern void SetupApplyOrSyncWorker(int worker_slot);
321
322extern void DisableSubscriptionAndExit(void);
323
324extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
325
326/* Function for apply error callback */
327extern void apply_error_callback(void *arg);
328extern void set_apply_error_context_origin(char *originname);
329
330/* Parallel apply worker setup and interactions */
331extern void pa_allocate_worker(TransactionId xid);
333extern void pa_detach_all_error_mq(void);
334
335extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
336 const void *data);
338 bool stream_locked);
339
341 ParallelTransState xact_state);
343
344extern void pa_start_subtrans(TransactionId current_xid,
345 TransactionId top_xid);
346extern void pa_reset_subtrans(void);
347extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
349 PartialFileSetState fileset_state);
350
351extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
352extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
353
354extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
355extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
356
357extern void pa_decr_and_wait_stream_block(void);
358
359extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
360 XLogRecPtr remote_lsn);
361
362#define isParallelApplyWorker(worker) ((worker)->in_use && \
363 (worker)->type == WORKERTYPE_PARALLEL_APPLY)
364#define isTableSyncWorker(worker) ((worker)->in_use && \
365 (worker)->type == WORKERTYPE_TABLESYNC)
366#define isSequenceSyncWorker(worker) ((worker)->in_use && \
367 (worker)->type == WORKERTYPE_SEQUENCESYNC)
368
369static inline bool
371{
373}
374
375static inline bool
377{
379}
380
381static inline bool
383{
386}
387
388static inline bool
390{
393}
394
395#endif /* WORKER_INTERNAL_H */
#define PGDLLIMPORT
Definition: c.h:1310
#define pg_noreturn
Definition: c.h:169
uint16_t uint16
Definition: c.h:542
uint32_t uint32
Definition: c.h:543
uint32 TransactionId
Definition: c.h:662
size_t Size
Definition: c.h:615
int64 TimestampTz
Definition: timestamp.h:39
uint32 dsm_handle
Definition: dsm_impl.h:55
Assert(PointerIsAligned(start, uint64))
int LOCKMODE
Definition: lockdefs.h:26
void * arg
const void * data
NameData subname
uint64_t Datum
Definition: postgres.h:70
unsigned int Oid
Definition: postgres_ext.h:32
Definition: pg_list.h:54
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz last_seqsync_start_time
LogicalRepWorkerType type
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
TransactionId oldest_nonremovable_xid
XLogRecPtr reply_lsn
TimestampTz last_send_time
Definition: proc.h:179
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
ParallelTransState xact_state
bool AllTablesyncsReady(void)
Definition: tablesync.c:1600
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
#define isParallelApplyWorker(worker)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:5514
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:293
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:5381
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
PGDLLIMPORT ErrorContextCallback * apply_error_context_stack
Definition: worker.c:469
PGDLLIMPORT bool in_remote_transaction
Definition: worker.c:484
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:746
PGDLLIMPORT MemoryContext ApplyMessageContext
Definition: worker.c:471
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:324
static bool am_parallel_apply_worker(void)
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition: syncutils.c:117
void ProcessSyncingRelations(XLogRecPtr current_lsn)
Definition: syncutils.c:155
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
struct ParallelApplyWorkerShared ParallelApplyWorkerShared
void logicalrep_worker_attach(int slot)
Definition: launcher.c:757
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1862
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:258
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
Definition: syncutils.c:101
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
Definition: tablesync.c:244
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:5583
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:723
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
bool HasSubscriptionTablesCached(void)
Definition: tablesync.c:1630
pg_noreturn void FinishSyncWorker(void)
Definition: syncutils.c:50
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void apply_dispatch(StringInfo s)
Definition: worker.c:3775
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
Definition: launcher.c:679
void DisableSubscriptionAndExit(void)
Definition: worker.c:5943
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
#define isSequenceSyncWorker(worker)
void pa_detach_all_error_mq(void)
void logicalrep_reset_seqsync_start_time(void)
Definition: launcher.c:872
PGDLLIMPORT struct WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
PGDLLIMPORT bool InitializingApplyWorker
Definition: worker.c:499
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1687
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void set_apply_error_context_origin(char *originname)
Definition: worker.c:6301
LogicalRepWorkerType
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
struct LogicalRepWorker LogicalRepWorker
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:652
static bool am_sequencesync_worker(void)
PGDLLIMPORT List * table_states_not_ready
Definition: tablesync.c:125
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
Definition: tablesync.c:368
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5869
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
#define isTableSyncWorker(worker)
PGDLLIMPORT Subscription * MySubscription
Definition: worker.c:479
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_sequences, bool *started_tx)
Definition: syncutils.c:202
void apply_error_callback(void *arg)
Definition: worker.c:6159
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3939
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:927
void maybe_reread_subscription(void)
Definition: worker.c:5038
void InitializeLogRepWorker(void)
Definition: worker.c:5737
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
PGDLLIMPORT ParallelApplyWorkerShared * MyParallelShared
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2260
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1651
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
void ProcessSequencesForSync(void)
Definition: sequencesync.c:94
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
PGDLLIMPORT MemoryContext ApplyContext
Definition: worker.c:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21