I am looking at this implementation of a multiple-producer, single-consumer channel, which was the implementation in Rust's standard library. Its memory ordering model, however, is derived from C++, so the formal reasoning should be based on the C++ standard. I have partially translated the code to C++. The snippets of recv and send are:
template<class T>
struct Packet{
Queue<T> queue;
std::atomic<long> cnt;
std::atomic<std::uintptr_t> to_wake;
long steals;
void send(T t) {
    if (this->cnt.load(std::memory_order::seq_cst) < DISCONNECTED + FUDGE) {
        throw "Err";
    }
    this->queue.push(t);
    if (this->cnt.fetch_add(1, std::memory_order::seq_cst) == -1) {
        this->take_to_wake().signal();
    }
}
T recv(Option<Instant> deadline) {
    try {
        auto t = this->try_recv();
        return t;
    } catch (...) {
    }
    auto [wait_token, signal_token] = blocking::tokens();
    if (this->decrement(signal_token) == Installed) {
        wait_token.wait();
    }
    try {
        auto t = this->try_recv();
        this->steals -= 1;
        return t;
    } catch (...) {
    }
}
StartResult decrement(SignalToken token) {
    assert(this->to_wake.load(std::memory_order::seq_cst) == 0);
    auto ptr = token.cast_to_usize();
    this->to_wake.store(ptr, std::memory_order::seq_cst);
    auto steals = std::exchange(this->steals, 0L); // translation of ptr::replace(self.steals.get(), 0)
    auto r = this->cnt.fetch_sub(1 + steals, std::memory_order::seq_cst);
    if (r == DISCONNECTED) {
        this->cnt.store(DISCONNECTED, std::memory_order::seq_cst);
    } else {
        assert(r >= 0);
        if (r - steals <= 0) {
            return Installed;
        }
    }
    this->to_wake.store(0, std::memory_order::seq_cst);
    drop(SignalToken::cast_from_usize(ptr));
    return Abort;
}
T try_recv() {
    while (true) {
        try {
            auto t = this->queue.pop();
            return t;
        } catch (const char* e) {
            if (std::strcmp(e, "Empty") == 0) {
                throw e;
            }
            if (std::strcmp(e, "Inconsistent") == 0) {
                std::this_thread::yield();
            }
        }
    }
}
};
template<class T>
struct Node{
std::atomic<Node*> next;
T value;
Node(T v) : next(nullptr), value(v) {}
};
template<class T>
struct Queue {
std::atomic<Node<T>*> head;
Node<T>* tail;
Queue(){
auto h = new Node<T>{T{}};
head.store(h);
tail = h;
}
void push(T t){
auto node = new Node<T>{t};
auto pre = this->head.exchange(node,std::memory_order::acq_rel);
pre->next.store(node,std::memory_order::release);
}
T pop(){
auto tail = this->tail;
auto next = tail->next.load(std::memory_order::acquire);
if(next){
this->tail = next;
auto ret = next->value;
delete tail;
return ret;
}
if (this->head.load(std::memory_order::acquire) == tail){ // #1
throw "Empty";
}else{
throw "Inconsistent";
}
}
};
The complete original code is here, and the implementation of the queue is here.
Assume three threads send data. According to [intro.races] p14:
If a side effect X on an atomic object M happens before a value computation B of M, then the evaluation B takes its value from X or from a side effect Y that follows X in the modification order of M.
If there is no other extra synchronization, the pop thread is not guaranteed to see the subsequent nodes written by the push threads, because no happens-before relation forces that visibility; that is, nothing forces it to see the modifications that follow the initial side effect in the modification order. However, the initial value of head and the null initial value of its next field are guaranteed to be visible.
As per [atomic.order] p10
Atomic read-modify-write operations shall always read the last value (in the modification order) written before the write associated with the read-modify-write operation.
If one RMW reads the initial value 0, every other RMW operation must read a later modification in the modification order, and no two RMW operations can read the same modification. All operations on cnt are RMW operations, so the load part of each RMW always makes forward progress and cannot step on another; in effect, all operations on cnt are serialized and cannot overlap. With the synchronization established by these serialized RMW operations (i.e., cnt.fetch_xxx), the loads of next and head are guaranteed to see later values in their modification orders, because there are happens-before relationships between the side effects on next or head and those loads.
Each swap in push can establish synchronization with another. However, since fetch_add(1, ...) is sequenced after the call to push, the synchronization inside push establishes no happens-before between any two fetch_adds; hence, there is no constraint on where these fetch_add(1, ...)s fall in the modification order of cnt. There can be a case where the fetch_add corresponding to the first node after head is ordered later in the modification order. I tried to illustrate this case in the following image:
The first pop starts while head->next is still null. cnt.fetch_sub(1) synchronizes with M1, which only guarantees that node2->next = node3 happens before the try_recv() that is sequenced after cnt.fetch_sub(1). If I have not reasoned wrongly in my picture, head->next = node1 is still unordered with respect to head->next.load(...) in try_recv (i.e., in queue.pop()); however, this->head.load is guaranteed to see node3 because of the synchronization established by the RMW operations on cnt. Therefore, try_recv should step into the Inconsistent branch and spin in the loop, waiting for the update, even though this still cannot guarantee that the new value is eventually seen, as per [atomics.order] p12:
The implementation should make atomic stores visible to atomic loads, and atomic loads should observe atomic stores, within a reasonable amount of time.
That is, the loop can technically spin forever (even though that is not possible in a real implementation).
Beyond this case: if we swap the order of M1 and cnt.fetch_sub(1) in the picture, can we still step into the Inconsistent state? Is my reasoning right?
