Tags: python, amazon-web-services, airflow, aws-batch

How to update the code of an AwsBatchOperator task in Airflow?


I have an airflow-scheduler and an airflow-webserver deployed on an EC2 machine on AWS. I use this airflow-scheduler to execute a DAG with an AwsBatchOperator task. This task executes a Python script present on the EC2 machine. Here is the code of the DAG:

from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.batch import AwsBatchOperator

default_args = {
    'owner': 'admin',
    'depends_on_past': True,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # Note: concurrency, schedule_interval, start_date and end_date are
    # DAG-level arguments; they belong on the DAG constructor below, not
    # in default_args.
}

dag = DAG(
    dag_id='my-dag',
    default_args=default_args,
    description='My DAG',
    schedule_interval='00 03 * * *',
    start_date=days_ago(1),
    tags=['dev'],
)

task = AwsBatchOperator(
    dag=dag,
    job_name='my-job-name',
    job_definition='arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name',
    job_queue='arn:aws:batch:eu-central-1:XXXX:job-queue/my-job-name',
    region_name='eu-central-1',
    task_id='my-task-id',
    overrides={
        'command': ['python3', './my_python_script.py']
    },
    parameters={},
)
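
For reference, AwsBatchOperator is essentially a wrapper around the boto3 Batch client; the task above corresponds roughly to the following submit_job call (a sketch, reusing the ARNs from the DAG above):

import boto3

batch = boto3.client('batch', region_name='eu-central-1')

# Rough equivalent of the AwsBatchOperator task above: submit a job to the
# given queue, using the given job definition, with a command override.
batch.submit_job(
    jobName='my-job-name',
    jobQueue='arn:aws:batch:eu-central-1:XXXX:job-queue/my-job-name',
    jobDefinition='arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name',
    containerOverrides={'command': ['python3', './my_python_script.py']},
)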

The Python script my_python_script.py is on the EC2 machine where Airflow is deployed, in the directory /home/ubuntu.

I had a typo in this Python script that raised an error. I corrected it and pushed the corrected script to the EC2 machine. However, when I execute the DAG, I still get the error caused by the typo I corrected. So here is my question:

How can I refresh my DAG to ensure that it uses the version of my script that is present on my EC2 machine?

What I've tried

  • Refreshing the DAG by clicking the "refresh" button in the Airflow web interface
  • Waiting for the automatic DAG refresh by the airflow-scheduler
  • Deleting the DAG and waiting for the refresh
  • Recompiling the Python script on the EC2 machine with the command python -m compileall

Solution

  • To update the code of an AwsBatchOperator task, you need to update the Docker image used by your AWS Batch job with your new code, not the code present on the EC2 machine where Airflow is deployed.

    AwsBatchOperator can execute code present in the Docker image of the AWS Batch job definition, but it cannot execute code present on the EC2 machine where Airflow is deployed.

    When you use AwsBatchOperator, you choose which job definition to use. In my case, it is arn:aws:batch:eu-central-1:XXXX:job-definition/my-job-name. This job definition specifies the Docker image in which your command will be executed. See https://docs.aws.amazon.com/en_en/batch/latest/userguide/Batch_GetStarted.html

    I was confused because the same codebase was present on both the EC2 machine and in the Docker image. So instead of updating the code on the EC2 machine where Airflow is deployed, I just have to build a new Docker image containing the new version of my code and make my AWS Batch job use this new image (see the sketch below).
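
As a minimal sketch of that last step, assuming the new image has already been built and pushed to an ECR repository (the repository URL and the tag v2 below are hypothetical placeholders), you can register a new revision of the job definition pointing at the new image with boto3:

import boto3

batch = boto3.client('batch', region_name='eu-central-1')

# Register a new revision of the job definition that points at the freshly
# pushed image; the repository URL and the 'v2' tag are placeholders.
response = batch.register_job_definition(
    jobDefinitionName='my-job-name',
    type='container',
    containerProperties={
        'image': 'XXXX.dkr.ecr.eu-central-1.amazonaws.com/my-job-name:v2',
        'resourceRequirements': [
            {'type': 'VCPU', 'value': '1'},
            {'type': 'MEMORY', 'value': '2048'},
        ],
        'command': ['python3', './my_python_script.py'],
    },
)
print(response['jobDefinitionArn'])  # ARN of the new revision

If the job definition is referenced by name without a revision number, AWS Batch resolves it to the latest active revision, so subsequent DAG runs should pick up the new image automatically.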