Search code examples
go, scheduled-tasks, kill, preemption

How to force an interrupt on function execution that is taking too long within a scheduled tick


I have written this scheduler, but I am not able to make it "kill" that input function f when f takes more than the input recurring time interval.

If f were a process rather than a thread, what I am looking for would amount to some sort of hard preemption.

The definition of f is something I have no control over. It represents an ETL job that crunches data from multiple databases during a batch execution. f is written in Go and works fine, but I need some way to keep it under control when it takes too long to execute.

I know f is atomic, so it either changes the database at the end of its execution or not. So it can be considered safe to "kill" it when it takes too long.

// schedule starts a background goroutine that, on every tick of a ticker
// firing every recurring, prints "Ticked" and launches f in a fresh goroutine.
//
// Because each run gets its own goroutine, a run that outlasts recurring is
// neither interrupted nor waited for: the next tick simply starts another run
// alongside it. (Calling f synchronously instead would make a slow run delay
// the following ones.) Nothing here can "kill" a run that takes too long.
//
// Closing the returned channel stops the ticker and ends the scheduling
// goroutine; runs of f that are already in flight are left to finish on
// their own.
func schedule(f func(), recurring time.Duration) chan struct{} {
    stop := make(chan struct{})
    tick := time.NewTicker(recurring)

    loop := func(task func()) {
        for {
            select {
            case <-stop:
                fmt.Println("Stopping the scheduler")
                tick.Stop()
                return
            case <-tick.C:
                fmt.Println("Ticked")
                // Fire and forget: the loop goes straight back to waiting
                // for the next tick without tracking this run.
                go task()
            }
        }
    }
    go loop(f)

    return stop
}

To see what's going on I've written this test:

// TestSlowExecutions demonstrates the behaviour of schedule with a slow task:
// it schedules a task that sleeps for 2 seconds on a 1-second interval and
// prints timestamped messages so the timing of the runs can be read from the
// output. It makes no assertions.
func TestSlowExecutions(t *testing.T) {
    // stamped prints a printf-style message on its own line, prefixed with
    // the current time so the output is easy to follow.
    stamped := func(format string, a ...interface{}) (int, error) {
        return fmt.Printf("%s%s\n", fmt.Sprintf("[%v] ", time.Now()), fmt.Sprintf(format, a...))
    }

    // lineBreaks matches the line terminators stripped from uuidgen's output.
    lineBreaks := regexp.MustCompile(`\r?\n`)

    // taskID shells out to uuidgen to obtain an identifier for one run of
    // the task (on Debian/Ubuntu: sudo apt-get install uuid-runtime).
    // The command's error is discarded.
    taskID := func() string {
        raw, _ := exec.Command("uuidgen").Output()
        return lineBreaks.ReplaceAllString(string(raw), "")
    }

    // slowTask stands in for a very slow job: it takes 2 seconds, which is
    // longer than the scheduling interval used below.
    slowTask := func() {
        id := taskID()
        stamped("Ticked")
        stamped("Starting task %s", id)
        time.Sleep(2 * time.Second)
        stamped("Finished task %s", id)
    }

    // Run the slow task on a 1-second interval for 4 seconds.
    stop := schedule(slowTask, 1*time.Second)
    time.Sleep(4 * time.Second)
    close(stop)

    // Keep the test alive a little longer so the scheduler's stop message
    // and any remaining task output have time to appear.
    time.Sleep(4 * time.Second)
}

Solution

  • I negotiated the usage of a context with timeout (https://golang.org/pkg/context/#WithTimeout) with the author of f().

    See below for a working example; pay attention to the timestamps printed by dummyLog, which make it clear what is happening in each of the goroutines involved.

    The code:

    // dummyLog prints a printf-style message on its own line, prefixed with
    // the current time in square brackets. It returns what fmt.Printf returns:
    // the number of bytes written and any write error.
    func dummyLog(format string, a ...interface{}) (n int, err error) {
        return fmt.Printf(
            "%s%s\n",
            fmt.Sprintf("[%v] ", time.Now()),
            fmt.Sprintf(format, a...),
        )
    }
    
    // newContext returns a context derived from context.Background() with a
    // timeout of timeoutUpperBound, together with its cancel function.
    //
    // It also logs the resulting deadline. The boolean printed in that
    // message is the second result of ctx.Deadline().
    func newContext(timeoutUpperBound time.Duration) (context.Context, context.CancelFunc) {
        bounded, release := context.WithTimeout(context.Background(), timeoutUpperBound)
        expiry, hasDeadline := bounded.Deadline()
        dummyLog("The context deadline is set to %s is it still valid? %v", expiry, hasDeadline)
        return bounded, release
    }
    
    // schedule could be used to schedule arbitrary functions with a recurring interval
    func schedule(f func(ctx context.Context), recurring time.Duration) chan struct{} {
        ticker := time.NewTicker(recurring)
        quit := make(chan struct{})
        go func(inFunc func(ctx context.Context)) {
            for {
                select {
                case <-ticker.C:
                    dummyLog("Ticked in the scheduler")
                    // simulate the "killing" of "inFunc" when it takes too long
                    go func(recurring time.Duration) {
                        inCtx, cancel := newContext(recurring)
                        defer cancel()
                        inFunc(inCtx)
                    }(recurring)
                case <-quit:
                    dummyLog("Stopping the scheduler")
                    ticker.Stop()
                    return
                }
            }
        }(f)
    
        return quit
    }
    

    The execution of the code in a testing environment (although no assertions are performed):

    // TestSomething demonstrates the context-based scheduler: it schedules a
    // task of random duration (under 4 seconds) on a 2-second interval and
    // lets the timestamped log output show which runs finish and which are
    // cut short. It makes no assertions.
    func TestSomething(t *testing.T) {
        // lineBreaks matches the line terminators stripped from uuidgen's output.
        lineBreaks := regexp.MustCompile(`\r?\n`)

        // taskID shells out to uuidgen to obtain an identifier for one run of
        // the task (on Debian/Ubuntu: sudo apt-get install uuid-runtime).
        // The command's error is discarded.
        taskID := func() string {
            raw, _ := exec.Command("uuidgen").Output()
            return lineBreaks.ReplaceAllString(string(raw), "")
        }

        // between returns lo plus a pseudo-random offset in [0, hi-lo),
        // drawn from math/rand.
        between := func(lo int, hi int) int {
            return rand.Intn(hi-lo) + lo
        }

        // cooperativeTask simulates slow work such as database queries or
        // network I/O: it waits for a random duration, but gives up as soon
        // as its context is done.
        cooperativeTask := func(ctx context.Context) {
            id := taskID()
            wait := time.Duration(between(0, 4000)) * time.Millisecond
            dummyLog("Starting task %s taking %s random time", id, wait)
            select {
            case <-ctx.Done():
                dummyLog("Killed task %s, reason: '%s'", id, ctx.Err())
            case <-time.After(wait):
                dummyLog("Finished task %s", id)
            }
        }

        // Schedule the task every 2 seconds and let it run for six intervals.
        interval := 2 * time.Second
        stop := schedule(cooperativeTask, interval)
        time.Sleep(6 * interval)
        close(stop)

        // Keep the test alive a little longer so the scheduler's stop message
        // and any remaining task output have time to appear.
        time.Sleep(4 * time.Second)
    }