
Flink Stateful Functions: compensating callback on a timeout


I am implementing a use case in Flink Stateful Functions. My specification states that a stateful function f starts a business workflow (in other words, a group of stateful functions f1, f2, …, fn is called sequentially, in parallel, or both). Stateful function f waits for a result to be returned so that it can update its local state; it also starts a timeout callback, i.e. a message to itself. At timeout, f checks whether its local state has been updated (that is, whether it has received a result). If so, all is well.

However, if at timeout f discovers that it has not yet received a result, it has to launch a compensating workflow to undo any changes that stateful functions f1, f2, …, fn might have applied.
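The behaviour described above amounts to a small state machine inside f. The sketch below is a plain-Python model of that logic, not the StateFun SDK: the class name `WorkflowStarter`, the message shapes (`"start"`, `"result"`, `"timeout"`, `"compensate"`) and the `(target, message)` return convention are all hypothetical. In a real stateful function the timeout would be a delayed message to itself and the flag would live in the function's persisted state rather than in a Python attribute.

```python
from dataclasses import dataclass
from typing import List, Tuple

# Hypothetical outgoing message shape: (target address, payload).
Outgoing = Tuple[str, dict]


@dataclass
class WorkflowStarter:
    """Models function f: starts a workflow, then arms a timeout to itself."""
    workflow: List[str]            # addresses of f1 ... fn (hypothetical names)
    result_received: bool = False  # stands in for f's persisted local state
    compensated: bool = False

    def handle(self, message: dict) -> List[Outgoing]:
        kind = message["type"]
        if kind == "start":
            # Invoke every workflow function and schedule a timeout to "self".
            out = [(fn, {"type": "invoke"}) for fn in self.workflow]
            out.append(("self", {"type": "timeout"}))  # would be a delayed message
            return out
        if kind == "result":
            self.result_received = True
            return []
        if kind == "timeout":
            if self.result_received:
                return []  # the result arrived in time: nothing to undo
            # No result yet: ask every workflow function to undo its changes.
            self.compensated = True
            return [(fn, {"type": "compensate"}) for fn in self.workflow]
        raise ValueError(f"unknown message type: {kind}")
```

Note that this f has no way to tell which of f1, …, fn were actually reached, so it compensates all of them; the coordinator design in the answer refines this by recording a status per invoked address.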

Does the Flink Stateful Functions framework support such a design pattern/use case, or should it be implemented at the application level? What is the simplest design to achieve such a solution? For instance, how can f know which of the workflow's stateful functions f1, f2, …, fn were affected by the timed-out invocation (i.e., where the control flow timed out)? How do Flink Stateful Functions and the concept of integrated messaging and state facilitate such a pattern?

Thank you.


Solution

  • I posted the question on the Apache Flink mailing list and got the following response from Igal Shilman (thanks, Igal):

    The first thing that I would like to mention is that, if your original motivation for this scenario is a concern about transient failures such as:

    • did function Y ever receive a message sent by function X?
    • did sending a message fail?
    • is the target function there to accept a message sent to it?
    • did the order of messages get mixed up?
    • etc.

    then StateFun eliminates all of these problems, along with a whole class of transient errors that you would otherwise have to deal with yourself in your business logic (retries, backoffs, service discovery, etc.).

    Now if your motivating scenario is not about transient errors but more about transactional workflows, then as Dawid mentioned you would have to implement this in your application logic. I think that the way you have described the flow should map directly to a coordinating function (per flow instance) that keeps track of results/timeouts in its internal state.

    Here is a sketch:

    1. A Flow Coordinator Function: it would be invoked with the input necessary to kick off a flow. It would start invoking the relevant functions (as defined by the flow's DAG) and would keep an internal state recording which functions (addresses) were invoked and their completion statuses. When the flow completes successfully, the coordinator can safely discard its state. Whenever the coordinator decides to abort the flow (an internal timeout, an external message, etc.), it would have to check its internal state and kick off a compensating workflow (sending a special message to the functions that have already succeeded or are still in progress).

    2. Each function in the flow has to accept a message from the coordinator, in turn, and reply with either a success or a failure.
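The two roles in the sketch above can be modelled in a few lines. This is a plain-Python illustration under stated assumptions, not StateFun SDK code: `FlowCoordinator`, `Participant`, `Status`, the method names and the message dictionaries are all hypothetical. For brevity every step is invoked in parallel from `start`; a sequential DAG would instead invoke the next step from `on_reply`. In StateFun the `statuses` map would be persisted function state and the timeout a delayed message to self.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Tuple

Outgoing = Tuple[str, dict]  # hypothetical (target address, message) shape


class Status(Enum):
    IN_PROGRESS = "in_progress"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


@dataclass
class FlowCoordinator:
    """One instance per flow; `statuses` stands in for persisted state."""
    statuses: Dict[str, Status] = field(default_factory=dict)
    done: bool = False

    def start(self, steps: List[str]) -> List[Outgoing]:
        # Record every invoked address so an abort knows whom to compensate.
        out = []
        for addr in steps:
            self.statuses[addr] = Status.IN_PROGRESS
            out.append((addr, {"type": "invoke"}))
        out.append(("self", {"type": "timeout"}))  # would be a delayed message
        return out

    def on_reply(self, addr: str, ok: bool) -> List[Outgoing]:
        if self.done or addr not in self.statuses:
            return []  # late reply after completion or abort: ignore it
        self.statuses[addr] = Status.SUCCEEDED if ok else Status.FAILED
        if not ok:
            return self.abort()
        if all(s is Status.SUCCEEDED for s in self.statuses.values()):
            self.done = True
            self.statuses.clear()  # flow completed: state can be discarded
        return []

    def on_timeout(self) -> List[Outgoing]:
        return [] if self.done else self.abort()

    def abort(self) -> List[Outgoing]:
        # Compensate only functions that succeeded or may still be running.
        targets = [a for a, s in self.statuses.items()
                   if s in (Status.SUCCEEDED, Status.IN_PROGRESS)]
        self.done = True
        return [(a, {"type": "compensate"}) for a in targets]


@dataclass
class Participant:
    """A flow step: applies a change on invoke, undoes it on compensate."""
    applied: bool = False
    should_fail: bool = False  # hypothetical knob to simulate a failing step

    def handle(self, message: dict) -> dict:
        if message["type"] == "invoke":
            if self.should_fail:
                return {"type": "reply", "ok": False}
            self.applied = True
            return {"type": "reply", "ok": True}
        if message["type"] == "compensate":
            self.applied = False  # idempotent: safe even if nothing was applied
            return {"type": "compensated"}
        raise ValueError(f"unknown message type: {message['type']}")
```

Because a function that is still in progress may receive a compensation message for work it never applied, the compensation handler in this sketch is written to be idempotent.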