I am trying to write an Airflow DAG which will load an .XML file to BigQuery using a Python method, but I believe that it needs to be converted into .json for it to work.
I have written the following code to convert an .XML file to .json, but I am unsure if this is correct, and how I can write a Python method to achieve this:
import json, xmltodict

def load_xml_to_bq():
    with open("xml_file.xml") as xml_file:
        data_dict = xmltodict.parse(xml_file.read())
        xml_file.close()

    json_data = json.dumps(data_dict)
    with open("data.json", "w") as json_file:
        json_file.write(json_data)
        json_file.close()
Additionally, does the .XML file need to be loaded from a GCS bucket for it to be inserted into BigQuery?
Furthermore, would the following code need to be added to my function for the .XML to be inserted into BigQuery?
client = bigquery.Client()
client.insert_rows_json(f'{dataset}.{table}', dec_sale_list)
Thanks - any help on how to accomplish this would be appreciated; I feel that I have some of the correct concepts, but I am not sure what I need to add or remove to do this.
You can also use the two following solutions.
Solution 1 :
- a PythonOperator to transform your XML file to JSON with pure Python code and a lib called xmltodict, then upload the JSON file to GCS
- a GCSToBigQueryOperator to load the JSON file from GCS to BigQuery
import os

import airflow
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

def transform_xml_file_to_json():
    import json, xmltodict
    from google.cloud import storage

    with open(xml_file_path_gcs) as xml_file:
        data_dict = xmltodict.parse(xml_file.read())

    json_data = json.dumps(data_dict)

    client = storage.Client()
    bucket = client.get_bucket("your-bucket")
    blob = bucket.blob('your_file.json')
    # json_data is already a JSON string, so it must not be dumped a second time
    blob.upload_from_string(data=json_data, content_type='application/json')
with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:

    xml_to_json = PythonOperator(
        task_id="transform_xml_to_json",
        python_callable=transform_xml_file_to_json
    )

    gcs_to_bq = GCSToBigQueryOperator(
        task_id="gcs_to_bq",
        bucket="your-bucket",
        source_objects=['*.json'],
        destination_project_dataset_table="your_table",
        schema_object=f"dags/schema/creditsafe/{data_type}.json",
        source_format="NEWLINE_DELIMITED_JSON",
        # ...
    )

    xml_to_json >> gcs_to_bq
The JSON file is uploaded to GCS with the Python storage client, then the GCSToBigQueryOperator loads it from GCS into BigQuery.
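Note that the GCSToBigQueryOperator above uses source_format="NEWLINE_DELIMITED_JSON", so the file written to GCS should contain one JSON object per line, each object matching the BigQuery schema; a single json.dumps of the whole parsed document only works if that top-level object already has the table's shape. As a minimal sketch, assuming the XML looks like <sales><sale>...</sale></sales> (the element names here are hypothetical, adapt them to your file), the transform could write newline-delimited JSON like this:

def transform_xml_file_to_ndjson():
    import json, xmltodict
    from google.cloud import storage

    with open(xml_file_path_gcs) as xml_file:
        data_dict = xmltodict.parse(xml_file.read())

    # Hypothetical structure: <sales><sale>...</sale>...</sales>
    records = data_dict["sales"]["sale"]
    if isinstance(records, dict):
        # xmltodict returns a single dict when there is only one <sale> element
        records = [records]

    # One JSON object per line, as expected by NEWLINE_DELIMITED_JSON
    ndjson = "\n".join(json.dumps(record) for record in records)

    client = storage.Client()
    bucket = client.get_bucket("your-bucket")
    blob = bucket.blob('your_file.json')
    blob.upload_from_string(data=ndjson, content_type='application/json')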
Solution 2 :
All the work is done in the PythonOperator :
- transform the XML file to Dicts with pure Python code and the xmltodict lib
- insert the rows with the BigQuery Python client and the insert_rows_json method

import os

import airflow
from airflow.operators.python import PythonOperator

dags_folder = os.getenv("DAGS_FOLDER")
xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

def load_xml_file_to_bq():
    import json, xmltodict
    from google.cloud import bigquery

    with open(xml_file_path_gcs) as xml_file:
        data_dicts = xmltodict.parse(xml_file.read())

    # Check if you have to transform data_dicts to the expected list of Dicts.
    # Each Dict needs to match exactly the schema of the BQ table.
    client = bigquery.Client()
    client.insert_rows_json('dataset.table', data_dicts)
with airflow.DAG(
        "dag",
        default_args=default_dag_args,
        schedule_interval=None) as dag:

    load_xml_file_to_bq_task = PythonOperator(
        task_id="load_xml_file_to_bq",
        python_callable=load_xml_file_to_bq
    )

    load_xml_file_to_bq_task
Check if you have to transform data_dicts to the expected list of Dict. Each Dict in the list needs to match exactly the schema of the BigQuery table.
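As an illustration only (the <sales>/<sale> element names and the id, amount and sale_date columns below are assumptions, not your real schema), the transformation from the xmltodict output to rows accepted by insert_rows_json could look like this:

def xml_dicts_to_rows(data_dicts):
    # Hypothetical structure: <sales><sale>...</sale>...</sales>
    sales = data_dicts["sales"]["sale"]
    if isinstance(sales, dict):
        # a single <sale> element is returned as a dict, not a list
        sales = [sales]

    rows = []
    for sale in sales:
        # Keep only the columns that exist in the BigQuery table and
        # cast the values to the types declared in the schema.
        rows.append({
            "id": sale["id"],
            "amount": float(sale["amount"]),
            "sale_date": sale["sale_date"],
        })
    return rows

# then: client.insert_rows_json('dataset.table', xml_dicts_to_rows(data_dicts))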
You have to be careful: if your XML file is too big, it is not recommended to load such heavy elements in memory on a single node.
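If the file is too large to parse in one go, a possible workaround (a rough sketch, assuming the records sit directly under the root element and using an arbitrary batch size of 500) is xmltodict's streaming mode, which calls a callback for each record so you can insert rows in batches instead of holding the whole document in memory:

def stream_xml_file_to_bq():
    import xmltodict
    from google.cloud import bigquery

    client = bigquery.Client()
    buffer = []

    def handle_record(path, item):
        # item is the dict parsed for one record found at item_depth
        buffer.append(item)  # adapt/flatten item so it matches the table schema
        if len(buffer) >= 500:  # arbitrary batch size (assumption)
            client.insert_rows_json('dataset.table', buffer)
            buffer.clear()
        return True  # returning True tells xmltodict to keep parsing

    with open(xml_file_path_gcs, "rb") as xml_file:
        # item_depth=2 assumes records are direct children of the root element
        xmltodict.parse(xml_file, item_depth=2, item_callback=handle_record)

    if buffer:
        # flush the remaining rows
        client.insert_rows_json('dataset.table', buffer)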