I'm a Spark newbie.
I'm trying to read a Kafka topic using Spark Structured Streaming.
The 'value' field of the data streamed from Kafka is a JSON string, and I want to convert this 'value' field to a DataFrame and write it out as a Parquet file.
I want to get the schema information from the JSON string contained in the 'value' field, because fields keep being added to the JSON data.
For example, the Kafka data looks like this:
key | value | ... |
---|---|---|
0 | "{a:1, b:2, c:3}.." | ... |
1 | "{a:1, b:2, c:3, d:4}.." | ... |
I'm trying this code:
source_df = streaming_data.selectExpr("CAST(value AS STRING)").alias("value") \
.select(from_json("value", schema_of_json(streaming_data.select('value').first().getString(0)))).alias("data") \
.select("data.*")
I got this error:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
Please help.
The .first() call in your code is an action, and actions can't run against a streaming DataFrame, which is why you get that AnalysisException. You either have to provide the schema up front or infer it per micro-batch.
Option 1: Hard-code the schema and use it in F.from_json().
from pyspark.sql import functions as F, types as T

# define the expected schema of the JSON payload up front
my_schema = T.StructType([
    T.StructField('a', T.IntegerType()),
    T.StructField('b', T.IntegerType()),
    T.StructField('c', T.IntegerType()),
    T.StructField('d', T.IntegerType()),
])

# cast the Kafka value bytes to a string and parse it with the fixed schema
value = F.col('value').cast(T.StringType())
data = F.from_json(value, my_schema).alias('data')
source_df = streaming_data.select(data).select('data.*')
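Since you want Parquet output, you can then write the parsed stream with writeStream (a minimal sketch; the output path and checkpoint location are placeholders):
query = (
    source_df.writeStream
    .format('parquet')
    .option('path', '/tmp/output/data')                # placeholder output directory
    .option('checkpointLocation', '/tmp/output/_chk')  # placeholder checkpoint directory
    .outputMode('append')
    .start()
)
query.awaitTermination()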
Option 2: If you want to dynamically infer the schema, you can use foreachBatch. But note that this is risky: breaking schema changes will fail the streaming query, and it's not guaranteed that the schema will be inferred correctly.
from pyspark.sql import DataFrame, SparkSession, functions as F, types as T

def parse_and_process(df: DataFrame, epoch_id: int) -> None:
    # cache the current micro-batch, it will be scanned more than once
    df.persist()
    # infer the schema of the current batch
    spark = SparkSession.getActiveSession()
    value = F.col('value').cast(T.StringType())
    inferred_df = spark.read.json(
        df.select(value).rdd.map(lambda x: x[0]),
        dropFieldIfAllNull=True,
    )
    inferred_schema = inferred_df.schema
    # parse the JSON with the inferred schema
    res_df = df.withColumn('data', F.from_json(value, inferred_schema))
    # process the DataFrame, it's not a streaming DataFrame anymore
    res_df.write....
    df.unpersist()
streaming_data.writeStream.foreachBatch(parse_and_process).start()
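The elided res_df.write.... step depends on your sink; for the Parquet output you mentioned it could look like this (a hypothetical sketch, the output path is a placeholder):
# hypothetical write step inside parse_and_process; the path is a placeholder
res_df.select('data.*').write.mode('append').parquet('/tmp/output/data')
Because different micro-batches can write different column sets, you may need to read the result back with spark.read.option('mergeSchema', 'true').parquet(...) to see all columns.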