How to ingest data from two Kafka producers and join them using Spark Structured Streaming?

I am trying to read data from two Kafka topics, but I am unable to join them and get the final DataFrame. My Kafka topics are CSVStreamRetail and OrderItems.

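    import java.sql.Timestamp
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._

    // No master is set here; supply it via spark-submit or .master(...).
    val spark = SparkSession
      .builder
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()

    val ordersSchema = new StructType()
      .add("order_id", IntegerType)
      .add("order_date", StringType)
      .add("order_customer_id", IntegerType)
      .add("order_status", StringType)

    // Assumption: order_item_order_id (used in the join) is an integer like order_id;
    // any other order-item fields are omitted.
    val orderItemsSchema = new StructType()
      .add("order_item_order_id", IntegerType)
      .add("order_item_product_price", DoubleType)

    import spark.implicits._

    val df1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "CSVStreamRetail")
      .load()

    val df2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "OrderItems")
      .load()

    // Parse the JSON payload and flatten it so order_id is a top-level column.
    val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
      .select(from_json($"value", ordersSchema).as("orders_data"), $"timestamp")
      .select("orders_data.*", "timestamp")

    val orderItemsDF = df2.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
      .select(from_json($"value", orderItemsSchema).as("order_items_data"), $"timestamp")
      .select("order_items_data.*", "timestamp")

    val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id") === ordersDF("order_id"))

    // Assumption: console sink, inferred from the truncate option.
    finalDF
      .writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()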

The output I am receiving is an empty dataframe.


  • First, check that data is actually arriving in both Kafka topics. For a stream-stream join you should define a watermark on at least one of the streams, so that Spark can bound how long it buffers rows while waiting for a match. Since you are doing an inner join, I added a 200-second watermark to both streams, and the output DataFrame now shows data.

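    import java.sql.Timestamp
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    // No master is set here; supply it via spark-submit or .master(...).
    val spark = SparkSession.builder
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()
    val ordersSchema = new StructType()
      .add("order_id", IntegerType)
      .add("order_date", StringType)
      .add("order_customer_id", IntegerType)
      .add("order_status", StringType)
    // Assumption: order_item_order_id is an integer like order_id; other fields omitted.
    val orderItemsSchema = new StructType()
      .add("order_item_order_id", IntegerType)
      .add("order_item_product_price", DoubleType)
    import spark.implicits._
    val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "CSVStreamRetail")
      .load()
    val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "OrderItems")
      .load()
    // Parse and flatten each payload, then bound late data with a 200-second watermark.
    val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
      .select(from_json($"value", ordersSchema).as("orders_data"), $"timestamp")
      .select("orders_data.*", "timestamp")
      .withWatermark("timestamp", "200 seconds")
    val orderItemsDF = df2.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
      .select(from_json($"value", orderItemsSchema).as("order_items_data"), $"timestamp")
      .select("order_items_data.*", "timestamp")
      .withWatermark("timestamp", "200 seconds")
    val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id") === ordersDF("order_id"))
    // Assumption: console sink, inferred from the truncate option.
    finalDF.writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()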

    Use the event timestamp (the timestamp column here) in the join as well. Let me know if this helps.
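
One way to read the advice about joining on the event timestamp is to add a time-range condition to the join, so that an order item only matches an order whose Kafka timestamp lies within a bounded interval of its own. The following is a minimal sketch of that idea, not the answerer's exact code. The object EventTimeJoin, the method joinWithinWindow, and the renamed columns order_ts and item_ts are hypothetical names introduced for illustration. It assumes both input frames are already flattened (order_id and order_item_order_id are top-level columns), each carries a timestamp column, and neither has a watermark applied yet. The 200-second bound simply reuses the watermark delay from the answer.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

object EventTimeJoin {
  // Hypothetical helper: inner-joins order items to orders on the order id and
  // additionally requires the two timestamps to be within 200 seconds of each other.
  def joinWithinWindow(orders: DataFrame, orderItems: DataFrame): DataFrame = {
    // Rename the timestamp columns so the join condition is unambiguous,
    // then declare a watermark on each side.
    val o = orders
      .withColumnRenamed("timestamp", "order_ts")
      .withWatermark("order_ts", "200 seconds")
    val i = orderItems
      .withColumnRenamed("timestamp", "item_ts")
      .withWatermark("item_ts", "200 seconds")

    // Equality on the key plus a time-range constraint on the event times.
    i.join(
      o,
      expr("""
        order_item_order_id = order_id AND
        item_ts >= order_ts - interval 200 seconds AND
        item_ts <= order_ts + interval 200 seconds
      """),
      "inner")
  }
}
```

With the parsed frames from the answer, taken before withWatermark is called on them, this would be used as EventTimeJoin.joinWithinWindow(ordersDF, orderItemsDF) and the result written to the console sink as before. Combining watermarks with a time-range condition lets Spark discard buffered rows that can no longer match, which keeps the join state from growing without bound.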