Tags: apache-spark, pyspark, apache-kafka, databricks, variant-format

How do I access the fields within a VARIANT column while reading from Kafka using Spark?


I am reading from Kafka and writing to a table. As long as I do not try to access nested fields, the parsed data column has a nice structure. The problem appears in the readStream as soon as I try to extract a field from it: [INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "data". Need a complex type [STRUCT, ARRAY, MAP] but got "VARIANT". SQLSTATE: 42000

Here is my readStream:

    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", "latest") \
        .... \
        .load() \
        .withColumn("data", parse_json(col("value").cast("string"))) \
        .select("data, data:unique_id")\
        .withColumn("timestamp", current_timestamp())
    
    display(df)

Solution

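  • It turns out that selectExpr is required instead of select. The data:unique_id colon syntax is a SQL expression, and selectExpr parses its arguments as SQL, whereas select resolves a plain string as a column reference: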

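        from pyspark.sql.functions import col, current_timestamp, parse_json

        # selectExpr takes one SQL expression per argument, so "data" and
        # "data:unique_id" are passed as two separate strings.
        df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
            .option("subscribe", TOPIC) \
            .option("startingOffsets", "latest") \
            .... \
            .load() \
            .withColumn("data", parse_json(col("value").cast("string"))) \
            .selectExpr("data", "data:unique_id") \
            .withColumn("timestamp", current_timestamp())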
        
        display(df)
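
If more than one field has to be pulled out of the VARIANT column, it may be easier to generate the selectExpr arguments than to write them by hand. The following is only a sketch: the helper name variant_field_exprs is made up for illustration, it assumes the fields are simple top-level keys whose names are valid SQL identifiers, and it uses the variant_get SQL function as an alternative to the colon syntax (the type argument is optional; "string" is an assumed default here).

```python
def variant_field_exprs(variant_col, fields, sql_type="string"):
    """Build one SQL expression string per output column for selectExpr.

    Hypothetical helper: `fields` are top-level key names inside the
    VARIANT column. Each one becomes a variant_get call on a JSON path,
    cast to `sql_type` and aliased to the key name.
    """
    exprs = [variant_col]  # keep the full VARIANT column as the first output
    for field in fields:
        exprs.append(
            f"variant_get({variant_col}, '$.{field}', '{sql_type}') AS {field}"
        )
    return exprs


exprs = variant_field_exprs("data", ["unique_id"])
print(exprs)
```

The resulting list could then be unpacked into the stream with .selectExpr(*exprs), so that every expression is still passed as its own argument.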