Tags: apache-kafka, spark-structured-streaming

Writing a streaming DataFrame to Kafka


I am reading log lines from a Kafka topic through Spark Structured Streaming, separating the fields of the log lines, performing some manipulations on the fields, and storing the result in a DataFrame with a separate column for each field. I want to write this DataFrame to Kafka.

Below are my sample DataFrame and the writeStream I use to write it to Kafka:

    val dfStructuredWrite = dfProcessedLogs.select(
      dfProcessedLogs("result").getItem("_1").as("col1"),
      dfProcessedLogs("result").getItem("_2").as("col2"),
      dfProcessedLogs("result").getItem("_17").as("col3"))

    dfStructuredWrite
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .start()

The above code gives me the error below:

Required attribute 'value' not found

I believe this is because my DataFrame is not in key/value format. How can I write my existing DataFrame to Kafka in the most efficient way?


Solution

  • The DataFrame being written to Kafka should have the following columns in its schema:

    • key (optional) (type: string or binary)
    • value (required) (type: string or binary)
    • topic (optional) (type: string)

    In your case there is no value column, so the exception is thrown.

    You have to modify it to add at least a value column, for example:

    import org.apache.spark.sql.functions.{concat, lit}
    import spark.implicits._ // enables the $"colName" column syntax

    dfStructuredWrite.select(
      concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))
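
    Putting it together (reusing the imports above), a minimal end-to-end sketch could look like this. Note that streaming queries writing to Kafka also need a checkpointLocation option; the checkpoint path below is just a placeholder:

    dfStructuredWrite
      .select(concat($"col1", lit(" "), $"col2", lit(" "), $"col3").alias("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // required for streaming sinks; path is a placeholder
      .start()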
    

    For more details you can check: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
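
    If you would rather keep the individual fields instead of concatenating them, one common alternative (a sketch, not part of the linked guide's example) is to serialize each row as JSON and optionally set a key; col1 as the key is just an assumption here:

    import org.apache.spark.sql.functions.{struct, to_json}

    dfStructuredWrite.select(
      $"col1".cast("string").alias("key"),                       // optional: drives Kafka partitioning
      to_json(struct($"col1", $"col2", $"col3")).alias("value")) // all fields in one JSON payload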