python google-cloud-platform airflow google-cloud-pubsub

Connection error from GCP Composer (Airflow) to GCP PubSub


I'm trying to pull messages from a GCP Pub/Sub subscription in a GCP Composer (Airflow) DAG, but I get the following error:

airflow.exceptions.AirflowException: The conn_id `google_cloud_default` isn't defined

My DAG code is the following:

subscribe_pubsub_task = PubSubPullSensor(
  task_id="subscribe_pubsub",
  project_id=GCP_REPORTING_ID,
  subscription=PUBSUB_SUBSCRIPTION,
  ack_messages=True,
  deferrable=True,
)

I've reviewed the Airflow docs: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/pubsub/index.html

How do I define a connection ID that authenticates with a GCP service account?


Solution

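  • Solved with a custom Airflow connection for GCP, which the sensor references through gcp_conn_id instead of the undefined default google_cloud_default.

    The connection implementation looks like the following code: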

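    import json

    from airflow import settings
    from airflow.models import Connection

    # GCP_REPORTING_ID (project ID) and GCP_PUBSUB_CREDENTIALS (the service
    # account key as a JSON string) are defined elsewhere in the DAG file.
    PUBSUB_CONNECTION = 'gcp_pubsub_reporting'
    PUBSUB_CLIENT = Connection(
      conn_id=PUBSUB_CONNECTION,
      conn_type='google_cloud_platform'
    )
    scopes = [
      "https://www.googleapis.com/auth/pubsub",
      "https://www.googleapis.com/auth/cloud-platform",
    ]
    conn_extra = {
      "extra__google_cloud_platform__scope": ",".join(scopes),
      "extra__google_cloud_platform__project": GCP_REPORTING_ID,
      "extra__google_cloud_platform__keyfile_dict": json.loads(GCP_PUBSUB_CREDENTIALS),
    }
    conn_extra_json = json.dumps(conn_extra)
    PUBSUB_CLIENT.set_extra(conn_extra_json)

    # Assumption: the original snippet does not show where `session` comes from;
    # here it is a session on the Airflow metadata database.
    session = settings.Session()

    # Register the connection only if no connection with this conn_id exists yet.
    if not (session.query(Connection).filter(Connection.conn_id == PUBSUB_CLIENT.conn_id).first()):
      session.add(PUBSUB_CLIENT)
      session.commit()
    else:
      msg = f'Airflow connection with conn_id: {PUBSUB_CLIENT.conn_id} already exists'
      print(msg)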
    

    The task implementation looks like the following code:

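    from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

    subscribe_pubsub_task = PubSubPullSensor(
        task_id="subscribe_pubsub",
        project_id=GCP_REPORTING_ID,
        subscription=PUBSUB_SUBSCRIPTION,
        # Use the custom connection instead of the default google_cloud_default.
        gcp_conn_id=PUBSUB_CONNECTION,
        ack_messages=True,
        deferrable=False,
        max_messages=1,
        mode='poke',
        poke_interval=20
    )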
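
    If the service account key is malformed, the connection above is still created and the problem only shows up later when the sensor runs. As a minimal sketch, the connection extras could be built by a small helper that checks the key first. The helper name build_gcp_conn_extra and the list of required key fields are assumptions made for illustration; they are not part of Airflow or of the original answer. Only the standard library is used, so the helper can be tried outside Airflow.

```python
import json

# Key prefix used by the extras in the answer above.
EXTRA_PREFIX = "extra__google_cloud_platform__"

# Assumption: a minimal set of fields expected in a service account key file.
REQUIRED_KEY_FIELDS = ("type", "project_id", "private_key", "client_email")


def build_gcp_conn_extra(project_id, keyfile_json, scopes):
    """Hypothetical helper: return the JSON string for Connection.set_extra().

    Raises ValueError if the key is not valid JSON, lacks a required field,
    or is not a service account key.
    """
    keyfile = json.loads(keyfile_json)  # JSONDecodeError is a ValueError
    missing = [field for field in REQUIRED_KEY_FIELDS if field not in keyfile]
    if missing:
        raise ValueError(f"service account key is missing fields: {missing}")
    if keyfile["type"] != "service_account":
        raise ValueError(f"unexpected key type: {keyfile['type']!r}")

    extra = {
        EXTRA_PREFIX + "scope": ",".join(scopes),
        EXTRA_PREFIX + "project": project_id,
        EXTRA_PREFIX + "keyfile_dict": keyfile,
    }
    return json.dumps(extra)
```

    With this helper, the connection code would call PUBSUB_CLIENT.set_extra(build_gcp_conn_extra(GCP_REPORTING_ID, GCP_PUBSUB_CREDENTIALS, scopes)), so a broken key fails when the DAG file is parsed rather than when the sensor first pokes.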