apache-spark  spark-streaming  spark-structured-streaming

Dead executors in a Spark Structured Streaming app


I have a simple streaming job that pulls data from a Kafka topic and pushes it to S3.

df2 = parsed_df \
        .coalesce(1)\
        .writeStream.format("parquet")\
        .option("checkpointLocation", "<s3location>")\
        .option("path","s3location")\
        .partitionBy("dt")\
        .outputMode("Append")\
        .trigger(processingTime='150 seconds')\
        .start()

The trigger interval is 150 seconds. My Spark config for this job is below.

        "driverMemory": "6G",
        "driverCores": 1,
        "executorCores": 1,
        "executorMemory": "3G",
                {
                "spark.dynamicAllocation.initialExecutors": "3",
                "spark.dynamicAllocation.maxExecutors": "12",
                "spark.driver.maxResultSize": "4g",
                "spark.sql.session.timeZone":"UTC",
                "spark.executor.memoryOverhead": "1g",
                "spark.driver.memoryOverhead": "2g",
                "spark.dynamicAllocation.enabled": "true",
                "spark.rpc.message.maxSize": "1024",
                "spark.streaming.receiver.maxRate": "4000",
                "spark.port.maxRetries" : "100",
                "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-streaming-kafka-0-10-assembly_2.12:2.4.4"
                }

The job is running fine, but when I check the Spark UI, I see many dead executors. [Screenshot: Spark UI]

The number of dead executors keeps increasing. In each 150-second batch, I process 3-5k events. My questions are:

  1. Is this a valid scenario?
  2. If this is not a valid scenario, then what may be the reason? Is it because the dynamic allocation property is set to true?

Solution

  • Yes, it is a valid scenario when dynamic allocation is enabled.

    In Structured Streaming, data is processed in micro-batches. If the executor idle timeout is shorter than the micro-batch interval, executors are constantly added and removed. If the idle timeout is longer than the batch interval, executors are never removed. The property that controls this behaviour is "spark.dynamicAllocation.executorIdleTimeout", which defaults to 60 seconds.

    So an executor that sees no activity for 60 seconds is removed. In your case the trigger interval is 150 seconds, and Spark processes each micro-batch of 3-5k events fairly quickly, so executors are likely to sit idle for more than 60 seconds before the next trigger and are removed. Each removed executor then shows up as dead in the Spark UI.

    To change this behavior, set "spark.dynamicAllocation.executorIdleTimeout" to a value higher than the trigger interval (say, 300 seconds).
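To make the timing argument concrete, here is a rough Python sketch. It only models the idle window between micro-batches and is not Spark's actual scheduler logic. The function names and the 20-second batch duration are assumptions for illustration; the 150-second trigger and the 60-second default come from the discussion above.

```python
# Sketch only: a simplified model of the idle gap between micro-batches.
# It does not reproduce Spark's real dynamic allocation implementation.

def idle_gap_seconds(trigger_interval_s: float, batch_duration_s: float) -> float:
    """Time executors sit idle between the end of a batch and the next trigger."""
    return max(0.0, trigger_interval_s - batch_duration_s)


def executors_likely_released(trigger_interval_s: float,
                              batch_duration_s: float,
                              idle_timeout_s: float = 60.0) -> bool:
    """True if the idle gap exceeds the idle timeout (60 s is Spark's default)."""
    return idle_gap_seconds(trigger_interval_s, batch_duration_s) > idle_timeout_s


TRIGGER_S = 150        # trigger interval from the question
ASSUMED_BATCH_S = 20   # hypothetical processing time for 3-5k events

# With the default timeout, the 130 s gap exceeds 60 s.
print(executors_likely_released(TRIGGER_S, ASSUMED_BATCH_S))

# With a 300 s timeout, the same gap no longer exceeds it.
print(executors_likely_released(TRIGGER_S, ASSUMED_BATCH_S, idle_timeout_s=300))

# Extra entry to merge into the job's existing Spark conf map.
extra_conf = {"spark.dynamicAllocation.executorIdleTimeout": "300s"}
```

The `extra_conf` entry would go next to the other "spark.dynamicAllocation.*" settings in the job config. Check the real batch duration in the Spark UI before picking a timeout, since the 20-second figure here is only a guess.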