I want to move BigQuery data to Cloud SQL using Apache Airflow DAGs. I have implemented one DAG and it's working fine. I'm using BigQueryOperator, BigQueryToCloudStorageOperator, and CloudSqlInstanceImportOperator for this.
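For reference, the working DAG chains the three operators roughly like this (a simplified sketch; the project, dataset, bucket, database, and instance names are placeholders, and the imports assume the Airflow 1.10 contrib modules, so adjust them if you are on the newer Google provider package):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator

with DAG("bq_to_cloudsql_table1",
         start_date=datetime(2023, 1, 1),
         schedule_interval=None) as dag:

    # 1. Run the extraction query and write the result to a staging BQ table
    extract = BigQueryOperator(
        task_id="extract_table1",
        sql="SELECT * FROM `my_project.my_dataset.table1`",
        destination_dataset_table="my_project.staging.table1",
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )

    # 2. Export the staging table to GCS as a single CSV file (fine under 1 GB)
    export = BigQueryToCloudStorageOperator(
        task_id="export_table1",
        source_project_dataset_table="my_project.staging.table1",
        destination_cloud_storage_uris=["gs://my_bucket/table1/export.csv"],
        export_format="CSV",
    )

    # 3. Import the CSV file from GCS into the Cloud SQL database
    import_to_cloudsql = CloudSqlInstanceImportOperator(
        task_id="import_table1",
        project_id="my_project",
        instance="my_cloudsql_instance",
        body={
            "importContext": {
                "fileType": "CSV",
                "uri": "gs://my_bucket/table1/export.csv",
                "database": "my_database",
                "csvImportOptions": {"table": "table1"},
            }
        },
    )

    extract >> export >> import_to_cloudsql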
We have to do the same activity for multiple tables; only the table name will change, the rest of the process stays the same. I have a couple of questions on how I should implement this:
Custom operator - I tried this, but it looks like the operators mentioned above cannot be combined inside a custom operator. Can I do that somehow? I would pass the table name as a parameter to the custom operator, and the custom operator would run all of the above operators to move the data.
Runtime variables or environment variables - I was not able to find anything related to this. Can I use this approach?
Please suggest if there is any other way this should be done.
Thank you in advance.
You can use a shared method that returns a TaskGroup.

You have 2 possibilities to host the shared code:

1. Composer bucket: gs://composer_bucket/dags/my_folder/my_shared_file.py. To be able to share Python files between DAGs, you need to have a setup.py at the root of the dags folder in the Composer bucket: gs://composer_bucket/dags/setup.py (a minimal setup.py sketch is shown at the end of this answer).
2. Artifact Registry: a package containing the shared file and logic, which you then use in your DAGs as a Python package.

For simplicity, I show an example with the method and logic directly in the DAG file:
import os
from datetime import datetime

import airflow
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.task_group import TaskGroup
from jinja2 import Template

# Folder containing this DAG file; used to locate the per-table query.sql files
dag_folder = os.path.dirname(__file__)


def get_jinja_template(file_path: str) -> Template:
    # Load a SQL file and wrap it in a Jinja template
    with open(file_path) as fp:
        return Template(fp.read())


def execute_query_for_table(table: str) -> TaskGroup:
    # Shared logic: builds the same group of tasks for any table name
    with TaskGroup(group_id=f'my_group_{table}') as group:
        query_path = f'{dag_folder}/your_dag_folder/{table}/query.sql'
        query = get_jinja_template(query_path).render(
            project_id='project',
            param2='param2'
        )

        execute_query = BigQueryInsertJobOperator(
            task_id=f'Execute_query_{table}',
            configuration={
                "query": {
                    "query": query,
                    "useLegacySql": False,
                }
            },
            location='EU'
        )

        next_task = DummyOperator(task_id=f'Next_{table}')

        execute_query >> next_task

    return group


# Replace with your own default_args
your_args = {
    'start_date': datetime(2023, 1, 1),
}

with airflow.DAG(
        "your_dag",
        default_args=your_args,
        schedule_interval=None) as dag:
    start = DummyOperator(task_id="Start")

    start >> execute_query_for_table('table1')
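To cover the remaining tables, the with block above can simply call the shared method once per table, for example (the table names here are placeholders):

with airflow.DAG(
        "your_dag",
        default_args=your_args,
        schedule_interval=None) as dag:

    start = DummyOperator(task_id="Start")
    end = DummyOperator(task_id="End")

    # One TaskGroup per table, all built by the same shared method
    for table in ['table1', 'table2', 'table3']:
        start >> execute_query_for_table(table) >> end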
The shared method returns a TaskGroup and contains the shared logic and the set of operators. In your DAGs, you can then use this TaskGroup as a usual operator.
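And as mentioned in the first possibility, if you move the shared method into gs://composer_bucket/dags/my_folder/my_shared_file.py, a minimal setup.py at gs://composer_bucket/dags/setup.py could look like the following sketch (package name and version are placeholders):

from setuptools import find_packages, setup

setup(
    name="dags-shared-code",  # placeholder name
    version="0.1",
    packages=find_packages(),
)

Provided my_folder contains an __init__.py file, the DAG files can then import the shared method with a normal Python import, for example from my_folder.my_shared_file import execute_query_for_table.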