
How can I read every 5 seconds in pyspark with kafka readStream?


I want to read a topic every 5 seconds. With older versions of PySpark I could use kafka-utils (the DStream API) and its window method, but that is no longer available to me.

Currently I am loading data from Kafka with Spark using the following code:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "data") \
        .load()

But with this, I am reading all of the data.

So I want to know how I can read the data in 1-second batches, every 5 seconds, if that is possible.

Thanks


Solution

  • Assuming you want to aggregate and group by something over 5-second intervals, refer to the Structured Streaming documentation on window operations.

    This should define a tumbling window (the window duration with no separate slide duration):

    from pyspark.sql.functions import window

    kafka_df \
        .withWatermark("timestamp", "5 seconds") \
        .groupBy(
            window(kafka_df.timestamp, "5 seconds"),  # tumbling: no slide interval
            kafka_df.value) \
        .count()