I have been using the mapWithState API in Spark Streaming, but two things are not clear about StateSpec.function:
Let's say my function is:
    def trackStateForKey(batchTime: Time,
                         key: Long,
                         newValue: Option[JobData],
                         currentState: State[JobData]): Option[(Long, JobData)]
Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.
What does the return value mean? I tried to find some pointers in the documentation and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?
In my current implementation I return None if I remove the key, and Some(newState) if I update it, but I'm not sure if that is correct.
Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.
It is an Option[T] because you can set a timeout using StateSpec.timeout, e.g.:

    StateSpec.function(spec _).timeout(Milliseconds(5000))

When the state for a key times out, the function is invoked with None as the value, and the isTimingOut method on State[T] returns true. This makes sense, because a timeout of the state doesn't mean that a new value has arrived for the specified key. It is also generally safer than passing null for T (which wouldn't work for primitives anyway), since the user is expected to operate safely on an Option[T].
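To make the timeout case concrete, here is a minimal sketch of a mapping function that distinguishes the two situations. JobData is a hypothetical stand-in for the question's type, and the object name and the five-second timeout are illustrative assumptions. One detail worth knowing: while a state is timing out, Spark does not allow update() or remove() on it, so the None branch only reads the last state.

```scala
import org.apache.spark.streaming.{Seconds, State, StateSpec, Time}

// Hypothetical payload type; the question's real JobData is not shown.
case class JobData(status: String)

object TimeoutAwareTracking {
  def trackStateForKey(
      batchTime: Time,
      key: Long,
      newValue: Option[JobData],
      currentState: State[JobData]): Option[(Long, JobData)] =
    newValue match {
      case Some(job) =>
        // A record arrived for this key in the current batch.
        currentState.update(job)
        Some(key -> job)
      case None =>
        // No record arrived: this is the timeout call, so isTimingOut() is true.
        // Spark removes the state itself after this call; calling update() or
        // remove() here would throw, so only read the last known value.
        currentState.getOption().map(last => key -> last)
    }

  // The timeout is what makes the None case reachable at all.
  val spec = StateSpec
    .function(trackStateForKey _)
    .timeout(Seconds(5))
}
```

Without a timeout on the spec, the None branch should never run, which matches the observation in the question that the value was always defined.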
You can see that in Spark's implementation:
    // Get the timed out state records, call the mapping function on each and collect the
    // data returned
    if (removeTimedoutData && timeoutThresholdTime.isDefined) {
      newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
        wrappedState.wrapTimingOutState(state)
        val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
        mappedData ++= returned
        newStateMap.remove(key)
      }
    }
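The line `mappedData ++= returned` also hints at what happens to the return value: appending an Option to a buffer adds its content for Some and nothing for None. Here is a small Spark-free sketch of that behaviour, with a hypothetical object and method name and String standing in for JobData:

```scala
import scala.collection.mutable.ArrayBuffer

object MappedDataDemo {
  // Mimics how the returned Options are collected: Some(x) contributes x,
  // None contributes nothing.
  def collectMapped(returned: Seq[Option[(Long, String)]]): Vector[(Long, String)] = {
    val mappedData = ArrayBuffer.empty[(Long, String)]
    returned.foreach(r => mappedData ++= r)
    mappedData.toVector
  }
}
```

So with the signature from the question, a key for which the function returns None simply produces no element in the output of mapWithState.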
What does the return value mean? I tried to find some pointers in the documentation and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?
The return value is a way to pass intermediate state along the Spark graph. For example, assume that I want to update my state but also perform some operation in my pipeline with the intermediate data, e.g.:
    dStream
      .mapWithState(stateSpec)
      .map(optionIntermediateResult => optionIntermediateResult.map(_ * 2))
      .foreachRDD( /* other stuff */ )
That return value is exactly what allows me to continue operating on said data. If you don't care about the intermediate result and only want the complete state, then returning None is perfectly fine.
I've written a blog post (following this question) which attempts to give an in-depth explanation of the API.