
I have an app written in Go (I don't think the programming language is relevant, though) that acts as a scheduler. It connects to a PostgreSQL database and calculates the time of the next run for a number of tasks. The app sleeps until it's time to run the next task and then runs it (it actually publishes a message to a queue, and another app is in charge of running the task).
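
For context, the sleep step looks roughly like this sketch (illustrative names, assuming next_run holds a Unix timestamp in seconds):

import "time"

// sleepUntil blocks until nextRun (a Unix timestamp in seconds) is due.
func sleepUntil(nextRun int64) {
    if d := time.Until(time.Unix(nextRun, 0)); d > 0 {
        time.Sleep(d)
    }
}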

To reduce the risk of missing a scheduled run, this app runs on multiple nodes, but only one node should publish the message to run a given task.

The table in the DB has these two columns:

id serial
next_run int4

When the app starts it performs this query:

SELECT * FROM task;

After this, it calculates the times at which each task should run. When it's time to run the first task, every node calculates that task's subsequent next run and performs this query:

UPDATE task SET next_run = ${new_next_run} WHERE id = ${id} AND next_run = ${current_next_run};

If the number of affected rows is 1, it means the node was the first one to reach the database, and it is the one that will publish the message to run the task. In theory this should work, and I even built a small prototype to confirm it produces the expected result.
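
In Go the check looks roughly like this sketch (using database/sql with a PostgreSQL driver such as lib/pq or pgx; publishMessage and the variable names are placeholders, not the real code):

// db is an *sql.DB opened with a PostgreSQL driver;
// publishMessage stands in for whatever pushes to the queue.
res, err := db.Exec(
    `UPDATE task SET next_run = $1 WHERE id = $2 AND next_run = $3`,
    newNextRun, id, currentNextRun,
)
if err != nil {
    log.Fatal(err)
}
affected, err := res.RowsAffected()
if err != nil {
    log.Fatal(err)
}
if affected == 1 {
    publishMessage(id) // this node was first; the others see 0 rows affected
}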

In practice, there is a scenario in which this logic fails. When the app starts, it checks all tasks and calculates their next runs. If the next task is due in less than roughly 3 seconds, the update query doesn't seem to check the WHERE clause against the real database, but against what was saved in the cache. Or at least that is what I suppose happens...

If the first task to run is in more than 3 seconds, the tests pass:

T: node 1 -> rows affected = 1; publish message
T+100 milliseconds: node 2 -> rows affected = 0; don't publish message
T+130 milliseconds: node 3 -> rows affected = 0; don't publish message
..........
T+300 milliseconds: node n -> rows affected = 0; don't publish message

If the first task to run is in less than 3 seconds, the tests fail and my log looks like this:

T: node 1 -> rows affected = 1; publish message
T+100 milliseconds: node 2 -> rows affected = 1; publish message
T+130 milliseconds: node 3 -> rows affected = 0; don't publish message
T+180 milliseconds: node 4 -> rows affected = 1; publish message
T+200 milliseconds: node 5 -> rows affected = 1; publish message
T+220 milliseconds: node 6 -> rows affected = 0; don't publish message
..........
T+300 milliseconds: node n -> rows affected = 0; don't publish message

The only explanation that makes sense is that the UPDATE is performed against the cache, not the real database, and that's why it gives unpredictable results. But I know that in PostgreSQL there's no way to flush the cache with a query, so I come back to the question in the title: How to query PostgreSQL real database, not its cache?

Later edit:

The pseudocode:

start program
declare global tasks variable

read all tasks from DB
    SELECT * FROM task
for each task in tasks
    calculate next_run and save it in the global tasks variable
    if calculated next_run != next_run from database then
        update next_run in database
            UPDATE task SET next_run = ${next_run} WHERE id = ${id}
    endif
endfor

for true
    sort tasks by next_run asc
    extract first task from tasks
    if next_run > now
        sleep (next_run - now) seconds
    endif

    read a fresh instance of task from DB
        SELECT * FROM task WHERE id = ${id}
    calculate subsequent next_run
    update subsequent next_run in tasks
    update subsequent next_run in DB
        BEGIN;
        SELECT id FROM task WHERE id = ${id} AND next_run = ${current_next_run} FOR UPDATE;
        UPDATE task SET next_run = ${subsequent_next_run} WHERE id = ${id};
        COMMIT;
    if transaction succeeded then // i.e. the SELECT returned a row and the UPDATE affected one
        run the task
    endif
endfor
  • Do you disconnect before sleeping? If not, what is the isolation level used by your connections? Commented Feb 5, 2021 at 8:16
  • Yes, I've tried with a disconnect before sleeping. I also tried to disconnect & reconnect right before the update, but it didn't help. Regarding the isolation level, I've used the default one (read committed), repeatable read and serializable. Commented Feb 5, 2021 at 8:46

2 Answers


All data modifications are performed only in the cache (shared buffers). Later, dirty pages from the cache are persisted to disk. The upshot is that the cache always contains the latest data; it can never be "stale".

So you have to look for another explanation, perhaps in your code.

Looking at your code, I see two oddities:

  • In the beginning, before the endless loop, you sometimes update a task's next_run. Couldn't that lead to strangeness, either if other threads have a different idea of what the calculated next_run is, or if that first update interferes with the processing in another thread's endless loop?

  • In the database transaction in the endless loop, you use WHERE next_run = ${current_next_run} in the SELECT ... FOR UPDATE, but the condition is missing from the actual UPDATE, so the latter could modify a row even if its next_run has been modified by a concurrent UPDATE.

By the way, there is no need to do something like

BEGIN;
SELECT ... WHERE xy FOR UPDATE;
UPDATE ... WHERE xy;
COMMIT;

The UPDATE alone would do the same thing, since an UPDATE is always atomic in PostgreSQL: the value read is guaranteed to be the most recent visible version at the time of the data modification.
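
For example, a single-statement sketch in Go with the condition folded into the UPDATE (identifiers are made up):

res, err := db.Exec(
    `UPDATE task SET next_run = $1 WHERE id = $2 AND next_run = $3`,
    subsequentNextRun, id, currentNextRun,
)
// if res.RowsAffected() reports 1 row, this node claimed the run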


6 Comments

But do all processes access the "same" cache, or is there a session cache that each process uses? The code is quite straightforward: query the database for all tasks; calculate the next run for all tasks; if the calculated first run differs from the one in the DB, update it; sleep until the next run; perform the update to see if any rows are affected; if rows_affected is 1, publish the message. The tests pass when the app sleeps for more than 3 seconds.
There is only one cache, and I can assure you that your theory is wrong. I'd like to find the race condition in your algorithm, but the description is too vague. Could you phrase the algorithm as pseudo-code and SQL statements?
I've added the pseudocode and all the queries in the question.
1. I will refactor the first update to run in a transaction, and only if the row wasn't already updated; this way it won't interfere with what's in the infinite loop. 2. I initially used only the UPDATE statement with the condition (next_run = ${current_next_run}) and changed it to SELECT ... FOR UPDATE as @bahman-movaqar advised. If I use only the UPDATE with the condition, should it run at the serializable isolation level?
The transaction isolation level shouldn't make a difference, but perhaps it would be an interesting experiment to use a different one and see if you get serialization errors. You shouldn't.

As laurenz-albe mentioned, there's no stale cache in PG.

Are you running your SELECT and UPDATE inside a transaction?

  • If no, that most certainly explains the behaviour.
  • If yes, either
    • try "repeatable read" isolation level - the default is "read committed" which is not strong enough for your use case.
    • use SELECT FOR UPDATE to lock the row while a node is working on it (see the sketch after this list).
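
A minimal sketch of that locking variant with Go's database/sql (imports context and database/sql; all names are illustrative, not your code):

// claimTask locks the row, recomputes next_run, and commits.
func claimTask(ctx context.Context, db *sql.DB, id, newNextRun int64) error {
    tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op after a successful Commit

    // FOR UPDATE locks the row; concurrent nodes block here until we commit.
    var nextRun int64
    if err := tx.QueryRowContext(ctx,
        `SELECT next_run FROM task WHERE id = $1 FOR UPDATE`, id,
    ).Scan(&nextRun); err != nil {
        return err
    }

    if _, err := tx.ExecContext(ctx,
        `UPDATE task SET next_run = $1 WHERE id = $2`, newNextRun, id,
    ); err != nil {
        return err
    }
    return tx.Commit()
}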

A side note: generally, in such scenarios (many workers reading and writing the same data) you need either a synchronisation mechanism (e.g. only one node writes and the rest only read, or each node locks the data while reading and writing) or a conflict-resolution strategy (e.g. keeping a change journal for each data entry rather than a single row). In your case, you seem to have opted for the former.

4 Comments

Initially no, it was a simple UPDATE query. But I've just used a SELECT FOR UPDATE + UPDATE inside a transaction and it's still the same result. Also tried with repeatable read isolation level with no success.
Oh, that's interesting. Can you share your codebase? What's the type of next_run? If TIMESTAMP, do you calculate the value of new_next_run in Go or in PG? If Go, are you sure clocks on all worker nodes are absolutely in sync?
I don't think you are on the right track. You suggest replacing one way of optimistic locking with another, so nothing would change, except that you get a serialization error instead of "0 rows modified".
@BahmanMovaqar I can't share the codebase, unfortunately. I've added the pseudocode to the question though, and the code was reviewed internally by my colleagues and nobody saw anything wrong with it. next_run is an int in the DB, and it is calculated in Go because in some cases it needs to be adjusted for daylight saving time. Maybe something I didn't mention before that may be relevant: the tests are performed locally, with all the nodes started on my local machine.
