PostgreSQL Source Code git master
conflict.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * conflict.c
3 * Support routines for logging conflicts.
4 *
5 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/conflict.c
9 *
10 * This file contains the code for logging conflicts on the subscriber during
11 * logical replication.
12 *-------------------------------------------------------------------------
13 */
14
#include "postgres.h"

#include "access/commit_ts.h"
#include "access/tableam.h"
#include "executor/executor.h"
#include "pgstat.h"
#include "replication/conflict.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
25
26static const char *const ConflictTypeNames[] = {
27 [CT_INSERT_EXISTS] = "insert_exists",
28 [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
29 [CT_UPDATE_EXISTS] = "update_exists",
30 [CT_UPDATE_MISSING] = "update_missing",
31 [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
32 [CT_UPDATE_DELETED] = "update_deleted",
33 [CT_DELETE_MISSING] = "delete_missing",
34 [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
35};
36
38static void errdetail_apply_conflict(EState *estate,
39 ResultRelInfo *relinfo,
41 TupleTableSlot *searchslot,
42 TupleTableSlot *localslot,
43 TupleTableSlot *remoteslot,
44 Oid indexoid, TransactionId localxmin,
45 RepOriginId localorigin,
46 TimestampTz localts, StringInfo err_msg);
47static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
49 TupleTableSlot *searchslot,
50 TupleTableSlot *localslot,
51 TupleTableSlot *remoteslot,
52 Oid indexoid);
53static char *build_index_value_desc(EState *estate, Relation localrel,
54 TupleTableSlot *slot, Oid indexoid);
55
56/*
57 * Get the xmin and commit timestamp data (origin and timestamp) associated
58 * with the provided local row.
59 *
60 * Return true if the commit timestamp data was found, false otherwise.
61 */
62bool
64 RepOriginId *localorigin, TimestampTz *localts)
65{
66 Datum xminDatum;
67 bool isnull;
68
70 &isnull);
71 *xmin = DatumGetTransactionId(xminDatum);
72 Assert(!isnull);
73
74 /*
75 * The commit timestamp data is not available if track_commit_timestamp is
76 * disabled.
77 */
79 {
80 *localorigin = InvalidRepOriginId;
81 *localts = 0;
82 return false;
83 }
84
85 return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
86}
87
88/*
89 * This function is used to report a conflict while applying replication
90 * changes.
91 *
92 * 'searchslot' should contain the tuple used to search the local row to be
93 * updated or deleted.
94 *
95 * 'remoteslot' should contain the remote new tuple, if any.
96 *
97 * conflicttuples is a list of local rows that caused the conflict and the
98 * conflict related information. See ConflictTupleInfo.
99 *
100 * The caller must ensure that all the indexes passed in ConflictTupleInfo are
101 * locked so that we can fetch and display the conflicting key values.
102 */
103void
104ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
105 ConflictType type, TupleTableSlot *searchslot,
106 TupleTableSlot *remoteslot, List *conflicttuples)
107{
108 Relation localrel = relinfo->ri_RelationDesc;
109 StringInfoData err_detail;
110
111 initStringInfo(&err_detail);
112
113 /* Form errdetail message by combining conflicting tuples information. */
114 foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
115 errdetail_apply_conflict(estate, relinfo, type, searchslot,
116 conflicttuple->slot, remoteslot,
117 conflicttuple->indexoid,
118 conflicttuple->xmin,
119 conflicttuple->origin,
120 conflicttuple->ts,
121 &err_detail);
122
124
125 ereport(elevel,
127 errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
129 RelationGetRelationName(localrel),
131 errdetail_internal("%s", err_detail.data));
132}
133
134/*
135 * Find all unique indexes to check for a conflict and store them into
136 * ResultRelInfo.
137 */
138void
140{
141 List *uniqueIndexes = NIL;
142
143 for (int i = 0; i < relInfo->ri_NumIndices; i++)
144 {
145 Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
146
147 if (indexRelation == NULL)
148 continue;
149
150 /* Detect conflict only for unique indexes */
151 if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
152 continue;
153
154 /* Don't support conflict detection for deferrable index */
155 if (!indexRelation->rd_index->indimmediate)
156 continue;
157
158 uniqueIndexes = lappend_oid(uniqueIndexes,
159 RelationGetRelid(indexRelation));
160 }
161
162 relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
163}
164
165/*
166 * Add SQLSTATE error code to the current conflict report.
167 */
168static int
170{
171 switch (type)
172 {
173 case CT_INSERT_EXISTS:
174 case CT_UPDATE_EXISTS:
176 return errcode(ERRCODE_UNIQUE_VIOLATION);
183 }
184
185 Assert(false);
186 return 0; /* silence compiler warning */
187}
188
189/*
190 * Add an errdetail() line showing conflict detail.
191 *
192 * The DETAIL line comprises of two parts:
193 * 1. Explanation of the conflict type, including the origin and commit
194 * timestamp of the existing local row.
195 * 2. Display of conflicting key, existing local row, remote new row, and
196 * replica identity columns, if any. The remote old row is excluded as its
197 * information is covered in the replica identity columns.
198 */
199static void
201 ConflictType type, TupleTableSlot *searchslot,
202 TupleTableSlot *localslot, TupleTableSlot *remoteslot,
203 Oid indexoid, TransactionId localxmin,
204 RepOriginId localorigin, TimestampTz localts,
205 StringInfo err_msg)
206{
207 StringInfoData err_detail;
208 char *val_desc;
209 char *origin_name;
210
211 initStringInfo(&err_detail);
212
213 /* First, construct a detailed message describing the type of conflict */
214 switch (type)
215 {
216 case CT_INSERT_EXISTS:
217 case CT_UPDATE_EXISTS:
219 Assert(OidIsValid(indexoid) &&
221
222 if (localts)
223 {
224 if (localorigin == InvalidRepOriginId)
225 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
226 get_rel_name(indexoid),
227 localxmin, timestamptz_to_str(localts));
228 else if (replorigin_by_oid(localorigin, true, &origin_name))
229 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
230 get_rel_name(indexoid), origin_name,
231 localxmin, timestamptz_to_str(localts));
232
233 /*
234 * The origin that modified this row has been removed. This
235 * can happen if the origin was created by a different apply
236 * worker and its associated subscription and origin were
237 * dropped after updating the row, or if the origin was
238 * manually dropped by the user.
239 */
240 else
241 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
242 get_rel_name(indexoid),
243 localxmin, timestamptz_to_str(localts));
244 }
245 else
246 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
247 get_rel_name(indexoid), localxmin);
248
249 break;
250
252 if (localorigin == InvalidRepOriginId)
253 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
254 localxmin, timestamptz_to_str(localts));
255 else if (replorigin_by_oid(localorigin, true, &origin_name))
256 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
257 origin_name, localxmin, timestamptz_to_str(localts));
258
259 /* The origin that modified this row has been removed. */
260 else
261 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
262 localxmin, timestamptz_to_str(localts));
263
264 break;
265
267 if (localts)
268 {
269 if (localorigin == InvalidRepOriginId)
270 appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
271 localxmin, timestamptz_to_str(localts));
272 else if (replorigin_by_oid(localorigin, true, &origin_name))
273 appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
274 origin_name, localxmin, timestamptz_to_str(localts));
275
276 /* The origin that modified this row has been removed. */
277 else
278 appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
279 localxmin, timestamptz_to_str(localts));
280 }
281 else
282 appendStringInfo(&err_detail, _("The row to be updated was deleted."));
283
284 break;
285
287 appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
288 break;
289
291 if (localorigin == InvalidRepOriginId)
292 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
293 localxmin, timestamptz_to_str(localts));
294 else if (replorigin_by_oid(localorigin, true, &origin_name))
295 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
296 origin_name, localxmin, timestamptz_to_str(localts));
297
298 /* The origin that modified this row has been removed. */
299 else
300 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
301 localxmin, timestamptz_to_str(localts));
302
303 break;
304
306 appendStringInfoString(&err_detail, _("Could not find the row to be deleted."));
307 break;
308 }
309
310 Assert(err_detail.len > 0);
311
312 val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
313 localslot, remoteslot, indexoid);
314
315 /*
316 * Next, append the key values, existing local row, remote row, and
317 * replica identity columns after the message.
318 */
319 if (val_desc)
320 appendStringInfo(&err_detail, "\n%s", val_desc);
321
322 /*
323 * Insert a blank line to visually separate the new detail line from the
324 * existing ones.
325 */
326 if (err_msg->len > 0)
327 appendStringInfoChar(err_msg, '\n');
328
329 appendStringInfoString(err_msg, err_detail.data);
330}
331
332/*
333 * Helper function to build the additional details for conflicting key,
334 * existing local row, remote row, and replica identity columns.
335 *
336 * If the return value is NULL, it indicates that the current user lacks
337 * permissions to view the columns involved.
338 */
339static char *
342 TupleTableSlot *searchslot,
343 TupleTableSlot *localslot,
344 TupleTableSlot *remoteslot,
345 Oid indexoid)
346{
347 Relation localrel = relinfo->ri_RelationDesc;
348 Oid relid = RelationGetRelid(localrel);
349 TupleDesc tupdesc = RelationGetDescr(localrel);
350 StringInfoData tuple_value;
351 char *desc = NULL;
352
353 Assert(searchslot || localslot || remoteslot);
354
355 initStringInfo(&tuple_value);
356
357 /*
358 * Report the conflicting key values in the case of a unique constraint
359 * violation.
360 */
363 {
364 Assert(OidIsValid(indexoid) && localslot);
365
366 desc = build_index_value_desc(estate, localrel, localslot, indexoid);
367
368 if (desc)
369 appendStringInfo(&tuple_value, _("Key %s"), desc);
370 }
371
372 if (localslot)
373 {
374 /*
375 * The 'modifiedCols' only applies to the new tuple, hence we pass
376 * NULL for the existing local row.
377 */
378 desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
379 NULL, 64);
380
381 if (desc)
382 {
383 if (tuple_value.len > 0)
384 {
385 appendStringInfoString(&tuple_value, "; ");
386 appendStringInfo(&tuple_value, _("existing local row %s"),
387 desc);
388 }
389 else
390 {
391 appendStringInfo(&tuple_value, _("Existing local row %s"),
392 desc);
393 }
394 }
395 }
396
397 if (remoteslot)
398 {
399 Bitmapset *modifiedCols;
400
401 /*
402 * Although logical replication doesn't maintain the bitmap for the
403 * columns being inserted, we still use it to create 'modifiedCols'
404 * for consistency with other calls to ExecBuildSlotValueDescription.
405 *
406 * Note that generated columns are formed locally on the subscriber.
407 */
408 modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
409 ExecGetUpdatedCols(relinfo, estate));
410 desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
411 modifiedCols, 64);
412
413 if (desc)
414 {
415 if (tuple_value.len > 0)
416 {
417 appendStringInfoString(&tuple_value, "; ");
418 appendStringInfo(&tuple_value, _("remote row %s"), desc);
419 }
420 else
421 {
422 appendStringInfo(&tuple_value, _("Remote row %s"), desc);
423 }
424 }
425 }
426
427 if (searchslot)
428 {
429 /*
430 * Note that while index other than replica identity may be used (see
431 * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
432 * when applying update or delete, such an index scan may not result
433 * in a unique tuple and we still compare the complete tuple in such
434 * cases, thus such indexes are not used here.
435 */
436 Oid replica_index = GetRelationIdentityOrPK(localrel);
437
439
440 /*
441 * If the table has a valid replica identity index, build the index
442 * key value string. Otherwise, construct the full tuple value for
443 * REPLICA IDENTITY FULL cases.
444 */
445 if (OidIsValid(replica_index))
446 desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
447 else
448 desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
449
450 if (desc)
451 {
452 if (tuple_value.len > 0)
453 {
454 appendStringInfoString(&tuple_value, "; ");
455 appendStringInfo(&tuple_value, OidIsValid(replica_index)
456 ? _("replica identity %s")
457 : _("replica identity full %s"), desc);
458 }
459 else
460 {
461 appendStringInfo(&tuple_value, OidIsValid(replica_index)
462 ? _("Replica identity %s")
463 : _("Replica identity full %s"), desc);
464 }
465 }
466 }
467
468 if (tuple_value.len == 0)
469 return NULL;
470
471 appendStringInfoChar(&tuple_value, '.');
472 return tuple_value.data;
473}
474
475/*
476 * Helper functions to construct a string describing the contents of an index
477 * entry. See BuildIndexValueDescription for details.
478 *
479 * The caller must ensure that the index with the OID 'indexoid' is locked so
480 * that we can fetch and display the conflicting key value.
481 */
482static char *
484 Oid indexoid)
485{
486 char *index_value;
487 Relation indexDesc;
489 bool isnull[INDEX_MAX_KEYS];
490 TupleTableSlot *tableslot = slot;
491
492 if (!tableslot)
493 return NULL;
494
496
497 indexDesc = index_open(indexoid, NoLock);
498
499 /*
500 * If the slot is a virtual slot, copy it into a heap tuple slot as
501 * FormIndexDatum only works with heap tuple slots.
502 */
503 if (TTS_IS_VIRTUAL(slot))
504 {
505 tableslot = table_slot_create(localrel, &estate->es_tupleTable);
506 tableslot = ExecCopySlot(tableslot, slot);
507 }
508
509 /*
510 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
511 * index expressions are present.
512 */
513 GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
514
515 /*
516 * The values/nulls arrays passed to BuildIndexValueDescription should be
517 * the results of FormIndexDatum, which are the "raw" input to the index
518 * AM.
519 */
520 FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
521
522 index_value = BuildIndexValueDescription(indexDesc, values, isnull);
523
524 index_close(indexDesc, NoLock);
525
526 return index_value;
527}
Subscription * MySubscription
Definition: worker.c:479
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:251
static Datum values[MAXATTR]
Definition: bootstrap.c:153
uint32 TransactionId
Definition: c.h:662
#define OidIsValid(objectId)
Definition: c.h:779
bool track_commit_timestamp
Definition: commit_ts.c:109
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:272
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition: conflict.c:104
static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts, StringInfo err_msg)
Definition: conflict.c:200
static const char *const ConflictTypeNames[]
Definition: conflict.c:26
static char * build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid)
Definition: conflict.c:340
static char * build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid)
Definition: conflict.c:483
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition: conflict.c:139
static int errcode_apply_conflict(ConflictType type)
Definition: conflict.c:169
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:63
ConflictType
Definition: conflict.h:32
@ CT_UPDATE_DELETED
Definition: conflict.h:43
@ CT_MULTIPLE_UNIQUE_CONFLICTS
Definition: conflict.h:55
@ CT_DELETE_MISSING
Definition: conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition: conflict.h:37
@ CT_INSERT_EXISTS
Definition: conflict.h:34
@ CT_UPDATE_EXISTS
Definition: conflict.h:40
@ CT_UPDATE_MISSING
Definition: conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition: conflict.h:49
int64 TimestampTz
Definition: timestamp.h:39
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1243
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
#define ereport(elevel,...)
Definition: elog.h:150
char * ExecBuildSlotValueDescription(Oid reloid, TupleTableSlot *slot, TupleDesc tupdesc, Bitmapset *modifiedCols, int maxfieldlen)
Definition: execMain.c:2391
Bitmapset * ExecGetInsertedCols(ResultRelInfo *relinfo, EState *estate)
Definition: execUtils.c:1361
Bitmapset * ExecGetUpdatedCols(ResultRelInfo *relinfo, EState *estate)
Definition: execUtils.c:1382
#define GetPerTupleExprContext(estate)
Definition: executor.h:656
char * BuildIndexValueDescription(Relation indexRelation, const Datum *values, const bool *isnull)
Definition: genam.c:178
Assert(PointerIsAligned(start, uint64))
IndexInfo * BuildIndexInfo(Relation index)
Definition: index.c:2428
void FormIndexDatum(IndexInfo *indexInfo, TupleTableSlot *slot, EState *estate, Datum *values, bool *isnull)
Definition: index.c:2730
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
int i
Definition: isn.c:77
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
Definition: lmgr.c:351
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:493
#define InvalidRepOriginId
Definition: origin.h:33
#define INDEX_MAX_KEYS
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:77
void pgstat_report_subscription_conflict(Oid subid, ConflictType type)
uint64_t Datum
Definition: postgres.h:70
static TransactionId DatumGetTransactionId(Datum X)
Definition: postgres.h:272
unsigned int Oid
Definition: postgres_ext.h:32
#define RelationGetRelid(relation)
Definition: rel.h:515
#define RelationGetDescr(relation)
Definition: rel.h:541
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:904
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
List * es_tupleTable
Definition: execnodes.h:712
bool ii_Unique
Definition: execnodes.h:200
Definition: pg_list.h:54
Form_pg_index rd_index
Definition: rel.h:192
int ri_NumIndices
Definition: execnodes.h:483
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:580
Relation ri_RelationDesc
Definition: execnodes.h:480
RelationPtr ri_IndexRelationDescs
Definition: execnodes.h:486
IndexInfo ** ri_IndexRelationInfo
Definition: execnodes.h:489
#define MinTransactionIdAttributeNumber
Definition: sysattr.h:22
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
#define TTS_IS_VIRTUAL(slot)
Definition: tuptable.h:237
static Datum slot_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:419
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:524
const char * type
uint16 RepOriginId
Definition: xlogdefs.h:69