I'm trying to implement a simple parallelization example in golang using channels. The code tries to implement a parallelized map function. Made channels buffered too for easing constraints on blocking nature of channel. but code still leads to a deadlock.
func pmmap(inp []int, f func(int) int, p int) {
var wg sync.WaitGroup
var output []chan int
tot := len(inp)
out := make(chan int, tot)
slice := tot / p
for i := 0; i < p; i++ {
temp := make(chan int, slice)
output = append(output, temp)
start_ind := slice * i
end_ind := start_ind + slice
fmt.Println(start_ind, end_ind)
wg.Add(1)
go func(si, ei int, out chan int, wg *sync.WaitGroup) {
fmt.Println("goroutine started with ", si, ei)
for ind := si; ind < ei; ind++ {
out <- f(inp[ind])
}
wg.Done()
}(start_ind, end_ind, output[i], &wg)
}
wg.Add(1)
go func(wg *sync.WaitGroup) {
for i := 0; i < p; i++ {
for val := range output[i] {
out <- val
}
close(output[i])
}
wg.Done()
}(&wg)
wg.Add(1)
go func(wg *sync.WaitGroup) {
for i := range out {
fmt.Println(i)
}
wg.Done()
}(&wg)
time.Sleep(time.Second * 6)
wg.Wait()
close(out)
}
func add4(i int) int {
return i + 4
}
func main() {
temp := []int{}
for i := 1; i <= 20; i++ {
temp = append(temp, i)
}
pmmap(temp, add4, 2)
}
from the output of above code, I get the deadlock is because channel output[1] is never read. but I'm not sure why
0 10
10 20
goroutine started with 0 10
goroutine started with 10 20
5
6
7
8
9
10
11
12
13
14
fatal error: all goroutines are asleep - deadlock!
The problem is that range
over a channel keeps trying to receive from the channel until the channel is closed, but you didn't close
channels after finished sending.
Add close(out)
before wg.Done
in your code will fix it.
Playground: https://play.golang.org/p/NbKTx6Lke7X
Edit: fixed a bug of closing closed channel.