
How to handle channel-closing synchronisation with a slow producer and a fast consumer in Go?


I am new to Go and couldn't find an answer to this problem. I read a CSV file in a producer, do some processing that may take time, and then send the output to a consumer via a channel. There is a chain of producer-consumers, and any producer may end up being slower than its consumer.

producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)

There can be up to 15 consumers here.

Now the problem that I face is how to decide on the consumer side if the producer is done, and we can stop processing.

What I need to achieve is:

  1. Once the producer is done, all consumers should eventually do some cleanup and exit after processing the remaining data.
  2. If a consumer doesn't get any data within a specific timeout period, it can exit (with a signal, preferably) without blocking any further.
  3. This should hold for every producer-consumer pair in the chain.

I have used the following approach.

  1. Keep a signal channel alongside each data channel, and publish one "done" on it for each goroutine of the next consumer.
  2. After reading a "done", each consumer reads the remaining buffered data from the data channel and then puts, say, 5 "done" messages on the next signal channel. It must be 5 in total, not 5 per goroutine (using https://golang.org/pkg/sync/#Once.Do).
  3. Below is what I have come up with so far.

    processRemaining := false
    for !processRemaining {
            select {
            case stuff, ok := <-input_messages:
                    if !ok { // channel has been closed
                            processRemaining = true
                            break // leaves the select; the loop condition then ends the loop
                    }
                    result := do_stuff(stuff)
                    if result != nil {
                            // send to channel output_messages
                    }
            case sig := <-input_signals: // signaled to stop
                    fmt.Println("received signal", sig)
                    processRemaining = true
            default: // nothing is ready on either channel
                    fmt.Println("no activity")
            }
    }
    if processRemaining {
            // drain whatever is left; this loop ends only when input_messages is closed
            for stuff := range input_messages {
                    result := do_stuff(stuff)
                    if result != nil {
                            // send to channel output_messages
                    }
            }
            // send "output_routine" number of "done" to a channel "output_signals".
    }
    

But even with this approach, I can't think of a way to treat "input_messages" as if it were closed when nothing arrives for, say, 10 seconds.

Are there any problems I am overlooking with this approach? What concurrency patterns could solve this problem while ensuring that:

  1. All subsequent channels are closed once the first channel, "chan0", is closed.
  2. All producers writing to a channel are notified before it is closed, and the channel is closed only once they have all finished their writes.
  3. If a consumer gets no data from a channel within a specified timeout, it treats the channel as closed and unblocks itself.

Solution

  • Use a sync.WaitGroup to keep track of the number of running goroutines. Each goroutine exits after it no longer gets data from the channel. Once the WaitGroup is done, the cleanup can be done.

    Something like this:

    import (
            "sync"
            "time"
    )
    
    type Data interface{} // just an example
    
    type Consumer interface {
            Consume(Data) Data
            CleanUp()
            Count() int
            Timeout() time.Duration
    }
    
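    // StartConsumers runs consumer.Count() goroutines and blocks until all of them exit.
    func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
            wg := sync.WaitGroup{}
            for i := 0; i < consumer.Count(); i++ {
                    wg.Add(1)
                    go func() {
                            defer wg.Done()
                    consumeLoop:
                            for {
                                    select {
                                    case v, ok := <-inCh: // 'ok' is false once the channel is closed and drained
                                            if !ok {
                                                    break consumeLoop
                                            }
                                            outCh <- consumer.Consume(v)
                                    case <-time.After(consumer.Timeout()): // nothing arrived in time
                                            break consumeLoop
                                    }
                            }
                    }()
            }
            wg.Wait() // every goroutine has finished writing to outCh
    
            consumer.CleanUp()
            close(outCh) // closed exactly once, which signals the next stage
    }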
    

    At each stage of the pipeline, you can start the consumers with a process similar to the one above.
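As a rough illustration of chaining several stages this way, here is a minimal sketch. The names `startStage` and `runPipeline`, the `int` payload, the worker count of 3 and the one-second idle timeout are all assumptions made for the example, not part of the code above. Unlike `StartConsumers`, the helper waits on the WaitGroup in a background goroutine so that the next stage can be wired up immediately.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// startStage is a hypothetical helper: it runs `workers` goroutines that read
// from in, apply fn, and write to the returned channel. The output channel is
// closed once every worker has exited, which is how "done" reaches the next stage.
func startStage(in <-chan int, workers int, idle time.Duration, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case v, ok := <-in:
					if !ok {
						return // upstream is closed and drained
					}
					out <- fn(v)
				case <-time.After(idle):
					return // no input within the idle timeout
				}
			}
		}()
	}
	// Close out in the background so the caller is not blocked here.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPipeline wires producer -> chan0 -> stage 1 -> chan1 -> stage 2 -> chan2
// and collects everything that arrives on chan2.
func runPipeline(inputs []int) []int {
	chan0 := make(chan int)
	go func() {
		defer close(chan0) // the single producer closes its own channel
		for _, v := range inputs {
			chan0 <- v
		}
	}()
	chan1 := startStage(chan0, 3, time.Second, func(v int) int { return v + 1 })
	chan2 := startStage(chan1, 3, time.Second, func(v int) int { return v * 2 })

	var results []int
	for v := range chan2 { // ends when stage 2 closes chan2
		results = append(results, v)
	}
	return results
}

func main() {
	// The order of results is not guaranteed because the workers run concurrently.
	fmt.Println(len(runPipeline([]int{1, 2, 3, 4})), "results")
}
```

One caveat with this sketch: if every worker of a stage times out while its upstream is still producing, the upstream sender blocks on its next send because nobody is reading any more. A real pipeline would probably also need a shared done channel or a `context.Context` so that senders can give up as well.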