
Spark Streaming Job is running very slow


I am running a Spark Streaming job locally and one batch takes approximately 4 to 5 minutes. Can someone suggest what could be the issue with the below code?

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from pyspark.sql.functions import avg, window, from_json, from_unixtime, unix_timestamp
import uuid

schema = StructType([
    StructField("source", StringType(), True),
    StructField("temperature", FloatType(), True),
    StructField("time", StringType(), True)
])

spark = SparkSession \
    .builder.master("local[8]") \
    .appName("poc-app") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 5)    

df1 = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "poc") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

df2 = df1.select(from_json("value", schema).alias(
    "sensors")).select("sensors.*")

df3 = df2.select(
    df2.source,
    df2.temperature,
    from_unixtime(unix_timestamp(df2.time, 'yyyy-MM-dd HH:mm:ss')).alias('time'))
df4 = df3.groupBy(window(df3.time, "2 minutes", "1 minutes"), df3.source).count()

query1 = df4.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/tmp/temporary-" + str(uuid.uuid4())) \
    .start()

query1.awaitTermination()

Solution

  • With micro-batch streaming you usually want to reduce the number of output partitions. Since you are doing an aggregation (a wide transformation), every time the result is persisted it will default to writing 200 partitions to disk, because of

    spark.conf.get("spark.sql.shuffle.partitions")
    

    Try lowering this config to a smaller value, and place it at the beginning of your code so that when the aggregation is performed it outputs only 5 partitions to disk:

    spark.conf.set("spark.sql.shuffle.partitions", 5)
    

    You can also get a feel for this by looking at the number of files in the output write-stream directory, as well as by identifying the number of partitions in your aggregated DataFrame:

    df3.rdd.getNumPartitions()
    

    By the way, since you are using local mode for testing, try setting `local[8]` instead of `local[4]` so it increases the parallelism across your CPU cores (assuming your machine has 8 logical cores).
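
    To see the effect of `spark.sql.shuffle.partitions` in isolation, here is a minimal batch-mode sketch (not the streaming job itself) that runs a `groupBy().count()` aggregation and checks how many partitions the shuffle produced. Note that adaptive query execution (on by default in recent Spark versions) can coalesce small shuffle partitions, so it is disabled here to make the partition count deterministic; the app name and data are illustrative only.

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = (SparkSession.builder
             .master("local[8]")
             .appName("shuffle-partitions-demo")
             .getOrCreate())

    # Disable AQE so the shuffle is not coalesced and the count is predictable.
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    # Default is 200; lower it BEFORE the wide transformation runs.
    spark.conf.set("spark.sql.shuffle.partitions", 5)

    # A wide transformation: groupBy forces a shuffle.
    df = spark.range(1000).groupBy((col("id") % 10).alias("key")).count()

    # The aggregated DataFrame now has exactly 5 partitions.
    print(df.rdd.getNumPartitions())
    ```

    With the default of 200 you would instead pay the overhead of scheduling 200 mostly-empty tasks (and writing 200 tiny files) per micro-batch, which is a common cause of slow local streaming jobs.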