Since Kafka headers, for both input and output with Spark 3.0, are just a column holding an array of tuples, the question is how to turn a column's value into a tuple made of the column name and the current row's value of that column, and then add that tuple to the existing array.
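For reference, Spark's Kafka integration models each header as a struct with a string key and a binary value, so the "tuple" is a struct and the headers column is an array of such structs. The minimal sketch below spells out that type with Spark's type classes. The object and field-holder names are hypothetical; only the `key`/`value` field names and their types are taken from the Kafka source and sink.

```scala
import org.apache.spark.sql.types.{ArrayType, BinaryType, StringType, StructField, StructType}

// Hypothetical holder for the Kafka headers column type: array<struct<key: string, value: binary>>.
object KafkaHeadersType {
  val headerElement: StructType = StructType(Seq(
    StructField("key", StringType),  // header name
    StructField("value", BinaryType) // header payload as raw bytes
  ))

  val headersType: ArrayType = ArrayType(headerElement)

  def main(args: Array[String]): Unit =
    println(headersType.simpleString)
}
```

Any struct you append to the headers array has to match this element type, including the field names.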
For example, I have a dataset that was obtained from Kafka and is intended to be sent back to Kafka after some transformations:
Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+
|key|value|topic|partition|offset|timestamp|timestampType|headers|headersMap|timestamp2|acceptTimestamp|delayWindowEnd|timestamp1|
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+
In headers we have:
[[Access-Control-Allow-Headers, DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range], [Access-Control-Allow-Methods, GET, POST, OPTIONS], [Access-Control-Allow-Origin, *], [Access-Control-Expose-Headers, Content-Length,Content-Range], [breadcrumbId, ID-ILYA-1644900650793-0-1], [Cache-Control, no-cache], [Connection, keep-alive], [Content-Length, 36362], [Content-Type, application/json], [count, h], [Date, Tue, 15 Feb 2022 04:51:01 GMT], [kafka.KEY, 60890], [messageId, 60890-1644897084], [Server, adf_http_server/4.3.0205], [Set-Cookie, sessions=21293ca7a63f591ea65771ed2e7fbb5b; path=/;], [time_from, b#$], [time_to, b14], [timestamp, b#<], [unit_id, ��]]
i.e. an array of tuples, and in the column timestamp1 we have
2022-02-20 02:07:32.54
so I'd like to add
[timestamp1, 2022-02-20 02:07:32.54]
to the array in the headers column.
How can I turn a pair of column name and value into a tuple?
I had to create a tuple column using struct() and then combine it with the preexisting array of header tuples using array_union():
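import org.apache.spark.sql.functions.{array, array_union, col, lit, struct, unix_timestamp}
// chained onto the DataFrame read from Kafka; the pattern uses HH (hour 0-23) rather than hh (hour 1-12)
.withColumn(kafkaTimestampColumnName, col("timestamp"))
.withColumn("tupletime", struct(lit(kafkaTimestampColumnName) as "key", unix_timestamp(col(kafkaTimestampColumnName), "yyyy-MM-dd HH:mm:ss").cast("string").cast("binary") as "value"))
.withColumn("headers", array_union(col("headers"), array(col("tupletime"))))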
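A self-contained sketch of the same struct() + array_union() idea that runs locally without a Kafka broker. Assumptions: the `Header` case class, the `TimestampHeaderSketch` object, the `withTimestampHeader` helper and the sample rows are all hypothetical. `timestamp1` is assumed to already be a timestamp column (as Kafka's `timestamp` is), and the value is encoded as epoch seconds, as in the snippet above. One addition: array_union returns null when either argument is null, and records without headers may arrive with a null headers value, so the sketch starts a fresh array in that case.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, array_union, col, lit, struct, to_timestamp, unix_timestamp, when}

// Hypothetical case class mirroring one Kafka header: struct<key: string, value: binary>.
// Defined at top level so Spark can derive an encoder for it.
case class Header(key: String, value: Array[Byte])

object TimestampHeaderSketch {
  // Appends (tsColumn, epoch seconds as UTF-8 text bytes) to the "headers" array column.
  def withTimestampHeader(df: DataFrame, tsColumn: String): DataFrame = {
    val tuple = struct(
      lit(tsColumn).as("key"),                                                // key: string
      unix_timestamp(col(tsColumn)).cast("string").cast("binary").as("value") // value: binary
    )
    df.withColumn(
      "headers",
      // array_union yields null if either input is null, so rows without
      // headers get a new one-element array instead.
      when(col("headers").isNull, array(tuple))
        .otherwise(array_union(col("headers"), array(tuple)))
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("timestamp-header-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows standing in for a Kafka batch: one row with a header, one without.
    val input = Seq(
      (Seq(Header("messageId", "60890-1644897084".getBytes("UTF-8"))), "2022-02-20 02:07:32.54"),
      (null.asInstanceOf[Seq[Header]], "2022-02-20 02:07:33")
    ).toDF("headers", "ts")
      .withColumn("timestamp1", to_timestamp(col("ts"))) // string -> timestamp
      .drop("ts")

    withTimestampHeader(input, "timestamp1").show(truncate = false)
    spark.stop()
  }
}
```

Note that array_union also removes duplicate elements, so an existing header that exactly repeats another (same key and same value) would be collapsed. If existing headers must be kept untouched, concat() appends without de-duplication, though it likewise returns null when an input is null.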
Please note that in the tuple the key must be a string, while the value must be binary.
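On the consuming side, the value bytes produced by cast("string").cast("binary") are just the decimal epoch seconds encoded as UTF-8 text (Spark's string-to-binary cast uses UTF-8), so a reader has to decode the text before interpreting it as a time. Below is a stdlib-only sketch of both directions. The object and method names are hypothetical. With the Kafka Java client, the raw bytes would come from something like `record.headers().lastHeader(key).value()`.

```scala
import java.nio.charset.StandardCharsets
import java.time.Instant

// Hypothetical codec for the timestamp header value, assuming it holds
// epoch seconds written as decimal text and encoded as UTF-8.
object TimestampHeaderCodec {
  // Same bytes as Spark's long -> string -> binary cast chain.
  def encode(epochSeconds: Long): Array[Byte] =
    epochSeconds.toString.getBytes(StandardCharsets.UTF_8)

  // Reverse step for a consumer that receives the raw header bytes.
  def decode(value: Array[Byte]): Instant =
    Instant.ofEpochSecond(new String(value, StandardCharsets.UTF_8).toLong)

  def main(args: Array[String]): Unit = {
    val bytes = encode(1645322852L)
    println(bytes.mkString("[", ", ", "]"))
    println(decode(bytes))
  }
}
```

Storing the text form keeps the header human-readable in tools that print header values as strings; a fixed 8-byte big-endian long would be more compact but needs a matching decoder on every consumer.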