Tags: apache-spark, pyspark, apache-kafka, spark-structured-streaming

Left Outer Stream-Stream SELF join using Spark Structured Streaming - Kafka


I am trying to do a stream-stream self join in Spark Structured Streaming, using a left outer join so that I can split the joined and unjoined rows afterwards. My setup is as follows:

df_source = app.spark.readStream.format("kafka") \
                 .option("kafka.bootstrap.servers", "XXXXXXX") \
                 .option("subscribe", "XXXXXXX") \
                 .option("startingOffsets", "earliest") \
                 .option("kafka.group.id", "924ee006-c268-11ed-afa1-0242ac120002") \
                 .load()

df_source.createOrReplaceTempView("df_source")

I then pull the JSON string out of the Kafka message value and parse it into my required schema:

df0 = app.spark.sql("""\
----------------------
SELECT
    CAST(value AS STRING) AS json
FROM df_source
----------------------
""")

df0.createOrReplaceTempView("df0")
df1 = app.spark.sql("""\
----------------------
SELECT
    from_json(json,        'struct< `metadata`   : struct< `namespace` : string 
                                                         , `name`      : string
                                                         , `name0`     : string 
                                                         , `size0`     : int
                                                         , `message0`  : struct< `id`           : string
                                                                               , `type`         : string
                                                                               , `timestamp`    : string
                                                                               , `date`         : string
                                                                               , `time`         : string
                                                                               , `process_name` : string
                                                                               , `loglevel`     : string
                                                                               , `process_id`   : string
                                                                               >
                                                         >
                                  , `spec`       : struct< `fix`                  : string
                                                         , `source_process_name`  : string
                                                         , `sink_process_name`    : string
                                                         , `source_CLORDID`       : string
                                                         , `sink_CLORDID`         : string
                                                         , `action`               : string
                                  >
                                  , `@timestamp` : timestamp
                                  >
                           ')               AS dict
FROM df0
----------------------
""")

df1.createOrReplaceTempView("df1")

I am trying to perform a self join on df1 where dict.spec.sink_CLORDID == dict.spec.source_CLORDID. I have followed the Spark Structured Streaming docs.

I am defining a watermark based on my event-timestamp:

df1_new = df1.withWatermark("`dict.@timestamp`", "2 minutes")

Then I am trying to do the self-join:

df2 = df1.alias("orig").join(df1_new.alias("new"), 
                     expr("""
                         orig.dict.spec.sink_CLORDID = new.dict.spec.source_CLORDID AND
                         \"new.dict.@timestamp\" >= \"orig.dict.@timestamp\" AND
                         \"new.dict.@timestamp\" <= \"orig.dict.@timestamp\" + interval 1 minute
                    """),
                     "leftOuter"
                )

But I am receiving the following error message:

Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition

I have a few questions:

  1. I have followed the docs example for the join exactly, so why am I still getting a watermark error?
  2. As I am incrementally defining DataFrames from the stream to the dataset I want to join, do I need to define a watermark on all of them so that the state can be cleared?

Solution

  • Will leave the question up for anyone in future who has the same or a similar issue. My issue was caused by the escape characters around the @timestamp column references:

    \"new.dict.@timestamp\"

    etc. These were being treated as string literals, not as columns.