
A goroutine with a WaitGroup executes the same code twice


I'm relatively new to Go and am having trouble with some concurrent code:

For some reason the else block executes twice, and the program panics with the message "panic: close of closed channel", because extractedCh was already closed the first time through.

func (ex Extractor) Extract(pageCh chan *types.ScrapedData, extractedCh chan *types.ScrapedData) {
    log.Info().Msg("Extracting data...")

    var wg sync.WaitGroup

    for {
        page, more := <-pageCh
        if more {
            wg.Add(1)

            go func() {
                defer wg.Done()
                worker, err := ex.getWorker(page)
                if err != nil {
                    log.Error().Err(err).Msg("Error creating worker")
                } else {
                    worker.Extract(extractedCh)
                }
            }()

        } else {
            log.Info().Msg("WebPage channel closed: waiting for waitgroup")
            wg.Wait()

            log.Info().Msg("WaitGroup finished closed -- closing extractedCh")
            close(extractedCh)
        }

    }

}

The Extract function only runs in one goroutine, so I'm not sure why the else block would execute twice and try to close extractedCh when it is already closed.

What's the fix: do I restructure my code, or simply check whether the channel is closed before trying to close it?
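The behaviour described above can be reproduced in a small standalone program. This is a sketch under stated assumptions: int stands in for *types.ScrapedData, doubling a value stands in for worker.Extract, and the function name extractBuggy is hypothetical. The deferred recover exists only so the panic message can be printed instead of crashing the program.

```go
package main

import (
	"fmt"
	"sync"
)

// extractBuggy mirrors the loop from the question. Assumptions: int replaces
// *types.ScrapedData and doubling replaces worker.Extract (hypothetical stand-ins).
// The deferred recover turns the panic into a returned message.
func extractBuggy(pageCh <-chan int, extractedCh chan<- int) (panicMsg string) {
	defer func() {
		if r := recover(); r != nil {
			panicMsg = fmt.Sprint(r)
		}
	}()

	var wg sync.WaitGroup
	for {
		page, more := <-pageCh
		if more {
			wg.Add(1)
			go func() {
				defer wg.Done()
				extractedCh <- page * 2
			}()
		} else {
			wg.Wait()
			// Nothing leaves the loop here, so the next iteration receives
			// (0, false) again and reaches this close a second time.
			close(extractedCh)
		}
	}
}

func main() {
	pageCh := make(chan int, 3)
	extractedCh := make(chan int, 3) // buffered so the workers never block
	for i := 1; i <= 3; i++ {
		pageCh <- i
	}
	close(pageCh)

	fmt.Println(extractBuggy(pageCh, extractedCh)) // prints "close of closed channel"
}
```

After the three pages are consumed, the loop closes extractedCh once, goes round again, receives (0, false) a second time, and panics on the second close.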


Solution

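  • The else block executes a second time because it does not break out of the for loop when pageCh is closed. The next receive on the closed channel returns immediately with more set to false, so the else block runs again and the second close panics. Fix this by adding a return statement at the end of the else block. Better yet, rewrite the code like this: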

    func (ex Extractor) Extract(pageCh chan *types.ScrapedData, extractedCh chan *types.ScrapedData) {
        log.Info().Msg("Extracting data...")
        var wg sync.WaitGroup
        for page := range pageCh {
            wg.Add(1)
            page := page // per-iteration copy; delete this line if go.mod declares go 1.22 or later
            go func() {
                defer wg.Done()
                worker, err := ex.getWorker(page)
                if err != nil {
                    log.Error().Err(err).Msg("Error creating worker")
                } else {
                    worker.Extract(extractedCh)
                }
            }()
        }
        log.Info().Msg("WebPage channel closed: waiting for waitgroup")
        wg.Wait()
        log.Info().Msg("WaitGroup finished closed -- closing extractedCh")
        close(extractedCh)
    }
    

    The for ... range loop exits once pageCh is closed and every value already sent on it has been received.

    One thing that might have caused confusion is this line:

        page, more := <-pageCh
    

    The second return value from the channel receive operator means "a value sent on the channel was received", not "more values are expected". It is false only when the channel is closed and empty, in which case the receive returns the zero value immediately. From then on, every receive on that channel returns false as its second value, without blocking.