Search code examples
go, goroutine

Golang: Collect responses from spawned goroutines


I need to run parallel tasks based on an input array, wait for all of them to finish, and then process their responses.

My code waits for all the goroutines to finish using a wait group and then reads all the responses from the channel. But when I read the responses, I only get half of them from the channel.

Is this the right way to get responses from goroutines? If so, what am I missing here? If not, what is the right way to achieve this?

A simplified version of what I am doing:

package main

import (
    "fmt"
    "sync"
    "time"
)

// odd simulates a slow task: it sleeps for one second and then reports
// whether i is odd.
//
// It always returns i as the first result, even on failure, because the
// callers print the value next to the error. The error is nil when i is odd
// and an "even number" error when i is even.
func odd(i int) (int, error) {
    time.Sleep(1 * time.Second)
    if i%2 == 0 {
        return i, fmt.Errorf("even number")
    }
    return i, nil
}

// main starts ten goroutines that each call odd and send the outcome on a
// buffered channel, waits for all of them with a WaitGroup, and then reads
// the results back from the channel and prints them.
func main() {
    // R pairs the value returned by odd with its error.
    type R struct {
        val int
        err error
    }
    wg := sync.WaitGroup{}
    // Capacity 10 matches the number of goroutines, so every send can
    // complete before anything has been received.
    respChan := make(chan R, 10)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            val, err := odd(i)
            r := R{val: val, err: err}
            respChan <- r
            // len(respChan) is sampled after the send, so other goroutines may
            // already have queued their results; the printed sizes can appear
            // out of order.
            fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
        }(i)
    }
    wg.Wait()
    fmt.Println("Done Waiting")
    fmt.Println("Response Channel Length: ", len(respChan))
    // NOTE: len(respChan) is re-evaluated on every iteration. Each receive
    // shrinks it by one while i grows by one, so with 10 queued results the
    // loop stops after 5 receives and the remaining results are never read.
    for i := 0; i < len(respChan); i++ {
        r := <-respChan
        if r.err != nil {
            fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
        } else {
            fmt.Printf("[%d] : %d\n", i, r.val)
        }
    }
    fmt.Println("Finished")
}

Output :

Queued Response: 5 , size: 1
Queued Response: 0 , size: 2 
Queued Response: 2 , size: 3 
Queued Response: 9 , size: 7 
Queued Response: 3 , size: 5 
Queued Response: 4 , size: 6 
Queued Response: 1 , size: 4 
Queued Response: 7 , size: 8 
Queued Response: 6 , size: 9 
Queued Response: 8 , size: 10 
Done Waiting
Response Channel Length:  10
[0] : 5
[1] : 0 , even number
[2] : 2 , even number
[3] : 1
[4] : 3
Finished

Playground link : https://go.dev/play/p/x9a3zL3MspR


Solution

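The loop `for i := 0; i < len(respChan); i++` re-evaluates `len(respChan)` on every iteration; each receive shrinks the length while `i` grows, so the loop stops after only half of the responses have been read.

  • Option 1: Collect the results in a slice. Eliminate the channel. This approach ensures that the results are processed in order.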

    // R pairs the value returned by odd with its error.
    type R struct {
        val int
        err error
    }
    wg := sync.WaitGroup{}
    // One pre-allocated slot per goroutine; the slice is never resized.
    resp := make([]R, 10)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            val, err := odd(i)
            // Each goroutine writes only to its own index, so no two
            // goroutines touch the same element.
            resp[i] = R{val: val, err: err}
        }(i)
    }
    // Wait returns only after every goroutine has called Done, so all
    // elements of resp have been written before they are read below.
    wg.Wait()
    fmt.Println("Done Waiting")
    // Results are printed in index order, regardless of the order in which
    // the goroutines finished.
    for i, r := range resp {
        if r.err != nil {
            fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
        } else {
            fmt.Printf("[%d] : %d\n", i, r.val)
        }
    }
    fmt.Println("Finished")
    

    https://go.dev/play/p/o_mHuU0myyS

  • Option 2: Drop the wait group. Receive N results from the channel, where N is the number of goroutines.

    // R pairs the value returned by odd with its error.
    type R struct {
        val int
        err error
    }
    // N is both the number of goroutines started and the number of results
    // received below.
    const N = 10
    // Unbuffered: each send blocks until the receive loop below takes the
    // value.
    respChan := make(chan R)
    for i := 0; i < N; i++ {
        go func(i int) {
            val, err := odd(i)
            r := R{val: val, err: err}
            respChan <- r
            // NOTE(review): len of an unbuffered channel is always 0, so the
            // "size" printed here carries no information. This line also runs
            // after the send, so it may not be printed for the last
            // goroutine(s) if the surrounding function returns first.
            fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
        }(i)
    }
    // Receiving exactly N values collects one result from every goroutine
    // without a WaitGroup; results arrive in completion order, so i is the
    // arrival position rather than the input passed to odd.
    for i := 0; i < N; i++ {
        r := <-respChan
        if r.err != nil {
            fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
        } else {
            fmt.Printf("[%d] : %d\n", i, r.val)
        }
    }
    fmt.Println("Finished")
    

    https://go.dev/play/p/HCkecc3_AVG