Tags: apache-spark, spark-streaming, spark-structured-streaming, spark-streaming-kafka

Spark Structured Streaming: Output result at the end of Tumbling Window and not the Batch


I want the output of the Spark stream to be sent to the sink at the end of the tumbling window, not at the batch interval.

I am reading from a Kafka stream and outputting to another Kafka stream.
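
For context, the reading side looks roughly like the sketch below; the input topic name, the payload schema, and the use of the Kafka record timestamp as the event-time column are only illustrative here.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema of the JSON payload (field names taken from the sample records below)
StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("type", DataTypes.StringType)
        .add("user", DataTypes.StringType)
        .add("amount", DataTypes.DoubleType);

Dataset<Row> input = session.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "users-topic")   // input topic name is illustrative
        .load();

// Parse the JSON value, take the Kafka record timestamp as the event-time column,
// and register the result as the "users" view queried below.
input.select(from_json(col("value").cast("string"), schema).as("data"), col("timestamp"))
        .select("data.*", "timestamp")
        .createOrReplaceTempView("users");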

The code that runs the query and writes the output looks like this:

Dataset<Row> sqlResult = session.sql("select window, user, sum(amount) as amount from users where type = 'A' group by window(timestamp, '1 minute', '1 minute'), user");
sqlResult = sqlResult.select(to_json(struct("window", "user", "amount")).as("value"));

StreamingQuery query = sqlResult.writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "aggregated-topic")
    .option("checkpointLocation", "c:/tmp")
    .outputMode(OutputMode.Update())
    .start();

When I send multiple records for a particular user within a 1-minute window, I want the sum of the amounts of those events at the end of that minute.

But I get multiple outputs on the output Kafka stream, with intermediate aggregations written to it.

E.g.

I send the following 7 records within the same 1-minute window, spread out over the minute.


>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}

The output I am getting is this:

{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":10.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":20.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":40.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":60.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}

You can see that all the output rows belong to the same window, yet there are multiple of them.

What I want is a single output at the end of the minute:

{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}

How can I achieve it?


Solution

  • You need to set a processing-time trigger when writing the stream to the sink.

    Use .trigger(Trigger.ProcessingTime(...)) on the DataStreamWriter with a trigger interval that matches your window duration (1 minute here), so that a micro-batch is produced once per window rather than for every small batch of incoming records.

    
    StreamingQuery query = sqlResult.writeStream()
            .trigger(Trigger.ProcessingTime("1 minute")) // fire once per minute, i.e. once per window
            // ... same format, options and output mode as before ...
            .start();
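
    Putting it together with the sink options from the question (Trigger lives in org.apache.spark.sql.streaming), the full write would look roughly like this:

    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.Trigger;

    // Same sink configuration as in the question, plus a 1-minute processing-time
    // trigger so a micro-batch (and hence an output) is produced once per window.
    StreamingQuery query = sqlResult.writeStream()
            .trigger(Trigger.ProcessingTime("1 minute"))
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "aggregated-topic")
            .option("checkpointLocation", "c:/tmp")
            .outputMode(OutputMode.Update())
            .start();

    Because the trigger interval matches the window length, all the records for a given window are in practice processed in a single micro-batch, so only one aggregated row per user per window is written to the Kafka sink.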