I am using the pipes and filters pattern as described in this blog post.
I am wondering how to test this effectively. My idea was to just test each filter independently. For example I have a filter that looks like this
func watchTemperature(ctx context.Context, inStream <-chan int) {
maxTemp = 90
go func() {
for {
select {
case <-ctx.Done():
return
case temp := <-inStream:
if temp > maxTemp{
log.Print("Temperature too high!")
}
}
}
}()
}
In my test for now I only want to see if the log has been printed. My test looks as follows.
func TestWatchTemperature(t *testing.T) {
maxTemp = 90
ctx := context.Background()
inStream := make(chan int)
defer close(inStream)
watchTemperature(ctx, inStream)
var buf bytes.Buffer
log.SetOutput(&buf)
inStream<-maxTemp+1
logMsg := buf.String()
assert.True(t, strings.Contains(logMsg, "Temperature too high!"),
"Expected log message not found")
}
As this filter is the end of my pipeline, I do not have an out channel I can read from to determine if this goroutine/filter has already done something.
The only thing I found online so far was, to just wait for a few seconds after writing to the inStream in my test and then check the log. However this seems like a really poor choice, as it simple introduces a race condition and slows down the test.
What is the best way to test something like this or is there simply no good way to test it with this design of my filter and I always need an outStream?
Not always a worker goroutine has a result to deliver. But, if you want to know exactly when it is done, you need to synchronize it with your main goroutine using one of the concurrency primitives. It could be a signaling channel, or a wait group.
Here's an example:
package main
import (
"bytes"
"context"
"fmt"
"log"
"strings"
)
const (
maxTemp = 90
errMsg = "Temperature too high!"
)
func watchTemperature(ctx context.Context, inStream <-chan int, finished chan<- bool) {
go func() {
defer func() {
finished <- true
}()
for {
select {
case <-ctx.Done():
return
case temp := <-inStream:
if temp > maxTemp {
log.Print(errMsg)
}
}
}
}()
}
func main() {
// quit signals to stop the work
ctx, quit := context.WithCancel(context.Background())
var buf bytes.Buffer
// Make sure, this is called before launching the goroutine!
log.SetOutput(&buf)
inStream := make(chan int)
finished := make(chan bool)
// pass the callback channel to the goroutine
watchTemperature(ctx, inStream, finished)
// asynchronously to prevent a deadlock
go func() {
inStream <- maxTemp + 1
quit()
}()
// Block until the goroutine returns.
<-finished
if !strings.Contains(buf.String(), errMsg) {
panic("Expected log message not found")
}
fmt.Println("Pass!")
}