Tags: scala, apache-spark, apache-kafka, spark-structured-streaming, spark-streaming-kafka

How to include the Kafka timestamp value as a column in Spark Structured Streaming?


I am looking for a way to add the Kafka timestamp to my Spark Structured Streaming schema. I have extracted the value field from Kafka and am building a DataFrame from it. My issue is that I also need the timestamp field (from Kafka) along with the other columns.

Here is my current code:

val kafkaDatademostr = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
  .option("subscribe","csvstream")
  .load

val interval = kafkaDatademostr
  .select(col("value").cast("string"))
  .alias("csv")
  .select("csv.*")

val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
    "split(value,',')[1] as DFW",
    "split(value,',')[2] as DTG",
    "split(value,',')[3] as CDF",
    "split(value,',')[4] as DFO",
    "split(value,',')[5] as SAD",
    "split(value,',')[6] as DER",
    "split(value,',')[7] as time_for",
    "split(value,',')[8] as fort")

How can I get the timestamp from Kafka and add it as a column along with the other columns?


Solution

  • The timestamp is already part of the Kafka source schema: every row produced by the Kafka source carries key, value, topic, partition, offset, timestamp, and timestampType columns. Just select the timestamp column alongside the value, as shown below.

    val interval = kafkaDatademostr
      .select(col("value").cast("string").alias("csv"), col("timestamp"))
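Note that `"csv.*"` star-expansion only works on struct columns; since the cast value here is a plain string, select it by its alias instead, and have the downstream `selectExpr` split `csv` rather than `value`. For completeness, here is a minimal end-to-end sketch of the same pipeline with the timestamp carried through the split step. It reuses the broker address, topic, and column names from the question; the app name is an arbitrary placeholder.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder.appName("kafka-timestamp").getOrCreate()

    // Read from Kafka; each row carries key, value, topic, partition,
    // offset, timestamp, and timestampType.
    val kafkaDatademostr = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "zzzz.xxx.xxx.xxx.com:9002")
      .option("subscribe", "csvstream")
      .load()

    // Keep the payload as a string plus the Kafka message timestamp.
    val interval = kafkaDatademostr
      .select(col("value").cast("string").alias("csv"), col("timestamp"))

    // Split the CSV payload into named columns and project the
    // timestamp through unchanged.
    val xmlData = interval.selectExpr(
      "split(csv,',')[0] as ddd",
      "split(csv,',')[1] as DFW",
      "split(csv,',')[2] as DTG",
      "split(csv,',')[3] as CDF",
      "split(csv,',')[4] as DFO",
      "split(csv,',')[5] as SAD",
      "split(csv,',')[6] as DER",
      "split(csv,',')[7] as time_for",
      "split(csv,',')[8] as fort",
      "timestamp")

From here, `xmlData` streams the nine CSV fields plus Kafka's timestamp, which you can aggregate on or write out with a `writeStream` sink as usual.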