
Implementing a pipeline of worker functions


I'm implementing a pipeline of several worker functions connected by channels. Each of them takes (in, out chan interface{}) as input, and each function receives the out of the previous one as its in.

I don't have any guarantee that out will be closed at the end of each function, so I'm wondering how I should check whether a previous function has finished its work. I've started with something like this:

func ExecutePipeline(jobs ...job) {
    out := make(chan interface{}, 10)
    for _, val := range jobs {
        in := out
        out = make(chan interface{}) // assign rather than shadow, so the next stage reads from this channel
        go val(in, out)
    }
}

I'm thinking about using a WaitGroup to treat the return of each function's goroutine as the signal that it has finished its work, and then close its out channel.
How can I do that?


Solution

  • If your intent is to propagate a signal along the pipeline to communicate when previous pipeline stages have completed and will produce no further values, you can do this synchronously by closing each stage's out channel as soon as that stage returns. The following code does so by wrapping the invocation of each pipeline worker:

    func startWork(val job, in, out chan interface{}) {
        val(in, out)
        // out will be closed after val returns
        close(out)
    }
    
    // Later, in ExecutePipeline, start your worker by calling startWork
    func ExecutePipeline(jobs ...job) {
        // ...
        for _, val := range jobs {
            // ...
            go startWork(val, in, out)
        }
    }
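For reference, a self-contained sketch of the whole arrangement might look like the following. It assumes that job is defined as func(in, out chan interface{}), that the first stage ignores its input (so an already-closed channel is handed to it), and that ExecutePipeline should block until the last stage has finished. None of these details are stated in the question, so treat them as illustrative choices.

```go
package main

// job is the assumed signature of a pipeline worker.
type job func(in, out chan interface{})

// startWork runs one stage and closes its output once the stage returns.
func startWork(val job, in, out chan interface{}) {
	val(in, out)
	close(out)
}

// ExecutePipeline chains the jobs so that each stage reads the previous
// stage's output, then blocks until the last stage's output is closed.
func ExecutePipeline(jobs ...job) {
	in := make(chan interface{})
	close(in) // assumption: the first stage has no upstream producer

	for _, val := range jobs {
		out := make(chan interface{})
		go startWork(val, in, out)
		in = out // the next stage reads what this stage writes
	}

	// Drain the final output; the loop ends when startWork closes it.
	for range in {
	}
}
```

Because the final channel is closed only after the last worker returns, draining it doubles as a completion signal, and no WaitGroup is needed in this variant.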
    

    Avoiding multiple channel closure

    I don't have any guarantee that out will be closed at the end of each function

    Conversely, it is a problem if a worker closes its out channel itself: closing an already-closed channel panics, so the subsequent close(out) in startWork would crash the program.

    In this simple implementation, workers must delegate channel closure to the code that supervises the pipeline; otherwise the program will panic.
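The panic is easy to reproduce in isolation. The sketch below uses a hypothetical helper, closeTwice, which closes the same channel twice and recovers from the resulting panic only so that the outcome can be reported; real pipeline code should avoid the second close rather than recover from it.

```go
package main

// closeTwice closes one channel twice and reports whether the second
// close panicked. The helper exists purely for demonstration.
func closeTwice() (panicked bool) {
	ch := make(chan interface{})
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	close(ch) // stands in for a worker closing its own out channel
	close(ch) // stands in for the supervisor's close; this one panics
	return false
}
```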


    Handling in-band signalling

    As the signalling is passed in-band (in the same channel as the data), care may be required in the implementation of your pipeline workers to ensure they differentiate between

    • reads of a value from an open channel, and
    • reads of a zero value from a closed channel

    Ranging over a channel in a for loop automatically ends the loop once the channel is closed and every value already sent has been received. If you implement your own logic to read from the channel, you need to detect when a receive succeeds trivially with a zero value because the channel is closed. The two-value form of the receive operator does this: its second result is a boolean that is true when the value was delivered by a send, and false when the zero value was produced because the channel is closed and empty.

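    func someWorker(in, out chan interface{}) {
        for {
            val, open := <-in
            if !open {
                // in is closed and drained; val is only the zero value (nil),
                // not data sent by the previous stage.
                break // or, if appropriate, return
            }
            // Placeholder for real work: forward the value to the next stage.
            out <- val
        }
    }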
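For comparison, the range form mentioned above needs no explicit check. The worker below, forwardAll, is a hypothetical example that simply passes values through; it deliberately does not close out, on the assumption that a supervising wrapper such as startWork does that after the worker returns.

```go
package main

// forwardAll copies every value from in to out and returns once in has
// been closed and drained. It leaves closing out to its caller.
func forwardAll(in, out chan interface{}) {
	for val := range in {
		out <- val
	}
}
```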