apache-spark, spark-structured-streaming

Spark Structured Streaming: understanding timeout setup in mapGroupsWithState


I am trying very hard to understand the timeout setup when using mapGroupsWithState in Spark Structured Streaming.

The link below has a very detailed specification, but I am not sure I understood it properly, especially the GroupState.setTimeoutTimestamp() option, i.e. setting the state expiry relative to event time: https://spark.apache.org/docs/3.0.0-preview/api/scala/org/apache/spark/sql/streaming/GroupState.html

I copied them out here:

With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark().

With this setting, data that is older than the watermark is filtered out.
The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark advances beyond the set timestamp.

You can control the timeout delay by two parameters - watermark delay and an additional duration beyond the timestamp in the event (which is guaranteed to be newer than watermark due to the filtering). 

Guarantees provided by this timeout are as follows:
Timeout will never occur before the watermark has exceeded the set timestamp.
Similar to processing-time timeouts, there is no strict upper bound on the delay before the timeout actually occurs. The watermark can advance only when there is data in the stream, and the event time of the data has actually advanced.

question 1: What is this timestamp in the sentence and the timeout would occur when the watermark advances beyond the set timestamp? Is it an absolute time, or a duration relative to the current event time in the state? I know I could also expire the state explicitly by removing it (e.g. with state.remove()).

e.g. say I have some data state like below; by setting what value in which setting will it expire?

+-------+-----------+-------------------+
|expired|something  |          timestamp|
+-------+-----------+-------------------+
|  false|   someKey |2020-08-02 22:02:00|
+-------+-----------+-------------------+

question 2: Reading the sentence data that is older than the watermark are filtered out, I understand that late-arriving data is ignored after it is read from Kafka. Is this correct?

reason for the questions: without understanding these, I cannot really apply them to use cases, i.e. decide when to use GroupState.setTimeoutDuration() and when to use GroupState.setTimeoutTimestamp().

Thanks a lot.

ps. I also tried to read below

-  https://www.waitingforcode.com/apache-spark-structured-streaming/stateful-transformations-mapgroupswithstate/read
(confused me; I did not understand it)
- https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
(did not cover much of what I was interested in)

Solution

  • What is this timestamp in the sentence and the timeout would occur when the watermark advances beyond the set timestamp?

    This is the timestamp you set by GroupState.setTimeoutTimestamp().

    is it an absolute time, or a duration relative to the current event time in the state?

    It is a point in event time, not a duration. You set a concrete timestamp, typically computed relative to the current watermark, and the timeout fires once the watermark advances beyond it.

    say I have some data state (column timestamp=2020-08-02 22:02:00); by setting what value in which setting will it expire?

    Let's assume your streaming query has a defined processing trigger (set by trigger()) of 5 minutes. Also, let us assume that you have used a watermark before applying groupByKey and mapGroupsWithState. I understand you want to use timeouts based on event time (as opposed to processing time), so your query will look like:

    ds.withWatermark("timestamp", "10 minutes")
      .groupByKey(...) // declare your key
      .mapGroupsWithState(
        GroupStateTimeout.EventTimeTimeout)(
        ...) // your custom update logic
    

    Now, it depends on how you set the timeout timestamp within your "custom update logic". Somewhere in your custom update logic you will need to call

    state.setTimeoutTimestamp()
    

    This method has four different signatures and it is worth scanning through their documentation. As we have set a watermark (in withWatermark) we can actually make use of that time. As a general rule: it is important to set the timeout timestamp (via state.setTimeoutTimestamp()) to a value larger than the current watermark. To continue with our example we add one hour, as shown below:

    state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour")
    
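To make the mechanics concrete, here is a plain-Scala sketch (no Spark required) of the decision the engine makes between micro-batches. MockState, watermarkMs and maybeExpire are made-up names for illustration, not Spark's API:

```scala
// Hypothetical, simplified stand-in for Spark's per-group timeout
// bookkeeping; all names here are made up for illustration.
final case class MockState(timeoutTimestampMs: Long, expired: Boolean = false)

// The watermark is derived from event time: the maximum event time seen
// so far minus the declared delay (10 minutes in the example query).
def watermarkMs(maxEventTimeMs: Long, delayMs: Long): Long =
  maxEventTimeMs - delayMs

// Between micro-batches the engine expires a group once the watermark
// has advanced beyond the group's timeout timestamp.
def maybeExpire(state: MockState, currentWatermarkMs: Long): MockState =
  if (currentWatermarkMs > state.timeoutTimestampMs) state.copy(expired = true)
  else state
```

Inside the real update function, GroupState.hasTimedOut is true exactly when the function was invoked because of such an expiry rather than because of new data.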

    To conclude: your message can arrive in your stream between 22:00:00 and 22:15:00, and if that message was the last one for its key, it will time out in your GroupState by 23:15:00.
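The event-time arithmetic can be checked with a runnable plain-Scala sketch (no Spark needed). The 21:52:00 watermark below is an assumption: it presumes the 22:02:00 event is the newest one seen so far:

```scala
import java.time.{Duration, LocalDateTime}

// Assumed inputs from the example: the newest event time seen, the
// watermark delay from withWatermark, and the extra hour from
// setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour").
val maxEventTime   = LocalDateTime.parse("2020-08-02T22:02:00")
val watermarkDelay = Duration.ofMinutes(10)
val extraDelay     = Duration.ofHours(1)

// Watermark = newest event time minus the declared delay.
val watermark = maxEventTime.minus(watermarkDelay) // 21:52:00

// Timeout timestamp = current watermark plus the extra duration.
val timeoutAt = watermark.plus(extraDelay)         // 22:52:00

// The state actually expires once the watermark advances past timeoutAt,
// i.e. once an event with an event time later than this arrives:
val expiryEventTime = timeoutAt.plus(watermarkDelay) // 23:02:00
```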

    question 2: Reading the sentence data that is older than the watermark are filtered out, I understand that late-arriving data is ignored after it is read from Kafka. Is this correct?

    Yes, this is correct. For the batch interval 22:00:00 - 22:05:00, all messages with an event time (defined by the column timestamp) in that interval that arrive later than the declared watermark of 10 minutes allows (meaning after 22:15:00) will be ignored by your query and will not be processed within your "custom update logic".
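As a plain-Scala illustration of that filtering (this mimics the behaviour, it is not Spark's implementation, and the record data is made up): only records whose event time is at or after the current watermark survive to reach mapGroupsWithState:

```scala
import java.time.LocalDateTime

// Hypothetical records: (key, event time). The watermark below assumes
// the newest event seen so far is 22:25:00 and the delay is 10 minutes.
val records = Seq(
  ("a", LocalDateTime.parse("2020-08-02T22:02:00")), // older than watermark -> dropped
  ("b", LocalDateTime.parse("2020-08-02T22:20:00")), // within watermark -> kept
  ("c", LocalDateTime.parse("2020-08-02T22:25:00"))  // newest event -> kept
)
val watermark = LocalDateTime.parse("2020-08-02T22:15:00") // 22:25 minus 10 min

// Late data is filtered out before the custom update logic ever sees it.
val processed = records.filter { case (_, t) => !t.isBefore(watermark) }
```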