
Reading Avro data generated by Azure Event Hubs Capture from Azure Data Lake Gen1 with Databricks fails


I am trying to read Avro data from Azure Data Lake Gen1 in Azure Databricks with PySpark. The data was written by Azure Event Hubs with Event Hubs Capture enabled:

inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)

The following statement fails

rawData.count()

with

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file

Is Event Hubs Capture writing non-Avro data? Are there any best practices for reading data captured by Event Hubs with Spark?


Solution

  • One pattern for implementing a cold ingestion path is Event Hubs Capture. Capture writes one file per partition for each capture window, as defined by the windowing parameters. The data is written in Avro format and can be analyzed with Apache Spark.

    So what are the best practices for using this functionality?

    1. Do not over-partition

    I have often seen people use the default configuration, which results in many small files. If you want to consume the data ingested via Event Hubs Capture with Spark, keep in mind the best practices for file sizes in Azure Data Lake Store and for partitions with Spark: files should be around 256 MB and partitions between 10 and 50 GB. The right configuration therefore depends on the number and size of the messages you are consuming. In most cases, partitioning your data by ingest date is sufficient.
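
    To judge whether a capture window gets you close to the ~256 MB target, a rough back-of-the-envelope estimate helps. The sketch below is only an approximation: the function name and the workload numbers are hypothetical, it assumes events are spread evenly over the partitions, and it ignores the Avro container overhead.

```python
def capture_file_size_mb(events_per_second, avg_event_bytes, partitions, window_seconds):
    """Rough payload size of one capture file in MB (ignores Avro overhead).

    Assumes events are distributed evenly across all partitions.
    """
    bytes_per_partition_per_second = events_per_second * avg_event_bytes / partitions
    return bytes_per_partition_per_second * window_seconds / (1024 * 1024)


# Hypothetical workload: 2,000 events/s of 1 KiB each, 4 partitions
for window_seconds in (60, 300, 900):
    size_mb = capture_file_size_mb(2000, 1024, 4, window_seconds)
    print(f"{window_seconds:>4} s window: about {size_mb:.1f} MB per file")
```

    If even the longest window stays far below the target size, a compaction job is the more realistic way to reach sensible file sizes.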

    2. Check the "Do not emit empty files" option

    You should enable the "Do not emit empty files" option. If you want to consume the data with Spark, this saves unnecessary file operations.

    3. Use the data origin in your file paths

    In a streaming architecture, your Event Hub plays the role that a landing zone plays in a batch-oriented architecture, so you ingest the data into a raw data layer. Good practice is to use the data source instead of the name of the Event Hub in the directory path. For example, if you are ingesting telemetry data from robots in your factory, the directory path could be /raw/robots/

    The capture file name format requires all attributes, such as {Namespace} and {PartitionId}, to be used. A good capture file name format with an explicitly defined path, a daily partition, and the remaining attributes in the file name could look like this in Azure Data Lake Gen2:

     /raw/robots/ingest_date={Year}-{Month}-{Day}/{Hour}{Minute}{Second}-{Namespace}-{EventHub}-{PartitionId}
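
    To see what such a format resolves to, the placeholders can be filled in with plain Python string formatting. This is only an illustration: the namespace, Event Hub name, and timestamp below are made up, and zero-padded date and time parts are an assumption about how Capture renders them.

```python
from datetime import datetime

# The capture file name format from above, split over two lines for readability
CAPTURE_NAME_FORMAT = (
    "/raw/robots/ingest_date={Year}-{Month}-{Day}/"
    "{Hour}{Minute}{Second}-{Namespace}-{EventHub}-{PartitionId}"
)


def render_capture_path(window_start, namespace, event_hub, partition_id):
    """Fill the capture placeholders for one capture window (illustration only)."""
    return CAPTURE_NAME_FORMAT.format(
        Year=f"{window_start:%Y}",
        Month=f"{window_start:%m}",
        Day=f"{window_start:%d}",
        Hour=f"{window_start:%H}",
        Minute=f"{window_start:%M}",
        Second=f"{window_start:%S}",
        Namespace=namespace,
        EventHub=event_hub,
        PartitionId=partition_id,
    )


# Hypothetical namespace and Event Hub names
path = render_capture_path(datetime(2020, 1, 15, 8, 30, 0), "factory-ns", "robot-telemetry", 0)
print(path)
```

    The ingest_date=... directory follows the key=value convention that Spark uses for partition discovery, so the ingest date becomes available as a column when reading from /raw/robots.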
    


    4. Think of a compaction job

    Captured data is not compressed and might also end up in files that are too small for your use case (Capture writes a file at least every 15 minutes). So if necessary, write a compaction job that runs once a day. Something like

    df.repartition(5).write.format("avro").save(targetpath)
    

    will do the job.
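
    A slightly fuller sketch of such a daily compaction job could look like the following. The function names, the target directory, and the number of output files are assumptions for illustration; it expects an existing SparkSession and the ingest_date=... directory layout described above.

```python
def day_paths(source_root, target_root, ingest_date):
    """Build source and target directories for one ingest day (hypothetical layout)."""
    partition = f"ingest_date={ingest_date}"
    return (f"{source_root.rstrip('/')}/{partition}",
            f"{target_root.rstrip('/')}/{partition}")


def compact_day(spark, source_root, target_root, ingest_date, num_files=5):
    """Rewrite one day of captured Avro files into a few larger, compressed files."""
    source, target = day_paths(source_root, target_root, ingest_date)
    (spark.read.format("avro").load(source)
        .repartition(num_files)            # number of output files
        .write.format("avro")
        .option("compression", "snappy")   # compress the compacted output
        .mode("overwrite")                 # makes a rerun for the same day idempotent
        .save(target))
    return target


# Usage in a notebook where `spark` is already defined:
# compact_day(spark, "/raw/robots", "/compacted/robots", "2020-01-15")
```

    Writing to a separate target directory keeps the raw capture files untouched, so the job can be rerun if something goes wrong.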

    So what are the best practices for reading the captured data?

    5. Ignore non-Avro files when reading the data

    Azure Event Hubs Capture writes temporary data to Azure Data Lake Gen1, and these temporary files are what cause the "Not an Avro data file" error. Best practice is to read only files with the .avro extension. You can easily achieve this via a Spark configuration:

    spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")
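
    If you want to find out which files are the problem, you can check for the Avro magic bytes: every Avro object container file starts with the four bytes "Obj" followed by 1. The sketch below uses hypothetical helper names and assumes the files are reachable through a local file path (on Databricks, for example, through a /dbfs mount). It also shows the pathGlobFilter reader option, available in Spark 3.0 and later, as an alternative way to restrict a read to *.avro files.

```python
import os
import tempfile

AVRO_MAGIC = b"Obj\x01"  # header of every Avro object container file


def has_avro_magic(path):
    """Return True if the file starts with the Avro container magic bytes."""
    with open(path, "rb") as f:
        return f.read(len(AVRO_MAGIC)) == AVRO_MAGIC


def read_avro_files_only(spark, path):
    """Read only files whose name ends in .avro (assumes Spark 3.0 or later)."""
    return (spark.read.format("avro")
            .option("pathGlobFilter", "*.avro")
            .load(path))


# Small local demonstration with one fake Avro header and one temporary file
with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "good.avro")
    bad = os.path.join(tmp, "in_progress.tmp")
    with open(good, "wb") as f:
        f.write(AVRO_MAGIC + b"rest of the file")
    with open(bad, "wb") as f:
        f.write(b"not avro")
    suspicious = [p for p in (good, bad) if not has_avro_magic(p)]
    print([os.path.basename(p) for p in suspicious])
```

    Empty files fail the check as well, which is another reason to enable the "Do not emit empty files" option.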
    

    6. Read only relevant partitions

    Consider reading only the relevant partitions, e.g. filter on the current ingestion day.
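
    With the ingest_date=... directory layout from above, reading a single day could be sketched as follows. The helper names are hypothetical and an existing SparkSession is assumed; the basePath option keeps ingest_date available as a column even though only one partition directory is loaded.

```python
from datetime import date


def partition_path(root, day):
    """Directory of one ingest day, e.g. /raw/robots/ingest_date=2020-01-15."""
    return f"{root.rstrip('/')}/ingest_date={day:%Y-%m-%d}"


def read_ingest_day(spark, root, day):
    """Load only the files of a single ingest day."""
    return (spark.read.format("avro")
            .option("basePath", root)   # keep ingest_date as a partition column
            .load(partition_path(root, day)))


# Usage in a notebook where `spark` is already defined:
# todays_data = read_ingest_day(spark, "/raw/robots", date.today())
print(partition_path("/raw/robots", date(2020, 1, 15)))
```

    Alternatively, load the root directory and filter on the ingest_date column, in which case Spark prunes the partitions that do not match.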

    7. Use shared metadata

    Reading the captured data works similarly to reading the data directly from Azure Event Hubs, so you need a schema. Assuming that you also have jobs reading the data directly with Spark Structured Streaming, a good pattern is to store the metadata once and share it. You could store this metadata in a JSON file in the Data Lake Store:

    [{"MeasurementTS":"timestamp","Location":"string", "Temperature":"double"}]
    

    and read it with this simple parsing function:

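    # Parse the metadata file to build the Spark schema
    import json
    from collections import OrderedDict
    from pyspark.sql.types import (BooleanType, DoubleType, IntegerType, StringType,
                                   StructField, StructType, TimestampType)

    # metadata is the path of the JSON metadata file; dbutils is available in Databricks notebooks
    ds = dbutils.fs.head(metadata)

    # Decode the first JSON object and keep its field order
    items = (json
      .JSONDecoder(object_pairs_hook=OrderedDict)
      .decode(ds)[0].items())

    # Map the type names used in the metadata file to Spark types
    mapping = {"string": StringType, "integer": IntegerType, "double": DoubleType,
               "timestamp": TimestampType, "boolean": BooleanType}

    # An unknown type name raises a KeyError here instead of failing later on None
    schema = StructType([
        StructField(k, mapping[v.lower()](), True) for (k, v) in items])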
    

    So you could just reuse your schema:

    from pyspark.sql.functions import from_json

    # rawpath is the directory with the captured Avro files; schema is the StructType built above
    parsedData = (spark.read.format("avro").load(rawpath)
      .selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json")
      .select("EnqueuedTimeUtc", from_json("json", schema=schema).alias("data"))
      .select("EnqueuedTimeUtc", "data.*"))