Multiple goroutines reading from the same channel


I am spawning multiple goroutines to read values from the same channel. The two workers start as expected, but each reads only one item from the channel and then stops. I expected the workers to keep reading until the channel is closed by the goroutine that sends values into it. The producer goroutine has not finished, yet something is blocking it from sending. Why does each worker read only one value and then stop?

The output shows the first two values being received, one by each worker. The producer then prints "sending 2", but neither worker ever reads that third value.

new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0

Go Playground

package main

import (
    "fmt"
    "sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case <-done:
                    fmt.Println("received done signal")
                    return
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    return out
}

func main() {
    done := make(chan bool)
    defer close(done)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        default:
        }
    }

}


Solution

  • Previous comments about channels being unbuffered are right, but there are other synchronization issues going on as well.

    A send on an unbuffered channel blocks until another goroutine is ready to receive the value, so no further sends can proceed until the current one has been received.
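A minimal sketch of that blocking behaviour, using a non-blocking send so the program does not hang. The helper name trySend is hypothetical and only serves the illustration; it is not part of the question's code.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a send without blocking and
// reports whether the value was accepted by the channel.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The send could not proceed right now: no receiver is waiting
		// and there is no free buffer slot.
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the buffer has room
	fmt.Println(trySend(buffered, 2))   // false: the buffer is now full
}
```

A plain `ch <- v` in the unbuffered case would simply wait at that line, which is what happens to the workers in the question.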

    1. workerPool creates an unbuffered channel out for the results, but it only returns after wg.Wait(), that is, after every worker has finished writing to out. Since main starts reading from out only after workerPool returns, and out is unbuffered, each worker blocks on its first send to out and workerPool never returns: a deadlock. This is why it looks as if each worker only reads one value. In reality each worker is stuck on its first out <- value because nothing is there to receive it (you can see this by moving the "sending value out" print statement to after the send).

    Options to fix this include giving out a buffer large enough to hold every result (i.e. out := make(chan int, n), where n is the number of results), or leaving out unbuffered and reading from it while the workers are still writing.

    2. The done channel isn't being used properly either. Both main and workerPool rely on it to stop execution, but nothing is ever sent on it, and the deferred close(done) only runs when main returns, which never happens here. It is also unbuffered, so a send on it would block until a receiver is ready, which is the same problem described above.

    To fix this, first remove the case <-done: from workerPool and simply range over in, since main closes it. Then make done a buffered channel so that the send on it does not block.
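The two mechanisms mentioned in item 2 can be sketched in isolation. This is an illustrative example, not the question's code: sumUntilClosed and releaseAll are hypothetical helpers. The first shows that a range loop over a channel ends once the channel is closed and drained; the second shows the common alternative of signalling "done" by closing the channel, which unblocks every waiting receiver at once.

```go
package main

import (
	"fmt"
	"sync"
)

// sumUntilClosed (hypothetical) ranges over in and returns once in has been
// closed and all values sent before the close have been received.
func sumUntilClosed(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// releaseAll (hypothetical) starts n goroutines that block on done, closes
// done, and returns how many goroutines were released by the close.
func releaseAll(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // a receive on a closed channel returns immediately
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	close(done) // one close wakes every receiver; no value needs to be sent
	wg.Wait()
	return released
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 4; i++ {
			in <- i
		}
		close(in)
	}()

	fmt.Println("sum:", sumUntilClosed(in)) // sum: 10
	fmt.Println("released:", releaseAll(3)) // released: 3
}
```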

    Combining these fixes gives:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
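        // The buffer must hold every result, because nothing reads from out
        // until workerPool returns (main sends 10 values, so 100 is ample).
        out := make(chan int, 100)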
        var wg sync.WaitGroup
    
        for i := 0; i < numberOfWorkers; i++ {
            fmt.Println("new worker")
            wg.Add(1)
            // fan out worker goroutines reading from in channel and
            // send output into out channel
            go func() {
                defer wg.Done()
                for data := range in {
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
                fmt.Println("no more items")
            }()
        }
    
        fmt.Println("waiting")
        wg.Wait()
        fmt.Println("done waiting")
        close(out)
        done <- true
        close(done)
        return out
    }
    
    func main() {
        done := make(chan bool, 1)
    
        in := make(chan int)
    
        go func() {
            for i := 0; i < 10; i++ {
                fmt.Println("sending", i)
                in <- i
            }
            close(in)
        }()
    
        out := workerPool(done, in, 2, func(i int) int {
            return i
        })
    
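        // workerPool has already closed out, so ranging over it drains every
        // buffered result and then stops. Selecting on out and done together
        // could pick done first and drop results that are still buffered.
        for o := range out {
            fmt.Println("output", o)
        }
        <-done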
    }
    

    This fixes your issues, but it is not the best way to use channels. The structure can be made much simpler, without relying on buffered channels.
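
One possible shape for that simpler structure is sketched below; it is an assumption about what "simpler" could look like, not necessarily the restructuring the answer had in mind. Here workerPool returns immediately and a separate goroutine closes out once all workers have finished, so main can read from an unbuffered out while the workers are still writing. The done channel is dropped entirely, the run helper is hypothetical, and fn doubles its input (instead of returning it unchanged as in the question) only to make its effect visible.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// workerPool starts numberOfWorkers goroutines that apply fn to each value
// received from in and send the result on the returned channel. It returns
// right away; the returned channel is closed after in has been closed and
// every worker has finished.
func workerPool(in <-chan int, numberOfWorkers int, fn func(int) int) <-chan int {
	out := make(chan int) // unbuffered: the caller reads while workers write
	var wg sync.WaitGroup

	for i := 0; i < numberOfWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for data := range in {
				out <- fn(data)
			}
		}()
	}

	// Wait in a separate goroutine so that workerPool itself does not block.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// run (hypothetical helper) feeds 0..n-1 through the pool and collects the
// results. They are sorted because the arrival order depends on scheduling.
func run(n, workers int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()

	var results []int
	for o := range workerPool(in, workers, func(i int) int { return i * 2 }) {
		results = append(results, o)
	}
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(run(10, 2)) // [0 2 4 6 8 10 12 14 16 18]
}
```

Because the consumer's range loop ends when out is closed, no separate completion signal is needed in this version.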