This MPSC (multi-producer, single-consumer) queue sometimes gets stuck waiting on the consumer side, even though I use CAS operations. The enqueue function uses a CAS loop to reserve a slot. Since the queue stores pointers, I use `nullptr` as a status flag to tell whether a slot's data has been written yet.
In the test, 4 producers generate data and a single consumer reads it. Each producer encodes and enqueues the same set of values, and the consumer decodes what it receives and counts it in a hash table to verify that every value arrived from every producer.
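To see why a single `count 3` line means an item went missing, the encoding can be checked on its own, with no queue involved. `decode_counts` and its `dropped_value` parameter are hypothetical helpers for illustration: they mirror the test's `producer_index * TEST_SIZE + i` encoding and the consumer's `value % TEST_SIZE` decoding.

```cpp
#include <cassert>
#include <unordered_map>

// Mirrors the test's scheme: producer p sends p * test_size + i for i in [0, test_size).
// Counts how often each decoded index (value % test_size) is seen.
// dropped_value (hypothetical) simulates one item being lost inside the queue.
std::unordered_map<int, int> decode_counts(int test_size, int producers, int dropped_value = -1) {
    std::unordered_map<int, int> counts;
    for (int p = 0; p < producers; ++p) {
        for (int i = 0; i < test_size; ++i) {
            int encoded = p * test_size + i;
            if (encoded == dropped_value) continue;  // pretend this item never arrived
            counts[encoded % test_size]++;           // same decoding as the consumer
        }
    }
    return counts;
}
```

With nothing dropped, every index is counted 4 times; dropping one value (for example encoded value 102, i.e. producer 2's item 2) leaves index 2 with a count of 3. Note that the harness stops at the first failing index, so a `Failed:` line alone does not tell how many items were lost.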
```cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <unordered_map>
#include <vector>

#define QUEUE_SIZE 20

std::atomic<unsigned> head{0};
std::atomic<unsigned> tail{0};
// Use an array of int pointers for storage; nullptr means "slot not written yet"
std::atomic<int*> data[QUEUE_SIZE] = {};

bool enqueue(int value) {
    unsigned current_tail = 0, next_tail = 0;
    // Reserve a slot by advancing tail with a CAS loop
    do {
        current_tail = tail.load(std::memory_order_seq_cst);
        next_tail = (current_tail + 1) % QUEUE_SIZE;
        if (next_tail == head.load(std::memory_order_seq_cst)) {
            return false;  // queue is full
        }
    } while (!tail.compare_exchange_weak(current_tail, next_tail));

    // Allocate memory for the value
    int* new_value = new int(value);
    // Store the value in the reserved slot
    data[current_tail].store(new_value, std::memory_order_seq_cst);
    return true;
}

int dequeue(int& value) {
    unsigned current_head = head.load(std::memory_order_seq_cst);
    if (current_head == tail.load(std::memory_order_seq_cst)) {
        return -1;  // queue is empty
    }
    int* stored_value = data[current_head].exchange(nullptr, std::memory_order_seq_cst);
    if (stored_value == nullptr) {
        return -2;  // slot reserved but not written yet
    }
    value = *stored_value;
    delete stored_value;  // free the allocated memory
    unsigned next_head = (current_head + 1) % QUEUE_SIZE;
    head.store(next_head, std::memory_order_seq_cst);
    return 0;
}

#define TEST_SIZE 50
#define NUMBER_OF_PRODUCERS 4

bool test_mpsc_queue() {
    bool passed = true;
    std::vector<std::thread> producers;
    for (int producer_index = 0; producer_index < NUMBER_OF_PRODUCERS; producer_index++) {
        producers.emplace_back([producer_index]() {
            for (int i = 0; i < TEST_SIZE; i++) {
                auto start_time = std::chrono::steady_clock::now();
                auto end_time = start_time + std::chrono::seconds(10);
                // Encode the producer index into the value
                while (!enqueue(producer_index * TEST_SIZE + i)) {
                    // std::this_thread::yield();
                    if (std::chrono::steady_clock::now() > end_time) {
                        std::cout << "Producer wait exceeded when writing index " << i
                                  << " true index " << producer_index << "\n";
                        break;
                    }
                }
            }
        });
    }

    std::unordered_map<int, int> results;
    std::thread consumer_thread([&]() {
        for (size_t i = 0; i < TEST_SIZE * NUMBER_OF_PRODUCERS; i++) {
            auto start_time = std::chrono::steady_clock::now();
            auto end_time = start_time + std::chrono::seconds(5);
            int value = 123456789;
            int ret = -10;
            while ((ret = dequeue(value)) != 0) {
                if (std::chrono::steady_clock::now() > end_time) {
                    std::cout << "Consumer wait exceeded when reading index " << i
                              << " ret=" << ret << "\n";
                    break;
                }
                // std::this_thread::yield();
            }
            if (ret == 0) {
                value = value % TEST_SIZE;  // decode back to the per-producer index
                results[value]++;
            }
        }
    });

    consumer_thread.join();
    for (auto& p : producers) {
        p.join();
    }

    // Every index should have been received once from each producer
    for (size_t i = 0; i < TEST_SIZE; i++) {
        if (results[i] != NUMBER_OF_PRODUCERS) {
            passed = false;
            std::cout << "Failed: test value " << i << " count " << results[i] << "\n";
            break;
        }
    }
    if (passed) {
        std::cout << "Success\n";
    }
    return passed;
}

int main() {
    auto start_time = std::chrono::steady_clock::now();
    auto end_time = start_time + std::chrono::minutes(10);
    int test_count = 0;
    while (std::chrono::steady_clock::now() < end_time) {
        if (!test_mpsc_queue()) {
            std::cout << "Test " << test_count << " failed\n";
            return -1;
        }
        test_count++;
    }
    std::cout << "Completed " << test_count << " tests in 10 minutes." << std::endl;
    return 0;
}
```
But I get the following output:
```
Success
Success
Success
Success
Success
Success
Success
Success
Consumer wait exceeded when reading index 180 ret=-1
Consumer wait exceeded when reading index 181 ret=-1
Consumer wait exceeded when reading index 182 ret=-1
Consumer wait exceeded when reading index 183 ret=-1
Consumer wait exceeded when reading index 184 ret=-1
Consumer wait exceeded when reading index 185 ret=-1
Consumer wait exceeded when reading index 186 ret=-1
Consumer wait exceeded when reading index 187 ret=-1
Consumer wait exceeded when reading index 188 ret=-1
Consumer wait exceeded when reading index 189 ret=-1
Consumer wait exceeded when reading index 190 ret=-1
Consumer wait exceeded when reading index 191 ret=-1
Consumer wait exceeded when reading index 192 ret=-1
Consumer wait exceeded when reading index 193 ret=-1
Consumer wait exceeded when reading index 194 ret=-1
Consumer wait exceeded when reading index 195 ret=-1
Consumer wait exceeded when reading index 196 ret=-1
Consumer wait exceeded when reading index 197 ret=-1
Consumer wait exceeded when reading index 198 ret=-1
Consumer wait exceeded when reading index 199 ret=-1
Failed: test value 2 count 3
Test 152 failed
```
I expected the test to keep printing `Success` for the full 10 minutes, since data is generated and checked continuously.
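One interleaving that could produce this symptom (`ret=-1`, i.e. the consumer sees `head == tail` while items are still in the buffer) is an ABA problem on `tail`. Because `tail` is stored already reduced modulo `QUEUE_SIZE`, a producer that stalls between loading `tail` and its CAS can find the same value again after other threads have moved `tail` a full lap, so its CAS succeeds even though its earlier "full" check is stale. The single-threaded sketch below replays that interleaving using only the index logic of `enqueue()`/`dequeue()` (the `data[]` pointers are not modeled, and no real concurrency is involved); `replay_stale_cas` and its lambdas are hypothetical helpers.

```cpp
#include <atomic>
#include <cassert>

constexpr unsigned kQueueSize = 20;  // QUEUE_SIZE from the question

// Replays one possible interleaving of the question's head/tail logic
// (indices wrapped with % kQueueSize). Returns how many written items the
// consumer can no longer see afterwards.
unsigned replay_stale_cas() {
    std::atomic<unsigned> head{0};
    std::atomic<unsigned> tail{0};
    unsigned written = 0;   // slots reserved by producers
    unsigned consumed = 0;  // slots released by the consumer

    // Same reservation logic as enqueue(), minus the data store.
    auto other_producer_enqueue = [&] {
        unsigned cur = tail.load();
        unsigned next = (cur + 1) % kQueueSize;
        if (next == head.load()) return false;  // full
        bool ok = tail.compare_exchange_strong(cur, next);
        if (ok) ++written;
        return ok;
    };
    // Same head/tail logic as dequeue(), minus the data exchange.
    auto consumer_dequeue = [&] {
        unsigned h = head.load();
        if (h == tail.load()) return false;  // empty
        head.store((h + 1) % kQueueSize);
        ++consumed;
        return true;
    };

    for (int i = 0; i < 5; ++i) other_producer_enqueue();  // head 0, tail 5

    // Producer A loads tail == 5, passes the full check, then is descheduled.
    unsigned a_cur = tail.load();
    unsigned a_next = (a_cur + 1) % kQueueSize;
    assert(a_next != head.load());

    other_producer_enqueue();            // tail 6
    while (consumer_dequeue()) {}        // head 6: queue empty
    while (other_producer_enqueue()) {}  // 19 items, tail wraps back to 5: full

    // A resumes: tail is 5 again, so its stale CAS succeeds.
    if (tail.compare_exchange_strong(a_cur, a_next)) ++written;

    // head == tail now, so dequeue() would return -1 ("empty").
    assert(head.load() == tail.load());
    return written - consumed;
}
```

In this replay the stale CAS moves `tail` onto `head` while the buffer is full, so 20 written items (19 already queued plus the stalled producer's own) become invisible and `dequeue()` keeps returning -1. That happens to match the 20 missing reads (indices 180 to 199) in the log, which is suggestive but not proof. Keeping `head` and `tail` as monotonically increasing 64-bit counters and applying `% QUEUE_SIZE` only when indexing `data[]` makes this window practically unreachable, since the counter itself would have to wrap.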
… `%` to just a bitwise AND. Using `new`/`delete` and pointers is probably less efficient than using an array of structs that includes an atomic sequence counter or flag (with the rest of the struct being non-atomic, like your pointed-to `int`). The MPMC queue discussed in "Lock-free Progress Guarantees in a circular buffer queue" uses sequence counters in each bucket, but is otherwise, I think, similar to how yours should work.

… `int* stored_value = data[current_head].exchange(nullptr)` is doing which a pure `load(acquire)` and a pure `store(nullptr, relaxed)` wouldn't do. There are no other readers to race with, and the head/tail counters mean you're spin-waiting to see a write, but the writer isn't waiting for anything the reader does except the `head` update (which you do with a separate load and store instead of an RMW).

… `Consumer wait exceeded when reading index 180 ret=-1` mean? I haven't read through your test harness, so I'm not sure exactly what kind of error or bug I'm looking for.
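The suggestions above (a power-of-two size so the modulo becomes a mask, a sequence counter in each bucket instead of `new`/`delete`d pointers, and a single consumer that needs only plain loads and stores rather than RMWs) can be combined into one sketch. It follows the general shape of Dmitry Vyukov's bounded MPMC queue, specialized here to one consumer; `SeqMpscQueue`, `try_enqueue` and `try_dequeue` are hypothetical names, and this is a sketch rather than a drop-in replacement that has been stress-tested with the harness above.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <optional>

// Bounded MPSC queue sketch with a sequence counter per slot.
// Capacity must be a power of two so `pos & kMask` replaces `pos % Capacity`.
template <typename T, std::size_t Capacity>
class SeqMpscQueue {
    static_assert(Capacity >= 2 && (Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a power of two");
    static constexpr std::size_t kMask = Capacity - 1;

    struct Slot {
        std::atomic<std::size_t> seq;  // says whose turn it is for this slot
        T value;                       // plain storage, published via seq
    };

    std::array<Slot, Capacity> slots_;
    std::atomic<std::size_t> tail_{0};  // next position to reserve (monotonic)
    std::size_t head_ = 0;              // only the single consumer touches this

public:
    SeqMpscQueue() {
        for (std::size_t i = 0; i < Capacity; ++i)
            slots_[i].seq.store(i, std::memory_order_relaxed);
    }

    bool try_enqueue(const T& v) {
        std::size_t pos = tail_.load(std::memory_order_relaxed);
        for (;;) {
            Slot& s = slots_[pos & kMask];
            std::size_t seq = s.seq.load(std::memory_order_acquire);
            auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);
            if (diff == 0) {
                // Slot is free for this lap: try to claim position pos.
                if (tail_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                    s.value = v;
                    s.seq.store(pos + 1, std::memory_order_release);  // publish to consumer
                    return true;
                }
                // CAS failure reloaded pos; retry with the new value.
            } else if (diff < 0) {
                return false;  // slot still holds last lap's item: queue is full
            } else {
                pos = tail_.load(std::memory_order_relaxed);  // another producer got ahead
            }
        }
    }

    std::optional<T> try_dequeue() {
        Slot& s = slots_[head_ & kMask];
        std::size_t seq = s.seq.load(std::memory_order_acquire);
        if (seq != head_ + 1) return std::nullopt;  // empty, or producer not done writing
        T v = s.value;
        // Release so producers don't overwrite value before it has been copied out.
        s.seq.store(head_ + Capacity, std::memory_order_release);
        ++head_;
        return v;
    }
};
```

Positions are monotonically increasing `size_t` values, so the wrapped-index ABA on `tail` is not a practical concern, and each slot's `seq` replaces both the `nullptr` flag and the per-item allocation. The consumer uses only an acquire load and a release store; release (rather than relaxed) is needed here because, unlike the pointer version, a producer writes `value` in place and must not do so before the consumer has finished reading it.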