I am trying to do a stream-stream self join in Spark Structured Streaming using a left outer join, so that I can split the joined and unjoined rows afterwards. My setup is as follows:
df_source = app.spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "XXXXXXX") \
.option("subscribe", "XXXXXXX") \
.option("startingOffsets", "earliest") \
.option("kafka.group.id", "924ee006-c268-11ed-afa1-0242ac120002") \
.load()
df_source.createOrReplaceTempView("df_source")
I then incrementally get the JSON string from the Kafka message and parse the JSON into my required schema:
df0 = app.spark.sql("""\
----------------------
SELECT
CAST(value AS STRING) AS json
FROM df_source
----------------------
""")
df0.createOrReplaceTempView("df0")
df1 = app.spark.sql("""\
----------------------
SELECT
from_json(json, 'struct< `metadata` : struct< `namespace` : string
, `name` : string
, `name0` : string
, `size0` : int
, `message0` : struct< `id` : string
, `type` : string
, `timestamp` : string
, `date` : string
, `time` : string
, `process_name` : string
, `loglevel` : string
, `process_id` : string
>
>
, `spec` : struct< `fix` : string
, `source_process_name` : string
, `sink_process_name` : string
, `source_CLORDID` : string
, `sink_CLORDID` : string
, `action` : string
>
, `@timestamp` : timestamp
>
') AS dict
FROM df0
----------------------
""")
df1.createOrReplaceTempView("df1")
I am trying to perform a self join on df1 where dict.spec.sink_CLORDID == dict.spec.source_CLORDID. I have followed the Spark Structured Streaming docs.
I am defining a watermark based on my event-timestamp:
df1_new = df1.withWatermark("`dict.@timestamp`", "2 minutes")
Then I am trying to do the self-join:
from pyspark.sql.functions import expr

df2 = df1.alias("orig").join(df1_new.alias("new"),
expr("""
orig.dict.spec.sink_CLORDID = new.dict.spec.source_CLORDID AND
\"new.dict.@timestamp\" >= \"orig.dict.@timestamp\" AND
\"new.dict.@timestamp\" <= \"orig.dict.@timestamp\" + interval 1 minute
"""),
"leftOuter"
)
But I am receiving the following error message:
Stream-stream LeftOuter join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition
What is causing this error, and how can I fix the join?
I'll leave the question up for anyone in future who has the same or a similar issue. My problem was caused by the escape characters around the @timestamp column: expressions such as
\"new.dict.@timestamp\"
were being treated as string literals, not columns, so the range condition on the watermarked timestamp was never recognized.