Airflow: Implementing wait (sleep) task - efficiently


I need to implement a waiting task in Airflow. The wait should last around a couple of hours.

First, TimeDeltaSensor is just not working.

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
            task_id="sleep_for_11_min",
            delta=timedelta(minutes=SLEEP_MINUTES_1ST),                    
    )

For a daily schedule like:

schedule_interval='30 06 * * *'

it just waits until the next scheduled run:

[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come

That is painfully obvious in code: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43

(not to mention the known bug when the schedule is None or @once)
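
The target time in that log line can be reproduced with the standard library. This is only a sketch under two assumptions: that the sensor waits for the next scheduled run after the execution date plus `delta`, and that the run in the log was triggered manually at 18:10:21 UTC. `next_daily_run` and `sensor_target` are hypothetical helpers standing in for the cron schedule, not Airflow functions.

```python
from datetime import datetime, time, timedelta, timezone

SLEEP_MINUTES_1ST = 11


def next_daily_run(after, at=time(6, 30)):
    """Hypothetical stand-in for the cron schedule '30 06 * * *':
    return the first 06:30 UTC strictly after `after`."""
    candidate = datetime.combine(after.date(), at, tzinfo=timezone.utc)
    if candidate <= after:
        candidate += timedelta(days=1)
    return candidate


def sensor_target(execution_date, delta):
    # Assumed behaviour: the wait is measured from the next scheduled run,
    # not from the moment the task starts.
    return next_daily_run(execution_date) + delta


# Assumed: a manual trigger at 18:10:21 UTC on 2020-01-15.
execution_date = datetime(2020, 1, 15, 18, 10, 21, tzinfo=timezone.utc)
target = sensor_target(execution_date, timedelta(minutes=SLEEP_MINUTES_1ST))
print(target.isoformat())  # 2020-01-16T06:41:00+00:00
```

Under those assumptions an 11-minute delta turns into a wait of more than twelve hours, which matches the time shown in the log.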

The next try was with TimeSensor, like this:

 SLEEP_MINUTES_1ST = 11
 sleep_task_1 = TimeSensor(
           task_id="sleep_for_11_min",
           provide_context=True,
           target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
           trigger_rule=TriggerRule.NONE_FAILED    
        )

And this actually worked well, but in poke mode it occupies one worker for the whole wait. I received a suggestion to use reschedule mode, but just adding:

mode='reschedule',

generates a new target time on every reschedule check and never finishes, like this:

[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 15:38:51,306] {time_sensor.py:39} INFO - Checking if the time (14:49:51.079783) has come
[2020-01-15 15:38:51,331] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
...
[2020-01-15 15:41:00,587] {time_sensor.py:39} INFO - Checking if the time (14:52:00.202168) has come
[2020-01-15 15:41:00,614] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.....

(note that Airflow is mixing UTC and my timezone UTC+1 in the log here)

The next try was to generate target_time for TimeSensor relative to the execution_date of the DAG run. But none of several attempts succeeded, for example:

task_target_time = '{{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}' 
sleep_task_1 = TimeSensor(
          task_id=task_id="sleep_for_11_min",
          provide_context=True,
#         target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
#         target_time = task_target_time,
#         target_time=datetime.strptime('{{ execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST) }}','%Y-%m-%dT%H:%M:%S'),                        
#         target_time='{{ execution_date }}'+ timedelta(minutes=SLEEP_MINUTES_1ST),
          target_time = ('{{ task_instance.execution_date }}' + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
              poke_interval=120,
              mode='reschedule',
              timeout=10*60*60,
              trigger_rule=TriggerRule.NONE_FAILED    
        )

In the commented lines (target_time ...) you can see just some of the combinations I have tried. Several failed immediately on DAG creation, and some generate an error like this during the run:

[2020-01-15 17:56:39,388] {time_sensor.py:39} INFO - Checking if the time ({{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}) has come
[2020-01-15 17:56:39,389] {taskinstance.py:1058} ERROR - '>' not supported between instances of 'datetime.time' and 'str'
Traceback (most recent call last):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
    while not self.poke(context):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/time_sensor.py", line 40, in poke
    return timezone.utcnow().time() > self.target_time
TypeError: '>' not supported between instances of 'datetime.time' and 'str'
[2020-01-15 17:56:39,390] {taskinstance.py:1089} INFO - Marking task as FAILED.

I think I understand the theory: the task context, including execution_date, isn't available when the operator is created, only at runtime. The Jinja expression would yield a Pendulum object that could be converted to a time, but at creation time the template is still a plain string, so none of the Pendulum methods are available.
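
Both failure modes can be reproduced without Airflow. This is a minimal sketch, assuming only that an unrendered Jinja template is an ordinary Python string while the DAG file is parsed; the variable names are illustrative.

```python
from datetime import time, timedelta

template = '{{ execution_date }}'  # still plain text at DAG-definition time

# 1. Arithmetic on the template fails while the DAG file is being parsed.
try:
    template + timedelta(minutes=11)
    creation_error = None
except TypeError as exc:
    creation_error = str(exc)

# 2. A template that reaches the sensor unrendered fails later, at poke time,
#    when it is compared with a datetime.time value.
try:
    time(12, 0) > template
    runtime_error = None
except TypeError as exc:
    runtime_error = str(exc)

print(creation_error)
print(runtime_error)
```

The second message is the same kind of TypeError as in the traceback above: a `datetime.time` compared with a `str`.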

But WHY is it so hard to create simple:

sleep 1000

in Airflow.

(Airflow: v1.10.6, python 3.6.8)


Solution

  • TimeSensor goes into a reschedule loop because target_time is recomputed to a different value on each check, so the condition is never met.

        target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
    

    To use TimeSensor this way, set target_time to a fixed time of day: the latest time by which you expect the condition to be satisfied.
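
The loop can be imitated in plain Python. This sketch assumes that in reschedule mode every check re-evaluates the expression that builds target_time, so "now plus 11 minutes" always stays ahead of the clock. `build_target`, `poke_moving_target`, `poke_fixed_target`, and the simulated clock are hypothetical stand-ins, not Airflow APIs.

```python
from datetime import datetime, timedelta

SLEEP = timedelta(minutes=11)
POKE_INTERVAL = timedelta(minutes=2)


def build_target(parse_time):
    # Stand-in for target_time=(utcnow() + SLEEP), evaluated at parse time.
    return parse_time + SLEEP


def poke_moving_target(start, checks):
    """Each check rebuilds the target, so it moves ahead of the clock."""
    results = []
    for i in range(checks):
        now = start + i * POKE_INTERVAL
        results.append(now > build_target(now))
    return results


def poke_fixed_target(start, checks):
    """The target is computed once from a stable reference point."""
    target = build_target(start)
    return [start + i * POKE_INTERVAL > target for i in range(checks)]


start = datetime(2020, 1, 15, 14, 36, 42)
print(any(poke_moving_target(start, 10)))  # False
print(any(poke_fixed_target(start, 10)))   # True
```

With a moving target no check ever succeeds; with a target fixed once, the check passes as soon as the 11 minutes are over.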

    I suggest using TimeDeltaSensor in reschedule mode. The sensor starts once the task is scheduled; each check then either lets the task succeed, if the delta has elapsed, or reschedules it for another check.

    SLEEP_MINUTES_1ST = 11
    sleep_task_1 = TimeDeltaSensor(
        task_id="sleep_for_11_min",
        delta=timedelta(minutes=SLEEP_MINUTES_1ST),
        mode='reschedule'               
    )
    

    You could also subclass BaseSensorOperator, much as TimeSensor does, with a liveness check that tests whether the task has been released from sleep. For example,

    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils import timezone
    from airflow.utils.decorators import apply_defaults
    
    
    XCOM_KEY = 'start_date'
    
    class ReleaseProbe(BaseSensorOperator):
        """
        Waits until the task is released from sleep.
        :param sleep_duration: how long the task sleeps before it is released
        :type sleep_duration: datetime.timedelta
        """
    
        @apply_defaults
        def __init__(self, sleep_duration, *args, **kwargs):
            super(ReleaseProbe, self).__init__(*args, **kwargs)
            self.sleep_duration = sleep_duration
    
        def poke(self, context):
            ti = context['ti']
            self.log.info('Checking if task is released after (%s) sleep, execution date is: %s',
                          self.sleep_duration, ti.execution_date)
    
            # The first poke records the start time in XCom; later pokes compare against it.
            start_date = ti.xcom_pull(key=XCOM_KEY, task_ids=ti.task_id)
            if not start_date:
                ti.xcom_push(key=XCOM_KEY, value=timezone.utcnow())
                return False
    
            return timezone.utcnow() - start_date > self.sleep_duration
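
The decision made in poke can be checked without a running scheduler. This is a minimal sketch: `FakeTaskInstance` and the free function `poke` are hypothetical stand-ins that mimic only the XCom calls used above, and the clock is passed in explicitly so the example is deterministic. It also assumes the stored start time is still available on the next check, which is worth verifying on your Airflow version when using reschedule mode.

```python
from datetime import datetime, timedelta

XCOM_KEY = 'start_date'


class FakeTaskInstance:
    """Hypothetical stand-in that keeps XCom values in a dict."""

    def __init__(self):
        self._xcom = {}

    def xcom_push(self, key, value):
        self._xcom[key] = value

    def xcom_pull(self, key):
        return self._xcom.get(key)


def poke(ti, now, sleep_duration):
    # Same decision as the sensor: the first call stores the start time,
    # later calls report whether the sleep has elapsed.
    start_date = ti.xcom_pull(key=XCOM_KEY)
    if not start_date:
        ti.xcom_push(key=XCOM_KEY, value=now)
        return False
    return now - start_date > sleep_duration


ti = FakeTaskInstance()
t0 = datetime(2020, 1, 15, 15, 0)
sleep = timedelta(minutes=11)
print(poke(ti, t0, sleep))                          # False
print(poke(ti, t0 + timedelta(minutes=10), sleep))  # False
print(poke(ti, t0 + timedelta(minutes=12), sleep))  # True
```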