go concurrency deadlock goroutine

Go producer consumer avoiding deadlock


I have code for a producer and consumer in Go. I have already asked for a code review of it here, and a good part of the idea was derived from this thread here. Here is the code in the playground.

  • This code has multiple producers and consumers sharing the same channel.
  • This code has an error handling mechanism: if any of the workers (producer or consumer) errors out, then all the workers should be halted.

I am concerned about a deadlock scenario where all consumers have shut down but a producer is still adding data to the shared channel. To "mitigate" this I have added a context check right before adding data into the data queue, specifically Line 85 in the Go playground.

However, is a deadlock still possible if the producer checks context.Done() in Line 85, then the context is cancelled, causing all consumers to shut down, and then the producer tries to insert data into the queue?

If so, how can I mitigate it?

Reposting the code:

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    a1 := []int{1, 2, 3, 4, 5}
    a2 := []int{5, 4, 3, 1, 1}
    a3 := []int{6, 7, 8, 9}
    a4 := []int{1, 2, 3, 4, 5}
    a5 := []int{5, 4, 3, 1, 1}
    a6 := []int{6, 7, 18, 9}
    arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

    ctx, cancel := context.WithCancel(context.Background())
    ch1 := read(ctx, arrayOfArray)

    messageCh := make(chan int)
    errCh := make(chan error)

    producerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        producerWg.Add(1)
        producer(ctx, producerWg, ch1, messageCh, errCh)
    }

    consumerWg := &sync.WaitGroup{}
    for i := 0; i < 3; i++ {
        consumerWg.Add(1)
        consumer(ctx, consumerWg, messageCh, errCh)
    }

    firstError := handleAllErrors(ctx, cancel, errCh)

    producerWg.Wait()
    close(messageCh)

    consumerWg.Wait()
    close(errCh)

    fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
    ch := make(chan []int)

    go func() {
        defer close(ch)

        for i := 0; i < len(arrayOfArray); i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- arrayOfArray[i]:
            }
        }
    }()

    return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
    go func() {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case arr, ok := <-in:
                if !ok {
                    return
                }

                for i := 0; i < len(arr); i++ {

                    // simulating an error.
                    //if arr[i] == 10 {
                    //  errCh <- fmt.Errorf("producer interrupted")
                    //}

                    select {
                    case <-ctx.Done():
                        return
                    case messageCh <- 2 * arr[i]:
                    }
                }
            }
        }
    }()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
    go func() {
        defer wg.Done()

        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-messageCh:
                if !ok {
                    return
                }
                fmt.Println("consumed: ", n)

                // simulating errors
                //if n == 10 {
                //  errCh <- fmt.Errorf("output error during write")
                //}
            }
        }
    }()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
    firstErrCh := make(chan error, 1)
    isFirstError := true
    go func() {
        defer close(firstErrCh)
        for err := range errCh {
            select {
            case <-ctx.Done():
            default:
                cancel()
            }
            if isFirstError {
                firstErrCh <- err
                isFirstError = !isFirstError
            }
        }
    }()

    return firstErrCh
}

Solution

  • Nope, you're fine. This won't deadlock on a producer write because you're wrapping the channel write in a select statement. A select is not a check followed by a send: it waits on both cases at once and proceeds with whichever becomes ready. So even if the channel write can never happen because the consumers have terminated, you'll still hit the context cancellation clause and terminate your producer.

    Just to demonstrate the concept, you can run this and see it doesn't deadlock despite the fact that it's attempting a channel write with no readers.

    package main
    
    import (
        "context"
        "fmt"
        "time"
    )
    
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        ch := make(chan struct{})
    
        go func() {
            time.Sleep(1 * time.Second)
            cancel()
        }()
    
        select {
        case ch <- struct{}{}:
        case <-ctx.Done():
            fmt.Println("context canceled")
        }
        fmt.Println("bye!")
    }
    

    Here's the playground link for it.
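
    For contrast, here is a rough sketch of the pattern the question is worried about: a context check followed by a separate, bare send. The function names (checkThenSend, sendOrCancel, returnsWithin, runDemo) and the timings are made up for illustration and are not part of the original code. With no receiver, the bare send stays blocked after cancellation, while the select version returns.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// checkThenSend is the racy pattern: the context is checked first and the
// send happens afterwards as a separate step. If cancellation lands between
// the two steps and nobody is receiving, the bare send blocks forever.
func checkThenSend(ctx context.Context, ch chan<- int, v int) {
	select {
	case <-ctx.Done():
		return
	default:
	}
	ch <- v
}

// sendOrCancel waits on the send and on cancellation at the same time and
// reports whether the value was delivered.
func sendOrCancel(ctx context.Context, ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

// returnsWithin reports whether f finishes before the timeout d elapses.
func returnsWithin(d time.Duration, f func()) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		f()
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// runDemo tries both patterns against a channel that nothing ever reads
// from, cancelling the context 50ms after each attempt starts.
func runDemo() (racyReturned, selectReturned bool) {
	ch := make(chan int) // unbuffered, no receivers

	ctx1, cancel1 := context.WithCancel(context.Background())
	time.AfterFunc(50*time.Millisecond, cancel1)
	racyReturned = returnsWithin(300*time.Millisecond, func() {
		checkThenSend(ctx1, ch, 1)
	})

	ctx2, cancel2 := context.WithCancel(context.Background())
	time.AfterFunc(50*time.Millisecond, cancel2)
	selectReturned = returnsWithin(300*time.Millisecond, func() {
		sendOrCancel(ctx2, ch, 1)
	})
	return racyReturned, selectReturned
}

func main() {
	racy, sel := runDemo()
	fmt.Println("check-then-send returned:", racy)
	fmt.Println("select send returned:", sel)
}
```

    The producer in the question already follows the sendOrCancel shape, which is why it is safe.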

    Regarding code simplification: if this is for any sort of real-life code, I'd probably just use a Group from golang.org/x/sync/errgroup. Either that, or take a cue from it and use sync.Once: wrap all of your producers and consumers with a function that spawns the goroutines and handles the errors, without the more complex error channel draining code you have in your error handling function.
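
    As a rough illustration of the sync.Once idea, the sketch below uses only the standard library. The group type, newGroup and run are hypothetical names invented for this example. The type loosely imitates what errgroup.Group does (keep the first error, cancel the shared context) and is not its actual implementation. A single producer is used so that it can close the channel itself; with several producers you would still need a WaitGroup before closing, as in the question.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// group is a hypothetical, stripped-down stand-in for errgroup.Group.
type group struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
	cancel  context.CancelFunc
}

func newGroup(parent context.Context) (*group, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &group{cancel: cancel}, ctx
}

// Go runs f in its own goroutine. Only the first non-nil error is kept,
// and recording it cancels the shared context so the other workers stop.
func (g *group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				g.cancel()
			})
		}
	}()
}

// Wait blocks until every worker has returned and yields the first error.
func (g *group) Wait() error {
	g.wg.Wait()
	g.cancel() // release the context's resources
	return g.err
}

// run doubles each input and hands it to three consumers. A consumer
// simulates a failure when it receives the value failOn.
func run(inputs []int, failOn int) error {
	g, ctx := newGroup(context.Background())
	messageCh := make(chan int)

	g.Go(func() error { // producer
		defer close(messageCh)
		for _, n := range inputs {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case messageCh <- 2 * n:
			}
		}
		return nil
	})

	for i := 0; i < 3; i++ {
		g.Go(func() error { // consumer
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case n, ok := <-messageCh:
					if !ok {
						return nil
					}
					if n == failOn {
						return errors.New("output error during write")
					}
				}
			}
		})
	}
	return g.Wait()
}

func main() {
	fmt.Println(run([]int{1, 2, 3, 4, 5}, 6))  // a consumer fails on 6
	fmt.Println(run([]int{1, 2, 3, 4, 5}, -1)) // nothing fails
}
```

    There is no error channel to drain here: a worker reports failure simply by returning an error, and the Once guarantees that only the first one is recorded.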