Search code examples
apache-sparkapache-spark-sqlspark-structured-streaming

spark structured streaming batch data refresh issue (partition by clause)


I came across a problem while joining spark structured streaming data frame with batch data frame , my scenario I have a S3 stream which needs to do left anti join with history data which returns record not present in history (figures out new records) and I write these records to history as a new append (partition by columns disk data partition not memory).

when I refresh my history data frame which is partitioned my history data frame doesn't get updated.

Below are the code two code snippets one which work's the other which doesn't work.

Only difference between working code and non working code is partition_by clause.

Working Code:- (history gets refreshed)

import spark.implicits._

    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .mode(SaveMode.Append)
          //.partitionBy("spark_id", "account_id", "run_dt")
          .csv("src/main/resources/history/")

        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .parquet("src/main/resources/history")

        val lkpChacheFileDf = lkpChacheFileDf1
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    println("This is the kafka dataset:")
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    spark.streams.awaitAnyTermination()

Doesn't Work:- (history is not getting refreshed)

import spark.implicits._

    val inputSchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val historySchema = StructType(
      Array(
        StructField("spark_id", StringType),
        StructField("account_id", StringType),
        StructField("run_dt", StringType),
        StructField("trxn_ref_id", StringType),
        StructField("trxn_dt", StringType),
        StructField("trxn_amt", StringType)
      )
    )
    val source = spark.readStream
      .schema(inputSchema)
      .option("header", "false")
      .csv("src/main/resources/Input/")

    val history = spark.read
      .schema(inputSchema)
      .option("header", "true")
      .csv("src/main/resources/history/")
      .withColumnRenamed("spark_id", "spark_id_2")
      .withColumnRenamed("account_id", "account_id_2")
      .withColumnRenamed("run_dt", "run_dt_2")
      .withColumnRenamed("trxn_ref_id", "trxn_ref_id_2")
      .withColumnRenamed("trxn_dt", "trxn_dt_2")
      .withColumnRenamed("trxn_amt", "trxn_amt_2")

    val readFilePersisted = history.persist()
    readFilePersisted.createOrReplaceTempView("hist")

    val recordsNotPresentInHist = source
      .join(
        history,
        source.col("account_id") === history.col("account_id_2") &&
          source.col("run_dt") === history.col("run_dt_2") &&
          source.col("trxn_ref_id") === history.col("trxn_ref_id_2") &&
          source.col("trxn_dt") === history.col("trxn_dt_2") &&
          source.col("trxn_amt") === history.col("trxn_amt_2"),
        "leftanti"
      )

    recordsNotPresentInHist.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .mode(SaveMode.Append)
          .partitionBy("spark_id", "account_id","run_dt")
          .csv("src/main/resources/history/")

        val lkpChacheFileDf1 = spark.read
          .schema(inputSchema)
          .parquet("src/main/resources/history")

        val lkpChacheFileDf = lkpChacheFileDf1
        lkpChacheFileDf.unpersist(true)
        val histLkpPersist = lkpChacheFileDf.persist()
        histLkpPersist.createOrReplaceTempView("hist")

      }
      .start()

    println("This is the kafka dataset:")
    source
      .withColumn("Input", lit("Input-source"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    recordsNotPresentInHist
      .withColumn("reject", lit("recordsNotPresentInHist"))
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    spark.streams.awaitAnyTermination()

Thanks Sri


Solution

  • I resolved this problem by using union by name function instead of reading refreshed data from disk.

    Step 1:- Read history S3

    Step 2:- Read Kafka and look up history

    Step 3:- Write to processed data to Disk and append to data frame created in step 1 using union by name spark function.

    Step 1 Code (Reading History Data Frame):-

    val acctHistDF = sparkSession.read
    .schema(schema)
    .parquet(S3path)
    val acctHistDFPersisted = acctHistDF.persist()
    acctHistDFPersisted.createOrReplaceTempView("acctHist")
    
    

    Step 2 (Refreshing History Data Frame with stream data):-

    val history = sparkSession.table("acctHist")
    history.unionByName(stream)
    history.createOrReplaceTempView("acctHist")
    

    Thanks Sri