Tags: json, apache-spark, pyspark, apache-kafka, spark-structured-streaming

Problem expanding JSON data in a column into multiple columns of a PySpark DataFrame read from Apache Kafka


Hello everyone,

I am practising with streaming data, so I set up Apache Kafka. I want to read the messages sent by a KafkaProducer, so I created a Kafka source to read the topic:

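# spark is an existing SparkSession and topic_name holds the Kafka topic name.
# spark.read runs a one-off batch query; a streaming query would use spark.readStream.
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", topic_name) \
  .load()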

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(truncate=False)
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").printSchema()

It works properly:

[Screenshot of the output of the show() and printSchema() calls above]
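For context: the Kafka source exposes key and value as binary columns, which is why both are cast to strings before they are displayed. The plain-Python sketch below mimics that cast on a single record. The payload `{"number": 1}` is a hypothetical example (not data from the question), and `cast_to_string` is an illustrative helper, not a Spark API.

```python
import json

# Hypothetical Kafka record: Kafka stores keys and values as raw bytes.
record = {"key": None, "value": b'{"number": 1}'}

def cast_to_string(raw):
    """Rough analogue of CAST(... AS STRING) on a binary column: decode as UTF-8, keep null as null."""
    return None if raw is None else raw.decode("utf-8")

key_str = cast_to_string(record["key"])
value_str = cast_to_string(record["value"])

# After the cast, the value is an ordinary JSON string that can be parsed.
parsed = json.loads(value_str)
print(key_str)  # None
print(parsed)   # {'number': 1}
```

The cast itself only turns bytes into text; the JSON is not parsed until from_json is applied.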

I would like to expand the JSON data in the value column into multiple columns so that I can process it further. I used this code:

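from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

json_schema = StructType([
    StructField('A', StringType(), True),
    StructField('B', StringType(), True),
])

# Cast the binary Kafka value to a string, then parse it with the schema
json_df = df.selectExpr("cast(value as string) as value")
json_expanded_df = json_df.withColumn(
    "value", from_json(json_df["value"], json_schema)
).select("value.*")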

json_expanded_df.show()

It returns a DataFrame, but the values in it are null:

[Screenshot of the resulting DataFrame showing null values]

I searched the internet, but I have no idea why the DataFrame contains NULL values. Using IntegerType() in json_schema instead gives the same result.

I would appreciate it if someone could advise me how to deal with this.
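One way to narrow down the cause of the nulls is to compare the keys that actually appear in the JSON strings with the field names declared in the schema. The plain-Python sketch below does that on a few sample strings. The samples are hypothetical stand-ins for rows of `json_df`, and `key_report` is an illustrative helper. Within PySpark itself, `pyspark.sql.functions.schema_of_json` can infer a schema from a sample JSON string for the same purpose.

```python
import json

# Hypothetical samples standing in for the "value" column of json_df.
samples = ['{"number": 1}', '{"number": 2}']
# Field names declared in the question's json_schema.
schema_fields = ["A", "B"]

def key_report(json_strings, fields):
    """Return (sorted keys found in the data, schema fields never seen in the data)."""
    seen = set()
    for s in json_strings:
        obj = json.loads(s)
        if isinstance(obj, dict):
            seen.update(obj.keys())
    missing = [f for f in fields if f not in seen]
    return sorted(seen), missing

found, missing = key_report(samples, schema_fields)
print("keys in data:", found)         # keys in data: ['number']
print("missing from data:", missing)  # missing from data: ['A', 'B']
```

If every schema field shows up as missing, the parsed columns will all be null regardless of the types declared for them.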


Solution

  • Regarding "It works properly":

    Look at the data. Where did the field names A and B in your schema come from? Your JSON does not contain them. If the schema doesn't match the data, from_json gives you null rows, not an error.

    This is the schema you should be using:

    from pyspark.sql.types import StructType, StructField, IntegerType

    json_schema = StructType([
        StructField('number', IntegerType(), False),
    ])
    

    Then cast the value column to a string and alias it as value, instead of overwriting it with the result of casting the key.
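To see why the field names matter more than the types, here is a simplified plain-Python analogue of parsing a JSON string against a schema. It is not Spark's parser: it only models the case the answer describes, where a schema field absent from the JSON comes back as null (`None`) instead of raising an error. The message `{"number": 1}` is a hypothetical example, and the dict-based schemas are stand-ins for the StructType definitions in the question and the answer.

```python
import json

# Simplified schemas: field name -> Python type (stand-ins for StringType/IntegerType).
question_schema = {"A": str, "B": str}
answer_schema = {"number": int}

def parse_with_schema(json_string, schema):
    """Keep only the schema's fields; a field missing from the JSON becomes None.

    Loosely mirrors how from_json fills unmatched fields with null
    rather than failing. Not Spark's actual parsing logic.
    """
    obj = json.loads(json_string)
    row = {}
    for name, typ in schema.items():
        value = obj.get(name)
        row[name] = typ(value) if value is not None else None
    return row

message = '{"number": 1}'  # hypothetical Kafka value, already cast to a string
print(parse_with_schema(message, question_schema))  # {'A': None, 'B': None}
print(parse_with_schema(message, answer_schema))    # {'number': 1}
```

Switching `question_schema` to `{"A": int, "B": int}` still yields `None` for both fields, which matches the question's observation that IntegerType() made no difference: the names, not the types, are what fail to match.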