
Failing to decompress streaming data by using UDF on Azure Databricks - Python


I am trying to read GZIP-compressed Azure Event Hubs messages using Azure Databricks and Python (PySpark), but my UDF isn't working with BinaryType data.

Here is the part where I check what is in the body:

df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)

And that displays the compressed data, like the following: H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...
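As a side note, a body that displays as `H4sI...` is a telltale sign of gzip bytes rendered as base64 (the gzip magic header always encodes to that prefix). A minimal sketch with a hypothetical payload, showing the round trip outside of Spark:

```python
import base64
import gzip
import zlib

# Hypothetical payload standing in for an Event Hub message body.
original = b'{"sensor": "temp-01", "value": 21.5}'

# What a producer might do: gzip-compress the message.
compressed = gzip.compress(original)

# Base64-encoding the gzip bytes yields the familiar "H4sI..." prefix,
# which comes from the fixed gzip magic header (0x1f 0x8b 0x08 ...).
shown = base64.b64encode(compressed).decode("ascii")

# wbits=15+32 tells zlib to auto-detect the gzip/zlib header.
restored = zlib.decompress(base64.b64decode(shown), 15 + 32)
assert restored == original
```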

However, when I try to send the data to my UDF, it doesn't behave as expected. The function literally doesn't do anything, yet the output looks as if it has been transformed:

import zlib
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def streamDecompress(val):
  #return zlib.decompress(val)
  return val

func_udf = udf(lambda x: streamDecompress(x), StringType())

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)

Here is the output:

[B@49d3f786

So, as expected, it also fails when I try to decompress using zlib.

Does anyone know how to do this?
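The zlib failure itself can be reproduced without Spark. Plain `zlib.decompress` expects a raw zlib header, so it rejects gzip-framed data; a sketch with a stand-in payload:

```python
import gzip
import zlib

# Stand-in for a gzip-compressed message body.
payload = gzip.compress(b"hello event hub")

# Plain zlib.decompress expects a zlib header, so gzip data raises zlib.error.
try:
    zlib.decompress(payload)
except zlib.error as exc:
    print(exc)

# Passing wbits=15+32 makes zlib auto-detect gzip or zlib framing.
print(zlib.decompress(payload, 15 + 32))
```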


Solution

  • Well, that was way simpler than I thought. I was basically trying to display byte-like data, haha.

    The code below solved the problem:

    import zlib
    from pyspark.sql.functions import udf
    
    def streamDecompress(val):
      # wbits=15+32 lets zlib auto-detect the gzip/zlib header
      return str(zlib.decompress(val, 15 + 32))
    
    func_udf = udf(lambda x: streamDecompress(x))
    
    df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')
    
    display(df, truncate=False)
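One caveat worth noting: `str()` on a bytes object produces its repr, including the `b''` wrapper, so the displayed column will read `b'...'`. If the payload is known to be UTF-8 text, `.decode("utf-8")` gives the actual string; a sketch with a hypothetical payload:

```python
import gzip
import zlib

# Hypothetical gzip-compressed body.
body = gzip.compress(b'{"id": 1}')
decompressed = zlib.decompress(body, 15 + 32)

# str() on bytes gives the repr, including the b'' wrapper:
print(str(decompressed))             # b'{"id": 1}'

# .decode() gives the actual text:
print(decompressed.decode("utf-8"))  # {"id": 1}
```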