Tags: python, amazon-web-services, airflow, boto3, throttling

Airflow - Set PythonOperator op_kwargs variable with the return value of a Python function


I have several DAGs, and each DAG contains several EMR steps. I have created the function below, which is used in all DAGs. Its purpose is to decide which EMR cluster has the fewest active steps in the RUNNING or PENDING state. The ID of that cluster is assigned to a variable in the operator's op_kwargs.

The PythonOperator below passes several input parameters to the Python function emr_step_adder, among them the cluster ID returned by get_cluster_id(). Each operator must call get_cluster_id() to decide which cluster is most suitable at that moment. This is done with the boto3 EMR calls emr_client.list_clusters() and emr_client.list_steps().

The issue is that my DAGs generate a huge number of API calls (about 5 per second), caused by emr_client.list_steps(); I can see them in CloudTrail. It seems that get_cluster_id() is executed even when no DAG is running. It also raises a ThrottlingException, which I can see in the Airflow UI and logs.

I suspect this has to do with DAG parsing. What do I need to change so that get_cluster_id() in the operator is only called when I run a DAG?

import boto3

def get_cluster_id(**kwargs):
    """Return the ID of the EMR cluster (cluster1 or cluster2) with the fewest PENDING/RUNNING steps."""
    emr_client = boto3.client("emr", region_name="us-east-1")
    clusters = emr_client.list_clusters(
        ClusterStates=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING'])['Clusters']
    cluster_list = []
    for cluster in clusters:
        if cluster['Name'] in ['cluster1', 'cluster2']:
            # One list_steps API call per matching cluster
            steps = emr_client.list_steps(
                ClusterId=cluster['Id'], StepStates=['PENDING', 'RUNNING'])['Steps']
            cluster_list.append({'cluster_id': cluster['Id'], 'nr_of_active_steps': len(steps)})
    # Pick the cluster with the fewest active steps
    return min(cluster_list, key=lambda x: x['nr_of_active_steps'])['cluster_id']
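To check the selection logic without touching AWS, the same function can be written against any object that exposes list_clusters and list_steps with the response shape boto3 returns. The sketch below assumes a hypothetical FakeEMRClient stub (not part of boto3) and reads only the 'Clusters', 'Name', 'Id' and 'Steps' keys used above. It also raises a clearer error when no matching cluster is active, where the original min() would raise a bare ValueError.

```python
from typing import Dict, List


class FakeEMRClient:
    """Hypothetical stand-in for boto3.client("emr"); returns canned responses."""

    def __init__(self, clusters: List[Dict], active_steps: Dict[str, int]):
        self.clusters = clusters
        self.active_steps = active_steps  # cluster ID -> number of PENDING/RUNNING steps
        self.calls = 0  # counts simulated API calls

    def list_clusters(self, ClusterStates):
        self.calls += 1
        return {"Clusters": self.clusters}

    def list_steps(self, ClusterId, StepStates):
        self.calls += 1
        return {"Steps": [{}] * self.active_steps.get(ClusterId, 0)}


def pick_least_busy_cluster(emr_client, names=("cluster1", "cluster2")) -> str:
    """Return the ID of the named cluster with the fewest PENDING/RUNNING steps."""
    clusters = emr_client.list_clusters(
        ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"])["Clusters"]
    load = {}
    for cluster in clusters:
        if cluster["Name"] in names:
            steps = emr_client.list_steps(
                ClusterId=cluster["Id"], StepStates=["PENDING", "RUNNING"])["Steps"]
            load[cluster["Id"]] = len(steps)
    if not load:
        raise RuntimeError(f"No active cluster named any of {names}")
    # Cluster ID with the smallest step count (first one wins on ties)
    return min(load, key=load.get)


client = FakeEMRClient(
    clusters=[{"Name": "cluster1", "Id": "j-AAA"},
              {"Name": "cluster2", "Id": "j-BBB"},
              {"Name": "other", "Id": "j-CCC"}],
    active_steps={"j-AAA": 4, "j-BBB": 1, "j-CCC": 0},
)
print(pick_least_busy_cluster(client))  # j-BBB
print(client.calls)  # 3: one list_clusters + two list_steps
```

With real AWS access the same function should accept boto3.client("emr", region_name="us-east-1") in place of the stub. Note that both EMR list calls are paginated, so a cluster with many active steps may need a boto3 paginator to be counted fully.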

from datetime import timedelta
from airflow.operators.python_operator import PythonOperator

# Example operator
task1 = PythonOperator(
    task_id='task1',
    provide_context=True,
    python_callable=emr_step_adder,
    retries=1,
    retry_delay=timedelta(minutes=2),
    op_kwargs={'task_name': 'task1',
               # Evaluated at module level, i.e. every time this file is parsed
               'cluster_id': get_cluster_id(),
               'arg_list': ['spark-submit', 's3://test/test.py'],
               'timeout_in_seconds': 1800},
    dag=dag
)
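The call volume follows from how Python evaluates the operator: the op_kwargs dictionary is an ordinary expression at module level, so get_cluster_id() runs whenever the DAG file is imported, and the scheduler re-parses DAG files periodically. The pure-Python sketch below simulates this without Airflow; parse_dag_file is a hypothetical stand-in for the scheduler importing a file with n_tasks operators like task1, and fake_get_cluster_id counts one list_clusters call plus one list_steps call per matching cluster.

```python
from collections import Counter

api_calls = Counter()  # simulated AWS API calls, keyed by operation name


def fake_get_cluster_id(n_matching_clusters: int = 2) -> str:
    """Hypothetical stand-in for get_cluster_id(): 1 list_clusters + 1 list_steps per cluster."""
    api_calls["ListClusters"] += 1
    api_calls["ListSteps"] += n_matching_clusters
    return "j-EXAMPLE"


def parse_dag_file(n_tasks: int) -> list:
    """Simulate importing a DAG file that builds n_tasks operators.

    As in the question, the call sits inside op_kwargs, so it is evaluated
    while the file is parsed, before any DAG run exists.
    """
    return [{"task_id": f"task{i}", "op_kwargs": {"cluster_id": fake_get_cluster_id()}}
            for i in range(1, n_tasks + 1)]


# Three parses of a file with 5 tasks, and no DAG run at all
for _ in range(3):
    parse_dag_file(n_tasks=5)
print(dict(api_calls))  # {'ListClusters': 15, 'ListSteps': 30}
```

Each parse costs n_tasks × (1 + matching clusters) calls per file, multiplied by the number of DAG files; how often files are re-parsed depends on scheduler settings such as min_file_process_interval.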

Solution

  • Call get_cluster_id() inside emr_step_adder instead of passing its result as an argument.

    def emr_step_adder(task_name, arg_list, timeout_in_seconds, **context):
        cluster_id = get_cluster_id()  # evaluated when the task runs, not when the file is parsed
        [...]
    
    task1 = PythonOperator(
        task_id='task1',
        op_kwargs={'task_name': 'task1',
                   'arg_list': ['spark-submit', 's3://test/test.py'],
                   'timeout_in_seconds': 1800},
        [...]
    )
    

    This way it is executed only when the task runs after the DAG is triggered, not on every parse of the DAG file.
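The effect of the fix can be checked with a similar simulation: if the operator only stores the callable and its arguments, nothing is looked up at parse time, and get_cluster_id() runs once per task execution. SimulatedPythonOperator below is a hypothetical, heavily simplified model of how PythonOperator uses python_callable and op_kwargs, not Airflow's actual class; get_cluster_id here only counts invocations instead of calling boto3.

```python
calls = {"get_cluster_id": 0}


def get_cluster_id() -> str:
    """Hypothetical stand-in for the boto3-based lookup; only counts invocations."""
    calls["get_cluster_id"] += 1
    return "j-EXAMPLE"


def emr_step_adder(task_name, arg_list, timeout_in_seconds, **context):
    # Resolved at execution time, as in the answer
    cluster_id = get_cluster_id()
    return f"{task_name} -> {cluster_id}: {' '.join(arg_list)}"


class SimulatedPythonOperator:
    """Hypothetical simplification: keep the callable, call it only in execute()."""

    def __init__(self, task_id, python_callable, op_kwargs):
        self.task_id = task_id
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs

    def execute(self, context):
        return self.python_callable(**self.op_kwargs, **context)


# "Parsing" the file: building the operator performs no lookup
task1 = SimulatedPythonOperator(
    task_id="task1",
    python_callable=emr_step_adder,
    op_kwargs={"task_name": "task1",
               "arg_list": ["spark-submit", "s3://test/test.py"],
               "timeout_in_seconds": 1800},
)
print(calls["get_cluster_id"])  # 0

# Running the task triggers exactly one lookup
print(task1.execute(context={}))  # task1 -> j-EXAMPLE: spark-submit s3://test/test.py
print(calls["get_cluster_id"])  # 1
```

Because the lookup now happens per task run, each task also picks the cluster based on the load at the moment it starts, rather than the load at the last file parse.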