Search code examples
scala, apache-spark, apache-spark-sql, spark-structured-streaming

Losing entries when inner-joining data to a left-joined DataFrame in Spark Structured Streaming


I'm trying to join data with a DataFrame that in turn resulted from a left join. While in batch processing this works as expected, in stream processing some entries are lost...

Below I created a minimal example of "sessions", that have "start" and "end" events and optionally some "metadata".

The script generates two outputs. The first, sessionStartsWithMetadata, results from "start" events that are left-joined with the "metadata" events, based on sessionId. A "left join" is used because we want to get an output event even when no corresponding metadata exists.

Additionally a DataFrame endedSessionsWithMetadata is created by joining "end" events to the previously created DataFrame. Here an "inner join" is used, since we only want some output when a session has ended for sure.

This code can be executed in spark-shell:

import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}

import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

/** Core join logic, shared by the batch and the streaming variant.
  *
  * Step 1 left-joins "start" events with the optional "metadata" events: the rows must
  * share a `sessionId` and the start timestamp must lie within one second (inclusive)
  * before or after the metadata timestamp. Start events without a match are kept, with a
  * null `sessionOptionalMetadataTimestamp`.
  *
  * Step 2 joins the result of step 1 with the "end" events using the default join type:
  * the rows must share a `sessionId` and the start timestamp must lie within the ten
  * seconds (inclusive) up to the end timestamp.
  *
  * NOTE(review): with streaming inputs, step 2 consumes the output of the stateful outer
  * join of step 1; the accompanying write-up reports that sessions without metadata are
  * missing from the second result in that mode. Check this against the Structured
  * Streaming guide for the Spark version in use.
  *
  * @param sessionStartEvents            rows with `sessionStartTimestamp` and `sessionId`
  * @param sessionOptionalMetadataEvents rows with `sessionOptionalMetadataTimestamp` and `sessionId`
  * @param sessionEndEvents              rows with `sessionEndTimestamp` and `sessionId`
  * @return the pair (start events with optional metadata, ended sessions with optional metadata)
  */
def process(
    sessionStartEvents: DataFrame,
    sessionOptionalMetadataEvents: DataFrame,
    sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
  // Column handles that are needed both in a join condition and in the projection.
  val startId = sessionStartEvents("sessionId")
  val startTs = sessionStartEvents("sessionStartTimestamp")
  val metadataTs = sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")

  // Same session, and the start lies in [metadata - 1s, metadata + 1s].
  val metadataWindow = startTs.between(
    metadataTs - expr("INTERVAL 1 seconds"),
    metadataTs + expr("INTERVAL 1 seconds")
  )
  val metadataMatches = startId === sessionOptionalMetadataEvents("sessionId") && metadataWindow

  val sessionStartsWithMetadata: DataFrame =
    sessionStartEvents
      .join(sessionOptionalMetadataEvents, metadataMatches, "left") // metadata is optional
      .select(startId, startTs, metadataTs)

  // Same session, and the start lies in [end - 10s, end].
  val endTs = sessionEndEvents("sessionEndTimestamp")
  val endWindow = sessionStartsWithMetadata("sessionStartTimestamp")
    .between(endTs - expr("INTERVAL 10 seconds"), endTs)
  val endMatches = sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") && endWindow

  val endedSessionsWithMetadata = sessionStartsWithMetadata.join(sessionEndEvents, endMatches)

  (sessionStartsWithMetadata, endedSessionsWithMetadata)
}

/** Runs [[process]] on streaming inputs and prints both results to the console.
  *
  * Each input sequence is loaded into its own `MemoryStream`, exposed as a DataFrame with
  * the columns (`<timestamp column>`, `sessionId`) and given a watermark of "1 second" on
  * its timestamp column. Both resulting DataFrames are written in "append" output mode to
  * the console sink, each row prefixed with a literal label naming the query.
  *
  * @param sessionStartData        (eventTime, sessionId) tuples of "start" events
  * @param sessionOptionalMetadata (eventTime, sessionId) tuples of "metadata" events
  * @param sessionEndData          (eventTime, sessionId) tuples of "end" events
  * @return the started queries, in the order (sessionStartsWithMetadata, endedSessionsWithMetadata)
  */
def streamProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {

  // Feeds `rows` into a fresh memory source and returns it as a watermarked DataFrame.
  def watermarkedSource(rows: Seq[(Timestamp, Int)], timestampColumn: String): DataFrame = {
    val source: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
    source.addData(rows)
    source
      .toDS()
      .toDF(timestampColumn, "sessionId")
      .withWatermark(timestampColumn, "1 second")
  }

  // Starts a console query; the leading literal column tells the two outputs apart.
  def startConsoleQuery(label: String, result: DataFrame): StreamingQuery =
    result
      .select(lit(label), col("*"))
      .writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .option("numRows", "1000")
      .start()

  val sessionStartEvents = watermarkedSource(sessionStartData, "sessionStartTimestamp")
  val sessionOptionalMetadataEvents =
    watermarkedSource(sessionOptionalMetadata, "sessionOptionalMetadataTimestamp")
  val sessionEndEvents = watermarkedSource(sessionEndData, "sessionEndTimestamp")

  val (startsWithMetadata, endedWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  val startsQuery = startConsoleQuery("sessionStartsWithMetadata", startsWithMetadata)
  val endedQuery = startConsoleQuery("endedSessionsWithMetadata", endedWithMetadata)

  (startsQuery, endedQuery)
}

/** Runs [[process]] on static (batch) inputs and prints both results.
  *
  * Each input sequence is turned into a DataFrame with the columns
  * (`<timestamp column>`, `sessionId`). For each of the two results, its name is printed,
  * followed by up to 100 rows without truncating cell contents.
  *
  * @param sessionStartData        (eventTime, sessionId) tuples of "start" events
  * @param sessionOptionalMetadata (eventTime, sessionId) tuples of "metadata" events
  * @param sessionEndData          (eventTime, sessionId) tuples of "end" events
  */
def batchProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): Unit = {

  // Builds a static DataFrame whose first column carries the given timestamp name.
  def asEvents(rows: Seq[(Timestamp, Int)], timestampColumn: String): DataFrame =
    spark.createDataset(rows).toDF(timestampColumn, "sessionId")

  val (startsWithMetadata, endedWithMetadata) = process(
    asEvents(sessionStartData, "sessionStartTimestamp"),
    asEvents(sessionOptionalMetadata, "sessionOptionalMetadataTimestamp"),
    asEvents(sessionEndData, "sessionEndTimestamp")
  )

  Seq(
    "sessionStartsWithMetadata" -> startsWithMetadata,
    "endedSessionsWithMetadata" -> endedWithMetadata
  ).foreach { case (label, result) =>
    println(label)
    result.show(100, truncate = false)
  }
}


// Sample data: every event is a tuple of (eventTime, sessionId).
// The literals below are (epoch milliseconds, sessionId) pairs that are converted to
// (Timestamp, sessionId) tuples.
val sessionStartData = Vector(
  1L -> 0,
  2000L -> 1,
  2000L -> 2,
  20000L -> 10
).map { case (epochMillis, sessionId) => (new Timestamp(epochMillis), sessionId) }

val sessionOptionalMetadata = Vector(
  1L -> 0,
  // session `1` has no metadata
  2000L -> 2,
  20000L -> 10
).map { case (epochMillis, sessionId) => (new Timestamp(epochMillis), sessionId) }

val sessionEndData = Vector(
  10000L -> 0,
  11000L -> 1,
  12000L -> 2,
  30000L -> 10
).map { case (epochMillis, sessionId) => (new Timestamp(epochMillis), sessionId) }

// Run the batch variant first, so that its tables are printed before the streaming
// queries are started.
batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

// Keep handles to both streaming queries for later use in the shell.
val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
  streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

In the example, the session with ID 1 has no metadata, so the respective metadata column is null.

The main functionality of joining the data is implemented in def process(…), which is called using both batch data and stream data.

In the batch version the output is as expected:

sessionStartsWithMetadata
+---------+-----------------------+--------------------------------+            
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+---------+-----------------------+--------------------------------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
|1        |1970-01-01 01:00:02    |null                            | ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
+---------+-----------------------+--------------------------------+

endedSessionsWithMetadata
+---------+-----------------------+--------------------------------+-------------------+---------+
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+---------+-----------------------+--------------------------------+-------------------+---------+
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
|1        |1970-01-01 01:00:02    |null                            |1970-01-01 01:00:11|1        |  ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
+---------+-----------------------+--------------------------------+-------------------+---------+

But when the same processing is run as stream processing the output of endedSessionsWithMetadata does not contain the entry of session 1 that has no metadata:

-------------------------------------------                                     
Batch: 0 ("start event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
+-------------------------+---------+-----------------------+--------------------------------+
|sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
|sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
+-------------------------+---------+-----------------------+--------------------------------+

-------------------------------------------                                     
Batch: 0 ("end event")
-------------------------------------------
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
|endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
+-------------------------+---------+-----------------------+--------------------------------+-------------------+---------+

-------------------------------------------                                     
Batch: 1 ("start event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
+-------------------------+---------+---------------------+--------------------------------+
|sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                            | ← has no metadata ✔
+-------------------------+---------+---------------------+--------------------------------+

-------------------------------------------                                     
Batch: 1 ("end event")
-------------------------------------------
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
+-------------------------+---------+---------------------+--------------------------------+-------------------+---------+
  ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘


Can anybody explain why in stream processing the "end" event with no "metadata" (sessionId=1) is not there? What do I need to do to make it appear in the output?

Thanks a lot!


Solution

  • After considerable testing, looking around and re-reading the manual: