I have some code that joins two streaming DataFrames
and outputs to console.
val dataFrame1 =
df1Input.withWatermark("timestamp", "40 seconds").as("A")
val dataFrame2 =
df2Input.withWatermark("timestamp", "40 seconds").as("B")
val finalDF: DataFrame = dataFrame1.join(dataFrame2,
expr(
"A.id = B.id" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour")
, joinType = "leftOuter")
finalDF.writeStream.format("console").start().awaitTermination()
What I now want is to refactor this part to use Datasets
, so I can have some compile-time
checking.
So what I tried was pretty straightforward:
val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
expr(
"A.id = B.id" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour")
, joinType = "leftOuter")
finalDS.writeStream.format("console").start().awaitTermination()
However, this gives the following error:
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;
As you can see, the join
code hasn't changed, so there is a watermark on both sides and a range condition. The only change was to use the Dataset
API instead of DataFrame
.
Also, it is fine when I use inner join
:
val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
expr(
"A.id = B.id" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour")
)
finalDS.writeStream.format("console").start().awaitTermination()
Does anyone know how can this happen?
Well, when you using joinWith
method instead of join
you rely on different implementation and it seems like this implementation not support leftOuter join for streaming Datasets.
You can check outer joins with watermarking section of the official documentation. Method join
not joinWith
used. Note that result type will be DataFrame
. That means that you most likely will have to map field manually
val finalDS = dataFrame1.as[A].join(dataFrame2.as[B],
expr(
"A.key = B.key" +
" AND " +
"B.timestamp >= A.timestamp " +
" AND " +
"B.timestamp <= A.timestamp + interval 1 hour"),
joinType = "leftOuter").select(/* useful fields */).as[C]