I am looking for the solution for adding timestamp value of kafka to my Spark structured streaming schema. I have extracted the value field from kafka and making dataframe. My issue is, I need to get the timestamp field (from kafka) also along with the other columns.
Here is my current code:
val kafkaDatademostr = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
.option("subscribe","csvstream")
.load
val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
.select("csv.*")
val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
"split(value,',')[1] as DFW",
"split(value,',')[2] as DTG",
"split(value,',')[3] as CDF",
"split(value,',')[4] as DFO",
"split(value,',')[5] as SAD",
"split(value,',')[6] as DER",
"split(value,',')[7] as time_for",
"split(value,',')[8] as fort")
How can I get the timestamp from kafka and add as columns along with other columns?
Timestamp is included in the source schema. Just add a "select timestamp" to get the timestamp like the below.
val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")