
Is there any way to store the return value of a task in a Python variable and share it with downstream tasks (without using XCom or an Airflow Variable)?

I am writing an Airflow DAG that reads a set of configs from the database and then runs a series of Python scripts using BashOperator. The configs read earlier are passed to the scripts as arguments.

The problem is that I cannot find an efficient way to share the config with the downstream operators. I designed the DAG below, and I have the following concerns.

  1. I am not sure how many DB calls will be made to fetch the values required inside the Jinja templates (in the example below).

  2. Besides, since the config is the same in every task, I am not sure it is a good idea to fetch it from the database every time. That is also why I don't want to use XCom. I used an Airflow Variable because the JSON parsing can happen in a single line, but I guess the database-call issue is still there.

class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        s = hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        s = s.set_index('laptopName', drop=False)
        s = s.to_json(orient='index')
        Variable.set('jobconfig', s)

t1 = ReturningMySqlOperator(
    # task_id and other arguments omitted
    sql='SELECT * FROM laptops',
)

t3 = BashOperator(
    # task_id and other arguments omitted
    bash_command='python3 path/ {{var.json.jobconfig.Legion.laptopName}} {{}}',
)

t4 = BashOperator(
    # task_id and other arguments omitted
    bash_command='python3 path/ {{var.json.jobconfig.Legion.laptopName}} {{}}',
)

t5 = BashOperator(
    # task_id and other arguments omitted
    bash_command='python3 path/ {{var.json.jobconfig.Legion.laptopName}} {{}}',
)

t6 = BashOperator(
    # task_id and other arguments omitted
    bash_command='python3 path/ {{var.json.jobconfig.Legion.laptopName}} {{}}',
)

t1 >> t3 
t3 >> [t4,t6]


  • First point:

    I am not sure how many DB calls will be made to fetch the values required inside the jinja templates (in the below example).

    In the example you provided, each sequence_x task makes two connections to the metadata DB, one for each {{var.json.jobconfig.xx}} call. The good news is that these calls are not executed by the scheduler, so they do not run on every heartbeat interval. From the Astronomer guide:

    Since all top-level code in DAG files is interpreted every scheduler "heartbeat," macros and templating allow run-time tasks to be offloaded to the executor instead of the scheduler.
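
To make the lookup count concrete, here is a minimal sketch in plain Python. It does not use Airflow: `FakeVariableStore`, `render`, the `script.py` name, and the `company` column are hypothetical stand-ins that only mimic the behaviour described above, where every `var.json.<name>` reference in a template triggers its own read of the stored JSON.

```python
import json
import re


class FakeVariableStore:
    """Hypothetical stand-in for the metadata DB that counts every read."""

    def __init__(self, rows):
        self._rows = rows
        self.reads = 0

    def get_json(self, key):
        self.reads += 1
        return json.loads(self._rows[key])


def render(template, store):
    """Resolve each {{ var.json.a.b.c }} reference with a separate store read."""

    def resolve(match):
        name, *path = match.group(1).split(".")
        value = store.get_json(name)  # one read per reference
        for part in path:
            value = value[part]
        return str(value)

    return re.sub(r"\{\{\s*var\.json\.([\w.]+)\s*\}\}", resolve, template)


# Same shape as the JSON written by to_json(orient='index') in the question;
# the 'company' column is an assumed example.
store = FakeVariableStore({
    "jobconfig": json.dumps(
        {"Legion": {"laptopName": "Legion", "company": "some_company"}}
    )
})

command = render(
    "python3 script.py {{ var.json.jobconfig.Legion.laptopName }} "
    "{{ var.json.jobconfig.Legion.company }}",
    store,
)
print(command)
print("store reads:", store.reads)
```

With two references in the template the counter ends at 2, which is the per-task cost the first point describes.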

  • Second point:

    I think the key aspect here is that the value you want to pass downstream is always the same and won't change after t1 has run. There are a few possible approaches, but if you want to minimize the number of calls to the DB and avoid XComs altogether, you should use the TriggerDagRunOperator.

    To do so, split your DAG into two parts: a controller DAG with the task that fetches the data from MySQL, which then triggers a second DAG that runs all of the BashOperator tasks using the values obtained by the controller DAG. You can pass the data in through the conf parameter.

    Here is an example based on the official Airflow example DAGs:

    Controller DAG:

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.python import PythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils.dates import days_ago

    def _data_from_mysql():
        # fetch data from the DB or anywhere else
        # set a Variable
        data = {'legion': {'company': 'some_company', 'laptop': 'great_laptop'}}
        Variable.set('jobconfig', data, serialize_json=True)

    dag = DAG(
        # dag_id and other arguments omitted
        default_args={"owner": "airflow"},
    )

    get_data_from_MySql = PythonOperator(
        # task_id and other arguments omitted
        python_callable=_data_from_mysql,
        dag=dag,
    )

    trigger = TriggerDagRunOperator(
        # task_id omitted
        # trigger_dag_id: ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Company is {{}}"},
        dag=dag,
    )

    get_data_from_MySql >> trigger

    When the trigger task runs, it includes the key message as part of the configuration for the DAG run of the second DAG.

    Target DAG:

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago

    dag = DAG(
        # dag_id and other arguments omitted
        default_args={"owner": "airflow"},
    )

    def run_this_func(**context):
        """
        Print the payload "message" passed to the DagRun conf attribute.

        :param context: The execution context
        :type context: dict
        """
        print("Remotely received value of {} for key=message".format(
            context["dag_run"].conf["message"]))

    run_this = PythonOperator(
        task_id="run_this", python_callable=run_this_func, dag=dag)

    bash_task_1 = BashOperator(
        # task_id omitted
        bash_command='echo "Here is the message: $message"',
        env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
        dag=dag,
    )

    The logs of bash_task_1 in this example will include:

    [2021-05-05 15:40:35,410] {} INFO - Running command: echo "Here is the message: $message"
    [2021-05-05 15:40:35,418] {} INFO - Output:
    [2021-05-05 15:40:35,419] {} INFO - Here is the message: Company is some_company
    [2021-05-05 15:40:35,420] {} INFO - Command exited with return code 0
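
The way the value reaches the shell command can be mimicked without Airflow. The sketch below is only an approximation of the mechanism, not BashOperator's implementation: `run_with_env` is a hypothetical helper, and the `conf` dict stands in for `dag_run.conf` after templating. It assumes a POSIX `sh` is available.

```python
import os
import subprocess


def run_with_env(command, extra_env):
    """Run a shell command with extra environment variables and return its stdout."""
    env = {**os.environ, **extra_env}
    result = subprocess.run(
        ["sh", "-c", command],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Stand-in for dag_run.conf as received by the target DAG
conf = {"message": "Company is some_company"}

# The shell expands $message from the environment, as in the bash_command above
output = run_with_env(
    'echo "Here is the message: $message"',
    {"message": conf.get("message", "")},
)
print(output)
```

Passing the value through the environment keeps it out of the command string itself, so the shell does not re-interpret its contents.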


    • One task to fetch data from DB and set it as a Variable
    • Trigger a second DAG passing the data from the Variable in conf
    • In your target DAG consume data from dag_run.conf

    This way you are only reading from the metadata DB once, when the second DAG is triggered.

    Also, to avoid repeating too much code across the BashOperator task definitions, you could do something like this:
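
The effect of the three steps above on the number of reads can be sketched in plain Python. This is a simulation under stated assumptions, not Airflow code: `CountingStore`, `build_conf`, `build_command`, and the script names are hypothetical, and the `conf` dict plays the role of `dag_run.conf`.

```python
import json


class CountingStore:
    """Hypothetical stand-in for the metadata DB that counts every read."""

    def __init__(self, data):
        self._data = data
        self.reads = 0

    def get_json(self, key):
        self.reads += 1
        return json.loads(self._data[key])


def build_conf(store, laptop):
    """Controller side: read the stored config once and keep one laptop's fields."""
    config = store.get_json("jobconfig")
    return dict(config[laptop])


def build_command(conf, path_to_script):
    """Target side: build the command from the conf payload only."""
    return f"python3 {path_to_script} {conf['laptopName']} {conf['company']}"


store = CountingStore({
    "jobconfig": json.dumps(
        {"Legion": {"laptopName": "Legion", "company": "some_company"}}
    )
})

conf = build_conf(store, "Legion")

# Hypothetical script names, one per downstream task
commands = [
    build_command(conf, script)
    for script in ("job_a.py", "job_b.py", "job_c.py")
]

for command in commands:
    print(command)
print("store reads:", store.reads)
```

However many downstream commands are built, the store is read a single time, because every task works from the payload it was handed.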

    templated_bash_cmd = """
    python3 {{params.path_to_script}} {{dag_run.conf["laptopName"]}} {{dag_run.conf["company"]}}
    """
    bash_task_1 = BashOperator(
        # task_id and other arguments omitted
        bash_command=templated_bash_cmd,
        params={'path_to_script': 'path/'},
    )

    Let me know if that worked for you!