Tags: apache-spark, hadoop-yarn, executors, spark-structured-streaming

Spark Structured Streaming executors: weird behavior


I'm using Spark Structured Streaming on a Cloudera cluster. I request 3 executors, but when I launch the application only one of them is actually used. How can I make it use multiple executors?

Here are more details. These are my parameters:

Launch command:

spark2-submit --master yarn \
--deploy-mode cluster \
--conf spark.ui.port=4042 \
--conf spark.eventLog.enabled=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.kafka.consumer.poll.ms=512 \
--num-executors 3 \
--executor-cores 3 \
--executor-memory 2g \
--jars /data/test/spark-avro_2.11-3.2.0.jar,/data/test/spark-streaming-kafka-0-10_2.11-2.1.0.cloudera1.jar,/data/test/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar \
--class com.test.Hello /data/test/Hello.jar

The Code:

val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", <topic_list:9092>)
      .option("subscribe", <topic_name>)
      .option("group.id", <consumer_group_id>)
      .load()
      .select($"value".as[Array[Byte]], $"timestamp")
      .map((c) => { .... })

val query = lines
      .writeStream
      .format("csv")
      .option("path", <outputPath>)
      .option("checkpointLocation", <checkpointLocationPath>)
      .start()
query.awaitTermination()

Result in the Spark UI: [screenshot]

I expected all of the executors to be working.

Any suggestions?

Thank you,
Paolo


Solution

  • There doesn't seem to be anything wrong with your configuration; most likely the Kafka topic you are reading from has only one partition. The Kafka source creates one Spark partition per Kafka topic partition, so a single-partition topic produces a single task per batch and only one executor gets any work. Increase the number of partitions on the Kafka topic and make sure the producer spreads records across them. A common rule of thumb is to have around 3-4 times as many partitions as executors.

    If you don't want to touch the Kafka side, you can work around this by calling repartition(3) before you apply the map method, so that each executor works on its own logical partition.

    If you also want to control explicitly the work done on each partition, you can use the mapPartitions method.
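
    A minimal sketch of both workarounds, under some assumptions: the broker address, topic name, output paths and the decode function below are hypothetical placeholders (the body of your map is not shown in the question), and the UTF-8 decoding only stands in for whatever your map really does.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.SparkSession

object RepartitionSketch {

  // Hypothetical stand-in for the per-record logic elided in the question.
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RepartitionSketch")
      .getOrCreate()
    import spark.implicits._

    // One Spark partition per Kafka topic partition comes out of this source.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // assumption: placeholder broker
      .option("subscribe", "my_topic")                   // assumption: placeholder topic
      .load()
      .select($"value".as[Array[Byte]])

    // Workaround 1: shuffle the records into 3 partitions before the map,
    // so each micro-batch runs 3 tasks instead of 1.
    // With --num-executors 3 --executor-cores 3 there are 9 task slots,
    // so a value up to 9 could still run fully in parallel.
    val viaMap = raw
      .repartition(3)
      .map(bytes => decode(bytes))

    // Workaround 2: same repartitioning, but the function receives the whole
    // partition as an iterator, which is useful for per-partition setup work.
    val viaMapPartitions = raw
      .repartition(3)
      .mapPartitions(records => records.map(decode))

    // Only one of the two variants is written out in this sketch.
    val query = viaMap.writeStream
      .format("csv")
      .option("path", "/tmp/repartition-sketch/out")               // assumption: placeholder path
      .option("checkpointLocation", "/tmp/repartition-sketch/chk") // assumption: placeholder path
      .start()

    query.awaitTermination()
  }
}
```

    Keep in mind that repartition adds a shuffle to every micro-batch, so it is worth it mainly when the work inside the map is the expensive part. Adding partitions to the topic avoids that shuffle.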