I want to write the content of a variable that is created dynamically in the program to an ADLS file. This is how I am getting the data:
from dataclasses import dataclass, asdict

@dataclass
class pipeline_run:
    id: str
    group_id: str
    run_start: str
    run_end: str
    pipeline_name: str
    pipeline_status: str
    parameters: str
    message: str
    addl_properties: str
runs = adf_client.pipeline_runs.query_by_factory(rg_name, df_name,
    filter_parameters={'lastUpdatedBefore': 'date'})

#List to hold the pipeline run returns
list_of_json_data = list()

for i in range(0, len(runs.value)):
    this_run = runs.value[i]
    #Gather each run's information and store it in the dataclass
    data = pipeline_run(this_run.run_id, this_run.run_group_id, this_run.run_start,
                        this_run.run_end, this_run.pipeline_name, this_run.status,
                        this_run.parameters, this_run.message, this_run.additional_properties)
    #Convert the dataclass to a dict and append it to the list
    list_of_json_data.append(asdict(data))
Now, I want to write list_of_json_data to an ADLS file (.json). Any help would be appreciated. Thanks!
As you already have the list of dictionaries in a variable, follow the approach below to achieve your requirement. First, create an ADLS Gen2 linked service in Synapse.
Then mount your target container using the linked service.
mssparkutils.fs.mount(
    "abfss://<container_name>@<Storage_account_name>.dfs.core.windows.net",
    "/<Mountpoint_name>",
    {"linkedService": "<Linked_service_name>"}
)
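If you want to confirm the mount worked before writing anything, you can list the mounted path. This is an optional sketch, not part of the original steps; it assumes the mount point name /sourcedata that the file paths below use.
#Optional check: list the mounted container through the synfs scheme
jobid = mssparkutils.env.getJobId()
print(mssparkutils.fs.ls('synfs:/' + jobid + '/sourcedata'))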
After mounting, you can write the file either with a PySpark dataframe and pandas, or with open() and json in plain Python. Use the mount point when building the file path.
Using PySpark dataframe and pandas:
Here I have used a sample list of dictionaries as the variable list_of_json_data.
Code:
list_of_json_data = [{"id": "24", "group_id": "1224", "run_id": "990b0720-4747-4992-b87f-a74e1078a5f1"},
                     {"id": "16", "group_id": "1216", "run_id": "990b0720-4747-4992-b87f-a74e1078a5f1"},
                     {"id": "20", "group_id": "2408", "run_id": "990b0720-4747-4992-b87f-a74e1078a5f1"}]

#Create a Spark dataframe from the variable
df = spark.createDataFrame(list_of_json_data)
display(df)

#Get the Spark job id to build the path
jobid = mssparkutils.env.getJobId()

#Build the file path using the mount point
LogFilepath = '/synfs/' + jobid + '/sourcedata/Sample2.json'
print(LogFilepath)

#Write to the JSON path
df.toPandas().to_json(LogFilepath, orient='records')
Result JSON file:
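If you want to double-check the written file, you can read it back through the mount point with Spark. This is an optional sketch, assuming the same jobid variable and Sample2.json path as above and that the synfs scheme is available in your Synapse Spark pool.
#Optional check: read the written JSON back via the synfs scheme and display it
check_df = spark.read.json('synfs:/' + jobid + '/sourcedata/Sample2.json')
display(check_df)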
Using with open() and json:
Use the below code:
import json

#Get the Spark job id to build the file path
jobid = mssparkutils.env.getJobId()
Filepath = '/synfs/' + jobid + '/sourcedata/PipelinesJSON.json'

#Write the list of dictionaries to the JSON path
with open(Filepath, 'w') as f:
    json.dump(list_of_json_data, f)
Result JSON file:
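Similarly, if you want to verify the second approach, you can read the file back with json. This is an optional check, assuming the same Filepath variable as above.
#Optional check: read the file back and print the list of dictionaries
with open(Filepath, 'r') as f:
    print(json.load(f))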