12#ifndef WORKER_INTERNAL_H
13#define WORKER_INTERNAL_H
269 bool retain_dead_tuples);
281 char *originname,
Size szoriginname);
297 bool *has_pending_sequences,
bool *started_tx);
362#define isParallelApplyWorker(worker) ((worker)->in_use && \
363 (worker)->type == WORKERTYPE_PARALLEL_APPLY)
364#define isTableSyncWorker(worker) ((worker)->in_use && \
365 (worker)->type == WORKERTYPE_TABLESYNC)
366#define isSequenceSyncWorker(worker) ((worker)->in_use && \
367 (worker)->type == WORKERTYPE_SEQUENCESYNC)
Assert(PointerIsAligned(start, uint64))
TimestampTz last_recv_time
TimestampTz last_seqsync_start_time
LogicalRepWorkerType type
TransactionId oldest_nonremovable_xid
TimestampTz last_send_time
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
int logicalrep_worker_slot_no
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
uint16 logicalrep_worker_generation
ParallelTransState xact_state
XLogRecPtr last_commit_end
bool AllTablesyncsReady(void)
@ PARALLEL_TRANS_FINISHED
#define isParallelApplyWorker(worker)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void stream_cleanup_files(Oid subid, TransactionId xid)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
PGDLLIMPORT ErrorContextCallback * apply_error_context_stack
PGDLLIMPORT bool in_remote_transaction
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
PGDLLIMPORT MemoryContext ApplyMessageContext
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
static bool am_parallel_apply_worker(void)
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
void ProcessSyncingRelations(XLogRecPtr current_lsn)
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
struct ParallelApplyWorkerShared ParallelApplyWorkerShared
void logicalrep_worker_attach(int slot)
void stream_stop_internal(TransactionId xid)
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
void start_apply(XLogRecPtr origin_startpos)
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
bool HasSubscriptionTablesCached(void)
pg_noreturn void FinishSyncWorker(void)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void apply_dispatch(StringInfo s)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
void DisableSubscriptionAndExit(void)
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
#define isSequenceSyncWorker(worker)
void pa_detach_all_error_mq(void)
void logicalrep_reset_seqsync_start_time(void)
PGDLLIMPORT struct WalReceiverConn * LogRepWorkerWalRcvConn
PGDLLIMPORT bool InitializingApplyWorker
void stream_start_internal(TransactionId xid, bool first_segment)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void set_apply_error_context_origin(char *originname)
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
struct LogicalRepWorker LogicalRepWorker
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
@ FS_SERIALIZE_IN_PROGRESS
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
static bool am_sequencesync_worker(void)
PGDLLIMPORT List * table_states_not_ready
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
void SetupApplyOrSyncWorker(int worker_slot)
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
#define isTableSyncWorker(worker)
PGDLLIMPORT Subscription * MySubscription
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_sequences, bool *started_tx)
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
int logicalrep_sync_worker_count(Oid subid)
void maybe_reread_subscription(void)
void InitializeLogRepWorker(void)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
PGDLLIMPORT ParallelApplyWorkerShared * MyParallelShared
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
void UpdateTwoPhaseState(Oid suboid, char new_state)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
void ProcessSequencesForSync(void)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
PGDLLIMPORT MemoryContext ApplyContext