google-cloud-platform · airflow · google-cloud-composer

Create Dynamic task mapping in Airflow from BigQueryGetDataOperator


Disclaimer: I am fairly new to Airflow and would love any advice ^^

I need to read data from BigQuery (in this case, a list of project IDs) and, from that information, create a dynamically mapped task that runs an operation on each project in the list (in my example, just printing the project ID in a new task).

This is my DAG config:

```python
import datetime

from airflow import XComArg, models
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator

# Minimal default_args so the DAG definition is self-contained
default_args = {"start_date": datetime.datetime(2024, 1, 1)}

def printer_task(data_list):
    project_ids = []
    for value in data_list:
        project_id = value.get('project')
        project_ids.append(project_id)
        print(f"project_ID: {project_ids}")

def fetch_data_from_bq():
    return BigQueryGetDataOperator(
        task_id="fetch_data_from_bq",
        project_id="Project-X",
        dataset_id="Dataset-X",
        table_id="Table-X",
        gcp_conn_id="bigquery_default",
        max_results=10,
        selected_fields='project',
        # as_dict=True
    )

with models.DAG(
        dag_id="dummy-task",
        default_args=default_args,
        schedule_interval='@daily',
) as dag:

    get_data = fetch_data_from_bq()
    start_task = DummyOperator(task_id="start_task", dag=dag)
    end_task = DummyOperator(task_id="end_task", dag=dag)

    create_mapped_tasks = PythonOperator.partial(
        task_id="create_mapped_tasks",
        python_callable=printer_task
    ).expand(op_args=XComArg(get_data))

    start_task >> get_data >> create_mapped_tasks >> end_task
```

All 10 mapped tasks are created, but they all fail.

Can someone please explain to me why? ^^ And maybe help me with a solution?

Cheers, and thanks for your help.

I have already tried different approaches, but I always get this error message in the mapped tasks:

    [2024-02-13, 00:02:50 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 27244 for task create_mapped_tasks ('str' object has no attribute 'get'; 1423691)



Solution

  • When you `expand` an Airflow operator over a list of values (here, the list of fetched project IDs), each mapped task instance receives a single element of that list, not a list containing that element. In your DAG, each mapped `printer_task` therefore receives one row's value as a plain string; iterating over it yields individual characters, and `value.get('project')` fails with `'str' object has no attribute 'get'`.

    To fix your issue, make the callable accept a single value:

    def printer_task(value):
        print(f"project_ID: {value}")
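Outside Airflow, you can simulate what `expand(op_args=...)` does with plain Python to see why the callable must accept a single element. This is only an illustrative sketch (the `rows` data is made up), assuming `BigQueryGetDataOperator` pushes its result as a list of rows, each row being a list of the selected field values:

```python
# Illustrative stand-in for the XCom pushed by BigQueryGetDataOperator:
# one inner list per row, one entry per selected field.
rows = [["project-a"], ["project-b"], ["project-c"]]

def printer_task(value):
    # Each mapped task instance gets ONE row; with a single selected
    # field, unpacking that row yields a single string.
    print(f"project_ID: {value}")
    return value

# expand(op_args=...) creates one task per element of the list and calls
# the callable with that element's contents as positional arguments.
results = [printer_task(*row) for row in rows]
```

Calling `printer_task(*row)` with `row = ["project-a"]` passes the bare string `"project-a"`, which is exactly why the original `value.get('project')` blew up: strings have no `.get` method.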