
I'm the creator of https://github.com/nevalang/neva

It's a dataflow programming language where nodes do message passing through ports. I use Go channels to implement that. However, I've hit an issue: it turns out my current implementation is incorrect, and everywhere I have fan-in (1 receiver, >=2 senders) out-of-order delivery might happen.

First approach:

For each fan-out connection (1 sender, N receivers) I have a separate goroutine. Each sender might have N receivers and each receiver might have N senders; it's a many-to-many relation.

Simplified version:

for _, conn := range conns {
    go func(conn connection) {
        // Forward every message from the sender to all of its receivers.
        for msg := range conn.sender {
            for _, r := range conn.receivers {
                r <- msg
            }
        }
    }(conn) // pass conn explicitly to avoid loop-variable capture (pre-Go 1.22)
}

Concurrent goroutines are the source of out-of-order delivery. Let's say I have an s1,s2 -> r1 connection. Even though s1 might send first and s2 second, it's possible that the Go scheduler will activate the corresponding "transmission" goroutines in a different order.
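Here is a minimal, self-contained sketch of that race (the names s1/s2/r1 are just illustrative): even though s1's message is written first, either forwarding goroutine may win the race to r1.

package main

import "fmt"

func main() {
    s1 := make(chan string, 1)
    s2 := make(chan string, 1)
    r1 := make(chan string)

    // One "transmission" goroutine per connection, as in the first approach.
    go func() { r1 <- <-s1 }()
    go func() { r1 <- <-s2 }()

    s1 <- "first" // s1 clearly sends before s2...
    s2 <- "second"

    fmt.Println(<-r1) // ...yet this may print "second": the scheduler
    fmt.Println(<-r1) // decides which forwarder reaches r1 first
}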

I suggest we skip the discussion of why order matters and how we can be sure that one sender sent before another. Let's just assume that it does matter and that somehow we know.

Second approach:

My second attempt to fix it was to create a single queue (a Go channel) that all senders share and write to. That would preserve order, wouldn't it? Yes, but it also leads to deadlock (a minimal repro follows the steps below). I will use the letter n to mean "node" (nodes have inports and outports; they are senders and receivers).

  • n1 sends to n2
  • n2 receives and starts work
  • n1 sends the next message to n2
  • n2 is busy and cannot receive
  • n2 finishes the job and tries to send its output data (to the queue)
  • deadlock: the queue is busy trying to deliver the message from n1 to n2, but n2 cannot receive (it's busy trying to write to the queue)
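A minimal repro of those steps, assuming (as a simplification of the real runtime) that the shared queue is a single unbuffered channel with one dedicated delivery goroutine:

package main

func main() {
    queue := make(chan int) // the single shared queue
    n2in := make(chan int)  // n2's inport

    // Queue goroutine: delivers messages from the queue, in order.
    go func() {
        for m := range queue {
            n2in <- m
        }
    }()

    // n2: receive, do some work, write the output to the shared queue.
    go func() {
        for m := range n2in {
            queue <- m * 2 // blocks once the queue goroutine is stuck on n2in
        }
    }()

    queue <- 1 // n1's first message: delivered, n2 starts working
    queue <- 2 // n1's second message: the queue goroutine takes it and blocks
               // on n2in, while n2 blocks writing its output to the queue
    select {}  // keep main alive; the runtime reports:
               // "fatal error: all goroutines are asleep - deadlock!"
}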

Third approach:

Obviously we have a deadlock because n1 and n2 share the same queue. The queue was a solution for the out-of-order issue, which happens only in fan-in patterns, and we don't have fan-in here. Can we avoid using the same queue for n1 and n2?

Yes, we can send from n1 to n2 directly (using a dedicated queue/channel) and only create shared queues for senders that are involved in fan-in (share the same receiver). Because each sender might have N receivers, it's possible that one sender sends to N queues, but that's just a side note.

However, it turns out it's possible to deadlock even in this situation! Let's imagine we have a loop in our topology: N1 sends to N2, and N2 sends back to itself.

  • N1 sends to N2
  • N2 starts the job
  • N1 sends the next message to N2
  • N2 is busy and cannot receive the new message
  • N2 finishes the job and tries to write its output data
  • deadlock

N2 blocks while sending because nobody can receive. N2 is the one who should receive, but it's busy trying to send.
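The self-loop case reduces to a channel whose only receiver is also one of its senders; a minimal sketch:

package main

func main() {
    n2in := make(chan int) // N2's inport; both N1 (main) and N2 itself send here

    go func() { // N2: receive, work, send the result back to its own inport
        for m := range n2in {
            n2in <- m + 1 // blocks forever: the only receiver is this goroutine
        }
    }()

    n2in <- 0 // N1's first message: N2 starts working
    n2in <- 1 // N1's second message: N2 is stuck sending to itself - deadlock
}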

(If this example doesn't make practical sense, I suggest not thinking about that. It's a programming language where both programs with loops and programs where order matters are possible.)

Fourth approach:

Go back to the original design where each inport and each outport is a Go channel, but avoid having a separate goroutine per connection, and with it the out-of-order delivery caused by concurrent goroutines.

Instead, take the fan-in view: for each fan-in pattern (1 receiver, N senders), spawn a goroutine with a for-select loop. Inside that select, wait for a message from one of the senders and send it to the receiver:

for {
    select {
    case msg := <-sender_1:
        receiver <- msg
    case msg := <-sender_2:
        receiver <- msg
    // ...
    }
}

If there's more than one sender ready, select chooses randomly; sometimes that's OK and sometimes it's not. I don't care if both senders send at the same time. I do care about situations where it's clear that one sender sent first and the other second. I.e., when there is an order, it must be preserved.

The problem (out-of-order delivery) happens when the receiver is slower than the senders; a minimal reduction of the resulting state follows the example below.

Example:

  • sender_1 sent; we write to receiver and block
  • sender_2 sent; we do not receive from sender_2 because we are waiting on receiver
  • sender_1 sent its second message (third overall); we are still waiting
  • receiver is ready; we send the first message and go to the next iteration
  • select "should" (I want it to) choose sender_2 because that message came first, but it chooses randomly
  • let's say it chooses sender_1 - out-of-order delivery happened
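The problematic state boils down to this: by the time select runs again, both senders have a message pending, and Go's select picks among ready cases uniformly at random.

package main

import "fmt"

func main() {
    sender_1 := make(chan string, 1)
    sender_2 := make(chan string, 1)

    sender_2 <- "arrived first" // buffered while we were blocked on receiver
    sender_1 <- "arrived second"

    select {
    case m := <-sender_1:
        fmt.Println(m) // chosen ~50% of the time: out-of-order delivery
    case m := <-sender_2:
        fmt.Println(m)
    }
}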

I was thinking about moving away from channels to mutexes and slices but failed to see how that could help. I also know there's sync.Cond, which might help, but I have zero experience working with it. I've been struggling with this for almost a month and seek help from the community. This is one of the most important tasks in my life these days. I have a small community, and people are waiting for a solution from me just so we can get back to shipping fun stuff like the stdlib.

Thanks in advance!

1 Answer

You can implement an unordered fan-in and then use another goroutine to order the results. You need to know the number of messages in flight, because you have to buffer a certain number of elements to output the next item in the correct sequence.

Something like this:

Unordered fan-in:

func fanIn(inputs []<-chan OutputPayload) <-chan OutputPayload {
    result := make(chan OutputPayload)

    // Listen to each input channel in a separate goroutine
    var inputWg sync.WaitGroup
    for inputIndex := range inputs {
        inputWg.Add(1)
        go func(index int) {
            defer inputWg.Done()
            for data := range inputs[index] {
                // Send the data to the output
                result <- data
            }
        }(inputIndex)
    }

    // When all input channels are closed, close the fan-in channel
    go func() {
        inputWg.Wait()
        close(result)
    }()

    return result
}

The ordering function keeps a heap-based sort queue that holds up to bufsize messages in memory. bufsize must be at least the max number of messages in flight. Something like this:

// Read inputs from input channel, output them to the output channel
// bufsize is the max number of messages in flight.
func order(input <-chan OutputPayload, bufsize int) <-chan OutputPayload {
    result := make(chan OutputPayload)
    queue := SortQueue{} // implementation below
    go func() { 
        defer close(result)
        for data := range input {
            // Add new item to the queue
            heap.Push(&queue, data)
            // If the queue grew enough, pop
            for len(queue) >= bufsize {
                result <- heap.Pop(&queue).(OutputPayload)
            }
        }
        for len(queue) > 0 {
            result <- heap.Pop(&queue).(OutputPayload)
        }
    }()

    return result
}

This will buffer elements until the SortQueue is full, and then start outputting them in order.

You hook these up like this:

outputCh := order(fanIn(channels), bufsize)

where channels are your input channels and outputCh is the output.

SortQueue is a heap-based priority queue. This assumes your messages have a Sequence field you can order on; you may need a different ordering key.

type SortQueue []OutputPayload

func (q SortQueue) Len() int { return len(q) }
func (q SortQueue) Less(i, j int) bool {
    return q[i].Sequence < q[j].Sequence
}

func (q SortQueue) Swap(i, j int) {
    q[i], q[j] = q[j], q[i]
}

func (q *SortQueue) Push(x any) {
    *q = append(*q, x.(OutputPayload))
}

func (q *SortQueue) Pop() any {
    old := *q
    n := len(old)
    item := old[n-1]
    *q = old[0 : n-1]
    return item
}

Again: this scheme assumes you are processing some sort of input data with at most bufsize messages in flight, and that these messages enter the processing pipeline in order but may be out of order when they arrive at the fan-in stage. The ordering function buffers bufsize elements and feeds them out in the correct order.
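For completeness, here is a sketch of the whole pipeline wired together, with an assumed minimal OutputPayload (the Sequence/Data fields and all the test data below are illustrative, not part of the answer); it assumes fanIn, order, and SortQueue from above live in the same package:

package main

import (
    "fmt"
    // the functions above also need "container/heap" and "sync"
)

type OutputPayload struct {
    Sequence int
    Data     string
}

func main() {
    // Two senders, each with its own channel (the fan-in inputs).
    ch1 := make(chan OutputPayload, 2)
    ch2 := make(chan OutputPayload, 2)

    ch1 <- OutputPayload{Sequence: 0, Data: "a"}
    ch2 <- OutputPayload{Sequence: 1, Data: "b"}
    ch1 <- OutputPayload{Sequence: 2, Data: "c"}
    ch2 <- OutputPayload{Sequence: 3, Data: "d"}
    close(ch1)
    close(ch2)

    channels := []<-chan OutputPayload{ch1, ch2}
    bufsize := 4 // at most 4 messages in flight in this toy example

    for p := range order(fanIn(channels), bufsize) {
        fmt.Println(p.Sequence, p.Data) // printed in Sequence order
    }
}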
