Search code examples
apache-kafka, apache-spark-sql, spark-structured-streaming

How to ingest data from two producers in kafka and join using spark structured streaming?


I am trying to read data from two Kafka topics, but I am unable to join them and produce the final dataframe. My Kafka topics are CSVStreamRetail and OrderItems.

// Create (or reuse) a SparkSession running locally on all cores.
// The file:///C:/temp warehouse dir indicates a local Windows dev setup.
val spark = SparkSession
      .builder
      .appName("Spark-Stream-Example")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()

    // Schema for the JSON records arriving on the "CSVStreamRetail" topic (orders).
    val ordersSchema = new StructType()
      .add("order_id", IntegerType)
      .add("order_date", StringType)
      .add("order_customer_id", IntegerType)
      .add("order_status", StringType)

    // Schema for the JSON records arriving on the "OrderItems" topic (order line items).
    val orderItemsSchema = new StructType()
      .add("order_item_id",IntegerType)
      .add("order_item_order_id",IntegerType)
      .add("order_item_product_id",IntegerType)
      .add("order_item_quantity",IntegerType)
      .add("order_item_subtotal",DoubleType)
      .add("order_item_product_price", DoubleType)

    // Needed for the $"..." column syntax and .as[(String, Timestamp)] encoder below.
    import spark.implicits._

    // Streaming source #1: raw Kafka records (binary key/value + metadata) from CSVStreamRetail.
    val df1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "CSVStreamRetail")
      .load()

    // Streaming source #2: raw Kafka records from OrderItems.
    val df2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "OrderItems")
      .load()

    // Decode the Kafka value as a JSON string, parse it against ordersSchema,
    // and flatten the struct; the Kafka record timestamp is carried along.
    val ordersDF = df1.selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
      .select(from_json($"value", ordersSchema).as("orders_data"),$"timestamp")
      .select("orders_data.*","timestamp")

    // Same decode/parse/flatten for the order-items stream.
    val orderItemsDF = df2.selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)").as[(String,Timestamp)]
      .select(from_json($"value",orderItemsSchema).as("order_items_data"),$"timestamp")
      .select("order_items_data.*","timestamp")

    // Stream-stream inner join on the order id.
    // NOTE(review): neither input stream defines a watermark (withWatermark), so Spark
    // must keep unbounded join state; per the accepted answer below, adding a watermark
    // to both streams made the output appear. Also confirm data is actually flowing
    // into both topics — an empty topic also yields an empty result.
    val finalDF = orderItemsDF.join(ordersDF, orderItemsDF("order_item_order_id")===ordersDF("order_id"))

    // Print each joined micro-batch to the console; blocks until the query terminates.
    finalDF
      .writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()

The output I am receiving is an empty dataframe.


Solution

  • First, check that data is actually arriving in your Kafka topics. For a stream-stream join you should provide a watermark on at least one stream (Spark requires watermarks on both sides for outer joins, and they are strongly recommended for inner joins so the join state stays bounded). Since you want an inner join, I added a 200-second watermark to each stream, and the output dataframe now shows data.

    // Local SparkSession; all cores, Windows-style warehouse directory.
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Spark-Stream-Example")
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()

    // Expected JSON layout of an order record (topic CSVStreamRetail).
    val ordersSchema = new StructType()
      .add("order_id", IntegerType)
      .add("order_date", StringType)
      .add("order_customer_id", IntegerType)
      .add("order_status", StringType)

    // Expected JSON layout of an order-item record (topic OrderItems).
    val orderItemsSchema = new StructType()
      .add("order_item_id", IntegerType)
      .add("order_item_order_id", IntegerType)
      .add("order_item_product_id", IntegerType)
      .add("order_item_quantity", IntegerType)
      .add("order_item_subtotal", DoubleType)
      .add("order_item_product_price", DoubleType)

    // Brings in $-column syntax and the tuple encoder used below.
    import spark.implicits._

    // Raw Kafka stream for orders.
    val rawOrders = spark.readStream
      .format("kafka")
      .option("subscribe", "CSVStreamRetail")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .load()

    // Raw Kafka stream for order items.
    val rawOrderItems = spark.readStream
      .format("kafka")
      .option("subscribe", "OrderItems")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .load()

    // Orders: decode value as JSON, flatten, and watermark on the Kafka
    // record timestamp so the join state can be bounded.
    val ordersDF = {
      val typed = rawOrders
        .selectExpr("CAST(value AS STRING)", "CAST(timestamp as TIMESTAMP)")
        .as[(String, Timestamp)]
      typed
        .select(from_json($"value", ordersSchema).as("orders_data"), $"timestamp")
        .select("orders_data.*", "timestamp")
        .withWatermark("timestamp", "200 seconds")
    }

    // Order items: same decode/flatten/watermark pipeline.
    val orderItemsDF = {
      val typed = rawOrderItems
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as TIMESTAMP)")
        .as[(String, Timestamp)]
      typed
        .select(from_json($"value", orderItemsSchema).as("order_items_data"), $"timestamp")
        .select("order_items_data.*", "timestamp")
        .withWatermark("timestamp", "200 seconds")
    }

    // Stream-stream inner join: each item row is matched to its parent order.
    val joined = orderItemsDF.join(
      ordersDF,
      orderItemsDF.col("order_item_order_id") === ordersDF.col("order_id"))

    // Emit every joined micro-batch to the console and block until terminated.
    joined.writeStream
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()

    Use the event timestamp for joining. Let me know if this helps.