We are using `pg8000` as the driver for inserting records into our Cloud SQL Postgres instance. Our Beam code is in Python and uses the `sqlalchemy` library to interact with the database. We find `pg8000` too slow, so we tested `asyncpg` locally (without any Beam code) and got promising results. We used this as a reference for using `asyncpg`:
https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/blob/main/README.md
Can we use an asynchronous driver like `asyncpg` to establish the DB connection within a Beam ParDo? How should we structure the DoFn's lifecycle methods: `setup` (establish the async connection), `start_bundle` (start the transaction), `process` (perform the SQL operation), `finish_bundle` (commit or roll back the transaction), and `teardown` (close the connection)?
Yes, it is possible to use asynchronous functions if you are careful. Here is what you have to be careful of:
Wait for all outstanding asynchronous work in `finish_bundle` (`finishBundle` in the Java SDK), so that all processing is done before the bundle is committed as "done".
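The point about completing all work before the bundle is committed can be sketched roughly as follows. This is a minimal illustration, not a tested Beam pipeline: to keep it runnable without Beam or a database, `AsyncWriteFn` only mirrors the `beam.DoFn` lifecycle methods instead of subclassing `beam.DoFn`, and `FakeConnection` is a hypothetical stand-in for a real async connection such as one obtained through `asyncpg`. The assumption is one private event loop per DoFn instance, created in `setup` and driven synchronously from the lifecycle methods.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for an async DB connection (e.g. from asyncpg)."""

    def __init__(self):
        self.rows = []
        self.closed = False

    async def execute(self, row):
        await asyncio.sleep(0)  # simulate yielding to the loop during I/O
        self.rows.append(row)

    async def close(self):
        self.closed = True


class AsyncWriteFn:
    """Mirrors the beam.DoFn lifecycle; in a real pipeline, subclass beam.DoFn."""

    def setup(self):
        # One event loop per DoFn instance, reused across bundles.
        self._loop = asyncio.new_event_loop()
        self._conn = self._loop.run_until_complete(self._connect())

    async def _connect(self):
        # Assumption: replace with the real async connect call.
        return FakeConnection()

    def start_bundle(self):
        self._pending = []

    def process(self, element):
        # Queue the coroutine; nothing runs until finish_bundle drives the loop.
        self._pending.append(self._conn.execute(element))

    def finish_bundle(self):
        # Join all outstanding work before the runner marks the bundle as done.
        async def _join():
            await asyncio.gather(*self._pending)

        self._loop.run_until_complete(_join())
        self._pending = []

    def teardown(self):
        self._loop.run_until_complete(self._conn.close())
        self._loop.close()


# Drive the lifecycle by hand, the way a runner would for a single bundle.
fn = AsyncWriteFn()
fn.setup()
fn.start_bundle()
for row in ["a", "b", "c"]:
    fn.process(row)
fn.finish_bundle()
written = sorted(fn._conn.rows)
fn.teardown()
```

Note that this sketch holds every queued write for a bundle in memory until `finish_bundle`; for large bundles you would likely want to flush in batches from `process` as well. Transaction handling (begin in `start_bundle`, commit or roll back in `finish_bundle`) is omitted and would need to run on the same event loop.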