Search code examples
apache-sparkpysparkapache-kafkaapache-spark-sqlspark-structured-streaming

Structured streaming schema from Kafka JSON - query error


I'm using Spark 3.2 for getting JSONs streaming from Kafka 2.12-3.0.0. I'm receiving error in query after parsing JSONs.

Kafka topic streaming JSONs:

b'{"pmu_id": 2, "time": 1642771653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 49.99, "rocof": 1}'
b'{"pmu_id": 2, "time": 1642734653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 50.00, "rocof": -1}'

DataFrame schema:

stream01Schema= StructType()\
    .add("pmu_id", ByteType())\
    .add("time", TimestampType()).add("stream_id", ByteType())
    .add("analog", StringType()).add("digital", ByteType()).add("frequency", FloatType()).add("rocof", ByteType())

Constructing a streaming DataFrame that reads from topic:

stream01DF = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()
        .select(col("key").cast("string") from_json(col("value").cast("string").alias("pmudata"), stream01Schema))

Printing resulting schema:

root
 |-- key: string (nullable = true)
 |-- from_json(CAST(value AS STRING) AS pmudata): struct (nullable = true)    
 |    |-- pmu_id: byte (nullable = true)
 |    |-- time: timestamp (nullable = true)
 |    |-- stream_id: byte (nullable = true)
 |    |-- analog: string (nullable = true)
 |    |-- digital: byte (nullable = true)
 |    |-- frequency: float (nullable = true)
 |    |-- rocof: byte (nullable = true)

Testing query:

testQuery = stream01DF.groupBy("pmudata.rocof").count()    
testQuery.writeStream \
  .outputMode("complete") \
  .format("console") \
  .option("truncate", False) \
  .start() \
  .awaitTermination()

Error received:

pyspark.sql.utils.AnalysisException: cannot resolve 'pmudata.rocof' given input columns: [from_json(CAST(value AS STRING) AS pmudata), key];

Solution

  • Seems you're looking for this since you are trying to alias the from_json() column (check your parentheses) to a name, which you can later select/group by.

    from_json(col("value").cast("string"), stream01Schema).alias("pmudata")

    A complete usage is in the end-to-end example in this Databricks post