Tags: pyspark, spark-streaming

How can I get a DataFrame from Kafka using Spark Streaming without a JSON schema?


I'm a Spark newbie.

I'm trying to read a Kafka topic using Spark Structured Streaming.

The 'value' field of the data streamed from Kafka is a JSON string, and I want to convert this 'value' field into a DataFrame and write it out as a parquet file.

I want to derive the schema from the JSON string contained in the 'value' field itself, because new fields keep being added to the JSON data.

For example, the Kafka data looks like this:

    key  value                     ...
    0    "{a:1, b:2, c:3}.."       ...
    1    "{a:1, b:2, c:3, d:4}.."  ...

I'm trying this code:

    source_df = streaming_data.selectExpr("CAST(value AS STRING)").alias("value") \
        .select(from_json("value", schema_of_json(streaming_data.select('value').first().getString(0)))).alias("data") \
        .select("data.*")

I got this error:

    pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

Please help.


Solution

  • Option 1: Hard-code the schema and use it in F.from_json(). The AnalysisException happens because .first() is an action, and actions cannot run on a streaming DataFrame, so the schema either has to be supplied up front (this option) or inferred per micro-batch (Option 2).

    from pyspark.sql import functions as F, types as T

    # schema of the JSON in the 'value' field; fields missing from a
    # record simply come back as null
    my_schema = T.StructType([
        T.StructField('a', T.IntegerType()),
        T.StructField('b', T.IntegerType()),
        T.StructField('c', T.IntegerType()),
        T.StructField('d', T.IntegerType()),
    ])
    value = F.col('value').cast(T.StringType())
    data = F.from_json(value, my_schema).alias('data')
    source_df = streaming_data.select(data).select('data.*')
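
    Since the goal is to end up with parquet files, here is a minimal sketch of the sink side, assuming source_df above is the streaming DataFrame to persist; the output path, checkpoint path, and trigger interval are placeholders to adjust for your setup:

    query = (
        source_df.writeStream
        .format('parquet')
        .option('path', '/data/output')              # target directory (placeholder)
        .option('checkpointLocation', '/data/_chk')  # required for file sinks (placeholder)
        .trigger(processingTime='1 minute')
        .start()
    )
    query.awaitTermination()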
    

  • Option 2: If you want to infer the schema dynamically, you can use foreachBatch. But note that this is risky: a breaking schema change will fail the streaming query, and there is no guarantee that the schema of a given micro-batch is inferred correctly, since a batch may simply not contain every field.

    from pyspark.sql import DataFrame, SparkSession
    from pyspark.sql import functions as F, types as T

    def parse_and_process(df: DataFrame, epoch_id: int) -> None:
        # cache the current micro batch, it will be scanned more than once
        df.persist()
    
        # infer the schema of the current batch
        spark = SparkSession.getActiveSession()
    
        value = F.col('value').cast(T.StringType())
        inferred_df = spark.read.json(
            df.select(value).rdd.map(lambda x: x[0]),
            dropFieldIfAllNull=True
        )
        inferred_schema = inferred_df.schema
    
        # parse the json with the schema
        res_df = df.withColumn('data', F.from_json(value, inferred_schema))
    
        # process the DataFrame; it's not a streaming DataFrame anymore,
        # e.g. append it to the parquet output (path is a placeholder)
        res_df.write.mode('append').parquet('/data/output')
    
        df.unpersist()
    
    
    streaming_data.writeStream.foreachBatch(parse_and_process).start()
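
    A middle ground, close to what the original code attempted: infer the schema once at startup from a one-off batch read of the topic (batch reads allow actions such as inference), then plug it into the streaming parse from Option 1. This is only a sketch; the bootstrap servers and topic name are placeholders, and the schema will only cover fields present in the data sampled at startup:

    sample_df = (
        spark.read.format('kafka')                       # batch read, not readStream
        .option('kafka.bootstrap.servers', 'host:9092')  # placeholder
        .option('subscribe', 'my_topic')                 # placeholder
        .load()
    )
    startup_schema = spark.read.json(
        sample_df.select(F.col('value').cast(T.StringType())).rdd.map(lambda r: r[0])
    ).schema

    # reuse the schema for the streaming parse, exactly as in Option 1
    value = F.col('value').cast(T.StringType())
    source_df = streaming_data.select(F.from_json(value, startup_schema).alias('data')).select('data.*')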