28#include "catalog/pg_authid_d.h"
29#include "catalog/pg_database_d.h"
/*
 * Bit flags, one per subscription option.
 *
 * Every flag is a distinct single bit (consecutive powers of two from
 * 0x00000001 up to 0x00020000), so any combination can be OR-ed together
 * into one integer mask and tested with IsSet().  A newly added flag should
 * take the next unused bit (0x00040000) to keep the values disjoint.
 *
 * NOTE(review): presumably these make up the "supported_opts" mask handed to
 * the option-parsing code -- confirm against the callers.
 */
60#define SUBOPT_CONNECT 0x00000001
61#define SUBOPT_ENABLED 0x00000002
62#define SUBOPT_CREATE_SLOT 0x00000004
63#define SUBOPT_SLOT_NAME 0x00000008
64#define SUBOPT_COPY_DATA 0x00000010
65#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66#define SUBOPT_REFRESH 0x00000040
67#define SUBOPT_BINARY 0x00000080
68#define SUBOPT_STREAMING 0x00000100
69#define SUBOPT_TWOPHASE_COMMIT 0x00000200
70#define SUBOPT_DISABLE_ON_ERR 0x00000400
71#define SUBOPT_PASSWORD_REQUIRED 0x00000800
72#define SUBOPT_RUN_AS_OWNER 0x00001000
73#define SUBOPT_FAILOVER 0x00002000
74#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
75#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
76#define SUBOPT_LSN 0x00010000
77#define SUBOPT_ORIGIN 0x00020000
/*
 * IsSet(val, bits) -- true when every bit of "bits" is also set in "val".
 *
 * The masked value is compared against "bits" itself rather than against
 * zero, so a multi-bit "bits" argument only matches when all of its bits are
 * present; a "bits" of 0 always yields true.
 *
 * Both arguments are parenthesized in the expansion, but "bits" is evaluated
 * twice, so do not pass an expression with side effects.
 */
80#define IsSet(val, bits) (((val) & (bits)) == (bits))
121 List *publications,
bool copydata,
122 bool retain_dead_tuples,
124 Oid *subrel_local_oids,
125 int subrel_count,
char *
subname);
128 bool copydata,
char *origin,
129 Oid *subrel_local_oids,
137 bool slot_needs_update,
bool isTopLevel);
156 Assert(supported_opts != 0);
165 opts->connect =
true;
167 opts->enabled =
true;
169 opts->create_slot =
true;
171 opts->copy_data =
true;
173 opts->refresh =
true;
175 opts->binary =
false;
177 opts->streaming = LOGICALREP_STREAM_PARALLEL;
179 opts->twophase =
false;
181 opts->disableonerr =
false;
183 opts->passwordrequired =
true;
185 opts->runasowner =
false;
187 opts->failover =
false;
189 opts->retaindeadtuples =
false;
191 opts->maxretention = 0;
196 foreach(lc, stmt_options)
201 strcmp(defel->
defname,
"connect") == 0)
210 strcmp(defel->
defname,
"enabled") == 0)
219 strcmp(defel->
defname,
"create_slot") == 0)
228 strcmp(defel->
defname,
"slot_name") == 0)
237 if (strcmp(
opts->slot_name,
"none") == 0)
238 opts->slot_name = NULL;
243 strcmp(defel->
defname,
"copy_data") == 0)
252 strcmp(defel->
defname,
"synchronous_commit") == 0)
266 strcmp(defel->
defname,
"refresh") == 0)
275 strcmp(defel->
defname,
"binary") == 0)
284 strcmp(defel->
defname,
"streaming") == 0)
293 strcmp(defel->
defname,
"two_phase") == 0)
302 strcmp(defel->
defname,
"disable_on_error") == 0)
311 strcmp(defel->
defname,
"password_required") == 0)
320 strcmp(defel->
defname,
"run_as_owner") == 0)
329 strcmp(defel->
defname,
"failover") == 0)
338 strcmp(defel->
defname,
"retain_dead_tuples") == 0)
347 strcmp(defel->
defname,
"max_retention_duration") == 0)
356 strcmp(defel->
defname,
"origin") == 0)
375 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
376 errmsg(
"unrecognized origin value: \"%s\"",
opts->origin));
379 strcmp(defel->
defname,
"lsn") == 0)
388 if (strcmp(lsn_str,
"none") == 0)
398 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
399 errmsg(
"invalid WAL location (LSN): %s", lsn_str)));
407 (
errcode(ERRCODE_SYNTAX_ERROR),
408 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
421 (
errcode(ERRCODE_SYNTAX_ERROR),
423 errmsg(
"%s and %s are mutually exclusive options",
424 "connect = false",
"enabled = true")));
426 if (
opts->create_slot &&
429 (
errcode(ERRCODE_SYNTAX_ERROR),
430 errmsg(
"%s and %s are mutually exclusive options",
431 "connect = false",
"create_slot = true")));
433 if (
opts->copy_data &&
436 (
errcode(ERRCODE_SYNTAX_ERROR),
437 errmsg(
"%s and %s are mutually exclusive options",
438 "connect = false",
"copy_data = true")));
441 opts->enabled =
false;
442 opts->create_slot =
false;
443 opts->copy_data =
false;
450 if (!
opts->slot_name &&
457 (
errcode(ERRCODE_SYNTAX_ERROR),
459 errmsg(
"%s and %s are mutually exclusive options",
460 "slot_name = NONE",
"enabled = true")));
463 (
errcode(ERRCODE_SYNTAX_ERROR),
465 errmsg(
"subscription with %s must also set %s",
466 "slot_name = NONE",
"enabled = false")));
469 if (
opts->create_slot)
473 (
errcode(ERRCODE_SYNTAX_ERROR),
475 errmsg(
"%s and %s are mutually exclusive options",
476 "slot_name = NONE",
"create_slot = true")));
479 (
errcode(ERRCODE_SYNTAX_ERROR),
481 errmsg(
"subscription with %s must also set %s",
482 "slot_name = NONE",
"create_slot = false")));
497 Oid tableRow[1] = {TEXTOID};
501 " pg_catalog.pg_publication t WHERE\n"
511 errmsg(
"could not receive list of publications from the publisher: %s",
514 publicationsCopy =
list_copy(publications);
544 errcode(ERRCODE_UNDEFINED_OBJECT),
545 errmsg_plural(
"publication %s does not exist on the publisher",
546 "publications %s do not exist on the publisher",
565 "publicationListToArray to array",
592 bool nulls[Natts_pg_subscription];
624 if (
opts.create_slot)
634 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
635 errmsg(
"permission denied to create subscription"),
636 errdetail(
"Only roles with privileges of the \"%s\" role may create subscriptions.",
637 "pg_create_subscription")));
656 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
657 errmsg(
"password_required=false is superuser-only"),
658 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
664#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
665 if (strncmp(
stmt->subname,
"regress_", 8) != 0)
666 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
678 errmsg(
"subscription \"%s\" already exists",
687 opts.retaindeadtuples,
opts.retaindeadtuples,
688 (
opts.maxretention > 0));
691 opts.slot_name == NULL)
695 if (
opts.synchronous_commit == NULL)
696 opts.synchronous_commit =
"off";
698 conninfo =
stmt->conninfo;
699 publications =
stmt->publication;
709 memset(nulls,
false,
sizeof(nulls));
712 Anum_pg_subscription_oid);
716 values[Anum_pg_subscription_subname - 1] =
722 values[Anum_pg_subscription_subtwophasestate - 1] =
724 LOGICALREP_TWOPHASE_STATE_PENDING :
725 LOGICALREP_TWOPHASE_STATE_DISABLED);
730 values[Anum_pg_subscription_subretaindeadtuples - 1] =
732 values[Anum_pg_subscription_submaxretention - 1] =
734 values[Anum_pg_subscription_subretentionactive - 1] =
736 values[Anum_pg_subscription_subconninfo - 1] =
739 values[Anum_pg_subscription_subslotname - 1] =
742 nulls[Anum_pg_subscription_subslotname - 1] =
true;
743 values[Anum_pg_subscription_subsynccommit - 1] =
745 values[Anum_pg_subscription_subpublications - 1] =
747 values[Anum_pg_subscription_suborigin - 1] =
779 bool must_use_password;
787 (
errcode(ERRCODE_CONNECTION_FAILURE),
788 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
793 bool has_tables =
false;
801 NULL, 0,
stmt->subname);
804 NULL, 0,
stmt->subname);
806 if (
opts.retaindeadtuples)
813 relation_state =
opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
833 has_tables |= (relkind != RELKIND_SEQUENCE);
847 if (
opts.create_slot)
849 bool twophase_enabled =
false;
869 if (
opts.twophase && !
opts.copy_data && has_tables)
870 twophase_enabled =
true;
875 if (twophase_enabled)
879 (
errmsg(
"created replication slot \"%s\" on publisher",
891 (
errmsg(
"subscription was created, but is not connected"),
892 errhint(
"To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
908 if (
opts.enabled ||
opts.retaindeadtuples)
920 List *validate_publications)
924 Oid *pubrel_local_oids;
927 Oid *subrel_local_oids;
928 Oid *subseq_local_oids;
935 typedef struct SubRemoveRels
942 bool must_use_password;
953 (
errcode(ERRCODE_CONNECTION_FAILURE),
954 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
959 if (validate_publications)
978 subrel_local_oids =
palloc(subrel_count *
sizeof(
Oid));
979 subseq_local_oids =
palloc(subrel_count *
sizeof(
Oid));
980 foreach(lc, subrel_states)
985 subseq_local_oids[seq_count++] = relstate->
relid;
987 subrel_local_oids[tbl_count++] = relstate->
relid;
993 subrel_local_oids, tbl_count,
999 subseq_local_oids, seq_count,
1026 pubrel_local_oids[off++] = relid;
1028 if (!bsearch(&relid, subrel_local_oids,
1030 !bsearch(&relid, subseq_local_oids,
1034 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
1038 relkind == RELKIND_SEQUENCE ?
"sequence" :
"table",
1049 for (off = 0; off < tbl_count; off++)
1051 Oid relid = subrel_local_oids[off];
1053 if (!bsearch(&relid, pubrel_local_oids,
1058 SubRemoveRels *remove_rel =
palloc(
sizeof(SubRemoveRels));
1082 remove_rel->relid = relid;
1083 remove_rel->state =
state;
1085 sub_remove_rels =
lappend(sub_remove_rels, remove_rel);
1093 if (
state != SUBREL_STATE_READY)
1108 sizeof(originname));
1127 if (rel->state != SUBREL_STATE_READY &&
1128 rel->state != SUBREL_STATE_SYNCDONE)
1143 syncslotname,
sizeof(syncslotname));
1152 for (off = 0; off < seq_count; off++)
1154 Oid relid = subseq_local_oids[off];
1156 if (!bsearch(&relid, pubrel_local_oids,
1169 errmsg_internal(
"sequence \"%s.%s\" removed from subscription \"%s\"",
1194 bool must_use_password;
1205 errcode(ERRCODE_CONNECTION_FAILURE),
1206 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
1211 List *subrel_states;
1220 Oid relid = subrel->relid;
1225 errmsg_internal(
"sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1244 bool slot_needs_update,
bool isTopLevel)
1247 strcmp(
option,
"two_phase") == 0 ||
1248 strcmp(
option,
"retain_dead_tuples") == 0);
1254 Assert(!slot_needs_update || strcmp(
option,
"retain_dead_tuples") != 0);
1302 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1303 errmsg(
"cannot set option \"%s\" for enabled subscription",
1306 if (slot_needs_update)
1316 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1317 errmsg(
"cannot set option \"%s\" for a subscription that does not have a slot name",
1338 bool nulls[Natts_pg_subscription];
1339 bool replaces[Natts_pg_subscription];
1343 bool update_tuple =
false;
1344 bool update_failover =
false;
1345 bool update_two_phase =
false;
1346 bool check_pub_rdt =
false;
1347 bool retain_dead_tuples;
1349 bool retention_active;
1364 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1365 errmsg(
"subscription \"%s\" does not exist",
1389 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1390 errmsg(
"password_required=false is superuser-only"),
1391 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1398 memset(nulls,
false,
sizeof(nulls));
1399 memset(replaces,
false,
sizeof(replaces));
1416 supported_opts, &
opts);
1429 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1430 errmsg(
"cannot set %s for enabled subscription",
1431 "slot_name = NONE")));
1434 values[Anum_pg_subscription_subslotname - 1] =
1437 nulls[Anum_pg_subscription_subslotname - 1] =
true;
1438 replaces[Anum_pg_subscription_subslotname - 1] =
true;
1441 if (
opts.synchronous_commit)
1443 values[Anum_pg_subscription_subsynccommit - 1] =
1445 replaces[Anum_pg_subscription_subsynccommit - 1] =
true;
1450 values[Anum_pg_subscription_subbinary - 1] =
1452 replaces[Anum_pg_subscription_subbinary - 1] =
true;
1457 values[Anum_pg_subscription_substream - 1] =
1459 replaces[Anum_pg_subscription_substream - 1] =
true;
1464 values[Anum_pg_subscription_subdisableonerr - 1]
1466 replaces[Anum_pg_subscription_subdisableonerr - 1]
1475 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1476 errmsg(
"password_required=false is superuser-only"),
1477 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1479 values[Anum_pg_subscription_subpasswordrequired - 1]
1481 replaces[Anum_pg_subscription_subpasswordrequired - 1]
1487 values[Anum_pg_subscription_subrunasowner - 1] =
1489 replaces[Anum_pg_subscription_subrunasowner - 1] =
true;
1501 update_two_phase = !
opts.twophase;
1511 if (update_two_phase &&
1514 (
errcode(ERRCODE_SYNTAX_ERROR),
1515 errmsg(
"\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1530 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1531 errmsg(
"cannot alter \"two_phase\" when logical replication worker is still running"),
1532 errhint(
"Try again after some time.")));
1540 if (update_two_phase &&
1544 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1545 errmsg(
"cannot disable \"two_phase\" when prepared transactions exist"),
1546 errhint(
"Resolve these transactions and try again.")));
1549 values[Anum_pg_subscription_subtwophasestate - 1] =
1551 LOGICALREP_TWOPHASE_STATE_PENDING :
1552 LOGICALREP_TWOPHASE_STATE_DISABLED);
1553 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1563 update_failover =
true;
1568 values[Anum_pg_subscription_subfailover - 1] =
1570 replaces[Anum_pg_subscription_subfailover - 1] =
true;
1575 values[Anum_pg_subscription_subretaindeadtuples - 1] =
1577 replaces[Anum_pg_subscription_subretaindeadtuples - 1] =
true;
1595 values[Anum_pg_subscription_subretentionactive - 1] =
1597 replaces[Anum_pg_subscription_subretentionactive - 1] =
true;
1599 retention_active =
opts.retaindeadtuples;
1614 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1615 errmsg(
"cannot alter retain_dead_tuples when logical replication worker is still running"),
1616 errhint(
"Try again after some time.")));
1626 check_pub_rdt =
opts.retaindeadtuples;
1627 retain_dead_tuples =
opts.retaindeadtuples;
1632 values[Anum_pg_subscription_submaxretention - 1] =
1634 replaces[Anum_pg_subscription_submaxretention - 1] =
true;
1636 max_retention =
opts.maxretention;
1649 (max_retention > 0));
1653 values[Anum_pg_subscription_suborigin - 1] =
1655 replaces[Anum_pg_subscription_suborigin - 1] =
true;
1662 check_pub_rdt = retain_dead_tuples &&
1665 origin =
opts.origin;
1668 update_tuple =
true;
1680 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1681 errmsg(
"cannot enable subscription that does not have a slot name")));
1692 values[Anum_pg_subscription_subenabled - 1] =
1694 replaces[Anum_pg_subscription_subenabled - 1] =
true;
1699 update_tuple =
true;
1718 values[Anum_pg_subscription_subconninfo - 1] =
1720 replaces[Anum_pg_subscription_subconninfo - 1] =
true;
1721 update_tuple =
true;
1735 supported_opts, &
opts);
1737 values[Anum_pg_subscription_subpublications - 1] =
1739 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1741 update_tuple =
true;
1748 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1749 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1750 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1758 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1759 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1760 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1782 supported_opts, &
opts);
1785 values[Anum_pg_subscription_subpublications - 1] =
1787 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1789 update_tuple =
true;
1795 List *validate_publications = (isadd) ?
stmt->publication : NULL;
1799 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1800 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1804 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1805 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1813 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1814 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1816 errhint(
"Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1818 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1819 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1827 validate_publications);
1837 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1838 errmsg(
"%s is not allowed for disabled subscriptions",
1839 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
1863 (
errcode(ERRCODE_SYNTAX_ERROR),
1864 errmsg(
"ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
1865 errhint(
"Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1878 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1879 errmsg(
"%s is not allowed for disabled subscriptions",
1880 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
1905 originname,
sizeof(originname));
1912 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1913 errmsg(
"skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1919 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
1921 update_tuple =
true;
1926 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
1950 if (update_failover || update_two_phase || check_pub_rdt)
1952 bool must_use_password;
1965 true,
true, must_use_password, sub->
name,
1969 (
errcode(ERRCODE_CONNECTION_FAILURE),
1970 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
1975 if (retain_dead_tuples)
1979 retain_dead_tuples, origin, NULL, 0,
1982 if (update_failover || update_two_phase)
1984 update_failover ? &
opts.failover : NULL,
1985 update_two_phase ? &
opts.twophase : NULL);
2029 bool must_use_password;
2046 if (!
stmt->missing_ok)
2048 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2049 errmsg(
"subscription \"%s\" does not exist",
2053 (
errmsg(
"subscription \"%s\" does not exist, skipping",
2061 subowner = form->subowner;
2062 must_use_password = !
superuser_arg(subowner) && form->subpasswordrequired;
2080 Anum_pg_subscription_subname);
2085 Anum_pg_subscription_subconninfo);
2090 Anum_pg_subscription_subslotname, &isnull);
2135 foreach(lc, subworkers)
2163 foreach(lc, rstates)
2180 sizeof(originname));
2204 if (!slotname && rstates ==
NIL)
2242 foreach(lc, rstates)
2263 if (rstate->
state != SUBREL_STATE_SYNCDONE)
2268 sizeof(syncslotname));
2320 (
errmsg(
"dropped replication slot \"%s\" on publisher",
2325 res->
sqlstate == ERRCODE_UNDEFINED_OBJECT)
2329 (
errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
2330 slotname, res->
err)));
2336 (
errcode(ERRCODE_CONNECTION_FAILURE),
2337 errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
2338 slotname, res->
err)));
2361 if (form->subowner == newOwnerId)
2372 if (!form->subpasswordrequired && !
superuser())
2374 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2375 errmsg(
"password_required=false is superuser-only"),
2376 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2394 form->subowner = newOwnerId;
2429 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2430 errmsg(
"subscription \"%s\" does not exist",
name)));
2461 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2462 errmsg(
"subscription with OID %u does not exist", subid)));
2496 bool copydata,
bool retain_dead_tuples,
2497 char *origin,
Oid *subrel_local_oids,
2498 int subrel_count,
char *
subname)
2503 Oid tableRow[1] = {TEXTOID};
2507 bool check_table_sync;
2508 bool origin_none = origin &&
2516 check_rdt = retain_dead_tuples && !origin_none;
2522 check_table_sync = copydata && origin_none;
2525 Assert(!(check_rdt && check_table_sync));
2528 if (!check_rdt && !check_table_sync)
2533 "SELECT DISTINCT P.pubname AS pubname\n"
2534 "FROM pg_publication P,\n"
2535 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2536 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2537 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2538 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2539 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2540 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2553 if (check_table_sync)
2555 for (
i = 0;
i < subrel_count;
i++)
2557 Oid relid = subrel_local_oids[
i];
2561 appendStringInfo(&cmd,
"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2562 schemaname, tablename);
2571 (
errcode(ERRCODE_CONNECTION_FAILURE),
2572 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2608 if (check_table_sync)
2610 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2611 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2613 errdetail_plural(
"The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2614 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2616 errhint(
"Verify that initial data copied from the publisher tables did not come from other origins."));
2619 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2620 errmsg(
"subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2622 errdetail_plural(
"The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2623 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2625 errhint(
"Consider using origin = NONE or disabling retain_dead_tuples."));
2639 bool copydata,
char *origin,
2640 Oid *subrel_local_oids,
int subrel_count,
2646 Oid tableRow[1] = {TEXTOID};
2654 if (!copydata ||
pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
2659 "SELECT DISTINCT P.pubname AS pubname\n"
2660 "FROM pg_publication P,\n"
2661 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2662 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2663 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2664 "WHERE C.oid = GPS.relid AND P.pubname IN (");
2675 for (
int i = 0;
i < subrel_count;
i++)
2677 Oid relid = subrel_local_oids[
i];
2682 "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2683 schemaname, seqname);
2691 (
errcode(ERRCODE_CONNECTION_FAILURE),
2692 errmsg(
"could not receive list of replicated sequences from the publisher: %s",
2723 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2724 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2726 errdetail_plural(
"The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
2727 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
2729 errhint(
"Verify that initial data copied from the publisher sequences did not come from other origins."));
2751 Oid RecoveryRow[1] = {BOOLOID};
2754 bool remote_in_recovery;
2758 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2759 errmsg(
"cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2765 (
errcode(ERRCODE_CONNECTION_FAILURE),
2766 errmsg(
"could not obtain recovery progress from the publisher: %s",
2771 elog(
ERROR,
"failed to fetch tuple for the recovery progress");
2775 if (remote_in_recovery)
2777 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2778 errmsg(
"cannot enable retain_dead_tuples if the publisher is in recovery."));
2810 int elevel_for_sub_disabled,
2811 bool retain_dead_tuples,
bool retention_active,
2812 bool max_retention_set)
2815 elevel_for_sub_disabled ==
WARNING);
2817 if (retain_dead_tuples)
2821 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2822 errmsg(
"\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2823 errhint(
"\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2827 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2828 errmsg(
"commit timestamp and origin data required for detecting conflicts won't be retained"),
2829 errhint(
"Consider setting \"%s\" to true.",
2830 "track_commit_timestamp"));
2832 if (sub_disabled && retention_active)
2833 ereport(elevel_for_sub_disabled,
2834 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2835 errmsg(
"deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2836 (elevel_for_sub_disabled >
NOTICE)
2837 ?
errhint(
"Consider setting %s to false.",
2838 "retain_dead_tuples") : 0);
2840 else if (max_retention_set)
2843 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2844 errmsg(
"max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2856 if (
equal(relinfo->rv, rv))
2882 int column_count = check_columnlist ? 4 : 3;
2894 tableRow[3] = INT2VECTOROID;
2909 appendStringInfo(&cmd,
"SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
2910 " FROM pg_class c\n"
2911 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2912 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2913 " FROM pg_publication\n"
2914 " WHERE pubname IN ( %s )) AS gpt\n"
2915 " ON gpt.relid = c.oid\n",
2922 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE)
"::\"char\" AS relkind, NULL::int2vector AS attrs\n"
2923 " FROM pg_catalog.pg_publication_sequences s\n"
2924 " WHERE s.pubname IN ( %s )",
2929 tableRow[3] = NAMEARRAYOID;
2933 if (check_columnlist)
2937 " WHERE t.pubname IN ( %s )",
2948 (
errcode(ERRCODE_CONNECTION_FAILURE),
2949 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2972 if (relkind != RELKIND_SEQUENCE &&
2976 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2977 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
2980 relationlist =
lappend(relationlist, relinfo);
2988 return relationlist;
3001 foreach(lc, rstates)
3014 if (rstate->
state != SUBREL_STATE_SYNCDONE)
3019 sizeof(syncslotname));
3020 elog(
WARNING,
"could not drop tablesync replication slot \"%s\"",
3026 (
errcode(ERRCODE_CONNECTION_FAILURE),
3027 errmsg(
"could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3030 errhint(
"Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3031 "ALTER SUBSCRIPTION ... DISABLE",
3032 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3046 foreach(cell, publist)
3051 foreach(pcell, publist)
3058 if (strcmp(
name, pname) == 0)
3061 errmsg(
"publication name \"%s\" used more than once",
3089 foreach(lc, newpublist)
3095 foreach(lc2, oldpublist)
3099 if (strcmp(
name, pubname) == 0)
3105 errmsg(
"publication \"%s\" is already in subscription \"%s\"",
3114 if (addpub && !found)
3116 else if (!addpub && !found)
3118 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3119 errmsg(
"publication \"%s\" is not in subscription \"%s\"",
3129 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3130 errmsg(
"cannot drop all the publications from a subscription")));
3146 return LOGICALREP_STREAM_ON;
3157 return LOGICALREP_STREAM_OFF;
3159 return LOGICALREP_STREAM_ON;
3175 return LOGICALREP_STREAM_OFF;
3178 return LOGICALREP_STREAM_ON;
3180 return LOGICALREP_STREAM_PARALLEL;
3186 (
errcode(ERRCODE_SYNTAX_ERROR),
3187 errmsg(
"%s requires a Boolean value or \"parallel\"",
3189 return LOGICALREP_STREAM_OFF;
bool has_privs_of_role(Oid member, Oid role)
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
bool track_commit_timestamp
int32 defGetInt32(DefElem *def)
char * defGetString(DefElem *def)
bool defGetBoolean(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void load_file(const char *filename, bool restricted)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool equal(const void *a, const void *b)
void err(int eval, const char *fmt,...)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
#define palloc_object(type)
#define DirectFunctionCall1(func, arg1)
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
if(TABLE==NULL||TABLE_index==NULL)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void ApplyLauncherWakeupAtCommit(void)
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
List * list_delete(List *list, void *datum)
List * list_append_unique(List *list, void *datum)
List * list_copy(const List *oldlist)
void list_free(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
char * get_rel_name(Oid relid)
char * get_database_name(Oid dbid)
char get_rel_relkind(Oid relid)
Oid get_rel_namespace(Oid relid)
char * get_namespace_name(Oid nspid)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
Datum namein(PG_FUNCTION_ARGS)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
static AmcheckOptions opts
static int server_version
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
#define foreach_ptr(type, var, lst)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
Subscription * GetSubscription(Oid subid, bool missing_ok)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
static bool DatumGetBool(Datum X)
static Datum PointerGetDatum(const void *X)
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static Datum CStringGetDatum(const char *X)
static Datum Int32GetDatum(int32 X)
static Datum CharGetDatum(char X)
#define RelationGetDescr(relation)
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
#define ERRCODE_DUPLICATE_OBJECT
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
LogicalRepWorkerType type
char * synchronous_commit
Tuplestorestate * tuplestore
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
static Datum publicationListToArray(List *publist)
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
struct PublicationRelKind PublicationRelKind
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static bool list_member_rangevar(const List *list, RangeVar *rv)
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
bool LookupGXactBySubid(Oid subid)
String * makeString(char *str)
static WalReceiverConn * wrconn
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr