Tags: google-cloud-platform, google-bigquery, airflow, directed-acyclic-graphs, google-cloud-composer

Apache Airflow - Custom Operator or Runtime parameters


I want to move BigQuery data to Cloud SQL using Apache Airflow DAGs. I have implemented one DAG and it works fine; it uses BigQueryOperator, BigQueryToCloudStorageOperator, and CloudSqlInstanceImportOperator.

We have to do the same activity for multiple tables; only the table name changes, the rest of the process stays the same. I have a couple of questions about how I should implement this:

  1. Custom operator - I tried this, but it looks like the operators mentioned above cannot be combined inside a custom operator. Can I do that somehow? I would pass the table name as a parameter to the custom operator, and the custom operator would run all of the above operators to move the data.

  2. Runtime or environment variables - I could not find anything related to this. Can I use this approach?

Please also suggest if any other approach should be used.

Thank you in advance.


Solution

  • You can use a shared method that returns a TaskGroup.

    You have two possibilities:

    • The method can be put in a regular Python file in a shared folder of the Composer bucket: gs://composer_bucket/dags/my_folder/my_shared_file.py. To be able to share Python files between DAGs, you need a setup.py at the root of the dags folder in the Composer bucket: gs://composer_bucket/dags/setup.py (a minimal sketch follows this list).
    • You can also create a shared library in Artifact Registry containing the shared file and logic, then use it in your DAGs as a Python package.
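
    For the first option, the setup.py only needs to declare the shared folder as an installable package so that the shared file becomes importable from every DAG. A minimal sketch, assuming the shared code lives in my_folder (the package name and version are placeholders):

    # gs://composer_bucket/dags/setup.py -- minimal sketch, names are placeholders
    # my_folder needs an __init__.py so that find_packages() discovers it
    from setuptools import find_packages, setup

    setup(
        name='dags-shared-code',
        version='0.1.0',
        packages=find_packages(),
    )

    A DAG can then import the shared method with, for example, from my_folder.my_shared_file import execute_query_for_table.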

    For simplicity, the example below puts the method and logic directly in the DAG file:

    import airflow
    from airflow.configuration import conf
    from airflow.operators.dummy import DummyOperator
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
    from airflow.utils.task_group import TaskGroup
    from jinja2 import Template
    
    
    # Root of the DAGs folder, used to locate the per-table query.sql files
    dag_folder = conf.get('core', 'dags_folder')


    def get_jinja_template(file_path: str) -> Template:
        # Read a SQL file from disk and wrap it in a Jinja template
        with open(file_path) as fp:
            return Template(fp.read())
    
    
    def execute_query_for_table(table: str) -> TaskGroup:
        # Shared logic: build the same group of tasks for any table name
        with TaskGroup(group_id=f'my_group_{table}') as group:
            # Each table has its own query.sql file under the DAG folder
            query_path = f'{dag_folder}/your_dag_folder/{table}/query.sql'

            # Render the Jinja template with the parameters the query expects
            query = get_jinja_template(query_path).render(
                project_id='project',
                param2='param2'
            )
    
            # Run the rendered query as a BigQuery job
            execute_query = BigQueryInsertJobOperator(
                task_id=f'Execute_query_{table}',
                configuration={
                    "query": {
                        "query": query,
                        "useLegacySql": False,
                    }
                },
                location='EU'
            )
    
            # Placeholder for the next steps of the pipeline
            next_task = DummyOperator(task_id=f'Next_{table}')
    
            execute_query >> next_task
    
        return group
    
    
    with airflow.DAG(
            "your_dag",
            default_args=your_args,  # your usual default_args (owner, start_date, retries, ...)
            schedule_interval=None) as dag:
        start = DummyOperator(task_id="Start")
    
        start >> execute_query_for_table('table1')
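
    To run the same logic for several tables, the method can simply be called once per table inside the DAG; a minimal sketch (the table names are placeholders):

    # Inside the `with airflow.DAG(...) as dag:` block above
    for table in ['table1', 'table2', 'table3']:
        start >> execute_query_for_table(table)

    Each call builds its own TaskGroup, and the table name embedded in the group_id and task_ids keeps every task unique.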
    
    • A single shared method returns a TaskGroup and contains the shared logic and the set of operators.
    • In the DAG, the method returning the TaskGroup can be called like a regular operator; a sketch of the full export/import variant follows below.
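
    To mirror the original pipeline from the question (BigQuery -> Cloud Storage -> Cloud SQL), the same pattern could wrap the export and import operators instead of a single query. A hypothetical sketch using the current Google provider operator names; the project, bucket, instance, and database names are placeholders:

    from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLImportInstanceOperator
    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
    from airflow.utils.task_group import TaskGroup


    def export_and_import_table(table: str) -> TaskGroup:
        with TaskGroup(group_id=f'move_{table}') as group:
            gcs_uri = f'gs://your_bucket/exports/{table}.csv'  # placeholder bucket

            # Export the BigQuery table to Cloud Storage as CSV
            to_gcs = BigQueryToGCSOperator(
                task_id=f'bq_to_gcs_{table}',
                source_project_dataset_table=f'your_project.your_dataset.{table}',
                destination_cloud_storage_uris=[gcs_uri],
                export_format='CSV',
                print_header=False,
            )

            # Import the exported CSV file into the Cloud SQL instance
            import_csv = CloudSQLImportInstanceOperator(
                task_id=f'gcs_to_cloudsql_{table}',
                instance='your-cloudsql-instance',
                body={
                    'importContext': {
                        'fileType': 'CSV',
                        'uri': gcs_uri,
                        'database': 'your_database',
                        'csvImportOptions': {'table': table},
                    }
                },
            )

            to_gcs >> import_csv

        return group

    This group can then be wired into the DAG exactly like execute_query_for_table above.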