Tags: java, apache-spark, spark-structured-streaming, spark-kafka-integration

Spark structured stream with tumbling window delayed and duplicate data


I am attempting to read from a kafka topic, aggregate some data over a tumbling window and write that to a sink (I've been trying both with Kafka and console).

The problems I'm seeing are

  • a long delay between sending data and receiving aggregate records for a window on the sink (minutes after the expected triggers should fire)
  • duplicate records from previous window aggregations appearing in subsequent windows

Why is the delay so long, and what can I do to reduce it?

Why are duplicate records from previous windows showing up, and how can I remove them?

The delays seem to be especially bad as the window gets shorter - it was 3+ minutes when I had the window duration set to 10 seconds, around 2 minutes when the window duration was set to 60 seconds.

With the shortest window times I'm also seeing the records getting "bunched up" so that when records are received by the sink I receive those for several windows at a time.

On the duplicate aggregate records: I do have the output mode set to complete, but my understanding is that records should only be repeated within the current window, and only if the trigger fires multiple times within it, which mine shouldn't.

I have a processing trigger set up matching the window time and a watermark threshold of 10% (1 or 6 seconds) and I know the stream itself works fine if I remove the tumbling window.
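As a rough illustration of what a 6 second watermark threshold means for a 60 second window, here is a minimal sketch in plain Java. It is a simplified model and not Spark's implementation: the class and method names are hypothetical, and it assumes that the watermark is the largest event time seen so far minus the threshold, and that a window counts as closed once the watermark reaches the window end.

```java
// Simplified watermark arithmetic; a hypothetical model, not Spark's implementation.
final class WatermarkSketch {

    // The watermark trails the largest event time seen so far by the threshold.
    static long watermark(long maxEventTimeMs, long thresholdMs) {
        return maxEventTimeMs - thresholdMs;
    }

    // Assumption: a window is closed once the watermark has reached its end.
    static boolean isClosed(long windowEndMs, long maxEventTimeMs, long thresholdMs) {
        return windowEndMs <= watermark(maxEventTimeMs, thresholdMs);
    }

    public static void main(String[] args) {
        long windowEnd = 1644329400000L; // 2022-02-08T14:10:00Z
        long threshold = 6_000L;         // "6 seconds"
        // Latest event at 14:10:03Z: watermark is 14:09:57Z, window still open.
        System.out.println(isClosed(windowEnd, 1644329403000L, threshold));
        // Latest event at 14:10:06Z: watermark is 14:10:00Z, window closed.
        System.out.println(isClosed(windowEnd, 1644329406000L, threshold));
    }
}
```

Under these assumptions the watermark only moves when newer events arrive, so a window cannot close while the topic is idle. In Complete mode Spark keeps all aggregate state, so there the watermark is not used to drop old windows.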

I get why spark might not be able to hit a certain frequency of triggers but I'd think 10 and certainly 60 seconds would be more than enough time to process the very limited amount of data I am testing with.

An example of sending data with a 60 second tumbling window and processing time trigger

  • send 6 payloads
  • wait a minute
  • send 1 payload
  • wait a while
  • send 3 payloads

(CreateTime is coming from kafka-console-consumer with --property print.timestamp=true). These arrive a couple of minutes after I would expect the trigger to fire based on the CreateTime timestamp and window.

// First record fine
CreateTime:1644329432464        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}

// Duplicate of the first record, sent by Spark along with the second even though its window is over
CreateTime:1644329505265        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644329523901        {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}

// Duplicates of the first two records, sent along with the third
CreateTime:1644330082974        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644330105990        {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
CreateTime:1644330125375        {"window":{"start":"2022-02-08T14:20:00.000Z","end":"2022-02-08T14:21:00.000Z"},"account_id":"acc0","totalAmount":333}

I do sometimes see messages like the one below, but there are no other WARN or ERROR level messages indicating problems:

2022-02-08 14:24:45 WARN  ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 99770 milliseconds

Application data/code

Sample data looks like below and is generated by a python script with ts set to the current time:

{"account_id": "acc0", "txn_id": "1234500001", "amount": 111, "ts": 1644258052800}
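To check which 60 second window a given ts value should land in, the arithmetic can be sketched in plain Java. This is a simplified model, assuming tumbling windows aligned to the Unix epoch with no start offset; the class and method names are hypothetical and this is not Spark code.

```java
import java.time.Instant;

// Simplified model of tumbling-window assignment; not Spark's implementation.
final class TumblingWindowSketch {

    // Start of the window containing the event time, both in epoch milliseconds.
    static long windowStart(long eventTimeMs, long windowMs) {
        return Math.floorDiv(eventTimeMs, windowMs) * windowMs;
    }

    // Windows are treated as half-open intervals: [start, start + windowMs).
    static long windowEnd(long eventTimeMs, long windowMs) {
        return windowStart(eventTimeMs, windowMs) + windowMs;
    }

    public static void main(String[] args) {
        long ts = 1644258052800L; // "ts" from the sample payload
        long windowMs = 60_000L;  // "60 seconds"
        System.out.println(Instant.ofEpochMilli(windowStart(ts, windowMs))
            + " .. " + Instant.ofEpochMilli(windowEnd(ts, windowMs)));
    }
}
```

For the sample payload this gives a window starting at 1644258000000 and ending at 1644258060000, so any delay can be measured from that end time rather than from when the payload was sent.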

Application code (running embedded with spark.master=local[*])

    public void execute() throws Exception {
        final SparkSession spark = SparkSession.builder().appName("test").getOrCreate();

        final Trigger trigger = Trigger.ProcessingTime(60000);
        final OutputMode outputMode = OutputMode.Complete();
        final String windowDuration = "60 seconds";
        final String watermarkThreshold = "6 seconds";

        final String kafkaHost = "localhost:9092";
        final String kafkaInTopic = "topic-in";
        final String kafkaOutTopic = "topic-out";

        // Metadata.empty() (org.apache.spark.sql.types.Metadata) avoids passing null metadata
        final StructType schema = new StructType(
            new StructField[] {
                new StructField("account_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("txn_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("amount", DataTypes.LongType, false, Metadata.empty()),
                new StructField("ts", DataTypes.LongType, false, Metadata.empty())
            }
        );

        final Dataset<Row> in = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaHost)
            .option("subscribe", kafkaInTopic)
            .option("startingOffsets", "latest")
            .option("includeHeaders", "true")
            .load();

        in.printSchema();

        final Dataset<Row> process = in
            // convert kafka value payload to structured data
            .withColumn("value", functions.from_json(functions.column("value").cast(DataTypes.StringType), schema))

            .withColumn("account_id", functions.column("value.account_id"))
            .withColumn("txn_id", functions.column("value.txn_id"))
            .withColumn("amount", functions.column("value.amount"))
            .withColumn("ts", functions.column("value.ts"))

            // conversion to timestamp is by second, not ms
            .withColumn("datetime", functions.col("ts").divide(1000).cast(DataTypes.TimestampType))
            .withWatermark("datetime", watermarkThreshold)

            .groupBy(
                functions.window(functions.col("datetime"), windowDuration),
                functions.col("account_id")
            )
            .agg(functions.sum("amount").as("totalAmount"));


        process.printSchema();

        final DataStreamWriter<Row> out = process
            // required for kafka output
            .select(functions.to_json(functions.struct("*")).as("value"))

            .writeStream()
            .outputMode(outputMode)
            .trigger(trigger)
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaHost)
            .option("topic", kafkaOutTopic)
            .option("checkpointLocation", "/tmp/spark-kafka");

        LOGGER.info("STARTING STREAM");
        out.start().awaitTermination();
    }


Solution

  • For the long delays: the warning message shows a batch taking about 100 seconds against a 60 second trigger interval, so the job most likely does not have enough resources to process each batch in time. Check the Spark UI to find out where the time goes. It could be data skew between partitions, or the job may need more memory or cores.

    For the duplicate records: try update or append mode instead. Complete mode writes the whole Result Table to the sink after every trigger, so every window aggregated so far is emitted again each time. That is why you see duplicates. See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
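The difference between complete and update mode can be sketched with a toy model in plain Java. This is not Spark code, and the class and method names are hypothetical. It ignores watermarks, grouping keys and late data, and only mimics which rows reach the sink after each trigger. The three batches loosely follow the 6 / 1 / 3 payload test from the question, assuming every payload has an amount of 111.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a windowed sum, used only to contrast output modes.
// Not Spark code: it ignores watermarks, grouping keys and late data.
final class OutputModeSketch {
    private final long windowMs;
    // window start (epoch ms) -> running total for that window
    private final TreeMap<Long, Long> totals = new TreeMap<>();

    OutputModeSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Applies one micro-batch of {eventTimeMs, amount} pairs and returns the
    // rows a sink would receive, formatted as "windowStartMs=total".
    List<String> processBatch(long[][] events, boolean completeMode) {
        Map<Long, Long> touched = new TreeMap<>();
        for (long[] event : events) {
            long start = Math.floorDiv(event[0], windowMs) * windowMs;
            touched.put(start, totals.merge(start, event[1], Long::sum));
        }
        // Complete: every window seen so far. Update: only windows changed by this batch.
        Map<Long, Long> emitted = completeMode ? totals : touched;
        List<String> rows = new ArrayList<>();
        emitted.forEach((start, total) -> rows.add(start + "=" + total));
        return rows;
    }

    // Builds n identical events, mimicking n payloads sent close together.
    static long[][] payloads(long eventTimeMs, long amount, int n) {
        long[][] events = new long[n][];
        for (int i = 0; i < n; i++) {
            events[i] = new long[] {eventTimeMs, amount};
        }
        return events;
    }

    public static void main(String[] args) {
        long[][][] batches = {
            payloads(1644329345000L, 111, 6), // 14:09:05Z
            payloads(1644329405000L, 111, 1), // 14:10:05Z
            payloads(1644330005000L, 111, 3)  // 14:20:05Z
        };
        for (boolean completeMode : new boolean[] {true, false}) {
            OutputModeSketch sketch = new OutputModeSketch(60_000L);
            System.out.println(completeMode ? "complete" : "update");
            for (long[][] batch : batches) {
                System.out.println("  " + sketch.processBatch(batch, completeMode));
            }
        }
    }
}
```

In this model complete mode emits one, then two, then three rows (666; 666 and 111; 666, 111 and 333), which is the pattern shown in the question. Update mode emits only the window changed by each batch. A window can still be emitted more than once in update mode if later data changes its total, so a consumer of the output topic would need to treat rows as upserts keyed by window and account.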