
This MPSC (multi-producer, single-consumer) queue sometimes leaves the consumer waiting indefinitely, even though I use CAS operations. The CAS is in the enqueue function. Since the stored data are pointers, I use nullptr as a marker to tell whether a slot has been written yet.

In the test, 4 producers generate data and a single consumer reads it. Each producer enqueues the same set of values in encoded form, and the consumer decodes each received value and counts it in a hash table to verify that all the data arrived.

#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <unordered_map>

#define QUEUE_SIZE 20

std::atomic<u_int> head {0};
std::atomic<u_int> tail {0};

// Use an array of int pointers for storage
std::atomic<int*> data[QUEUE_SIZE] = {};

bool enqueue(int value) {
  u_int current_tail = 0, next_tail = 0;
  do {
    current_tail = tail.load(std::memory_order_seq_cst);
    next_tail = (current_tail + 1) % QUEUE_SIZE;

    if (next_tail == head.load(std::memory_order_seq_cst)) {
      return false;  // queue is full
    }
  } while (!tail.compare_exchange_weak(current_tail, next_tail));

  // Allocate memory for the value
  int* new_value = new int(value);
  // Attempt to store the value in the current tail slot
  data[current_tail].store(new_value, std::memory_order_seq_cst);

  return true;
}

int dequeue(int &value) {
  u_int current_head = head.load(std::memory_order_seq_cst);
  if (current_head == tail.load(std::memory_order_seq_cst)) {
    return -1;  // queue is empty
  }

  int* stored_value = data[current_head].exchange(nullptr, std::memory_order_seq_cst);
  if (stored_value == nullptr) {
    return -2;  // not ready
  }

  value = *stored_value;
  delete stored_value;  // Free the allocated memory

  u_int next_head = (current_head + 1) % QUEUE_SIZE;
  head.store(next_head, std::memory_order_seq_cst);
  return 0;
}

#define TEST_SIZE 50
#define NUMBER_OF_PRODUCERS 4

bool test_mpsc_queue() {
  bool passed = true;
  std::vector<std::thread> producers;
  for (int producer_index = 0; producer_index < NUMBER_OF_PRODUCERS; producer_index++) {
    producers.emplace_back([producer_index]() {
      for (int i = 0; i < TEST_SIZE; i++) {
        auto start_time = std::chrono::steady_clock::now();
        auto end_time = start_time + std::chrono::seconds(10);
        while (!enqueue(producer_index * TEST_SIZE + i)) {
          // std::this_thread::yield();
          if (std::chrono::steady_clock::now() > end_time) {
            std::cout << "Producer wait exceeded when writing index " << i << " true index " << producer_index << "\n";
            break;
          }
        }
      }
    });
  }

  std::unordered_map<int, int> results;
  std::thread consumer_thread([&]() {
    for (size_t i = 0; i < TEST_SIZE * NUMBER_OF_PRODUCERS; i++) {
      auto start_time = std::chrono::steady_clock::now();
      auto end_time = start_time + std::chrono::seconds(5);
      int value = 123456789;
      int ret = -10;
      while ((ret = dequeue(value)) != 0) {
        if (std::chrono::steady_clock::now() > end_time) {
          std::cout << "Consumer wait exceeded when reading index " << i << " ret=" << ret << "\n";
          break;
        }
        // std::this_thread::yield();
      }
      if (ret == 0) {
        value = value % TEST_SIZE;
        results[value]++;
      }
    }
  });

  consumer_thread.join();
  for (auto &p : producers) {
    p.join();
  }

  for (size_t i = 0; i < TEST_SIZE; i++) {
    if (results[i] != NUMBER_OF_PRODUCERS) {
      passed = false;
      std::cout << "Failed: test value " << i << " count " << results[i] << "\n";
      break;
    }
  }
  if (passed) {
    std::cout << "Success\n";
  }
  return passed;
}

int main() {
  auto start_time = std::chrono::steady_clock::now();
  auto end_time = start_time + std::chrono::minutes(10);

  int test_count = 0;
  while (std::chrono::steady_clock::now() < end_time) {
    if (!test_mpsc_queue()) {
      std::cout << "Test " << test_count << " failed\n";
      return -1;
    }
    test_count++;
  }

  std::cout << "Completed " << test_count << " tests in 10 minutes." << std::endl;
  return 0;
}

But I get the following output:

Success
Success
Success
Success
Success
Success
Success
Success
Consumer wait exceeded when reading index 180 ret=-1
Consumer wait exceeded when reading index 181 ret=-1
Consumer wait exceeded when reading index 182 ret=-1
Consumer wait exceeded when reading index 183 ret=-1
Consumer wait exceeded when reading index 184 ret=-1
Consumer wait exceeded when reading index 185 ret=-1
Consumer wait exceeded when reading index 186 ret=-1
Consumer wait exceeded when reading index 187 ret=-1
Consumer wait exceeded when reading index 188 ret=-1
Consumer wait exceeded when reading index 189 ret=-1
Consumer wait exceeded when reading index 190 ret=-1
Consumer wait exceeded when reading index 191 ret=-1
Consumer wait exceeded when reading index 192 ret=-1
Consumer wait exceeded when reading index 193 ret=-1
Consumer wait exceeded when reading index 194 ret=-1
Consumer wait exceeded when reading index 195 ret=-1
Consumer wait exceeded when reading index 196 ret=-1
Consumer wait exceeded when reading index 197 ret=-1
Consumer wait exceeded when reading index 198 ret=-1
Consumer wait exceeded when reading index 199 ret=-1
Failed: test value 2 count 3
Test 152 failed

I expected the test to keep printing Success for the full 10 minutes, since it continuously generates data and checks it.

  • This would be more efficient with a power-of-2 size so the compiler can optimize % to just a bitwise AND. Using new/delete and pointers is probably less efficient than using an array of structs that includes an atomic sequence counter or flag (with the rest of the struct being non-atomic, like your pointed-to int). The MPMC queue discussed in Lock-free Progress Guarantees in a circular buffer queue uses sequence counters in each bucket, but is otherwise I think similar to how yours should work. Commented Jul 25 at 0:04
  • Having only a single reader means you don't need a way to "claim" a bucket for a reader separate from marking it as ready, ready to be overwritten by a writer. So I don't understand what int* stored_value = data[current_head].exchange(nullptr) is doing which a pure load(acquire) and a pure store(nullptr, relaxed) wouldn't do. There are no other readers to race with, and the head/tail counters mean you're spin-waiting to see a write, but the writer isn't waiting for anything the reader does except the head update. (Which you do with separate load and store instead of RMW.) Commented Jul 25 at 0:10
  • What exactly does Consumer wait exceeded when reading index 180 ret=-1 mean? I haven't read through your test harness, so I'm not sure exactly what kind of error / bug I'm looking for. Commented Jul 25 at 0:11
  • 1. I try to keep the Queue size generic for now. That is why I did not use power-of-2 sizes. Commented Jul 25 at 5:42
  • In my test, it consistently fails at index 180, which could be a clue. Commented Jul 25 at 14:25

1 Answer

You have encountered the ABA problem.

Suppose we start with head == tail == 0 (empty queue) and the following events occur in sequence:

  1. Producer P1 calls enqueue(). It loads 0 from tail, so it has current_tail == 0 and next_tail == 1. It loads 0 from head; this does not equal next_tail, so it concludes the queue is not full, and it is ready to execute the compare_exchange. But before it does so, P1 is delayed for a while (e.g. scheduled out), and the following events occur in the meantime.

  2. Producers P2-P4 enqueue a total of 19 elements. We now have tail == 19 and head == 0, and the queue is full.

  3. The consumer dequeues one element. We now have tail == 19 and head == 1, and the queue is not full (contains 18 elements).

  4. Another producer, say P2, enqueues one element. We now have tail == 0 and head == 1, and the queue is full (19 elements).

  5. P1 resumes and executes its compare_exchange. Since tail again contains the value 0, matching P1's value for current_tail, the compare_exchange succeeds and stores the value of next_tail, namely 1, to tail. We now have tail == 1 and head == 1 which erroneously indicates that the queue is empty. We have effectively lost 20 elements: the 19 elements previously in the queue, together with the one that P1 just attempted to add.

This would fit the symptom you observe. Your test failed at index 180; that is, after 180 elements were dequeued, no more were found. Since your test is supposed to enqueue a total of 200 elements (50 each by 4 producers), that would be consistent with 20 elements having been lost.

So the error in your algorithm is that you're implicitly assuming that if the value of tail seen by the first load in enqueue is equal to the value seen by the compare_exchange, then tail has actually remained unchanged in the meantime, and thus your determination that the queue had space is still valid. But that assumption can be false; tail may have been changed but then ended up back at its initial value. And in that case, your previous determination that the queue had space is not valid anymore. This is a classic example of an ABA problem.


One way to fix this is to not "wrap" the values for head and tail themselves with the % QUEUE_SIZE operation; instead, simply increment them. (You will still use % to compute the desired array index when actually accessing data.) This way, the same number is never stored twice in tail, so if compare_exchange observes the same value as current_tail, you know that no elements have been added to the queue since you loaded from tail; and so if the queue had space before, it still has space now.

The following changes would achieve this. With them, the tests pass successfully for me.

--- mpsc.cc.orig    2025-07-25 09:39:43
+++ mpsc.cc.new 2025-07-25 09:41:20
@@ -16,9 +16,9 @@
   u_int current_tail = 0, next_tail = 0;
   do {
     current_tail = tail.load(std::memory_order_seq_cst);
-    next_tail = (current_tail + 1) % QUEUE_SIZE;
+    next_tail = (current_tail + 1);
 
-    if (next_tail == head.load(std::memory_order_seq_cst)) {
+    if (next_tail == head.load(std::memory_order_seq_cst) + QUEUE_SIZE) {
       return false;  // queue is full
     }
   } while (!tail.compare_exchange_weak(current_tail, next_tail));
@@ -26,7 +26,7 @@
   // Allocate memory for the value
   int* new_value = new int(value);
   // Attempt to store the value in the current tail slot
-  data[current_tail].store(new_value, std::memory_order_seq_cst);
+  data[current_tail % QUEUE_SIZE].store(new_value, std::memory_order_seq_cst);
 
   return true;
 }
@@ -37,7 +37,7 @@
     return -1;  // queue is empty
   }
 
-  int* stored_value = data[current_head].exchange(nullptr, std::memory_order_seq_cst);
+  int* stored_value = data[current_head % QUEUE_SIZE].exchange(nullptr, std::memory_order_seq_cst);
   if (stored_value == nullptr) {
     return -2;  // not ready
   }
@@ -45,7 +45,7 @@
   value = *stored_value;
   delete stored_value;  // Free the allocated memory
 
-  u_int next_head = (current_head + 1) % QUEUE_SIZE;
+  u_int next_head = (current_head + 1);
   head.store(next_head, std::memory_order_seq_cst);
   return 0;
 }

You do still have to make sure not to overflow u_int; so ensure that the total number of elements to be processed is less than 2^32. If that's not enough, you could use uint64_t instead, which should be adequate unless your program may need to run for decades without being restarted.

2 Comments

Nice debugging / detective work. If QUEUE_SIZE was a power of 2, unsigned int wrapping would be a non-problem since it would be congruent with the % you're doing to generate an index. (32-bit atomic counters would be much better if we care about 32-bit systems, otherwise as you say 64-bit is fine unless we need decades or centuries of run-time with high-throughput usage.)
Nice sleuthing. The solution works. Thanks for the clear explanation.
