I am following the https://blog.golang.org/pipelines article to implement a few stages.
I need one of the stages to introduce a delay of a few seconds before the events are passed on to the next stage in the pipeline.
My concern with the code below is that it will spawn an unbounded number of goroutines, each of which calls time.Sleep() before passing its event along. Are there better ways to do this?
Thanks!
// fooStage is a pipeline stage that forwards every event received on inChan
// to the returned channel after a fixed five-second delay.
//
// Each event is delayed in its own goroutine, so the number of sleeping
// goroutines is limited only by how fast events arrive, and the order in
// which events are delivered is not guaranteed. The returned channel is
// closed once inChan has been closed and every pending event has been
// forwarded.
func fooStage(inChan <-chan *Bar) <-chan *Bar {
	out := make(chan *Bar, 10000)
	go func() {
		defer close(out)
		var wg sync.WaitGroup
		// Range over the channel so the loop ends when inChan is closed. A
		// bare break inside a select only leaves the select, which would keep
		// the surrounding for loop spinning and never close out.
		for event := range inChan {
			wg.Add(1)
			// Pass the event as an argument so each goroutine owns its own
			// copy regardless of the Go version's loop-variable semantics.
			go func(e *Bar) {
				defer wg.Done()
				time.Sleep(5 * time.Second)
				out <- e
			}(event)
		}
		// Wait for all delayed sends before the deferred close(out) runs.
		wg.Wait()
	}()
	return out
}
You could use another channel to limit the number of active goroutines your loop is able to create.
// numRoutines is the maximum number of delay goroutines that may be active
// at the same time.
const numRoutines = 10

// fooStage is a pipeline stage that forwards every event received on inChan
// to the returned channel after a fixed five-second delay.
//
// At most numRoutines events are delayed concurrently: a slot is acquired
// before each goroutine is started, so reading from inChan blocks while all
// slots are in use. Delivery order is not guaranteed. The returned channel is
// closed once inChan has been closed and every pending event has been
// forwarded.
func fooStage(inChan <-chan *Bar) <-chan *Bar {
	out := make(chan *Bar, 10000)
	// routines is a counting semaphore; its capacity bounds the number of
	// goroutines that are sleeping or sending at any moment.
	routines := make(chan struct{}, numRoutines)
	go func() {
		defer close(out)
		var wg sync.WaitGroup
		// Range over the channel so the loop ends when inChan is closed. A
		// bare break inside a select only leaves the select, which would keep
		// the surrounding for loop spinning and never close out.
		for event := range inChan {
			wg.Add(1)
			routines <- struct{}{} // acquire a slot; blocks when the limit is reached
			// Pass the event as an argument so each goroutine owns its own
			// copy regardless of the Go version's loop-variable semantics.
			go func(e *Bar) {
				defer func() {
					wg.Done()
					<-routines // release the slot
				}()
				time.Sleep(5 * time.Second)
				out <- e
			}(event)
		}
		// Wait for all delayed sends before the deferred close(out) runs.
		wg.Wait()
	}()
	return out
}