I'm having a problem when I switch from the DirectRunner to the DataflowRunner: ParDo apparently does not work. When I set the runner to DataflowRunner, def process(self, query) never runs. I can see the job running on GCP; however, my InsertPostgresql DoFn doesn't work with the DataflowRunner.
According to the error log, the ParDo apparently does not recognize 'psycopg2':
process NameError: name 'psycopg2' is not defined During handling of the above exception
Why is that?
import os

import apache_beam as beam
import psycopg2
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions


def run_pipeline():
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'C:\Users\julianocm\Desktop\POC\<mycredentialfile>.json'
    optionsGCP = {
        'streaming': True,
        'project': "<myproject>",
        'region': "us-central1",
        'temp_location': "gs://poc360-bucket/temp",
        'staging_location': "gs://poc360-bucket/staging",
        'drivername': "postgresql",
        'save_main_session': True,
        'setup_file': r'C:\Users\julianocm\Desktop\POC\setup.py'
    }
    paramsDB = {
        'database': '<mydatabase>',
        'user': '<myuser>',
        'password': '<mypassword>',
        'host': 'localhost',  # on a Dataflow worker, 'localhost' is the worker VM itself
        'port': '5000'
    }

    class InsertPostgresql(beam.DoFn):
        def __init__(self, **server_config):
            # Connection kwargs (database, user, password, host, port)
            self.config = server_config

        def process(self, query):
            con = psycopg2.connect(**self.config)
            cur = con.cursor()
            cur.execute(query)
            con.commit()
            resultado = cur.fetchall()
            cur.close()
            con.close()
            yield resultado

    runner = 'DataflowRunner'
    options = PipelineOptions(**optionsGCP)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True
    sql = "select public.Insert_tbCadastro('01','010',431,'A',501741,000000,'2020-10-26','A')"
    p = beam.Pipeline(runner=runner, options=options)
    data = (p
            | beam.Create([sql])
            | beam.ParDo(InsertPostgresql(**paramsDB))
            )
    data | 'teste' >> beam.Map(print)
    print("Lines: ", data)  # prints the PCollection object, not its elements
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run_pipeline()
After leaving the job running for a while, I got this:
2020-11-12T11:54:12.718405344Z Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process
NameError: name 'psycopg2' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute
    response = task()
  (...)
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process
NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
(...)
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last):
(...)
NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636']
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
(...)
java.lang.Thread.run(Thread.java:748)
In addition to everything above, the job reports:
After applying a setup.py file, I got:
setup.py:
import setuptools

setuptools.setup(
    name='psycopg2',
    version='2.8.6',
    install_requires=[],
    packages=setuptools.find_packages(),
)
Dataflow Log error:
2020-11-12T21:11:50.946327861Z Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1396: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 48, in process
NameError: name 'psycopg2' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute
    response = task()
  (...)
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 48, in process
NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-1388']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
(...)
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1396: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py",
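A likely reason the setup.py attempt changed nothing: that file declares a package that is itself named psycopg2 (version 2.8.6) with an empty install_requires, so it never asks pip to install the real library on the workers. A minimal sketch of a setup.py that does declare the dependency is below; it would be passed through the same setup_file option the question already sets. The project name is hypothetical, and using psycopg2-binary (the prebuilt-wheel distribution of psycopg2 on PyPI) is an assumption; pin a version as needed.

```python
# setup.py: a sketch, assuming psycopg2-binary is acceptable on the workers.
import setuptools

setuptools.setup(
    name="pipelinepoc360",  # hypothetical project name; must not shadow "psycopg2"
    version="0.0.1",
    install_requires=[
        "psycopg2-binary",  # pulled in by pip when the workers install this package
    ],
    packages=setuptools.find_packages(),
)
```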
Using requirements.txt:
I'm still waiting for the job log; so far, my PostgreSQL table is still empty.
After a few minutes I got the log, and psycopg2 is still not defined:
line 48, in process
NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-208']
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
My requirements.txt:
After following the troubleshooting guide regarding NameErrors:
Apparently, we fixed the NameError. Now I'm trying to figure out why my SQL is not executed. The job is still running, but my PostgreSQL table doesn't have any records, so something is still wrong.
When I debug, the lines in "process" are ignored. Are the arguments being used correctly?
And after all that:
Thank you, Juliano
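One way to check whether the arguments reach the connect call is to exercise the body of process() directly, outside any pipeline. The sketch below is a plain-Python stand-in for InsertPostgresql: the beam.DoFn base class is dropped so it runs without Beam, the connect parameter is a hypothetical injection point, and sqlite3 substitutes for PostgreSQL only to show that the keyword arguments are forwarded unchanged.

```python
import sqlite3


class InsertPostgresqlStandIn:
    """Hypothetical stand-in for the question's InsertPostgresql DoFn.

    In the pipeline this would subclass beam.DoFn; `connect` is an
    illustrative injection point, and psycopg2 is imported lazily otherwise.
    """

    def __init__(self, connect=None, **server_config):
        self.connect = connect
        self.config = server_config  # e.g. database, user, password, host, port

    def process(self, query):
        connect = self.connect
        if connect is None:
            import psycopg2  # resolved where process() runs, i.e. on the worker
            connect = psycopg2.connect
        con = connect(**self.config)  # keyword arguments forwarded as given
        try:
            cur = con.cursor()
            cur.execute(query)
            rows = cur.fetchall()  # read the result before committing
            con.commit()
            cur.close()
        finally:
            con.close()
        yield rows


# sqlite3.connect accepts `database`, so it can stand in for psycopg2.connect here.
fn = InsertPostgresqlStandIn(connect=sqlite3.connect, database=":memory:")
print(list(fn.process("select 1")))  # [[(1,)]]
```

If this works locally but nothing arrives in the table on Dataflow, the arguments themselves are probably fine and the difference lies in the environment the workers run in.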
The issue might be related to global imports. As described in this troubleshooting guide, it is a common issue when switching from the DirectRunner to the DataflowRunner, which matches your case.
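The mechanism can be illustrated with a small, Beam-free analogy: a function that relies on a module-level import fails with NameError when its body runs in a namespace that never received that import, while a function-local import is resolved at call time. The fresh dictionary passed to exec only stands in for a worker's namespace; Beam does not literally work this way, and a local import still requires psycopg2 to be installed on the workers (for example through the requirements_file or setup_file options).

```python
# Analogy only: `exec` into an empty dict mimics a namespace missing the
# launcher's module-level imports.
GLOBAL_IMPORT_BODY = """
def process(query):
    return json.dumps(query)  # relies on a module-level `import json`
"""

LOCAL_IMPORT_BODY = """
def process(query):
    import json  # resolved when process() runs, wherever that is
    return json.dumps(query)
"""


def run_in_fresh_namespace(body, query):
    namespace = {}  # stands in for a worker that never received the imports
    exec(body, namespace)
    return namespace["process"](query)


try:
    run_in_fresh_namespace(GLOBAL_IMPORT_BODY, "select 1")
except NameError as err:
    print("global import:", err)

print("local import:", run_in_fresh_namespace(LOCAL_IMPORT_BODY, "select 1"))
# local import: "select 1"
```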
Given that you already tried adding the flag --save_main_session=True, I would try importing the module inside the function itself; that is, instead of:
import psycopg2
(...)
def process(self, query):
    con = psycopg2.connect(**self.config)

use:

def process(self, query):
    import psycopg2
    con = psycopg2.connect(**self.config)
--- Edit
Given that we got past that error and the pipeline works properly with the DirectRunner, I doubt the issue is in the code itself. I would check the firewall rules to ensure the Dataflow workers can connect to the PostgreSQL database, and I would check that the controller service account has the necessary permissions.
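One way to test the firewall hypothesis is to attempt a plain TCP connection to the database host and port from where the code actually runs. The helper below is a hypothetical sketch using only the standard library; it could, for instance, be called from a beam.Map on the workers and its result logged. Note that paramsDB in the question uses 'localhost', which on a Dataflow worker refers to the worker VM itself rather than the machine that launched the job, so a database reached as localhost with the DirectRunner will generally not be reachable that way from Dataflow.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, timed out, name resolution failure, ...
        return False
```

Called as can_reach(paramsDB['host'], paramsDB['port']), it accepts the string port used in the question. A False result on the workers points at networking (host name, firewall, VPC) rather than at the DoFn code.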
I see that you specify some credentials in your code; keep in mind that the Dataflow workers run as the default controller service account, <project-number>-compute@developer.gserviceaccount.com, or as a custom one, and that account needs the necessary permissions to connect to the database.
Both of these (the firewall rules and the service account used by the workers) are potential differences between the DirectRunner and the DataflowRunner and could therefore be the cause of the issue, so I would review them.