PostgreSQL Source Code git master
logicalproto.h File Reference
#include "access/xact.h"
#include "executor/tuptable.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"
Include dependency graph for logicalproto.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepTupleData
 
struct  LogicalRepRelation
 
struct  LogicalRepTyp
 
struct  LogicalRepBeginData
 
struct  LogicalRepCommitData
 
struct  LogicalRepPreparedTxnData
 
struct  LogicalRepCommitPreparedTxnData
 
struct  LogicalRepRollbackPreparedTxnData
 
struct  LogicalRepStreamAbortData
 

Macros

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1
 
#define LOGICALREP_PROTO_VERSION_NUM   1
 
#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2
 
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3
 
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM   4
 
#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
 
#define LOGICALREP_COLUMN_NULL   'n'
 
#define LOGICALREP_COLUMN_UNCHANGED   'u'
 
#define LOGICALREP_COLUMN_TEXT   't'
 
#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */
 

Typedefs

typedef enum LogicalRepMsgType LogicalRepMsgType
 
typedef struct LogicalRepTupleData LogicalRepTupleData
 
typedef uint32 LogicalRepRelId
 
typedef struct LogicalRepRelation LogicalRepRelation
 
typedef struct LogicalRepTyp LogicalRepTyp
 
typedef struct LogicalRepBeginData LogicalRepBeginData
 
typedef struct LogicalRepCommitData LogicalRepCommitData
 
typedef struct LogicalRepPreparedTxnData LogicalRepPreparedTxnData
 
typedef struct LogicalRepCommitPreparedTxnData LogicalRepCommitPreparedTxnData
 
typedef struct LogicalRepRollbackPreparedTxnData LogicalRepRollbackPreparedTxnData
 
typedef struct LogicalRepStreamAbortData LogicalRepStreamAbortData
 

Enumerations

enum  LogicalRepMsgType {
  LOGICAL_REP_MSG_BEGIN = 'B' , LOGICAL_REP_MSG_COMMIT = 'C' , LOGICAL_REP_MSG_ORIGIN = 'O' , LOGICAL_REP_MSG_INSERT = 'I' ,
  LOGICAL_REP_MSG_UPDATE = 'U' , LOGICAL_REP_MSG_DELETE = 'D' , LOGICAL_REP_MSG_TRUNCATE = 'T' , LOGICAL_REP_MSG_RELATION = 'R' ,
  LOGICAL_REP_MSG_TYPE = 'Y' , LOGICAL_REP_MSG_MESSAGE = 'M' , LOGICAL_REP_MSG_BEGIN_PREPARE = 'b' , LOGICAL_REP_MSG_PREPARE = 'P' ,
  LOGICAL_REP_MSG_COMMIT_PREPARED = 'K' , LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r' , LOGICAL_REP_MSG_STREAM_START = 'S' , LOGICAL_REP_MSG_STREAM_STOP = 'E' ,
  LOGICAL_REP_MSG_STREAM_COMMIT = 'c' , LOGICAL_REP_MSG_STREAM_ABORT = 'A' , LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
}
 

Functions

void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
 
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
 
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
 
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
 
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
 
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
 
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
 
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
 
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
 
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
 
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
 
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
 
Listlogicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
 
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
 
LogicalRepRelationlogicalrep_read_rel (StringInfo in)
 
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
 
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
 
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
 
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
 
void logicalrep_write_stream_stop (StringInfo out)
 
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
 
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
 
void logicalrep_read_stream_abort (StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
 
const char * logicalrep_message_type (LogicalRepMsgType action)
 
bool logicalrep_should_publish_column (Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
 

Macro Definition Documentation

◆ LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_BINARY   'b' /* added in PG14 */

Definition at line 99 of file logicalproto.h.

◆ LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_NULL   'n'

Definition at line 96 of file logicalproto.h.

◆ LOGICALREP_COLUMN_TEXT

#define LOGICALREP_COLUMN_TEXT   't'

Definition at line 98 of file logicalproto.h.

◆ LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_UNCHANGED   'u'

Definition at line 97 of file logicalproto.h.

◆ LOGICALREP_PROTO_MAX_VERSION_NUM

#define LOGICALREP_PROTO_MAX_VERSION_NUM   LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM

Definition at line 45 of file logicalproto.h.

◆ LOGICALREP_PROTO_MIN_VERSION_NUM

#define LOGICALREP_PROTO_MIN_VERSION_NUM   1

Definition at line 40 of file logicalproto.h.

◆ LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM   4

Definition at line 44 of file logicalproto.h.

◆ LOGICALREP_PROTO_STREAM_VERSION_NUM

#define LOGICALREP_PROTO_STREAM_VERSION_NUM   2

Definition at line 42 of file logicalproto.h.

◆ LOGICALREP_PROTO_TWOPHASE_VERSION_NUM

#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM   3

Definition at line 43 of file logicalproto.h.

◆ LOGICALREP_PROTO_VERSION_NUM

#define LOGICALREP_PROTO_VERSION_NUM   1

Definition at line 41 of file logicalproto.h.

Typedef Documentation

◆ LogicalRepBeginData

◆ LogicalRepCommitData

◆ LogicalRepCommitPreparedTxnData

◆ LogicalRepMsgType

◆ LogicalRepPreparedTxnData

◆ LogicalRepRelation

◆ LogicalRepRelId

Definition at line 101 of file logicalproto.h.

◆ LogicalRepRollbackPreparedTxnData

◆ LogicalRepStreamAbortData

◆ LogicalRepTupleData

◆ LogicalRepTyp

typedef struct LogicalRepTyp LogicalRepTyp

Enumeration Type Documentation

◆ LogicalRepMsgType

Enumerator
LOGICAL_REP_MSG_BEGIN 
LOGICAL_REP_MSG_COMMIT 
LOGICAL_REP_MSG_ORIGIN 
LOGICAL_REP_MSG_INSERT 
LOGICAL_REP_MSG_UPDATE 
LOGICAL_REP_MSG_DELETE 
LOGICAL_REP_MSG_TRUNCATE 
LOGICAL_REP_MSG_RELATION 
LOGICAL_REP_MSG_TYPE 
LOGICAL_REP_MSG_MESSAGE 
LOGICAL_REP_MSG_BEGIN_PREPARE 
LOGICAL_REP_MSG_PREPARE 
LOGICAL_REP_MSG_COMMIT_PREPARED 
LOGICAL_REP_MSG_ROLLBACK_PREPARED 
LOGICAL_REP_MSG_STREAM_START 
LOGICAL_REP_MSG_STREAM_STOP 
LOGICAL_REP_MSG_STREAM_COMMIT 
LOGICAL_REP_MSG_STREAM_ABORT 
LOGICAL_REP_MSG_STREAM_PREPARE 

Definition at line 57 of file logicalproto.h.

58{
LogicalRepMsgType
Definition: logicalproto.h:58
@ LOGICAL_REP_MSG_INSERT
Definition: logicalproto.h:62
@ LOGICAL_REP_MSG_TRUNCATE
Definition: logicalproto.h:65
@ LOGICAL_REP_MSG_STREAM_STOP
Definition: logicalproto.h:74
@ LOGICAL_REP_MSG_BEGIN
Definition: logicalproto.h:59
@ LOGICAL_REP_MSG_STREAM_PREPARE
Definition: logicalproto.h:77
@ LOGICAL_REP_MSG_STREAM_ABORT
Definition: logicalproto.h:76
@ LOGICAL_REP_MSG_BEGIN_PREPARE
Definition: logicalproto.h:69
@ LOGICAL_REP_MSG_STREAM_START
Definition: logicalproto.h:73
@ LOGICAL_REP_MSG_COMMIT
Definition: logicalproto.h:60
@ LOGICAL_REP_MSG_PREPARE
Definition: logicalproto.h:70
@ LOGICAL_REP_MSG_RELATION
Definition: logicalproto.h:66
@ LOGICAL_REP_MSG_MESSAGE
Definition: logicalproto.h:68
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
Definition: logicalproto.h:72
@ LOGICAL_REP_MSG_COMMIT_PREPARED
Definition: logicalproto.h:71
@ LOGICAL_REP_MSG_TYPE
Definition: logicalproto.h:67
@ LOGICAL_REP_MSG_DELETE
Definition: logicalproto.h:64
@ LOGICAL_REP_MSG_STREAM_COMMIT
Definition: logicalproto.h:75
@ LOGICAL_REP_MSG_ORIGIN
Definition: logicalproto.h:61
@ LOGICAL_REP_MSG_UPDATE
Definition: logicalproto.h:63

Function Documentation

◆ logicalrep_message_type()

const char * logicalrep_message_type ( LogicalRepMsgType  action)

Definition at line 1212 of file proto.c.

1213{
1214 static char err_unknown[20];
1215
1216 switch (action)
1217 {
1219 return "BEGIN";
1221 return "COMMIT";
1223 return "ORIGIN";
1225 return "INSERT";
1227 return "UPDATE";
1229 return "DELETE";
1231 return "TRUNCATE";
1233 return "RELATION";
1235 return "TYPE";
1237 return "MESSAGE";
1239 return "BEGIN PREPARE";
1241 return "PREPARE";
1243 return "COMMIT PREPARED";
1245 return "ROLLBACK PREPARED";
1247 return "STREAM START";
1249 return "STREAM STOP";
1251 return "STREAM COMMIT";
1253 return "STREAM ABORT";
1255 return "STREAM PREPARE";
1256 }
1257
1258 /*
1259 * This message provides context in the error raised when applying a
1260 * logical message. So we can't throw an error here. Return an unknown
1261 * indicator value so that the original error is still reported.
1262 */
1263 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1264
1265 return err_unknown;
1266}
#define snprintf
Definition: port.h:239

References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.

Referenced by apply_error_callback().

◆ logicalrep_read_begin()

void logicalrep_read_begin ( StringInfo  in,
LogicalRepBeginData begin_data 
)

Definition at line 63 of file proto.c.

64{
65 /* read fields */
66 begin_data->final_lsn = pq_getmsgint64(in);
67 if (!XLogRecPtrIsValid(begin_data->final_lsn))
68 elog(ERROR, "final_lsn not set in begin message");
69 begin_data->committime = pq_getmsgint64(in);
70 begin_data->xid = pq_getmsgint(in, 4);
71}
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
XLogRecPtr final_lsn
Definition: logicalproto.h:129
TransactionId xid
Definition: logicalproto.h:131
TimestampTz committime
Definition: logicalproto.h:130
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29

References LogicalRepBeginData::committime, elog, ERROR, LogicalRepBeginData::final_lsn, pq_getmsgint(), pq_getmsgint64(), LogicalRepBeginData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_begin().

◆ logicalrep_read_begin_prepare()

void logicalrep_read_begin_prepare ( StringInfo  in,
LogicalRepPreparedTxnData begin_data 
)

Definition at line 134 of file proto.c.

135{
136 /* read fields */
137 begin_data->prepare_lsn = pq_getmsgint64(in);
138 if (!XLogRecPtrIsValid(begin_data->prepare_lsn))
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
140 begin_data->end_lsn = pq_getmsgint64(in);
141 if (!XLogRecPtrIsValid(begin_data->end_lsn))
142 elog(ERROR, "end_lsn not set in begin prepare message");
143 begin_data->prepare_time = pq_getmsgint64(in);
144 begin_data->xid = pq_getmsgint(in, 4);
145
146 /* read gid (copy it into a pre-allocated buffer) */
147 strlcpy(begin_data->gid, pq_getmsgstring(in), sizeof(begin_data->gid));
148}
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), LogicalRepPreparedTxnData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_begin_prepare().

◆ logicalrep_read_commit()

void logicalrep_read_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 98 of file proto.c.

99{
100 /* read flags (unused for now) */
101 uint8 flags = pq_getmsgbyte(in);
102
103 if (flags != 0)
104 elog(ERROR, "unrecognized flags %u in commit message", flags);
105
106 /* read fields */
107 commit_data->commit_lsn = pq_getmsgint64(in);
108 commit_data->end_lsn = pq_getmsgint64(in);
109 commit_data->committime = pq_getmsgint64(in);
110}
uint8_t uint8
Definition: c.h:541
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
TimestampTz committime
Definition: logicalproto.h:138

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), and pq_getmsgint64().

Referenced by apply_handle_commit().

◆ logicalrep_read_commit_prepared()

void logicalrep_read_commit_prepared ( StringInfo  in,
LogicalRepCommitPreparedTxnData prepare_data 
)

Definition at line 267 of file proto.c.

268{
269 /* read flags */
270 uint8 flags = pq_getmsgbyte(in);
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275 /* read fields */
276 prepare_data->commit_lsn = pq_getmsgint64(in);
277 if (!XLogRecPtrIsValid(prepare_data->commit_lsn))
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
279 prepare_data->end_lsn = pq_getmsgint64(in);
280 if (!XLogRecPtrIsValid(prepare_data->end_lsn))
281 elog(ERROR, "end_lsn is not set in commit prepared message");
282 prepare_data->commit_time = pq_getmsgint64(in);
283 prepare_data->xid = pq_getmsgint(in, 4);
284
285 /* read gid (copy it into a pre-allocated buffer) */
286 strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
287}

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), LogicalRepCommitPreparedTxnData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_commit_prepared().

◆ logicalrep_read_delete()

LogicalRepRelId logicalrep_read_delete ( StringInfo  in,
LogicalRepTupleData oldtup 
)

Definition at line 561 of file proto.c.

562{
563 char action;
564 LogicalRepRelId relid;
565
566 /* read the relation id */
567 relid = pq_getmsgint(in, 4);
568
569 /* read and verify action */
570 action = pq_getmsgbyte(in);
571 if (action != 'K' && action != 'O')
572 elog(ERROR, "expected action 'O' or 'K', got %c", action);
573
574 logicalrep_read_tuple(in, oldtup);
575
576 return relid;
577}
uint32 LogicalRepRelId
Definition: logicalproto.h:101
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
Definition: proto.c:864

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_delete().

◆ logicalrep_read_insert()

LogicalRepRelId logicalrep_read_insert ( StringInfo  in,
LogicalRepTupleData newtup 
)

Definition at line 428 of file proto.c.

429{
430 char action;
431 LogicalRepRelId relid;
432
433 /* read the relation id */
434 relid = pq_getmsgint(in, 4);
435
436 action = pq_getmsgbyte(in);
437 if (action != 'N')
438 elog(ERROR, "expected new tuple but got %d",
439 action);
440
441 logicalrep_read_tuple(in, newtup);
442
443 return relid;
444}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_insert().

◆ logicalrep_read_origin()

char * logicalrep_read_origin ( StringInfo  in,
XLogRecPtr origin_lsn 
)

Definition at line 390 of file proto.c.

391{
392 /* fixed fields */
393 *origin_lsn = pq_getmsgint64(in);
394
395 /* return origin */
396 return pstrdup(pq_getmsgstring(in));
397}
char * pstrdup(const char *in)
Definition: mcxt.c:1759

References pq_getmsgint64(), pq_getmsgstring(), and pstrdup().

◆ logicalrep_read_prepare()

void logicalrep_read_prepare ( StringInfo  in,
LogicalRepPreparedTxnData prepare_data 
)

Definition at line 228 of file proto.c.

229{
230 logicalrep_read_prepare_common(in, "prepare", prepare_data);
231}
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
Definition: proto.c:199

References logicalrep_read_prepare_common().

Referenced by apply_handle_prepare().

◆ logicalrep_read_rel()

LogicalRepRelation * logicalrep_read_rel ( StringInfo  in)

Definition at line 698 of file proto.c.

699{
701
702 rel->remoteid = pq_getmsgint(in, 4);
703
704 /* Read relation name from stream */
706 rel->relname = pstrdup(pq_getmsgstring(in));
707
708 /* Read the replica identity. */
709 rel->replident = pq_getmsgbyte(in);
710
711 /* relkind is not sent */
712 rel->relkind = 0;
713
714 /* Get attribute description */
715 logicalrep_read_attrs(in, rel);
716
717 return rel;
718}
void * palloc(Size size)
Definition: mcxt.c:1365
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
Definition: proto.c:988
static const char * logicalrep_read_namespace(StringInfo in)
Definition: proto.c:1050
LogicalRepRelId remoteid
Definition: logicalproto.h:107

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

◆ logicalrep_read_rollback_prepared()

void logicalrep_read_rollback_prepared ( StringInfo  in,
LogicalRepRollbackPreparedTxnData rollback_data 
)

Definition at line 325 of file proto.c.

327{
328 /* read flags */
329 uint8 flags = pq_getmsgbyte(in);
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334 /* read fields */
335 rollback_data->prepare_end_lsn = pq_getmsgint64(in);
336 if (!XLogRecPtrIsValid(rollback_data->prepare_end_lsn))
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
338 rollback_data->rollback_end_lsn = pq_getmsgint64(in);
339 if (!XLogRecPtrIsValid(rollback_data->rollback_end_lsn))
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
341 rollback_data->prepare_time = pq_getmsgint64(in);
342 rollback_data->rollback_time = pq_getmsgint64(in);
343 rollback_data->xid = pq_getmsgint(in, 4);
344
345 /* read gid (copy it into a pre-allocated buffer) */
346 strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
347}

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), LogicalRepRollbackPreparedTxnData::xid, and XLogRecPtrIsValid.

Referenced by apply_handle_rollback_prepared().

◆ logicalrep_read_stream_abort()

void logicalrep_read_stream_abort ( StringInfo  in,
LogicalRepStreamAbortData abort_data,
bool  read_abort_info 
)

Definition at line 1187 of file proto.c.

1190{
1191 Assert(abort_data);
1192
1193 abort_data->xid = pq_getmsgint(in, 4);
1194 abort_data->subxid = pq_getmsgint(in, 4);
1195
1196 if (read_abort_info)
1197 {
1198 abort_data->abort_lsn = pq_getmsgint64(in);
1199 abort_data->abort_time = pq_getmsgint64(in);
1200 }
1201 else
1202 {
1203 abort_data->abort_lsn = InvalidXLogRecPtr;
1204 abort_data->abort_time = 0;
1205 }
1206}
Assert(PointerIsAligned(start, uint64))
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, Assert(), InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

◆ logicalrep_read_stream_commit()

TransactionId logicalrep_read_stream_commit ( StringInfo  in,
LogicalRepCommitData commit_data 
)

Definition at line 1132 of file proto.c.

1133{
1134 TransactionId xid;
1135 uint8 flags;
1136
1137 xid = pq_getmsgint(in, 4);
1138
1139 /* read flags (unused for now) */
1140 flags = pq_getmsgbyte(in);
1141
1142 if (flags != 0)
1143 elog(ERROR, "unrecognized flags %u in commit message", flags);
1144
1145 /* read fields */
1146 commit_data->commit_lsn = pq_getmsgint64(in);
1147 commit_data->end_lsn = pq_getmsgint64(in);
1148 commit_data->committime = pq_getmsgint64(in);
1149
1150 return xid;
1151}
uint32 TransactionId
Definition: c.h:662

References LogicalRepCommitData::commit_lsn, LogicalRepCommitData::committime, elog, LogicalRepCommitData::end_lsn, ERROR, pq_getmsgbyte(), pq_getmsgint(), and pq_getmsgint64().

Referenced by apply_handle_stream_commit().

◆ logicalrep_read_stream_prepare()

void logicalrep_read_stream_prepare ( StringInfo  in,
LogicalRepPreparedTxnData prepare_data 
)

Definition at line 365 of file proto.c.

366{
367 logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
368}

References logicalrep_read_prepare_common().

Referenced by apply_handle_stream_prepare().

◆ logicalrep_read_stream_start()

TransactionId logicalrep_read_stream_start ( StringInfo  in,
bool *  first_segment 
)

Definition at line 1082 of file proto.c.

1083{
1084 TransactionId xid;
1085
1086 Assert(first_segment);
1087
1088 xid = pq_getmsgint(in, 4);
1089 *first_segment = (pq_getmsgbyte(in) == 1);
1090
1091 return xid;
1092}

References Assert(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_stream_start().

◆ logicalrep_read_truncate()

List * logicalrep_read_truncate ( StringInfo  in,
bool *  cascade,
bool *  restart_seqs 
)

Definition at line 615 of file proto.c.

617{
618 int i;
619 int nrelids;
620 List *relids = NIL;
621 uint8 flags;
622
623 nrelids = pq_getmsgint(in, 4);
624
625 /* read and decode truncate flags */
626 flags = pq_getmsgint(in, 1);
627 *cascade = (flags & TRUNCATE_CASCADE) > 0;
628 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
629
630 for (i = 0; i < nrelids; i++)
631 relids = lappend_oid(relids, pq_getmsgint(in, 4));
632
633 return relids;
634}
int i
Definition: isn.c:77
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
#define NIL
Definition: pg_list.h:68
#define TRUNCATE_RESTART_SEQS
Definition: proto.c:30
#define TRUNCATE_CASCADE
Definition: proto.c:29
Definition: pg_list.h:54

References i, lappend_oid(), NIL, pq_getmsgint(), TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by apply_handle_truncate().

◆ logicalrep_read_typ()

void logicalrep_read_typ ( StringInfo  in,
LogicalRepTyp ltyp 
)

Definition at line 757 of file proto.c.

758{
759 ltyp->remoteid = pq_getmsgint(in, 4);
760
761 /* Read type name from stream */
763 ltyp->typname = pstrdup(pq_getmsgstring(in));
764}

References logicalrep_read_namespace(), LogicalRepTyp::nspname, pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepTyp::remoteid, and LogicalRepTyp::typname.

Referenced by apply_handle_type().

◆ logicalrep_read_update()

LogicalRepRelId logicalrep_read_update ( StringInfo  in,
bool *  has_oldtuple,
LogicalRepTupleData oldtup,
LogicalRepTupleData newtup 
)

Definition at line 487 of file proto.c.

490{
491 char action;
492 LogicalRepRelId relid;
493
494 /* read the relation id */
495 relid = pq_getmsgint(in, 4);
496
497 /* read and verify action */
498 action = pq_getmsgbyte(in);
499 if (action != 'K' && action != 'O' && action != 'N')
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
501 action);
502
503 /* check for old tuple */
504 if (action == 'K' || action == 'O')
505 {
506 logicalrep_read_tuple(in, oldtup);
507 *has_oldtuple = true;
508
509 action = pq_getmsgbyte(in);
510 }
511 else
512 *has_oldtuple = false;
513
514 /* check for new tuple */
515 if (action != 'N')
516 elog(ERROR, "expected action 'N', got %c",
517 action);
518
519 logicalrep_read_tuple(in, newtup);
520
521 return relid;
522}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

◆ logicalrep_should_publish_column()

bool logicalrep_should_publish_column ( Form_pg_attribute  att,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 1282 of file proto.c.

1284{
1285 if (att->attisdropped)
1286 return false;
1287
1288 /* If a column list is provided, publish only the cols in that list. */
1289 if (columns)
1290 return bms_is_member(att->attnum, columns);
1291
1292 /* All non-generated columns are always published. */
1293 if (!att->attgenerated)
1294 return true;
1295
1296 /*
1297 * Stored generated columns are only published when the user sets
1298 * publish_generated_columns as stored.
1299 */
1300 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1301 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1302
1303 return false;
1304}
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510

References bms_is_member().

Referenced by logicalrep_write_attrs(), logicalrep_write_tuple(), and send_relation_and_attrs().

◆ logicalrep_write_begin()

void logicalrep_write_begin ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 49 of file proto.c.

50{
52
53 /* fixed fields */
54 pq_sendint64(out, txn->final_lsn);
55 pq_sendint64(out, txn->commit_time);
56 pq_sendint32(out, txn->xid);
57}
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152
TimestampTz commit_time
XLogRecPtr final_lsn
TransactionId xid

References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and ReorderBufferTXN::xid.

Referenced by pgoutput_send_begin().

◆ logicalrep_write_begin_prepare()

void logicalrep_write_begin_prepare ( StringInfo  out,
ReorderBufferTXN txn 
)

Definition at line 116 of file proto.c.

117{
119
120 /* fixed fields */
121 pq_sendint64(out, txn->final_lsn);
122 pq_sendint64(out, txn->end_lsn);
123 pq_sendint64(out, txn->prepare_time);
124 pq_sendint32(out, txn->xid);
125
126 /* send gid */
127 pq_sendstring(out, txn->gid);
128}
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
XLogRecPtr end_lsn
TimestampTz prepare_time

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_prepare_txn().

◆ logicalrep_write_commit()

void logicalrep_write_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 78 of file proto.c.

80{
81 uint8 flags = 0;
82
84
85 /* send the flags field (unused for now) */
86 pq_sendbyte(out, flags);
87
88 /* send fields */
89 pq_sendint64(out, commit_lsn);
90 pq_sendint64(out, txn->end_lsn);
91 pq_sendint64(out, txn->commit_time);
92}

References ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_COMMIT, pq_sendbyte(), and pq_sendint64().

Referenced by pgoutput_commit_txn().

◆ logicalrep_write_commit_prepared()

void logicalrep_write_commit_prepared ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 237 of file proto.c.

239{
240 uint8 flags = 0;
241
243
244 /*
245 * This should only ever happen for two-phase commit transactions, in
246 * which case we expect to have a valid GID.
247 */
248 Assert(txn->gid != NULL);
249
250 /* send the flags field */
251 pq_sendbyte(out, flags);
252
253 /* send fields */
254 pq_sendint64(out, commit_lsn);
255 pq_sendint64(out, txn->end_lsn);
256 pq_sendint64(out, txn->commit_time);
257 pq_sendint32(out, txn->xid);
258
259 /* send gid */
260 pq_sendstring(out, txn->gid);
261}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

◆ logicalrep_write_delete()

void logicalrep_write_delete ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot oldslot,
bool  binary,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 528 of file proto.c.

532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
538
539 /* transaction ID (if not valid, we're not streaming) */
540 if (TransactionIdIsValid(xid))
541 pq_sendint32(out, xid);
542
543 /* use Oid as relation identifier */
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O'); /* old tuple follows */
548 else
549 pq_sendbyte(out, 'K'); /* old key follows */
550
551 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
552 include_gencols_type);
553}
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:770
#define RelationGetRelid(relation)
Definition: rel.h:515
Form_pg_class rd_rel
Definition: rel.h:111
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_insert()

void logicalrep_write_insert ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot newslot,
bool  binary,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 403 of file proto.c.

407{
409
410 /* transaction ID (if not valid, we're not streaming) */
411 if (TransactionIdIsValid(xid))
412 pq_sendint32(out, xid);
413
414 /* use Oid as relation identifier */
416
417 pq_sendbyte(out, 'N'); /* new tuple follows */
418 logicalrep_write_tuple(out, rel, newslot, binary, columns,
419 include_gencols_type);
420}

References LOGICAL_REP_MSG_INSERT, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

◆ logicalrep_write_message()

void logicalrep_write_message ( StringInfo  out,
TransactionId  xid,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)

Definition at line 640 of file proto.c.

643{
644 uint8 flags = 0;
645
647
648 /* encode and send message flags */
649 if (transactional)
650 flags |= MESSAGE_TRANSACTIONAL;
651
652 /* transaction ID (if not valid, we're not streaming) */
653 if (TransactionIdIsValid(xid))
654 pq_sendint32(out, xid);
655
656 pq_sendint8(out, flags);
657 pq_sendint64(out, lsn);
658 pq_sendstring(out, prefix);
659 pq_sendint32(out, sz);
660 pq_sendbytes(out, message, sz);
661}
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
static void pq_sendint8(StringInfo buf, uint8 i)
Definition: pqformat.h:128
#define MESSAGE_TRANSACTIONAL
Definition: proto.c:28

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

◆ logicalrep_write_origin()

void logicalrep_write_origin ( StringInfo  out,
const char *  origin,
XLogRecPtr  origin_lsn 
)

Definition at line 374 of file proto.c.

376{
378
379 /* fixed fields */
380 pq_sendint64(out, origin_lsn);
381
382 /* origin string */
383 pq_sendstring(out, origin);
384}

References LOGICAL_REP_MSG_ORIGIN, pq_sendbyte(), pq_sendint64(), and pq_sendstring().

Referenced by send_repl_origin().

◆ logicalrep_write_prepare()

void logicalrep_write_prepare ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 187 of file proto.c.

189{
191 txn, prepare_lsn);
192}
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:155

References LOGICAL_REP_MSG_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_prepare_txn().

◆ logicalrep_write_rel()

void logicalrep_write_rel ( StringInfo  out,
TransactionId  xid,
Relation  rel,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 667 of file proto.c.

670{
671 char *relname;
672
674
675 /* transaction ID (if not valid, we're not streaming) */
676 if (TransactionIdIsValid(xid))
677 pq_sendint32(out, xid);
678
679 /* use Oid as relation identifier */
681
682 /* send qualified relation name */
686
687 /* send replica identity */
688 pq_sendbyte(out, rel->rd_rel->relreplident);
689
690 /* send the attribute info */
691 logicalrep_write_attrs(out, rel, columns, include_gencols_type);
692}
NameData relname
Definition: pg_class.h:38
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
Definition: proto.c:1030
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:924
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_rollback_prepared()

void logicalrep_write_rollback_prepared ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)

Definition at line 293 of file proto.c.

296{
297 uint8 flags = 0;
298
300
301 /*
302 * This should only ever happen for two-phase commit transactions, in
303 * which case we expect to have a valid GID.
304 */
305 Assert(txn->gid != NULL);
306
307 /* send the flags field */
308 pq_sendbyte(out, flags);
309
310 /* send fields */
311 pq_sendint64(out, prepare_end_lsn);
312 pq_sendint64(out, txn->end_lsn);
313 pq_sendint64(out, prepare_time);
314 pq_sendint64(out, txn->commit_time);
315 pq_sendint32(out, txn->xid);
316
317 /* send gid */
318 pq_sendstring(out, txn->gid);
319}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

◆ logicalrep_write_stream_abort()

void logicalrep_write_stream_abort ( StringInfo  out,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  abort_lsn,
TimestampTz  abort_time,
bool  write_abort_info 
)

Definition at line 1161 of file proto.c.

1164{
1166
1168
1169 /* transaction ID */
1170 pq_sendint32(out, xid);
1171 pq_sendint32(out, subxid);
1172
1173 if (write_abort_info)
1174 {
1175 pq_sendint64(out, abort_lsn);
1176 pq_sendint64(out, abort_time);
1177 }
1178}

References Assert(), LOGICAL_REP_MSG_STREAM_ABORT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), and TransactionIdIsValid.

Referenced by pgoutput_stream_abort().

◆ logicalrep_write_stream_commit()

void logicalrep_write_stream_commit ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)

Definition at line 1107 of file proto.c.

1109{
1110 uint8 flags = 0;
1111
1113
1115
1116 /* transaction ID */
1117 pq_sendint32(out, txn->xid);
1118
1119 /* send the flags field (unused for now) */
1120 pq_sendbyte(out, flags);
1121
1122 /* send fields */
1123 pq_sendint64(out, commit_lsn);
1124 pq_sendint64(out, txn->end_lsn);
1125 pq_sendint64(out, txn->commit_time);
1126}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, LOGICAL_REP_MSG_STREAM_COMMIT, pq_sendbyte(), pq_sendint32(), pq_sendint64(), TransactionIdIsValid, and ReorderBufferTXN::xid.

Referenced by pgoutput_stream_commit().

◆ logicalrep_write_stream_prepare()

void logicalrep_write_stream_prepare ( StringInfo  out,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)

Definition at line 353 of file proto.c.

356{
358 txn, prepare_lsn);
359}

References LOGICAL_REP_MSG_STREAM_PREPARE, and logicalrep_write_prepare_common().

Referenced by pgoutput_stream_prepare_txn().

◆ logicalrep_write_stream_start()

void logicalrep_write_stream_start ( StringInfo  out,
TransactionId  xid,
bool  first_segment 
)

Definition at line 1064 of file proto.c.

1066{
1068
1070
1071 /* transaction ID (we're starting to stream, so must be valid) */
1072 pq_sendint32(out, xid);
1073
1074 /* 1 if this is the first streaming segment for this xid */
1075 pq_sendbyte(out, first_segment ? 1 : 0);
1076}

References Assert(), LOGICAL_REP_MSG_STREAM_START, pq_sendbyte(), pq_sendint32(), and TransactionIdIsValid.

Referenced by pgoutput_stream_start().

◆ logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo  out)

Definition at line 1098 of file proto.c.

1099{
1101}

References LOGICAL_REP_MSG_STREAM_STOP, and pq_sendbyte().

Referenced by pgoutput_stream_stop().

◆ logicalrep_write_truncate()

void logicalrep_write_truncate ( StringInfo  out,
TransactionId  xid,
int  nrelids,
Oid  relids[],
bool  cascade,
bool  restart_seqs 
)

Definition at line 583 of file proto.c.

588{
589 int i;
590 uint8 flags = 0;
591
593
594 /* transaction ID (if not valid, we're not streaming) */
595 if (TransactionIdIsValid(xid))
596 pq_sendint32(out, xid);
597
598 pq_sendint32(out, nrelids);
599
600 /* encode and send truncate flags */
601 if (cascade)
602 flags |= TRUNCATE_CASCADE;
603 if (restart_seqs)
604 flags |= TRUNCATE_RESTART_SEQS;
605 pq_sendint8(out, flags);
606
607 for (i = 0; i < nrelids; i++)
608 pq_sendint32(out, relids[i]);
609}

References i, LOGICAL_REP_MSG_TRUNCATE, pq_sendbyte(), pq_sendint32(), pq_sendint8(), TransactionIdIsValid, TRUNCATE_CASCADE, and TRUNCATE_RESTART_SEQS.

Referenced by pgoutput_truncate().

◆ logicalrep_write_typ()

void logicalrep_write_typ ( StringInfo  out,
TransactionId  xid,
Oid  typoid 
)

Definition at line 726 of file proto.c.

727{
728 Oid basetypoid = getBaseType(typoid);
729 HeapTuple tup;
730 Form_pg_type typtup;
731
733
734 /* transaction ID (if not valid, we're not streaming) */
735 if (TransactionIdIsValid(xid))
736 pq_sendint32(out, xid);
737
738 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
739 if (!HeapTupleIsValid(tup))
740 elog(ERROR, "cache lookup failed for type %u", basetypoid);
741 typtup = (Form_pg_type) GETSTRUCT(tup);
742
743 /* use Oid as type identifier */
744 pq_sendint32(out, typoid);
745
746 /* send qualified type name */
747 logicalrep_write_namespace(out, typtup->typnamespace);
748 pq_sendstring(out, NameStr(typtup->typname));
749
750 ReleaseSysCache(tup);
751}
#define NameStr(name)
Definition: c.h:756
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
Oid getBaseType(Oid typid)
Definition: lsyscache.c:2688
FormData_pg_type * Form_pg_type
Definition: pg_type.h:261
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
unsigned int Oid
Definition: postgres_ext.h:32
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220

References elog, ERROR, getBaseType(), GETSTRUCT(), HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

◆ logicalrep_write_update()

void logicalrep_write_update ( StringInfo  out,
TransactionId  xid,
Relation  rel,
TupleTableSlot oldslot,
TupleTableSlot newslot,
bool  binary,
Bitmapset columns,
PublishGencolsType  include_gencols_type 
)

Definition at line 450 of file proto.c.

454{
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461 /* transaction ID (if not valid, we're not streaming) */
462 if (TransactionIdIsValid(xid))
463 pq_sendint32(out, xid);
464
465 /* use Oid as relation identifier */
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O'); /* old tuple follows */
472 else
473 pq_sendbyte(out, 'K'); /* old key follows */
474 logicalrep_write_tuple(out, rel, oldslot, binary, columns,
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N'); /* new tuple follows */
479 logicalrep_write_tuple(out, rel, newslot, binary, columns,
480 include_gencols_type);
481}

References Assert(), LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().