
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark


I have a Kafka stream that I am loading into Spark. Messages from the Kafka topic have the following attributes: bl_iban, blacklisted, timestamp. So there are IBANs, a flag indicating whether each IBAN is blacklisted (Y/N), and a timestamp for the record. There can be multiple records for one IBAN, because over time an IBAN can get blacklisted or "removed". What I am trying to achieve is to know the current status of each IBAN. However, I have started with an even simpler goal: listing the latest timestamp for each IBAN (after that I would like to add the blacklisted status as well). So I have produced the following code (where blackList is the Dataset that I have loaded from Kafka):

blackList = blackList.groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));

And after that I have tried to print it to the console using the following code:

StreamingQuery query = blackList.writeStream()
    .format("console")
    .outputMode(OutputMode.Append())
    .start();
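(Not shown here: the started query is presumably kept alive afterwards, e.g. with awaitTermination(); without it the driver can exit before any micro-batch is processed.)

query.awaitTermination(); // blocks until the query stops or fails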

When I run my code, I get the following error: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

So I added a watermark to my Dataset like so:

blackList = blackList.withWatermark("timestamp", "2 seconds")
                .groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));

And got the same error after that. Any ideas how I can approach this problem?


Update: With the help of mike I have managed to get rid of that error. But the problem is that I still cannot get my blacklist working. I can see that data is loaded from Kafka, but after my group operation I get two empty batches and nothing more. Printed data from Kafka:

+-----------------------+-----------+-----------------------+
|bl_iban                |blacklisted|timestamp              |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N          |2020-04-10 17:26:58.208|
|SK341492788657560898224|N          |2020-04-10 17:26:58.214|
|SK118866580129485701645|N          |2020-04-10 17:26:58.215|
+-----------------------+-----------+-----------------------+

This is how I produced the blackList that is printed above:

blackList = blackList.selectExpr("split(cast(value as string),',') as value", "cast(timestamp as timestamp) timestamp")
                .selectExpr("value[0] as bl_iban", "value[1] as blacklisted", "timestamp");

And this is my group operation:

Dataset<Row> blackListCurrent = blackList.withWatermark("timestamp", "20 minutes")
                .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
                .agg(col("bl_iban"), max("timestamp"));

Link to source file: Spark Blacklist


Solution

  • When you use watermarking in Spark, you need to ensure that your aggregation knows about the window, i.e. the watermarked event-time column has to show up in the groupBy (typically through a window expression). The Spark documentation on watermarking provides more background.

    In your case, the code should look something like this:

    blackList = blackList.withWatermark("timestamp", "2 seconds")
      .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
      .agg(col("bl_iban"), max("timestamp"));
    

    It is important that the attribute timestamp has the data type timestamp!
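
    If the column arrives from Kafka as a string, it can be cast explicitly, and printSchema() lets you verify the type before starting the query. A minimal sketch:

    blackList = blackList.withColumn("timestamp", col("timestamp").cast("timestamp"));
    blackList.printSchema(); // should show: timestamp: timestamp (nullable = true)

    Once the latest timestamp per IBAN works, the blacklisted flag from the original goal can be carried along using the max-over-struct ("argmax") pattern. This is a sketch of that idea, not code from the question:

    Dataset<Row> latest = blackList
        .withWatermark("timestamp", "2 seconds")
        .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
        // max over a struct compares its fields left to right, so this picks
        // the blacklisted value that belongs to the latest timestamp
        .agg(max(struct(col("timestamp"), col("blacklisted"))).as("latest"))
        .select(col("bl_iban"), col("latest.timestamp"), col("latest.blacklisted"));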