go, concurrency, channel, goroutine

Goroutine not collecting all objects from channel


I have one goroutine that adds objects to a channel, and four goroutines that process the objects from that channel. The processing is nothing but appending the objects to a slice. But sometimes objects are missing from the final slice, so I am assuming that at some point the channel stops collecting objects. I have the following code:

package main

import (
    "log"
    "sync"
)

func main() {
    j := 0
    for {
        if j == 10 {
            break
        }
        wg := sync.WaitGroup{}
        months := []string{"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul"}
        hits := make(chan string)
        i := 0
        wg.Add(1)
        go func() {
            defer close(hits)
            for {
                if i == 25 {
                    wg.Done()
                    return
                }
                for _, month := range months {
                    hits <- month
                }
                i++
            }
        }()

        temp := []string{}
        for updateWorker := 1; updateWorker <= 4; updateWorker++ {
            wg.Add(1)
            go func() {
                for hit := range hits {
                    temp = append(temp, hit)
                }
                wg.Done()
                return
            }()
        }

        wg.Wait()

        log.Printf("length of temp %+v\n", len(temp))
        j++
    }
}

I am using the sync package to synchronize the goroutines. I run the same process 10 times to test whether the output is consistent. I expect output like this:

length of temp 175

It is 175 because I am sending the 7 month strings 25 times (7 × 25 = 175). But sometimes the output is less than 175, and I don't know why. I am a bit of a beginner with goroutines, so could anybody please help me find the reason? Thanks.


Solution

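  • The problem is that the updateWorker goroutines all receive from the hits channel (so far so good), but they all append the result to the temp local variable without any synchronization. This is not OK: temp = append(temp, hit) reads temp's slice header, may write into its backing array, and then assigns a new header back to temp. When two workers interleave these steps, one can overwrite the other's element or header, and values are lost.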

    Access to all variables from multiple goroutines (where at least one of them is a write) must be synchronized.
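    To make the lost update concrete, the sketch below replays one possible interleaving of two workers' temp = append(temp, hit) statements step by step on a single goroutine, so the outcome is deterministic. simulateLostAppend is a hypothetical helper, and a real data race can misbehave in other ways too; this only illustrates one of them.

```go
package main

import "fmt"

// simulateLostAppend replays, on a single goroutine, one possible
// interleaving of two workers each running temp = append(temp, hit)
// without a lock. Hypothetical illustration, not code from the question.
func simulateLostAppend() []string {
	// Spare capacity: append writes into the existing backing array
	// instead of allocating a new one.
	temp := make([]string, 0, 4)

	// Worker A and worker B both read temp's header (len 0) before
	// either has stored its result back.
	a := temp
	b := temp

	// Both appends write index 0 of the same backing array,
	// so "Feb" overwrites "Jan".
	a = append(a, "Jan")
	b = append(b, "Feb")

	// Each worker stores its header back into temp; the last store wins.
	temp = a
	temp = b
	return temp
}

func main() {
	temp := simulateLostAppend()
	fmt.Println(len(temp), temp) // 1 [Feb]: two appends, one element
}
```

    Two appends ran but only one value survived; with 4 workers and 175 sends, a few such collisions are enough to produce a length below 175.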

    If you run it with the race detector enabled, it screams about data races (go run -race app.go).

    It immediately yields correct results if you decrease the number of updateWorker goroutines to 1, because that eliminates the only source of data races in your app:

    for updateWorker := 1; updateWorker <= 1; updateWorker++ {
        // ...
    }
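    With a single consumer you could go one step further: it does not need its own goroutine or a WaitGroup, because the calling goroutine can range over hits directly, and that loop ends once the producer closes the channel. A minimal sketch using the question's data; collectInline is a hypothetical name.

```go
package main

import "log"

// collectInline produces the question's 175 strings and consumes them on
// the calling goroutine, which is the only writer of temp.
func collectInline() []string {
	months := []string{"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul"}
	hits := make(chan string)

	// Producer: sends the 7 months 25 times, then closes hits.
	go func() {
		defer close(hits)
		for i := 0; i < 25; i++ {
			for _, month := range months {
				hits <- month
			}
		}
	}()

	// Single consumer: the range loop ends after close(hits), which
	// happens only after every send has completed.
	temp := []string{}
	for hit := range hits {
		temp = append(temp, hit)
	}
	return temp
}

func main() {
	for j := 0; j < 10; j++ {
		log.Printf("length of temp %d", len(collectInline()))
	}
}
```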
    

    If you want to keep multiple updateWorker goroutines, their access to the shared temp variable must be synchronized.

    With a sync.Mutex:

    var (
        mu   sync.Mutex
        temp []string
    )
    for updateWorker := 1; updateWorker <= 4; updateWorker++ {
        wg.Add(1)
        go func() {
            for hit := range hits {
                mu.Lock()
                temp = append(temp, hit)
                mu.Unlock()
            }
            wg.Done()
            return
        }()
    }
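    Put together, the mutex variant could look like the following self-contained program. It is a sketch that keeps the question's producer and WaitGroup and wraps one iteration of the outer loop in a hypothetical collectWithMutex(workers int) function.

```go
package main

import (
	"log"
	"sync"
)

// collectWithMutex runs one producer and the given number of workers that
// append every received month to a shared slice guarded by mu.
func collectWithMutex(workers int) []string {
	var wg sync.WaitGroup
	months := []string{"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul"}
	hits := make(chan string)

	// Producer: sends the 7 months 25 times, then closes hits.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(hits)
		for i := 0; i < 25; i++ {
			for _, month := range months {
				hits <- month
			}
		}
	}()

	var (
		mu   sync.Mutex
		temp []string
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for hit := range hits {
				mu.Lock() // only one worker may touch temp at a time
				temp = append(temp, hit)
				mu.Unlock()
			}
		}()
	}

	// After Wait returns, every worker has finished, so reading temp
	// without the lock is safe here.
	wg.Wait()
	return temp
}

func main() {
	for j := 0; j < 10; j++ {
		log.Printf("length of temp %d", len(collectWithMutex(4)))
	}
}
```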
    

    Also note that in this simple example you gain nothing by using multiple updateWorker goroutines; adding the above synchronization (locking) even makes it slower than having just one of them.
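    If you do want several workers, one way (not from the original answer) to avoid taking a lock for every item is to let each worker append to its own local slice and hand it over once at the end. In this sketch collectPerWorker is a hypothetical name; the merged slice has the right length, but the order of its elements is not preserved.

```go
package main

import (
	"log"
	"sync"
)

// collectPerWorker gives each worker a private slice and merges the
// slices once all workers are done, so no lock is taken per item.
func collectPerWorker(workers int) []string {
	months := []string{"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul"}
	hits := make(chan string)

	// Producer: sends the 7 months 25 times, then closes hits.
	go func() {
		defer close(hits)
		for i := 0; i < 25; i++ {
			for _, month := range months {
				hits <- month
			}
		}
	}()

	// Buffered so each worker's single send never blocks.
	parts := make(chan []string, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var local []string // owned by this goroutine only
			for hit := range hits {
				local = append(local, hit)
			}
			parts <- local
		}()
	}
	wg.Wait()
	close(parts)

	// Merge on the calling goroutine; element order is not preserved.
	var temp []string
	for part := range parts {
		temp = append(temp, part...)
	}
	return temp
}

func main() {
	for j := 0; j < 10; j++ {
		log.Printf("length of temp %d", len(collectPerWorker(4)))
	}
}
```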

    For properly distributing work and collecting results, check out this answer: Is this an idiomatic worker thread pool in Go?