Tags: sqlalchemy, python-asyncio, google-cloud-dataflow, apache-beam, asyncpg

Is it possible to run functions from asynchronous libraries like asyncpg in Beam ParDo?


We are using pg8000 as the driver for inserting records into our Cloud SQL Postgres instance. Our Beam code is written in Python and uses sqlalchemy to interact with the database. We found pg8000 too slow, so we tested asyncpg locally (outside Beam) with promising results, using this as a reference:

https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/blob/main/README.md

Can we use an asynchronous library like asyncpg to establish the DB connection within a Beam ParDo? How should we structure the DoFn's lifecycle methods: setup (establish an async connection), start_bundle (start the transaction), process (perform the SQL operation), finish_bundle (commit/rollback the transaction), and teardown (close the connection)?


Solution

  • Yes, it is possible to use asynchronous functions, provided you take care of two things:

    • Beam objects passed to your DoFn are generally not intended for concurrent use, so do not pass them to the concurrent functions. Use the async functions only for generic processing (plain data in, plain data out) and keep all Beam-specific logic on the main thread.
    • You must wait for all outstanding asynchronous tasks to finish in finish_bundle, so that all processing is complete before the bundle is committed as "done". Otherwise the runner may mark elements as processed while their writes are still in flight.
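The two rules above can be sketched as a DoFn-shaped class that owns a private event loop on a background thread, submits coroutines in process, and drains them in finish_bundle. This is a minimal illustration, not a tested pipeline: in a real job the class would subclass apache_beam.DoFn, and the fake_insert coroutine (a stand-in name invented here) would call asyncpg instead of appending to a list.

```python
import asyncio
import threading

# Hypothetical stand-in for an asyncpg INSERT; purely illustrative.
async def fake_insert(record, sink):
    await asyncio.sleep(0)   # simulate async I/O
    sink.append(record)      # pretend the row was written

class AsyncWriteDoFn:
    # In a real pipeline: class AsyncWriteDoFn(apache_beam.DoFn)
    def setup(self):
        # Own a private event loop on a daemon thread so the Beam worker
        # thread is never blocked while coroutines run.
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()
        self.sink = []       # stands in for the asyncpg connection/pool

    def start_bundle(self):
        self.pending = []    # futures for this bundle only

    def process(self, element):
        # Submit work to the loop. Pass only plain data (element) into the
        # coroutine -- never Beam objects, which are not thread-safe.
        fut = asyncio.run_coroutine_threadsafe(
            fake_insert(element, self.sink), self.loop)
        self.pending.append(fut)

    def finish_bundle(self):
        # Block until every task submitted in this bundle has completed,
        # so the bundle cannot be committed before its writes are done.
        for fut in self.pending:
            fut.result()

    def teardown(self):
        # Close the connection here in a real DoFn, then stop the loop.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
```

Connection pooling (e.g. an asyncpg pool created in setup) fits the same shape; the key design choice is that the event loop outlives individual bundles, while the list of pending futures is reset per bundle.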