I have EventHub files landing in the landing zone in Avro format. Auto Loader then picks the files up and writes them to a table as is, which means that Body is in bytes. Another column, added in the stream, decodes that Body to a string. That column now contains strings that need to be parsed and converted to JSON, for every row. The strings look like this:
@300://mm/cm#//c.Process/p.TriggerAck/v,@2222,@300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v,@cc300://imm/cm#//c.AirValve/p.sv_bActivatedInSequence/v
444444,4.73,0
Expected output
{
"@300://mm/cm#//c.Process/p.Trigger/v,@2222,@300://mm/cm#//c.AcousticAlarm1/p.sv_Alarm/v": 444444,
"@2222": 4.73,
"@300://mm/cm#//c.AirValve/p.sv_ActivatedInSequence/v": 0
}
Each of these column names corresponds to one value, in order. I want to convert all of these strings to JSON and store the result in the table rows.
I am fairly new to PySpark and Databricks, so I would appreciate any help or a nudge in the right direction on how to convert all of these to JSON efficiently, since this is a transformation applied to all of the data.
Any help is greatly appreciated!
The code I currently have is not of much help:
import pandas as pd
from io import StringIO
data = spark.sql("SELECT decoded FROM catalog.schema.table")
first_row = data.head()
decoded_body = first_row[0]
df = pd.read_csv(StringIO(decoded_body))
This only handles the first row, even if I do not restrict it to one row. My idea was to convert the string to a proper table and then to JSON, but I am sure there are better solutions.
You can use a user-defined function (UDF) in Databricks to achieve your goal. There is a built-in from_csv function (see "from_csv function - Azure Databricks - Databricks SQL | Microsoft Learn") that reads data from a CSV string, but it needs a schema that is known in advance. Since you have a dynamic schema, you can use the code below.
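from io import StringIO
import json

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, MapType, StringType, StructField, StructType

def get_json(row):
    # Parse the decoded string as a one-row CSV: first line is the header, second line the values.
    df = pd.read_csv(StringIO(row), dtype=float)
    # Convert the single record to a dict of column name -> value.
    res = {'value': json.loads(df.to_json(orient='records'))[0]}
    return res

# The UDF returns a struct with a single map field, because the keys vary from row to row.
json_udf = udf(get_json, StructType([
    StructField('value', MapType(StringType(), FloatType()))
]))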
Then apply it to the table:
df = spark.sql("SELECT decoded FROM table")
df.withColumn("json", json_udf(F.col('decoded'))).select("json.value").display()
Output:
value |
---|
{"@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v":null,"@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v":0,"@2222":4.73,"@cc300://imm/cm#//c.Process/p.TriggerAck/v":444444} |
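Here, for each record, the UDF reads the string as CSV with pandas and then converts it to JSON. A header name with no matching value comes out as null, as in the first key above. You can adjust the data types (dtype=float in read_csv and FloatType in the return schema) according to your data.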
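If building a pandas DataFrame for every row turns out to be slow, the same header-to-value pairing can be sketched with only the standard library. This is a minimal sketch, not a tested Databricks solution: the function name csv_string_to_json is hypothetical, and it assumes each decoded string has exactly one header line followed by one value line, with numeric values where possible.

```python
import csv
import json
from io import StringIO


def csv_string_to_json(decoded):
    """Pair the header line with the value line and return a JSON string."""
    if decoded is None:
        return None
    # Parse both lines with the csv module; skip blank lines.
    rows = [r for r in csv.reader(StringIO(decoded)) if r]
    if len(rows) < 2:
        return None  # no value line to pair with the header
    header, values = rows[0], rows[1]
    record = {}
    for i, name in enumerate(header):
        # A header name without a matching value becomes null.
        raw = values[i].strip() if i < len(values) else ""
        try:
            record[name] = float(raw) if raw else None
        except ValueError:
            record[name] = raw  # keep non-numeric values as strings
    return json.dumps(record)


sample = (
    "@300://mm/cm#//c.Process/p.TriggerAck/v,@2222,"
    "@300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v,"
    "@cc300://imm/cm#//c.AirValve/p.sv_bActivatedInSequence/v\n"
    "444444,4.73,0"
)
print(csv_string_to_json(sample))
```

In a notebook, this could presumably be wrapped with udf(csv_string_to_json, StringType()) and applied with withColumn in the same way as json_udf above, which would give a column of JSON strings rather than a map.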