In the Apache Beam Python SDK running on GCP Dataflow, I have a DoFn.process that takes a long time. The reasons are not that important; I have to accept them due to requirements out of my control. But if you must know, it makes network calls to external services that take quite long (several seconds), and it processes multiple elements from a previous GroupByKey, which leads to DoFn.process calls that take minutes.
Anyway, my question is: is there a time limit for the runtime length of a DoFn.process call? I'm asking because I'm seeing logs that look like:
WARNING 2023-01-03T13:12:12.679957Z ReportProgress() took long: 1m49.15726646s
WARNING 2023-01-03T13:12:14.474585Z ReportProgress() took long: 1m7.166061638s
WARNING 2023-01-03T13:12:14.864634Z ReportProgress() took long: 1m58.479671042s
WARNING 2023-01-03T13:12:16.967743Z ReportProgress() took long: 1m40.379289919s
2023-01-03 08:16:47.888 EST Error message from worker: SDK harness sdk-0-6 disconnected.
2023-01-03 08:21:25.826 EST Error message from worker: SDK harness sdk-0-2 disconnected.
2023-01-03 08:21:36.011 EST Error message from worker: SDK harness sdk-0-4 disconnected.
It seems to me that the Apache Beam Fn API progress-reporting machinery thinks my DoFn.process function is stuck and not making any progress, and eventually terminates the "unresponsive" SDK harness. Is this happening because my DoFn.process is taking too long to process a single element? If so, how do I report progress to the Dataflow worker to let it know that my DoFn.process is still alive?
There is no time limit for the runtime length of a DoFn.process call. However, there is a related mechanism: pipelines running on Dataflow report progress regularly, and if this progress reporting fails for a long time, the Dataflow service assumes the affected workers have failed and cancels the work assigned to them.
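To make the distinction between "a long call" and "a stalled reporter" concrete, here is a toy model; it is not the actual Beam or Dataflow harness code, and run_with_heartbeat, max_gap and slow_io_call are hypothetical names. A background thread records a heartbeat at a fixed interval while the main thread runs a slow call. Because time.sleep (like blocking socket I/O) releases the GIL, the heartbeat keeps ticking even though the call itself takes a long time.

```python
import threading
import time


def run_with_heartbeat(work, interval_s=0.05):
    """Run work() on the calling thread while a daemon thread records a
    timestamp every interval_s seconds.

    Toy model of a progress-reporting thread, not the Beam/Dataflow harness.
    Returns (result of work(), list of heartbeat timestamps).
    """
    beats = []
    stop = threading.Event()

    def heartbeat():
        # Event.wait(timeout) returns False on timeout and True once stop is set.
        while not stop.wait(interval_s):
            beats.append(time.monotonic())

    thread = threading.Thread(target=heartbeat, daemon=True)
    thread.start()
    try:
        result = work()
    finally:
        stop.set()
        thread.join()
    return result, beats


def max_gap(beats):
    """Largest interval in seconds between consecutive heartbeats (0.0 if fewer than two)."""
    return max((b - a for a, b in zip(beats, beats[1:])), default=0.0)


def slow_io_call():
    # Stand-in for a multi-second network call. time.sleep releases the GIL,
    # as blocking socket I/O generally does, so the heartbeat thread keeps running.
    time.sleep(0.5)
    return "done"


if __name__ == "__main__":
    result, beats = run_with_heartbeat(slow_io_call)
    print(result, len(beats), round(max_gap(beats), 3))
```

The gaps between heartbeats only grow when the reporter thread cannot get scheduled, for example when native code holds the GIL for a long stretch without releasing it, or when the CPUs are saturated. Pure-Python code alone yields the GIL periodically (see sys.getswitchinterval), so a slow but otherwise idle call is not the problem; a real progress reporter stalling in the same way is what the service reads as a failed worker.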
In Python, progress reporting can stall if the progress-reporting thread is not scheduled for some reason, e.g. because something holds the GIL for a long time or consumes all of the CPU. See https://cloud.google.com/dataflow/docs/guides/common-errors for more information (the paragraph starting with "In Python pipelines, this error might occur when workers use too much CPU.").
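One mitigation in that spirit, assuming the slow part really is waiting on the network (as the question describes) and that the service client releases the GIL while it waits, is to fan the per-value calls for a grouped element out over a small thread pool. This shortens each process call while the process mostly sits idle waiting on I/O. It is a sketch, not something the Dataflow docs prescribe; call_external_service, process_grouped and CallServicesDoFn are hypothetical names, and the sleep stands in for the real request.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def call_external_service(value):
    """Hypothetical stand-in for the slow external call from the question.

    time.sleep releases the GIL while waiting, as blocking socket I/O
    generally does; the real client library may behave differently.
    """
    time.sleep(0.2)
    return value * 2


def process_grouped(element, max_workers=8):
    """Hypothetical body for DoFn.process after a GroupByKey.

    element is a (key, iterable_of_values) pair. The per-value calls run
    concurrently on a small thread pool instead of one after another, so the
    call takes roughly ceil(n / max_workers) latencies instead of n.
    pool.map returns results in input order.
    """
    key, values = element
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(call_external_service, values))
    return key, results


# Inside a Beam pipeline this could be wired up roughly as (not run here):
#
#   class CallServicesDoFn(beam.DoFn):
#       def process(self, element):
#           yield process_grouped(element)

if __name__ == "__main__":
    start = time.monotonic()
    print(process_grouped(("k", [1, 2, 3, 4])))
    print(f"{time.monotonic() - start:.2f}s")
```

If you adopt something like this, creating the executor once in DoFn.setup() and shutting it down in DoFn.teardown() avoids spinning up threads for every element. Keep max_workers modest so the extra threads do not themselves saturate the CPU or overload the external service.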