Tags: concurrency, cron, airflow

Airflow: Dag scheduled twice a few seconds apart


I am trying to run a DAG only once a day, at 00:15:00 (15 minutes past midnight), yet it's being scheduled twice, a few seconds apart.

dag = DAG(
    'my_dag',
    default_args=default_args,
    start_date=airflow.utils.dates.days_ago(1) - timedelta(minutes=10),
    schedule_interval='15 0 * * * *',
    concurrency=1,
    max_active_runs=1,
    retries=3,
    catchup=False,
)

The main goal of the DAG is to check for new emails, then check for new files in an SFTP directory, and then run a "merger" task that adds those new files to a database.

All the jobs are Kubernetes pods:

email_check = KubernetesPodOperator(
    namespace='default',
    image="g.io/email-check:0d334adb",
    name="email-check",
    task_id="email-check",
    get_logs=True,
    dag=dag,
)
sftp_check = KubernetesPodOperator(
    namespace='default',
    image="g.io/sftp-check:0d334adb",
    name="sftp-check",
    task_id="sftp-check",
    get_logs=True,
    dag=dag,
)
my_runner = KubernetesPodOperator(
    namespace='default',
    image="g.io/my-runner:0d334adb",
    name="my-runner",
    task_id="my-runner",
    get_logs=True,
    dag=dag,
)
my_runner.set_upstream([sftp_check, email_check])

So, the issue is that there seem to be two runs of the DAG scheduled a few seconds apart. They do not run concurrently, but as soon as the first one is done, the second one kicks off.

The problem here is that the my_runner job is intended to run only once a day: it tries to create a file with the date as a suffix, and if the file already exists, it throws an exception. So the second run always throws an exception, because the file for the day has already been created by the first run.

Since an image (or two) is worth a thousand words, here they are:

You'll see that there's a first run scheduled "22 seconds after 00:15" (that's fine; it varies a couple of seconds here and there) and then a second one that always seems to be scheduled "58 seconds after 00:15 UTC" (at least according to the names they get). The first one runs fine, and nothing else seems to be running... and as soon as it finishes, the second run (the one scheduled at 00:15:58) starts and fails.

A "good" one:

[screenshot of the successful DAG run]

A "bad" one:

[screenshot of the failed DAG run]


Solution

  • It looks like setting the start_date to 2 days ago instead of 1 did the trick

    dag = DAG(
        'my_dag',
        ...
        start_date=airflow.utils.dates.days_ago(2),
        ...
    )
    

    I don't know why.

    I just have a theory. Maaaaaaybe (big maybe) the issue is this: days_ago(...) sets a UTC datetime with hour/minute/second set to 0 and then subtracts the number of days given in the argument, so saying "one day ago" or even "one day and 10 minutes ago" didn't put the start_date far enough back past the next period boundary (00:15), and that was somehow confusing Airflow?
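    To make the theory concrete, here's a rough sketch of the arithmetic (assuming days_ago(n) returns midnight UTC of the current day minus n days, which is what it is documented to do; the fixed "now" is just for illustration):

    ```python
    from datetime import datetime, timedelta, timezone

    # Assumption: airflow.utils.dates.days_ago(n) is roughly midnight UTC
    # of the current day, minus n days.
    def days_ago(n, now):
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        return midnight - timedelta(days=n)

    now = datetime(2020, 6, 10, 12, 0, tzinfo=timezone.utc)  # pretend "now"

    original = days_ago(1, now) - timedelta(minutes=10)  # 2020-06-08 23:50 UTC
    fixed = days_ago(2, now)                             # 2020-06-08 00:00 UTC

    # The schedule ticks at 00:15 UTC each day. The original start_date
    # lands only 25 minutes before the 2020-06-09 00:15 tick; the fixed
    # one lands a full day (and 15 minutes) before it.
    tick = datetime(2020, 6, 9, 0, 15, tzinfo=timezone.utc)
    print(tick - original)  # 0:25:00
    print(tick - fixed)     # 1 day, 0:15:00
    ```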

    "Let's Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period."

    https://airflow.readthedocs.io/en/stable/scheduler.html#scheduling-triggers

    So, the end of the period would be 00:15... If my theory is correct, using airflow.utils.dates.days_ago(1) - timedelta(minutes=16) would probably also work.

    This doesn't explain why, if I set a start_date very far in the past, it just doesn't run, though. ¯\_(ツ)_/¯