Tags: apache-spark, pyspark, spark-streaming, spark-streaming-kafka

Splitting Kafka Message Line by line in Spark Structured Streaming


I want to read messages from a Kafka topic into a DataFrame in my Spark Structured Streaming job. However, the entire message arrives at a single offset, so it ends up in the DataFrame as one row instead of multiple rows (three rows, in my case).
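
For context, here is a minimal sketch of the kind of reading job described above; the broker address, topic name, and application name are placeholders, and the spark-sql-kafka-0-10 connector is assumed to be on the classpath:

    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().appName("KafkaRead").getOrCreate()
    
    // The Kafka source delivers each record with a binary `value` column,
    // so it has to be cast to a string before any text processing.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "my-topic")                     // placeholder topic
      .load()
      .selectExpr("offset", "CAST(value AS STRING) AS message")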

When I print this message, I get the following output:

(screenshot: the entire message appears in a single row)

I want the parts "Text1", "Text2" and "Text3" to end up in three separate rows of the DataFrame so that I can process them further.

Please help me.


Solution

  • You can use a user-defined function (UDF) to convert the message string into a sequence of strings, and then apply the explode function to that column to create a new row for each element in the sequence:

    As illustrated below (in Scala; the same principle applies to PySpark):

    case class KafkaMessage(offset: Long, message: String)
    
    import org.apache.spark.sql.functions.{explode, udf}
    import spark.implicits._ // for toDF() and $ (already imported in spark-shell)
    
    // Toy stand-in for a single Kafka record whose payload holds three lines.
    val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()
    
    // UDF that splits the message string into an array of lines.
    val splitString = udf { s: String => s.split('\n') }
    
    // explode produces one output row per element of the array.
    df.withColumn("splitMsg", explode(splitString($"message")))
      .select("offset", "splitMsg")
      .show()
    

    This will yield the following output:

    +------+--------+
    |offset|splitMsg|
    +------+--------+
    |  1000|   Text1|
    |  1000|   Text2|
    |  1000|   Text3|
    +------+--------+
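
    In the actual streaming job, the same explode can be applied directly to the Kafka source, and Spark's built-in split function can stand in for the UDF. A sketch, again with placeholder application, broker, and topic names:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{explode, split}
    
    val spark = SparkSession.builder().appName("KafkaLineSplit").getOrCreate()
    import spark.implicits._
    
    // Cast the binary Kafka `value` to a string, split it on newlines,
    // and explode so that every line of the payload becomes its own row.
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "my-topic")                     // placeholder topic
      .load()
      .selectExpr("offset", "CAST(value AS STRING) AS message")
      .select($"offset", explode(split($"message", "\n")).as("splitMsg"))
    
    // Stream the exploded rows to the console for inspection; any other
    // sink works the same way once the rows are split.
    lines.writeStream.format("console").start().awaitTermination()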