python, postgresql, apache-beam, google-cloud-sql

Issues connecting to a Google Cloud SQL Postgres instance from a Beam pipeline


I've been having some issues connecting to a PostgreSQL instance on Google Cloud SQL and wanted to ask for help. I'm not sure if the solution is to initiate a connection engine or something of the sort, but here is my issue. My code is as follows:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username=USERNAME,
    password=PASSWORD,
    database=DB_NAME,
    create_if_missing=True,
)

table_config = relational_db.TableConfiguration(
    name=TABLE_NAME,
    create_if_missing=False,
    primary_key_columns=["key"],
    create_insert_f=FUNCTION,
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    update_pipe = (
        pipeline
        | 'QueryTable' >> beam.io.ReadFromBigQuery(table=TABLE)
        | 'UPDATE DB' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config,
        )
    )

Running the code like this has resulted in the following error:

RuntimeError: sqlalchemy.exc.InterfaceError: (pg8000.exceptions.InterfaceError) Can't create a connection to host localhost and port 5432 (timeout is None and source_address is None).

I've read the documentation and some related questions on Stack Overflow, where I've seen suggestions such as using Private IP connectivity or authenticating with the gcloud CLI. But my confusion is this: if I use SQLAlchemy on its own, without trying to use it inside an Apache Beam pipeline, I don't get the same connection refusal, and one of the arguments explicitly sets the IP type to look for to Public:

import sqlalchemy
from google.cloud.sql.connector import Connector, IPTypes

# initialize Python Connector object
connector = Connector()

# Python Connector database connection function
def getconn():
    conn = connector.connect(
        CLOUD_SQL_CONNECTION_NAME,  # Cloud SQL instance connection name
        "pg8000",
        user=USERNAME,
        password=PASSWORD,
        db=DB_NAME,
        ip_type=IPTypes.PUBLIC,  # IPTypes.PRIVATE for private IP
    )
    return conn

# create connection pool with 'creator' argument to our connection object function
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

# interact with Cloud SQL database using connection pool
with pool.connect() as db_conn:
    result = db_conn.execute(sqlalchemy.text("SELECT * FROM users_copy LIMIT 10")).fetchall()

So my question is: is there a way to include the connection/engine in the Beam pipeline to avoid the error? Or do I need to change the source config arguments to include the Cloud SQL instance connection name?

Thanks for the help and for reading my issue.


Solution

  • Are you locked into using the beam_nuggets.io library?

    If yes, then unfortunately you cannot use the Cloud SQL Python Connector.

    Taking a look at the beam-nuggets code, it uses SQLAlchemy under the hood (which the Python Connector can work with), but it configures the database engine purely from a database URL/URI. The Cloud SQL Python Connector requires calling SQLAlchemy's create_engine with the creator argument, so beam-nuggets would have to add support for the creator argument in order to support the Cloud SQL Python Connector (the two approaches are contrasted in the first sketch below).

    An alternative to the Cloud SQL Python Connector would be to deploy the Cloud SQL Auth Proxy alongside your Beam application, so that your initial code snippet would work connecting to localhost (see the second sketch below).

    Update: I put up a PR to try to add support for the creator argument to beam-nuggets, though it doesn't look like the library is actively maintained.
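
    To make the distinction concrete, here is a minimal sketch (not beam-nuggets' actual code) of the two ways a SQLAlchemy engine can be built. The placeholders USERNAME, PASSWORD, DB_NAME and the getconn function are the ones from the question; the URL form is roughly what beam-nuggets derives from its SourceConfiguration, while the creator form is what the Cloud SQL Python Connector needs.

import sqlalchemy

# What beam-nuggets effectively does: build the engine from a URL/URI only.
# (Simplified; the placeholder values are assumptions, not real credentials.)
engine_from_url = sqlalchemy.create_engine(
    f"postgresql+pg8000://{USERNAME}:{PASSWORD}@localhost:5432/{DB_NAME}"
)

# What the Cloud SQL Python Connector needs: create_engine called with the
# creator argument, passing the getconn function defined in the question.
engine_from_creator = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)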
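
    And a hedged sketch of the proxy alternative: if the Cloud SQL Auth Proxy is running next to the pipeline workers (started, for example, with something like cloud-sql-proxy --port 5432 PROJECT:REGION:INSTANCE), the original SourceConfiguration can keep pointing at localhost, because the proxy forwards local connections to the Cloud SQL instance.

from beam_nuggets.io import relational_db

# Assumes the Cloud SQL Auth Proxy is already listening on localhost:5432;
# USERNAME, PASSWORD and DB_NAME are the placeholders from the question.
source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',  # the proxy, not the instance itself
    port=5432,
    username=USERNAME,
    password=PASSWORD,
    database=DB_NAME,
    create_if_missing=True,
)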