Tags: go, concurrency, goroutine, channel

Do goroutines that take a receive-only channel as a parameter stop when the channel is closed?


I have been reading "Building Microservices with Go", and the book introduces the eapache/go-resiliency/deadline package for handling timeouts.

deadline.go

// Package deadline implements the deadline (also known as "timeout") resiliency pattern for Go.
package deadline

import (
    "errors"
    "time"
)

// ErrTimedOut is the error returned from Run when the deadline expires.
var ErrTimedOut = errors.New("timed out waiting for function to finish")

// Deadline implements the deadline/timeout resiliency pattern.
type Deadline struct {
    timeout time.Duration
}

// New constructs a new Deadline with the given timeout.
func New(timeout time.Duration) *Deadline {
    return &Deadline{
        timeout: timeout,
    }
}

// Run runs the given function, passing it a stopper channel. If the deadline passes before
// the function finishes executing, Run returns ErrTimedOut to the caller and closes the stopper
// channel so that the work function can attempt to exit gracefully. It does not (and cannot)
// simply kill the running function, so if it doesn't respect the stopper channel then it may
// keep running after the deadline passes. If the function finishes before the deadline, then
// the return value of the function is returned from Run.
func (d *Deadline) Run(work func(<-chan struct{}) error) error {
    result := make(chan error)
    stopper := make(chan struct{})

    go func() {
        result <- work(stopper)
    }()

    select {
    case ret := <-result:
        return ret
    case <-time.After(d.timeout):
        close(stopper)
        return ErrTimedOut
    }
}

deadline_test.go

package deadline

import (
    "errors"
    "testing"
    "time"
)

func takesFiveMillis(stopper <-chan struct{}) error {
    time.Sleep(5 * time.Millisecond)
    return nil
}

func takesTwentyMillis(stopper <-chan struct{}) error {
    time.Sleep(20 * time.Millisecond)
    return nil
}

func returnsError(stopper <-chan struct{}) error {
    return errors.New("foo")
}

func TestDeadline(t *testing.T) {
    dl := New(10 * time.Millisecond)

    if err := dl.Run(takesFiveMillis); err != nil {
        t.Error(err)
    }

    if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
        t.Error(err)
    }

    if err := dl.Run(returnsError); err == nil || err.Error() != "foo" {
        t.Error(err)
    }

    done := make(chan struct{})
    err := dl.Run(func(stopper <-chan struct{}) error {
        <-stopper
        close(done)
        return nil
    })
    if err != ErrTimedOut {
        t.Error(err)
    }
    <-done
}

func ExampleDeadline() {
    dl := New(1 * time.Second)

    err := dl.Run(func(stopper <-chan struct{}) error {
        // do something possibly slow
        // check the stopper channel and give up if timed out
        return nil
    })

    switch err {
    case nil:
        // finished before the deadline
    case ErrTimedOut:
        // execution took too long, oops
    default:
        // some other error returned by the function
    }
}

1st question

// in deadline_test.go
if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
    t.Error(err)
}  

I have trouble understanding the execution flow of the code above. As far as I understand, because the takesTwentyMillis function sleeps longer than the 10-millisecond timeout,

// in deadline.go
case <-time.After(d.timeout):
    close(stopper)
    return ErrTimedOut

time.After sends the current time on its channel once d.timeout has elapsed, so this case is selected. Then the stopper channel is closed and ErrTimedOut is returned.

What I do not understand is what closing the stopper channel does to the anonymous goroutine. I think that when the stopper channel is closed, the goroutine below might still be running:

go func() {
    result <- work(stopper)
}()

(Please correct me if I'm wrong here.) I think that after close(stopper), this goroutine will call takesTwentyMillis (the work function) with the stopper channel as its parameter. The function will then sleep for 20 milliseconds and return nil, which is sent to the result channel. And main() ends here, right?

I do not see the point of closing the stopper channel here. The takesTwentyMillis function does not seem to use the channel in its body anyway :(.
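A minimal sketch of what closing stopper does and does not do, assuming a local copy of Run (the hypothetical run below, with a buffered result channel) and two hypothetical workers: ignoring sleeps like takesTwentyMillis and never reads stopper, while cooperative selects on it. The durations (100 ms of work, 10 ms deadline) are arbitrary, and atomic.Bool needs Go 1.19 or later.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var errTimedOut = errors.New("timed out waiting for function to finish")

// workerFactory builds a work function that runs for roughly d and sets
// *finished once it has done all of its work.
type workerFactory func(d time.Duration, finished *atomic.Bool) func(<-chan struct{}) error

// run mirrors Deadline.Run from deadline.go, except that result is buffered
// so the worker goroutine can always deliver its value and exit.
func run(timeout time.Duration, work func(<-chan struct{}) error) error {
	result := make(chan error, 1)
	stopper := make(chan struct{})
	go func() { result <- work(stopper) }()

	select {
	case ret := <-result:
		return ret
	case <-time.After(timeout):
		close(stopper) // only a signal: nothing forces work to return
		return errTimedOut
	}
}

// ignoring behaves like takesTwentyMillis: it never looks at stopper.
func ignoring(d time.Duration, finished *atomic.Bool) func(<-chan struct{}) error {
	return func(<-chan struct{}) error {
		time.Sleep(d)
		finished.Store(true)
		return nil
	}
}

// cooperative waits for the same total time in 1 ms steps, but gives up as
// soon as it notices that stopper has been closed.
func cooperative(d time.Duration, finished *atomic.Bool) func(<-chan struct{}) error {
	return func(stopper <-chan struct{}) error {
		end := time.Now().Add(d)
		for time.Now().Before(end) {
			select {
			case <-stopper:
				return nil // stop early; finished stays false
			case <-time.After(time.Millisecond): // simulated unit of work
			}
		}
		finished.Store(true)
		return nil
	}
}

// finishesAnyway runs a 100 ms worker under a 10 ms deadline, then waits well
// past 100 ms and reports whether Run timed out and whether the worker still
// completed all of its work after the timeout.
func finishesAnyway(mk workerFactory) (timedOut, finished bool) {
	var done atomic.Bool
	err := run(10*time.Millisecond, mk(100*time.Millisecond, &done))
	time.Sleep(300 * time.Millisecond)
	return err == errTimedOut, done.Load()
}

func main() {
	fmt.Println(finishesAnyway(ignoring))
	fmt.Println(finishesAnyway(cooperative))
}
```

In both cases Run reports a timeout, but only the cooperative worker actually stops; the ignoring one runs to completion after Run has already returned.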

2nd question

// in deadline_test.go within TestDeadline()
done := make(chan struct{})
err := dl.Run(func(stopper <-chan struct{}) error {
    <-stopper
    close(done)
    return nil
})
if err != ErrTimedOut {
    t.Error(err)
}
<-done

This is the part I do not fully understand. I think that when dl.Run is called, the stopper channel is created. But because nothing is ever sent on the stopper channel, the worker function will block at <-stopper... Since I do not understand this code, I do not see why it exists in the first place (i.e. what it is trying to test, how it is executed, etc.).


3rd (additional) question, regarding the 2nd question

So I understand that when Run in the 2nd question closes the stopper channel, the worker function gets the signal. The worker then closes the done channel and returns nil. I used delve (the Go debugger) to watch this: after the line return nil, delve takes me to the goroutine in deadline.go.

   err := dl.Run(func(stopper <-chan struct{}) error {
        <-stopper
        close(done)
-->     return nil   
    })

After I type n to step over to the next line, delve takes me here:

    go func() {
-->             result <- work(stopper)
            }()

And the process more or less finishes here: when I type n again, the command line prints PASS and the process exits. Why does the process finish here? work(stopper) seems to return nil, which should then be sent to the result channel, right? But that send never seems to happen.

I know that Run, in the calling goroutine, has already returned ErrTimedOut. I guess it has something to do with this?
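What delve shows can be reproduced outside the debugger. Once Run has returned ErrTimedOut, nothing ever receives from the unbuffered result channel again, so the send result <- work(stopper) blocks forever and "next" never reaches another line in that goroutine. The test goroutine meanwhile gets past <-done (the worker closed it before returning), TestDeadline returns, PASS is printed, and the process exit ends the blocked goroutine too. The sketch below is a hypothetical copy of Run with a bufferSize parameter: 0 reproduces deadline.go, and 1 is one common way to let the worker exit on its own (not necessarily what go-resiliency does). runtime.NumGoroutine is used to count the leftover goroutine.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"time"
)

var errTimedOut = errors.New("timed out waiting for function to finish")

// run has the same structure as Deadline.Run; bufferSize 0 reproduces the
// unbuffered result channel from deadline.go, bufferSize 1 is the variant
// that lets the worker goroutine exit on its own.
func run(timeout time.Duration, bufferSize int, work func(<-chan struct{}) error) error {
	result := make(chan error, bufferSize)
	stopper := make(chan struct{})
	go func() {
		// With bufferSize 0 this send blocks forever once run has returned,
		// because nothing will ever receive from result again.
		result <- work(stopper)
	}()
	select {
	case ret := <-result:
		return ret
	case <-time.After(timeout):
		close(stopper)
		return errTimedOut
	}
}

// extraGoroutines repeats the last case of TestDeadline and reports how many
// goroutines are still alive afterwards compared with before.
func extraGoroutines(bufferSize int) int {
	before := runtime.NumGoroutine()
	done := make(chan struct{})
	err := run(10*time.Millisecond, bufferSize, func(stopper <-chan struct{}) error {
		<-stopper
		close(done)
		return nil
	})
	if err != errTimedOut {
		panic("expected a timeout")
	}
	<-done
	// Give the worker time to reach its send and either block or exit.
	time.Sleep(50 * time.Millisecond)
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println("unbuffered result, extra goroutines:", extraGoroutines(0))
	fmt.Println("buffered result, extra goroutines:", extraGoroutines(1))
}
```

In a short-lived test binary the leak is harmless because the process exits right away; in a long-running service every timed-out call would leave one goroutine blocked forever.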


Solution

  • 1st question

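    The stopper channel is used to signal to the worker function (e.g. takesTwentyMillis) that its deadline has been reached and the caller no longer cares about its result. Closing the channel does not stop or interrupt the goroutine by itself: Go has no way to kill a goroutine from the outside, so the original takesTwentyMillis, which never reads stopper, simply finishes its 20 ms sleep anyway. That is why a worker function like takesTwentyMillis should usually check whether the stopper channel has been closed, so that it can cancel its work. Still, checking the stopper channel is the worker function's choice; it may or may not check it.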

    func takesTwentyMillis(stopper <-chan struct{}) error {
        for i := 0; i < 20; i++ {
            select {
            case <-stopper:
                // the caller no longer cares, so stop working early
                return nil
            case <-time.After(time.Millisecond): // simulate 1 ms of work per step
            }
        }
        // work is done
        return nil
    }
    

  • 2nd question

    This part of Deadline.Run() closes the stopper channel once the timeout expires:

    case <-time.After(d.timeout):
        close(stopper)
    

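    Receiving from a closed channel (<-stopper) returns the zero value of the channel's element type immediately. So this test covers a worker function that only finishes once it is told to stop: Run must time out, return ErrTimedOut and close stopper; that close unblocks the worker, which then closes done. The final <-done in the test checks that the worker really received the stop signal; if stopper were never closed, the test would hang there.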
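A small sketch of the two channel properties that test relies on; receiveFromClosed and closeWakesAll are hypothetical helper names. A receive from a closed channel never blocks and yields the zero value, with the two-value form also reporting ok == false; and since a closed channel is ready for every receiver, one close(stopper) acts as a broadcast to all of them.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// receiveFromClosed shows that a receive from a closed channel never blocks:
// it yields the element type's zero value and ok == false.
func receiveFromClosed() (int, bool) {
	ch := make(chan int)
	close(ch)
	v, ok := <-ch
	return v, ok
}

// closeWakesAll starts n goroutines that each wait on <-stopper, the way the
// worker in TestDeadline does, closes stopper once, and counts how many
// receivers were released.
func closeWakesAll(n int) int64 {
	stopper := make(chan struct{})
	var woken atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-stopper // blocks until stopper is closed
			woken.Add(1)
		}()
	}
	close(stopper) // one close releases every receiver, before or after this point
	wg.Wait()
	return woken.Load()
}

func main() {
	v, ok := receiveFromClosed()
	fmt.Println(v, ok)            // 0 false
	fmt.Println(closeWakesAll(3)) // 3
}
```

A send would not work the same way: a single stopper <- struct{}{} releases only one receiver, and it blocks if nobody is receiving, which is why Run uses close to signal the timeout.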