scala apache-spark left-join spark-structured-streaming

Spark Structured Streaming leftOuter join behaves like an inner join


I'm trying a stream-stream join in Spark Structured Streaming, and my left outer join behaves exactly like an inner join.

I'm using Spark 2.4.2 and Scala 2.12.8 (Eclipse OpenJ9 VM, Java 1.8.0_252).

Here is what I'm trying to do:

  1. Create a rate stream that generates 1 row per second.
  2. Create an Employee stream and a Dept stream from it.
  3. In the Employee stream, the departmentId field is the rate value multiplied by 2; in the Dept stream, the Id field is the rate value multiplied by 3.
  4. The purpose is to have two streams in which some ids match and some do not.
  5. Do a leftOuter stream-stream join with a time constraint of 30 seconds, with the Dept stream on the left side of the join.

Expectation: once the 30-second time constraint has passed, I should see unmatched rows from the left side with nulls on the right side of the join.

What's happening

  • I only see rows where the ids matched, and none of the unmatched rows.

Code - trying on spark-shell

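import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
// spark-shell imports these automatically; needed when running outside the shell
import org.apache.spark.sql.functions._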

case class RateData(timestamp: Timestamp, value: Long)

// create rate source with 1 row per second.
val rateSource = spark.readStream.format("rate").option("rowsPerSecond", 1).option("numPartitions", 1).option("rampUpTime", 1).load()

import spark.implicits._
val rateSourceData = rateSource.as[RateData]

// employee stream: departmentId is 2 times the rate value; 10-second watermark on empTimestamp
val employeeStreamDS = rateSourceData.withColumn("firstName",  concat(lit("firstName"),rateSourceData.col("value")*2)).withColumn("departmentId", lit(floor(rateSourceData.col("value")*2))).withColumnRenamed("timestamp", "empTimestamp").withWatermark("empTimestamp", "10 seconds")

// dept stream: Id is 3 times the rate value
val departmentStreamDS = rateSourceData.withColumn("name", concat(lit("name"),floor(rateSourceData.col("value")*3))).withColumn("Id", lit(floor(rateSourceData.col("value")*3))).drop("value").withColumnRenamed("timestamp", "depTimestamp")

// left outer join with dept on the left; the employee timestamp must fall
// within 30 seconds after the dept timestamp (the employee stream has a 10s watermark)
val joinedDS = departmentStreamDS.join(
  employeeStreamDS,
  expr(""" id = departmentId AND empTimestamp >= depTimestamp AND empTimestamp <= depTimestamp + interval 30 seconds """),
  "leftOuter"
)

val q = joinedDS.writeStream.format("parquet").trigger(Trigger.ProcessingTime("60 seconds")).option("checkpointLocation", "checkpoint").option("path", "rate-output").start

I queried the output after 10 minutes and found only 31 rows, all of them matches, which is the same as the inner join output.

val df = spark.read.parquet("rate-output")
df.count
res0: Long = 31
df.agg(min("departmentId"), max("departmentId")).show
+-----------------+-----------------+
|min(departmentId)|max(departmentId)|
+-----------------+-----------------+
|                0|              180|
+-----------------+-----------------+

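Explanation of the output: in the employeeStreamDS stream, departmentId is 2 times the rate value, so it is always a multiple of 2.

In the departmentStreamDS stream, Id is 3 times the rate value, so it is always a multiple of 3.

So departmentId = Id matches at every multiple of 6, because LCM(2, 3) = 6. The dept row with Id 6k is generated at second 2k and the employee row with departmentId 6k at second 3k, so the two are k seconds apart. Matches therefore continue until the gap exceeds the 30-second join time constraint, which gives k = 0..30: 31 rows with a maximum departmentId of 180.

After 30 seconds, I would expect rows with null employee columns for the unmatched dept stream values (3, 9, 15, ...) and so on.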
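As a sanity check on the arithmetic above, here is a minimal plain-Scala sketch (no Spark involved) that enumerates which ids can match. It assumes the rate source emits value n exactly n seconds after the start, and it ignores watermarks entirely; the names rows, matchedIds and unmatchedIds are only for illustration.

```scala
// Assumption: rate value n is emitted at second n (1 row per second),
// and we look at roughly 10 minutes of data.
val rows = 0L until 600L

// Dept row n has Id = 3n at second n; employee row m has departmentId = 2m at second m.
// The join condition is: ids equal AND empTimestamp in [depTimestamp, depTimestamp + 30s].
val matchedIds = for {
  n <- rows
  m <- rows
  if 3 * n == 2 * m && m >= n && m <= n + 30
} yield 3 * n

// Dept ids that never find a partner; a left outer join should emit these with nulls.
val matchedSet = matchedIds.toSet
val unmatchedIds = rows.map(_ * 3).filterNot(matchedSet)

println(matchedIds.size)                      // 31
println(matchedIds.max)                       // 180
println(unmatchedIds.take(3).mkString(", "))  // 3, 9, 15
```

The 31 matches and the maximum of 180 agree with the parquet output shown earlier, so the matched part of the result is as expected; only the unmatched dept rows are missing.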

I hope I'm explaining it well enough.

So my question is about the left outer join behavior in Spark Structured Streaming.


Solution

  • From my understanding, and according to the stream-stream joins section of the programming guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins), you need to apply watermarks on the event-time columns of both streams, for example:

    val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    ...
    impressionsWithWatermark.join(
      clicksWithWatermark,
      expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
        """),
      joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
    )
    

    You have only one watermark defined: employeeStreamDS has one on empTimestamp, but departmentStreamDS has none on depTimestamp.
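Applied to the code in the question, this could look roughly like the sketch below. It is an untested outline rather than a verified fix: the 10-second delay on depTimestamp is an assumption chosen to mirror the employee stream, the streams are built with select instead of the original withColumn chain, and the names departmentStream, employeeStream and joined are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, expr, floor, lit}

// In spark-shell, getOrCreate() returns the session that already exists.
val spark = SparkSession.builder().master("local[*]").appName("left-outer-sketch").getOrCreate()

val rateSource = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

// Dept stream (left side): now also carries a watermark on its event-time column.
// The "10 seconds" delay is an assumption, not something taken from the question.
val departmentStream = rateSource.select(
  col("timestamp").as("depTimestamp"),
  floor(col("value") * 3).as("id"),
  concat(lit("name"), floor(col("value") * 3)).as("name")
).withWatermark("depTimestamp", "10 seconds")

// Employee stream (right side): same watermark as in the question.
val employeeStream = rateSource.select(
  col("timestamp").as("empTimestamp"),
  floor(col("value") * 2).as("departmentId"),
  concat(lit("firstName"), col("value") * 2).as("firstName")
).withWatermark("empTimestamp", "10 seconds")

// Same join condition and join type as in the question.
val joined = departmentStream.join(
  employeeStream,
  expr("""
    id = departmentId AND
    empTimestamp >= depTimestamp AND
    empTimestamp <= depTimestamp + interval 30 seconds
  """),
  "leftOuter"
)
```

The joined result can then be written out with the same writeStream call as in the question. Note that, as the programming guide describes, the outer NULL rows are emitted with a delay that depends on the watermark delay and the time range condition, so they will not appear immediately after the 30 seconds have passed.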