Search code examples
airflowgoogle-cloud-pubsubgoogle-cloud-composer

Airflow PubSubPullSensor/Operator: How can we acknowledge a GCP PubSub message from downstream tasks when the "ack_messages" flag is set to False?


I am trying to implement PubSubPullSensor and PubSubPullOperator to process messages from a GCP PubSub topic, and I need to acknowledge a message only if the downstream task is able to successfully handle it. However, I am unable to find any good examples of acknowledging each message separately from downstream tasks. Are there any built-in operators that I can leverage, or should I implement this myself using the PubSub APIs in a PythonOperator task?

The documentation says

If ack_messages is set to True, messages will be immediately acknowledged before being returned, otherwise, downstream tasks will be responsible for acknowledging them.


Solution

  • After a little bit of research, this is how I did it. Manual acknowledgement can be achieved by providing a callback method to PubSubPullSensor or PubSubPullOperator and handling the acknowledgement logic inside that callback by leveraging the PubSubHook().acknowledge method. A sample example using PubSubPullOperator is given below. While working on this solution I faced some issues with the arguments passed to the PubSubHook().acknowledge method; please see this link for more details.

    from __future__ import annotations
    import os
    from datetime import datetime
    import base64
    import airflow
    from airflow import DAG
    import json
    from airflow.operators.bash import BashOperator
    from airflow.providers.google.cloud.operators.pubsub import (
        PubSubCreateSubscriptionOperator,
        PubSubPullOperator,
    )
    from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
    from  airflow.providers.google.cloud.hooks.pubsub import PubSubHook,Retry
    
    
    # Hard-coded values for local testing; the commented-out expressions show
    # how the official Airflow system tests derive them from the environment.
    ENV_ID = "Dev" #os.environ.get("SYSTEM_TESTS_ENV_ID")
    PROJECT_ID = "abcdef" #os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "your-project-id")
    DAG_ID = "DataPullDag_1"
    TOPIC_ID = "alert_topic_jp" #f"topic-{DAG_ID}-{ENV_ID}"
    SNOW_SUBSCRIPTION="alert_subscription_jp"  # subscription pulled from (and acknowledged against)
    
    def print_ack_messages(pulled_messages, context):
        """Messages callback for PubSubPullOperator: log each pulled message
        and manually acknowledge it via PubSubHook.

        :param pulled_messages: iterable of ReceivedMessage objects pulled from
            the subscription; each carries an ``ack_id`` and ``message.data``
            (assumed here to be a UTF-8 JSON incident payload -- confirm
            against the publisher).
        :param context: Airflow task context dict (unused, but required by the
            ``messages_callback`` signature).
        """
        # Retry policy for the acknowledge RPC so transient gRPC errors
        # don't fail the whole task.
        retry_obj = Retry(initial=10, maximum=10, multiplier=1.0, deadline=600)
        for idx, msg in enumerate(pulled_messages):
            data = msg.message.data.decode('utf-8')
            print(f'################{data}')
            data_json_dict = json.loads(data)
            # NOTE: each f-string fragment ends with a space so the
            # concatenated log line stays readable (the original was missing
            # the separator after incident_id).
            print(f"AckID: { msg.ack_id }, incident_id: { data_json_dict['incident']['incident_id'] } "
                  f"scoping_project_id: { data_json_dict['incident']['scoping_project_id'] } "
                  f"resource_name: { data_json_dict['incident']['resource_name'] } "
                  f"summary: { data_json_dict['incident']['summary'] } ")

            # Remove this if condition to acknowledge all the messages.
            if idx == 0:
                PubSubHook().acknowledge(
                    subscription=SNOW_SUBSCRIPTION,
                    project_id=PROJECT_ID,
                    ack_ids=[msg.ack_id],  # acknowledge() expects a list of ack IDs
                    retry=retry_obj,
                    timeout=10,
                )
                print(f"Successfully acknowledged incident_id: { data_json_dict['incident']['incident_id'] }")
    
    
    with DAG(
        DAG_ID,
        schedule_interval='@once',  # Override to match your needs
        start_date=airflow.utils.dates.days_ago(0),
        catchup=False,
    ) as dag:
        # [START howto_operator_gcp_pubsub_create_subscription]
        # Create the subscription that the pull task reads from.
        subscribe_task = PubSubCreateSubscriptionOperator(
            task_id="subscribe_task",
            project_id=PROJECT_ID,
            topic=TOPIC_ID,
            subscription=SNOW_SUBSCRIPTION,
        )

        # XComArg with the subscription name returned by the create task.
        subscription = subscribe_task.output

        # Pull up to 50 messages with auto-ack disabled; the callback decides
        # which messages get acknowledged.
        pull_messages_operator = PubSubPullOperator(
            task_id="pull_messages_operator",
            ack_messages=False,
            project_id=PROJECT_ID,
            messages_callback=print_ack_messages,
            subscription=subscription,
            max_messages=50,
        )

        subscribe_task >> pull_messages_operator