I want to read messages from a Kafka topic in my Spark Structured Streaming job into a DataFrame, but the entire message arrives in a single offset, so it ends up in one row of the DataFrame instead of multiple rows (in my case, 3 rows).
When I print the message, the output shows "Text1", "Text2" and "Text3" together in one value. I want them as 3 separate rows in the DataFrame so that I can process them further.
Please help me.
You can use a user-defined function (UDF) to convert the message string into a sequence of strings, and then apply the explode function to that column to create a new row for each element of the sequence.
This is illustrated below in Scala; the same principle applies to PySpark:
case class KafkaMessage(offset: Long, message: String)

import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._ // needed for toDF() and the $"..." column syntax

// Sample data standing in for a message read from Kafka
val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()

// UDF that splits the message on newline characters
val splitString = udf { s: String => s.split('\n') }

// explode creates one output row per element of the array
df.withColumn("splitMsg", explode(splitString($"message")))
  .select("offset", "splitMsg")
  .show()
This will yield the following output:
+------+--------+
|offset|splitMsg|
+------+--------+
| 1000| Text1|
| 1000| Text2|
| 1000| Text3|
+------+--------+
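
For the actual Structured Streaming job, two further notes: the Kafka source delivers the value column as binary, so it has to be cast to a string before splitting, and the built-in split function can replace the UDF entirely. A minimal sketch of both, assuming a local broker at localhost:9092 and a topic named mytopic (both placeholders for your own settings):

import org.apache.spark.sql.functions.{col, explode, split}

// Read from Kafka; the value column arrives as binary, so cast it
// to a string before splitting (broker and topic are placeholders).
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .load()
  .selectExpr("offset", "CAST(value AS STRING) AS message")

// The built-in split function makes the UDF unnecessary;
// its second argument is interpreted as a regular expression.
val result = stream
  .withColumn("splitMsg", explode(split(col("message"), "\n")))
  .select("offset", "splitMsg")

Note that a streaming DataFrame is written out with writeStream rather than show().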