PostgreSQL Source Code git master
async.c File Reference
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <signal.h>
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
Include dependency graph for async.c:

Go to the source code of this file.

Data Structures

struct  AsyncQueueEntry
 
struct  QueuePosition
 
struct  QueueBackendStatus
 
struct  AsyncQueueControl
 
struct  ListenAction
 
struct  ActionList
 
struct  Notification
 
struct  NotificationList
 
struct  NotificationHash
 

Macros

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
 
#define QUEUEALIGN(len)   INTALIGN(len)
 
#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)
 
#define QUEUE_POS_PAGE(x)   ((x).page)
 
#define QUEUE_POS_OFFSET(x)   ((x).offset)
 
#define SET_QUEUE_POS(x, y, z)
 
#define QUEUE_POS_EQUAL(x, y)    ((x).page == (y).page && (x).offset == (y).offset)
 
#define QUEUE_POS_IS_ZERO(x)    ((x).page == 0 && (x).offset == 0)
 
#define QUEUE_POS_MIN(x, y)
 
#define QUEUE_POS_MAX(x, y)
 
#define QUEUE_CLEANUP_DELAY   4
 
#define QUEUE_HEAD   (asyncQueueControl->head)
 
#define QUEUE_TAIL   (asyncQueueControl->tail)
 
#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)
 
#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)
 
#define QUEUE_BACKEND_PID(i)   (asyncQueueControl->backend[i].pid)
 
#define QUEUE_BACKEND_DBOID(i)   (asyncQueueControl->backend[i].dboid)
 
#define QUEUE_NEXT_LISTENER(i)   (asyncQueueControl->backend[i].nextListener)
 
#define QUEUE_BACKEND_POS(i)   (asyncQueueControl->backend[i].pos)
 
#define NotifyCtl   (&NotifyCtlData)
 
#define QUEUE_PAGESIZE   BLCKSZ
 
#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */
 
#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */
 

Typedefs

typedef struct AsyncQueueEntry AsyncQueueEntry
 
typedef struct QueuePosition QueuePosition
 
typedef struct QueueBackendStatus QueueBackendStatus
 
typedef struct AsyncQueueControl AsyncQueueControl
 
typedef struct ActionList ActionList
 
typedef struct Notification Notification
 
typedef struct NotificationList NotificationList
 

Enumerations

enum  ListenActionKind { LISTEN_LISTEN , LISTEN_UNLISTEN , LISTEN_UNLISTEN_ALL }
 

Functions

static int64 asyncQueuePageDiff (int64 p, int64 q)
 
static bool asyncQueuePagePrecedes (int64 p, int64 q)
 
static void queue_listen (ListenActionKind action, const char *channel)
 
static void Async_UnlistenOnExit (int code, Datum arg)
 
static void Exec_ListenPreCommit (void)
 
static void Exec_ListenCommit (const char *channel)
 
static void Exec_UnlistenCommit (const char *channel)
 
static void Exec_UnlistenAllCommit (void)
 
static bool IsListeningOn (const char *channel)
 
static void asyncQueueUnregister (void)
 
static bool asyncQueueIsFull (void)
 
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
 
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
 
static ListCellasyncQueueAddEntries (ListCell *nextNotify)
 
static double asyncQueueUsage (void)
 
static void asyncQueueFillWarning (void)
 
static void SignalBackends (void)
 
static void asyncQueueReadAllNotifications (void)
 
static bool asyncQueueProcessPageEntries (QueuePosition *current, QueuePosition stop, Snapshot snapshot)
 
static void asyncQueueAdvanceTail (void)
 
static void ProcessIncomingNotify (bool flush)
 
static bool AsyncExistsPendingNotify (Notification *n)
 
static void AddEventToPendingNotifies (Notification *n)
 
static uint32 notification_hash (const void *key, Size keysize)
 
static int notification_match (const void *key1, const void *key2, Size keysize)
 
static void ClearPendingActionsAndNotifies (void)
 
Size AsyncShmemSize (void)
 
void AsyncShmemInit (void)
 
Datum pg_notify (PG_FUNCTION_ARGS)
 
void Async_Notify (const char *channel, const char *payload)
 
void Async_Listen (const char *channel)
 
void Async_Unlisten (const char *channel)
 
void Async_UnlistenAll (void)
 
Datum pg_listening_channels (PG_FUNCTION_ARGS)
 
void AtPrepare_Notify (void)
 
void PreCommit_Notify (void)
 
void AtCommit_Notify (void)
 
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
 
void AtAbort_Notify (void)
 
void AtSubCommit_Notify (void)
 
void AtSubAbort_Notify (void)
 
void HandleNotifyInterrupt (void)
 
void ProcessNotifyInterrupt (bool flush)
 
void AsyncNotifyFreezeXids (TransactionId newFrozenXid)
 
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
 
bool check_notify_buffers (int *newval, void **extra, GucSource source)
 

Variables

static AsyncQueueControlasyncQueueControl
 
static SlruCtlData NotifyCtlData
 
static ListlistenChannels = NIL
 
static ActionListpendingActions = NULL
 
static NotificationListpendingNotifies = NULL
 
volatile sig_atomic_t notifyInterruptPending = false
 
static bool unlistenExitRegistered = false
 
static bool amRegisteredListener = false
 
static bool tryAdvanceTail = false
 
bool Trace_notify = false
 
int max_notify_queue_pages = 1048576
 

Macro Definition Documentation

◆ AsyncQueueEntryEmptySize

#define AsyncQueueEntryEmptySize   (offsetof(AsyncQueueEntry, data) + 2)

Definition at line 189 of file async.c.

◆ MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES   16 /* threshold to build hashtab */

Definition at line 397 of file async.c.

◆ NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

Definition at line 163 of file async.c.

◆ NotifyCtl

#define NotifyCtl   (&NotifyCtlData)

Definition at line 310 of file async.c.

◆ QUEUE_BACKEND_DBOID

#define QUEUE_BACKEND_DBOID (   i)    (asyncQueueControl->backend[i].dboid)

Definition at line 301 of file async.c.

◆ QUEUE_BACKEND_PID

#define QUEUE_BACKEND_PID (   i)    (asyncQueueControl->backend[i].pid)

Definition at line 300 of file async.c.

◆ QUEUE_BACKEND_POS

#define QUEUE_BACKEND_POS (   i)    (asyncQueueControl->backend[i].pos)

Definition at line 303 of file async.c.

◆ QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY   4

Definition at line 238 of file async.c.

◆ QUEUE_FIRST_LISTENER

#define QUEUE_FIRST_LISTENER   (asyncQueueControl->firstListener)

Definition at line 299 of file async.c.

◆ QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL   5000 /* warn at most once every 5s */

Definition at line 313 of file async.c.

◆ QUEUE_HEAD

#define QUEUE_HEAD   (asyncQueueControl->head)

Definition at line 296 of file async.c.

◆ QUEUE_NEXT_LISTENER

#define QUEUE_NEXT_LISTENER (   i)    (asyncQueueControl->backend[i].nextListener)

Definition at line 302 of file async.c.

◆ QUEUE_PAGESIZE

#define QUEUE_PAGESIZE   BLCKSZ

Definition at line 311 of file async.c.

◆ QUEUE_POS_EQUAL

#define QUEUE_POS_EQUAL (   x,
  y 
)     ((x).page == (y).page && (x).offset == (y).offset)

Definition at line 209 of file async.c.

◆ QUEUE_POS_IS_ZERO

#define QUEUE_POS_IS_ZERO (   x)     ((x).page == 0 && (x).offset == 0)

Definition at line 212 of file async.c.

◆ QUEUE_POS_MAX

#define QUEUE_POS_MAX (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
static bool asyncQueuePagePrecedes(int64 p, int64 q)
Definition: async.c:475
int y
Definition: isn.c:76
int x
Definition: isn.c:75

Definition at line 222 of file async.c.

◆ QUEUE_POS_MIN

#define QUEUE_POS_MIN (   x,
  y 
)
Value:
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))

Definition at line 216 of file async.c.

◆ QUEUE_POS_OFFSET

#define QUEUE_POS_OFFSET (   x)    ((x).offset)

Definition at line 201 of file async.c.

◆ QUEUE_POS_PAGE

#define QUEUE_POS_PAGE (   x)    ((x).page)

Definition at line 200 of file async.c.

◆ QUEUE_STOP_PAGE

#define QUEUE_STOP_PAGE   (asyncQueueControl->stopPage)

Definition at line 298 of file async.c.

◆ QUEUE_TAIL

#define QUEUE_TAIL   (asyncQueueControl->tail)

Definition at line 297 of file async.c.

◆ QUEUEALIGN

#define QUEUEALIGN (   len)    INTALIGN(len)

Definition at line 187 of file async.c.

◆ SET_QUEUE_POS

#define SET_QUEUE_POS (   x,
  y,
 
)
Value:
do { \
(x).page = (y); \
(x).offset = (z); \
} while (0)

Definition at line 203 of file async.c.

Typedef Documentation

◆ ActionList

typedef struct ActionList ActionList

◆ AsyncQueueControl

◆ AsyncQueueEntry

◆ Notification

typedef struct Notification Notification

◆ NotificationList

◆ QueueBackendStatus

◆ QueuePosition

typedef struct QueuePosition QueuePosition

Enumeration Type Documentation

◆ ListenActionKind

Enumerator
LISTEN_LISTEN 
LISTEN_UNLISTEN 
LISTEN_UNLISTEN_ALL 

Definition at line 332 of file async.c.

333{
ListenActionKind
Definition: async.c:333
@ LISTEN_LISTEN
Definition: async.c:334
@ LISTEN_UNLISTEN_ALL
Definition: async.c:336
@ LISTEN_UNLISTEN
Definition: async.c:335

Function Documentation

◆ AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification n)
static

Definition at line 2409 of file async.c.

2410{
2412
2413 /* Create the hash table if it's time to */
2415 pendingNotifies->hashtab == NULL)
2416 {
2417 HASHCTL hash_ctl;
2418 ListCell *l;
2419
2420 /* Create the hash table */
2421 hash_ctl.keysize = sizeof(Notification *);
2422 hash_ctl.entrysize = sizeof(struct NotificationHash);
2423 hash_ctl.hash = notification_hash;
2424 hash_ctl.match = notification_match;
2425 hash_ctl.hcxt = CurTransactionContext;
2427 hash_create("Pending Notifies",
2428 256L,
2429 &hash_ctl,
2431
2432 /* Insert all the already-existing events */
2433 foreach(l, pendingNotifies->events)
2434 {
2435 Notification *oldn = (Notification *) lfirst(l);
2436 bool found;
2437
2439 &oldn,
2440 HASH_ENTER,
2441 &found);
2442 Assert(!found);
2443 }
2444 }
2445
2446 /* Add new event to the list, in order */
2448
2449 /* Add event to the hash table if needed */
2450 if (pendingNotifies->hashtab != NULL)
2451 {
2452 bool found;
2453
2455 &n,
2456 HASH_ENTER,
2457 &found);
2458 Assert(!found);
2459 }
2460}
#define MIN_HASHABLE_NOTIFIES
Definition: async.c:397
static uint32 notification_hash(const void *key, Size keysize)
Definition: async.c:2468
static int notification_match(const void *key1, const void *key2, Size keysize)
Definition: async.c:2482
static NotificationList * pendingNotifies
Definition: async.c:404
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
Assert(PointerIsAligned(start, uint64))
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_COMPARE
Definition: hsearch.h:99
#define HASH_FUNCTION
Definition: hsearch.h:98
List * lappend(List *list, void *datum)
Definition: list.c:339
MemoryContext CurTransactionContext
Definition: mcxt.c:172
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
Size keysize
Definition: hsearch.h:75
HashValueFunc hash
Definition: hsearch.h:78
Size entrysize
Definition: hsearch.h:76
HashCompareFunc match
Definition: hsearch.h:80
MemoryContext hcxt
Definition: hsearch.h:86
HTAB * hashtab
Definition: async.c:393
List * events
Definition: async.c:392

References Assert(), CurTransactionContext, HASHCTL::entrysize, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), notification_match(), and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ Async_Listen()

void Async_Listen ( const char *  channel)

Definition at line 737 of file async.c.

738{
739 if (Trace_notify)
740 elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
741
742 queue_listen(LISTEN_LISTEN, channel);
743}
bool Trace_notify
Definition: async.c:425
static void queue_listen(ListenActionKind action, const char *channel)
Definition: async.c:689
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:226
int MyProcPid
Definition: globals.c:47

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

◆ Async_Notify()

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 590 of file async.c.

591{
592 int my_level = GetCurrentTransactionNestLevel();
593 size_t channel_len;
594 size_t payload_len;
595 Notification *n;
596 MemoryContext oldcontext;
597
598 if (IsParallelWorker())
599 elog(ERROR, "cannot send notifications from a parallel worker");
600
601 if (Trace_notify)
602 elog(DEBUG1, "Async_Notify(%s)", channel);
603
604 channel_len = channel ? strlen(channel) : 0;
605 payload_len = payload ? strlen(payload) : 0;
606
607 /* a channel name must be specified */
608 if (channel_len == 0)
610 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
611 errmsg("channel name cannot be empty")));
612
613 /* enforce length limits */
614 if (channel_len >= NAMEDATALEN)
616 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
617 errmsg("channel name too long")));
618
619 if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
621 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
622 errmsg("payload string too long")));
623
624 /*
625 * We must construct the Notification entry, even if we end up not using
626 * it, in order to compare it cheaply to existing list entries.
627 *
628 * The notification list needs to live until end of transaction, so store
629 * it in the transaction context.
630 */
632
633 n = (Notification *) palloc(offsetof(Notification, data) +
634 channel_len + payload_len + 2);
635 n->channel_len = channel_len;
636 n->payload_len = payload_len;
637 strcpy(n->data, channel);
638 if (payload)
639 strcpy(n->data + channel_len + 1, payload);
640 else
641 n->data[channel_len + 1] = '\0';
642
643 if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
644 {
645 NotificationList *notifies;
646
647 /*
648 * First notify event in current (sub)xact. Note that we allocate the
649 * NotificationList in TopTransactionContext; the nestingLevel might
650 * get changed later by AtSubCommit_Notify.
651 */
652 notifies = (NotificationList *)
654 sizeof(NotificationList));
655 notifies->nestingLevel = my_level;
656 notifies->events = list_make1(n);
657 /* We certainly don't need a hashtable yet */
658 notifies->hashtab = NULL;
659 notifies->upper = pendingNotifies;
660 pendingNotifies = notifies;
661 }
662 else
663 {
664 /* Now check for duplicates */
666 {
667 /* It's a dup, so forget it */
668 pfree(n);
669 MemoryContextSwitchTo(oldcontext);
670 return;
671 }
672
673 /* Append more events to existing list */
675 }
676
677 MemoryContextSwitchTo(oldcontext);
678}
static bool AsyncExistsPendingNotify(Notification *n)
Definition: async.c:2368
static void AddEventToPendingNotifies(Notification *n)
Definition: async.c:2409
#define NOTIFY_PAYLOAD_MAX_LENGTH
Definition: async.c:163
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
#define IsParallelWorker()
Definition: parallel.h:60
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1229
MemoryContext TopTransactionContext
Definition: mcxt.c:171
void pfree(void *pointer)
Definition: mcxt.c:1594
void * palloc(Size size)
Definition: mcxt.c:1365
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define NAMEDATALEN
const void * data
#define list_make1(x1)
Definition: pg_list.h:212
int nestingLevel
Definition: async.c:391
struct NotificationList * upper
Definition: async.c:394
uint16 payload_len
Definition: async.c:384
char data[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:386
uint16 channel_len
Definition: async.c:383
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:930

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

◆ Async_Unlisten()

void Async_Unlisten ( const char *  channel)

Definition at line 751 of file async.c.

752{
753 if (Trace_notify)
754 elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
755
756 /* If we couldn't possibly be listening, no need to queue anything */
758 return;
759
761}
static ActionList * pendingActions
Definition: async.c:352
static bool unlistenExitRegistered
Definition: async.c:416

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

◆ Async_UnlistenAll()

void Async_UnlistenAll ( void  )

Definition at line 769 of file async.c.

770{
771 if (Trace_notify)
772 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
773
774 /* If we couldn't possibly be listening, no need to queue anything */
776 return;
777
779}

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, pendingActions, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll(), and standard_ProcessUtility().

◆ Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int  code,
Datum  arg 
)
static

Definition at line 822 of file async.c.

823{
826}
static void Exec_UnlistenAllCommit(void)
Definition: async.c:1193
static void asyncQueueUnregister(void)
Definition: async.c:1230

References asyncQueueUnregister(), and Exec_UnlistenAllCommit().

Referenced by Exec_ListenPreCommit().

◆ AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification n)
static

Definition at line 2368 of file async.c.

2369{
2370 if (pendingNotifies == NULL)
2371 return false;
2372
2373 if (pendingNotifies->hashtab != NULL)
2374 {
2375 /* Use the hash table to probe for a match */
2377 &n,
2378 HASH_FIND,
2379 NULL))
2380 return true;
2381 }
2382 else
2383 {
2384 /* Must scan the event list */
2385 ListCell *l;
2386
2387 foreach(l, pendingNotifies->events)
2388 {
2389 Notification *oldn = (Notification *) lfirst(l);
2390
2391 if (n->channel_len == oldn->channel_len &&
2392 n->payload_len == oldn->payload_len &&
2393 memcmp(n->data, oldn->data,
2394 n->channel_len + n->payload_len + 2) == 0)
2395 return true;
2396 }
2397 }
2398
2399 return false;
2400}
@ HASH_FIND
Definition: hsearch.h:113

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

◆ AsyncNotifyFreezeXids()

void AsyncNotifyFreezeXids ( TransactionId  newFrozenXid)

Definition at line 2196 of file async.c.

2197{
2198 QueuePosition pos;
2199 QueuePosition head;
2200 int64 curpage = -1;
2201 int slotno = -1;
2202 char *page_buffer = NULL;
2203 bool page_dirty = false;
2204
2205 /*
2206 * Acquire locks in the correct order to avoid deadlocks. As per the
2207 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
2208 * bank locks.
2209 *
2210 * We only need SHARED mode since we're just reading the head/tail
2211 * positions, not modifying them.
2212 */
2213 LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
2214 LWLockAcquire(NotifyQueueLock, LW_SHARED);
2215
2216 pos = QUEUE_TAIL;
2217 head = QUEUE_HEAD;
2218
2219 /* Release NotifyQueueLock early, we only needed to read the positions */
2220 LWLockRelease(NotifyQueueLock);
2221
2222 /*
2223 * Scan the queue from tail to head, freezing XIDs as needed. We hold
2224 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
2225 * we're working.
2226 */
2227 while (!QUEUE_POS_EQUAL(pos, head))
2228 {
2229 AsyncQueueEntry *qe;
2230 TransactionId xid;
2231 int64 pageno = QUEUE_POS_PAGE(pos);
2232 int offset = QUEUE_POS_OFFSET(pos);
2233
2234 /* If we need a different page, release old lock and get new one */
2235 if (pageno != curpage)
2236 {
2237 LWLock *lock;
2238
2239 /* Release previous page if any */
2240 if (slotno >= 0)
2241 {
2242 if (page_dirty)
2243 {
2244 NotifyCtl->shared->page_dirty[slotno] = true;
2245 page_dirty = false;
2246 }
2248 }
2249
2250 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
2252 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
2254 page_buffer = NotifyCtl->shared->page_buffer[slotno];
2255 curpage = pageno;
2256 }
2257
2258 qe = (AsyncQueueEntry *) (page_buffer + offset);
2259 xid = qe->xid;
2260
2261 if (TransactionIdIsNormal(xid) &&
2262 TransactionIdPrecedes(xid, newFrozenXid))
2263 {
2264 if (TransactionIdDidCommit(xid))
2265 {
2267 page_dirty = true;
2268 }
2269 else
2270 {
2272 page_dirty = true;
2273 }
2274 }
2275
2276 /* Advance to next entry */
2277 asyncQueueAdvance(&pos, qe->length);
2278 }
2279
2280 /* Release final page lock if we acquired one */
2281 if (slotno >= 0)
2282 {
2283 if (page_dirty)
2284 NotifyCtl->shared->page_dirty[slotno] = true;
2286 }
2287
2288 LWLockRelease(NotifyQueueTailLock);
2289}
#define QUEUE_POS_OFFSET(x)
Definition: async.c:201
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
Definition: async.c:1286
#define QUEUE_TAIL
Definition: async.c:297
#define QUEUE_POS_PAGE(x)
Definition: async.c:200
#define NotifyCtl
Definition: async.c:310
#define QUEUE_HEAD
Definition: async.c:296
#define QUEUE_POS_EQUAL(x, y)
Definition: async.c:209
int64_t int64
Definition: c.h:540
uint32 TransactionId
Definition: c.h:662
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
@ LW_EXCLUSIVE
Definition: lwlock.h:112
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
Definition: slru.c:527
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
Definition: slru.h:160
TransactionId xid
Definition: async.c:181
Definition: lwlock.h:42
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126
#define FrozenTransactionId
Definition: transam.h:33
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References asyncQueueAdvance(), FrozenTransactionId, InvalidTransactionId, AsyncQueueEntry::length, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_HEAD, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUE_TAIL, SimpleLruGetBankLock(), SimpleLruReadPage(), TransactionIdDidCommit(), TransactionIdIsNormal, TransactionIdPrecedes(), and AsyncQueueEntry::xid.

Referenced by vac_truncate_clog().

◆ asyncQueueAddEntries()

static ListCell * asyncQueueAddEntries ( ListCell nextNotify)
static

Definition at line 1355 of file async.c.

1356{
1357 AsyncQueueEntry qe;
1358 QueuePosition queue_head;
1359 int64 pageno;
1360 int offset;
1361 int slotno;
1362 LWLock *prevlock;
1363
1364 /*
1365 * We work with a local copy of QUEUE_HEAD, which we write back to shared
1366 * memory upon exiting. The reason for this is that if we have to advance
1367 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
1368 * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
1369 * subsequent insertions would try to put entries into a page that slru.c
1370 * thinks doesn't exist yet.) So, use a local position variable. Note
1371 * that if we do fail, any already-inserted queue entries are forgotten;
1372 * this is okay, since they'd be useless anyway after our transaction
1373 * rolls back.
1374 */
1375 queue_head = QUEUE_HEAD;
1376
1377 /*
1378 * If this is the first write since the postmaster started, we need to
1379 * initialize the first page of the async SLRU. Otherwise, the current
1380 * page should be initialized already, so just fetch it.
1381 */
1382 pageno = QUEUE_POS_PAGE(queue_head);
1383 prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
1384
1385 /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
1386 LWLockAcquire(prevlock, LW_EXCLUSIVE);
1387
1388 if (QUEUE_POS_IS_ZERO(queue_head))
1389 slotno = SimpleLruZeroPage(NotifyCtl, pageno);
1390 else
1391 slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
1393
1394 /* Note we mark the page dirty before writing in it */
1395 NotifyCtl->shared->page_dirty[slotno] = true;
1396
1397 while (nextNotify != NULL)
1398 {
1399 Notification *n = (Notification *) lfirst(nextNotify);
1400
1401 /* Construct a valid queue entry in local variable qe */
1403
1404 offset = QUEUE_POS_OFFSET(queue_head);
1405
1406 /* Check whether the entry really fits on the current page */
1407 if (offset + qe.length <= QUEUE_PAGESIZE)
1408 {
1409 /* OK, so advance nextNotify past this item */
1410 nextNotify = lnext(pendingNotifies->events, nextNotify);
1411 }
1412 else
1413 {
1414 /*
1415 * Write a dummy entry to fill up the page. Actually readers will
1416 * only check dboid and since it won't match any reader's database
1417 * OID, they will ignore this entry and move on.
1418 */
1419 qe.length = QUEUE_PAGESIZE - offset;
1420 qe.dboid = InvalidOid;
1422 qe.data[0] = '\0'; /* empty channel */
1423 qe.data[1] = '\0'; /* empty payload */
1424 }
1425
1426 /* Now copy qe into the shared buffer page */
1427 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
1428 &qe,
1429 qe.length);
1430
1431 /* Advance queue_head appropriately, and detect if page is full */
1432 if (asyncQueueAdvance(&(queue_head), qe.length))
1433 {
1434 LWLock *lock;
1435
1436 pageno = QUEUE_POS_PAGE(queue_head);
1437 lock = SimpleLruGetBankLock(NotifyCtl, pageno);
1438 if (lock != prevlock)
1439 {
1440 LWLockRelease(prevlock);
1442 prevlock = lock;
1443 }
1444
1445 /*
1446 * Page is full, so we're done here, but first fill the next page
1447 * with zeroes. The reason to do this is to ensure that slru.c's
1448 * idea of the head page is always the same as ours, which avoids
1449 * boundary problems in SimpleLruTruncate. The test in
1450 * asyncQueueIsFull() ensured that there is room to create this
1451 * page without overrunning the queue.
1452 */
1453 slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
1454
1455 /*
1456 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
1457 * set flag to remember that we should try to advance the tail
1458 * pointer (we don't want to actually do that right here).
1459 */
1460 if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1461 tryAdvanceTail = true;
1462
1463 /* And exit the loop */
1464 break;
1465 }
1466 }
1467
1468 /* Success, so update the global QUEUE_HEAD */
1469 QUEUE_HEAD = queue_head;
1470
1471 LWLockRelease(prevlock);
1472
1473 return nextNotify;
1474}
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
Definition: async.c:1319
static bool tryAdvanceTail
Definition: async.c:422
#define QUEUE_CLEANUP_DELAY
Definition: async.c:238
#define QUEUE_POS_IS_ZERO(x)
Definition: async.c:212
#define QUEUE_PAGESIZE
Definition: async.c:311
static ListCell * lnext(const List *l, const ListCell *c)
Definition: pg_list.h:343
#define InvalidOid
Definition: postgres_ext.h:37
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
Definition: slru.c:375
char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]
Definition: async.c:183

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), tryAdvanceTail, and AsyncQueueEntry::xid.

Referenced by PreCommit_Notify().

◆ asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition position,
int  entryLength 
)
static

Definition at line 1286 of file async.c.

1287{
1288 int64 pageno = QUEUE_POS_PAGE(*position);
1289 int offset = QUEUE_POS_OFFSET(*position);
1290 bool pageJump = false;
1291
1292 /*
1293 * Move to the next writing position: First jump over what we have just
1294 * written or read.
1295 */
1296 offset += entryLength;
1297 Assert(offset <= QUEUE_PAGESIZE);
1298
1299 /*
1300 * In a second step check if another entry can possibly be written to the
1301 * page. If so, stay here, we have reached the next position. If not, then
1302 * we need to move on to the next page.
1303 */
1305 {
1306 pageno++;
1307 offset = 0;
1308 pageJump = true;
1309 }
1310
1311 SET_QUEUE_POS(*position, pageno, offset);
1312 return pageJump;
1313}
#define SET_QUEUE_POS(x, y, z)
Definition: async.c:203
#define AsyncQueueEntryEmptySize
Definition: async.c:189
#define QUEUEALIGN(len)
Definition: async.c:187

References Assert(), AsyncQueueEntryEmptySize, QUEUE_PAGESIZE, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, QUEUEALIGN, and SET_QUEUE_POS.

Referenced by AsyncNotifyFreezeXids(), asyncQueueAddEntries(), and asyncQueueProcessPageEntries().

◆ asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void  )
static

Definition at line 2114 of file async.c.

2115{
2116 QueuePosition min;
2117 int64 oldtailpage;
2118 int64 newtailpage;
2119 int64 boundary;
2120
2121 /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
2122 LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
2123
2124 /*
2125 * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact
2126 * (ie, exactly match at least one backend's queue position), so it must
2127 * be updated atomically with the actual computation. Since v13, we could
2128 * get away with not doing it like that, but it seems prudent to keep it
2129 * so.
2130 *
2131 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
2132 * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is
2133 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
2134 * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
2135 * there are pages we can truncate but haven't yet finished doing so.
2136 *
2137 * For concurrency's sake, we don't want to hold NotifyQueueLock while
2138 * performing SimpleLruTruncate. This is OK because no backend will try
2139 * to access the pages we are in the midst of truncating.
2140 */
2141 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2142 min = QUEUE_HEAD;
2144 {
2146 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
2147 }
2148 QUEUE_TAIL = min;
2149 oldtailpage = QUEUE_STOP_PAGE;
2150 LWLockRelease(NotifyQueueLock);
2151
2152 /*
2153 * We can truncate something if the global tail advanced across an SLRU
2154 * segment boundary.
2155 *
2156 * XXX it might be better to truncate only once every several segments, to
2157 * reduce the number of directory scans.
2158 */
2159 newtailpage = QUEUE_POS_PAGE(min);
2160 boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
2161 if (asyncQueuePagePrecedes(oldtailpage, boundary))
2162 {
2163 /*
2164 * SimpleLruTruncate() will ask for SLRU bank locks but will also
2165 * release the lock again.
2166 */
2167 SimpleLruTruncate(NotifyCtl, newtailpage);
2168
2169 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
2170 QUEUE_STOP_PAGE = newtailpage;
2171 LWLockRelease(NotifyQueueLock);
2172 }
2173
2174 LWLockRelease(NotifyQueueTailLock);
2175}
#define QUEUE_FIRST_LISTENER
Definition: async.c:299
#define QUEUE_POS_MIN(x, y)
Definition: async.c:216
#define QUEUE_BACKEND_POS(i)
Definition: async.c:303
#define QUEUE_BACKEND_PID(i)
Definition: async.c:300
#define QUEUE_NEXT_LISTENER(i)
Definition: async.c:302
#define QUEUE_STOP_PAGE
Definition: async.c:298
int i
Definition: isn.c:77
#define InvalidPid
Definition: miscadmin.h:32
#define SLRU_PAGES_PER_SEGMENT
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int ProcNumber
Definition: procnumber.h:24
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Definition: slru.c:1433

References Assert(), asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

◆ asyncQueueFillWarning()

static void asyncQueueFillWarning ( void  )
static

Definition at line 1527 of file async.c.

1528{
1529 double fillDegree;
1530 TimestampTz t;
1531
1532 fillDegree = asyncQueueUsage();
1533 if (fillDegree < 0.5)
1534 return;
1535
1536 t = GetCurrentTimestamp();
1537
1540 {
1542 int32 minPid = InvalidPid;
1543
1545 {
1547 min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
1549 minPid = QUEUE_BACKEND_PID(i);
1550 }
1551
1553 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
1554 (minPid != InvalidPid ?
1555 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
1556 : 0),
1557 (minPid != InvalidPid ?
1558 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
1559 : 0)));
1560
1562 }
1563}
static double asyncQueueUsage(void)
Definition: async.c:1506
static AsyncQueueControl * asyncQueueControl
Definition: async.c:294
#define QUEUE_FULL_WARN_INTERVAL
Definition: async.c:313
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
int32_t int32
Definition: c.h:539
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint(const char *fmt,...)
Definition: elog.c:1330
#define WARNING
Definition: elog.h:36
TimestampTz lastQueueFillWarn
Definition: async.c:290

References Assert(), asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

◆ asyncQueueIsFull()

static bool asyncQueueIsFull ( void  )
static

Definition at line 1271 of file async.c.

1272{
1273 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1274 int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1275 int64 occupied = headPage - tailPage;
1276
1277 return occupied >= max_notify_queue_pages;
1278}
int max_notify_queue_pages
Definition: async.c:428

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by PreCommit_Notify().

◆ asyncQueueNotificationToEntry()

static void asyncQueueNotificationToEntry ( Notification n,
AsyncQueueEntry qe 
)
static

Definition at line 1319 of file async.c.

1320{
1321 size_t channellen = n->channel_len;
1322 size_t payloadlen = n->payload_len;
1323 int entryLength;
1324
1325 Assert(channellen < NAMEDATALEN);
1326 Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
1327
1328 /* The terminators are already included in AsyncQueueEntryEmptySize */
1329 entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
1330 entryLength = QUEUEALIGN(entryLength);
1331 qe->length = entryLength;
1332 qe->dboid = MyDatabaseId;
1334 qe->srcPid = MyProcPid;
1335 memcpy(qe->data, n->data, channellen + payloadlen + 2);
1336}
Oid MyDatabaseId
Definition: globals.c:94
int32 srcPid
Definition: async.c:182
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:455

References Assert(), AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, AsyncQueueEntry::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

◆ asyncQueuePageDiff()

static int64 asyncQueuePageDiff ( int64  p,
int64  q 
)
inlinestatic

Definition at line 465 of file async.c.

466{
467 return p - q;
468}

Referenced by SignalBackends().

◆ asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64  p,
int64  q 
)
inlinestatic

Definition at line 475 of file async.c.

476{
477 return p < q;
478}

Referenced by asyncQueueAdvanceTail(), and AsyncShmemInit().

◆ asyncQueueProcessPageEntries()

static bool asyncQueueProcessPageEntries ( QueuePosition current,
QueuePosition  stop,
Snapshot  snapshot 
)
static

Definition at line 1977 of file async.c.

1980{
1981 int64 curpage = QUEUE_POS_PAGE(*current);
1982 int slotno;
1983 char *page_buffer;
1984 bool reachedStop = false;
1985 bool reachedEndOfPage;
1986
1987 /*
1988 * We copy the entries into a local buffer to avoid holding the SLRU lock
1989 * while we transmit them to our frontend. The local buffer must be
1990 * adequately aligned.
1991 */
1992 alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
1993 char *local_buf_end = local_buf;
1994
1995 slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
1997 page_buffer = NotifyCtl->shared->page_buffer[slotno];
1998
1999 do
2000 {
2001 QueuePosition thisentry = *current;
2002 AsyncQueueEntry *qe;
2003
2004 if (QUEUE_POS_EQUAL(thisentry, stop))
2005 break;
2006
2007 qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
2008
2009 /*
2010 * Advance *current over this message, possibly to the next page. As
2011 * noted in the comments for asyncQueueReadAllNotifications, we must
2012 * do this before possibly failing while processing the message.
2013 */
2014 reachedEndOfPage = asyncQueueAdvance(current, qe->length);
2015
2016 /* Ignore messages destined for other databases */
2017 if (qe->dboid == MyDatabaseId)
2018 {
2019 if (XidInMVCCSnapshot(qe->xid, snapshot))
2020 {
2021 /*
2022 * The source transaction is still in progress, so we can't
2023 * process this message yet. Break out of the loop, but first
2024 * back up *current so we will reprocess the message next
2025 * time. (Note: it is unlikely but not impossible for
2026 * TransactionIdDidCommit to fail, so we can't really avoid
2027 * this advance-then-back-up behavior when dealing with an
2028 * uncommitted message.)
2029 *
2030 * Note that we must test XidInMVCCSnapshot before we test
2031 * TransactionIdDidCommit, else we might return a message from
2032 * a transaction that is not yet visible to snapshots; compare
2033 * the comments at the head of heapam_visibility.c.
2034 *
2035 * Also, while our own xact won't be listed in the snapshot,
2036 * we need not check for TransactionIdIsCurrentTransactionId
2037 * because our transaction cannot (yet) have queued any
2038 * messages.
2039 */
2040 *current = thisentry;
2041 reachedStop = true;
2042 break;
2043 }
2044
2045 /*
2046 * Quick check for the case that we're not listening on any
2047 * channels, before calling TransactionIdDidCommit(). This makes
2048 * that case a little faster, but more importantly, it ensures
2049 * that if there's a bad entry in the queue for which
2050 * TransactionIdDidCommit() fails for some reason, we can skip
2051 * over it on the first LISTEN in a session, and not get stuck on
2052 * it indefinitely.
2053 */
2054 if (listenChannels == NIL)
2055 continue;
2056
2057 if (TransactionIdDidCommit(qe->xid))
2058 {
2059 memcpy(local_buf_end, qe, qe->length);
2060 local_buf_end += qe->length;
2061 }
2062 else
2063 {
2064 /*
2065 * The source transaction aborted or crashed, so we just
2066 * ignore its notifications.
2067 */
2068 }
2069 }
2070
2071 /* Loop back if we're not at end of page */
2072 } while (!reachedEndOfPage);
2073
2074 /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
2076
2077 /*
2078 * Now that we have let go of the SLRU bank lock, send the notifications
2079 * to our backend
2080 */
2081 Assert(local_buf_end - local_buf <= BLCKSZ);
2082 for (char *p = local_buf; p < local_buf_end;)
2083 {
2084 AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
2085
2086 /* qe->data is the null-terminated channel name */
2087 char *channel = qe->data;
2088
2089 if (IsListeningOn(channel))
2090 {
2091 /* payload follows channel name */
2092 char *payload = qe->data + strlen(channel) + 1;
2093
2094 NotifyMyFrontEnd(channel, payload, qe->srcPid);
2095 }
2096
2097 p += qe->length;
2098 }
2099
2100 if (QUEUE_POS_EQUAL(*current, stop))
2101 reachedStop = true;
2102
2103 return reachedStop;
2104}
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
Definition: async.c:2344
static List * listenChannels
Definition: async.c:320
struct AsyncQueueEntry AsyncQueueEntry
static bool IsListeningOn(const char *channel)
Definition: async.c:1211
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
Definition: slru.c:630
bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
Definition: snapmgr.c:1870

References Assert(), asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, InvalidTransactionId, IsListeningOn(), AsyncQueueEntry::length, listenChannels, LWLockRelease(), MyDatabaseId, NIL, NotifyCtl, NotifyMyFrontEnd(), QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

◆ asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void  )
static

Definition at line 1851 of file async.c.

1852{
1853 QueuePosition pos;
1854 QueuePosition head;
1855 Snapshot snapshot;
1856
1857 /* Fetch current state */
1858 LWLockAcquire(NotifyQueueLock, LW_SHARED);
1859 /* Assert checks that we have a valid state entry */
1862 head = QUEUE_HEAD;
1863 LWLockRelease(NotifyQueueLock);
1864
1865 if (QUEUE_POS_EQUAL(pos, head))
1866 {
1867 /* Nothing to do, we have read all notifications already. */
1868 return;
1869 }
1870
1871 /*----------
1872 * Get snapshot we'll use to decide which xacts are still in progress.
1873 * This is trickier than it might seem, because of race conditions.
1874 * Consider the following example:
1875 *
1876 * Backend 1: Backend 2:
1877 *
1878 * transaction starts
1879 * UPDATE foo SET ...;
1880 * NOTIFY foo;
1881 * commit starts
1882 * queue the notify message
1883 * transaction starts
1884 * LISTEN foo; -- first LISTEN in session
1885 * SELECT * FROM foo WHERE ...;
1886 * commit to clog
1887 * commit starts
1888 * add backend 2 to array of listeners
1889 * advance to queue head (this code)
1890 * commit to clog
1891 *
1892 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
1893 * wasn't committed yet. Ideally we'd ensure that client 2 would
1894 * eventually get transaction 1's notify message, but there's no way
1895 * to do that; until we're in the listener array, there's no guarantee
1896 * that the notify message doesn't get removed from the queue.
1897 *
1898 * Therefore the coding technique transaction 2 is using is unsafe:
1899 * applications must commit a LISTEN before inspecting database state,
1900 * if they want to ensure they will see notifications about subsequent
1901 * changes to that state.
1902 *
1903 * What we do guarantee is that we'll see all notifications from
1904 * transactions committing after the snapshot we take here.
1905 * Exec_ListenPreCommit has already added us to the listener array,
1906 * so no not-yet-committed messages can be removed from the queue
1907 * before we see them.
1908 *----------
1909 */
1910 snapshot = RegisterSnapshot(GetLatestSnapshot());
1911
1912 /*
1913 * It is possible that we fail while trying to send a message to our
1914 * frontend (for example, because of encoding conversion failure). If
1915 * that happens it is critical that we not try to send the same message
1916 * over and over again. Therefore, we set ExitOnAnyError to upgrade any
1917 * ERRORs to FATAL, causing the client connection to be closed on error.
1918 *
1919 * We used to only skip over the offending message and try to soldier on,
1920 * but it was somewhat questionable to lose a notification and give the
1921 * client an ERROR instead. A client application is not be prepared for
1922 * that and can't tell that a notification was missed. It was also not
1923 * very useful in practice because notifications are often processed while
1924 * a connection is idle and reading a message from the client, and in that
1925 * state, any error is upgraded to FATAL anyway. Closing the connection
1926 * is a clear signal to the application that it might have missed
1927 * notifications.
1928 */
1929 {
1930 bool save_ExitOnAnyError = ExitOnAnyError;
1931 bool reachedStop;
1932
1933 ExitOnAnyError = true;
1934
1935 do
1936 {
1937 /*
1938 * Process messages up to the stop position, end of page, or an
1939 * uncommitted message.
1940 *
1941 * Our stop position is what we found to be the head's position
1942 * when we entered this function. It might have changed already.
1943 * But if it has, we will receive (or have already received and
1944 * queued) another signal and come here again.
1945 *
1946 * We are not holding NotifyQueueLock here! The queue can only
1947 * extend beyond the head pointer (see above) and we leave our
1948 * backend's pointer where it is so nobody will truncate or
1949 * rewrite pages under us. Especially we don't want to hold a lock
1950 * while sending the notifications to the frontend.
1951 */
1952 reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
1953 } while (!reachedStop);
1954
1955 /* Update shared state */
1956 LWLockAcquire(NotifyQueueLock, LW_SHARED);
1958 LWLockRelease(NotifyQueueLock);
1959
1960 ExitOnAnyError = save_ExitOnAnyError;
1961 }
1962
1963 /* Done with snapshot */
1964 UnregisterSnapshot(snapshot);
1965}
static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)
Definition: async.c:1977
ProcNumber MyProcNumber
Definition: globals.c:90
bool ExitOnAnyError
Definition: globals.c:123
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:353
void UnregisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:864
Snapshot RegisterSnapshot(Snapshot snapshot)
Definition: snapmgr.c:822

References Assert(), asyncQueueProcessPageEntries(), ExitOnAnyError, GetLatestSnapshot(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_EQUAL, RegisterSnapshot(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), and ProcessIncomingNotify().

◆ asyncQueueUnregister()

static void asyncQueueUnregister ( void  )
static

Definition at line 1230 of file async.c.

1231{
1232 Assert(listenChannels == NIL); /* else caller error */
1233
1234 if (!amRegisteredListener) /* nothing to do */
1235 return;
1236
1237 /*
1238 * Need exclusive lock here to manipulate list links.
1239 */
1240 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1241 /* Mark our entry as invalid */
1244 /* and remove it from the list */
1247 else
1248 {
1250 {
1252 {
1254 break;
1255 }
1256 }
1257 }
1259 LWLockRelease(NotifyQueueLock);
1260
1261 /* mark ourselves as no longer listed in the global array */
1262 amRegisteredListener = false;
1263}
static bool amRegisteredListener
Definition: async.c:419
#define QUEUE_BACKEND_DBOID(i)
Definition: async.c:301

References amRegisteredListener, Assert(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, listenChannels, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

◆ asyncQueueUsage()

static double asyncQueueUsage ( void  )
static

Definition at line 1506 of file async.c.

1507{
1508 int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
1509 int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
1510 int64 occupied = headPage - tailPage;
1511
1512 if (occupied == 0)
1513 return (double) 0; /* fast exit for common case */
1514
1515 return (double) occupied / (double) max_notify_queue_pages;
1516}

References max_notify_queue_pages, QUEUE_HEAD, QUEUE_POS_PAGE, and QUEUE_TAIL.

Referenced by asyncQueueFillWarning(), and pg_notification_queue_usage().

◆ AsyncShmemInit()

void AsyncShmemInit ( void  )

Definition at line 501 of file async.c.

502{
503 bool found;
504 Size size;
505
506 /*
507 * Create or attach to the AsyncQueueControl structure.
508 */
509 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
510 size = add_size(size, offsetof(AsyncQueueControl, backend));
511
513 ShmemInitStruct("Async Queue Control", size, &found);
514
515 if (!found)
516 {
517 /* First time through, so initialize it */
520 QUEUE_STOP_PAGE = 0;
523 for (int i = 0; i < MaxBackends; i++)
524 {
529 }
530 }
531
532 /*
533 * Set up SLRU management of the pg_notify data. Note that long segment
534 * names are used in order to avoid wraparound.
535 */
536 NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
538 "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
539 SYNC_HANDLER_NONE, true);
540
541 if (!found)
542 {
543 /*
544 * During start or reboot, clean out the pg_notify directory.
545 */
547 }
548}
size_t Size
Definition: c.h:615
int MaxBackends
Definition: globals.c:146
int notify_buffers
Definition: globals.c:164
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
Definition: slru.c:252
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
Definition: slru.c:1816
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
Definition: slru.c:1769
@ SYNC_HANDLER_NONE
Definition: sync.h:42

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

◆ AsyncShmemSize()

Size AsyncShmemSize ( void  )

Definition at line 484 of file async.c.

485{
486 Size size;
487
488 /* This had better match AsyncShmemInit */
489 size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
490 size = add_size(size, offsetof(AsyncQueueControl, backend));
491
493
494 return size;
495}
Size SimpleLruShmemSize(int nslots, int nlsns)
Definition: slru.c:198

References add_size(), MaxBackends, mul_size(), notify_buffers, and SimpleLruShmemSize().

Referenced by CalculateShmemSize().

◆ AtAbort_Notify()

void AtAbort_Notify ( void  )

Definition at line 1671 of file async.c.

1672{
1673 /*
1674 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
1675 * we have registered as a listener but have not made any entry in
1676 * listenChannels. In that case, deregister again.
1677 */
1680
1681 /* And clean up */
1683}
static void ClearPendingActionsAndNotifies(void)
Definition: async.c:2498

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), listenChannels, and NIL.

Referenced by AbortTransaction().

◆ AtCommit_Notify()

void AtCommit_Notify ( void  )

Definition at line 967 of file async.c.

968{
969 ListCell *p;
970
971 /*
972 * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
973 * return as soon as possible
974 */
976 return;
977
978 if (Trace_notify)
979 elog(DEBUG1, "AtCommit_Notify");
980
981 /* Perform any pending listen/unlisten actions */
982 if (pendingActions != NULL)
983 {
984 foreach(p, pendingActions->actions)
985 {
986 ListenAction *actrec = (ListenAction *) lfirst(p);
987
988 switch (actrec->action)
989 {
990 case LISTEN_LISTEN:
991 Exec_ListenCommit(actrec->channel);
992 break;
993 case LISTEN_UNLISTEN:
995 break;
998 break;
999 }
1000 }
1001 }
1002
1003 /* If no longer listening to anything, get out of listener array */
1006
1007 /*
1008 * Send signals to listening backends. We need do this only if there are
1009 * pending notifies, which were previously added to the shared queue by
1010 * PreCommit_Notify().
1011 */
1012 if (pendingNotifies != NULL)
1014
1015 /*
1016 * If it's time to try to advance the global tail pointer, do that.
1017 *
1018 * (It might seem odd to do this in the sender, when more than likely the
1019 * listeners won't yet have read the messages we just sent. However,
1020 * there's less contention if only the sender does it, and there is little
1021 * need for urgency in advancing the global tail. So this typically will
1022 * be clearing out messages that were sent some time ago.)
1023 */
1024 if (tryAdvanceTail)
1025 {
1026 tryAdvanceTail = false;
1028 }
1029
1030 /* And clean up */
1032}
static void SignalBackends(void)
Definition: async.c:1581
static void Exec_ListenCommit(const char *channel)
Definition: async.c:1135
static void Exec_UnlistenCommit(const char *channel)
Definition: async.c:1162
static void asyncQueueAdvanceTail(void)
Definition: async.c:2114
List * actions
Definition: async.c:348
char channel[FLEXIBLE_ARRAY_MEMBER]
Definition: async.c:342
ListenActionKind action
Definition: async.c:341

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

◆ AtPrepare_Notify()

void AtPrepare_Notify ( void  )

Definition at line 835 of file async.c.

836{
837 /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
840 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
841 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
842}

References ereport, errcode(), errmsg(), ERROR, pendingActions, and pendingNotifies.

Referenced by PrepareTransaction().

◆ AtSubAbort_Notify()

void AtSubAbort_Notify ( void  )

Definition at line 1761 of file async.c.

1762{
1763 int my_level = GetCurrentTransactionNestLevel();
1764
1765 /*
1766 * All we have to do is pop the stack --- the actions/notifies made in
1767 * this subxact are no longer interesting, and the space will be freed
1768 * when CurTransactionContext is recycled. We still have to free the
1769 * ActionList and NotificationList objects themselves, though, because
1770 * those are allocated in TopTransactionContext.
1771 *
1772 * Note that there might be no entries at all, or no entries for the
1773 * current subtransaction level, either because none were ever created, or
1774 * because we reentered this routine due to trouble during subxact abort.
1775 */
1776 while (pendingActions != NULL &&
1777 pendingActions->nestingLevel >= my_level)
1778 {
1779 ActionList *childPendingActions = pendingActions;
1780
1782 pfree(childPendingActions);
1783 }
1784
1785 while (pendingNotifies != NULL &&
1786 pendingNotifies->nestingLevel >= my_level)
1787 {
1788 NotificationList *childPendingNotifies = pendingNotifies;
1789
1791 pfree(childPendingNotifies);
1792 }
1793}
int nestingLevel
Definition: async.c:347
struct ActionList * upper
Definition: async.c:349

References GetCurrentTransactionNestLevel(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by AbortSubTransaction().

◆ AtSubCommit_Notify()

void AtSubCommit_Notify ( void  )

Definition at line 1691 of file async.c.

1692{
1693 int my_level = GetCurrentTransactionNestLevel();
1694
1695 /* If there are actions at our nesting level, we must reparent them. */
1696 if (pendingActions != NULL &&
1697 pendingActions->nestingLevel >= my_level)
1698 {
1699 if (pendingActions->upper == NULL ||
1700 pendingActions->upper->nestingLevel < my_level - 1)
1701 {
1702 /* nothing to merge; give the whole thing to the parent */
1704 }
1705 else
1706 {
1707 ActionList *childPendingActions = pendingActions;
1708
1710
1711 /*
1712 * Mustn't try to eliminate duplicates here --- see queue_listen()
1713 */
1716 childPendingActions->actions);
1717 pfree(childPendingActions);
1718 }
1719 }
1720
1721 /* If there are notifies at our nesting level, we must reparent them. */
1722 if (pendingNotifies != NULL &&
1723 pendingNotifies->nestingLevel >= my_level)
1724 {
1725 Assert(pendingNotifies->nestingLevel == my_level);
1726
1727 if (pendingNotifies->upper == NULL ||
1728 pendingNotifies->upper->nestingLevel < my_level - 1)
1729 {
1730 /* nothing to merge; give the whole thing to the parent */
1732 }
1733 else
1734 {
1735 /*
1736 * Formerly, we didn't bother to eliminate duplicates here, but
1737 * now we must, else we fall foul of "Assert(!found)", either here
1738 * or during a later attempt to build the parent-level hashtable.
1739 */
1740 NotificationList *childPendingNotifies = pendingNotifies;
1741 ListCell *l;
1742
1744 /* Insert all the subxact's events into parent, except for dups */
1745 foreach(l, childPendingNotifies->events)
1746 {
1747 Notification *childn = (Notification *) lfirst(l);
1748
1749 if (!AsyncExistsPendingNotify(childn))
1751 }
1752 pfree(childPendingNotifies);
1753 }
1754 }
1755}
List * list_concat(List *list1, const List *list2)
Definition: list.c:561

References ActionList::actions, AddEventToPendingNotifies(), Assert(), AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

◆ check_notify_buffers()

bool check_notify_buffers ( int *  newval,
void **  extra,
GucSource  source 
)

Definition at line 2514 of file async.c.

2515{
2516 return check_slru_buffers("notify_buffers", newval);
2517}
#define newval
bool check_slru_buffers(const char *name, int *newval)
Definition: slru.c:355

References check_slru_buffers(), and newval.

◆ ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void  )
static

Definition at line 2498 of file async.c.

2499{
2500 /*
2501 * Everything's allocated in either TopTransactionContext or the context
2502 * for the subtransaction to which it corresponds. So, there's nothing to
2503 * do here except reset the pointers; the space will be reclaimed when the
2504 * contexts are deleted.
2505 */
2506 pendingActions = NULL;
2507 pendingNotifies = NULL;
2508}

References pendingActions, and pendingNotifies.

Referenced by AtAbort_Notify(), and AtCommit_Notify().

◆ Exec_ListenCommit()

static void Exec_ListenCommit ( const char *  channel)
static

Definition at line 1135 of file async.c.

1136{
1137 MemoryContext oldcontext;
1138
1139 /* Do nothing if we are already listening on this channel */
1140 if (IsListeningOn(channel))
1141 return;
1142
1143 /*
1144 * Add the new channel name to listenChannels.
1145 *
1146 * XXX It is theoretically possible to get an out-of-memory failure here,
1147 * which would be bad because we already committed. For the moment it
1148 * doesn't seem worth trying to guard against that, but maybe improve this
1149 * later.
1150 */
1153 MemoryContextSwitchTo(oldcontext);
1154}
char * pstrdup(const char *in)
Definition: mcxt.c:1759
MemoryContext TopMemoryContext
Definition: mcxt.c:166

References IsListeningOn(), lappend(), listenChannels, MemoryContextSwitchTo(), pstrdup(), and TopMemoryContext.

Referenced by AtCommit_Notify().

◆ Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void  )
static

Definition at line 1040 of file async.c.

1041{
1042 QueuePosition head;
1043 QueuePosition max;
1044 ProcNumber prevListener;
1045
1046 /*
1047 * Nothing to do if we are already listening to something, nor if we
1048 * already ran this routine in this transaction.
1049 */
1051 return;
1052
1053 if (Trace_notify)
1054 elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
1055
1056 /*
1057 * Before registering, make sure we will unlisten before dying. (Note:
1058 * this action does not get undone if we abort later.)
1059 */
1061 {
1064 }
1065
1066 /*
1067 * This is our first LISTEN, so establish our pointer.
1068 *
1069 * We set our pointer to the global tail pointer and then move it forward
1070 * over already-committed notifications. This ensures we cannot miss any
1071 * not-yet-committed notifications. We might get a few more but that
1072 * doesn't hurt.
1073 *
1074 * In some scenarios there might be a lot of committed notifications that
1075 * have not yet been pruned away (because some backend is being lazy about
1076 * reading them). To reduce our startup time, we can look at other
1077 * backends and adopt the maximum "pos" pointer of any backend that's in
1078 * our database; any notifications it's already advanced over are surely
1079 * committed and need not be re-examined by us. (We must consider only
1080 * backends connected to our DB, because others will not have bothered to
1081 * check committed-ness of notifications in our DB.)
1082 *
1083 * We need exclusive lock here so we can look at other backends' entries
1084 * and manipulate the list links.
1085 */
1086 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1087 head = QUEUE_HEAD;
1088 max = QUEUE_TAIL;
1089 prevListener = INVALID_PROC_NUMBER;
1091 {
1093 max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
1094 /* Also find last listening backend before this one */
1095 if (i < MyProcNumber)
1096 prevListener = i;
1097 }
1101 /* Insert backend into list of listeners at correct position */
1102 if (prevListener != INVALID_PROC_NUMBER)
1103 {
1105 QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
1106 }
1107 else
1108 {
1111 }
1112 LWLockRelease(NotifyQueueLock);
1113
1114 /* Now we are listed in the global array, so remember we're listening */
1115 amRegisteredListener = true;
1116
1117 /*
1118 * Try to move our pointer forward as far as possible. This will skip
1119 * over already-committed notifications, which we want to do because they
1120 * might be quite stale. Note that we are not yet listening on anything,
1121 * so we won't deliver such notifications to our frontend. Also, although
1122 * our transaction might have executed NOTIFY, those message(s) aren't
1123 * queued yet so we won't skip them here.
1124 */
1125 if (!QUEUE_POS_EQUAL(max, head))
1127}
#define QUEUE_POS_MAX(x, y)
Definition: async.c:222
static void asyncQueueReadAllNotifications(void)
Definition: async.c:1851
static void Async_UnlistenOnExit(int code, Datum arg)
Definition: async.c:822
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

◆ Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void  )
static

Definition at line 1193 of file async.c.

1194{
1195 if (Trace_notify)
1196 elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1197
1200}
void list_free_deep(List *list)
Definition: list.c:1560

References DEBUG1, elog, list_free_deep(), listenChannels, MyProcPid, NIL, and Trace_notify.

Referenced by Async_UnlistenOnExit(), and AtCommit_Notify().

◆ Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char *  channel)
static

Definition at line 1162 of file async.c.

1163{
1164 ListCell *q;
1165
1166 if (Trace_notify)
1167 elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1168
1169 foreach(q, listenChannels)
1170 {
1171 char *lchan = (char *) lfirst(q);
1172
1173 if (strcmp(lchan, channel) == 0)
1174 {
1176 pfree(lchan);
1177 break;
1178 }
1179 }
1180
1181 /*
1182 * We do not complain about unlistening something not being listened;
1183 * should we?
1184 */
1185}
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391

References DEBUG1, elog, foreach_delete_current, lfirst, listenChannels, MyProcPid, pfree(), and Trace_notify.

Referenced by AtCommit_Notify().

◆ HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void  )

Definition at line 1804 of file async.c.

1805{
1806 /*
1807 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
1808 * you do here.
1809 */
1810
1811 /* signal that work needs to be done */
1813
1814 /* make sure the event is processed in due course */
1816}
volatile sig_atomic_t notifyInterruptPending
Definition: async.c:413
struct Latch * MyLatch
Definition: globals.c:63
void SetLatch(Latch *latch)
Definition: latch.c:290

References MyLatch, notifyInterruptPending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

◆ IsListeningOn()

static bool IsListeningOn ( const char *  channel)
static

Definition at line 1211 of file async.c.

1212{
1213 ListCell *p;
1214
1215 foreach(p, listenChannels)
1216 {
1217 char *lchan = (char *) lfirst(p);
1218
1219 if (strcmp(lchan, channel) == 0)
1220 return true;
1221 }
1222 return false;
1223}

References lfirst, and listenChannels.

Referenced by asyncQueueProcessPageEntries(), and Exec_ListenCommit().

◆ notification_hash()

static uint32 notification_hash ( const void *  key,
Size  keysize 
)
static

Definition at line 2468 of file async.c.

2469{
2470 const Notification *k = *(const Notification *const *) key;
2471
2472 Assert(keysize == sizeof(Notification *));
2473 /* We don't bother to include the payload's trailing null in the hash */
2474 return DatumGetUInt32(hash_any((const unsigned char *) k->data,
2475 k->channel_len + k->payload_len + 1));
2476}
static Datum hash_any(const unsigned char *k, int keylen)
Definition: hashfn.h:31
static uint32 DatumGetUInt32(Datum X)
Definition: postgres.h:232

References Assert(), Notification::channel_len, Notification::data, DatumGetUInt32(), hash_any(), sort-test::key, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ notification_match()

static int notification_match ( const void *  key1,
const void *  key2,
Size  keysize 
)
static

Definition at line 2482 of file async.c.

2483{
2484 const Notification *k1 = *(const Notification *const *) key1;
2485 const Notification *k2 = *(const Notification *const *) key2;
2486
2487 Assert(keysize == sizeof(Notification *));
2488 if (k1->channel_len == k2->channel_len &&
2489 k1->payload_len == k2->payload_len &&
2490 memcmp(k1->data, k2->data,
2491 k1->channel_len + k1->payload_len + 2) == 0)
2492 return 0; /* equal */
2493 return 1; /* not equal */
2494}

References Assert(), Notification::channel_len, Notification::data, and Notification::payload_len.

Referenced by AddEventToPendingNotifies().

◆ NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char *  channel,
const char *  payload,
int32  srcPid 
)

Definition at line 2344 of file async.c.

2345{
2347 {
2349
2351 pq_sendint32(&buf, srcPid);
2352 pq_sendstring(&buf, channel);
2353 pq_sendstring(&buf, payload);
2355
2356 /*
2357 * NOTE: we do not do pq_flush() here. Some level of caller will
2358 * handle it later, allowing this message to be combined into a packet
2359 * with other ones.
2360 */
2361 }
2362 else
2363 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
2364}
@ DestRemote
Definition: dest.h:89
#define INFO
Definition: elog.h:34
static char * buf
Definition: pg_test_fsync.c:72
CommandDest whereToSendOutput
Definition: postgres.c:92
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:195
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
#define PqMsg_NotificationResponse
Definition: protocol.h:41

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

◆ pg_listening_channels()

Datum pg_listening_channels ( PG_FUNCTION_ARGS  )

Definition at line 789 of file async.c.

790{
791 FuncCallContext *funcctx;
792
793 /* stuff done only on the first call of the function */
794 if (SRF_IS_FIRSTCALL())
795 {
796 /* create a function context for cross-call persistence */
797 funcctx = SRF_FIRSTCALL_INIT();
798 }
799
800 /* stuff done on every call of the function */
801 funcctx = SRF_PERCALL_SETUP();
802
803 if (funcctx->call_cntr < list_length(listenChannels))
804 {
805 char *channel = (char *) list_nth(listenChannels,
806 funcctx->call_cntr);
807
808 SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
809 }
810
811 SRF_RETURN_DONE(funcctx);
812}
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:304
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:308
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:310
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:306
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:328
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
uint64 call_cntr
Definition: funcapi.h:65

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), listenChannels, SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

◆ pg_notification_queue_usage()

Datum pg_notification_queue_usage ( PG_FUNCTION_ARGS  )

Definition at line 1481 of file async.c.

1482{
1483 double usage;
1484
1485 /* Advance the queue tail so we don't report a too-large result */
1487
1488 LWLockAcquire(NotifyQueueLock, LW_SHARED);
1490 LWLockRelease(NotifyQueueLock);
1491
1493}
#define PG_RETURN_FLOAT8(x)
Definition: fmgr.h:367
static void usage(const char *progname)
Definition: vacuumlo.c:414

References asyncQueueAdvanceTail(), asyncQueueUsage(), LW_SHARED, LWLockAcquire(), LWLockRelease(), PG_RETURN_FLOAT8, and usage().

◆ pg_notify()

Datum pg_notify ( PG_FUNCTION_ARGS  )

Definition at line 556 of file async.c.

557{
558 const char *channel;
559 const char *payload;
560
561 if (PG_ARGISNULL(0))
562 channel = "";
563 else
565
566 if (PG_ARGISNULL(1))
567 payload = "";
568 else
570
571 /* For NOTIFY as a statement, this is checked in ProcessUtility */
573
574 Async_Notify(channel, payload);
575
577}
void Async_Notify(const char *channel, const char *payload)
Definition: async.c:590
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void PreventCommandDuringRecovery(const char *cmdname)
Definition: utility.c:443
char * text_to_cstring(const text *t)
Definition: varlena.c:214

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

◆ PreCommit_Notify()

void PreCommit_Notify ( void  )

Definition at line 860 of file async.c.

861{
862 ListCell *p;
863
865 return; /* no relevant statements in this xact */
866
867 if (Trace_notify)
868 elog(DEBUG1, "PreCommit_Notify");
869
870 /* Preflight for any pending listen/unlisten actions */
871 if (pendingActions != NULL)
872 {
873 foreach(p, pendingActions->actions)
874 {
875 ListenAction *actrec = (ListenAction *) lfirst(p);
876
877 switch (actrec->action)
878 {
879 case LISTEN_LISTEN:
881 break;
882 case LISTEN_UNLISTEN:
883 /* there is no Exec_UnlistenPreCommit() */
884 break;
886 /* there is no Exec_UnlistenAllPreCommit() */
887 break;
888 }
889 }
890 }
891
892 /* Queue any pending notifies (must happen after the above) */
893 if (pendingNotifies)
894 {
895 ListCell *nextNotify;
896
897 /*
898 * Make sure that we have an XID assigned to the current transaction.
899 * GetCurrentTransactionId is cheap if we already have an XID, but not
900 * so cheap if we don't, and we'd prefer not to do that work while
901 * holding NotifyQueueLock.
902 */
904
905 /*
906 * Serialize writers by acquiring a special lock that we hold till
907 * after commit. This ensures that queue entries appear in commit
908 * order, and in particular that there are never uncommitted queue
909 * entries ahead of committed ones, so an uncommitted transaction
910 * can't block delivery of deliverable notifications.
911 *
912 * We use a heavyweight lock so that it'll automatically be released
913 * after either commit or abort. This also allows deadlocks to be
914 * detected, though really a deadlock shouldn't be possible here.
915 *
916 * The lock is on "database 0", which is pretty ugly but it doesn't
917 * seem worth inventing a special locktag category just for this.
918 * (Historical note: before PG 9.0, a similar lock on "database 0" was
919 * used by the flatfiles mechanism.)
920 */
921 LockSharedObject(DatabaseRelationId, InvalidOid, 0,
923
924 /* Now push the notifications into the queue */
925 nextNotify = list_head(pendingNotifies->events);
926 while (nextNotify != NULL)
927 {
928 /*
929 * Add the pending notifications to the queue. We acquire and
930 * release NotifyQueueLock once per page, which might be overkill
931 * but it does allow readers to get in while we're doing this.
932 *
933 * A full queue is very uncommon and should really not happen,
934 * given that we have so much space available in the SLRU pages.
935 * Nevertheless we need to deal with this possibility. Note that
936 * when we get here we are in the process of committing our
937 * transaction, but we have not yet committed to clog, so at this
938 * point in time we can still roll the transaction back.
939 */
940 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
942 if (asyncQueueIsFull())
944 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
945 errmsg("too many notifications in the NOTIFY queue")));
946 nextNotify = asyncQueueAddEntries(nextNotify);
947 LWLockRelease(NotifyQueueLock);
948 }
949
950 /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
951 }
952}
static void Exec_ListenPreCommit(void)
Definition: async.c:1040
static ListCell * asyncQueueAddEntries(ListCell *nextNotify)
Definition: async.c:1355
static void asyncQueueFillWarning(void)
Definition: async.c:1527
static bool asyncQueueIsFull(void)
Definition: async.c:1271
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
#define AccessExclusiveLock
Definition: lockdefs.h:43
static ListCell * list_head(const List *l)
Definition: pg_list.h:128

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

◆ ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool  flush)
static

Definition at line 2303 of file async.c.

2304{
2305 /* We *must* reset the flag */
2306 notifyInterruptPending = false;
2307
2308 /* Do nothing else if we aren't actively listening */
2309 if (listenChannels == NIL)
2310 return;
2311
2312 if (Trace_notify)
2313 elog(DEBUG1, "ProcessIncomingNotify");
2314
2315 set_ps_display("notify interrupt");
2316
2317 /*
2318 * We must run asyncQueueReadAllNotifications inside a transaction, else
2319 * bad things happen if it gets an error.
2320 */
2322
2324
2326
2327 /*
2328 * If this isn't an end-of-command case, we must flush the notify messages
2329 * to ensure frontend gets them promptly.
2330 */
2331 if (flush)
2332 pq_flush();
2333
2334 set_ps_display("idle");
2335
2336 if (Trace_notify)
2337 elog(DEBUG1, "ProcessIncomingNotify: done");
2338}
#define pq_flush()
Definition: libpq.h:46
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
void StartTransactionCommand(void)
Definition: xact.c:3077
void CommitTransactionCommand(void)
Definition: xact.c:3175

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, listenChannels, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

◆ ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool  flush)

Definition at line 1834 of file async.c.

1835{
1837 return; /* not really idle */
1838
1839 /* Loop in case another signal arrives while sending messages */
1841 ProcessIncomingNotify(flush);
1842}
static void ProcessIncomingNotify(bool flush)
Definition: async.c:2303
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5007

References IsTransactionOrTransactionBlock(), notifyInterruptPending, and ProcessIncomingNotify().

Referenced by PostgresMain(), and ProcessClientReadInterrupt().

◆ queue_listen()

static void queue_listen ( ListenActionKind  action,
const char *  channel 
)
static

Definition at line 689 of file async.c.

690{
691 MemoryContext oldcontext;
692 ListenAction *actrec;
693 int my_level = GetCurrentTransactionNestLevel();
694
695 /*
696 * Unlike Async_Notify, we don't try to collapse out duplicates. It would
697 * be too complicated to ensure we get the right interactions of
698 * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
699 * would be any performance benefit anyway in sane applications.
700 */
702
703 /* space for terminating null is included in sizeof(ListenAction) */
704 actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
705 strlen(channel) + 1);
706 actrec->action = action;
707 strcpy(actrec->channel, channel);
708
709 if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
710 {
711 ActionList *actions;
712
713 /*
714 * First action in current sub(xact). Note that we allocate the
715 * ActionList in TopTransactionContext; the nestingLevel might get
716 * changed later by AtSubCommit_Notify.
717 */
718 actions = (ActionList *)
720 actions->nestingLevel = my_level;
721 actions->actions = list_make1(actrec);
722 actions->upper = pendingActions;
723 pendingActions = actions;
724 }
725 else
727
728 MemoryContextSwitchTo(oldcontext);
729}

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

◆ SignalBackends()

static void SignalBackends ( void  )
static

Definition at line 1581 of file async.c.

1582{
1583 int32 *pids;
1584 ProcNumber *procnos;
1585 int count;
1586
1587 /*
1588 * Identify backends that we need to signal. We don't want to send
1589 * signals while holding the NotifyQueueLock, so this loop just builds a
1590 * list of target PIDs.
1591 *
1592 * XXX in principle these pallocs could fail, which would be bad. Maybe
1593 * preallocate the arrays? They're not that large, though.
1594 */
1595 pids = (int32 *) palloc(MaxBackends * sizeof(int32));
1596 procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
1597 count = 0;
1598
1599 LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
1601 {
1602 int32 pid = QUEUE_BACKEND_PID(i);
1603 QueuePosition pos;
1604
1605 Assert(pid != InvalidPid);
1606 pos = QUEUE_BACKEND_POS(i);
1608 {
1609 /*
1610 * Always signal listeners in our own database, unless they're
1611 * already caught up (unlikely, but possible).
1612 */
1613 if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
1614 continue;
1615 }
1616 else
1617 {
1618 /*
1619 * Listeners in other databases should be signaled only if they
1620 * are far behind.
1621 */
1624 continue;
1625 }
1626 /* OK, need to signal this one */
1627 pids[count] = pid;
1628 procnos[count] = i;
1629 count++;
1630 }
1631 LWLockRelease(NotifyQueueLock);
1632
1633 /* Now send signals */
1634 for (int i = 0; i < count; i++)
1635 {
1636 int32 pid = pids[i];
1637
1638 /*
1639 * If we are signaling our own process, no need to involve the kernel;
1640 * just set the flag directly.
1641 */
1642 if (pid == MyProcPid)
1643 {
1645 continue;
1646 }
1647
1648 /*
1649 * Note: assuming things aren't broken, a signal failure here could
1650 * only occur if the target backend exited since we released
1651 * NotifyQueueLock; which is unlikely but certainly possible. So we
1652 * just log a low-level debug message if it happens.
1653 */
1654 if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
1655 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
1656 }
1657
1658 pfree(pids);
1659 pfree(procnos);
1660}
static int64 asyncQueuePageDiff(int64 p, int64 q)
Definition: async.c:465
#define DEBUG3
Definition: elog.h:28
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_NOTIFY_INTERRUPT
Definition: procsignal.h:33

References Assert(), asyncQueuePageDiff(), DEBUG3, elog, i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, notifyInterruptPending, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by AtCommit_Notify().

Variable Documentation

◆ amRegisteredListener

bool amRegisteredListener = false
static

◆ asyncQueueControl

AsyncQueueControl* asyncQueueControl
static

Definition at line 294 of file async.c.

Referenced by asyncQueueFillWarning(), and AsyncShmemInit().

◆ listenChannels

◆ max_notify_queue_pages

int max_notify_queue_pages = 1048576

Definition at line 428 of file async.c.

Referenced by asyncQueueIsFull(), and asyncQueueUsage().

◆ NotifyCtlData

SlruCtlData NotifyCtlData
static

Definition at line 308 of file async.c.

◆ notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

◆ pendingActions

◆ pendingNotifies

◆ Trace_notify

◆ tryAdvanceTail

bool tryAdvanceTail = false
static

Definition at line 422 of file async.c.

Referenced by asyncQueueAddEntries(), and AtCommit_Notify().

◆ unlistenExitRegistered

bool unlistenExitRegistered = false
static

Definition at line 416 of file async.c.

Referenced by Async_Unlisten(), Async_UnlistenAll(), and Exec_ListenPreCommit().