Tags: airflow, uuid, http-get, airflow-2.x

Using Airflow's HttpSensor with a unique X-Request-ID header on each request


I want to monitor an endpoint with HTTP GET requests until the response contains "SUCCESS".

The endpoint requires a unique X-Request-ID header on each request. If I do not include this field, or if I send the same UUID twice, I get a 400 Bad Request back. I tried:

import uuid

from airflow.providers.http.sensors.http import HttpSensor

monitor_job = HttpSensor(
    task_id='monitor_job',
    http_conn_id='',
    endpoint='http://some_endpoint',
    request_params={},
    response_check=lambda response: 'SUCCESS' in response.text,
    poke_interval=5,
    dag=dag,
    headers={
        'X-Request-ID': str(uuid.uuid4())  # random UUID, generated once when the DAG file is parsed
    }
)

I see that the first GET request succeeds and the sensor waits 5 seconds, but the next GET request fails because it sends the same UUID again. I need a new value in the X-Request-ID header on every request.

Is this possible with HttpSensor or otherwise?


The best alternative I can think of is to move the GET request into a loop in Python code (probably using the requests library) and use a PythonSensor. That is more code to write, and it feels like a workaround.
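For comparison, the PythonSensor route needs less code than it might seem, because the sensor already supplies the polling loop. A minimal sketch, assuming the endpoint returns a plain-text body that contains SUCCESS once the job is done; check_job is a hypothetical name, and it uses the standard library's urllib.request instead of requests so the block has no third-party dependency. The header is built inside the function, so every poke gets a fresh UUID. The local server below is only a demo stand-in that enforces the same "unique X-Request-ID" rule:

```python
import threading
import urllib.request
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer


def check_job(url: str) -> bool:
    """Hypothetical PythonSensor callable: one GET per poke, new X-Request-ID each time."""
    request = urllib.request.Request(url, headers={"X-Request-ID": str(uuid.uuid4())})
    with urllib.request.urlopen(request, timeout=10) as response:
        body = response.read().decode("utf-8", errors="replace")
    return "SUCCESS" in body


# --- Demo only: a local stand-in for the real endpoint ---
seen_ids = []


class FakeJobHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        request_id = self.headers.get("X-Request-ID")
        if request_id is None or request_id in seen_ids:
            # Mimic the endpoint's rule: missing or repeated ID -> 400 Bad Request
            self.send_response(400)
            self.end_headers()
            return
        seen_ids.append(request_id)
        # Report RUNNING for the first two polls, then SUCCESS
        body = b"SUCCESS" if len(seen_ids) >= 3 else b"RUNNING"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # silence per-request logging


# Demo only: ignore any HTTP proxy settings so requests reach the local server
urllib.request.install_opener(urllib.request.build_opener(urllib.request.ProxyHandler({})))

server = HTTPServer(("127.0.0.1", 0), FakeJobHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_address[1]}/"

results = [check_job(url) for _ in range(3)]
server.shutdown()
server.server_close()
print(results)  # [False, False, True]
```

In a DAG the callable would be handed to the sensor, for example PythonSensor(task_id='monitor_job', python_callable=check_job, op_kwargs={'url': 'http://some_endpoint'}, poke_interval=5, dag=dag), with PythonSensor imported from airflow.sensors.python.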

I am not currently using http_conn_id, only to match the style of related code in the codebase; I can use it if it would help.

I'm running on Airflow v2.2.2


Solution

  • In poke mode (the default), the sensor object is created once, when the DAG file is parsed, and the task stays running and sleeps between pokes. uuid.uuid4() is therefore called only once, and every request carries the same UUID. There are a few ways around this:

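    • you can change the mode to reschedule. Each reschedule runs the task in a new process that parses the DAG file again, so the headers dict (and the UUID) is rebuilt before every poke: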
    monitor_job = HttpSensor(
        task_id='monitor_job',
        http_conn_id='',
        endpoint='http://some_endpoint',
        request_params={},
        response_check=lambda response: 'SUCCESS' in response.text,
        poke_interval=5,
        dag=dag,
        headers={
            'X-Request-ID': str(uuid.uuid4())  # re-evaluated each time the DAG file is parsed
        },
        mode="reschedule",  # release the worker slot; each poke is a new run that re-parses the DAG file
    )
    
    • you can override the sensor's poke method so that it sets a new headers value before each request:
    import uuid

    from airflow.providers.http.sensors.http import HttpSensor

    class MyHttpSensor(HttpSensor):
        def poke(self, context) -> bool:
            # HttpSensor.poke() sends self.headers, so replace it before each request
            self.headers = {
                'X-Request-ID': str(uuid.uuid4())
            }
            return super().poke(context)

    monitor_job = MyHttpSensor(
        task_id='monitor_job',
        http_conn_id='',
        endpoint='http://some_endpoint',
        request_params={},
        response_check=lambda response: 'SUCCESS' in response.text,
        poke_interval=5,
        dag=dag,
    )
    
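    • you can also override the sensor so that it re-renders a Jinja template for X-Request-ID on each poke. Airflow renders template fields in place before execute() runs, so calling render_template_fields again would only re-render the already-rendered UUID; keep the raw template aside and render it with render_template instead. The uuid module is available in templates as macros.uuid: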
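    from airflow.providers.http.sensors.http import HttpSensor

    class MyHttpSensor(HttpSensor):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            # Keep an unrendered copy: Airflow replaces self.headers with the rendered
            # value before execute(), so re-rendering self.headers would reuse one UUID
            self._headers_template = dict(self.headers)

        def poke(self, context) -> bool:
            # Render the saved template again to get a new UUID for this request
            self.headers = self.render_template(self._headers_template, context)
            return super().poke(context)

    monitor_job = MyHttpSensor(
        task_id='monitor_job',
        http_conn_id='',
        endpoint='http://some_endpoint',
        request_params={},
        response_check=lambda response: 'SUCCESS' in response.text,
        poke_interval=5,
        dag=dag,
        headers={
            'X-Request-ID': '{{ macros.uuid.uuid4() }}'
        },
    )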
    

    I recommend the second option. However, if your API takes a long time to report "SUCCESS", the first option with poke_interval >= 60 is better: it releases the worker slot between pokes and lets the worker run other tasks.
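Why poke mode repeats the UUID while rescheduling or overriding poke does not can be modelled without Airflow. In this stdlib-only sketch, StandInSensor and parse_dag_file are hypothetical stand-ins: the first mimics HttpSensor sending whatever self.headers holds on each poke, the second mimics Airflow parsing the DAG file and calling the constructor. Poke mode parses once per task run, reschedule mode parses again before every poke, and the override regenerates the header itself:

```python
import uuid


class StandInSensor:
    """Hypothetical stand-in for HttpSensor: poke() sends whatever self.headers holds."""

    def __init__(self, headers=None):
        self.headers = headers or {}

    def poke(self, sent: list) -> bool:
        sent.append(self.headers.get("X-Request-ID"))  # record the ID this request would carry
        return False  # never SUCCESS, so every poke happens


class FreshIdSensor(StandInSensor):
    """Models the poke() override: regenerate the header before each request."""

    def poke(self, sent: list) -> bool:
        self.headers = {"X-Request-ID": str(uuid.uuid4())}
        return super().poke(sent)


def parse_dag_file(sensor_cls):
    """Stand-in for parsing the DAG file: constructor arguments are evaluated here."""
    return sensor_cls(headers={"X-Request-ID": str(uuid.uuid4())})


def run(mode: str, sensor_cls=StandInSensor, pokes: int = 3) -> list:
    sent = []
    sensor = None
    for _ in range(pokes):
        if sensor is None or mode == "reschedule":
            # poke: one parse for the whole run; reschedule: a new process parses the file each try
            sensor = parse_dag_file(sensor_cls)
        sensor.poke(sent)
    return sent


poke_ids = run("poke")
reschedule_ids = run("reschedule")
override_ids = run("poke", FreshIdSensor)
print(len(set(poke_ids)), len(set(reschedule_ids)), len(set(override_ids)))  # 1 3 3
```

Rescheduling gets its fresh ID as a side effect of re-parsing the DAG file on every try, which costs a scheduler round-trip per poke; the override gets it directly, which is why it suits short poke intervals.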