I need to run tasks in parallel based on an input array, wait for all of them to finish, and then process their responses.

My code uses a wait group to wait for all the goroutines to finish and then reads all the responses from a channel. But when I read the responses, I only get half of them from the channel.

Is this the right way to get responses from goroutines? If so, what am I missing here? If not, what is the right way to achieve this?

A simplified version of what I am doing:

package main

import (
    "fmt"
    "sync"
    "time"
)

func odd(i int) (int, error) {
    time.Sleep(1 * time.Second)
    if i%2 == 0 {
        return i, fmt.Errorf("even number")
    } else {
        return i, nil
    }
}

func main() {
    type R struct {
        val int
        err error
    }
    wg := sync.WaitGroup{}
    respChan := make(chan R, 10)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            val, err := odd(i)
            r := R{val: val, err: err}
            respChan <- r
            fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
        }(i)
    }
    wg.Wait()
    fmt.Println("Done Waiting")
    fmt.Println("Response Channel Length: ", len(respChan))
    for i := 0; i < len(respChan); i++ {
        r := <-respChan
        if r.err != nil {
            fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
        } else {
            fmt.Printf("[%d] : %d\n", i, r.val)
        }
    }
    fmt.Println("Finished")
}

Output :

Queued Response: 5 , size: 1
Queued Response: 0 , size: 2 
Queued Response: 2 , size: 3 
Queued Response: 9 , size: 7 
Queued Response: 3 , size: 5 
Queued Response: 4 , size: 6 
Queued Response: 1 , size: 4 
Queued Response: 7 , size: 8 
Queued Response: 6 , size: 9 
Queued Response: 8 , size: 10 
Done Waiting
Response Channel Length:  10
[0] : 5
[1] : 0 , even number
[2] : 2 , even number
[3] : 1
[4] : 3
Finished

Playground link : https://go.dev/play/p/x9a3zL3MspR

3 Answers

2

Option 1: Collect the results in a slice and eliminate the channel. Each goroutine writes only to its own index, so no synchronization is needed beyond the wait group, and the results are processed in input order.

type R struct {
    val int
    err error
}
wg := sync.WaitGroup{}
resp := make([]R, 10)
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        val, err := odd(i)
        resp[i] = R{val: val, err: err}
    }(i)
}
wg.Wait()
fmt.Println("Done Waiting")
for i, r := range resp {
    if r.err != nil {
        fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
    } else {
        fmt.Printf("[%d] : %d\n", i, r.val)
    }
}
fmt.Println("Finished")

https://go.dev/play/p/o_mHuU0myyS
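
Since the question mentions an input array, Option 1 can also be wrapped in a helper that takes a slice. The following is a minimal sketch: the names `result` and `runAll` are assumptions for illustration, and `odd` is the question's function without the sleep.

```go
package main

import (
	"fmt"
	"sync"
)

// result pairs a value with its error (hypothetical name; mirrors R from the question).
type result struct {
	val int
	err error
}

// odd mirrors the question's function, minus the one-second sleep.
func odd(i int) (int, error) {
	if i%2 == 0 {
		return i, fmt.Errorf("even number")
	}
	return i, nil
}

// runAll (hypothetical helper) calls odd on every input concurrently
// and returns the results in input order.
func runAll(inputs []int) []result {
	results := make([]result, len(inputs))
	var wg sync.WaitGroup
	for idx, in := range inputs {
		wg.Add(1)
		go func(idx, in int) {
			defer wg.Done()
			val, err := odd(in)
			// Each goroutine writes only to its own slot, so no mutex is needed.
			results[idx] = result{val: val, err: err}
		}(idx, in)
	}
	wg.Wait() // every write above completes before Wait returns
	return results
}

func main() {
	for i, r := range runAll([]int{7, 4, 9}) {
		if r.err != nil {
			fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err)
		} else {
			fmt.Printf("[%d] : %d\n", i, r.val)
		}
	}
}
```

Because the slice is sized from the input, there is no channel capacity to keep in sync with the number of tasks.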

Option 2: Drop the wait group. Receive N results from the channel where N is the number of goroutines.

type R struct {
    val int
    err error
}
const N = 10
respChan := make(chan R)
for i := 0; i < N; i++ {
    go func(i int) {
        val, err := odd(i)
        r := R{val: val, err: err}
        respChan <- r
        fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
    }(i)
}
for i := 0; i < N; i++ {
    r := <-respChan
    if r.err != nil {
        fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
    } else {
        fmt.Printf("[%d] : %d\n", i, r.val)
    }
}
fmt.Println("Finished")

https://go.dev/play/p/HCkecc3_AVG

Comments

2

If you want to check the length of the channel while reading from it, which is not really how you should do this, keep in mind that the length changes with every read. So loop until the channel is empty instead:

for len(respChan) > 0 {
    r := <-respChan
    ...
}

The correct way to use a channel is to write to it from the goroutines, close it when all goroutines are done, and read from it in a range loop. That is:

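for {...} // Create goroutines; call wg.Add(1) before each go statement
go func() {
  wg.Wait()       // Wait until every goroutine has called wg.Done()
  close(respChan) // Then signal that no more values will be sent
}()
for r := range respChan { // Read from the channel until it closes
  ...
}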
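
Filled in, the pattern might look like the sketch below. The `collect` helper, the `result` type, and the trimmed-down `odd` are assumptions made for illustration. The channel is deliberately unbuffered to show that the pattern does not depend on the buffer size, because the main goroutine receives while the workers are still sending.

```go
package main

import (
	"fmt"
	"sync"
)

// result pairs a value with its error (hypothetical name; mirrors R from the question).
type result struct {
	val int
	err error
}

// odd mirrors the question's function, minus the one-second sleep.
func odd(i int) (int, error) {
	if i%2 == 0 {
		return i, fmt.Errorf("even number")
	}
	return i, nil
}

// collect (hypothetical helper) starts n workers and gathers every response.
func collect(n int) []result {
	var wg sync.WaitGroup
	respChan := make(chan result) // unbuffered on purpose

	for i := 0; i < n; i++ {
		wg.Add(1) // Add before the go statement so Wait cannot return early
		go func(i int) {
			defer wg.Done()
			val, err := odd(i)
			respChan <- result{val: val, err: err}
		}(i)
	}

	// Close the channel once every worker has sent its response.
	go func() {
		wg.Wait()
		close(respChan)
	}()

	var out []result
	for r := range respChan { // ends when respChan is closed and drained
		out = append(out, r)
	}
	return out
}

func main() {
	results := collect(10)
	fmt.Println("Received:", len(results))
}
```

The closer goroutine is started only after the loop that calls `wg.Add`, so `wg.Wait` cannot observe a zero counter before the workers have been registered.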

Comments

1

The issue is the loop condition i < len(respChan). len(respChan) tells you how many unread elements are in the channel, so it shrinks by one on every read while i grows by one. With 10 buffered responses, the two meet after 5 iterations (i is 5 and len(respChan) is 5), and the loop exits even though there are still unread elements.
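
The effect is easy to reproduce without any goroutines. In this sketch, `fill`, `drainBuggy`, and `drainFixed` are hypothetical helpers: the first drain uses the loop condition from the question, and the second reads the length once before the loop, which is safe only because nothing is sending to the channel any more.

```go
package main

import "fmt"

// fill returns a buffered channel holding the values 0..n-1.
func fill(n int) chan int {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i
	}
	return ch
}

// drainBuggy re-evaluates len(ch) on every iteration, as in the question,
// and returns how many values it managed to read.
func drainBuggy(ch chan int) int {
	count := 0
	for i := 0; i < len(ch); i++ {
		<-ch
		count++
	}
	return count
}

// drainFixed reads the length once, so the loop bound stays constant.
func drainFixed(ch chan int) int {
	n := len(ch)
	count := 0
	for i := 0; i < n; i++ {
		<-ch
		count++
	}
	return count
}

func main() {
	fmt.Println(drainBuggy(fill(10))) // prints 5
	fmt.Println(drainFixed(fill(10))) // prints 10
}
```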

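The way I'd solve this is to close(respChan) after all responses have been written. You can do this once wg.Wait() returns, after the first for loop that spawns all the goroutines. A range loop over the closed channel then reads every buffered response and stops on its own, without depending on len(respChan). Note that, as written, wg.Wait() runs before any receive, so the buffer must still be large enough to hold every response. Untested code, but should be pretty close: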

package main

import (
    "fmt"
    "sync"
    "time"
)

func odd(i int) (int, error) {
    time.Sleep(1 * time.Second)
    if i%2 == 0 {
        return i, fmt.Errorf("even number")
    } else {
        return i, nil
    }
}

func main() {
    type R struct {
        val int
        err error
    }
    wg := sync.WaitGroup{}
    respChan := make(chan R, 10)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            val, err := odd(i)
            r := R{val: val, err: err}
            respChan <- r
            fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
        }(i)
    }
    wg.Wait()
    // wait for all the providers to finish, then close the channel
    close(respChan)
    fmt.Println("Done Waiting")
    fmt.Println("Response Channel Length: ", len(respChan))
    // loop will break when respChan is closed and drained
    for r := range respChan {
        if r.err != nil {
            fmt.Printf(" %d , %s\n", r.val, r.err.Error())
        } else {
            fmt.Printf(" %d\n", r.val)
        }
    }
    fmt.Println("Finished")
}

1 Comment

@Viral: I find it incredibly strange that you take a non-compiling answer, completely rework and accept it, instead of honoring the better answers that are available in this thread, i.e. by @jatia. The original had for r, ok <- respChan; ok { ... } which does not even compile.
