I am new to Go and couldn't find an answer to this problem. I read a CSV file in a producer, do some processing that may take time, and then send the output to a consumer via a channel. There is a chain of producer-consumers, and any producer may end up being slower than its consumer.
producer(1 goroutine) -> chan0 -> consumer-producer-1(>1 goroutines) -> chan1 -> consumer-producer-2(>1 goroutines) -> chan2 -> consumer(>1 goroutines)
There can be up to 15 consumers here.
The problem I face is how to decide, on the consumer side, that the producer is done and processing can stop.
What I need to achieve is:
This is the approach I have come up with so far:
processRemaining := false
for !processRemaining {
	select {
	case stuff, ok := <-input_messages:
		if !ok { // channel has been closed and drained
			processRemaining = true
			break // leaves the select; the loop condition then ends the loop
		}
		result := do_stuff(stuff)
		if result != nil {
			// send to channel output_messages
		}
	case sig := <-input_signals: // signaled to stop
		fmt.Println("received signal", sig)
		processRemaining = true
	default:
		fmt.Println("no activity")
	}
}
if processRemaining {
	for stuff := range input_messages {
		result := do_stuff(stuff)
		if result != nil {
			// send to channel output_messages
		}
	}
	// send "output_routine" number of "done" to a channel "output_signals".
}
But even with this approach, I can't think of a way to treat "input_messages" as if it had been closed when nothing arrives for, say, 10 seconds.
Are there any problems I am overlooking with this approach? What are the possible ways (or concurrency patterns) to approach this problem, ensuring:
Use a sync.WaitGroup to keep track of the number of running goroutines. Each goroutine exits once it no longer gets data from the channel. Once the WaitGroup is done, the cleanup can be done.
Something like this:
import (
	"sync"
	"time"
)

type Data interface{} // just an example

type Consumer interface {
	Consume(Data) Data
	CleanUp()
	Count() int             // number of goroutines to start
	Timeout() time.Duration // how long a goroutine waits for input before exiting
}

// StartConsumers blocks until every consumer goroutine has exited,
// then cleans up and closes outCh.
func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
	var wg sync.WaitGroup
	for i := 0; i < consumer.Count(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case v, ok := <-inCh: // 'ok' is false once the channel is closed and drained
					if !ok {
						return
					}
					outCh <- consumer.Consume(v)
				case <-time.After(consumer.Timeout()):
					return // nothing arrived within the timeout
				}
			}
		}()
	}
	wg.Wait()
	consumer.CleanUp()
	close(outCh)
}
At each stage of the pipeline, you can use a process similar to the one above to start the consumers.
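As a rough sketch of how the stages might be chained, the example below wires two stages together. The names runStage and runPipeline, the int payload, and the one-second idle timeout are assumptions made for illustration, not part of the code above. Unlike StartConsumers, this hypothetical runStage closes its output channel from a background goroutine, so each stage can be wired up without blocking the caller.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// runStage is a hypothetical helper: it starts `workers` goroutines that read
// from in, apply fn, and write to the returned channel. The output channel is
// closed once every worker has exited, which is how the next stage learns
// that this one is finished.
func runStage(in <-chan int, workers int, idle time.Duration, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case v, ok := <-in:
					if !ok {
						return // upstream closed and drained
					}
					out <- fn(v)
				case <-time.After(idle):
					return // nothing arrived within the idle timeout
				}
			}
		}()
	}
	// Close out in the background so the caller can wire up the next stage.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPipeline feeds items through two stages and collects the results.
func runPipeline(items []int) []int {
	src := make(chan int)
	go func() {
		defer close(src) // the producer signals completion by closing
		for _, it := range items {
			src <- it
		}
	}()

	doubled := runStage(src, 3, time.Second, func(v int) int { return v * 2 })
	plusOne := runStage(doubled, 2, time.Second, func(v int) int { return v + 1 })

	var results []int
	for v := range plusOne {
		results = append(results, v)
	}
	sort.Ints(results) // worker scheduling makes the arrival order nondeterministic
	return results
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3, 4})) // prints [3 5 7 9]
}
```

Because each stage closes its own output once all of its workers have exited, the "done" signal propagates down the chain without a separate signal channel. One caveat with the idle timeout: if all workers of a stage time out while the upstream stage is still alive, a later send from upstream will block, so the timeout should be comfortably longer than the slowest expected gap between items.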