PostgreSQL Source Code git master
wait.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * wait.c
4 * Implements WAIT FOR, which allows waiting for events such as
5 * time passing or LSN having been replayed on replica.
6 *
7 * Portions Copyright (c) 2025, PostgreSQL Global Development Group
8 *
9 * IDENTIFICATION
10 * src/backend/commands/wait.c
11 *
12 *-------------------------------------------------------------------------
13 */
14#include "postgres.h"
15
16#include <math.h>
17
18#include "access/xlogrecovery.h"
19#include "access/xlogwait.h"
20#include "commands/defrem.h"
21#include "commands/wait.h"
22#include "executor/executor.h"
23#include "parser/parse_node.h"
24#include "storage/proc.h"
25#include "utils/builtins.h"
26#include "utils/guc.h"
27#include "utils/pg_lsn.h"
28#include "utils/snapmgr.h"
29
30
31void
33{
34 XLogRecPtr lsn;
35 int64 timeout = 0;
36 WaitLSNResult waitLSNResult;
37 bool throw = true;
38 TupleDesc tupdesc;
39 TupOutputState *tstate;
40 const char *result = "<unset>";
41 bool timeout_specified = false;
42 bool no_throw_specified = false;
43
44 /* Parse and validate the mandatory LSN */
46 CStringGetDatum(stmt->lsn_literal)));
47
48 foreach_node(DefElem, defel, stmt->options)
49 {
50 if (strcmp(defel->defname, "timeout") == 0)
51 {
52 char *timeout_str;
53 const char *hintmsg;
54 double result;
55
56 if (timeout_specified)
57 errorConflictingDefElem(defel, pstate);
58 timeout_specified = true;
59
60 timeout_str = defGetString(defel);
61
62 if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
63 {
65 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 errmsg("invalid timeout value: \"%s\"", timeout_str),
67 hintmsg ? errhint("%s", _(hintmsg)) : 0);
68 }
69
70 /*
71 * Get rid of any fractional part in the input. This is so we
72 * don't fail on just-out-of-range values that would round into
73 * range.
74 */
75 result = rint(result);
76
77 /* Range check */
78 if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
80 errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
81 errmsg("timeout value is out of range"));
82
83 if (result < 0)
85 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
86 errmsg("timeout cannot be negative"));
87
88 timeout = (int64) result;
89 }
90 else if (strcmp(defel->defname, "no_throw") == 0)
91 {
92 if (no_throw_specified)
93 errorConflictingDefElem(defel, pstate);
94
95 no_throw_specified = true;
96
97 throw = !defGetBoolean(defel);
98 }
99 else
100 {
102 errcode(ERRCODE_SYNTAX_ERROR),
103 errmsg("option \"%s\" not recognized",
104 defel->defname),
105 parser_errposition(pstate, defel->location));
106 }
107 }
108
109 /*
110 * We are going to wait for the LSN replay. We should first care that we
111 * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
112 * Otherwise, our snapshot could prevent the replay of WAL records
113 * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
114 * command, not a procedure or function.
115 *
116 * At first, we should check there is no active snapshot. According to
117 * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
118 * processed with a snapshot. Thankfully, we can pop this snapshot,
119 * because PortalRunUtility() can tolerate this.
120 */
121 if (ActiveSnapshotSet())
123
124 /*
125 * At second, invalidate a catalog snapshot if any. And we should be done
126 * with the preparation.
127 */
129
130 /* Give up if there is still an active or registered snapshot. */
133 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
134 errmsg("WAIT FOR must be only called without an active or registered snapshot"),
135 errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
136
137 /*
138 * As the result we should hold no snapshot, and correspondingly our xmin
139 * should be unset.
140 */
142
143 waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
144
145 /*
146 * Process the result of WaitForLSN(). Throw appropriate error if needed.
147 */
148 switch (waitLSNResult)
149 {
151 /* Nothing to do on success */
152 result = "success";
153 break;
154
156 if (throw)
158 errcode(ERRCODE_QUERY_CANCELED),
159 errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
160 LSN_FORMAT_ARGS(lsn),
162 else
163 result = "timeout";
164 break;
165
167 if (throw)
168 {
169 if (PromoteIsTriggered())
170 {
172 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
173 errmsg("recovery is not in progress"),
174 errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
175 LSN_FORMAT_ARGS(lsn),
177 }
178 else
180 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
181 errmsg("recovery is not in progress"),
182 errhint("Waiting for the replay LSN can only be executed during recovery."));
183 }
184 else
185 result = "not in recovery";
186 break;
187 }
188
189 /* need a tuple descriptor representing a single TEXT column */
190 tupdesc = WaitStmtResultDesc(stmt);
191
192 /* prepare for projection of tuples */
193 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
194
195 /* Send it */
196 do_text_output_oneline(tstate, result);
197
198 end_tup_output(tstate);
199}
200
203{
204 TupleDesc tupdesc;
205
206 /* Need a tuple descriptor representing a single TEXT column */
207 tupdesc = CreateTemplateTupleDesc(1);
208 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
209 TEXTOID, -1, 0);
210 return tupdesc;
211}
int16 AttrNumber
Definition: attnum.h:21
int64_t int64
Definition: c.h:540
#define FLOAT8_FITS_IN_INT64(num)
Definition: c.h:1096
#define unlikely(x)
Definition: c.h:407
char * defGetString(DefElem *def)
Definition: define.c:35
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:371
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2522
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2444
#define do_text_output_oneline(tstate, str_to_emit)
Definition: executor.h:628
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:682
bool parse_real(const char *value, double *result, int flags, const char **hintmsg)
Definition: guc.c:2833
#define GUC_UNIT_MS
Definition: guc.h:239
Assert(PointerIsAligned(start, uint64))
#define stmt
Definition: indent_codes.h:59
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:106
#define foreach_node(type, var, lst)
Definition: pg_list.h:496
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:360
bool ActiveSnapshotSet(void)
Definition: snapmgr.c:810
bool HaveRegisteredOrActiveSnapshot(void)
Definition: snapmgr.c:1642
void PopActiveSnapshot(void)
Definition: snapmgr.c:773
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:454
PGPROC * MyProc
Definition: proc.c:67
TransactionId xmin
Definition: proc.h:194
#define InvalidTransactionId
Definition: transam.h:31
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:182
void TupleDescInitEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:842
void ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
Definition: wait.c:32
TupleDesc WaitStmtResultDesc(WaitStmt *stmt)
Definition: wait.c:202
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition: xlogwait.c:314
WaitLSNResult
Definition: xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition: xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition: xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition: xlogwait.h:27
@ WAIT_LSN_TYPE_REPLAY
Definition: xlogwait.h:38