I'm spawning multiple goroutines that read values from the same channel. The two workers start as expected, but each one reads only a single item from the channel and then stops reading. I expected the goroutines to keep reading until the channel is closed by the goroutine that sends values into it. The producer goroutine never gets as far as closing the channel, because something is blocking it from sending. Why does each worker read only one value and stop?
The output shows each worker reading one value. `sending 2` is printed, but neither worker ever receives that third value:
```
new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0
```
```go
package main

import (
	"fmt"
	"sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < numberOfWorkers; i++ {
		fmt.Println("new worker")
		wg.Add(1)
		// fan out worker goroutines reading from in channel and
		// send output into out channel
		go func() {
			defer wg.Done()
			for {
				select {
				case <-done:
					fmt.Println("received done signal")
					return
				case data, ok := <-in:
					if !ok {
						fmt.Println("no more items")
						return
					}
					// fan-in job execution multiplexing results into the results channel
					fmt.Println("running func", data)
					value := fn(data)
					fmt.Println("sending value out", value)
					out <- value
				}
			}
		}()
	}
	fmt.Println("waiting")
	wg.Wait()
	fmt.Println("done waiting")
	close(out)
	return out
}

func main() {
	done := make(chan bool)
	defer close(done)
	in := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println("sending", i)
			in <- i
		}
		close(in)
	}()
	out := workerPool(done, in, 2, func(i int) int {
		return i
	})
	for {
		select {
		case o, ok := <-out:
			if !ok {
				continue
			}
			fmt.Println("output", o)
		case <-done:
			return
		default:
		}
	}
}
```
Previous comments about channels being unbuffered are right, but there are other synchronization issues going on as well.
An unbuffered channel has no storage: a send blocks until another goroutine receives the value, so every write must be matched by a read before the sender can continue.
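A small sketch of that difference, assuming a 50 ms timeout is a fair stand-in for "blocks forever". `trySend` and `waitFor` are illustrative helpers, not part of the question's code; the `select` with a `time.After` case abandons a send that would block instead of hanging the program.

```go
package main

import (
	"fmt"
	"time"
)

// waitFor is how long trySend is willing to wait (an arbitrary choice for this demo).
const waitFor = 50 * time.Millisecond

// trySend reports whether a send of v on ch completes within waitFor.
// If no receiver (or free buffer slot) becomes available in time, the
// timer case wins and the send is abandoned instead of blocking forever.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	case <-time.After(waitFor):
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // no storage: a send needs a waiting receiver
	buffered := make(chan int, 1) // room for one value without a receiver

	// Nothing is receiving from either channel yet.
	fmt.Println("unbuffered send completed:", trySend(unbuffered, 1))
	fmt.Println("buffered send completed:", trySend(buffered, 1))
	// The single buffer slot is now full, so this send blocks like an unbuffered one.
	fmt.Println("second buffered send completed:", trySend(buffered, 2))

	// With a goroutine receiving concurrently, the unbuffered send goes through.
	go func() { <-unbuffered }()
	fmt.Println("unbuffered send with a receiver completed:", trySend(unbuffered, 3))
}
```

The first three lines print `false`, `true` and `false`: a send on an unbuffered channel cannot complete without a receiver, and a buffered channel only absorbs sends until its buffer is full.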
`workerPool` creates an unbuffered channel `out` to store results, but only returns after all the results have been written to `out`. Since reading from `out` happens only after `workerPool` returns, and `out` is unbuffered, each worker blocks on its first `out <- value`, `workerPool` blocks in `wg.Wait()`, and the producer blocks on `in <- 2` because no worker is free to receive it: a deadlock. This is why it looks as if each worker sends only one value; in fact, after the first send every worker is blocked because nothing is there to receive the value (you can see this by moving the print statement after the write to `out`).

Options to fix this include giving `out` a buffer large enough for every result, n = number of results (i.e. `out := make(chan int, n)`), or leaving `out` unbuffered and reading from `out` while the workers are still writing to it.
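Here is a sketch of the second option, assuming the `done` channel is dropped entirely: `workerPool` starts the workers, hands `wg.Wait()` and `close(out)` to a separate goroutine, and returns `out` right away, so `main` can `range` over it while the workers are still sending. The `generate` helper is a hypothetical stand-in for the producer goroutine in `main`.

```go
package main

import (
	"fmt"
	"sync"
)

// workerPool starts numberOfWorkers goroutines that apply fn to every value
// received from in and send the results on out. It returns immediately; a
// separate goroutine closes out once every worker has finished.
func workerPool(in <-chan int, numberOfWorkers int, fn func(int) int) <-chan int {
	out := make(chan int) // unbuffered: the caller receives while workers send
	var wg sync.WaitGroup
	for i := 0; i < numberOfWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range exits once in is closed and drained
			for data := range in {
				out <- fn(data)
			}
		}()
	}
	// Wait and close in the background instead of blocking the caller.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// generate (hypothetical helper) sends 0..n-1 on a new channel, then closes it.
func generate(n int) <-chan int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()
	return in
}

func main() {
	out := workerPool(generate(10), 2, func(i int) int { return i })
	// The loop ends when out is closed, i.e. after every worker is done.
	for o := range out {
		fmt.Println("output", o)
	}
}
```

Because `main` is receiving while the workers send, `out` needs no buffer at all, and the `range` loop ends by itself when the closer goroutine closes `out`.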
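The `done` channel isn't being used properly either. Both `main` and `workerPool` rely on it to stop execution, but nothing is ever sent on it, and the deferred `close(done)` in `main` can only run after `main` returns, which itself waits on `done`. If `workerPool` signals it with `done <- true` before returning, an unbuffered `done` deadlocks in the same way as `out`, because `main` only starts reading from it after `workerPool` returns. To fix this, first remove the `case <-done:` from `workerPool` and simply `range` over `in`, since the producer goroutine in `main` closes it. Then make `done` a buffered channel to resolve the deadlock.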
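If you do want a `done` channel for cancelling the pool early, the usual approach is to close it rather than send on it: a receive from a closed channel succeeds immediately in every goroutine, so one `close(done)` reaches the producer and all workers, while `done <- true` is received by only one goroutine. The sketch below assumes a producer with no natural end; `firstN` and this version of `workerPool` are illustrative, not the question's code. Workers also select on `done` while sending, so none of them is left blocked on `out` after the consumer stops reading.

```go
package main

import (
	"fmt"
	"sync"
)

// workerPool uses done only for early cancellation. Workers watch done both
// while receiving from in and while sending on out.
func workerPool(done <-chan struct{}, in <-chan int, numberOfWorkers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < numberOfWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-done:
					return
				case data, ok := <-in:
					if !ok {
						return // in is closed and drained
					}
					select {
					case out <- fn(data):
					case <-done:
						return // nobody needs this result any more
					}
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// firstN (hypothetical helper) collects up to n results, then cancels the pool.
func firstN(n int) []int {
	done := make(chan struct{})
	in := make(chan int)
	// Producer with no natural end: only closing done stops it.
	go func() {
		defer close(in)
		for i := 0; ; i++ {
			select {
			case in <- i:
			case <-done:
				return
			}
		}
	}()
	out := workerPool(done, in, 2, func(i int) int { return i })
	got := make([]int, 0, n)
	for len(got) < n {
		o, ok := <-out
		if !ok {
			break
		}
		got = append(got, o)
	}
	// One close wakes the producer and every worker.
	close(done)
	// Discard anything still in flight; this loop ends only when out is
	// closed, i.e. after every worker has returned.
	for range out {
	}
	return got
}

func main() {
	fmt.Println(firstN(3))
}
```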
Combining these fixes gives:
```go
package main

import (
	"fmt"
	"sync"
)

func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
	// Buffered so workers never block on send; assumes at most 100 results.
	out := make(chan int, 100)
	var wg sync.WaitGroup
	for i := 0; i < numberOfWorkers; i++ {
		fmt.Println("new worker")
		wg.Add(1)
		// fan out worker goroutines reading from in channel and
		// send output into out channel
		go func() {
			defer wg.Done()
			// range exits once in is closed and drained
			for data := range in {
				// fan-in job execution multiplexing results into the results channel
				fmt.Println("running func", data)
				value := fn(data)
				fmt.Println("sending value out", value)
				out <- value
			}
			fmt.Println("no more items")
		}()
	}
	fmt.Println("waiting")
	wg.Wait()
	fmt.Println("done waiting")
	close(out)
	done <- true // does not block: done has a buffer of 1
	close(done)
	return out
}

func main() {
	done := make(chan bool, 1)
	in := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println("sending", i)
			in <- i
		}
		close(in)
	}()
	out := workerPool(done, in, 2, func(i int) int {
		return i
	})
	for {
		select {
		case o, ok := <-out:
			if !ok {
				// out is closed and every result has been printed
				return
			}
			fmt.Println("output", o)
		case <-done:
			// select picks at random when both cases are ready, so results
			// may still be buffered in out; print them before returning.
			for o := range out {
				fmt.Println("output", o)
			}
			return
		}
	}
}
```
which fixes your issues, but it's not the best way to use channels! The structure itself can be changed to be much simpler so that it doesn't have to rely on buffered channels.
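One more subtlety with a loop that selects on both `out` and `done`: once `workerPool` has returned, `out` can still hold buffered results while `done` already holds its value, so both cases are ready, and the Go spec says `select` then picks one of them uniformly at random. The `done` case can therefore win before every result has been received. The sketch below reproduces that situation in isolation; `countBeforeDone` is a hypothetical helper.

```go
package main

import "fmt"

// countBeforeDone fills out with n results and puts a value in done, so both
// are ready before the loop starts, then counts how many results are
// received before the done case wins the select.
func countBeforeDone(n int) int {
	out := make(chan int, n)
	for i := 0; i < n; i++ {
		out <- i
	}
	close(out)
	done := make(chan bool, 1)
	done <- true

	received := 0
	for {
		select {
		case _, ok := <-out:
			if ok {
				received++
			}
		case <-done:
			return received
		}
	}
}

func main() {
	const n, trials = 10, 1000
	short := 0
	for t := 0; t < trials; t++ {
		if countBeforeDone(n) < n {
			short++
		}
	}
	fmt.Printf("%d of %d runs stopped before receiving all %d results\n", short, trials, n)
}
```

With 10 buffered results, the `out` case has to win ten random choices in a row for every result to be received (roughly 1 run in 1024), so nearly every run stops early. Draining `out` in the `done` case, or simply ranging over `out`, avoids losing results.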