
In Go, how do I write a pipeline stage that introduces a delay before the next stage?


I am following the https://blog.golang.org/pipelines article to implement a few stages.

I need one of the stages to introduce a delay of a few seconds before the events are passed on to the next stage in the pipeline.

My concern with the code below is that it will spawn an unbounded number of goroutines, each calling time.Sleep() before passing its event along. Is there a better way to do this?

Thanks!

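// Needs the "sync" and "time" imports.
func fooStage(inChan <-chan *Bar) <-chan *Bar {
    out := make(chan *Bar, 10000)
    go func() {
        defer close(out)
        var wg sync.WaitGroup
        // range ends when inChan is closed. A bare break inside a select
        // only leaves the select, so the original for loop never exited.
        for event := range inChan {
            wg.Add(1)
            // One goroutine per event: this is the unbounded part.
            go func(event *Bar) {
                defer wg.Done()
                time.Sleep(5 * time.Second)
                out <- event
            }(event)
        }
        // Wait for all sleepers to send before out is closed.
        wg.Wait()
    }()
    return out
}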

Solution

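  • You could use a buffered channel as a semaphore to cap the number of goroutines the loop can have running at once. When all numRoutines slots are taken, the loop blocks until a sleeping goroutine finishes, so events that arrive in the meantime wait longer than 5 seconds.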

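    const numRoutines = 10

    // Needs the "sync" and "time" imports.
    func fooStage(inChan <-chan *Bar) <-chan *Bar {
        out := make(chan *Bar, 10000)
        // Buffered channel used as a semaphore with numRoutines slots.
        routines := make(chan struct{}, numRoutines)
        go func() {
            defer close(out)
            var wg sync.WaitGroup
            // range ends when inChan is closed. A bare break inside a
            // select only leaves the select, not the enclosing for loop.
            for event := range inChan {
                wg.Add(1)
                routines <- struct{}{} // blocks while numRoutines are active
                go func(event *Bar) {
                    defer wg.Done()
                    time.Sleep(5 * time.Second)
                    out <- event
                    <-routines // release the slot
                }(event)
            }
            // Wait for all sleepers to send before out is closed.
            wg.Wait()
        }()
        return out
    }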