python, apache-spark, pyspark, azure-databricks, azure-eventhub

EventHub Parsing Decoded Body


I have Event Hub files arriving in the landing zone in Avro format. Auto Loader picks the files up and writes them to a table as is, which means the Body column is in bytes. The stream adds another column that decodes Body to a string. Each row of that column now holds a string that needs to be parsed and converted to JSON. The strings look like this:

@300://mm/cm#//c.Process/p.TriggerAck/v,@2222,@300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v,@cc300://imm/cm#//c.AirValve/p.sv_bActivatedInSequence/v
444444,4.73,0

Expected output

{
"@300://mm/cm#//c.Process/p.Trigger/v,@2222,@300://mm/cm#//c.AcousticAlarm1/p.sv_Alarm/v": 444444,
"@2222": 4.73,
"@300://mm/cm#//c.AirValve/p.sv_ActivatedInSequence/v": 0
}

Each column name corresponds to the value in the same position. I want to convert all of these strings to JSON and store the result in the rows of the table.

I am fairly new to PySpark and Databricks, so I would appreciate any help or a nudge in the right direction on how to convert all of these to JSON, and how to do it efficiently, since this is a transformation of the data.

Any help is greatly appreciated!

The code I currently have is not much help:

import pandas as pd
from io import StringIO

data = spark.sql("SELECT decoded FROM catalog.schema.table")

first_row = data.head()

decoded_body = first_row[0]

df = pd.read_csv(StringIO(decoded_body))

This only handles the first row, even if I do not restrict the query to a single row. My idea was to convert it to a proper table and then to JSON, but I am sure there are better solutions.


Solution

  • You can use a user-defined function (UDF) in Databricks to achieve this. There is a built-in from_csv function (see "from_csv function - Azure Databricks - Databricks SQL | Microsoft Learn") that reads data from a CSV string, but it needs the schema to be supplied up front. Since your schema is dynamic, you can use the code below instead.

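    import json
    from io import StringIO

    import pandas as pd
    import pyspark.sql.functions as F
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType, MapType, StringType, StructField, StructType

    def get_json(row):
        # Parse the decoded string as CSV: a header line followed by a line of values.
        df = pd.read_csv(StringIO(row), dtype=float)
        # orient='records' yields one dict per data line; keep the first one.
        res = {'value': json.loads(df.to_json(orient='records'))[0]}
        return res

    # The UDF returns a struct with one map field: column name -> float value.
    json_udf = udf(get_json, StructType([
        StructField('value', MapType(StringType(), FloatType()))
    ]))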
    

    Then apply the UDF to the decoded column:

    df = spark.sql("SELECT decoded FROM table")
    df.withColumn("json", json_udf(F.col('decoded'))).select("json.value").display()
    

    Output:

    value
    {"@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v":null,"@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v":0,"@2222":4.73,"@cc300://imm/cm#//c.Process/p.TriggerAck/v":444444}

    Here, each record is read as CSV with pandas and then converted to JSON. The example parses every value as a float and returns a map of floats; adjust dtype and the map's value type to match your data.
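
If pandas turns out to be heavy for two-line strings, the same header-to-value pairing can be sketched with only the standard library. This is a minimal sketch, not the answer's method: the function name decoded_to_json is hypothetical, and it assumes every decoded string is exactly one header line followed by one value line, with all values numeric. The sample string is the one from the question.

```python
import csv
import json
from io import StringIO
from itertools import zip_longest


def decoded_to_json(decoded):
    """Pair the header line with the value line and return a JSON object string."""
    if decoded is None:
        return None
    rows = list(csv.reader(StringIO(decoded)))
    if len(rows) < 2:
        return None  # no value line to pair with the header
    header, values = rows[0], rows[1]
    record = {}
    # zip_longest pads missing trailing values with None, which becomes JSON null.
    for name, raw in zip_longest(header, values[:len(header)]):
        # Assumption: every value present is numeric; float() raises otherwise.
        record[name] = float(raw) if raw not in (None, "") else None
    return json.dumps(record)


# Sample string taken from the question: four column names, three values.
sample = (
    "@300://mm/cm#//c.Process/p.TriggerAck/v,@2222,"
    "@300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v,"
    "@cc300://imm/cm#//c.AirValve/p.sv_bActivatedInSequence/v\n"
    "444444,4.73,0"
)
print(decoded_to_json(sample))
```

On Databricks, one way to apply this would be to wrap it as udf(decoded_to_json, StringType()) and call it on the decoded column, which gives a JSON string column instead of a map. Whether that is faster than the pandas version on your data is something to measure.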