Search code examples
airflowgoogle-cloud-sqlgoogle-cloud-composer

CloudSqlExecuteQueryOperator does not return results to xcom


I have an airflow pipeline that does an UPSERT to a postgresql database on GCP cloud sql. Now, before I was using the PostgresOperator on a local database and everything was working fine, but needed to move my DAG execution to Cloud Composer with cloud sql. I followed a rather complicated set of steps to allow connectivity to my database through a pod in the cloud composer's kubernetes cluster. I can confirm that is working! I have some operators that successfully create tables in the database. although, My issue arises when I try to get values back from the CloudSqlExecuteQueryOperator.

I have the following templated SQL command...

WITH input_rows(plate_name, plate_type, number_wells) AS (
  VALUES
    {% for row in ti.xcom_pull(task_ids='get_message_rows', key='return_value') %}
    ('{{ row['plate'] }}', 'culture', 384) {{ ",\n" if not loop.last else "" }}
    {% endfor %}
    )
, ins AS (
  INSERT INTO plate (plate_name, plate_type, number_wells)
  SELECT * FROM input_rows
  ON CONFLICT (plate_name) DO NOTHING
  RETURNING plate_id, plate_name, plate_type, number_wells
  )
SELECT 'i' AS source
    , plate_id, plate_name, plate_type, number_wells
FROM ins
UNION ALL
SELECT 's' AS source
    , c.plate_id, input_rows.plate_name, input_rows.plate_type, input_rows.number_wells
FROM input_rows
JOIN plate c USING (plate_name);

the operator in question then is executed like this...

write_plates = CloudSQLExecuteQueryOperator(
        task_id="write_plates",
        sql=sql_plate_template,
        gcp_cloudsql_conn_id=connection_name, # environment variable AIRFLOW_CONN_<conn_id>
    )

# this operator fails because it is getting a NoneType
formatted_results = some_python_operator_with_decoration(write_plates.output)

write_plates >> formatted_results

I see the inserted rows in my table, and when I manually run this query I get return values how I expected. Does the CloudSqlExecuteQueryOperator simply not have the ability to return to XCOM? Should I set autocommit to true? is there a work around I have to do? I was hoping to get a return similar to the PostgresOperator where you can iterate through the rows and columns like so...

@task
def some_python_operator_with_decoration(postgres_results)
    results = []
    for row in postgres_results:
        results.append({'source': row[0], "id": row[1], "name": row[2]})
    return results

Solution

  • The CloudSqlExecuteQueryOperator does not return any queries, according to the documentation. you can work around this by deploying a pod by following the steps google has on connecting to cloud sql from kubernetes. This medium article does a decent job walking you through it as well as this github gist. Once you have the service running in your composer environment, you can treat it as a regular Postgres connection in airflow. Although I will add one thing. when you deploy the container running cloud-sql-proxy, make sure to explicitly tell cloud-sql-proxy to host it on "0.0.0.0". I had everything working but could not figure out why nothing could connect. It defaults to hosting it on "127.0.0.1". For some reason, my other services like airflow could connect only when I made this change.