Tags: google-cloud-platform, google-cloud-dataflow, psycopg2, apache-beam, dataflow

Why is ParDo not working on DataflowRunner?


I'm having a problem when I switch from the DirectRunner to the DataflowRunner: ParDo apparently does not work. When I set the runner to DataflowRunner, def process(self, query) never runs. I can see the job running on GCP, but my InsertPostgresql method does nothing under the DataflowRunner.

According to the error log, ParDo apparently does not recognize psycopg2.

process NameError: name 'psycopg2' is not defined
During handling of the above exception, another exception occurred:
NameError: name 'psycopg2' is not defined

I wonder why?

import os

import apache_beam as beam
import psycopg2
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)


def run_pipeline():

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'C:\Users\julianocm\Desktop\POC\<mycredentialfile>.json'

    optionsGCP = {
        'streaming': True,
        'project': "<myproject>",
        'region': "us-central1",
        'temp_location': "gs://poc360-bucket/temp",
        'staging_location': "gs://poc360-bucket/staging",
        'drivername': "postgresql",
        'save_main_session': True,
        'setup_file': r'C:\Users\julianocm\Desktop\POC\setup.py'
    }

    paramsDB = {
        'database': '<mydatabase>',
        'user': '<myuser>',
        'password': '<mypassword>',
        'host': 'localhost',
        'port': '5000'
    }

    class InsertPostgresql(beam.DoFn):

        def __init__(self, **server_config):
            self.config = server_config

        def process(self, query):
            con = psycopg2.connect(**self.config)

            cur = con.cursor()
            cur.execute(query)
            con.commit()
            resultado = cur.fetchall()
            cur.close()
            con.close()

            yield resultado

    runner = 'DataflowRunner'
    options = PipelineOptions(**optionsGCP)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    sql = "select public.Insert_tbCadastro('01','010',431,'A',501741,000000,'2020-10-26','A')"

    p = beam.Pipeline(runner=runner, options=options)
    data = (p
        | beam.Create([sql])
        | beam.ParDo(InsertPostgresql(**paramsDB))
    )

    data | 'teste' >> beam.Map(print)
    print("Lines: ", data)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run_pipeline()

Leaving the job running, after some time I got this:

2020-11-12T11:54:12.718405344ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 
569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction 
return getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in 
apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

Besides everything above, the job reports a timeout:

[screenshot: timeout]

After applying a setup.py file, I got:

[screenshot: warning]

setup.py:

import setuptools 

setuptools.setup(
    name='psycopg2',
    version='2.8.6',
    install_requires=[],
    packages=setuptools.find_packages(),
)

Dataflow Log error:

2020-11-12T21:11:50.946327861ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
received from SDK harness for instruction -1396: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in
 apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in 
apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 48, in process
 NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback
 (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, 
in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, 
in <lambda> lambda: self.create_worker().do_instruction(request), request) 
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return 
getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, 
in process_bundle bundle_processor.process_bundle(instruction_id))
 File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle
 input_op_by_transform_id[element.transform_id].process_encoded( 
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670,
 in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, 
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", 
line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, 
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, 
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", 
line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", 
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", 
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, 
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, 
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, 
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, 
in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", 
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", 
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", 
line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", 
line 1215, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", 
line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", 
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", 
line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", 
line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-1388'] 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) 
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.RuntimeException: Error received from SDK harness for instruction -1396: 
Traceback (most recent call last): File "apache_beam/runners/common.py", 
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", 

Using a requirements.txt:

[screenshot: execution]

I'm still waiting for the job log; so far, my PostgreSQL table is still empty.

After a few minutes I got the log; psycopg2 is still not defined:

line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-208'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

My requirements.txt:

[screenshot: requirements.txt file]

After following the troubleshooting guide regarding NameErrors:

Apparently, we fixed the NameError. Now I'm trying to figure out why my SQL is not executed. The job is still running, but my PostgreSQL table doesn't have any records, so something is still wrong.

[screenshot: Job]

When I debug, the lines in "process" are ignored. Are the arguments being used correctly?

[screenshot]

And after all that:

[screenshot: timeout]

Thank you, Juliano


Solution

  • The issue might be related to global imports. As can be seen in this troubleshooting guide, it is a common issue when switching from the DirectRunner to the DataflowRunner, which matches your case.

    Given that you already tried the solution of adding the flag --save_main_session=True, I would try importing the module in the function itself, meaning instead of:

    import psycopg2
    (...)
    def process(self, query):
      con = psycopg2.connect(**self.config)
    

    try:

    def process(self, query):
      import psycopg2
      con = psycopg2.connect(**self.config)
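
    Building on that idea, here is a minimal sketch (mine, not taken from your pipeline) of the same DoFn with the import done lazily and the connection opened once per instance in setup(). Note that psycopg2 still has to be installed on the workers, for example by listing psycopg2-binary in your requirements.txt or in the install_requires of setup.py:

    import apache_beam as beam

    class InsertPostgresql(beam.DoFn):

        def __init__(self, **server_config):
            self.config = server_config
            self.con = None

        def setup(self):
            # Imported here so the worker resolves the module at runtime,
            # independently of what was pickled from the main session.
            import psycopg2
            self.con = psycopg2.connect(**self.config)

        def process(self, query):
            cur = self.con.cursor()
            cur.execute(query)
            self.con.commit()
            resultado = cur.fetchall()
            cur.close()
            yield resultado

        def teardown(self):
            if self.con is not None:
                self.con.close()

    setup() and teardown() run once per DoFn instance, so the connection is reused across elements instead of being opened for every query.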
    

    --- Edit

    Given that we bypassed that error and the pipeline works properly with the DirectRunner, I doubt the issue is in the code itself. I would look at the firewall rules to ensure the Dataflow workers can connect to the PostgreSQL database, and I would check that the controller service account has the necessary permissions.
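
    As a debugging aid (my suggestion, not part of your pipeline), you can check reachability from inside the workers with a tiny DoFn that just tries to open a TCP connection to the database host and port and logs the result:

    import logging
    import socket

    import apache_beam as beam

    class CheckDbReachable(beam.DoFn):
        """Logs whether the worker can open a TCP connection to host:port."""

        def __init__(self, host, port, timeout=5):
            self.host = host
            self.port = int(port)
            self.timeout = timeout

        def process(self, element):
            try:
                with socket.create_connection((self.host, self.port), self.timeout):
                    logging.info("Worker can reach %s:%s", self.host, self.port)
            except OSError as error:
                logging.error("Worker cannot reach %s:%s: %s", self.host, self.port, error)
            yield element

    If this logs a failure on Dataflow but succeeds with the DirectRunner, the problem is network reachability (firewall rules, VPC, or a host such as localhost that only exists on your machine) rather than the code.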

    I see that you specify some credentials within your code. Take into consideration that the Dataflow workers use the default controller service account <project-number>-compute@developer.gserviceaccount.com (or a custom one), and it needs the necessary permissions to connect to the database.
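
    For reference, a hedged sketch of how a custom worker service account can be set through the standard Dataflow pipeline options (the account name below is an assumption, substitute your own):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(
        project='<myproject>',
        region='us-central1',
        temp_location='gs://poc360-bucket/temp',
        # Assumption: a custom controller service account that you created and
        # granted the permissions needed to reach the database.
        service_account_email='dataflow-worker@<myproject>.iam.gserviceaccount.com',
    )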

    Both things (the firewall rules and the service account used by the workers) are potential differences between the DirectRunner and the DataflowRunner, and thus could be the cause of the issue, so review them.