Problem expanding JSON data in a column into multiple columns of a PySpark DataFrame from Apache Kafka streaming data


I'm practising with streaming data, so I set up Apache Kafka. I want to read the data sent by a KafkaProducer, so I created a Kafka source for streaming queries:

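from pyspark.sql import SparkSession

# The Kafka source needs the spark-sql-kafka-0-10 connector package on the classpath
spark = SparkSession.builder.appName("kafka_json").getOrCreate()

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", topic_name) \
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(truncate=False)
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").printSchema()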
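Kafka stores the message value as bytes, and CAST(value AS STRING) decodes those bytes as UTF-8 text. So whatever JSON keys the producer writes are exactly the names the Spark schema has to use later. A minimal sketch of a producer-side serializer, assuming the producer is kafka-python's KafkaProducer (an assumption; the helper serialize_value and the record {"number": 1} are hypothetical):

```python
import json


def serialize_value(record):
    """Encode a dict as UTF-8 JSON bytes; CAST(value AS STRING) in Spark turns these back into the JSON text."""
    return json.dumps(record).encode("utf-8")


# Hypothetical record: its keys are the field names the Spark schema must declare.
payload = serialize_value({"number": 1})
print(payload)

# With kafka-python (assumed producer library) the serializer would be wired up roughly as:
# producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=serialize_value)
# producer.send(topic_name, {"number": 1})
```

Printing a few decoded values from the show() output above and comparing them with what the producer sends is a quick way to confirm the key names.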

It works properly:

I would like to expand the JSON data in the value column into multiple columns so that I can process it further. I used this code:

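from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

json_schema = StructType([
    StructField('A', StringType(), True),
    StructField('B', StringType(), True)
])
json_df = df.selectExpr("cast(value as string) as value")
json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], json_schema)).select("value.*")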

It returns a DataFrame, but the values are null:

I've searched online and have no idea why the DataFrame contains NULL values. Even when I use IntegerType() in json_schema, the result is the same.

    Look at the data. Where did A and B come from in the schema you tried to use? If the schema doesn't match the data, you'll get null values, not an error.
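    One way to follow the advice to look at the data is to compare the keys in a few sample messages with the schema's field names. A minimal stdlib sketch; the helper schema_mismatches and the sample messages are hypothetical:

```python
import json


def schema_mismatches(messages, field_names):
    """Compare schema field names with the keys found in sample JSON messages.

    Returns (missing, unexpected):
      missing    -- schema fields that appear in no message; from_json fills these with null
      unexpected -- message keys the schema does not declare; from_json ignores these
    """
    seen = set()
    for raw in messages:
        obj = json.loads(raw)
        if isinstance(obj, dict):  # only top-level object keys are compared
            seen.update(obj.keys())
    declared = set(field_names)
    return sorted(declared - seen), sorted(seen - declared)


# Hypothetical sample messages; in practice, take a few strings from the value column.
samples = ['{"number": 1}', '{"number": 2}']
missing, unexpected = schema_mismatches(samples, ["A", "B"])
print("missing:", missing, "unexpected:", unexpected)
```

    In Spark the samples could be collected with [r.value for r in json_df.limit(5).collect()]. If every schema field shows up as missing, every expanded column will be null.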

    This is the schema you should be using:

    json_schema = StructType([
        StructField('number', IntegerType(), False),
        # one StructField per key in the JSON payload
    ])

    Then cast the value column (cast(value as string) as value) rather than overwriting it with the result of casting the key.