I'm implementing a pipeline of several worker functions connected by channels. Each of them takes (in, out chan interface{}) as input, and each function receives the out of the previous one as its in. I have no guarantee that out will be closed at the end of each function, so I'm wondering how I should check whether a previous function has finished its work. I've started with something like this:
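func ExecutePipeline(jobs ...job) {
	out := make(chan interface{}, 10)
	for _, val := range jobs {
		in := out
		// Assign with "=" rather than ":=" so the outer out is updated
		// and the next stage receives this stage's out as its in.
		out = make(chan interface{})
		go val(in, out)
	}
}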
I'm thinking about using a WaitGroup to treat the end of each function's goroutine as the signal that it has done its work, and then closing its out channel.
How can I do it?
If your intent is to propagate a signal along the pipeline to communicate when previous pipeline stages have completed and will produce no further values, you can do this synchronously by closing the channel after each pipeline stage returns. The following code does so by wrapping the invocation of each pipeline worker:
func startWork(val job, in, out chan interface{}) {
	val(in, out)
	// out will be closed after val returns
	close(out)
}

// Later, in ExecutePipeline, start your worker by calling startWork
func ExecutePipeline(jobs ...job) {
	// ...
	for _, val := range jobs {
		// ...
		go startWork(val, in, out)
	}
}
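To tie the wrapper to the WaitGroup idea from the question, here is a minimal runnable sketch, not a definitive implementation. It assumes the first stage has no upstream input (so its in is closed immediately) and that any output of the last stage can be discarded. The extra wg parameter on startWork and the runDemo helper are illustrative additions, not part of the code above.

```go
package main

import (
	"fmt"
	"sync"
)

// job matches the worker signature used in the question.
type job func(in, out chan interface{})

// startWork runs one stage, closes its output once the stage returns,
// and then reports completion to the WaitGroup (defers run in LIFO order).
func startWork(wg *sync.WaitGroup, val job, in, out chan interface{}) {
	defer wg.Done()
	defer close(out)
	val(in, out)
}

// ExecutePipeline chains the jobs and blocks until every stage has returned.
func ExecutePipeline(jobs ...job) {
	var wg sync.WaitGroup
	in := make(chan interface{})
	close(in) // assumption: the first stage has nothing to read
	for _, val := range jobs {
		out := make(chan interface{})
		wg.Add(1)
		go startWork(&wg, val, in, out)
		in = out // the next stage reads what this one writes
	}
	// Assumption: drain whatever the last stage emits so it cannot block.
	for range in {
	}
	wg.Wait()
}

// runDemo is a hypothetical three-stage pipeline: produce, double, collect.
func runDemo() []int {
	var results []int
	ExecutePipeline(
		func(in, out chan interface{}) {
			for i := 1; i <= 3; i++ {
				out <- i
			}
		},
		func(in, out chan interface{}) {
			for v := range in {
				out <- v.(int) * 2
			}
		},
		func(in, out chan interface{}) {
			for v := range in {
				results = append(results, v.(int))
			}
		},
	)
	return results
}

func main() {
	fmt.Println(runDemo()) // prints [2 4 6]
}
```

Because each stage's out is closed only by startWork, the range loops in the later stages end on their own, and wg.Wait() returns once the whole chain has finished.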
"I don't have any guarantee that out will be closed at the end of each function"
Conversely, it is a problem if a worker closes its own out channel: closing an already-closed channel panics, so the subsequent close(out) in startWork will panic.
In this simple implementation, workers must therefore delegate channel closure to the code that supervises the pipeline.
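The double-close panic is easy to reproduce in isolation. The sketch below uses a hypothetical closeTwice helper (not part of the pipeline code) that recovers from the panic only to make the behaviour observable; it is not a recommended way to guard against a worker that closes its own channel.

```go
package main

import "fmt"

// closeTwice closes ch twice and reports whether the second close panicked.
func closeTwice(ch chan interface{}) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	close(ch) // first close, e.g. performed by the worker itself
	close(ch) // second close, e.g. performed by startWork: this panics
	return false
}

func main() {
	fmt.Println(closeTwice(make(chan interface{}))) // prints true
}
```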
As the signalling is passed in-band (in the same channel as the data), care may be required in the implementation of your pipeline workers to ensure they differentiate between a value that was actually sent and the zero value received once the channel is closed.
Ranging over a channel in a for loop automatically ends the loop when the channel is closed and drained. If you implement your own logic to read from the channel, you will need to detect when a receive trivially succeeds with a zero value because the channel is closed. This can be done with the multi-valued assignment form of the receive operator, whose second result is a boolean that is false when the received value is a zero value produced because the channel was closed and empty.
func someWorker(in, out chan interface{}) {
	for {
		val, open := <-in
		if !open {
			// val is the zero value of in's element type, because the
			// channel is closed and empty.
			break // or, if appropriate, return
		}
		out <- val // example use of val: forward it unchanged
	}
}
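For comparison, the range form mentioned above can be sketched as follows. rangeWorker and collect are hypothetical names used only for illustration, and the buffered channels are an assumption that lets the example run without extra goroutines. Note that a nil that was actually sent is still delivered; the loop ends only once the channel is closed and drained.

```go
package main

import "fmt"

// rangeWorker is a hypothetical worker that forwards every value it receives.
func rangeWorker(in, out chan interface{}) {
	for val := range in {
		out <- val
	}
	// Reached only after in has been closed and fully drained.
}

// collect feeds three values through rangeWorker and gathers the output.
func collect() []interface{} {
	in := make(chan interface{}, 3)
	out := make(chan interface{}, 3)
	in <- nil // a sent zero value; it does not end the loop
	in <- 1
	in <- 2
	close(in)

	rangeWorker(in, out)
	close(out) // closed by the supervising code, not by the worker

	var got []interface{}
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(len(collect())) // prints 3
}
```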