PostgreSQL Source Code git master
tablesync.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for tablesync.c:

Go to the source code of this file.

Functions

static bool wait_for_table_state_change (Oid relid, char expected_state)
 
static bool wait_for_worker_state_change (char expected_state)
 
void ProcessSyncingTablesForSync (XLogRecPtr current_lsn)
 
void ProcessSyncingTablesForApply (XLogRecPtr current_lsn)
 
static Listmake_copy_attnamelist (LogicalRepRelMapEntry *rel)
 
static int copy_read_data (void *outbuf, int minread, int maxread)
 
static void fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
 
static void copy_table (Relation rel)
 
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
 
static char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
static void start_table_sync (XLogRecPtr *origin_startpos, char **slotname)
 
static void run_tablesync_worker ()
 
void TableSyncWorkerMain (Datum main_arg)
 
bool AllTablesyncsReady (void)
 
bool HasSubscriptionTablesCached (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 

Variables

Listtable_states_not_ready = NIL
 
static StringInfo copybuf = NULL
 

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1600 of file tablesync.c.

1601{
1602 bool started_tx;
1603 bool has_tables;
1604
1605 /* We need up-to-date sync state info for subscription tables here. */
1606 FetchRelationStates(&has_tables, NULL, &started_tx);
1607
1608 if (started_tx)
1609 {
1611 pgstat_report_stat(true);
1612 }
1613
1614 /*
1615 * Return false when there are no tables in subscription or not all tables
1616 * are in ready state; true otherwise.
1617 */
1618 return has_tables && (table_states_not_ready == NIL);
1619}
#define NIL
Definition: pg_list.h:68
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition: syncutils.c:202
List * table_states_not_ready
Definition: tablesync.c:125
void CommitTransactionCommand(void)
Definition: xact.c:3175

References CommitTransactionCommand(), FetchRelationStates(), NIL, pgstat_report_stat(), and table_states_not_ready.

Referenced by pa_can_start(), ProcessSyncingTablesForApply(), run_apply_worker(), and wait_for_local_flush().

◆ copy_read_data()

static int copy_read_data ( void *  outbuf,
int  minread,
int  maxread 
)
static

Definition at line 646 of file tablesync.c.

647{
648 int bytesread = 0;
649 int avail;
650
651 /* If there are some leftover data from previous read, use it. */
652 avail = copybuf->len - copybuf->cursor;
653 if (avail)
654 {
655 if (avail > maxread)
656 avail = maxread;
657 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
658 copybuf->cursor += avail;
659 maxread -= avail;
660 bytesread += avail;
661 }
662
663 while (maxread > 0 && bytesread < minread)
664 {
666 int len;
667 char *buf = NULL;
668
669 for (;;)
670 {
671 /* Try read the data. */
673
675
676 if (len == 0)
677 break;
678 else if (len < 0)
679 return bytesread;
680 else
681 {
682 /* Process the data */
683 copybuf->data = buf;
684 copybuf->len = len;
685 copybuf->cursor = 0;
686
687 avail = copybuf->len - copybuf->cursor;
688 if (avail > maxread)
689 avail = maxread;
690 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
691 outbuf = (char *) outbuf + avail;
692 copybuf->cursor += avail;
693 maxread -= avail;
694 bytesread += avail;
695 }
696
697 if (maxread <= 0 || bytesread >= minread)
698 return bytesread;
699 }
700
701 /*
702 * Wait for more data or latch.
703 */
707 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
708
710 }
711
712 return bytesread;
713}
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
struct Latch * MyLatch
Definition: globals.c:63
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:223
void ResetLatch(Latch *latch)
Definition: latch.c:374
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
const void size_t len
static char * buf
Definition: pg_test_fsync.c:72
int pgsocket
Definition: port.h:29
#define PGINVALID_SOCKET
Definition: port.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static StringInfo copybuf
Definition: tablesync.c:127
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455

References buf, CHECK_FOR_INTERRUPTS, copybuf, StringInfoData::cursor, StringInfoData::data, fd(), StringInfoData::len, len, LogRepWorkerWalRcvConn, MyLatch, PGINVALID_SOCKET, ResetLatch(), WaitLatchOrSocket(), walrcv_receive, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.

Referenced by copy_table().

◆ copy_table()

static void copy_table ( Relation  rel)
static

Definition at line 1043 of file tablesync.c.

1044{
1045 LogicalRepRelMapEntry *relmapentry;
1046 LogicalRepRelation lrel;
1047 List *qual = NIL;
1048 WalRcvExecResult *res;
1049 StringInfoData cmd;
1050 CopyFromState cstate;
1051 List *attnamelist;
1052 ParseState *pstate;
1053 List *options = NIL;
1054 bool gencol_published = false;
1055
1056 /* Get the publisher relation info. */
1058 RelationGetRelationName(rel), &lrel, &qual,
1059 &gencol_published);
1060
1061 /* Put the relation into relmap. */
1063
1064 /* Map the publisher relation to local one. */
1065 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1066 Assert(rel == relmapentry->localrel);
1067
1068 /* Start copy on the publisher. */
1069 initStringInfo(&cmd);
1070
1071 /* Regular or partitioned table with no row filter or generated columns */
1072 if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
1073 && qual == NIL && !gencol_published)
1074 {
1075 appendStringInfo(&cmd, "COPY %s",
1077
1078 /* If the table has columns, then specify the columns */
1079 if (lrel.natts)
1080 {
1081 appendStringInfoString(&cmd, " (");
1082
1083 /*
1084 * XXX Do we need to list the columns in all cases? Maybe we're
1085 * replicating all columns?
1086 */
1087 for (int i = 0; i < lrel.natts; i++)
1088 {
1089 if (i > 0)
1090 appendStringInfoString(&cmd, ", ");
1091
1093 }
1094
1095 appendStringInfoChar(&cmd, ')');
1096 }
1097
1098 appendStringInfoString(&cmd, " TO STDOUT");
1099 }
1100 else
1101 {
1102 /*
1103 * For non-tables and tables with row filters, we need to do COPY
1104 * (SELECT ...), but we can't just do SELECT * because we may need to
1105 * copy only subset of columns including generated columns. For tables
1106 * with any row filters, build a SELECT query with OR'ed row filters
1107 * for COPY.
1108 *
1109 * We also need to use this same COPY (SELECT ...) syntax when
1110 * generated columns are published, because copy of generated columns
1111 * is not supported by the normal COPY.
1112 */
1113 appendStringInfoString(&cmd, "COPY (SELECT ");
1114 for (int i = 0; i < lrel.natts; i++)
1115 {
1117 if (i < lrel.natts - 1)
1118 appendStringInfoString(&cmd, ", ");
1119 }
1120
1121 appendStringInfoString(&cmd, " FROM ");
1122
1123 /*
1124 * For regular tables, make sure we don't copy data from a child that
1125 * inherits the named table as those will be copied separately.
1126 */
1127 if (lrel.relkind == RELKIND_RELATION)
1128 appendStringInfoString(&cmd, "ONLY ");
1129
1131 /* list of OR'ed filters */
1132 if (qual != NIL)
1133 {
1134 ListCell *lc;
1135 char *q = strVal(linitial(qual));
1136
1137 appendStringInfo(&cmd, " WHERE %s", q);
1138 for_each_from(lc, qual, 1)
1139 {
1140 q = strVal(lfirst(lc));
1141 appendStringInfo(&cmd, " OR %s", q);
1142 }
1143 list_free_deep(qual);
1144 }
1145
1146 appendStringInfoString(&cmd, ") TO STDOUT");
1147 }
1148
1149 /*
1150 * Prior to v16, initial table synchronization will use text format even
1151 * if the binary option is enabled for a subscription.
1152 */
1155 {
1156 appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1157 options = list_make1(makeDefElem("format",
1158 (Node *) makeString("binary"), -1));
1159 }
1160
1161 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1162 pfree(cmd.data);
1163 if (res->status != WALRCV_OK_COPY_OUT)
1164 ereport(ERROR,
1165 (errcode(ERRCODE_CONNECTION_FAILURE),
1166 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1167 lrel.nspname, lrel.relname, res->err)));
1169
1171
1172 pstate = make_parsestate(NULL);
1174 NULL, false, false);
1175
1176 attnamelist = make_copy_attnamelist(relmapentry);
1177 cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1178
1179 /* Do the copy */
1180 (void) CopyFrom(cstate);
1181
1182 logicalrep_rel_close(relmapentry, NoLock);
1183}
Subscription * MySubscription
Definition: worker.c:479
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1529
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:779
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
void list_free_deep(List *list)
Definition: list.c:1560
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition: makefuncs.c:637
void pfree(void *pointer)
Definition: mcxt.c:1594
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define lfirst(lc)
Definition: pg_list.h:172
#define list_make1(x1)
Definition: pg_list.h:212
#define for_each_from(cell, lst, N)
Definition: pg_list.h:414
#define linitial(l)
Definition: pg_list.h:178
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13146
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13062
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:517
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:361
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: pg_list.h:54
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Definition: nodes.h:135
WalRcvExecStatus status
Definition: walreceiver.h:220
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:626
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
Definition: tablesync.c:725
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:646
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:82
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465

References AccessShareLock, addRangeTableEntryForRelation(), appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert(), LogicalRepRelation::attnames, BeginCopyFrom(), Subscription::binary, copy_read_data(), copybuf, CopyFrom(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, fetch_remote_table_info(), for_each_from, get_namespace_name(), i, initStringInfo(), lfirst, linitial, list_free_deep(), list_make1, LogicalRepRelMapEntry::localrel, logicalrep_rel_close(), logicalrep_rel_open(), logicalrep_relmap_update(), LogRepWorkerWalRcvConn, make_copy_attnamelist(), make_parsestate(), makeDefElem(), makeString(), makeStringInfo(), MySubscription, LogicalRepRelation::natts, NIL, NoLock, LogicalRepRelation::nspname, pfree(), quote_identifier(), quote_qualified_identifier(), RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, WalRcvExecResult::status, strVal, walrcv_clear_result(), walrcv_exec, WALRCV_OK_COPY_OUT, and walrcv_server_version.

Referenced by LogicalRepSyncTableStart().

◆ fetch_remote_table_info()

static void fetch_remote_table_info ( char *  nspname,
char *  relname,
LogicalRepRelation lrel,
List **  qual,
bool *  gencol_published 
)
static

Definition at line 725 of file tablesync.c.

727{
728 WalRcvExecResult *res;
729 StringInfoData cmd;
730 TupleTableSlot *slot;
731 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
732 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
733 Oid qualRow[] = {TEXTOID};
734 bool isnull;
735 int natt;
736 StringInfo pub_names = NULL;
737 Bitmapset *included_cols = NULL;
739
740 lrel->nspname = nspname;
741 lrel->relname = relname;
742
743 /* First fetch Oid and replica identity. */
744 initStringInfo(&cmd);
745 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
746 " FROM pg_catalog.pg_class c"
747 " INNER JOIN pg_catalog.pg_namespace n"
748 " ON (c.relnamespace = n.oid)"
749 " WHERE n.nspname = %s"
750 " AND c.relname = %s",
751 quote_literal_cstr(nspname),
754 lengthof(tableRow), tableRow);
755
756 if (res->status != WALRCV_OK_TUPLES)
758 (errcode(ERRCODE_CONNECTION_FAILURE),
759 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
760 nspname, relname, res->err)));
761
763 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
765 (errcode(ERRCODE_UNDEFINED_OBJECT),
766 errmsg("table \"%s.%s\" not found on publisher",
767 nspname, relname)));
768
769 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
770 Assert(!isnull);
771 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
772 Assert(!isnull);
773 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
774 Assert(!isnull);
775
778
779
780 /*
781 * Get column lists for each relation.
782 *
783 * We need to do this before fetching info about column names and types,
784 * so that we can skip columns that should not be replicated.
785 */
786 if (server_version >= 150000)
787 {
788 WalRcvExecResult *pubres;
789 TupleTableSlot *tslot;
790 Oid attrsRow[] = {INT2VECTOROID};
791
792 /* Build the pub_names comma-separated string. */
793 pub_names = makeStringInfo();
795
796 /*
797 * Fetch info about column lists for the relation (from all the
798 * publications).
799 */
800 resetStringInfo(&cmd);
801 appendStringInfo(&cmd,
802 "SELECT DISTINCT"
803 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
804 " THEN NULL ELSE gpt.attrs END)"
805 " FROM pg_publication p,"
806 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
807 " pg_class c"
808 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
809 " AND p.pubname IN ( %s )",
810 lrel->remoteid,
811 pub_names->data);
812
814 lengthof(attrsRow), attrsRow);
815
816 if (pubres->status != WALRCV_OK_TUPLES)
818 (errcode(ERRCODE_CONNECTION_FAILURE),
819 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
820 nspname, relname, pubres->err)));
821
822 /*
823 * We don't support the case where the column list is different for
824 * the same table when combining publications. See comments atop
825 * fetch_relation_list. So there should be only one row returned.
826 * Although we already checked this when creating the subscription, we
827 * still need to check here in case the column list was changed after
828 * creating the subscription and before the sync worker is started.
829 */
830 if (tuplestore_tuple_count(pubres->tuplestore) > 1)
832 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
833 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
834 nspname, relname));
835
836 /*
837 * Get the column list and build a single bitmap with the attnums.
838 *
839 * If we find a NULL value, it means all the columns should be
840 * replicated.
841 */
843 if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
844 {
845 Datum cfval = slot_getattr(tslot, 1, &isnull);
846
847 if (!isnull)
848 {
849 ArrayType *arr;
850 int nelems;
851 int16 *elems;
852
853 arr = DatumGetArrayTypeP(cfval);
854 nelems = ARR_DIMS(arr)[0];
855 elems = (int16 *) ARR_DATA_PTR(arr);
856
857 for (natt = 0; natt < nelems; natt++)
858 included_cols = bms_add_member(included_cols, elems[natt]);
859 }
860
861 ExecClearTuple(tslot);
862 }
864
865 walrcv_clear_result(pubres);
866 }
867
868 /*
869 * Now fetch column names and types.
870 */
871 resetStringInfo(&cmd);
873 "SELECT a.attnum,"
874 " a.attname,"
875 " a.atttypid,"
876 " a.attnum = ANY(i.indkey)");
877
878 /* Generated columns can be replicated since version 18. */
879 if (server_version >= 180000)
880 appendStringInfoString(&cmd, ", a.attgenerated != ''");
881
882 appendStringInfo(&cmd,
883 " FROM pg_catalog.pg_attribute a"
884 " LEFT JOIN pg_catalog.pg_index i"
885 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
886 " WHERE a.attnum > 0::pg_catalog.int2"
887 " AND NOT a.attisdropped %s"
888 " AND a.attrelid = %u"
889 " ORDER BY a.attnum",
890 lrel->remoteid,
891 (server_version >= 120000 && server_version < 180000 ?
892 "AND a.attgenerated = ''" : ""),
893 lrel->remoteid);
895 server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
896
897 if (res->status != WALRCV_OK_TUPLES)
899 (errcode(ERRCODE_CONNECTION_FAILURE),
900 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
901 nspname, relname, res->err)));
902
903 /* We don't know the number of rows coming, so allocate enough space. */
904 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
905 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
906 lrel->attkeys = NULL;
907
908 /*
909 * Store the columns as a list of names. Ignore those that are not
910 * present in the column list, if there is one.
911 */
912 natt = 0;
914 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
915 {
916 char *rel_colname;
918
919 attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
920 Assert(!isnull);
921
922 /* If the column is not in the column list, skip it. */
923 if (included_cols != NULL && !bms_is_member(attnum, included_cols))
924 {
925 ExecClearTuple(slot);
926 continue;
927 }
928
929 rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
930 Assert(!isnull);
931
932 lrel->attnames[natt] = rel_colname;
933 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
934 Assert(!isnull);
935
936 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
937 lrel->attkeys = bms_add_member(lrel->attkeys, natt);
938
939 /* Remember if the remote table has published any generated column. */
940 if (server_version >= 180000 && !(*gencol_published))
941 {
942 *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
943 Assert(!isnull);
944 }
945
946 /* Should never happen. */
947 if (++natt >= MaxTupleAttributeNumber)
948 elog(ERROR, "too many columns in remote table \"%s.%s\"",
949 nspname, relname);
950
951 ExecClearTuple(slot);
952 }
954
955 lrel->natts = natt;
956
958
959 /*
960 * Get relation's row filter expressions. DISTINCT avoids the same
961 * expression of a table in multiple publications from being included
962 * multiple times in the final expression.
963 *
964 * We need to copy the row even if it matches just one of the
965 * publications, so we later combine all the quals with OR.
966 *
967 * For initial synchronization, row filtering can be ignored in following
968 * cases:
969 *
970 * 1) one of the subscribed publications for the table hasn't specified
971 * any row filter
972 *
973 * 2) one of the subscribed publications has puballtables set to true
974 *
975 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
976 * that includes this relation
977 */
978 if (server_version >= 150000)
979 {
980 /* Reuse the already-built pub_names. */
981 Assert(pub_names != NULL);
982
983 /* Check for row filters. */
984 resetStringInfo(&cmd);
985 appendStringInfo(&cmd,
986 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
987 " FROM pg_publication p,"
988 " LATERAL pg_get_publication_tables(p.pubname) gpt"
989 " WHERE gpt.relid = %u"
990 " AND p.pubname IN ( %s )",
991 lrel->remoteid,
992 pub_names->data);
993
994 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
995
996 if (res->status != WALRCV_OK_TUPLES)
998 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
999 nspname, relname, res->err)));
1000
1001 /*
1002 * Multiple row filter expressions for the same table will be combined
1003 * by COPY using OR. If any of the filter expressions for this table
1004 * are null, it means the whole table will be copied. In this case it
1005 * is not necessary to construct a unified row filter expression at
1006 * all.
1007 */
1009 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1010 {
1011 Datum rf = slot_getattr(slot, 1, &isnull);
1012
1013 if (!isnull)
1014 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1015 else
1016 {
1017 /* Ignore filters and cleanup as necessary. */
1018 if (*qual)
1019 {
1020 list_free_deep(*qual);
1021 *qual = NIL;
1022 }
1023 break;
1024 }
1025
1026 ExecClearTuple(slot);
1027 }
1029
1031 destroyStringInfo(pub_names);
1032 }
1033
1034 pfree(cmd.data);
1035}
#define ARR_DATA_PTR(a)
Definition: array.h:322
#define DatumGetArrayTypeP(X)
Definition: array.h:261
#define ARR_DIMS(a)
Definition: array.h:294
int16 AttrNumber
Definition: attnum.h:21
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
#define TextDatumGetCString(d)
Definition: builtins.h:98
int16_t int16
Definition: c.h:538
#define lengthof(array)
Definition: c.h:792
#define elog(elevel,...)
Definition: elog.h:226
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
List * lappend(List *list, void *datum)
Definition: list.c:339
void * palloc0(Size size)
Definition: mcxt.c:1395
int16 attnum
Definition: pg_attribute.h:74
NameData relname
Definition: pg_class.h:38
static int server_version
Definition: pg_dumpall.c:109
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
static bool DatumGetBool(Datum X)
Definition: postgres.h:100
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:252
uint64_t Datum
Definition: postgres.h:70
static char DatumGetChar(Datum X)
Definition: postgres.h:122
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:172
unsigned int Oid
Definition: postgres_ext.h:32
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
void destroyStringInfo(StringInfo str)
Definition: stringinfo.c:409
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
Bitmapset * attkeys
Definition: logicalproto.h:115
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:580
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:398
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:457
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207

References appendStringInfo(), appendStringInfoString(), ARR_DATA_PTR, ARR_DIMS, Assert(), LogicalRepRelation::attkeys, LogicalRepRelation::attnames, attnum, LogicalRepRelation::atttyps, bms_add_member(), bms_is_member(), StringInfoData::data, DatumGetArrayTypeP, DatumGetBool(), DatumGetChar(), DatumGetInt16(), DatumGetObjectId(), destroyStringInfo(), elog, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), GetPublicationsStr(), initStringInfo(), lappend(), lengthof, list_free_deep(), LogRepWorkerWalRcvConn, MakeSingleTupleTableSlot(), makeString(), makeStringInfo(), MaxTupleAttributeNumber, MySubscription, LogicalRepRelation::natts, NIL, LogicalRepRelation::nspname, palloc0(), pfree(), Subscription::publications, quote_literal_cstr(), LogicalRepRelation::relkind, relname, LogicalRepRelation::relname, LogicalRepRelation::remoteid, LogicalRepRelation::replident, resetStringInfo(), server_version, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), tuplestore_tuple_count(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and walrcv_server_version.

Referenced by copy_table().

◆ HasSubscriptionTablesCached()

bool HasSubscriptionTablesCached ( void  )

Definition at line 1630 of file tablesync.c.

1631{
1632 bool started_tx;
1633 bool has_tables;
1634
1635 /* We need up-to-date subscription tables info here */
1636 FetchRelationStates(&has_tables, NULL, &started_tx);
1637
1638 if (started_tx)
1639 {
1641 pgstat_report_stat(true);
1642 }
1643
1644 return has_tables;
1645}

References CommitTransactionCommand(), FetchRelationStates(), and pgstat_report_stat().

Referenced by wait_for_local_flush().

◆ LogicalRepSyncTableStart()

static char * LogicalRepSyncTableStart ( XLogRecPtr origin_startpos)
static

Definition at line 1219 of file tablesync.c.

1220{
1221 char *slotname;
1222 char *err;
1223 char relstate;
1224 XLogRecPtr relstate_lsn;
1225 Relation rel;
1226 AclResult aclresult;
1227 WalRcvExecResult *res;
1228 char originname[NAMEDATALEN];
1229 RepOriginId originid;
1230 UserContext ucxt;
1231 bool must_use_password;
1232 bool run_as_owner;
1233
1234 /* Check the state of the table synchronization. */
1238 &relstate_lsn);
1240
1241 /* Is the use of a password mandatory? */
1242 must_use_password = MySubscription->passwordrequired &&
1244
1246 MyLogicalRepWorker->relstate = relstate;
1247 MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1249
1250 /*
1251 * If synchronization is already done or no longer necessary, exit now
1252 * that we've updated shared memory state.
1253 */
1254 switch (relstate)
1255 {
1256 case SUBREL_STATE_SYNCDONE:
1257 case SUBREL_STATE_READY:
1258 case SUBREL_STATE_UNKNOWN:
1259 FinishSyncWorker(); /* doesn't return */
1260 }
1261
1262 /* Calculate the name of the tablesync slot. */
1263 slotname = (char *) palloc(NAMEDATALEN);
1266 slotname,
1267 NAMEDATALEN);
1268
1269 /*
1270 * Here we use the slot name instead of the subscription name as the
1271 * application_name, so that it is different from the leader apply worker,
1272 * so that synchronous replication can distinguish them.
1273 */
1276 must_use_password,
1277 slotname, &err);
1278 if (LogRepWorkerWalRcvConn == NULL)
1279 ereport(ERROR,
1280 (errcode(ERRCODE_CONNECTION_FAILURE),
1281 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1282 MySubscription->name, err)));
1283
1284 Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1285 MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1286 MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1287
1288 /* Assign the origin tracking record name. */
1291 originname,
1292 sizeof(originname));
1293
1294 if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1295 {
1296 /*
1297 * We have previously errored out before finishing the copy so the
1298 * replication slot might exist. We want to remove the slot if it
1299 * already exists and proceed.
1300 *
1301 * XXX We could also instead try to drop the slot, last time we failed
1302 * but for that, we might need to clean up the copy state as it might
1303 * be in the middle of fetching the rows. Also, if there is a network
1304 * breakdown then it wouldn't have succeeded so trying it next time
1305 * seems like a better bet.
1306 */
1308 }
1309 else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1310 {
1311 /*
1312 * The COPY phase was previously done, but tablesync then crashed
1313 * before it was able to finish normally.
1314 */
1316
1317 /*
1318 * The origin tracking name must already exist. It was created first
1319 * time this tablesync was launched.
1320 */
1321 originid = replorigin_by_name(originname, false);
1322 replorigin_session_setup(originid, 0);
1323 replorigin_session_origin = originid;
1324 *origin_startpos = replorigin_session_get_progress(false);
1325
1327
1328 goto copy_table_done;
1329 }
1330
1332 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1335
1336 /* Update the state and make it visible to others. */
1342 false);
1344 pgstat_report_stat(true);
1345
1347
1348 /*
1349 * Use a standard write lock here. It might be better to disallow access
1350 * to the table while it's being synchronized. But we don't want to block
1351 * the main apply process from working and it has to open the relation in
1352 * RowExclusiveLock when remapping remote relation id to local one.
1353 */
1355
1356 /*
1357 * Start a transaction in the remote node in REPEATABLE READ mode. This
1358 * ensures that both the replication slot we create (see below) and the
1359 * COPY are consistent with each other.
1360 */
1362 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1363 0, NULL);
1364 if (res->status != WALRCV_OK_COMMAND)
1365 ereport(ERROR,
1366 (errcode(ERRCODE_CONNECTION_FAILURE),
1367 errmsg("table copy could not start transaction on publisher: %s",
1368 res->err)));
1370
1371 /*
1372 * Create a new permanent logical decoding slot. This slot will be used
1373 * for the catchup phase after COPY is done, so tell it to use the
1374 * snapshot to make the final data consistent.
1375 */
1377 slotname, false /* permanent */ , false /* two_phase */ ,
1379 CRS_USE_SNAPSHOT, origin_startpos);
1380
1381 /*
1382 * Setup replication origin tracking. The purpose of doing this before the
1383 * copy is to avoid doing the copy again due to any error in setting up
1384 * origin tracking.
1385 */
1386 originid = replorigin_by_name(originname, true);
1387 if (!OidIsValid(originid))
1388 {
1389 /*
1390 * Origin tracking does not exist, so create it now.
1391 *
1392 * Then advance to the LSN got from walrcv_create_slot. This is WAL
1393 * logged for the purpose of recovery. Locks are to prevent the
1394 * replication origin from vanishing while advancing.
1395 */
1396 originid = replorigin_create(originname);
1397
1398 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1399 replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1400 true /* go backward */ , true /* WAL log */ );
1401 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1402
1403 replorigin_session_setup(originid, 0);
1404 replorigin_session_origin = originid;
1405 }
1406 else
1407 {
1408 ereport(ERROR,
1410 errmsg("replication origin \"%s\" already exists",
1411 originname)));
1412 }
1413
1414 /*
1415 * If the user did not opt to run as the owner of the subscription
1416 * ('run_as_owner'), then copy the table as the owner of the table.
1417 */
1418 run_as_owner = MySubscription->runasowner;
1419 if (!run_as_owner)
1420 SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1421
1422 /*
1423 * Check that our table sync worker has permission to insert into the
1424 * target table.
1425 */
1426 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1427 ACL_INSERT);
1428 if (aclresult != ACLCHECK_OK)
1429 aclcheck_error(aclresult,
1430 get_relkind_objtype(rel->rd_rel->relkind),
1432
1433 /*
1434 * COPY FROM does not honor RLS policies. That is not a problem for
1435 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1436 * who has it implicitly), but other roles should not be able to
1437 * circumvent RLS. Disallow logical replication into RLS enabled
1438 * relations for such roles.
1439 */
1441 ereport(ERROR,
1442 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1443 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1446
1447 /* Now do the initial data copy */
1449 copy_table(rel);
1451
1452 res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1453 if (res->status != WALRCV_OK_COMMAND)
1454 ereport(ERROR,
1455 (errcode(ERRCODE_CONNECTION_FAILURE),
1456 errmsg("table copy could not finish transaction on publisher: %s",
1457 res->err)));
1459
1460 if (!run_as_owner)
1461 RestoreUserContext(&ucxt);
1462
1463 table_close(rel, NoLock);
1464
1465 /* Make the copy visible. */
1467
1468 /*
1469 * Update the persisted state to indicate the COPY phase is done; make it
1470 * visible to others.
1471 */
1474 SUBREL_STATE_FINISHEDCOPY,
1476 false);
1477
1479
1480copy_table_done:
1481
1482 elog(DEBUG1,
1483 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1484 originname, LSN_FORMAT_ARGS(*origin_startpos));
1485
1486 /*
1487 * We are done with the initial data synchronization, update the state.
1488 */
1490 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1491 MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1493
1494 /*
1495 * Finally, wait until the leader apply worker tells us to catch up and
1496 * then return to let LogicalRepApplyLoop do it.
1497 */
1498 wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1499 return slotname;
1500}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2652
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4037
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
#define OidIsValid(objectId)
Definition: c.h:779
#define DEBUG1
Definition: elog.h:30
void err(int eval, const char *fmt,...)
Definition: err.c:43
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
#define RowExclusiveLock
Definition: lockdefs.h:38
void * palloc(Size size)
Definition: mcxt.c:1365
Oid GetUserId(void)
Definition: miscinit.c:469
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:988
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
RepOriginId replorigin_session_origin
Definition: origin.c:163
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:911
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1120
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1273
#define ACL_INSERT
Definition: parsenodes.h:76
#define NAMEDATALEN
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
#define InvalidOid
Definition: postgres_ext.h:37
#define RelationGetRelid(relation)
Definition: rel.h:515
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:271
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:680
void PopActiveSnapshot(void)
Definition: snapmgr.c:773
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
XLogRecPtr relstate_lsn
Form_pg_class rd_rel
Definition: rel.h:111
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
pg_noreturn void FinishSyncWorker(void)
Definition: syncutils.c:50
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:189
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1203
static void copy_table(Relation rel)
Definition: tablesync.c:1043
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
void CommandCounterIncrement(void)
Definition: xact.c:1101
void StartTransactionCommand(void)
Definition: xact.c:3077
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint16 RepOriginId
Definition: xlogdefs.h:69
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert(), check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog, ereport, WalRcvExecResult::err, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, Subscription::failover, FinishSyncWorker(), get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, Subscription::ownersuperuser, palloc(), Subscription::passwordrequired, pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), RestoreUserContext(), RLS_ENABLED, RowExclusiveLock, Subscription::runasowner, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), WalRcvExecResult::status, LogicalRepWorker::subid, SwitchToUntrustedUser(), table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.

Referenced by start_table_sync().

◆ make_copy_attnamelist()

static List * make_copy_attnamelist ( LogicalRepRelMapEntry rel)
static

Definition at line 626 of file tablesync.c.

627{
628 List *attnamelist = NIL;
629 int i;
630
631 for (i = 0; i < rel->remoterel.natts; i++)
632 {
633 attnamelist = lappend(attnamelist,
635 }
636
637
638 return attnamelist;
639}
LogicalRepRelation remoterel

References LogicalRepRelation::attnames, i, lappend(), makeString(), LogicalRepRelation::natts, NIL, and LogicalRepRelMapEntry::remoterel.

Referenced by copy_table().

◆ ProcessSyncingTablesForApply()

void ProcessSyncingTablesForApply ( XLogRecPtr  current_lsn)

Definition at line 368 of file tablesync.c.

369{
370 struct tablesync_start_time_mapping
371 {
372 Oid relid;
373 TimestampTz last_start_time;
374 };
375 static HTAB *last_start_times = NULL;
376 ListCell *lc;
377 bool started_tx;
378 bool should_exit = false;
379 Relation rel = NULL;
380
382
383 /* We need up-to-date sync state info for subscription tables here. */
384 FetchRelationStates(NULL, NULL, &started_tx);
385
386 /*
387 * Prepare a hash table for tracking last start times of workers, to avoid
388 * immediate restarts. We don't need it if there are no tables that need
389 * syncing.
390 */
392 {
393 HASHCTL ctl;
394
395 ctl.keysize = sizeof(Oid);
396 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
397 last_start_times = hash_create("Logical replication table sync worker start times",
398 256, &ctl, HASH_ELEM | HASH_BLOBS);
399 }
400
401 /*
402 * Clean up the hash table when we're done with all tables (just to
403 * release the bit of memory).
404 */
406 {
408 last_start_times = NULL;
409 }
410
411 /*
412 * Process all tables that are being synchronized.
413 */
414 foreach(lc, table_states_not_ready)
415 {
417
418 if (!started_tx)
419 {
421 started_tx = true;
422 }
423
424 Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
425
426 if (rstate->state == SUBREL_STATE_SYNCDONE)
427 {
428 /*
429 * Apply has caught up to the position where the table sync has
430 * finished. Mark the table as ready so that the apply will just
431 * continue to replicate it normally.
432 */
433 if (current_lsn >= rstate->lsn)
434 {
435 char originname[NAMEDATALEN];
436
437 rstate->state = SUBREL_STATE_READY;
438 rstate->lsn = current_lsn;
439
440 /*
441 * Remove the tablesync origin tracking if exists.
442 *
443 * There is a chance that the user is concurrently performing
444 * refresh for the subscription where we remove the table
445 * state and its origin or the tablesync worker would have
446 * already removed this origin. We can't rely on tablesync
447 * worker to remove the origin tracking as if there is any
448 * error while dropping we won't restart it to drop the
449 * origin. So passing missing_ok = true.
450 *
451 * Lock the subscription and origin in the same order as we
452 * are doing during DDL commands to avoid deadlocks. See
453 * AlterSubscription_refresh.
454 */
455 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
456 0, AccessShareLock);
457
458 if (!rel)
459 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
460
462 rstate->relid,
463 originname,
464 sizeof(originname));
465 replorigin_drop_by_name(originname, true, false);
466
467 /*
468 * Update the state to READY only after the origin cleanup.
469 */
471 rstate->relid, rstate->state,
472 rstate->lsn, true);
473 }
474 }
475 else
476 {
477 LogicalRepWorker *syncworker;
478
479 /*
480 * Look for a sync worker for this relation.
481 */
482 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
483
486 rstate->relid, false);
487
488 if (syncworker)
489 {
490 /* Found one, update our copy of its state */
491 SpinLockAcquire(&syncworker->relmutex);
492 rstate->state = syncworker->relstate;
493 rstate->lsn = syncworker->relstate_lsn;
494 if (rstate->state == SUBREL_STATE_SYNCWAIT)
495 {
496 /*
497 * Sync worker is waiting for apply. Tell sync worker it
498 * can catchup now.
499 */
500 syncworker->relstate = SUBREL_STATE_CATCHUP;
501 syncworker->relstate_lsn =
502 Max(syncworker->relstate_lsn, current_lsn);
503 }
504 SpinLockRelease(&syncworker->relmutex);
505
506 /* If we told worker to catch up, wait for it. */
507 if (rstate->state == SUBREL_STATE_SYNCWAIT)
508 {
509 /* Signal the sync worker, as it may be waiting for us. */
510 if (syncworker->proc)
512
513 /* Now safe to release the LWLock */
514 LWLockRelease(LogicalRepWorkerLock);
515
516 if (started_tx)
517 {
518 /*
519 * We must commit the existing transaction to release
520 * the existing locks before entering a busy loop.
521 * This is required to avoid any undetected deadlocks
522 * due to any existing lock as deadlock detector won't
523 * be able to detect the waits on the latch.
524 *
525 * Also close any tables prior to the commit.
526 */
527 if (rel)
528 {
529 table_close(rel, NoLock);
530 rel = NULL;
531 }
533 pgstat_report_stat(false);
534 }
535
536 /*
537 * Enter busy loop and wait for synchronization worker to
538 * reach expected state (or die trying).
539 */
541 started_tx = true;
542
544 SUBREL_STATE_SYNCDONE);
545 }
546 else
547 LWLockRelease(LogicalRepWorkerLock);
548 }
549 else
550 {
551 /*
552 * If there is no sync worker for this table yet, count
553 * running sync workers for this subscription, while we have
554 * the lock.
555 */
556 int nsyncworkers =
558 struct tablesync_start_time_mapping *hentry;
559 bool found;
560
561 /* Now safe to release the LWLock */
562 LWLockRelease(LogicalRepWorkerLock);
563
564 hentry = hash_search(last_start_times, &rstate->relid,
565 HASH_ENTER, &found);
566 if (!found)
567 hentry->last_start_time = 0;
568
570 rstate->relid, &hentry->last_start_time);
571 }
572 }
573 }
574
575 /* Close table if opened */
576 if (rel)
577 table_close(rel, NoLock);
578
579
580 if (started_tx)
581 {
582 /*
583 * Even when the two_phase mode is requested by the user, it remains
584 * as 'pending' until all tablesyncs have reached READY state.
585 *
586 * When this happens, we restart the apply worker and (if the
587 * conditions are still ok) then the two_phase tri-state will become
588 * 'enabled' at that time.
589 *
590 * Note: If the subscription has no tables then leave the state as
591 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
592 * work.
593 */
594 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
595 {
596 CommandCounterIncrement(); /* make updates visible */
597 if (AllTablesyncsReady())
598 {
599 ereport(LOG,
600 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
602 should_exit = true;
603 }
604 }
605
607 pgstat_report_stat(true);
608 }
609
610 if (should_exit)
611 {
612 /*
613 * Reset the last-start time for this worker so that the launcher will
614 * restart it without waiting for wal_retrieve_retry_interval.
615 */
617
618 proc_exit(0);
619 }
620}
#define Max(x, y)
Definition: c.h:1002
int64 TimestampTz
Definition: timestamp.h:39
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
#define LOG
Definition: elog.h:31
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:746
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:258
static dshash_table * last_start_times
Definition: launcher.c:91
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:927
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1154
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2170
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:439
tree ctl
Definition: radixtree.h:1838
Definition: dynahash.c:222
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition: syncutils.c:117
bool AllTablesyncsReady(void)
Definition: tablesync.c:1600
static bool wait_for_table_state_change(Oid relid, char expected_state)
Definition: tablesync.c:140
@ WORKERTYPE_TABLESYNC
bool IsTransactionState(void)
Definition: xact.c:388

References AccessShareLock, AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert(), CommandCounterIncrement(), CommitTransactionCommand(), ctl, ereport, errmsg(), FetchRelationStates(), get_rel_relkind(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), last_start_times, launch_sync_worker(), lfirst, LockSharedObject(), LOG, logicalrep_sync_worker_count(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, NoLock, Subscription::oid, pgstat_report_stat(), LogicalRepWorker::proc, proc_exit(), SubscriptionRelState::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), RowExclusiveLock, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_close(), table_open(), table_states_not_ready, Subscription::twophasestate, UpdateSubscriptionRelState(), wait_for_table_state_change(), and WORKERTYPE_TABLESYNC.

Referenced by ProcessSyncingRelations().

◆ ProcessSyncingTablesForSync()

void ProcessSyncingTablesForSync ( XLogRecPtr  current_lsn)

Definition at line 244 of file tablesync.c.

245{
247
248 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
249 current_lsn >= MyLogicalRepWorker->relstate_lsn)
250 {
251 TimeLineID tli;
252 char syncslotname[NAMEDATALEN] = {0};
253 char originname[NAMEDATALEN] = {0};
254
255 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
256 MyLogicalRepWorker->relstate_lsn = current_lsn;
257
259
260 /*
261 * UpdateSubscriptionRelState must be called within a transaction.
262 */
263 if (!IsTransactionState())
265
270 false);
271
272 /*
273 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
274 * the slot.
275 */
277
278 /*
279 * Cleanup the tablesync slot.
280 *
281 * This has to be done after updating the state because otherwise if
282 * there is an error while doing the database operations we won't be
283 * able to rollback dropped slot.
284 */
287 syncslotname,
288 sizeof(syncslotname));
289
290 /*
291 * It is important to give an error if we are unable to drop the slot,
292 * otherwise, it won't be dropped till the corresponding subscription
293 * is dropped. So passing missing_ok = false.
294 */
296
298 pgstat_report_stat(false);
299
300 /*
301 * Start a new transaction to clean up the tablesync origin tracking.
302 * This transaction will be ended within the FinishSyncWorker(). Now,
303 * even, if we fail to remove this here, the apply worker will ensure
304 * to clean it up afterward.
305 *
306 * We need to do this after the table state is set to SYNCDONE.
307 * Otherwise, if an error occurs while performing the database
308 * operation, the worker will be restarted and the in-memory state of
309 * replication progress (remote_lsn) won't be rolled-back which would
310 * have been cleared before restart. So, the restarted worker will use
311 * invalid replication progress state resulting in replay of
312 * transactions that have already been applied.
313 */
315
318 originname,
319 sizeof(originname));
320
321 /*
322 * Resetting the origin session removes the ownership of the slot.
323 * This is needed to allow the origin to be dropped.
324 */
329
330 /*
331 * Drop the tablesync's origin tracking if exists.
332 *
333 * There is a chance that the user is concurrently performing refresh
334 * for the subscription where we remove the table state and its origin
335 * or the apply worker would have removed this origin. So passing
336 * missing_ok = true.
337 */
338 replorigin_drop_by_name(originname, true, false);
339
341 }
342 else
344}
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
void replorigin_session_reset(void)
Definition: origin.c:1226
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
#define InvalidRepOriginId
Definition: origin.h:33
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
uint32 TimeLineID
Definition: xlogdefs.h:63

References CommitTransactionCommand(), FinishSyncWorker(), InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, replorigin_session_reset(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.

Referenced by ProcessSyncingRelations().

◆ ReplicationSlotNameForTablesync()

void ReplicationSlotNameForTablesync ( Oid  suboid,
Oid  relid,
char *  syncslotname,
Size  szslot 
)

Definition at line 1203 of file tablesync.c.

1205{
1206 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1207 relid, GetSystemIdentifier());
1208}
#define UINT64_FORMAT
Definition: c.h:562
#define snprintf
Definition: port.h:239
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4609

References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), and ReportSlotConnectionError().

◆ run_tablesync_worker()

static void run_tablesync_worker ( )
static

Definition at line 1554 of file tablesync.c.

1555{
1556 char originname[NAMEDATALEN];
1557 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1558 char *slotname = NULL;
1560
1561 start_table_sync(&origin_startpos, &slotname);
1562
1565 originname,
1566 sizeof(originname));
1567
1569
1570 set_stream_options(&options, slotname, &origin_startpos);
1571
1573
1574 /* Apply the changes till we catchup with the apply worker. */
1575 start_apply(origin_startpos);
1576}
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:5514
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:5583
void set_apply_error_context_origin(char *originname)
Definition: worker.c:6301
static char ** options
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition: tablesync.c:1511
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451

References InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, NAMEDATALEN, Subscription::oid, options, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), set_apply_error_context_origin(), set_stream_options(), start_apply(), start_table_sync(), and walrcv_startstreaming.

Referenced by TableSyncWorkerMain().

◆ start_table_sync()

static void start_table_sync ( XLogRecPtr origin_startpos,
char **  slotname 
)
static

Definition at line 1511 of file tablesync.c.

1512{
1513 char *sync_slotname = NULL;
1514
1516
1517 PG_TRY();
1518 {
1519 /* Call initial sync. */
1520 sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1521 }
1522 PG_CATCH();
1523 {
1526 else
1527 {
1528 /*
1529 * Report the worker failed during table synchronization. Abort
1530 * the current transaction so that the stats message is sent in an
1531 * idle state.
1532 */
1536
1537 PG_RE_THROW();
1538 }
1539 }
1540 PG_END_TRY();
1541
1542 /* allocate slot name in long-lived context */
1543 *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1544 pfree(sync_slotname);
1545}
void DisableSubscriptionAndExit(void)
Definition: worker.c:5943
MemoryContext ApplyContext
Definition: worker.c:472
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1746
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1219
static bool am_tablesync_worker(void)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4880

References AbortOutOfAnyTransaction(), am_tablesync_worker(), ApplyContext, Assert(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MySubscription, Subscription::oid, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgstat_report_subscription_error(), and WORKERTYPE_TABLESYNC.

Referenced by run_tablesync_worker().

◆ TableSyncWorkerMain()

void TableSyncWorkerMain ( Datum  main_arg)

Definition at line 1580 of file tablesync.c.

1581{
1582 int worker_slot = DatumGetInt32(main_arg);
1583
1584 SetupApplyOrSyncWorker(worker_slot);
1585
1587
1589}
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5869
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212
static void run_tablesync_worker()
Definition: tablesync.c:1554

References DatumGetInt32(), FinishSyncWorker(), run_tablesync_worker(), and SetupApplyOrSyncWorker().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1651 of file tablesync.c.

1652{
1653 Relation rel;
1654 HeapTuple tup;
1655 bool nulls[Natts_pg_subscription];
1656 bool replaces[Natts_pg_subscription];
1657 Datum values[Natts_pg_subscription];
1658
1659 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1660 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1661 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1662
1663 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1664 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1665 if (!HeapTupleIsValid(tup))
1666 elog(ERROR,
1667 "cache lookup failed for subscription oid %u",
1668 suboid);
1669
1670 /* Form a new tuple. */
1671 memset(values, 0, sizeof(values));
1672 memset(nulls, false, sizeof(nulls));
1673 memset(replaces, false, sizeof(replaces));
1674
1675 /* And update/set two_phase state */
1676 values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1677 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1678
1679 tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1680 values, nulls, replaces);
1681 CatalogTupleUpdate(rel, &tup->t_self, tup);
1682
1683 heap_freetuple(tup);
1685}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition: indexing.c:313
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
static Datum CharGetDatum(char X)
Definition: postgres.h:132
#define RelationGetDescr(relation)
Definition: rel.h:541
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91

References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog, ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by CreateSubscription(), and run_apply_worker().

◆ wait_for_table_state_change()

static bool wait_for_table_state_change ( Oid  relid,
char  expected_state 
)
static

Definition at line 140 of file tablesync.c.

141{
142 char state;
143
144 for (;;)
145 {
146 LogicalRepWorker *worker;
147 XLogRecPtr statelsn;
148
150
153 relid, &statelsn);
154
155 if (state == SUBREL_STATE_UNKNOWN)
156 break;
157
158 if (state == expected_state)
159 return true;
160
161 /* Check if the sync worker is still running and bail if not. */
162 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
165 false);
166 LWLockRelease(LogicalRepWorkerLock);
167 if (!worker)
168 break;
169
170 (void) WaitLatch(MyLatch,
172 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
173
175 }
176
177 return false;
178}
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:454
Definition: regguts.h:323

References CHECK_FOR_INTERRUPTS, GetSubscriptionRelState(), InvalidateCatalogSnapshot(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, and WORKERTYPE_TABLESYNC.

Referenced by ProcessSyncingTablesForApply().

◆ wait_for_worker_state_change()

static bool wait_for_worker_state_change ( char  expected_state)
static

Definition at line 189 of file tablesync.c.

190{
191 int rc;
192
193 for (;;)
194 {
195 LogicalRepWorker *worker;
196
198
199 /*
200 * Done if already in correct state. (We assume this fetch is atomic
201 * enough to not give a misleading answer if we do it with no lock.)
202 */
203 if (MyLogicalRepWorker->relstate == expected_state)
204 return true;
205
206 /*
207 * Bail out if the apply worker has died, else signal it we're
208 * waiting.
209 */
210 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
213 false);
214 if (worker && worker->proc)
216 LWLockRelease(LogicalRepWorkerLock);
217 if (!worker)
218 break;
219
220 /*
221 * Wait. We expect to get a latch signal back from the apply worker,
222 * but use a timeout in case it dies without sending one.
223 */
224 rc = WaitLatch(MyLatch,
226 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
227
228 if (rc & WL_LATCH_SET)
230 }
231
232 return false;
233}
@ WORKERTYPE_APPLY

References CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, LogicalRepWorker::proc, LogicalRepWorker::relstate, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, and WORKERTYPE_APPLY.

Referenced by LogicalRepSyncTableStart().

Variable Documentation

◆ copybuf

◆ table_states_not_ready

List* table_states_not_ready = NIL