Tags: python, google-bigquery, google-cloud-storage, airflow, directed-acyclic-graphs

How to load a BigQuery table from a file in GCS Bucket using Airflow?


I am new to Airflow, and I am wondering how to load a file from a GCS bucket into BigQuery.

So far, I have managed to go the other way, from BigQuery to a GCS bucket:

    # Airflow 1.10-style contrib imports for the two operators below.
    from airflow.contrib.operators import bigquery_operator
    from airflow.contrib.operators import bigquery_to_gcs

    # max_query_date, min_query_date, bq_recent_questions_table_id and
    # output_file are assumed to be defined elsewhere in the DAG file.

    # Run the query and write its result to bq_recent_questions_table_id.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id='bq_recent_questions_query',
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(max_date=max_query_date, min_date=min_query_date),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id)

    # Export query result to Cloud Storage.
    export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='export_recent_questions_to_gcs',
        source_project_dataset_table=bq_recent_questions_table_id,
        destination_cloud_storage_uris=[output_file],
        export_format='CSV')

Can someone help me modify my current code so that I can load a file from a GCS bucket into BigQuery?


Solution

  • For your requirement, you can use GCSToBigQueryOperator, an Airflow operator that transfers files from Cloud Storage to BigQuery. For more information, check this link. You can try the code below.

    gcs_to_bq_operator.py

    import os

    from airflow import models
    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryCreateEmptyDatasetOperator,
    )
    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
    from airflow.utils.dates import days_ago

    # BigQuery dataset IDs may contain only letters, numbers and underscores,
    # so the default uses an underscore rather than a hyphen.
    DATASET_NAME = os.environ.get("GCP_DATASET_NAME", 'new_dataset')
    TABLE_NAME = os.environ.get("GCP_TABLE_NAME", 'Country')

    dag = models.DAG(
        dag_id='gcs_to_bq_operator',
        start_date=days_ago(2),
        schedule_interval='@once',
        tags=['example'],
    )

    # Create the dataset that will hold the destination table.
    create_test_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME, dag=dag
    )

    # Load a CSV object from the bucket into DATASET_NAME.TABLE_NAME,
    # replacing any existing rows (WRITE_TRUNCATE).
    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery_example',
        bucket='sample-bucket',
        # Object path(s) inside the bucket above, not full gs:// URIs.
        source_objects=['path/to/file.csv'],
        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
        schema_fields=[
            {'name': 'Year', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'Country', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'number', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'result', 'type': 'INTEGER', 'mode': 'NULLABLE'}
        ],
        write_disposition='WRITE_TRUNCATE',
        dag=dag,
    )

    # The dataset must exist before the load job runs.
    create_test_dataset >> load_csv
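GCSToBigQueryOperator takes the bucket name (bucket) and the object paths inside that bucket (source_objects) as separate arguments, while the export step in the question works with full gs:// URIs such as output_file. The following is a minimal sketch of a helper that splits such a URI, assuming it has the form gs://<bucket>/<object>. The name split_gcs_uri and the example URI are hypothetical; the helper is not part of Airflow or the Google provider package.

```python
from typing import Tuple

GCS_SCHEME = "gs://"


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/path/to/object' into ('bucket', 'path/to/object').

    Hypothetical helper: GCSToBigQueryOperator expects the bucket and the
    object path(s) as separate arguments rather than one gs:// URI.
    """
    if not uri.startswith(GCS_SCHEME):
        raise ValueError(f"not a gs:// URI: {uri!r}")
    # Everything before the first '/' is the bucket; the rest is the object name.
    bucket, sep, object_name = uri[len(GCS_SCHEME):].partition("/")
    if not bucket or not sep or not object_name:
        raise ValueError(f"expected gs://<bucket>/<object>, got {uri!r}")
    return bucket, object_name


if __name__ == "__main__":
    # Example URI; in the question this would be the value of output_file.
    bucket, object_name = split_gcs_uri("gs://my-bucket/exports/recent_questions.csv")
    print(bucket, object_name)  # my-bucket exports/recent_questions.csv
```

The two parts can then be passed as bucket=bucket and source_objects=[object_name]. Plain string partitioning is used instead of urllib.parse because GCS object names may legally contain characters such as '#' and '?', which a URL parser would treat as fragment or query delimiters. A wildcard object name such as 'exports/part-*.csv' passes through unchanged.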