pyspark, databricks, spark-structured-streaming, azure-eventhub, databricks-autoloader

Is schema evolution possible when ingesting Avro files written by Event Hub Capture?


I have a double hop ingestion pipeline as follows:

Data Producer -events-> Azure Event Hub -event hub capture-> ADLS in Avro format -Autoloader-> Databricks UC Managed Delta Table

When using the Event Hub Capture functionality to capture the incoming events to Avro in ADLS, I notice it stores the events with a fixed envelope schema as follows:

Message Body, Offset, Sequence Number, Partition Id, Enqueued Time, Content Type

When reading into a PySpark DataFrame, I notice the Message Body is in binary format. Converting it to a string gives me my record as JSON, which I can then unpack using a StructType schema.
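For illustration, here is roughly what that looks like (a sketch; the storage path is a placeholder, and I'm assuming the Capture column is named Body):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()

    # Placeholder path where Event Hub Capture writes the Avro files
    capture_path = "abfss://capture@mystorage.dfs.core.windows.net/myeventhub/"

    # The Capture envelope stores the event payload in a binary Body column
    df = spark.read.format("avro").load(capture_path)

    # Casting the binary payload to string exposes the JSON record
    df.select(F.col("Body").cast("string").alias("json_body")).show(truncate=False)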

My question is: given that I use spark.readStream with Auto Loader to read in the data, and it arrives in the Event Hub Capture schema, does this mean I cannot use schema evolution features, since Auto Loader will record that envelope schema in its metadata?

My actual schema is stored in the Message Body column, so I'm guessing an additional, evolving schema wouldn't be picked up? Is there any way around this?
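For reference, the Auto Loader read I'm describing looks roughly like this (a sketch; paths are placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    capture_path = "abfss://capture@mystorage.dfs.core.windows.net/myeventhub/"
    schema_path = "abfss://meta@mystorage.dfs.core.windows.net/autoloader/schema"

    # Auto Loader infers and tracks the schema it sees in the files, which here
    # is the fixed Capture envelope, not the payload schema inside Body
    stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "avro")
        .option("cloudFiles.schemaLocation", schema_path)
        .load(capture_path)
    )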

Many thanks


Solution

  • Since the Capture schema is constant in Event Hub, schema evolution is not needed. Even if you change the data, it arrives in the body column, which is of binary type and is converted to a string upon casting; the envelope will still have the same schema.

    So, if your payload has a standard, constant schema that is not changing, you can load your data using the from_json function:

    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, ArrayType

    # Schema describing the JSON payload carried in the binary body column
    schema = StructType([
        StructField("key1", StringType(), True),
        StructField("key2", StringType(), True),
        StructField("key3", StringType(), True),
        StructField("nestedKey", StructType([
            StructField("nestedKey1", StringType(), True)
        ]), True),
        StructField("arrayKey", ArrayType(StringType(), True), True)
    ])

    # Cast the binary body to string, then parse it into the struct
    df.select(F.from_json(F.col("body").cast("string"), schema=schema).alias("body")).display()
    

    Output: a DataFrame with a single body struct column containing the parsed JSON fields.

    You also mentioned that you have the actual schema in the message body itself, but after casting it will be of string type, so it cannot be used directly as a schema.
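    If you do want to derive a schema from the JSON itself, one possible workaround (a sketch using PySpark's schema_of_json, not the only option) is to sample a record and infer a schema from it:

    from pyspark.sql import functions as F

    # Take one payload as a plain string to serve as a sample record
    sample_json = (
        df.select(F.col("body").cast("string").alias("json_body"))
          .limit(1)
          .collect()[0]["json_body"]
    )

    # Infer a schema from the sample, then parse every record with it.
    # Note: this fixes the schema at inference time; fields added later
    # will not appear unless you re-infer.
    parsed = df.select(
        F.from_json(F.col("body").cast("string"), F.schema_of_json(sample_json)).alias("body")
    )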

    You can also use a schema registry in Event Hubs to maintain a common schema between producer and consumer. Refer to the documents below for more information; a sketch of the consumer side follows the links.

    Create an Azure Event Hubs schema registry - Azure Event Hubs | Microsoft Learn

    Validate events from Apache Kafka apps using Avro (Java) - Azure Event Hubs | Microsoft Learn

    Read and write streaming Avro data | Databricks on AWS
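    For completeness: if the producer serializes the payload itself as Avro (the scenario the schema registry links describe), the binary body can be decoded directly with Spark's built-in from_avro instead of a JSON cast. A minimal sketch, assuming a hypothetical payload schema:

    from pyspark.sql import functions as F
    from pyspark.sql.avro.functions import from_avro

    # Hypothetical Avro schema for the payload; in practice this would come
    # from the schema registry or be shared with the producer
    avro_schema = '''
    {
      "type": "record",
      "name": "Event",
      "fields": [
        {"name": "key1", "type": "string"},
        {"name": "key2", "type": "string"}
      ]
    }
    '''

    # Decode the binary body column directly from Avro
    decoded = df.select(from_avro(F.col("body"), avro_schema).alias("body"))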