I'm trying to pull messages from a GCP Pub/Sub subscription in a GCP Composer DAG, but I got the following error:
airflow.exceptions.AirflowException: The conn_id `google_cloud_default` isn't defined
My DAG code is the following:
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

subscribe_pubsub_task = PubSubPullSensor(
    task_id="subscribe_pubsub",
    project_id=GCP_REPORTING_ID,
    subscription=PUBSUB_SUBSCRIPTION,
    ack_messages=True,
    deferrable=True,
)
I've reviewed the Airflow docs: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/pubsub/index.html
How can I define a connection ID that authenticates with a GCP service account?
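For context: `PubSubPullSensor` takes a `gcp_conn_id` argument that defaults to `google_cloud_default`, so the sensor above looks up that ID even though the code never names it. The sketch below is a simplified model, not Airflow's actual code, of the lookup order in a default setup: a configured secrets backend first, then `AIRFLOW_CONN_<CONN_ID>` environment variables, then the metadata database. `resolve_conn` and `ConnectionNotFound` are hypothetical names, and plain dicts stand in for the real backends.

```python
import os

# Simplified model (not Airflow's real code) of how a conn_id is resolved:
# each source is tried in order and the first hit wins.
CONN_ENV_PREFIX = "AIRFLOW_CONN_"


class ConnectionNotFound(Exception):
    """Hypothetical stand-in for the AirflowException raised when no source knows the ID."""


def resolve_conn(conn_id, secrets_backend, metastore, environ=os.environ):
    # 1. An optional external secrets backend (modeled as a dict).
    if conn_id in secrets_backend:
        return secrets_backend[conn_id]
    # 2. Environment variables named AIRFLOW_CONN_<CONN_ID in upper case>.
    env_value = environ.get(CONN_ENV_PREFIX + conn_id.upper())
    if env_value is not None:
        return env_value
    # 3. The Airflow metadata database (modeled as a dict).
    if conn_id in metastore:
        return metastore[conn_id]
    raise ConnectionNotFound(f"The conn_id `{conn_id}` isn't defined")


# Without an explicit gcp_conn_id, the sensor asks for "google_cloud_default".
try:
    resolve_conn("google_cloud_default", secrets_backend={}, metastore={}, environ={})
except ConnectionNotFound as exc:
    print(exc)  # The conn_id `google_cloud_default` isn't defined
```

If none of the sources has the ID, Airflow raises the exception shown in the question; defining a connection and passing its ID through `gcp_conn_id` avoids it.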
I solved it with a custom Airflow connection for GCP.
The connection implementation looks like the following code:
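import json

from airflow import settings
from airflow.models import Connection

# GCP_REPORTING_ID and GCP_PUBSUB_CREDENTIALS (the service-account key as a
# JSON string) are defined elsewhere in the DAG file.
PUBSUB_CONNECTION = 'gcp_pubsub_reporting'
PUBSUB_CLIENT = Connection(
    conn_id=PUBSUB_CONNECTION,
    conn_type='google_cloud_platform'
)

# OAuth scopes, stored in the extra field as a comma-separated string
scopes = [
    "https://www.googleapis.com/auth/pubsub",
    "https://www.googleapis.com/auth/cloud-platform",
]
conn_extra = {
    "extra__google_cloud_platform__scope": ",".join(scopes),
    "extra__google_cloud_platform__project": GCP_REPORTING_ID,
    "extra__google_cloud_platform__keyfile_dict": json.loads(GCP_PUBSUB_CREDENTIALS),
}
conn_extra_json = json.dumps(conn_extra)
PUBSUB_CLIENT.set_extra(conn_extra_json)

# Add the connection to the metadata database only if it does not exist yet
session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == PUBSUB_CLIENT.conn_id).first():
    session.add(PUBSUB_CLIENT)
    session.commit()
else:
    msg = f'Airflow connection with conn_id: {PUBSUB_CLIENT.conn_id} already exists'
    print(msg)
session.close()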
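To see what actually ends up in the connection's extra field, here is a minimal stdlib-only sketch that serializes the same keys with placeholder values and reads the scopes back by splitting on commas, roughly the way the Google provider's base hook parses its `scope` field. `build_gcp_extra`, `parse_scopes`, the project ID and the fake key are all hypothetical; no Airflow import is needed.

```python
import json


def build_gcp_extra(project_id, scopes, keyfile_json):
    """Serialize the same extras as the connection code (hypothetical helper)."""
    extra = {
        # Comma-separated string; it is split back into a list when read.
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": project_id,
        # json.loads turns the key file into a dict, so it is stored as a
        # nested JSON object rather than as an escaped string.
        "extra__google_cloud_platform__keyfile_dict": json.loads(keyfile_json),
    }
    return json.dumps(extra)


def parse_scopes(extra_json):
    """Recover the scope list from a serialized extra (simplified model)."""
    raw = json.loads(extra_json)["extra__google_cloud_platform__scope"]
    return [s.strip() for s in raw.split(",")]


# Hypothetical placeholder values, not real credentials.
scopes = [
    "https://www.googleapis.com/auth/pubsub",
    "https://www.googleapis.com/auth/cloud-platform",
]
fake_key = '{"type": "service_account", "project_id": "my-reporting-project"}'
extra_json = build_gcp_extra("my-reporting-project", scopes, fake_key)
print(parse_scopes(extra_json) == scopes)  # True
```

Joining with "," works here because none of the scope URLs contains a comma.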
The task implementation looks like the following code:
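from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

subscribe_pubsub_task = PubSubPullSensor(
    task_id="subscribe_pubsub",
    project_id=GCP_REPORTING_ID,
    subscription=PUBSUB_SUBSCRIPTION,
    gcp_conn_id=PUBSUB_CONNECTION,  # the custom connection defined above
    ack_messages=True,
    deferrable=False,
    max_messages=1,
    mode='poke',
    poke_interval=20,  # seconds between pull attempts
)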
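With `mode='poke'` and `poke_interval=20`, the sensor keeps its worker slot and retries a pull every 20 seconds until messages arrive or the sensor's timeout expires. The sketch below is a simplified model of that loop, not Airflow's `BaseSensorOperator` code; `poke_until` and the fake `pulls` sequence are hypothetical, and `TimeoutError` stands in for Airflow's sensor timeout exception.

```python
import time


def poke_until(poke, poke_interval, timeout, clock=time.monotonic, sleep=time.sleep):
    """Call `poke` every `poke_interval` seconds until it returns something truthy.

    Simplified model of a sensor in mode='poke'; raises TimeoutError once
    `timeout` seconds have passed without success.
    """
    started = clock()
    while True:
        result = poke()
        if result:  # e.g. a non-empty list of pulled messages
            return result
        if clock() - started >= timeout:
            raise TimeoutError("sensor timed out")
        sleep(poke_interval)


# Fake subscription: the first two pulls are empty, the third returns a message.
pulls = iter([[], [], ["message-1"]])
print(poke_until(lambda: next(pulls), poke_interval=0.01, timeout=5))  # ['message-1']
```

Because the worker slot stays occupied between pokes, this mode is simpler but less resource-efficient than `deferrable=True`, which hands the waiting over to the triggerer.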