Tags: python, scope, airflow, airflow-webserver

How can I access a variable defined in a function called by an Airflow PythonOperator, and use it outside the Airflow template scope?


```python
## Section 1 | Import Modules
## Section 2 | DAG Default Arguments
## Section 3 | Instantiate the DAG
## Section 4 | Defining Utils
## Section 5 | Task defining
## Section 6 | Defining dependencies


## Section 1 | Import Modules
from airflow import DAG
from datetime import datetime
from airflow.operators.python_operator import PythonOperator

## Section 2 | DAG Default Arguments
default_args = {
    'owner': 'Sourav',
    'depends_on_past': False,
    'start_date': datetime(2021, 6, 11),
    'retries': 0,
}

## Section 3 | Instantiate the DAG
dag = DAG('basic_skeleton',
        description='basic skeleton of a DAG',
        default_args=default_args,
        schedule_interval=None,
        catchup=False,
        tags=['skeleton'],
        )

x = 0
## Section 4 | Defining Utils
def print_context(**kwargs):
    print("hello world")
    return "hello world!!!"

def sum(**kwargs):
    c = 1+2
    return c

def diff(**kwargs):
    global c
    c = 2-1
    return c

## Doubts
x = c
y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf

## Section 5 | Task defining
with dag:
    t_printHello_prejob = PythonOperator(
        task_id='t_printHello_prejob',
        provide_context=True,
        python_callable=print_context,
        dag=dag,
    )

    t_sum_job = PythonOperator(
        task_id='t_sum_job',
        python_callable=sum,
        provide_context=True,
        dag=dag
    )

    ## Section 6 | Defining dependencies
    t_printHello_prejob >> t_sum_job
```

I need to know two things:

  1. x = c: I am trying to use the variable x to define a for-loop that sets how many times the next task needs to run. However, the Airflow UI seems to be rendered from the plain .py file as parsed, and x is loaded with a value of 0 instead of 1, even though I declare global c in the function. Occasionally, the Airflow UI does show the value 1. I want to understand the logic behind this. How can I get control over the global variable?

  2. For each DAG run, I want to get the conf out of the Airflow template scope and use it in the global Python region (outside any Airflow template). I understand that I can use Jinja macros in Airflow templates, but I need to access the conf outside Airflow's templating. The statement y = dag.get_dagrun(execution_date=dag.get_latest_execution_date()).conf gives me the conf of the latest DAG run. However, I have multiple DAG runs running at the same time, so how can I get the conf of the current DAG run into this variable?


Solution

  • Sourav, tell me if this helps:

    In an Airflow DAG we generally don't share data between tasks, even though it's technically possible (via XComs). We're encouraged to keep every task idempotent, not unlike a "pure function" in functional programming: given the same input x, a task should always produce the same result.

    The DAG file you're defining here is basically a blueprint for a data pipeline. When Airflow parses the file, it runs only the module-level code; the functions that the tasks will call are... well, not yet called. Each task instance later runs in a separate process that imports the file again, so any global it sets vanishes when that process exits and never reaches the process that parsed the DAG for the UI. Intuitively, therefore, I would expect x to always equal zero, and while it's an interesting mystery to unravel why it isn't always, mutating global variables during a DAG run isn't what Airflow is set up to do.

    That said, one simple way to reliably set x or c in one task and read it in another is to store it in an Airflow Variable:

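    ```python
    from airflow.models.variable import Variable
    ...

    Variable.set('x', 0)  # note: top-level code like this runs on every parse of the DAG file
    ...

    def sum(**kwargs):
        c = 1+2
        return c

    def diff(**kwargs):
        c = 2-1
        Variable.set('c', c)  # stored in the metadata database as a string
        return c

    def a_func_that_uses_c(**kwargs):
        """make sure this function is called in a task _after_ the task calling `diff`"""
        c = int(Variable.get('c'))  # Variables come back as strings; convert explicitly
        ...
    ```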
    

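    One gotcha is that Airflow Variables are stored as strings, so if you're storing an integer, as here, you'll need int(c) to get it back as one (int is safer than eval(c), which would execute whatever string is stored). Alternatively, Variable.set('c', c, serialize_json=True) paired with Variable.get('c', deserialize_json=True) round-trips the value as JSON.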
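A simplified, stdlib-only model of the parsing behavior described in the answer may make the x = 0 result easier to see. It does not use Airflow at all: the hypothetical helpers parse_dag_file and run_task assume that every process needing the DAG imports the file fresh, modelled here by exec-ing the same source into a new namespace each time. One deliberate change from the question's code: the model writes x = globals().get("c", x) instead of x = c, because in a fresh process no task has run yet, so a bare x = c would raise NameError during parsing.

```python
# Simplified model of DAG-file parsing (no Airflow involved).
# Assumption: every process that needs the DAG imports the file fresh,
# modelled by exec-ing the same source into a brand-new namespace.

DAG_FILE = '''
x = 0

def diff():
    global c          # "global" means this namespace, i.e. this one import
    c = 2 - 1
    return c

# Module-level code runs on every parse, before any task has executed.
# (A bare "x = c" would raise NameError here, so fall back to x.)
x = globals().get("c", x)
'''


def parse_dag_file() -> dict:
    """Model one import of the DAG file, e.g. by the DAG processor."""
    namespace: dict = {}
    exec(DAG_FILE, namespace)
    return namespace


def run_task(callable_name: str):
    """Model a task run: a new process re-imports the file, calls one function, exits."""
    namespace = parse_dag_file()
    return namespace[callable_name]()


print("parse 1: x =", parse_dag_file()["x"])   # x = 0
print("task diff returned", run_task("diff"))   # 1
print("parse 2: x =", parse_dag_file()["x"])   # still 0
```

Every call to parse_dag_file starts again from x = 0, and the c that diff sets lives only in the namespace that run_task discards. A value that has to cross that boundary must go through shared storage, such as an Airflow Variable or an XCom.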
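For the question's second point (the conf of the current run when several runs are active), a common approach is to read the conf inside the task's callable rather than at module level, because only the running task instance knows which DAG run it belongs to. The sketch below assumes Airflow 2, where PythonOperator passes the task context as keyword arguments, including dag_run (in 1.10 this also needs provide_context=True, as the question's code sets). The conf key "n" and the FakeDagRun class are hypothetical stand-ins so the snippet runs without Airflow installed.

```python
from dataclasses import dataclass
from typing import Any, Optional


def loop_count_from_conf(**context: Any) -> int:
    """Intended as a PythonOperator python_callable.

    context["dag_run"] is the DagRun this task instance belongs to, so its
    .conf is the conf of *this* run, even when other runs are active.
    "n" is a hypothetical conf key assumed to be supplied at trigger time.
    """
    conf = context["dag_run"].conf or {}  # may be None or {} when no conf was passed
    return int(conf.get("n", 0))


# Hypothetical local stand-in for a DagRun, for illustration only (not an Airflow class).
@dataclass
class FakeDagRun:
    run_id: str
    conf: Optional[dict] = None


# Two runs active at the same time, each triggered with its own conf.
for run in (FakeDagRun("manual__a", {"n": 3}), FakeDagRun("manual__b", {"n": 5})):
    print(run.run_id, loop_count_from_conf(dag_run=run))
```

In a real DAG file the function would simply be passed as python_callable=loop_count_from_conf, and Airflow supplies the actual dag_run. Because the value exists only while that task runs, it cannot change how many tasks the file defines at parse time; if the goal is a per-run number of task instances, dynamic task mapping (.expand(), available since Airflow 2.3) is the mechanism designed for that.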