Tags: apache-spark, pyspark, apache-kafka, spark-structured-streaming, spark-kafka-integration

PySpark Structured Streaming with Kafka - Scaling Consumers for multiple topics with different loads


We subscribe to 7 topics with spark.readStream in a single running Spark app. After transforming the event payloads, we save them with spark.writeStream to our database.

For one of the topics, the data is inserted only batch-wise (once a day) with a very high load. This delays our reading from all other topics, too. For example, in Grafana the delay between a produced and a consumed record stays below 1 minute across all topics for the whole day. When the bulk topic receives its events, our delay increases to up to 2 hours on all (!) topics.
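For illustration, a rough sketch of this single-stream setup (the topic list, Kafka options, and write logic below are placeholders, not our actual code):

    # One source subscribed to all 7 topics, one sink writing every micro-batch to the DB
    all_topics_stream = (spark.readStream.format('kafka')
        .option('kafka.bootstrap.servers', KAFKA_SERVERS)   # placeholder
        .option('subscribe', ','.join(ALL_TOPICS))          # placeholder list of the 7 topic names
        .load())

    query = (all_topics_stream.writeStream
        .foreachBatch(lambda df, epoch_id: write_to_db(transform(df)))  # placeholder functions
        .start())
    query.awaitTermination()
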

  1. How can we solve this? We already tried 2 successive readStreams (reading the bulk topic separately), but it didn't help.
    Further info: we use 6 executors with 2 executor-cores each. The topics have different numbers of partitions (3 to 30). We use the Structured Streaming Kafka Integration v0.10.0.

  2. General question: how can we scale the consumers in Spark Structured Streaming? Does 1 readStream correspond to 1 consumer, to 1 executor, or to something else?


Solution

  • We found a solution to our problem:
    After the change, our Grafana shows that the bulk-data topic still peaks, but without blocking consumption on the other topics.

    What we did:
    We still run 1 Spark app. We use 2 separate spark.readStream sources, but now also add a sink for each, so each source runs as its own streaming query.

    In code:

     # Two separate sources: one for the priority topics, one for the bulk topic
     priority_topic_stream = (spark.readStream.format('kafka')
          .options(..).option('subscribe', ','.join([T1, T2, T3])).load())
     bulk_topic_stream = (spark.readStream.format('kafka')
          .options(..).option('subscribe', BULK_TOPIC).load())

     # One sink per source, so each source gets its own streaming query
     priority_topic_stream.writeStream.foreachBatch(..).trigger(..).start()
     bulk_topic_stream.writeStream.foreachBatch(..).trigger(..).start()

     # Block until any of the running queries terminates
     spark.streams.awaitAnyTermination()
    
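    As a possible further refinement (a hedged sketch, not something we have rolled out): if two queries in one app still compete for the same executor cores, Spark's FAIR scheduler pools can be assigned per query. The pool names and batch-writer functions below are made up:

     # Assumes the app is started with spark.scheduler.mode=FAIR
     # (e.g. .config('spark.scheduler.mode', 'FAIR') on the SparkSession builder)
     spark.sparkContext.setLocalProperty('spark.scheduler.pool', 'priority_pool')  # made-up pool name
     priority_topic_stream.writeStream.foreachBatch(write_priority_batch).start()  # placeholder writer

     spark.sparkContext.setLocalProperty('spark.scheduler.pool', 'bulk_pool')      # made-up pool name
     bulk_topic_stream.writeStream.foreachBatch(write_bulk_batch).start()          # placeholder writer

     spark.streams.awaitAnyTermination()
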

    To minimize the peak on the bulk stream, we will try increasing its number of partitions, as advised by @partlov. But that alone would only have sped up consumption on the bulk stream; it would not have resolved the issue of it blocking our reads from the priority topics.
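
    On the Spark side, the Kafka source also offers options to shape the bulk load per micro-batch, for example maxOffsetsPerTrigger (a rate limit per trigger) and, on newer Spark versions, minPartitions (split the Kafka partitions into more Spark tasks). A sketch with made-up values, not tuned for our load:

     bulk_topic_stream = (spark.readStream.format('kafka')
          .option('kafka.bootstrap.servers', KAFKA_SERVERS)  # placeholder
          .option('subscribe', BULK_TOPIC)
          .option('maxOffsetsPerTrigger', 500000)   # made-up cap on offsets consumed per micro-batch
          .option('minPartitions', 30)              # made-up value; available on newer Spark versions
          .load())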