Search code examples
gogoroutine

Golang : Collect Responses from spawned go routines


I need to run parallel tasks based on an input array and wait for all of them to finish and process their response.

My code waits for all the go routines to finish using wait group and then read the channel for all the responses. But when I am reading the responses, I am only getting half of the responses from the channel.

Is this the right way to get responses from go routines? If so what am I missing here? If not what is the right way to achieve this?

A simplified version of what I am doing:

package main

import (
    "fmt"
    "sync"
    "time"
)

func odd(i int) (int, error) {
    time.Sleep(1 * time.Second)
    if i%2 == 0 {
        return i, fmt.Errorf("even number")
    } else {
        return i, nil
    }
}

func main() {
    type R struct {
        val int
        err error
    }
    wg := sync.WaitGroup{}
    respChan := make(chan R, 10)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            val, err := odd(i)
            r := R{val: val, err: err}
            respChan <- r
            fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
        }(i)
    }
    wg.Wait()
    fmt.Println("Done Waiting")
    fmt.Println("Response Channel Length: ", len(respChan))
    for i := 0; i < len(respChan); i++ {
        r := <-respChan
        if r.err != nil {
            fmt.Printf("[%d] : %d , %s\n", i, r.val, r.err.Error())
        } else {
            fmt.Printf("[%d] : %d\n", i, r.val)
        }
    }
    fmt.Println("Finished")
}

Output :

Queued Response: 5 , size: 1
Queued Response: 0 , size: 2 
Queued Response: 2 , size: 3 
Queued Response: 9 , size: 7 
Queued Response: 3 , size: 5 
Queued Response: 4 , size: 6 
Queued Response: 1 , size: 4 
Queued Response: 7 , size: 8 
Queued Response: 6 , size: 9 
Queued Response: 8 , size: 10 
Done Waiting
Response Channel Length:  10
[0] : 5
[1] : 0 , even number
[2] : 2 , even number
[3] : 1
[4] : 3
Finished

Playground link : https://go.dev/play/p/x9a3zL3MspR


Solution

  • The issue is i < len(respChan). len(respChan) tells you how many unread elements are in the channel (Number of elements in a channel), but you're also incrementing i. This means that even if there are unread elements, after i gets large enough you'll still exit the loop.

    The way I'd solve this is to close(respChan) after all responses have been written. You can do this after the first for that spawns all the goroutines, and makes your code more resilient if in the future you're writing more responses than the channel has capacity. Untested code, but should be pretty close:

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func odd(i int) (int, error) {
        time.Sleep(1 * time.Second)
        if i%2 == 0 {
            return i, fmt.Errorf("even number")
        } else {
            return i, nil
        }
    }
    
    func main() {
        type R struct {
            val int
            err error
        }
        wg := sync.WaitGroup{}
        respChan := make(chan R, 10)
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                val, err := odd(i)
                r := R{val: val, err: err}
                respChan <- r
                fmt.Printf("Queued Response: %d , size: %d \n", r.val, len(respChan))
            }(i)
        }
        wg.Wait()
        // wait for all the providers to finish, then close the channel
        close(respChan)
        fmt.Println("Done Waiting")
        fmt.Println("Response Channel Length: ", len(respChan))
        // loop will break when respChan is closed and drained
        for r := range respChan {
            if r.err != nil {
                fmt.Printf(" %d , %s\n", r.val, r.err.Error())
            } else {
                fmt.Printf(" %d\n", r.val)
            }
        }
        fmt.Println("Finished")
    }