We have a Kafka stream with headers enabled via
.option("includeHeaders", true)
so the headers are stored as a top-level column of the dataset, holding an array of structs with key and value:
root
 |-- topic: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)
I can access the needed header by its position in the array like this:
import org.apache.spark.sql.functions.{col, element_at}

val controlDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaLocation)
  .option("includeHeaders", true)
  .option("failOnDataLoss", value = false)
  .option("subscribe", "mytopic")
  .load()
  .withColumn("acceptTimestamp", element_at(col("headers"), 1))
  .withColumn("acceptTimestamp2", col("acceptTimestamp.value").cast("STRING"))
but this solution looks fragile, as the order of headers produced on the other side can always change with updates, while only the key name looks stable. How can I look up the needed struct by its key instead of pointing at an array index?
UPD.
Thanks to the advice of Alex Ott, I have found the solution that gets what I want into the following columns:
.withColumn("headers1", map_from_entries(col("headers")))
.withColumn("acceptTimestamp2", col("headers1.acceptTimestamp").cast("STRING"))
You can use the map_from_entries function to convert an array of structs into a map whose entries you can access by name.
import org.apache.spark.sql.functions.{col, map_from_entries}
....
select(map_from_entries(col("headers")).alias("headers"), ...)
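Put together, a minimal sketch of that approach, reusing the controlDataFrame and the acceptTimestamp header key from the question:

import org.apache.spark.sql.functions.{col, map_from_entries}

// Turn the array of key/value structs into a map keyed by header name,
// then look up the wanted header and cast its binary value to a string.
val byName = controlDataFrame
  .withColumn("headersMap", map_from_entries(col("headers")))
  .withColumn("acceptTimestamp2",
    col("headersMap").getItem("acceptTimestamp").cast("STRING"))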
but as I remember, header names may not be unique; that's the primary reason why they are sent as an array of key/value pairs.
Another approach would be to use the filter function to find headers by name - this allows you to handle non-unique headers, as in the sketch below.
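A minimal sketch of the filter approach, again assuming the acceptTimestamp header key from the question; filter keeps every matching header, so duplicates are preserved as extra array elements and you can decide which one to take:

import org.apache.spark.sql.functions.{col, element_at, expr}

// Keep only the headers whose key matches, preserving duplicates,
// then take the first match and cast its binary value to a string.
val filtered = controlDataFrame
  .withColumn("acceptHeaders",
    expr("filter(headers, h -> h.key = 'acceptTimestamp')"))
  .withColumn("acceptTimestamp2",
    element_at(col("acceptHeaders"), 1).getField("value").cast("STRING"))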
P.S. I have used Python docs because I can link individual functions - it's not easy to do in the Scala docs.