Tags: python, google-cloud-platform, google-bigquery, google-cloud-functions, airflow

Airflow: How to Load an XML File to BigQuery?


I am trying to write an Airflow DAG that will load an .XML file to BigQuery using a Python method, but I believe the file needs to be converted to .json first for this to work.

I have written the following code to convert an .XML file to .json, but I am unsure if it is correct, or how to wrap it in a Python method:

import json, xmltodict

def load_xml_to_bq():
    # Parse the XML file into a dict.
    with open("xml_file.xml") as xml_file:
        data_dict = xmltodict.parse(xml_file.read())

    # Serialize the dict and write it out as JSON.
    json_data = json.dumps(data_dict)
    with open("data.json", "w") as json_file:
        json_file.write(json_data)

Additionally, does the .XML file need to be loaded from a GCS Bucket for it to be inserted into BigQuery?

Furthermore, would the following code need to be added to my function for the .XML to be inserted into BigQuery?

    client = bigquery.Client()
    client.insert_rows_json(f'{dataset}.{table}', dec_sale_list)

Thanks - any help on how to accomplish this would be appreciated; I feel that I have some of the right concepts, but I am not sure what I need to add or remove to make this work.


Solution

  • You can also use the two following solutions.

    Solution 1:

    • Use a PythonOperator to transform your XML file to JSON, with pure Python code and a lib called xmltodict
    • Then use GCSToBigQueryOperator to load the JSON file to BigQuery
    import os

    dags_folder = os.getenv("DAGS_FOLDER")
    xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

    def transform_xml_file_to_json():
        import json, xmltodict
        from google.cloud import storage

        # Parse the XML file into a dict.
        with open(xml_file_path_gcs) as xml_file:
            data_dict = xmltodict.parse(xml_file.read())

        json_data = json.dumps(data_dict)

        # Upload the JSON string to GCS with the Python client.
        # json_data is already a JSON string, so it must not be dumped again.
        client = storage.Client()
        bucket = client.get_bucket("your-bucket")
        blob = bucket.blob('your_file.json')
        blob.upload_from_string(data=json_data, content_type='application/json')
    
    import airflow
    from airflow.operators.python import PythonOperator
    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

    with airflow.DAG(
            "dag",
            default_args=default_dag_args,
            schedule_interval=None) as dag:

        xml_to_json = PythonOperator(
            task_id="transform_xml_to_json",
            python_callable=transform_xml_file_to_json
        )

        gcs_to_bq = GCSToBigQueryOperator(
            task_id="gcs_to_bq",
            bucket="your-bucket",
            source_objects=['*.json'],
            destination_project_dataset_table="your_table",
            schema_object=f"dags/schema/creditsafe/{data_type}.json",
            source_format="NEWLINE_DELIMITED_JSON",
            # ... other operator parameters
        )

        xml_to_json >> gcs_to_bq
    
    • The first operator transforms the XML file to a JSON file with the xmltodict lib and uploads it to GCS with the Python client
    • The second operator loads the JSON file to BigQuery
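
    Note that source_format="NEWLINE_DELIMITED_JSON" expects one JSON object per line, while json.dumps(data_dict) produces a single nested object, so you may need to produce newline-delimited JSON instead. A minimal sketch of that conversion, assuming a hypothetical XML layout with repeated <row> elements under a <rows> root (adapt the keys to your real file):

    import json
    import xmltodict

    def xml_to_ndjson(xml_string):
        data_dict = xmltodict.parse(xml_string)
        # Hypothetical layout: <rows><row>...</row><row>...</row></rows>
        rows = data_dict["rows"]["row"]
        # xmltodict returns a single dict (not a list) when there is only one <row>.
        if isinstance(rows, dict):
            rows = [rows]
        # One JSON object per line, as BigQuery's NEWLINE_DELIMITED_JSON expects.
        return "\n".join(json.dumps(row) for row in rows)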

    Solution 2:

    All the work is done in the PythonOperator:

    • Load the XML file
    • Transform it to a list of dicts
    • Use the Python client and the insert_rows_json method
    import os

    dags_folder = os.getenv("DAGS_FOLDER")
    xml_file_path_gcs = f'{dags_folder}/your_xml_file.xml'

    def load_xml_file_to_bq():
        import xmltodict
        from google.cloud import bigquery

        # Parse the XML file into nested dicts.
        with open(xml_file_path_gcs) as xml_file:
            data_dicts = xmltodict.parse(xml_file.read())

        # Check if you have to transform data_dicts to the expected list of dicts.
        # Each dict needs to match the schema of the BQ table exactly.
        client = bigquery.Client()
        client.insert_rows_json('dataset.table', data_dicts)
    
    
    import airflow
    from airflow.operators.python import PythonOperator

    with airflow.DAG(
            "dag",
            default_args=default_dag_args,
            schedule_interval=None) as dag:

        # Use a distinct variable name so the task does not shadow the callable.
        load_xml_file_to_bq_task = PythonOperator(
            task_id="load_xml_file_to_bq",
            python_callable=load_xml_file_to_bq
        )

        load_xml_file_to_bq_task
    

    Check if you have to transform data_dicts to the expected list of dicts. Each dict in the list needs to match the schema of the BigQuery table exactly.
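
    For example, a minimal sketch of that transformation, assuming a hypothetical XML layout with repeated <sale> elements under a <sales> root and a table with id and amount columns (adapt the keys and types to your real schema):

    import xmltodict
    from google.cloud import bigquery

    def to_bq_rows(data_dicts):
        # data_dicts comes from xmltodict.parse(...), as in load_xml_file_to_bq above.
        # Hypothetical layout: <sales><sale><id>1</id><amount>9.99</amount></sale>...</sales>
        sales = data_dicts["sales"]["sale"]
        # xmltodict returns a single dict (not a list) when there is only one <sale>.
        if isinstance(sales, dict):
            sales = [sales]
        # Each dict must match the BigQuery table schema exactly.
        return [{"id": int(s["id"]), "amount": float(s["amount"])} for s in sales]

    client = bigquery.Client()
    # insert_rows_json returns a list of errors; an empty list means success.
    errors = client.insert_rows_json('dataset.table', to_bq_rows(data_dicts))
    if errors:
        raise RuntimeError(f"insert_rows_json errors: {errors}")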

    You have to be careful: if your XML file is too big, it is not recommended to load such heavy elements in an Airflow node.