json apache-spark pyspark spark-structured-streaming spark-kafka-integration

How to convert a Kafka message value to a particular schema?


I am trying to read data from Kafka topics using PySpark and transform it into a particular schema, but I am unable to do so.

Here is what I have tried:

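>> from pyspark.sql.functions import col, from_json
>> from pyspark.sql.types import IntegerType, StringType, StructType

>> df = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test1").load()
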
>> userSchema = StructType().add("Name", StringType(), True).add("Age", IntegerType(), True)

>> df1 = df.selectExpr("CAST(value AS STRING)")

>> df2 = df1.select(from_json(col("value"), userSchema))

>> df2.printSchema()
root
 |-- jsontostructs(value): struct (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Age: integer (nullable = true)

What I want is:

>> df2.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)

Is there any way to get the desired schema?


Solution

  • For anyone facing the same issue, here is how I achieved this:

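     df2 = df1.select(from_json(col("value"), userSchema).alias("data")).select("data.*")

     Aliasing the parsed struct avoids depending on the auto-generated column name (jsontostructs(value) in the output above), which may differ between Spark versions. Selecting "data.*" then expands the struct's fields into top-level columns.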