One big different between Spark Structure Streaming (SSS) and Spark Streaming (SS) is SSS can leverage statestore. It can store aggregation result of previous batches and apply current result with previous result. So it can get the real aggregation result from the very beginning of the input stream.
But for one case, we don't want to get the final result that merged with previous value of statestore. We just want to get (ouput) the aggregation result of current batch. And duo to the platform&framework thing, we can't rollback to SS.
So my question is, is it still doable in SSS to get the aggretation result of current batch, like SS?
Taking word count application for example which is given in the spark structure streaming guide: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
When there is a "cat cat
" comes in one batch, my expected output is cat|2
.
And when "cat
" comes in next batch, my expected output is cat|1
is it still doable in SSS to get the aggretation result of current batch, like SS?
One way to achieve what you want is to control the state store yourself using mapGroupsWithState
, and use it as a kind of degenerate store that actually doesn't do anything. For example:
val spark =
SparkSession.builder().appName("bla").master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream
.format("socket")
.option("host", "127.0.0.1")
.option("port", 9999)
.load()
socketDF
.as[String]
.map { str =>
val Array(key, value) = str.split(';')
(key, value)
}
.groupByKey { case (key, _) => key }
.mapGroupsWithState((str: String,
tuples: Iterator[(String, String)],
value: GroupState[Int]) => {
(str, tuples.size)
})
.writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
.awaitTermination()
Assuming I have a stream of values in the format of key;value
, this will just use mapGroupsWithState
as a pass through store and not actually accumulate any results. That way, for each batch, you get a clean state with no previously aggregated data.