scala, apache-spark, apache-kafka, spark-structured-streaming

How to add Kafka headers for a Kafka output in Spark Structured Streaming, making them from Dataframe columns?


Since in Spark 3.0 Kafka headers, for both input and output, are just a column holding an array of tuples, the question here is how to turn a column into a tuple made of the column name and the current row's value of that column, and then add it to the existing array.
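For reference, the headers column only appears on the read side when includeHeaders is enabled; a minimal read sketch (broker address and topic name are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-headers").getOrCreate()

    // Spark 3.0+: expose Kafka record headers as a "headers" column.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "input-topic")                  // placeholder
      .option("includeHeaders", "true")
      .load()

    // headers: array<struct<key: string, value: binary>>
    df.printSchema()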

For example,

I have a dataset, obtained from Kafka and intended to be sent back to Kafka after some transformations:

Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+
|key|value|topic|partition|offset|timestamp|timestampType|headers|headersMap|timestamp2|acceptTimestamp|delayWindowEnd|timestamp1|
+---+-----+-----+---------+------+---------+-------------+-------+----------+----------+---------------+--------------+----------+

In headers we have:

[[Access-Control-Allow-Headers, DNT,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Range], [Access-Control-Allow-Methods, GET, POST, OPTIONS], [Access-Control-Allow-Origin, *], [Access-Control-Expose-Headers, Content-Length,Content-Range], [breadcrumbId, ID-ILYA-1644900650793-0-1], [Cache-Control, no-cache], [Connection, keep-alive], [Content-Length, 36362], [Content-Type, application/json], [count,   h], [Date, Tue, 15 Feb 2022 04:51:01 GMT], [kafka.KEY, 60890], [messageId, 60890-1644897084], [Server, adf_http_server/4.3.0205], [Set-Cookie, sessions=21293ca7a63f591ea65771ed2e7fbb5b; path=/;], [time_from,     b#$], [time_to,     b14], [timestamp, b#<], [unit_id,   ��]]

i.e. an array of tuples,

and in column timestamp1 we have

2022-02-20 02:07:32.54

so I'd like to add

[timestamp1, 2022-02-20 02:07:32.54]

to the array from the headers column.

How can I turn a pair of column name and value into a tuple?


Solution

  • I had to create a tuple column using struct() and then array_union() it with the preexisting array of header tuples.

    // Wrap the timestamp in a (key, value) struct and append it to the existing headers array.
    .withColumn(kafkaTimestampColumnName, col("timestamp"))
    .withColumn("tupletime", struct(
      lit(kafkaTimestampColumnName) as "key",
      unix_timestamp(col(kafkaTimestampColumnName), "yyyy-MM-dd hh:mm:ss").cast("string").cast("binary") as "value"))
    .withColumn("headers", array_union(col("headers"), array(col("tupletime"))))
    

    Please note that in the tuple the key must be a string while the value must be binary.
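
Putting it all together, a minimal end-to-end sketch of the round trip (broker address, topic names, and checkpoint path are placeholders; the read side assumes includeHeaders is enabled):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().appName("kafka-headers-roundtrip").getOrCreate()

    val kafkaTimestampColumnName = "timestamp1"

    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "input-topic")                  // placeholder
      .option("includeHeaders", "true")
      .load()

    // Build the (key, value) struct and append it to the existing headers array.
    val withHeader = input
      .withColumn(kafkaTimestampColumnName, col("timestamp"))
      .withColumn("tupletime", struct(
        lit(kafkaTimestampColumnName) as "key",
        unix_timestamp(col(kafkaTimestampColumnName)).cast("string").cast("binary") as "value"))
      .withColumn("headers", array_union(col("headers"), array(col("tupletime"))))

    // The Kafka sink picks up the headers column when it is present in the output schema.
    val query = withHeader
      .select(col("key"), col("value"), col("headers"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")            // placeholder
      .option("topic", "output-topic")                                // placeholder
      .option("checkpointLocation", "/tmp/checkpoints/kafka-headers") // placeholder
      .start()

    query.awaitTermination()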