I have a PostgreSQL events table partitioned by event_timestamp:

CREATE TABLE events
(
    id SERIAL,
    event_timestamp TIMESTAMP NOT NULL,
    processed BOOLEAN DEFAULT FALSE,
    payload JSONB,
    PRIMARY KEY (id, event_timestamp)  -- the partition key must be part of the primary key
) PARTITION BY RANGE (event_timestamp);

Currently, a single worker polls and processes events, marking them as processed to avoid reprocessing. The query used is:

SELECT * 
FROM events 
WHERE processed = false 
ORDER BY event_timestamp 
LIMIT 10000;

To increase throughput, I need multiple workers. However, this risks duplicate processing as workers may select the same events simultaneously.

I'm seeking an efficient strategy to allow multiple workers to process events concurrently without duplicates. The solution should ensure each event is processed exactly once. How can I achieve this in PostgreSQL? Any guidance or examples would be greatly appreciated.

2 Comments
  • You can use explicit row locks. Add FOR UPDATE SKIP LOCKED at the end of this select and that's it. Make sure your workers use separate sessions/transactions - some connection pools can be configured to re-use the same session and transaction for different queries, which won't work with this type of locking. Commented Jun 15, 2024 at 11:07
  • What is the bottleneck? Maybe the effort would be better spent making that single worker faster, rather than making more of them. Commented Jun 15, 2024 at 16:18

1 Answer

You can use explicit row locks. Add FOR UPDATE SKIP LOCKED at the end of this select and that's it:

SELECT * 
FROM events 
WHERE processed = false 
ORDER BY event_timestamp 
LIMIT 10000
FOR UPDATE SKIP LOCKED; -- here

Once worker A reads its 10k rows, it keeps them locked FOR UPDATE until it COMMITs or ROLLBACKs its transaction. If another worker requests a batch of 10k before then, it will see that the first 10k are locked and skip them, thanks to SKIP LOCKED.
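
Putting it together, a minimal sketch of what each worker's transaction could look like. The batch size and the processed = true update mirror the question; the literal ids in the UPDATE are just placeholders for whatever the SELECT returned:

BEGIN;

-- Claim a batch: the rows selected here stay locked until COMMIT/ROLLBACK,
-- so concurrent workers running the same query will skip them.
SELECT id, event_timestamp, payload
FROM events
WHERE processed = false
ORDER BY event_timestamp
LIMIT 10000
FOR UPDATE SKIP LOCKED;

-- ...the application processes the batch here, inside the open transaction...

-- Mark the claimed rows as done before releasing the locks.
UPDATE events
SET processed = true
WHERE id IN (1, 2, 3);  -- placeholder: the ids fetched by the SELECT above

COMMIT;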

Here's a thread where you can find demo1, showing how workers collide and all grab the same row, and demo2, showing how, after adding the locks, they skip the locked rows and each ends up with a different one.
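
Those demos aren't reproduced here, but you can see the same effect with two concurrent psql sessions; the ids in the comments are made up for illustration:

-- Session A: claim a row and keep the transaction open.
BEGIN;
SELECT id FROM events
WHERE processed = false
ORDER BY event_timestamp
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- returns, say, id = 1 and holds the lock

-- Session B: run the same query while session A is still open.
BEGIN;
SELECT id FROM events
WHERE processed = false
ORDER BY event_timestamp
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- skips the row locked by A and returns id = 2 instead;
-- without SKIP LOCKED it would block until A commits or rolls back

COMMIT;  -- in each session, once its batch is processed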

Make sure your workers use separate sessions/transactions - some connection pools can be configured to re-use the same session and transaction for different queries, which won't work with this type of locking.


You might want to take a look at NOTIFY/pg_notify() and LISTEN. You can CREATE TRIGGER t1 AFTER INSERT ON events and whenever there's something coming in, the trigger can immediately pg_notify() on the same channel a sleeping client is LISTENing on, to wake it up and make it deploy a worker to process the newly added events.
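
A minimal sketch of that setup, assuming a channel named new_events and a helper function notify_new_event() (both names made up for the example):

-- Fire a notification on channel 'new_events' for every inserted row.
CREATE FUNCTION notify_new_event() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('new_events', NEW.id::text);
    RETURN NULL;  -- the return value is ignored for AFTER triggers
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER t1
AFTER INSERT ON events
FOR EACH ROW
EXECUTE FUNCTION notify_new_event();

-- A sleeping client only needs:
LISTEN new_events;
-- ...and is woken up as soon as the trigger fires, at which point it can
-- dispatch a worker that runs the SKIP LOCKED query above.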

Comments

pg_notify seems to be fire-and-forget: if no worker is active, the notification looks like it would be lost. I will try FOR UPDATE SKIP LOCKED, though I'm not sure about the performance impact of the locks.
@Forece85 It's not mutually exclusive with polling. You can get a faster response to each event with notify/listen but still keep a backup client polling the table to screen for events that were left unprocessed by the listeners for some reason. Still, that's just a random remark I thought was worth mentioning.
Adding FOR UPDATE SKIP LOCKED changes the query plan from Limit -> Sort -> Index Only Scan to Limit -> LockRows -> Sort -> Seq Scan. Any other ways to improve this?
What's your table and index definition? In this test I'm seeing only a drop from index-only to a regular index scan.
Reducing the generated rows to 1.1e3 made it a seq scan. That seems to be a query planner decision due to the low row count? If so, then in general, adding FOR UPDATE SKIP LOCKED should not make much difference under an actual load?