Tags: pyspark, azure-eventhub, microsoft-fabric

How to Read Compressed Data from Azure Event Hub in Fabric using PySpark


I am sending compressed data into Event Hub to work around the 1 MB hard limit on Azure Event Hub messages. I also have to read this in PySpark and update a Delta table.

The compressed data sent to Event Hub comes through as null in the PySpark stream. How can I read it?

This is how I am reading from Event Hub

 df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
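
(message_schema is not shown here; for the sample payload sent below, it would presumably be a simple StructType along these lines.)

    from pyspark.sql.types import StructType, StructField, StringType

    # Hypothetical schema matching the sample payload in the producer code below.
    message_schema = StructType([
        StructField("id", StringType()),
        StructField("firstName", StringType()),
        StructField("middleName", StringType()),
        StructField("lastName", StringType()),
    ])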

This is how I am sending data to Event Hub:

    event_data_batch = await producer.create_batch()

    # Add events to the batch.
    body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'

    # Compress the JSON string using the gzip algorithm.
    compressed_body = gzip.compress(body.encode('utf-8'))

    # Encode the compressed JSON string to base64.
    encoded_compressed_body = base64.b64encode(compressed_body)

    event_data_batch.add(EventData(encoded_compressed_body))
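
(For reference, a minimal end-to-end sender using the azure-eventhub async client would look roughly like the sketch below; the connection string and hub name are placeholders.)

    import asyncio, base64, gzip
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient

    CONNECTION_STR = "<event-hub-namespace-connection-string>"  # placeholder
    EVENTHUB_NAME = "<event-hub-name>"                          # placeholder

    async def send_compressed_event():
        producer = EventHubProducerClient.from_connection_string(
            conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME
        )
        async with producer:
            event_data_batch = await producer.create_batch()
            body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'
            # gzip-compress, then base64-encode so the payload travels as plain bytes
            encoded_compressed_body = base64.b64encode(gzip.compress(body.encode('utf-8')))
            event_data_batch.add(EventData(encoded_compressed_body))
            await producer.send_batch(event_data_batch)

    asyncio.run(send_compressed_event())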

I tried to read with the compression option set to gzip, but it is still giving me null.

df_stream  = spark.readStream.format("eventhubs")\
  .options(**ehConf)\
  .option("compression", "gzip") \
  .load() 
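
(ehConf is not shown above; with the azure-event-hubs-spark connector it is usually built along these lines, where the connection string is a placeholder and EventHubsUtils.encrypt is the connector's helper for encrypting it.)

    connection_string = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=<event-hub-name>"  # placeholder

    ehConf = {
        # The connector expects the connection string to be passed in encrypted form.
        "eventhubs.connectionString":
            spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
    }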

And this is how I read the body column:

def foreach_batch_function(df_stream, epoch_id):
    # temp_view_name = "stream_temp"
    df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))
    df_stream_body.createOrReplaceTempView("stream_temp")
    # df_stream_body.printSchema()
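
(For reference, a handler like this would typically be attached to the stream along these lines; the checkpoint path below is a placeholder.)

    query = (
        df_stream.writeStream
        .foreachBatch(foreach_batch_function)
        .option("checkpointLocation", "Files/checkpoints/eventhub_stream")  # placeholder path
        .start()
    )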

Solution

  • You need to decompress the body column yourself; the option .option("compression", "gzip") applies to compressed input files, not to compressed data stored inside a column.

    So you need to create a user-defined function that base64-decodes and gzip-decompresses the data. Use the code below.

    UDF

    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    import gzip, base64

    def unZip(binary_string):
        # base64-decode first, then gzip-decompress back to the original JSON string
        return gzip.decompress(base64.b64decode(binary_string)).decode('utf-8')

    unzip = F.udf(unZip, StringType())


    Next, create a new column with the decompressed data.

    df.withColumn("bd", unzip(F.col("body").cast('string'))).display()
    

    Output:

    (screenshot: the new bd column showing the decompressed JSON string)
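
    You can then feed the decompressed string into from_json with the message_schema from the question to get the structured payload, for example:

    df_parsed = (
        df.withColumn("bd", unzip(F.col("body").cast('string')))
          .select(F.from_json(F.col("bd"), message_schema).alias("Payload"))
    )
    df_parsed.select("Payload.*").display()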