I am trying to understand Go channels and synchronization. When I run my program with the race detector, it reports a data race.
func main() {
	ch := make(chan int)
	done := make(chan struct{})
	wg := sync.WaitGroup{}
	go func() {
		defer close(ch)
		defer close(done)
		wg.Wait()
		done <- struct{}{}
	}()
	for i := 0; i < 5; i++ {
		x := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("Value: ", x)
			ch <- x
		}()
	}
loop:
	for {
		select {
		case i := <-ch:
			fmt.Println("Value: ", i)
		case <-done:
			break loop
		}
	}
}
==================
WARNING: DATA RACE
Write at 0x00c000020148 by goroutine 7:
  internal/race.Write()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/sync/waitgroup.go:128 +0x126
  main.main.func1()
      /home/reddy/code/github.com/awesomeProject/prod.go:106 +0xc4

Previous read at 0x00c000020148 by main goroutine:
  internal/race.Read()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /home/linuxbrew/.linuxbrew/Cellar/go/1.16.5/libexec/src/sync/waitgroup.go:71 +0x219
  main.main()
      /home/reddy/code/github.com/awesomeProject/prod.go:112 +0x124

Goroutine 7 (running) created at:
  main.main()
      /home/reddy/code/github.com/awesomeProject/prod.go:103 +0x104
==================
I am not able to figure out what's going wrong here.
My analysis:
- wg.Add(1) increments the counter.
- wg.Done() is called at the end of each goroutine and decrements the counter.
- ch <- x should be a blocking call because the channel is unbuffered.
- When the WaitGroup counter goes to zero, all 5 goroutines have published their messages.
- The goroutine blocked in wg.Wait() then resumes and sends on done. Once that message is consumed in the main loop, the loop breaks and the program should exit gracefully.

The program has a race between the calls to wg.Add and the call to wg.Wait. These calls can happen in any order. If wg.Wait is called before the calls to wg.Add, it does not wait for any of the goroutines.
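To make that last point concrete, here is a minimal sketch showing that Wait returns immediately when the counter is still zero. The helper name waitWithoutAdd is made up for illustration; it is not part of the program in the question.

```go
package main

import (
	"fmt"
	"sync"
)

// waitWithoutAdd is a hypothetical helper: it calls Wait on a WaitGroup
// whose counter was never incremented and reports that Wait returned.
func waitWithoutAdd() bool {
	var wg sync.WaitGroup
	wg.Wait() // counter is zero, so this does not block
	return true
}

func main() {
	fmt.Println("Wait returned without any Add:", waitWithoutAdd())
}
```

In the question's program the same thing can happen whenever the goroutine calling wg.Wait is scheduled before the loop reaches its first wg.Add.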
Fix by moving the calls to wg.Add before starting the goroutine that calls wg.Wait. This change ensures that the calls to wg.Add happen before the call to wg.Wait.
for i := 0; i < 5; i++ {
	x := i
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("Value: ", x)
		ch <- x
	}()
}
go func() {
	defer close(ch)
	defer close(done)
	wg.Wait()
	done <- struct{}{}
}()
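The WaitGroup type has code to check for this error when running under the race detector: Add performs a modeled read and Wait performs a modeled write. These are the internal/race.Read and internal/race.Write frames in the race report.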
Simplify the code by breaking out of the loop in the main goroutine when ch is closed. The done channel is not needed.
ch := make(chan int)
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
	x := i
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("Value: ", x)
		ch <- x
	}()
}
go func() {
	wg.Wait()
	close(ch)
}()
for i := range ch {
	fmt.Println("Value: ", i)
}
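For anyone who wants to run this under go run -race, here is one way the simplified version might look as a complete program. This is a sketch, not the only arrangement: the collect function and the sorting of the results are my additions so that the output is deterministic, and the per-goroutine Println is dropped for the same reason.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect (hypothetical name) starts n sender goroutines, closes ch once
// all of them have sent, and returns the received values in sorted order.
func collect(n int) []int {
	ch := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		x := i
		wg.Add(1) // runs before the goroutine that calls wg.Wait is started
		go func() {
			defer wg.Done()
			ch <- x
		}()
	}
	go func() {
		wg.Wait() // every Add has already happened by this point
		close(ch) // ends the range loop below
	}()
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	sort.Ints(got) // arrival order is not deterministic, so sort for display
	return got
}

func main() {
	fmt.Println(collect(5)) // prints [0 1 2 3 4]
}
```

Because ch is unbuffered, each sender blocks until the main goroutine receives its value, so wg.Wait cannot return until all n values have been consumed.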