Search code examples
apache-sparkspark-structured-streamingspark-java

Grouping and sorting on spark structured streaming


I have a usecase where I have streaming dataset like mobile number ,starttime and call duration. I need to do group by on mobile number and sort the group based on starttime and filter out the calls where the sum(starttime + duration) greater than the next sum ( starttime+duration)

I tried Window.partitionby("mobilenumber").orderby("starttime") But later figured it out that it will not work for streaming datasets


Solution

  • This is not possible standardly, unless complete mode is used. More specifically:

    Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

    The basic tenet of Spark Structured Streaming is that a query should return the same answer in streaming or batch mode. We support sorting in complete mode because we have all the data and can sort it correctly and return the full answer. In update or append mode, sorting would only return a correct answer if we could promise that records that sort lower are going to arrive later (and we can't). Therefore, it is disallowed.

    Write to a table or HDFS and then to a DB that has materialized views that can assist in incremental build aspects. This in preference to UDAF which works - until you change some of the stateful operations in a way that you lose all that stateful data. With former approach you still have that data - persisted. See the docs. That database could be Delta table these days.