scala · apache-spark · apache-kafka · spark-structured-streaming

How to get a Kafka header's value into a Spark Dataset as a single column?


We have a Kafka stream read with headers enabled

  .option("includeHeaders", true)

so they end up as a top-level column of the dataset, holding an array of structs with key and value fields:

root
 |-- topic: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)

I can access the needed header by its position in the array like this:

import org.apache.spark.sql.functions.{col, element_at}

val controlDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaLocation)
      .option("includeHeaders", true)
      .option("failOnDataLoss", value = false)
      .option("subscribe", "mytopic")
      .load()
      // take the first header struct, then extract its binary value as a string
      .withColumn("acceptTimestamp", element_at(col("headers"), 1))
      .withColumn("acceptTimestamp2", col("acceptTimestamp.value").cast("STRING"))
 

but this solution looks fragile, as the order of headers produced on the other side can always change with updates, while only the key name looks stable. How can I look up the needed struct by its key instead of pointing at an array index?

UPD.

Thanks to the advice of Alex Ott, I have found a solution that gets what I want with the following columns:

.withColumn("headers1", map_from_entries(col("headers")))
.withColumn("acceptTimestamp2", col("headers1.acceptTimestamp").cast("STRING"))

Solution

  • You can use the map_from_entries function to convert an array of structs into a map where you can access entries by name.

    import org.apache.spark.sql.functions.{col, map_from_entries}
    
    ....
    .select(map_from_entries(col("headers")).alias("headers"), ...)
    

    But as I remember, header names may not be unique; that's the primary reason they are sent as an array of key/value pairs.

    Another approach would be to use the filter function to find headers by name; this will also handle non-unique headers (see the sketch below).
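
    For illustration, here is a minimal sketch of that filter-based approach, assuming Spark 3.0+ (where the filter higher-order function with a Scala lambda is available) and reusing the acceptTimestamp header name and controlDataFrame from the question; withHeaderByName is just an illustrative name:

    import org.apache.spark.sql.functions.{col, element_at, filter}
    
    // keep only the header entries whose key matches, then take the first match's binary value as a string
    val withHeaderByName = controlDataFrame
      .withColumn("matched", filter(col("headers"), h => h("key") === "acceptTimestamp"))
      .withColumn("acceptTimestamp2", element_at(col("matched"), 1)("value").cast("STRING"))

    Unlike map_from_entries, this keeps duplicate keys: the matched column holds every header with that name, and you choose which occurrence to take.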

    P.S. I have used Python docs because I can link individual functions - it's not easy to do in the Scala docs.