scala, apache-spark, apache-spark-sql, spark-structured-streaming

Outer join two Datasets (not DataFrames) in Spark Structured Streaming


I have some code that joins two streaming DataFrames and outputs the result to the console.
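(`df1Input` and `df2Input` are not shown here; to make the snippet self-contained, below is one possible stand-in using Spark's built-in rate source. The rate source itself and the column names `id` and `timestamp` are assumptions for illustration only.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("stream-stream-join").getOrCreate()
import spark.implicits._

// Stand-in streaming sources: the rate source emits `timestamp` and `value` columns,
// which are renamed here to match the join condition used below.
val df1Input = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .select(col("value").as("id"), col("timestamp"))

val df2Input = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .select(col("value").as("id"), col("timestamp"))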

val dataFrame1 =
  df1Input.withWatermark("timestamp", "40 seconds").as("A")

val dataFrame2 =
  df2Input.withWatermark("timestamp", "40 seconds").as("B")

val finalDF: DataFrame = dataFrame1.join(dataFrame2,
      expr(
        "A.id = B.id" +
          " AND " +
          "B.timestamp >= A.timestamp " +
          " AND " +
          "B.timestamp <= A.timestamp + interval 1 hour")
      , joinType = "leftOuter")
finalDF.writeStream.format("console").start().awaitTermination()

What I now want is to refactor this part to use Datasets, so I can have some compile-time checking.
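For the typed API, `A` and `B` need to be case classes (with encoders in scope) whose fields line up with the columns of the inputs. Their definitions aren't shown in the question; a minimal sketch covering just the columns used in the join condition would be:

import java.sql.Timestamp

// Assumed shapes: the real A and B may carry more fields, but they must at least
// expose the id and event-time timestamp columns used in the join condition.
case class A(id: Long, timestamp: Timestamp)
case class B(id: Long, timestamp: Timestamp)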

So what I tried was pretty straightforward:

val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
      expr(
        "A.id = B.id" +
          " AND " +
          "B.timestamp >= A.timestamp " +
          " AND " +
          "B.timestamp <= A.timestamp + interval 1 hour")
      , joinType = "leftOuter")
finalDS.writeStream.format("console").start().awaitTermination()

However, this gives the following error:

org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;

As you can see, the join code hasn't changed, so there is still a watermark on both sides and a range condition. The only change was using the Dataset API instead of the DataFrame API.

Also, it works fine when I use an inner join:

val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
      expr(
        "A.id = B.id" +
          " AND " +
          "B.timestamp >= A.timestamp " +
          " AND " +
          "B.timestamp <= A.timestamp + interval 1 hour")
      )
finalDS.writeStream.format("console").start().awaitTermination()

Does anyone know why this happens?


Solution

  • Well, when you use the joinWith method instead of join, you rely on a different implementation, and it seems that this implementation does not support leftOuter joins between streaming Datasets.

    You can check the "Outer Joins with Watermarking" section of the official documentation: it uses the join method, not joinWith. Note that the result type will be a DataFrame, which means you will most likely have to map the fields manually:

    val finalDS = dataFrame1.as[A].join(dataFrame2.as[B],
        expr(
          "A.id = B.id" +
            " AND " +
            "B.timestamp >= A.timestamp " +
            " AND " +
            "B.timestamp <= A.timestamp + interval 1 hour"),
        joinType = "leftOuter").select(/* useful fields */).as[C]