Hi,
I'm practising working with streaming data and decided to set up Apache Kafka. I want to read the data I send with a KafkaProducer, so I created a Kafka source for streaming queries:
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", topic_name) \
    .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(truncate=False)
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").printSchema()
It works properly:
I would like to expand the JSON data in the value column into multiple columns in order to process this data further. I used this code:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

json_schema = StructType([
    StructField('A', StringType(), True),
    StructField('B', StringType(), True),
])
json_df = df.selectExpr("cast(value as string) as value")
json_expanded_df = json_df.withColumn("value", from_json(json_df["value"],
json_schema)).select("value.*")
json_expanded_df.show()
It returns a DataFrame, but it contains only null values:
I have browsed the internet and have no idea why it returns NULL values in the DataFrame. Even when I use IntegerType() in json_schema, the result is the same.
I would appreciate it if someone could advise me how to deal with this.
Look at the data. Where did A and B come from in the schema you tried to use? If the schema doesn't match the payload, you get null rows, not an error.
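You can see the same null behaviour outside Spark. Like `from_json`, a plain dictionary lookup quietly returns nothing for fields that are not present in the payload (a minimal sketch, assuming the value column holds a message shaped like `{"number": 1}`):

```python
import json

# A value payload shaped like the one in the question (assumed: {"number": ...})
payload = json.loads('{"number": 1}')

# The fields from the wrong schema ('A', 'B') are absent, so lookups give None —
# the same way from_json reports nulls instead of raising an error
wrong_fields = [payload.get("A"), payload.get("B")]
print(wrong_fields)        # [None, None]

# The field that actually exists in the payload parses fine
existing_field = payload.get("number")
print(existing_field)      # 1
```

This is why switching StringType() to IntegerType() changed nothing: the field names, not the types, were the problem.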
This is the schema you should be using:
json_schema = StructType([
    StructField('number', IntegerType(), False),
])
Then cast the value column as value, rather than overriding it with the result of casting the key.