I am trying to read GZIP-compressed Azure Event Hub messages using Azure Databricks and Python (PySpark), but my UDF isn't working with the BinaryType data.
Well, here is the part where I check what is in the body:
df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)
And that displays the compressed data just fine, like the following: H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...
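As an optional sanity check (not strictly needed), printing the stream's schema confirms that body is a binary column, which is why it renders as that base64-looking text:

eventHubStream.printSchema()  # "body" shows up as binary in the Event Hubs stream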
However, when I try to send the data to my UDF, it doesn't behave as expected. The function literally doesn't do anything, yet the output looks like it has been transformed:
import zlib
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType, StringType

def streamDecompress(val: BinaryType()):
    # return zlib.decompress(val)
    return val

func_udf = udf(lambda x: streamDecompress(x), StringType())

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)
Here is the output:
[B@49d3f786
So, as expected, it also fails when I actually try to decompress using zlib.
Does anyone know how to do this?
Well, that was way simpler than I thought. I was basically trying to display byte-like data, haha.
The code below solved the problem:
import zlib
from pyspark.sql.functions import udf

def streamDecompress(val):
    # wbits = 15 + 32 lets zlib auto-detect the gzip/zlib header
    return str(zlib.decompress(val, 15 + 32))

func_udf = udf(lambda x: streamDecompress(x))

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')
display(df, truncate=False)
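One side note: wrapping the decompressed bytes in str() keeps the b'...' prefix in the resulting string. If the payload is UTF-8 text (which is an assumption about your messages), decoding it explicitly gives a cleaner result. A minimal sketch of that variation:

import zlib
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def streamDecompressUtf8(val):
    # Decompress the gzip payload (15 + 32 auto-detects the header) and
    # decode it as UTF-8, assuming the original message was text
    return zlib.decompress(val, 15 + 32).decode("utf-8")

func_udf = udf(streamDecompressUtf8, StringType())
df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)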