Search code examples
gopipelinegoroutine

How to effectively test the pipes and filters pattern


I am using the pipes and filters pattern as described in this blog post.

I am wondering how to test this effectively. My idea was to just test each filter independently. For example I have a filter that looks like this

func watchTemperature(ctx context.Context, inStream <-chan int) {
    maxTemp = 90

    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case temp := <-inStream:
                if temp > maxTemp{
                    log.Print("Temperature too high!")
                }
            }
        }
    }()
}

In my test for now I only want to see if the log has been printed. My test looks as follows.

func TestWatchTemperature(t *testing.T) {
    maxTemp = 90

    ctx := context.Background()
    inStream := make(chan int)
    defer close(inStream)
    watchTemperature(ctx, inStream)

    var buf bytes.Buffer
    log.SetOutput(&buf)

    inStream<-maxTemp+1

    logMsg := buf.String()
    assert.True(t,  strings.Contains(logMsg, "Temperature too high!"), 
        "Expected log message not found")
}

As this filter is the end of my pipeline, I do not have an out channel I can read from to determine if this goroutine/filter has already done something.

The only thing I found online so far was, to just wait for a few seconds after writing to the inStream in my test and then check the log. However this seems like a really poor choice, as it simple introduces a race condition and slows down the test.

What is the best way to test something like this or is there simply no good way to test it with this design of my filter and I always need an outStream?


Solution

  • Not always a worker goroutine has a result to deliver. But, if you want to know exactly when it is done, you need to synchronize it with your main goroutine using one of the concurrency primitives. It could be a signaling channel, or a wait group.

    Here's an example:

    package main
    
    import (
        "bytes"
        "context"
        "fmt"
        "log"
        "strings"
    )
    
    const (
        maxTemp = 90
        errMsg  = "Temperature too high!"
    )
    
    func watchTemperature(ctx context.Context, inStream <-chan int, finished chan<- bool) {
        go func() {
            defer func() {
                finished <- true
            }()
            for {
                select {
                case <-ctx.Done():
                    return
                case temp := <-inStream:
                    if temp > maxTemp {
                        log.Print(errMsg)
                    }
                }
            }
        }()
    }
    
    func main() {
        // quit signals to stop the work
        ctx, quit := context.WithCancel(context.Background())
        var buf bytes.Buffer
        // Make sure, this is called before launching the goroutine!
        log.SetOutput(&buf)
        inStream := make(chan int)
        finished := make(chan bool)
        // pass the callback channel to the goroutine
        watchTemperature(ctx, inStream, finished)
    
        // asynchronously to prevent a deadlock
        go func() {
            inStream <- maxTemp + 1
            quit()
        }()
        // Block until the goroutine returns.
        <-finished
    
        if !strings.Contains(buf.String(), errMsg) {
            panic("Expected log message not found")
        }
    
        fmt.Println("Pass!")
    }