I'm trying structured spark streaming stream-stream join, and my left outer joins behaves exactly same as inner join.
Using spark version 2.4.2 and Scala version 2.12.8, Eclipse OpenJ9 VM, 1.8.0_252
Here is what I'm trying to do,
Expectation: After 30 seconds of time constraint, for unmatched rows, I should be see null on right side of join.
Whats happening
Code - trying on spark-shell
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
case class RateData(timestamp: Timestamp, value: Long)
// create rate source with 1 row per second.
val rateSource = spark.readStream.format("rate").option("rowsPerSecond", 1).option("numPartitions", 1).option("rampUpTime", 1).load()
import spark.implicits._
val rateSourceData = rateSource.as[RateData]
// employee stream departid increments by 2
val employeeStreamDS = rateSourceData.withColumn("firstName", concat(lit("firstName"),rateSourceData.col("value")*2)).withColumn("departmentId", lit(floor(rateSourceData.col("value")*2))).withColumnRenamed("timestamp", "empTimestamp").withWatermark("empTimestamp", "10 seconds")
// dept stream id increments by 3
val departmentStreamDS = rateSourceData.withColumn("name", concat(lit("name"),floor(rateSourceData.col("value")*3))).withColumn("Id", lit(floor(rateSourceData.col("value")*3))).drop("value").withColumnRenamed("timestamp", "depTimestamp")
// watermark - 10s and time constraint is 30 secs on employee stream.
val joinedDS = departmentStreamDS.join(employeeStreamDS, expr(""" id = departmentId AND empTimestamp >= depTimestamp AND empTimestamp <= depTimestamp + interval 30 seconds """), "leftOuter")
val q = joinedDS.writeStream.format("parquet").trigger(Trigger.ProcessingTime("60 seconds")).option("checkpointLocation", "checkpoint").option("path", "rate-output").start
I queried the output of the table after 10 mins and I only found 31 matching rows. which is same as inner join output.
val df = spark.read.parquet("rate-output")
df.count
res0: Long = 31
df.agg(min("departmentId"), max("departmentId")).show
+-----------------+-----------------+
|min(departmentId)|max(departmentId)|
+-----------------+-----------------+
| 0| 180|
+-----------------+-----------------+
Explanation of output. employeeStreamDS stream, departmentId field value is 2 times rate value so it is multiples of two.
departmentStreamDS stream, Id field is 3 times rate stream value so multiple of 3.
so there would match of departmentId = Id for every 6 because, LCM(2,3) = 6. that would happen until there is difference of 30 sec between those streams(join time constraint).
I would expect after 30 seconds, I would null values for dept stream values(3,9,15 .. ) and so on.
I hope I'm explaining it well enough.
So the result question about left-outer join behavior for spark streaming.
From my understanding and indeed according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins, you need to apply watermarks on event-time columns of both streams, like e.g.:
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
...
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
)
You have only one watermark
defined.