PostgreSQL Source Code git master
subscriptioncmds.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * subscriptioncmds.c
4 * subscription catalog manipulation functions
5 *
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/subscriptioncmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/commit_ts.h"
18#include "access/htup_details.h"
19#include "access/table.h"
20#include "access/twophase.h"
21#include "access/xact.h"
22#include "catalog/catalog.h"
23#include "catalog/dependency.h"
24#include "catalog/indexing.h"
25#include "catalog/namespace.h"
28#include "catalog/pg_authid_d.h"
29#include "catalog/pg_database_d.h"
32#include "catalog/pg_type.h"
33#include "commands/defrem.h"
36#include "executor/executor.h"
37#include "miscadmin.h"
38#include "nodes/makefuncs.h"
39#include "pgstat.h"
42#include "replication/origin.h"
43#include "replication/slot.h"
47#include "storage/lmgr.h"
48#include "utils/acl.h"
49#include "utils/builtins.h"
50#include "utils/guc.h"
51#include "utils/lsyscache.h"
52#include "utils/memutils.h"
53#include "utils/pg_lsn.h"
54#include "utils/syscache.h"
55
56/*
57 * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
58 * command.
59 */
60#define SUBOPT_CONNECT 0x00000001
61#define SUBOPT_ENABLED 0x00000002
62#define SUBOPT_CREATE_SLOT 0x00000004
63#define SUBOPT_SLOT_NAME 0x00000008
64#define SUBOPT_COPY_DATA 0x00000010
65#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66#define SUBOPT_REFRESH 0x00000040
67#define SUBOPT_BINARY 0x00000080
68#define SUBOPT_STREAMING 0x00000100
69#define SUBOPT_TWOPHASE_COMMIT 0x00000200
70#define SUBOPT_DISABLE_ON_ERR 0x00000400
71#define SUBOPT_PASSWORD_REQUIRED 0x00000800
72#define SUBOPT_RUN_AS_OWNER 0x00001000
73#define SUBOPT_FAILOVER 0x00002000
74#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
75#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
76#define SUBOPT_LSN 0x00010000
77#define SUBOPT_ORIGIN 0x00020000
78
79/* check if the 'val' has 'bits' set */
80#define IsSet(val, bits) (((val) & (bits)) == (bits))
81
82/*
83 * Structure to hold a bitmap representing the user-provided CREATE/ALTER
84 * SUBSCRIPTION command options and the parsed/default values of each of them.
85 */
86typedef struct SubOpts
87{
89 char *slot_name;
91 bool connect;
92 bool enabled;
95 bool refresh;
96 bool binary;
105 char *origin;
108
109/*
110 * PublicationRelKind represents a relation included in a publication.
111 * It stores the schema-qualified relation name (rv) and its kind (relkind).
112 */
113typedef struct PublicationRelKind
114{
118
119static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
121 List *publications, bool copydata,
122 bool retain_dead_tuples,
123 char *origin,
124 Oid *subrel_local_oids,
125 int subrel_count, char *subname);
127 List *publications,
128 bool copydata, char *origin,
129 Oid *subrel_local_oids,
130 int subrel_count,
131 char *subname);
133static void check_duplicates_in_publist(List *publist, Datum *datums);
134static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
135static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
136static void CheckAlterSubOption(Subscription *sub, const char *option,
137 bool slot_needs_update, bool isTopLevel);
138
139
140/*
141 * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
142 *
143 * Since not all options can be specified in both commands, this function
144 * will report an error if mutually exclusive options are specified.
145 */
146static void
148 bits32 supported_opts, SubOpts *opts)
149{
150 ListCell *lc;
151
152 /* Start out with cleared opts. */
153 memset(opts, 0, sizeof(SubOpts));
154
155 /* caller must expect some option */
156 Assert(supported_opts != 0);
157
158 /* If connect option is supported, these others also need to be. */
159 Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
160 IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
162
163 /* Set default values for the supported options. */
164 if (IsSet(supported_opts, SUBOPT_CONNECT))
165 opts->connect = true;
166 if (IsSet(supported_opts, SUBOPT_ENABLED))
167 opts->enabled = true;
168 if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
169 opts->create_slot = true;
170 if (IsSet(supported_opts, SUBOPT_COPY_DATA))
171 opts->copy_data = true;
172 if (IsSet(supported_opts, SUBOPT_REFRESH))
173 opts->refresh = true;
174 if (IsSet(supported_opts, SUBOPT_BINARY))
175 opts->binary = false;
176 if (IsSet(supported_opts, SUBOPT_STREAMING))
177 opts->streaming = LOGICALREP_STREAM_PARALLEL;
178 if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
179 opts->twophase = false;
180 if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
181 opts->disableonerr = false;
182 if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
183 opts->passwordrequired = true;
184 if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
185 opts->runasowner = false;
186 if (IsSet(supported_opts, SUBOPT_FAILOVER))
187 opts->failover = false;
188 if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
189 opts->retaindeadtuples = false;
190 if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
191 opts->maxretention = 0;
192 if (IsSet(supported_opts, SUBOPT_ORIGIN))
193 opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
194
195 /* Parse options */
196 foreach(lc, stmt_options)
197 {
198 DefElem *defel = (DefElem *) lfirst(lc);
199
200 if (IsSet(supported_opts, SUBOPT_CONNECT) &&
201 strcmp(defel->defname, "connect") == 0)
202 {
203 if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
204 errorConflictingDefElem(defel, pstate);
205
206 opts->specified_opts |= SUBOPT_CONNECT;
207 opts->connect = defGetBoolean(defel);
208 }
209 else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
210 strcmp(defel->defname, "enabled") == 0)
211 {
212 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
213 errorConflictingDefElem(defel, pstate);
214
215 opts->specified_opts |= SUBOPT_ENABLED;
216 opts->enabled = defGetBoolean(defel);
217 }
218 else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
219 strcmp(defel->defname, "create_slot") == 0)
220 {
221 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
222 errorConflictingDefElem(defel, pstate);
223
224 opts->specified_opts |= SUBOPT_CREATE_SLOT;
225 opts->create_slot = defGetBoolean(defel);
226 }
227 else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
228 strcmp(defel->defname, "slot_name") == 0)
229 {
230 if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
231 errorConflictingDefElem(defel, pstate);
232
233 opts->specified_opts |= SUBOPT_SLOT_NAME;
234 opts->slot_name = defGetString(defel);
235
236 /* Setting slot_name = NONE is treated as no slot name. */
237 if (strcmp(opts->slot_name, "none") == 0)
238 opts->slot_name = NULL;
239 else
240 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
241 }
242 else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
243 strcmp(defel->defname, "copy_data") == 0)
244 {
245 if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
246 errorConflictingDefElem(defel, pstate);
247
248 opts->specified_opts |= SUBOPT_COPY_DATA;
249 opts->copy_data = defGetBoolean(defel);
250 }
251 else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
252 strcmp(defel->defname, "synchronous_commit") == 0)
253 {
254 if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
255 errorConflictingDefElem(defel, pstate);
256
257 opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
258 opts->synchronous_commit = defGetString(defel);
259
260 /* Test if the given value is valid for synchronous_commit GUC. */
261 (void) set_config_option("synchronous_commit", opts->synchronous_commit,
263 false, 0, false);
264 }
265 else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
266 strcmp(defel->defname, "refresh") == 0)
267 {
268 if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
269 errorConflictingDefElem(defel, pstate);
270
271 opts->specified_opts |= SUBOPT_REFRESH;
272 opts->refresh = defGetBoolean(defel);
273 }
274 else if (IsSet(supported_opts, SUBOPT_BINARY) &&
275 strcmp(defel->defname, "binary") == 0)
276 {
277 if (IsSet(opts->specified_opts, SUBOPT_BINARY))
278 errorConflictingDefElem(defel, pstate);
279
280 opts->specified_opts |= SUBOPT_BINARY;
281 opts->binary = defGetBoolean(defel);
282 }
283 else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
284 strcmp(defel->defname, "streaming") == 0)
285 {
286 if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
287 errorConflictingDefElem(defel, pstate);
288
289 opts->specified_opts |= SUBOPT_STREAMING;
290 opts->streaming = defGetStreamingMode(defel);
291 }
292 else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
293 strcmp(defel->defname, "two_phase") == 0)
294 {
295 if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
296 errorConflictingDefElem(defel, pstate);
297
298 opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
299 opts->twophase = defGetBoolean(defel);
300 }
301 else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
302 strcmp(defel->defname, "disable_on_error") == 0)
303 {
304 if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
305 errorConflictingDefElem(defel, pstate);
306
307 opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
308 opts->disableonerr = defGetBoolean(defel);
309 }
310 else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
311 strcmp(defel->defname, "password_required") == 0)
312 {
313 if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
314 errorConflictingDefElem(defel, pstate);
315
316 opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
317 opts->passwordrequired = defGetBoolean(defel);
318 }
319 else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
320 strcmp(defel->defname, "run_as_owner") == 0)
321 {
322 if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
323 errorConflictingDefElem(defel, pstate);
324
325 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
326 opts->runasowner = defGetBoolean(defel);
327 }
328 else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
329 strcmp(defel->defname, "failover") == 0)
330 {
331 if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
332 errorConflictingDefElem(defel, pstate);
333
334 opts->specified_opts |= SUBOPT_FAILOVER;
335 opts->failover = defGetBoolean(defel);
336 }
337 else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
338 strcmp(defel->defname, "retain_dead_tuples") == 0)
339 {
340 if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
341 errorConflictingDefElem(defel, pstate);
342
343 opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
344 opts->retaindeadtuples = defGetBoolean(defel);
345 }
346 else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
347 strcmp(defel->defname, "max_retention_duration") == 0)
348 {
349 if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
350 errorConflictingDefElem(defel, pstate);
351
352 opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
353 opts->maxretention = defGetInt32(defel);
354 }
355 else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
356 strcmp(defel->defname, "origin") == 0)
357 {
358 if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
359 errorConflictingDefElem(defel, pstate);
360
361 opts->specified_opts |= SUBOPT_ORIGIN;
362 pfree(opts->origin);
363
364 /*
365 * Even though the "origin" parameter allows only "none" and "any"
366 * values, it is implemented as a string type so that the
367 * parameter can be extended in future versions to support
368 * filtering using origin names specified by the user.
369 */
370 opts->origin = defGetString(defel);
371
372 if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
373 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
375 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
376 errmsg("unrecognized origin value: \"%s\"", opts->origin));
377 }
378 else if (IsSet(supported_opts, SUBOPT_LSN) &&
379 strcmp(defel->defname, "lsn") == 0)
380 {
381 char *lsn_str = defGetString(defel);
382 XLogRecPtr lsn;
383
384 if (IsSet(opts->specified_opts, SUBOPT_LSN))
385 errorConflictingDefElem(defel, pstate);
386
387 /* Setting lsn = NONE is treated as resetting LSN */
388 if (strcmp(lsn_str, "none") == 0)
389 lsn = InvalidXLogRecPtr;
390 else
391 {
392 /* Parse the argument as LSN */
394 CStringGetDatum(lsn_str)));
395
396 if (!XLogRecPtrIsValid(lsn))
398 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
399 errmsg("invalid WAL location (LSN): %s", lsn_str)));
400 }
401
402 opts->specified_opts |= SUBOPT_LSN;
403 opts->lsn = lsn;
404 }
405 else
407 (errcode(ERRCODE_SYNTAX_ERROR),
408 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
409 }
410
411 /*
412 * We've been explicitly asked to not connect, that requires some
413 * additional processing.
414 */
415 if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
416 {
417 /* Check for incompatible options from the user. */
418 if (opts->enabled &&
419 IsSet(opts->specified_opts, SUBOPT_ENABLED))
421 (errcode(ERRCODE_SYNTAX_ERROR),
422 /*- translator: both %s are strings of the form "option = value" */
423 errmsg("%s and %s are mutually exclusive options",
424 "connect = false", "enabled = true")));
425
426 if (opts->create_slot &&
427 IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
429 (errcode(ERRCODE_SYNTAX_ERROR),
430 errmsg("%s and %s are mutually exclusive options",
431 "connect = false", "create_slot = true")));
432
433 if (opts->copy_data &&
434 IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
436 (errcode(ERRCODE_SYNTAX_ERROR),
437 errmsg("%s and %s are mutually exclusive options",
438 "connect = false", "copy_data = true")));
439
440 /* Change the defaults of other options. */
441 opts->enabled = false;
442 opts->create_slot = false;
443 opts->copy_data = false;
444 }
445
446 /*
447 * Do additional checking for disallowed combination when slot_name = NONE
448 * was used.
449 */
450 if (!opts->slot_name &&
451 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
452 {
453 if (opts->enabled)
454 {
455 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
457 (errcode(ERRCODE_SYNTAX_ERROR),
458 /*- translator: both %s are strings of the form "option = value" */
459 errmsg("%s and %s are mutually exclusive options",
460 "slot_name = NONE", "enabled = true")));
461 else
463 (errcode(ERRCODE_SYNTAX_ERROR),
464 /*- translator: both %s are strings of the form "option = value" */
465 errmsg("subscription with %s must also set %s",
466 "slot_name = NONE", "enabled = false")));
467 }
468
469 if (opts->create_slot)
470 {
471 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
473 (errcode(ERRCODE_SYNTAX_ERROR),
474 /*- translator: both %s are strings of the form "option = value" */
475 errmsg("%s and %s are mutually exclusive options",
476 "slot_name = NONE", "create_slot = true")));
477 else
479 (errcode(ERRCODE_SYNTAX_ERROR),
480 /*- translator: both %s are strings of the form "option = value" */
481 errmsg("subscription with %s must also set %s",
482 "slot_name = NONE", "create_slot = false")));
483 }
484 }
485}
486
487/*
488 * Check that the specified publications are present on the publisher.
489 */
490static void
492{
493 WalRcvExecResult *res;
494 StringInfoData cmd;
495 TupleTableSlot *slot;
496 List *publicationsCopy = NIL;
497 Oid tableRow[1] = {TEXTOID};
498
499 initStringInfo(&cmd);
500 appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
501 " pg_catalog.pg_publication t WHERE\n"
502 " t.pubname IN (");
503 GetPublicationsStr(publications, &cmd, true);
504 appendStringInfoChar(&cmd, ')');
505
506 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
507 pfree(cmd.data);
508
509 if (res->status != WALRCV_OK_TUPLES)
511 errmsg("could not receive list of publications from the publisher: %s",
512 res->err));
513
514 publicationsCopy = list_copy(publications);
515
516 /* Process publication(s). */
518 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
519 {
520 char *pubname;
521 bool isnull;
522
523 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
524 Assert(!isnull);
525
526 /* Delete the publication present in publisher from the list. */
527 publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
528 ExecClearTuple(slot);
529 }
530
532
534
535 if (list_length(publicationsCopy))
536 {
537 /* Prepare the list of non-existent publication(s) for error message. */
538 StringInfoData pubnames;
539
540 initStringInfo(&pubnames);
541
542 GetPublicationsStr(publicationsCopy, &pubnames, false);
544 errcode(ERRCODE_UNDEFINED_OBJECT),
545 errmsg_plural("publication %s does not exist on the publisher",
546 "publications %s do not exist on the publisher",
547 list_length(publicationsCopy),
548 pubnames.data));
549 }
550}
551
552/*
553 * Auxiliary function to build a text array out of a list of String nodes.
554 */
555static Datum
557{
558 ArrayType *arr;
559 Datum *datums;
560 MemoryContext memcxt;
561 MemoryContext oldcxt;
562
563 /* Create memory context for temporary allocations. */
565 "publicationListToArray to array",
567 oldcxt = MemoryContextSwitchTo(memcxt);
568
569 datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
570
571 check_duplicates_in_publist(publist, datums);
572
573 MemoryContextSwitchTo(oldcxt);
574
575 arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
576
577 MemoryContextDelete(memcxt);
578
579 return PointerGetDatum(arr);
580}
581
582/*
583 * Create new subscription.
584 */
587 bool isTopLevel)
588{
589 Relation rel;
590 ObjectAddress myself;
591 Oid subid;
592 bool nulls[Natts_pg_subscription];
593 Datum values[Natts_pg_subscription];
594 Oid owner = GetUserId();
595 HeapTuple tup;
596 char *conninfo;
597 char originname[NAMEDATALEN];
598 List *publications;
599 bits32 supported_opts;
600 SubOpts opts = {0};
601 AclResult aclresult;
602
603 /*
604 * Parse and check options.
605 *
606 * Connection and publication should not be specified here.
607 */
608 supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
616 parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
617
618 /*
619 * Since creating a replication slot is not transactional, rolling back
620 * the transaction leaves the created replication slot. So we cannot run
621 * CREATE SUBSCRIPTION inside a transaction block if creating a
622 * replication slot.
623 */
624 if (opts.create_slot)
625 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
626
627 /*
628 * We don't want to allow unprivileged users to be able to trigger
629 * attempts to access arbitrary network destinations, so require the user
630 * to have been specifically authorized to create subscriptions.
631 */
632 if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
634 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
635 errmsg("permission denied to create subscription"),
636 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
637 "pg_create_subscription")));
638
639 /*
640 * Since a subscription is a database object, we also check for CREATE
641 * permission on the database.
642 */
643 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
644 owner, ACL_CREATE);
645 if (aclresult != ACLCHECK_OK)
648
649 /*
650 * Non-superusers are required to set a password for authentication, and
651 * that password must be used by the target server, but the superuser can
652 * exempt a subscription from this requirement.
653 */
654 if (!opts.passwordrequired && !superuser_arg(owner))
656 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
657 errmsg("password_required=false is superuser-only"),
658 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
659
660 /*
661 * If built with appropriate switch, whine when regression-testing
662 * conventions for subscription names are violated.
663 */
664#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
665 if (strncmp(stmt->subname, "regress_", 8) != 0)
666 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
667#endif
668
669 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
670
671 /* Check if name is used */
672 subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
674 if (OidIsValid(subid))
675 {
678 errmsg("subscription \"%s\" already exists",
679 stmt->subname)));
680 }
681
682 /*
683 * Ensure that system configuration parameters are set appropriately to
684 * support retain_dead_tuples and max_retention_duration.
685 */
687 opts.retaindeadtuples, opts.retaindeadtuples,
688 (opts.maxretention > 0));
689
690 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
691 opts.slot_name == NULL)
692 opts.slot_name = stmt->subname;
693
694 /* The default for synchronous_commit of subscriptions is off. */
695 if (opts.synchronous_commit == NULL)
696 opts.synchronous_commit = "off";
697
698 conninfo = stmt->conninfo;
699 publications = stmt->publication;
700
701 /* Load the library providing us libpq calls. */
702 load_file("libpqwalreceiver", false);
703
704 /* Check the connection info string. */
705 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
706
707 /* Everything ok, form a new tuple. */
708 memset(values, 0, sizeof(values));
709 memset(nulls, false, sizeof(nulls));
710
711 subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
712 Anum_pg_subscription_oid);
713 values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
714 values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
715 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
716 values[Anum_pg_subscription_subname - 1] =
718 values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
719 values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
720 values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
721 values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
722 values[Anum_pg_subscription_subtwophasestate - 1] =
723 CharGetDatum(opts.twophase ?
724 LOGICALREP_TWOPHASE_STATE_PENDING :
725 LOGICALREP_TWOPHASE_STATE_DISABLED);
726 values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
727 values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
728 values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
729 values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
730 values[Anum_pg_subscription_subretaindeadtuples - 1] =
731 BoolGetDatum(opts.retaindeadtuples);
732 values[Anum_pg_subscription_submaxretention - 1] =
733 Int32GetDatum(opts.maxretention);
734 values[Anum_pg_subscription_subretentionactive - 1] =
735 Int32GetDatum(opts.retaindeadtuples);
736 values[Anum_pg_subscription_subconninfo - 1] =
737 CStringGetTextDatum(conninfo);
738 if (opts.slot_name)
739 values[Anum_pg_subscription_subslotname - 1] =
741 else
742 nulls[Anum_pg_subscription_subslotname - 1] = true;
743 values[Anum_pg_subscription_subsynccommit - 1] =
744 CStringGetTextDatum(opts.synchronous_commit);
745 values[Anum_pg_subscription_subpublications - 1] =
746 publicationListToArray(publications);
747 values[Anum_pg_subscription_suborigin - 1] =
749
750 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
751
752 /* Insert tuple into catalog. */
753 CatalogTupleInsert(rel, tup);
754 heap_freetuple(tup);
755
756 recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
757
758 /*
759 * A replication origin is currently created for all subscriptions,
760 * including those that only contain sequences or are otherwise empty.
761 *
762 * XXX: While this is technically unnecessary, optimizing it would require
763 * additional logic to skip origin creation during DDL operations and
764 * apply workers initialization, and to handle origin creation dynamically
765 * when tables are added to the subscription. It is not clear whether
766 * preventing creation of origins is worth additional complexity.
767 */
768 ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
769 replorigin_create(originname);
770
771 /*
772 * Connect to remote side to execute requested commands and fetch table
773 * and sequence info.
774 */
775 if (opts.connect)
776 {
777 char *err;
779 bool must_use_password;
780
781 /* Try to connect to the publisher. */
782 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
783 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
784 stmt->subname, &err);
785 if (!wrconn)
787 (errcode(ERRCODE_CONNECTION_FAILURE),
788 errmsg("subscription \"%s\" could not connect to the publisher: %s",
789 stmt->subname, err)));
790
791 PG_TRY();
792 {
793 bool has_tables = false;
794 List *pubrels;
795 char relation_state;
796
797 check_publications(wrconn, publications);
799 opts.copy_data,
800 opts.retaindeadtuples, opts.origin,
801 NULL, 0, stmt->subname);
803 opts.copy_data, opts.origin,
804 NULL, 0, stmt->subname);
805
806 if (opts.retaindeadtuples)
808
809 /*
810 * Set sync state based on if we were asked to do data copy or
811 * not.
812 */
813 relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
814
815 /*
816 * Build local relation status info. Relations are for both tables
817 * and sequences from the publisher.
818 */
819 pubrels = fetch_relation_list(wrconn, publications);
820
821 foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
822 {
823 Oid relid;
824 char relkind;
825 RangeVar *rv = pubrelinfo->rv;
826
827 relid = RangeVarGetRelid(rv, AccessShareLock, false);
828 relkind = get_rel_relkind(relid);
829
830 /* Check for supported relkind. */
831 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
832 rv->schemaname, rv->relname);
833 has_tables |= (relkind != RELKIND_SEQUENCE);
834 AddSubscriptionRelState(subid, relid, relation_state,
835 InvalidXLogRecPtr, true);
836 }
837
838 /*
839 * If requested, create permanent slot for the subscription. We
840 * won't use the initial snapshot for anything, so no need to
841 * export it.
842 *
843 * XXX: Similar to origins, it is not clear whether preventing the
844 * slot creation for empty and sequence-only subscriptions is
845 * worth additional complexity.
846 */
847 if (opts.create_slot)
848 {
849 bool twophase_enabled = false;
850
851 Assert(opts.slot_name);
852
853 /*
854 * Even if two_phase is set, don't create the slot with
855 * two-phase enabled. Will enable it once all the tables are
856 * synced and ready. This avoids race-conditions like prepared
857 * transactions being skipped due to changes not being applied
858 * due to checks in should_apply_changes_for_rel() when
859 * tablesync for the corresponding tables are in progress. See
860 * comments atop worker.c.
861 *
862 * Note that if tables were specified but copy_data is false
863 * then it is safe to enable two_phase up-front because those
864 * tables are already initially in READY state. When the
865 * subscription has no tables, we leave the twophase state as
866 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
867 * PUBLICATION to work.
868 */
869 if (opts.twophase && !opts.copy_data && has_tables)
870 twophase_enabled = true;
871
872 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
873 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
874
875 if (twophase_enabled)
876 UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
877
879 (errmsg("created replication slot \"%s\" on publisher",
880 opts.slot_name)));
881 }
882 }
883 PG_FINALLY();
884 {
886 }
887 PG_END_TRY();
888 }
889 else
891 (errmsg("subscription was created, but is not connected"),
892 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
893
895
897
898 /*
899 * Notify the launcher to start the apply worker if the subscription is
900 * enabled, or to create the conflict detection slot if retain_dead_tuples
901 * is enabled.
902 *
903 * Creating the conflict detection slot is essential even when the
904 * subscription is not enabled. This ensures that dead tuples are
905 * retained, which is necessary for accurately identifying the type of
906 * conflict during replication.
907 */
908 if (opts.enabled || opts.retaindeadtuples)
910
911 ObjectAddressSet(myself, SubscriptionRelationId, subid);
912
913 InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
914
915 return myself;
916}
917
918static void
920 List *validate_publications)
921{
922 char *err;
923 List *pubrels = NIL;
924 Oid *pubrel_local_oids;
925 List *subrel_states;
926 List *sub_remove_rels = NIL;
927 Oid *subrel_local_oids;
928 Oid *subseq_local_oids;
929 int subrel_count;
930 ListCell *lc;
931 int off;
932 int tbl_count = 0;
933 int seq_count = 0;
934 Relation rel = NULL;
935 typedef struct SubRemoveRels
936 {
937 Oid relid;
938 char state;
939 } SubRemoveRels;
940
942 bool must_use_password;
943
944 /* Load the library providing us libpq calls. */
945 load_file("libpqwalreceiver", false);
946
947 /* Try to connect to the publisher. */
948 must_use_password = sub->passwordrequired && !sub->ownersuperuser;
949 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
950 sub->name, &err);
951 if (!wrconn)
953 (errcode(ERRCODE_CONNECTION_FAILURE),
954 errmsg("subscription \"%s\" could not connect to the publisher: %s",
955 sub->name, err)));
956
957 PG_TRY();
958 {
959 if (validate_publications)
960 check_publications(wrconn, validate_publications);
961
962 /* Get the relation list from publisher. */
963 pubrels = fetch_relation_list(wrconn, sub->publications);
964
965 /* Get local relation list. */
966 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
967 subrel_count = list_length(subrel_states);
968
969 /*
970 * Build qsorted arrays of local table oids and sequence oids for
971 * faster lookup. This can potentially contain all tables and
972 * sequences in the database so speed of lookup is important.
973 *
974 * We do not yet know the exact count of tables and sequences, so we
975 * allocate separate arrays for table OIDs and sequence OIDs based on
976 * the total number of relations (subrel_count).
977 */
978 subrel_local_oids = palloc(subrel_count * sizeof(Oid));
979 subseq_local_oids = palloc(subrel_count * sizeof(Oid));
980 foreach(lc, subrel_states)
981 {
983
984 if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
985 subseq_local_oids[seq_count++] = relstate->relid;
986 else
987 subrel_local_oids[tbl_count++] = relstate->relid;
988 }
989
990 qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
992 sub->retaindeadtuples, sub->origin,
993 subrel_local_oids, tbl_count,
994 sub->name);
995
996 qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
998 copy_data, sub->origin,
999 subseq_local_oids, seq_count,
1000 sub->name);
1001
1002 /*
1003 * Walk over the remote relations and try to match them to locally
1004 * known relations. If the relation is not known locally create a new
1005 * state for it.
1006 *
1007 * Also builds array of local oids of remote relations for the next
1008 * step.
1009 */
1010 off = 0;
1011 pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
1012
1013 foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
1014 {
1015 RangeVar *rv = pubrelinfo->rv;
1016 Oid relid;
1017 char relkind;
1018
1019 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1020 relkind = get_rel_relkind(relid);
1021
1022 /* Check for supported relkind. */
1023 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1024 rv->schemaname, rv->relname);
1025
1026 pubrel_local_oids[off++] = relid;
1027
1028 if (!bsearch(&relid, subrel_local_oids,
1029 tbl_count, sizeof(Oid), oid_cmp) &&
1030 !bsearch(&relid, subseq_local_oids,
1031 seq_count, sizeof(Oid), oid_cmp))
1032 {
1033 AddSubscriptionRelState(sub->oid, relid,
1034 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
1035 InvalidXLogRecPtr, true);
1037 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1038 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1039 rv->schemaname, rv->relname, sub->name));
1040 }
1041 }
1042
1043 /*
1044 * Next remove state for tables we should not care about anymore using
1045 * the data we collected above
1046 */
1047 qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
1048
1049 for (off = 0; off < tbl_count; off++)
1050 {
1051 Oid relid = subrel_local_oids[off];
1052
1053 if (!bsearch(&relid, pubrel_local_oids,
1054 list_length(pubrels), sizeof(Oid), oid_cmp))
1055 {
1056 char state;
1057 XLogRecPtr statelsn;
1058 SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
1059
1060 /*
1061 * Lock pg_subscription_rel with AccessExclusiveLock to
1062 * prevent any race conditions with the apply worker
1063 * re-launching workers at the same time this code is trying
1064 * to remove those tables.
1065 *
1066 * Even if new worker for this particular rel is restarted it
1067 * won't be able to make any progress as we hold exclusive
1068 * lock on pg_subscription_rel till the transaction end. It
1069 * will simply exit as there is no corresponding rel entry.
1070 *
1071 * This locking also ensures that the state of rels won't
1072 * change till we are done with this refresh operation.
1073 */
1074 if (!rel)
1075 rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1076
1077 /* Last known rel state. */
1078 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1079
1080 RemoveSubscriptionRel(sub->oid, relid);
1081
1082 remove_rel->relid = relid;
1083 remove_rel->state = state;
1084
1085 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
1086
1088
1089 /*
1090 * For READY state, we would have already dropped the
1091 * tablesync origin.
1092 */
1093 if (state != SUBREL_STATE_READY)
1094 {
1095 char originname[NAMEDATALEN];
1096
1097 /*
1098 * Drop the tablesync's origin tracking if exists.
1099 *
1100 * It is possible that the origin is not yet created for
1101 * tablesync worker, this can happen for the states before
1102 * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1103 * apply worker can also concurrently try to drop the
1104 * origin and by this time the origin might be already
1105 * removed. For these reasons, passing missing_ok = true.
1106 */
1107 ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1108 sizeof(originname));
1109 replorigin_drop_by_name(originname, true, false);
1110 }
1111
1113 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1115 get_rel_name(relid),
1116 sub->name)));
1117 }
1118 }
1119
1120 /*
1121 * Drop the tablesync slots associated with removed tables. This has
1122 * to be at the end because otherwise if there is an error while doing
1123 * the database operations we won't be able to rollback dropped slots.
1124 */
1125 foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
1126 {
1127 if (rel->state != SUBREL_STATE_READY &&
1128 rel->state != SUBREL_STATE_SYNCDONE)
1129 {
1130 char syncslotname[NAMEDATALEN] = {0};
1131
1132 /*
1133 * For READY/SYNCDONE states we know the tablesync slot has
1134 * already been dropped by the tablesync worker.
1135 *
1136 * For other states, there is no certainty, maybe the slot
1137 * does not exist yet. Also, if we fail after removing some of
1138 * the slots, next time, it will again try to drop already
1139 * dropped slots and fail. For these reasons, we allow
1140 * missing_ok = true for the drop.
1141 */
1142 ReplicationSlotNameForTablesync(sub->oid, rel->relid,
1143 syncslotname, sizeof(syncslotname));
1144 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1145 }
1146 }
1147
1148 /*
1149 * Next remove state for sequences we should not care about anymore
1150 * using the data we collected above
1151 */
1152 for (off = 0; off < seq_count; off++)
1153 {
1154 Oid relid = subseq_local_oids[off];
1155
1156 if (!bsearch(&relid, pubrel_local_oids,
1157 list_length(pubrels), sizeof(Oid), oid_cmp))
1158 {
1159 /*
1160 * This locking ensures that the state of rels won't change
1161 * till we are done with this refresh operation.
1162 */
1163 if (!rel)
1164 rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1165
1166 RemoveSubscriptionRel(sub->oid, relid);
1167
1169 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1171 get_rel_name(relid),
1172 sub->name));
1173 }
1174 }
1175 }
1176 PG_FINALLY();
1177 {
1179 }
1180 PG_END_TRY();
1181
1182 if (rel)
1183 table_close(rel, NoLock);
1184}
1185
1186/*
1187 * Marks all sequences with INIT state.
1188 */
1189static void
1191{
1192 char *err = NULL;
1194 bool must_use_password;
1195
1196 /* Load the library providing us libpq calls. */
1197 load_file("libpqwalreceiver", false);
1198
1199 /* Try to connect to the publisher. */
1200 must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1201 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1202 sub->name, &err);
1203 if (!wrconn)
1204 ereport(ERROR,
1205 errcode(ERRCODE_CONNECTION_FAILURE),
1206 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1207 sub->name, err));
1208
1209 PG_TRY();
1210 {
1211 List *subrel_states;
1212
1214 sub->origin, NULL, 0, sub->name);
1215
1216 /* Get local sequence list. */
1217 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1218 foreach_ptr(SubscriptionRelState, subrel, subrel_states)
1219 {
1220 Oid relid = subrel->relid;
1221
1222 UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
1223 InvalidXLogRecPtr, false);
1225 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1227 get_rel_name(relid),
1228 sub->name));
1229 }
1230 }
1231 PG_FINALLY();
1232 {
1234 }
1235 PG_END_TRY();
1236}
1237
1238/*
1239 * Common checks for altering failover, two_phase, and retain_dead_tuples
1240 * options.
1241 */
1242static void
1244 bool slot_needs_update, bool isTopLevel)
1245{
1246 Assert(strcmp(option, "failover") == 0 ||
1247 strcmp(option, "two_phase") == 0 ||
1248 strcmp(option, "retain_dead_tuples") == 0);
1249
1250 /*
1251 * Altering the retain_dead_tuples option does not update the slot on the
1252 * publisher.
1253 */
1254 Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1255
1256 /*
1257 * Do not allow changing the option if the subscription is enabled. This
1258 * is because both failover and two_phase options of the slot on the
1259 * publisher cannot be modified if the slot is currently acquired by the
1260 * existing walsender.
1261 *
1262 * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1263 * the publisher by the existing walsender, so we could have allowed that
1264 * even when the subscription is enabled. But we kept this restriction for
1265 * the sake of consistency and simplicity.
1266 *
1267 * Additionally, do not allow changing the retain_dead_tuples option when
1268 * the subscription is enabled to prevent race conditions arising from the
1269 * new option value being acknowledged asynchronously by the launcher and
1270 * apply workers.
1271 *
1272 * Without the restriction, a race condition may arise when a user
1273 * disables and immediately re-enables the retain_dead_tuples option. In
1274 * this case, the launcher might drop the slot upon noticing the disabled
1275 * action, while the apply worker may keep maintaining
1276 * oldest_nonremovable_xid without noticing the option change. During this
1277 * period, a transaction ID wraparound could falsely make this ID appear
1278 * as if it originates from the future w.r.t the transaction ID stored in
1279 * the slot maintained by launcher.
1280 *
1281 * Similarly, if the user enables retain_dead_tuples concurrently with the
1282 * launcher starting the worker, the apply worker may start calculating
1283 * oldest_nonremovable_xid before the launcher notices the enable action.
1284 * Consequently, the launcher may update slot.xmin to a newer value than
1285 * that maintained by the worker. In subsequent cycles, upon integrating
1286 * the worker's oldest_nonremovable_xid, the launcher might detect a
1287 * retreat in the calculated xmin, necessitating additional handling.
1288 *
1289 * XXX To address the above race conditions, we can define
1290 * oldest_nonremovable_xid as FullTransactionID and adds the check to
1291 * disallow retreating the conflict slot's xmin. For now, we kept the
1292 * implementation simple by disallowing change to the retain_dead_tuples,
1293 * but in the future we can change this after some more analysis.
1294 *
1295 * Note that we could restrict only the enabling of retain_dead_tuples to
1296 * avoid the race conditions described above, but we maintain the
1297 * restriction for both enable and disable operations for the sake of
1298 * consistency.
1299 */
1300 if (sub->enabled)
1301 ereport(ERROR,
1302 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1303 errmsg("cannot set option \"%s\" for enabled subscription",
1304 option)));
1305
1306 if (slot_needs_update)
1307 {
1308 StringInfoData cmd;
1309
1310 /*
1311 * A valid slot must be associated with the subscription for us to
1312 * modify any of the slot's properties.
1313 */
1314 if (!sub->slotname)
1315 ereport(ERROR,
1316 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1317 errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1318 option)));
1319
1320 /* The changed option of the slot can't be rolled back. */
1321 initStringInfo(&cmd);
1322 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1323
1324 PreventInTransactionBlock(isTopLevel, cmd.data);
1325 pfree(cmd.data);
1326 }
1327}
1328
1329/*
1330 * Alter the existing subscription.
1331 */
1334 bool isTopLevel)
1335{
1336 Relation rel;
1337 ObjectAddress myself;
1338 bool nulls[Natts_pg_subscription];
1339 bool replaces[Natts_pg_subscription];
1340 Datum values[Natts_pg_subscription];
1341 HeapTuple tup;
1342 Oid subid;
1343 bool update_tuple = false;
1344 bool update_failover = false;
1345 bool update_two_phase = false;
1346 bool check_pub_rdt = false;
1347 bool retain_dead_tuples;
1348 int max_retention;
1349 bool retention_active;
1350 char *origin;
1351 Subscription *sub;
1353 bits32 supported_opts;
1354 SubOpts opts = {0};
1355
1356 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1357
1358 /* Fetch the existing tuple. */
1359 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1360 CStringGetDatum(stmt->subname));
1361
1362 if (!HeapTupleIsValid(tup))
1363 ereport(ERROR,
1364 (errcode(ERRCODE_UNDEFINED_OBJECT),
1365 errmsg("subscription \"%s\" does not exist",
1366 stmt->subname)));
1367
1368 form = (Form_pg_subscription) GETSTRUCT(tup);
1369 subid = form->oid;
1370
1371 /* must be owner */
1372 if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1374 stmt->subname);
1375
1376 sub = GetSubscription(subid, false);
1377
1378 retain_dead_tuples = sub->retaindeadtuples;
1379 origin = sub->origin;
1380 max_retention = sub->maxretention;
1381 retention_active = sub->retentionactive;
1382
1383 /*
1384 * Don't allow non-superuser modification of a subscription with
1385 * password_required=false.
1386 */
1387 if (!sub->passwordrequired && !superuser())
1388 ereport(ERROR,
1389 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1390 errmsg("password_required=false is superuser-only"),
1391 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1392
1393 /* Lock the subscription so nobody else can do anything with it. */
1394 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1395
1396 /* Form a new tuple. */
1397 memset(values, 0, sizeof(values));
1398 memset(nulls, false, sizeof(nulls));
1399 memset(replaces, false, sizeof(replaces));
1400
1401 switch (stmt->kind)
1402 {
1404 {
1405 supported_opts = (SUBOPT_SLOT_NAME |
1414
1415 parse_subscription_options(pstate, stmt->options,
1416 supported_opts, &opts);
1417
1418 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1419 {
1420 /*
1421 * The subscription must be disabled to allow slot_name as
1422 * 'none', otherwise, the apply worker will repeatedly try
1423 * to stream the data using that slot_name which neither
1424 * exists on the publisher nor the user will be allowed to
1425 * create it.
1426 */
1427 if (sub->enabled && !opts.slot_name)
1428 ereport(ERROR,
1429 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1430 errmsg("cannot set %s for enabled subscription",
1431 "slot_name = NONE")));
1432
1433 if (opts.slot_name)
1434 values[Anum_pg_subscription_subslotname - 1] =
1436 else
1437 nulls[Anum_pg_subscription_subslotname - 1] = true;
1438 replaces[Anum_pg_subscription_subslotname - 1] = true;
1439 }
1440
1441 if (opts.synchronous_commit)
1442 {
1443 values[Anum_pg_subscription_subsynccommit - 1] =
1444 CStringGetTextDatum(opts.synchronous_commit);
1445 replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1446 }
1447
1448 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1449 {
1450 values[Anum_pg_subscription_subbinary - 1] =
1451 BoolGetDatum(opts.binary);
1452 replaces[Anum_pg_subscription_subbinary - 1] = true;
1453 }
1454
1455 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1456 {
1457 values[Anum_pg_subscription_substream - 1] =
1458 CharGetDatum(opts.streaming);
1459 replaces[Anum_pg_subscription_substream - 1] = true;
1460 }
1461
1462 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1463 {
1464 values[Anum_pg_subscription_subdisableonerr - 1]
1465 = BoolGetDatum(opts.disableonerr);
1466 replaces[Anum_pg_subscription_subdisableonerr - 1]
1467 = true;
1468 }
1469
1470 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1471 {
1472 /* Non-superuser may not disable password_required. */
1473 if (!opts.passwordrequired && !superuser())
1474 ereport(ERROR,
1475 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1476 errmsg("password_required=false is superuser-only"),
1477 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1478
1479 values[Anum_pg_subscription_subpasswordrequired - 1]
1480 = BoolGetDatum(opts.passwordrequired);
1481 replaces[Anum_pg_subscription_subpasswordrequired - 1]
1482 = true;
1483 }
1484
1485 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1486 {
1487 values[Anum_pg_subscription_subrunasowner - 1] =
1488 BoolGetDatum(opts.runasowner);
1489 replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1490 }
1491
1492 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1493 {
1494 /*
1495 * We need to update both the slot and the subscription
1496 * for the two_phase option. We can enable the two_phase
1497 * option for a slot only once the initial data
1498 * synchronization is done. This is to avoid missing some
1499 * data as explained in comments atop worker.c.
1500 */
1501 update_two_phase = !opts.twophase;
1502
1503 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1504 isTopLevel);
1505
1506 /*
1507 * Modifying the two_phase slot option requires a slot
1508 * lookup by slot name, so changing the slot name at the
1509 * same time is not allowed.
1510 */
1511 if (update_two_phase &&
1512 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1513 ereport(ERROR,
1514 (errcode(ERRCODE_SYNTAX_ERROR),
1515 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1516
1517 /*
1518 * Note that workers may still survive even if the
1519 * subscription has been disabled.
1520 *
1521 * Ensure workers have already been exited to avoid
1522 * getting prepared transactions while we are disabling
1523 * the two_phase option. Otherwise, the changes of an
1524 * already prepared transaction can be replicated again
1525 * along with its corresponding commit, leading to
1526 * duplicate data or errors.
1527 */
1528 if (logicalrep_workers_find(subid, true, true))
1529 ereport(ERROR,
1530 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1531 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1532 errhint("Try again after some time.")));
1533
1534 /*
1535 * two_phase cannot be disabled if there are any
1536 * uncommitted prepared transactions present otherwise it
1537 * can lead to duplicate data or errors as explained in
1538 * the comment above.
1539 */
1540 if (update_two_phase &&
1541 sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1542 LookupGXactBySubid(subid))
1543 ereport(ERROR,
1544 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1545 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1546 errhint("Resolve these transactions and try again.")));
1547
1548 /* Change system catalog accordingly */
1549 values[Anum_pg_subscription_subtwophasestate - 1] =
1550 CharGetDatum(opts.twophase ?
1551 LOGICALREP_TWOPHASE_STATE_PENDING :
1552 LOGICALREP_TWOPHASE_STATE_DISABLED);
1553 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1554 }
1555
1556 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1557 {
1558 /*
1559 * Similar to the two_phase case above, we need to update
1560 * the failover option for both the slot and the
1561 * subscription.
1562 */
1563 update_failover = true;
1564
1565 CheckAlterSubOption(sub, "failover", update_failover,
1566 isTopLevel);
1567
1568 values[Anum_pg_subscription_subfailover - 1] =
1569 BoolGetDatum(opts.failover);
1570 replaces[Anum_pg_subscription_subfailover - 1] = true;
1571 }
1572
1573 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1574 {
1575 values[Anum_pg_subscription_subretaindeadtuples - 1] =
1576 BoolGetDatum(opts.retaindeadtuples);
1577 replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1578
1579 /*
1580 * Update the retention status only if there's a change in
1581 * the retain_dead_tuples option value.
1582 *
1583 * Automatically marking retention as active when
1584 * retain_dead_tuples is enabled may not always be ideal,
1585 * especially if retention was previously stopped and the
1586 * user toggles retain_dead_tuples without adjusting the
1587 * publisher workload. However, this behavior provides a
1588 * convenient way for users to manually refresh the
1589 * retention status. Since retention will be stopped again
1590 * unless the publisher workload is reduced, this approach
1591 * is acceptable for now.
1592 */
1593 if (opts.retaindeadtuples != sub->retaindeadtuples)
1594 {
1595 values[Anum_pg_subscription_subretentionactive - 1] =
1596 BoolGetDatum(opts.retaindeadtuples);
1597 replaces[Anum_pg_subscription_subretentionactive - 1] = true;
1598
1599 retention_active = opts.retaindeadtuples;
1600 }
1601
1602 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1603
1604 /*
1605 * Workers may continue running even after the
1606 * subscription has been disabled.
1607 *
1608 * To prevent race conditions (as described in
1609 * CheckAlterSubOption()), ensure that all worker
1610 * processes have already exited before proceeding.
1611 */
1612 if (logicalrep_workers_find(subid, true, true))
1613 ereport(ERROR,
1614 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1615 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1616 errhint("Try again after some time.")));
1617
1618 /*
1619 * Notify the launcher to manage the replication slot for
1620 * conflict detection. This ensures that replication slot
1621 * is efficiently handled (created, updated, or dropped)
1622 * in response to any configuration changes.
1623 */
1625
1626 check_pub_rdt = opts.retaindeadtuples;
1627 retain_dead_tuples = opts.retaindeadtuples;
1628 }
1629
1630 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1631 {
1632 values[Anum_pg_subscription_submaxretention - 1] =
1633 Int32GetDatum(opts.maxretention);
1634 replaces[Anum_pg_subscription_submaxretention - 1] = true;
1635
1636 max_retention = opts.maxretention;
1637 }
1638
1639 /*
1640 * Ensure that system configuration parameters are set
1641 * appropriately to support retain_dead_tuples and
1642 * max_retention_duration.
1643 */
1644 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1645 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1647 retain_dead_tuples,
1648 retention_active,
1649 (max_retention > 0));
1650
1651 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1652 {
1653 values[Anum_pg_subscription_suborigin - 1] =
1654 CStringGetTextDatum(opts.origin);
1655 replaces[Anum_pg_subscription_suborigin - 1] = true;
1656
1657 /*
1658 * Check if changes from different origins may be received
1659 * from the publisher when the origin is changed to ANY
1660 * and retain_dead_tuples is enabled.
1661 */
1662 check_pub_rdt = retain_dead_tuples &&
1663 pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1664
1665 origin = opts.origin;
1666 }
1667
1668 update_tuple = true;
1669 break;
1670 }
1671
1673 {
1674 parse_subscription_options(pstate, stmt->options,
1676 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1677
1678 if (!sub->slotname && opts.enabled)
1679 ereport(ERROR,
1680 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1681 errmsg("cannot enable subscription that does not have a slot name")));
1682
1683 /*
1684 * Check track_commit_timestamp only when enabling the
1685 * subscription in case it was disabled after creation. See
1686 * comments atop CheckSubDeadTupleRetention() for details.
1687 */
1688 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1690 sub->retentionactive, false);
1691
1692 values[Anum_pg_subscription_subenabled - 1] =
1693 BoolGetDatum(opts.enabled);
1694 replaces[Anum_pg_subscription_subenabled - 1] = true;
1695
1696 if (opts.enabled)
1698
1699 update_tuple = true;
1700
1701 /*
1702 * The subscription might be initially created with
1703 * connect=false and retain_dead_tuples=true, meaning the
1704 * remote server's status may not be checked. Ensure this
1705 * check is conducted now.
1706 */
1707 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1708 break;
1709 }
1710
1712 /* Load the library providing us libpq calls. */
1713 load_file("libpqwalreceiver", false);
1714 /* Check the connection info string. */
1715 walrcv_check_conninfo(stmt->conninfo,
1716 sub->passwordrequired && !sub->ownersuperuser);
1717
1718 values[Anum_pg_subscription_subconninfo - 1] =
1719 CStringGetTextDatum(stmt->conninfo);
1720 replaces[Anum_pg_subscription_subconninfo - 1] = true;
1721 update_tuple = true;
1722
1723 /*
1724 * Since the remote server configuration might have changed,
1725 * perform a check to ensure it permits enabling
1726 * retain_dead_tuples.
1727 */
1728 check_pub_rdt = sub->retaindeadtuples;
1729 break;
1730
1732 {
1733 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1734 parse_subscription_options(pstate, stmt->options,
1735 supported_opts, &opts);
1736
1737 values[Anum_pg_subscription_subpublications - 1] =
1738 publicationListToArray(stmt->publication);
1739 replaces[Anum_pg_subscription_subpublications - 1] = true;
1740
1741 update_tuple = true;
1742
1743 /* Refresh if user asked us to. */
1744 if (opts.refresh)
1745 {
1746 if (!sub->enabled)
1747 ereport(ERROR,
1748 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1749 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1750 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1751
1752 /*
1753 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1754 * why this is not allowed.
1755 */
1756 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1757 ereport(ERROR,
1758 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1759 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1760 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1761
1762 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1763
1764 /* Make sure refresh sees the new list of publications. */
1765 sub->publications = stmt->publication;
1766
1767 AlterSubscription_refresh(sub, opts.copy_data,
1768 stmt->publication);
1769 }
1770
1771 break;
1772 }
1773
1776 {
1777 List *publist;
1778 bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1779
1780 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1781 parse_subscription_options(pstate, stmt->options,
1782 supported_opts, &opts);
1783
1784 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1785 values[Anum_pg_subscription_subpublications - 1] =
1786 publicationListToArray(publist);
1787 replaces[Anum_pg_subscription_subpublications - 1] = true;
1788
1789 update_tuple = true;
1790
1791 /* Refresh if user asked us to. */
1792 if (opts.refresh)
1793 {
1794 /* We only need to validate user specified publications. */
1795 List *validate_publications = (isadd) ? stmt->publication : NULL;
1796
1797 if (!sub->enabled)
1798 ereport(ERROR,
1799 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1800 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1801 /* translator: %s is an SQL ALTER command */
1802 errhint("Use %s instead.",
1803 isadd ?
1804 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1805 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1806
1807 /*
1808 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1809 * why this is not allowed.
1810 */
1811 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1812 ereport(ERROR,
1813 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1814 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1815 /* translator: %s is an SQL ALTER command */
1816 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1817 isadd ?
1818 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1819 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1820
1821 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1822
1823 /* Refresh the new list of publications. */
1824 sub->publications = publist;
1825
1826 AlterSubscription_refresh(sub, opts.copy_data,
1827 validate_publications);
1828 }
1829
1830 break;
1831 }
1832
1834 {
1835 if (!sub->enabled)
1836 ereport(ERROR,
1837 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1838 errmsg("%s is not allowed for disabled subscriptions",
1839 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
1840
1841 parse_subscription_options(pstate, stmt->options,
1843
1844 /*
1845 * The subscription option "two_phase" requires that
1846 * replication has passed the initial table synchronization
1847 * phase before the two_phase becomes properly enabled.
1848 *
1849 * But, having reached this two-phase commit "enabled" state
1850 * we must not allow any subsequent table initialization to
1851 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
1852 * disallowed when the user had requested two_phase = on mode.
1853 *
1854 * The exception to this restriction is when copy_data =
1855 * false, because when copy_data is false the tablesync will
1856 * start already in READY state and will exit directly without
1857 * doing anything.
1858 *
1859 * For more details see comments atop worker.c.
1860 */
1861 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1862 ereport(ERROR,
1863 (errcode(ERRCODE_SYNTAX_ERROR),
1864 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
1865 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1866
1867 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
1868
1869 AlterSubscription_refresh(sub, opts.copy_data, NULL);
1870
1871 break;
1872 }
1873
1875 {
1876 if (!sub->enabled)
1877 ereport(ERROR,
1878 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1879 errmsg("%s is not allowed for disabled subscriptions",
1880 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
1881
1883
1884 break;
1885 }
1886
1888 {
1889 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1890
1891 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1892 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1893
1894 /*
1895 * If the user sets subskiplsn, we do a sanity check to make
1896 * sure that the specified LSN is a probable value.
1897 */
1898 if (XLogRecPtrIsValid(opts.lsn))
1899 {
1900 RepOriginId originid;
1901 char originname[NAMEDATALEN];
1902 XLogRecPtr remote_lsn;
1903
1905 originname, sizeof(originname));
1906 originid = replorigin_by_name(originname, false);
1907 remote_lsn = replorigin_get_progress(originid, false);
1908
1909 /* Check the given LSN is at least a future LSN */
1910 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
1911 ereport(ERROR,
1912 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1913 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1914 LSN_FORMAT_ARGS(opts.lsn),
1915 LSN_FORMAT_ARGS(remote_lsn))));
1916 }
1917
1918 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1919 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1920
1921 update_tuple = true;
1922 break;
1923 }
1924
1925 default:
1926 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1927 stmt->kind);
1928 }
1929
1930 /* Update the catalog if needed. */
1931 if (update_tuple)
1932 {
1933 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1934 replaces);
1935
1936 CatalogTupleUpdate(rel, &tup->t_self, tup);
1937
1938 heap_freetuple(tup);
1939 }
1940
1941 /*
1942 * Try to acquire the connection necessary either for modifying the slot
1943 * or for checking if the remote server permits enabling
1944 * retain_dead_tuples.
1945 *
1946 * This has to be at the end because otherwise if there is an error while
1947 * doing the database operations we won't be able to rollback altered
1948 * slot.
1949 */
1950 if (update_failover || update_two_phase || check_pub_rdt)
1951 {
1952 bool must_use_password;
1953 char *err;
1955
1956 /* Load the library providing us libpq calls. */
1957 load_file("libpqwalreceiver", false);
1958
1959 /*
1960 * Try to connect to the publisher, using the new connection string if
1961 * available.
1962 */
1963 must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1964 wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1965 true, true, must_use_password, sub->name,
1966 &err);
1967 if (!wrconn)
1968 ereport(ERROR,
1969 (errcode(ERRCODE_CONNECTION_FAILURE),
1970 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1971 sub->name, err)));
1972
1973 PG_TRY();
1974 {
1975 if (retain_dead_tuples)
1977
1979 retain_dead_tuples, origin, NULL, 0,
1980 sub->name);
1981
1982 if (update_failover || update_two_phase)
1984 update_failover ? &opts.failover : NULL,
1985 update_two_phase ? &opts.twophase : NULL);
1986 }
1987 PG_FINALLY();
1988 {
1990 }
1991 PG_END_TRY();
1992 }
1993
1995
1996 ObjectAddressSet(myself, SubscriptionRelationId, subid);
1997
1998 InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1999
2000 /* Wake up related replication workers to handle this change quickly. */
2002
2003 return myself;
2004}
2005
2006/*
2007 * Drop a subscription
2008 */
2009void
2011{
2012 Relation rel;
2013 ObjectAddress myself;
2014 HeapTuple tup;
2015 Oid subid;
2016 Oid subowner;
2017 Datum datum;
2018 bool isnull;
2019 char *subname;
2020 char *conninfo;
2021 char *slotname;
2022 List *subworkers;
2023 ListCell *lc;
2024 char originname[NAMEDATALEN];
2025 char *err = NULL;
2028 List *rstates;
2029 bool must_use_password;
2030
2031 /*
2032 * The launcher may concurrently start a new worker for this subscription.
2033 * During initialization, the worker checks for subscription validity and
2034 * exits if the subscription has already been dropped. See
2035 * InitializeLogRepWorker.
2036 */
2037 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2038
2039 tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2040 CStringGetDatum(stmt->subname));
2041
2042 if (!HeapTupleIsValid(tup))
2043 {
2044 table_close(rel, NoLock);
2045
2046 if (!stmt->missing_ok)
2047 ereport(ERROR,
2048 (errcode(ERRCODE_UNDEFINED_OBJECT),
2049 errmsg("subscription \"%s\" does not exist",
2050 stmt->subname)));
2051 else
2053 (errmsg("subscription \"%s\" does not exist, skipping",
2054 stmt->subname)));
2055
2056 return;
2057 }
2058
2059 form = (Form_pg_subscription) GETSTRUCT(tup);
2060 subid = form->oid;
2061 subowner = form->subowner;
2062 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2063
2064 /* must be owner */
2065 if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
2067 stmt->subname);
2068
2069 /* DROP hook for the subscription being removed */
2070 InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
2071
2072 /*
2073 * Lock the subscription so nobody else can do anything with it (including
2074 * the replication workers).
2075 */
2076 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
2077
2078 /* Get subname */
2079 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2080 Anum_pg_subscription_subname);
2081 subname = pstrdup(NameStr(*DatumGetName(datum)));
2082
2083 /* Get conninfo */
2084 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2085 Anum_pg_subscription_subconninfo);
2086 conninfo = TextDatumGetCString(datum);
2087
2088 /* Get slotname */
2089 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
2090 Anum_pg_subscription_subslotname, &isnull);
2091 if (!isnull)
2092 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2093 else
2094 slotname = NULL;
2095
2096 /*
2097 * Since dropping a replication slot is not transactional, the replication
2098 * slot stays dropped even if the transaction rolls back. So we cannot
2099 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2100 * replication slot. Also, in this case, we report a message for dropping
2101 * the subscription to the cumulative stats system.
2102 *
2103 * XXX The command name should really be something like "DROP SUBSCRIPTION
2104 * of a subscription that is associated with a replication slot", but we
2105 * don't have the proper facilities for that.
2106 */
2107 if (slotname)
2108 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2109
2110 ObjectAddressSet(myself, SubscriptionRelationId, subid);
2111 EventTriggerSQLDropAddObject(&myself, true, true);
2112
2113 /* Remove the tuple from catalog. */
2114 CatalogTupleDelete(rel, &tup->t_self);
2115
2116 ReleaseSysCache(tup);
2117
2118 /*
2119 * Stop all the subscription workers immediately.
2120 *
2121 * This is necessary if we are dropping the replication slot, so that the
2122 * slot becomes accessible.
2123 *
2124 * It is also necessary if the subscription is disabled and was disabled
2125 * in the same transaction. Then the workers haven't seen the disabling
2126 * yet and will still be running, leading to hangs later when we want to
2127 * drop the replication origin. If the subscription was disabled before
2128 * this transaction, then there shouldn't be any workers left, so this
2129 * won't make a difference.
2130 *
2131 * New workers won't be started because we hold an exclusive lock on the
2132 * subscription till the end of the transaction.
2133 */
2134 subworkers = logicalrep_workers_find(subid, false, true);
2135 foreach(lc, subworkers)
2136 {
2138
2140 }
2141 list_free(subworkers);
2142
2143 /*
2144 * Remove the no-longer-useful entry in the launcher's table of apply
2145 * worker start times.
2146 *
2147 * If this transaction rolls back, the launcher might restart a failed
2148 * apply worker before wal_retrieve_retry_interval milliseconds have
2149 * elapsed, but that's pretty harmless.
2150 */
2152
2153 /*
2154 * Cleanup of tablesync replication origins.
2155 *
2156 * Any READY-state relations would already have dealt with clean-ups.
2157 *
2158 * Note that the state can't change because we have already stopped both
2159 * the apply and tablesync workers and they can't restart because of
2160 * exclusive lock on the subscription.
2161 */
2162 rstates = GetSubscriptionRelations(subid, true, false, true);
2163 foreach(lc, rstates)
2164 {
2166 Oid relid = rstate->relid;
2167
2168 /* Only cleanup resources of tablesync workers */
2169 if (!OidIsValid(relid))
2170 continue;
2171
2172 /*
2173 * Drop the tablesync's origin tracking if exists.
2174 *
2175 * It is possible that the origin is not yet created for tablesync
2176 * worker so passing missing_ok = true. This can happen for the states
2177 * before SUBREL_STATE_FINISHEDCOPY.
2178 */
2179 ReplicationOriginNameForLogicalRep(subid, relid, originname,
2180 sizeof(originname));
2181 replorigin_drop_by_name(originname, true, false);
2182 }
2183
2184 /* Clean up dependencies */
2185 deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2186
2187 /* Remove any associated relation synchronization states. */
2189
2190 /* Remove the origin tracking if exists. */
2191 ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
2192 replorigin_drop_by_name(originname, true, false);
2193
2194 /*
2195 * Tell the cumulative stats system that the subscription is getting
2196 * dropped.
2197 */
2199
2200 /*
2201 * If there is no slot associated with the subscription, we can finish
2202 * here.
2203 */
2204 if (!slotname && rstates == NIL)
2205 {
2206 table_close(rel, NoLock);
2207 return;
2208 }
2209
2210 /*
2211 * Try to acquire the connection necessary for dropping slots.
2212 *
2213 * Note: If the slotname is NONE/NULL then we allow the command to finish
2214 * and users need to manually cleanup the apply and tablesync worker slots
2215 * later.
2216 *
2217 * This has to be at the end because otherwise if there is an error while
2218 * doing the database operations we won't be able to rollback dropped
2219 * slot.
2220 */
2221 load_file("libpqwalreceiver", false);
2222
2223 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2224 subname, &err);
2225 if (wrconn == NULL)
2226 {
2227 if (!slotname)
2228 {
2229 /* be tidy */
2230 list_free(rstates);
2231 table_close(rel, NoLock);
2232 return;
2233 }
2234 else
2235 {
2236 ReportSlotConnectionError(rstates, subid, slotname, err);
2237 }
2238 }
2239
2240 PG_TRY();
2241 {
2242 foreach(lc, rstates)
2243 {
2245 Oid relid = rstate->relid;
2246
2247 /* Only cleanup resources of tablesync workers */
2248 if (!OidIsValid(relid))
2249 continue;
2250
2251 /*
2252 * Drop the tablesync slots associated with removed tables.
2253 *
2254 * For SYNCDONE/READY states, the tablesync slot is known to have
2255 * already been dropped by the tablesync worker.
2256 *
2257 * For other states, there is no certainty, maybe the slot does
2258 * not exist yet. Also, if we fail after removing some of the
2259 * slots, next time, it will again try to drop already dropped
2260 * slots and fail. For these reasons, we allow missing_ok = true
2261 * for the drop.
2262 */
2263 if (rstate->state != SUBREL_STATE_SYNCDONE)
2264 {
2265 char syncslotname[NAMEDATALEN] = {0};
2266
2267 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2268 sizeof(syncslotname));
2269 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2270 }
2271 }
2272
2273 list_free(rstates);
2274
2275 /*
2276 * If there is a slot associated with the subscription, then drop the
2277 * replication slot at the publisher.
2278 */
2279 if (slotname)
2280 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2281 }
2282 PG_FINALLY();
2283 {
2285 }
2286 PG_END_TRY();
2287
2288 table_close(rel, NoLock);
2289}
2290
2291/*
2292 * Drop the replication slot at the publisher node using the replication
2293 * connection.
2294 *
2295 * missing_ok - if true then only issue a LOG message if the slot doesn't
2296 * exist.
2297 */
2298void
2299ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2300{
2301 StringInfoData cmd;
2302
2303 Assert(wrconn);
2304
2305 load_file("libpqwalreceiver", false);
2306
2307 initStringInfo(&cmd);
2308 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2309
2310 PG_TRY();
2311 {
2312 WalRcvExecResult *res;
2313
2314 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2315
2316 if (res->status == WALRCV_OK_COMMAND)
2317 {
2318 /* NOTICE. Success. */
2320 (errmsg("dropped replication slot \"%s\" on publisher",
2321 slotname)));
2322 }
2323 else if (res->status == WALRCV_ERROR &&
2324 missing_ok &&
2325 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2326 {
2327 /* LOG. Error, but missing_ok = true. */
2328 ereport(LOG,
2329 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2330 slotname, res->err)));
2331 }
2332 else
2333 {
2334 /* ERROR. */
2335 ereport(ERROR,
2336 (errcode(ERRCODE_CONNECTION_FAILURE),
2337 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2338 slotname, res->err)));
2339 }
2340
2342 }
2343 PG_FINALLY();
2344 {
2345 pfree(cmd.data);
2346 }
2347 PG_END_TRY();
2348}
2349
2350/*
2351 * Internal workhorse for changing a subscription owner
2352 */
2353static void
2355{
2357 AclResult aclresult;
2358
2359 form = (Form_pg_subscription) GETSTRUCT(tup);
2360
2361 if (form->subowner == newOwnerId)
2362 return;
2363
2364 if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2366 NameStr(form->subname));
2367
2368 /*
2369 * Don't allow non-superuser modification of a subscription with
2370 * password_required=false.
2371 */
2372 if (!form->subpasswordrequired && !superuser())
2373 ereport(ERROR,
2374 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2375 errmsg("password_required=false is superuser-only"),
2376 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2377
2378 /* Must be able to become new owner */
2379 check_can_set_role(GetUserId(), newOwnerId);
2380
2381 /*
2382 * current owner must have CREATE on database
2383 *
2384 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2385 * other object types behave differently (e.g. you can't give a table to a
2386 * user who lacks CREATE privileges on a schema).
2387 */
2388 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2390 if (aclresult != ACLCHECK_OK)
2393
2394 form->subowner = newOwnerId;
2395 CatalogTupleUpdate(rel, &tup->t_self, tup);
2396
2397 /* Update owner dependency reference */
2398 changeDependencyOnOwner(SubscriptionRelationId,
2399 form->oid,
2400 newOwnerId);
2401
2402 InvokeObjectPostAlterHook(SubscriptionRelationId,
2403 form->oid, 0);
2404
2405 /* Wake up related background processes to handle this change quickly. */
2408}
2409
2410/*
2411 * Change subscription owner -- by name
2412 */
2414AlterSubscriptionOwner(const char *name, Oid newOwnerId)
2415{
2416 Oid subid;
2417 HeapTuple tup;
2418 Relation rel;
2419 ObjectAddress address;
2421
2422 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2423
2424 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2426
2427 if (!HeapTupleIsValid(tup))
2428 ereport(ERROR,
2429 (errcode(ERRCODE_UNDEFINED_OBJECT),
2430 errmsg("subscription \"%s\" does not exist", name)));
2431
2432 form = (Form_pg_subscription) GETSTRUCT(tup);
2433 subid = form->oid;
2434
2435 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2436
2437 ObjectAddressSet(address, SubscriptionRelationId, subid);
2438
2439 heap_freetuple(tup);
2440
2442
2443 return address;
2444}
2445
2446/*
2447 * Change subscription owner -- by OID
2448 */
2449void
2451{
2452 HeapTuple tup;
2453 Relation rel;
2454
2455 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2456
2457 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2458
2459 if (!HeapTupleIsValid(tup))
2460 ereport(ERROR,
2461 (errcode(ERRCODE_UNDEFINED_OBJECT),
2462 errmsg("subscription with OID %u does not exist", subid)));
2463
2464 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2465
2466 heap_freetuple(tup);
2467
2469}
2470
2471/*
2472 * Check and log a warning if the publisher has subscribed to the same table,
2473 * its partition ancestors (if it's a partition), or its partition children (if
2474 * it's a partitioned table), from some other publishers. This check is
2475 * required in the following scenarios:
2476 *
2477 * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2478 * statements with "copy_data = true" and "origin = none":
2479 * - Warn the user that data with an origin might have been copied.
2480 * - This check is skipped for tables already added, as incremental sync via
2481 * WAL allows origin tracking. The list of such tables is in
2482 * subrel_local_oids.
2483 *
2484 * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2485 * statements with "retain_dead_tuples = true" and "origin = any", and for
2486 * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
2487 * or when the publisher's status changes (e.g., due to a connection string
2488 * update):
2489 * - Warn the user that only conflict detection info for local changes on
2490 * the publisher is retained. Data from other origins may lack sufficient
2491 * details for reliable conflict detection.
2492 * - See comments atop worker.c for more details.
2493 */
2494static void
2496 bool copydata, bool retain_dead_tuples,
2497 char *origin, Oid *subrel_local_oids,
2498 int subrel_count, char *subname)
2499{
2500 WalRcvExecResult *res;
2501 StringInfoData cmd;
2502 TupleTableSlot *slot;
2503 Oid tableRow[1] = {TEXTOID};
2504 List *publist = NIL;
2505 int i;
2506 bool check_rdt;
2507 bool check_table_sync;
2508 bool origin_none = origin &&
2509 pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2510
2511 /*
2512 * Enable retain_dead_tuples checks only when origin is set to 'any',
2513 * since with origin='none' only local changes are replicated to the
2514 * subscriber.
2515 */
2516 check_rdt = retain_dead_tuples && !origin_none;
2517
2518 /*
2519 * Enable table synchronization checks only when origin is 'none', to
2520 * ensure that data from other origins is not inadvertently copied.
2521 */
2522 check_table_sync = copydata && origin_none;
2523
2524 /* retain_dead_tuples and table sync checks occur separately */
2525 Assert(!(check_rdt && check_table_sync));
2526
2527 /* Return if no checks are required */
2528 if (!check_rdt && !check_table_sync)
2529 return;
2530
2531 initStringInfo(&cmd);
2533 "SELECT DISTINCT P.pubname AS pubname\n"
2534 "FROM pg_publication P,\n"
2535 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2536 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2537 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2538 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2539 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2540 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2541 GetPublicationsStr(publications, &cmd, true);
2542 appendStringInfoString(&cmd, ")\n");
2543
2544 /*
2545 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2546 * subrel_local_oids contains the list of relation oids that are already
2547 * present on the subscriber. This check should be skipped for these
2548 * tables if checking for table sync scenario. However, when handling the
2549 * retain_dead_tuples scenario, ensure all tables are checked, as some
2550 * existing tables may now include changes from other origins due to newly
2551 * created subscriptions on the publisher.
2552 */
2553 if (check_table_sync)
2554 {
2555 for (i = 0; i < subrel_count; i++)
2556 {
2557 Oid relid = subrel_local_oids[i];
2558 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2559 char *tablename = get_rel_name(relid);
2560
2561 appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2562 schemaname, tablename);
2563 }
2564 }
2565
2566 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2567 pfree(cmd.data);
2568
2569 if (res->status != WALRCV_OK_TUPLES)
2570 ereport(ERROR,
2571 (errcode(ERRCODE_CONNECTION_FAILURE),
2572 errmsg("could not receive list of replicated tables from the publisher: %s",
2573 res->err)));
2574
2575 /* Process publications. */
2577 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2578 {
2579 char *pubname;
2580 bool isnull;
2581
2582 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2583 Assert(!isnull);
2584
2585 ExecClearTuple(slot);
2586 publist = list_append_unique(publist, makeString(pubname));
2587 }
2588
2589 /*
2590 * Log a warning if the publisher has subscribed to the same table from
2591 * some other publisher. We cannot know the origin of data during the
2592 * initial sync. Data origins can be found only from the WAL by looking at
2593 * the origin id.
2594 *
2595 * XXX: For simplicity, we don't check whether the table has any data or
2596 * not. If the table doesn't have any data then we don't need to
2597 * distinguish between data having origin and data not having origin so we
2598 * can avoid logging a warning for table sync scenario.
2599 */
2600 if (publist)
2601 {
2602 StringInfoData pubnames;
2603
2604 /* Prepare the list of publication(s) for warning message. */
2605 initStringInfo(&pubnames);
2606 GetPublicationsStr(publist, &pubnames, false);
2607
2608 if (check_table_sync)
2610 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2611 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2612 subname),
2613 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2614 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2615 list_length(publist), pubnames.data),
2616 errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2617 else
2619 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2620 errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2621 subname),
2622 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2623 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2624 list_length(publist), pubnames.data),
2625 errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
2626 }
2627
2629
2631}
2632
2633/*
2634 * This function is similar to check_publications_origin_tables and serves
2635 * same purpose for sequences.
2636 */
2637static void
2639 bool copydata, char *origin,
2640 Oid *subrel_local_oids, int subrel_count,
2641 char *subname)
2642{
2643 WalRcvExecResult *res;
2644 StringInfoData cmd;
2645 TupleTableSlot *slot;
2646 Oid tableRow[1] = {TEXTOID};
2647 List *publist = NIL;
2648
2649 /*
2650 * Enable sequence synchronization checks only when origin is 'none' , to
2651 * ensure that sequence data from other origins is not inadvertently
2652 * copied.
2653 */
2654 if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
2655 return;
2656
2657 initStringInfo(&cmd);
2659 "SELECT DISTINCT P.pubname AS pubname\n"
2660 "FROM pg_publication P,\n"
2661 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2662 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2663 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2664 "WHERE C.oid = GPS.relid AND P.pubname IN (");
2665
2666 GetPublicationsStr(publications, &cmd, true);
2667 appendStringInfoString(&cmd, ")\n");
2668
2669 /*
2670 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2671 * subrel_local_oids contains the list of relations that are already
2672 * present on the subscriber. This check should be skipped as these will
2673 * not be re-synced.
2674 */
2675 for (int i = 0; i < subrel_count; i++)
2676 {
2677 Oid relid = subrel_local_oids[i];
2678 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2679 char *seqname = get_rel_name(relid);
2680
2681 appendStringInfo(&cmd,
2682 "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2683 schemaname, seqname);
2684 }
2685
2686 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2687 pfree(cmd.data);
2688
2689 if (res->status != WALRCV_OK_TUPLES)
2690 ereport(ERROR,
2691 (errcode(ERRCODE_CONNECTION_FAILURE),
2692 errmsg("could not receive list of replicated sequences from the publisher: %s",
2693 res->err)));
2694
2695 /* Process publications. */
2697 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2698 {
2699 char *pubname;
2700 bool isnull;
2701
2702 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2703 Assert(!isnull);
2704
2705 ExecClearTuple(slot);
2706 publist = list_append_unique(publist, makeString(pubname));
2707 }
2708
2709 /*
2710 * Log a warning if the publisher has subscribed to the same sequence from
2711 * some other publisher. We cannot know the origin of sequences data
2712 * during the initial sync.
2713 */
2714 if (publist)
2715 {
2716 StringInfoData pubnames;
2717
2718 /* Prepare the list of publication(s) for warning message. */
2719 initStringInfo(&pubnames);
2720 GetPublicationsStr(publist, &pubnames, false);
2721
2723 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2724 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2725 subname),
2726 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
2727 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
2728 list_length(publist), pubnames.data),
2729 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
2730 }
2731
2733
2735}
2736
2737/*
2738 * Determine whether the retain_dead_tuples can be enabled based on the
2739 * publisher's status.
2740 *
2741 * This option is disallowed if the publisher is running a version earlier
2742 * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2743 * server).
2744 *
2745 * See comments atop worker.c for a detailed explanation.
2746 */
2747static void
2749{
2750 WalRcvExecResult *res;
2751 Oid RecoveryRow[1] = {BOOLOID};
2752 TupleTableSlot *slot;
2753 bool isnull;
2754 bool remote_in_recovery;
2755
2756 if (walrcv_server_version(wrconn) < 19000)
2757 ereport(ERROR,
2758 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2759 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2760
2761 res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2762
2763 if (res->status != WALRCV_OK_TUPLES)
2764 ereport(ERROR,
2765 (errcode(ERRCODE_CONNECTION_FAILURE),
2766 errmsg("could not obtain recovery progress from the publisher: %s",
2767 res->err)));
2768
2770 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2771 elog(ERROR, "failed to fetch tuple for the recovery progress");
2772
2773 remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2774
2775 if (remote_in_recovery)
2776 ereport(ERROR,
2777 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2778 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2779
2781
2783}
2784
2785/*
2786 * Check if the subscriber's configuration is adequate to enable the
2787 * retain_dead_tuples option.
2788 *
2789 * Issue an ERROR if the wal_level does not support the use of replication
2790 * slots when check_guc is set to true.
2791 *
2792 * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
2793 * set to true. This is only to highlight the importance of enabling
2794 * track_commit_timestamp instead of catching all the misconfigurations, as
2795 * this setting can be adjusted after subscription creation. Without it, the
2796 * apply worker will simply skip conflict detection.
2797 *
2798 * Issue a WARNING or NOTICE if the subscription is disabled and the retention
2799 * is active. Do not raise an ERROR since users can only modify
2800 * retain_dead_tuples for disabled subscriptions. And as long as the
2801 * subscription is enabled promptly, it will not pose issues.
2802 *
2803 * Issue a NOTICE to inform users that max_retention_duration is
2804 * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
2805 * is not issued because setting max_retention_duration causes no harm,
2806 * even when it is ineffective.
2807 */
2808void
2809CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
2810 int elevel_for_sub_disabled,
2811 bool retain_dead_tuples, bool retention_active,
2812 bool max_retention_set)
2813{
2814 Assert(elevel_for_sub_disabled == NOTICE ||
2815 elevel_for_sub_disabled == WARNING);
2816
2817 if (retain_dead_tuples)
2818 {
2819 if (check_guc && wal_level < WAL_LEVEL_REPLICA)
2820 ereport(ERROR,
2821 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2822 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2823 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2824
2825 if (check_guc && !track_commit_timestamp)
2827 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2828 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2829 errhint("Consider setting \"%s\" to true.",
2830 "track_commit_timestamp"));
2831
2832 if (sub_disabled && retention_active)
2833 ereport(elevel_for_sub_disabled,
2834 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2835 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2836 (elevel_for_sub_disabled > NOTICE)
2837 ? errhint("Consider setting %s to false.",
2838 "retain_dead_tuples") : 0);
2839 }
2840 else if (max_retention_set)
2841 {
2843 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2844 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2845 }
2846}
2847
2848/*
2849 * Return true iff 'rv' is a member of the list.
2850 */
2851static bool
2853{
2855 {
2856 if (equal(relinfo->rv, rv))
2857 return true;
2858 }
2859
2860 return false;
2861}
2862
2863/*
2864 * Get the list of tables and sequences which belong to specified publications
2865 * on the publisher connection.
2866 *
2867 * Note that we don't support the case where the column list is different for
2868 * the same table in different publications to avoid sending unwanted column
2869 * information for some of the rows. This can happen when both the column
2870 * list and row filter are specified for different publications.
2871 */
2872static List *
2874{
2875 WalRcvExecResult *res;
2876 StringInfoData cmd;
2877 TupleTableSlot *slot;
2878 Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
2879 List *relationlist = NIL;
2881 bool check_columnlist = (server_version >= 150000);
2882 int column_count = check_columnlist ? 4 : 3;
2883 StringInfoData pub_names;
2884
2885 initStringInfo(&cmd);
2886 initStringInfo(&pub_names);
2887
2888 /* Build the pub_names comma-separated string. */
2889 GetPublicationsStr(publications, &pub_names, true);
2890
2891 /* Get the list of relations from the publisher */
2892 if (server_version >= 160000)
2893 {
2894 tableRow[3] = INT2VECTOROID;
2895
2896 /*
2897 * From version 16, we allowed passing multiple publications to the
2898 * function pg_get_publication_tables. This helped to filter out the
2899 * partition table whose ancestor is also published in this
2900 * publication array.
2901 *
2902 * Join pg_get_publication_tables with pg_publication to exclude
2903 * non-existing publications.
2904 *
2905 * Note that attrs are always stored in sorted order so we don't need
2906 * to worry if different publications have specified them in a
2907 * different order. See pub_collist_validate.
2908 */
2909 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
2910 " FROM pg_class c\n"
2911 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2912 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2913 " FROM pg_publication\n"
2914 " WHERE pubname IN ( %s )) AS gpt\n"
2915 " ON gpt.relid = c.oid\n",
2916 pub_names.data);
2917
2918 /* From version 19, inclusion of sequences in the target is supported */
2919 if (server_version >= 190000)
2920 appendStringInfo(&cmd,
2921 "UNION ALL\n"
2922 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
2923 " FROM pg_catalog.pg_publication_sequences s\n"
2924 " WHERE s.pubname IN ( %s )",
2925 pub_names.data);
2926 }
2927 else
2928 {
2929 tableRow[3] = NAMEARRAYOID;
2930 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
2931
2932 /* Get column lists for each relation if the publisher supports it */
2933 if (check_columnlist)
2934 appendStringInfoString(&cmd, ", t.attnames\n");
2935
2936 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2937 " WHERE t.pubname IN ( %s )",
2938 pub_names.data);
2939 }
2940
2941 pfree(pub_names.data);
2942
2943 res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
2944 pfree(cmd.data);
2945
2946 if (res->status != WALRCV_OK_TUPLES)
2947 ereport(ERROR,
2948 (errcode(ERRCODE_CONNECTION_FAILURE),
2949 errmsg("could not receive list of replicated tables from the publisher: %s",
2950 res->err)));
2951
2952 /* Process tables. */
2954 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2955 {
2956 char *nspname;
2957 char *relname;
2958 bool isnull;
2959 char relkind;
2961
2962 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2963 Assert(!isnull);
2964 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2965 Assert(!isnull);
2966 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
2967 Assert(!isnull);
2968
2969 relinfo->rv = makeRangeVar(nspname, relname, -1);
2970 relinfo->relkind = relkind;
2971
2972 if (relkind != RELKIND_SEQUENCE &&
2973 check_columnlist &&
2974 list_member_rangevar(relationlist, relinfo->rv))
2975 ereport(ERROR,
2976 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2977 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2978 nspname, relname));
2979 else
2980 relationlist = lappend(relationlist, relinfo);
2981
2982 ExecClearTuple(slot);
2983 }
2985
2987
2988 return relationlist;
2989}
2990
2991/*
2992 * This is to report the connection failure while dropping replication slots.
2993 * Here, we report the WARNING for all tablesync slots so that user can drop
2994 * them manually, if required.
2995 */
2996static void
2997ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
2998{
2999 ListCell *lc;
3000
3001 foreach(lc, rstates)
3002 {
3004 Oid relid = rstate->relid;
3005
3006 /* Only cleanup resources of tablesync workers */
3007 if (!OidIsValid(relid))
3008 continue;
3009
3010 /*
3011 * Caller needs to ensure that relstate doesn't change underneath us.
3012 * See DropSubscription where we get the relstates.
3013 */
3014 if (rstate->state != SUBREL_STATE_SYNCDONE)
3015 {
3016 char syncslotname[NAMEDATALEN] = {0};
3017
3018 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
3019 sizeof(syncslotname));
3020 elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3021 syncslotname);
3022 }
3023 }
3024
3025 ereport(ERROR,
3026 (errcode(ERRCODE_CONNECTION_FAILURE),
3027 errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3028 slotname, err),
3029 /* translator: %s is an SQL ALTER command */
3030 errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3031 "ALTER SUBSCRIPTION ... DISABLE",
3032 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3033}
3034
3035/*
3036 * Check for duplicates in the given list of publications and error out if
3037 * found one. Add publications to datums as text datums, if datums is not
3038 * NULL.
3039 */
3040static void
3042{
3043 ListCell *cell;
3044 int j = 0;
3045
3046 foreach(cell, publist)
3047 {
3048 char *name = strVal(lfirst(cell));
3049 ListCell *pcell;
3050
3051 foreach(pcell, publist)
3052 {
3053 char *pname = strVal(lfirst(pcell));
3054
3055 if (pcell == cell)
3056 break;
3057
3058 if (strcmp(name, pname) == 0)
3059 ereport(ERROR,
3061 errmsg("publication name \"%s\" used more than once",
3062 pname)));
3063 }
3064
3065 if (datums)
3066 datums[j++] = CStringGetTextDatum(name);
3067 }
3068}
3069
3070/*
3071 * Merge current subscription's publications and user-specified publications
3072 * from ADD/DROP PUBLICATIONS.
3073 *
3074 * If addpub is true, we will add the list of publications into oldpublist.
3075 * Otherwise, we will delete the list of publications from oldpublist. The
3076 * returned list is a copy, oldpublist itself is not changed.
3077 *
3078 * subname is the subscription name, for error messages.
3079 */
3080static List *
3081merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
3082{
3083 ListCell *lc;
3084
3085 oldpublist = list_copy(oldpublist);
3086
3087 check_duplicates_in_publist(newpublist, NULL);
3088
3089 foreach(lc, newpublist)
3090 {
3091 char *name = strVal(lfirst(lc));
3092 ListCell *lc2;
3093 bool found = false;
3094
3095 foreach(lc2, oldpublist)
3096 {
3097 char *pubname = strVal(lfirst(lc2));
3098
3099 if (strcmp(name, pubname) == 0)
3100 {
3101 found = true;
3102 if (addpub)
3103 ereport(ERROR,
3105 errmsg("publication \"%s\" is already in subscription \"%s\"",
3106 name, subname)));
3107 else
3108 oldpublist = foreach_delete_current(oldpublist, lc2);
3109
3110 break;
3111 }
3112 }
3113
3114 if (addpub && !found)
3115 oldpublist = lappend(oldpublist, makeString(name));
3116 else if (!addpub && !found)
3117 ereport(ERROR,
3118 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3119 errmsg("publication \"%s\" is not in subscription \"%s\"",
3120 name, subname)));
3121 }
3122
3123 /*
3124 * XXX Probably no strong reason for this, but for now it's to make ALTER
3125 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3126 */
3127 if (!oldpublist)
3128 ereport(ERROR,
3129 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3130 errmsg("cannot drop all the publications from a subscription")));
3131
3132 return oldpublist;
3133}
3134
3135/*
3136 * Extract the streaming mode value from a DefElem. This is like
3137 * defGetBoolean() but also accepts the special value of "parallel".
3138 */
3139char
3141{
3142 /*
3143 * If no parameter value given, assume "true" is meant.
3144 */
3145 if (!def->arg)
3146 return LOGICALREP_STREAM_ON;
3147
3148 /*
3149 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3150 */
3151 switch (nodeTag(def->arg))
3152 {
3153 case T_Integer:
3154 switch (intVal(def->arg))
3155 {
3156 case 0:
3157 return LOGICALREP_STREAM_OFF;
3158 case 1:
3159 return LOGICALREP_STREAM_ON;
3160 default:
3161 /* otherwise, error out below */
3162 break;
3163 }
3164 break;
3165 default:
3166 {
3167 char *sval = defGetString(def);
3168
3169 /*
3170 * The set of strings accepted here should match up with the
3171 * grammar's opt_boolean_or_string production.
3172 */
3173 if (pg_strcasecmp(sval, "false") == 0 ||
3174 pg_strcasecmp(sval, "off") == 0)
3175 return LOGICALREP_STREAM_OFF;
3176 if (pg_strcasecmp(sval, "true") == 0 ||
3177 pg_strcasecmp(sval, "on") == 0)
3178 return LOGICALREP_STREAM_ON;
3179 if (pg_strcasecmp(sval, "parallel") == 0)
3180 return LOGICALREP_STREAM_PARALLEL;
3181 }
3182 break;
3183 }
3184
3185 ereport(ERROR,
3186 (errcode(ERRCODE_SYNTAX_ERROR),
3187 errmsg("%s requires a Boolean value or \"parallel\"",
3188 def->defname)));
3189 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3190}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5284
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5341
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2652
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3834
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4088
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
Definition: arrayfuncs.c:3382
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition: worker.c:6255
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define NameStr(name)
Definition: c.h:756
uint32 bits32
Definition: c.h:552
#define CppAsString2(x)
Definition: c.h:423
int32_t int32
Definition: c.h:539
#define OidIsValid(objectId)
Definition: c.h:779
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:448
bool track_commit_timestamp
Definition: commit_ts.c:109
int32 defGetInt32(DefElem *def)
Definition: define.c:149
char * defGetString(DefElem *def)
Definition: define.c:35
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:371
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1193
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1308
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define PG_TRY(...)
Definition: elog.h:372
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:397
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define NOTICE
Definition: elog.h:35
#define PG_FINALLY(...)
Definition: elog.h:389
#define ereport(elevel,...)
Definition: elog.h:150
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition: err.c:43
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
#define palloc_object(type)
Definition: fe_memutils.h:74
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:682
Oid MyDatabaseId
Definition: globals.c:94
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:3216
@ GUC_ACTION_SET
Definition: guc.h:203
@ PGC_S_TEST
Definition: guc.h:125
@ PGC_BACKEND
Definition: guc.h:77
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition: indexing.c:365
int j
Definition: isn.c:78
int i
Definition: isn.c:77
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:293
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:1184
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:652
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1154
List * lappend(List *list, void *datum)
Definition: list.c:339
List * list_delete(List *list, void *datum)
Definition: list.c:853
List * list_append_unique(List *list, void *datum)
Definition: list.c:1343
List * list_copy(const List *oldlist)
Definition: list.c:1573
void list_free(List *list)
Definition: list.c:1546
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
char * get_database_name(Oid dbid)
Definition: lsyscache.c:1259
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2170
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:2119
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:473
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pfree(void *pointer)
Definition: mcxt.c:1594
void * palloc(Size size)
Definition: mcxt.c:1365
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Oid GetUserId(void)
Definition: miscinit.c:469
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:98
#define nodeTag(nodeptr)
Definition: nodes.h:139
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:182
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:258
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1037
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
Definition: parsenodes.h:4364
@ ALTER_SUBSCRIPTION_ENABLED
Definition: parsenodes.h:4366
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
Definition: parsenodes.h:4363
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
Definition: parsenodes.h:4361
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
Definition: parsenodes.h:4365
@ ALTER_SUBSCRIPTION_SKIP
Definition: parsenodes.h:4367
@ ALTER_SUBSCRIPTION_OPTIONS
Definition: parsenodes.h:4359
@ ALTER_SUBSCRIPTION_CONNECTION
Definition: parsenodes.h:4360
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
Definition: parsenodes.h:4362
@ OBJECT_DATABASE
Definition: parsenodes.h:2334
@ OBJECT_SUBSCRIPTION
Definition: parsenodes.h:2363
#define ACL_CREATE
Definition: parsenodes.h:85
static AmcheckOptions opts
Definition: pg_amcheck.c:112
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
static int server_version
Definition: pg_dumpall.c:109
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:316
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:1047
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
Subscription * GetSubscription(Oid subid, bool missing_ok)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
NameData subname
FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
#define qsort(a, b, c, d)
Definition: port.h:479
static bool DatumGetBool(Datum X)
Definition: postgres.h:100
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332
static Name DatumGetName(Datum X)
Definition: postgres.h:370
static Datum BoolGetDatum(bool X)
Definition: postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
static char DatumGetChar(Datum X)
Definition: postgres.h:122
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:360
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
static Datum CharGetDatum(char X)
Definition: postgres.h:132
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
#define RelationGetDescr(relation)
Definition: rel.h:541
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13062
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
char * defname
Definition: parsenodes.h:843
Node * arg
Definition: parsenodes.h:844
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
LogicalRepWorkerType type
char * relname
Definition: primnodes.h:83
char * schemaname
Definition: primnodes.h:80
bool copy_data
bits32 specified_opts
bool retaindeadtuples
bool disableonerr
bool create_slot
char * synchronous_commit
int32 maxretention
char streaming
char * origin
char * slot_name
XLogRecPtr lsn
bool passwordrequired
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
Definition: regguts.h:323
struct SubOpts SubOpts
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_STREAMING
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
struct PublicationRelKind PublicationRelKind
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static bool list_member_rangevar(const List *list, RangeVar *rv)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
#define SUBOPT_CONNECT
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
bool superuser(void)
Definition: superuser.c:46
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:595
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:230
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:625
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:93
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1203
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1651
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:398
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:457
bool LookupGXactBySubid(Oid subid)
Definition: twophase.c:2797
String * makeString(char *str)
Definition: value.c:63
#define intVal(v)
Definition: value.h:79
#define strVal(v)
Definition: value.h:82
const char * name
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_check_conninfo(conninfo, must_use_password)
Definition: walreceiver.h:437
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
Definition: walreceiver.h:461
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ WORKERTYPE_TABLESYNC
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3666
int wal_level
Definition: xlog.c:133
@ WAL_LEVEL_REPLICA
Definition: xlog.h:75
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint16 RepOriginId
Definition: xlogdefs.h:69
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28