Search code examples
google-cloud-dataflowapache-beamdataflow

Apache Beam stateful ParDo Work token invalid


I have a stateful DoFn that basically batches the elements that are coming and when the buffer reaches a certain size, the buffer is cleared and the elements are inserted into BigQuery. What I've noticed is that from time to time, the pipeline is raising an exception, the exception is not stopping the job to run. Below is the stacktrace:


Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 140, in process
    self._flush_buffer(buffer_state, count_state, buffer_size_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 162, in _flush_buffer
    rows = self._extract_rows(buffer_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 197, in _extract_rows
    for row in buffer.read():
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 510, in __iter__
    for elem in self.first:
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1039, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 846, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 886, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: INTERNAL: Work token invalid

This is raised when the process method is called and it tries to extract the elements from the buffer, see rows = self._extract_rows(buffer_state)

The DoFn is implemented exactly like in the example https://beam.apache.org/blog/timely-processing/#example-batched-rpc


Solution

  • I've confirmed this error is expected during work reassignments, e.g. when autoscaling. The work item will be retried on the new machine and the pipeline will continue processing correctly. (I agree the error message could be improved.)