Tags: python, google-cloud-platform, google-cloud-storage, apache-beam, dataflow

Apache Beam Python gcsio upload method has @retry.no_retries implemented; does this cause data loss?


I have a Python Apache Beam streaming pipeline running in Dataflow. It reads from Pub/Sub and writes to GCS. Sometimes I get errors like "Error in _start_upload while inserting file ...", which come from:

File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse raise exceptions.HttpError.FromResponse( RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucke-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631

The underlying issue seems to be that no retry logic is applied in the _start_upload method here, so when an HttpError occurs (a 503 in this case), it is not handled:

# TODO(silviuc): Refactor so that retry logic can be applied.
# There is retry logic in the underlying transfer library but we should make
# it more explicit so we can control the retry parameters.
@retry.no_retries  # Using no_retries marks this as an integration point.
def _start_upload(self):

From my point of view, whenever these errors are thrown, an upload failed due to a server issue and was not even retried, so that data is lost? Am I missing something, or is this what's happening? If that's the case, what is odd to me is that no one else has found this before.
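For contrast, this is the kind of behavior I would have expected around the upload call: retry transient server errors (like a 503) with exponential backoff before giving up. A standalone sketch (not Beam's actual code; Beam has its own retry utilities):

```python
import random
import time


def with_retries(max_attempts=5, base_delay=0.1, retryable=(Exception,)):
    """Return a decorator that retries the wrapped call with exponential
    backoff plus jitter, re-raising once max_attempts is exhausted."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except retryable:
                    if attempt == max_attempts:
                        raise
                    # Backoff grows 0.1s, 0.2s, 0.4s, ... with jitter.
                    time.sleep(base_delay * (2 ** (attempt - 1))
                               * random.uniform(0.5, 1.0))
        return wrapper
    return decorator


class TransientServerError(Exception):
    """Stand-in for an HTTP 503 from the GCS upload endpoint."""


calls = {"n": 0}


@with_retries(max_attempts=4, base_delay=0.01,
              retryable=(TransientServerError,))
def start_upload():
    calls["n"] += 1
    if calls["n"] < 3:  # fail twice, then succeed
        raise TransientServerError("503 Service Unavailable")
    return "upload complete"


print(start_upload())  # succeeds on the third attempt
```

With `@retry.no_retries`, by contrast, the first 503 propagates straight up to the caller.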

The piece of code that fails uses WriteToFiles from apache_beam.io.fileio. It looks like this (destination_partitioning_naming is a custom method):

from apache_beam.io.fileio import WriteToFiles

...
| "Write to GCS" >> WriteToFiles(
    path=output_path,
    shards=1,
    max_writers_per_bundle=0,
    destination=lambda record: record['topic_kafka'],
    sink=JsonSink(),
    file_naming=destination_partitioning_naming(extension="json", topics=topics),
)
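JsonSink isn't shown in the question. For a self-contained picture, a minimal newline-delimited JSON sink compatible with WriteToFiles could look like the following (a sketch; the real JsonSink in the pipeline may differ). Beam calls open() with a file handle, write() once per record, and flush() before closing the file:

```python
import io
import json


class JsonSink:
    """Minimal sink for fileio.WriteToFiles: writes each record as one
    JSON line. (Subclassing apache_beam.io.fileio.FileSink provides the
    same open/write/flush interface.)"""

    def open(self, fh):
        self._fh = fh

    def write(self, record):
        self._fh.write(json.dumps(record).encode("utf-8") + b"\n")

    def flush(self):
        self._fh.flush()


# Exercise the sink against an in-memory buffer, the way Beam would
# exercise it against a GCS file handle.
buf = io.BytesIO()
sink = JsonSink()
sink.open(buf)
sink.write({"topic_kafka": "orders", "value": 1})
sink.flush()
```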

I have raised this issue in JIRA as well.

Here is the full stacktrace:

  2021-06-10 16:58:55.104 CEST
Error message from worker: generic::unknown: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker.invoke_process_per_window File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process writer.close() File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close self._uploader.finish() File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish raise self._upload_thread.last_error # pylint: disable=raising-bad-type File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload self._client.objects.Insert(self._insert_request, upload=self._upload) File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert return self._RunMethod( File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod return self.ProcessHttpResponse(method_config, http_response, request) File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse self.ProcessHttpResponse(method_config, http_response, request)) File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse raise exceptions.HttpError.FromResponse( apitools.base.py.exceptions.HttpError: HttpError accessing 
<https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<uploadid>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>, 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <> During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction return getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/init.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process writer.close() File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close self._uploader.finish() File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish raise self._upload_thread.last_error # pylint: disable=raising-bad-type File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload self._client.objects.Insert(self._insert_request, upload=self._upload) File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert return self._RunMethod( File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod return self.ProcessHttpResponse(method_config, http_response, request) File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse self._ProcessHttpResponse(method_config, http_response, request)) File 
"/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse raise exceptions.HttpError.FromResponse( RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucke-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631

Solution

  • In a streaming pipeline, Dataflow retries work items that run into errors indefinitely, so the failed upload is re-attempted when the bundle is re-executed.

    The code itself does not need to have retry logic.
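As a toy illustration of those retry semantics (not Dataflow's actual implementation), the runner re-dispatches a failed bundle until it succeeds, so user code never needs its own retry loop for transient errors:

```python
class TransientError(Exception):
    """Stand-in for a 503 raised while uploading to GCS."""


attempts = {"n": 0}


def process_bundle(records):
    """Pretend bundle: the upload fails twice, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError("503 from GCS during _start_upload")
    return [r.upper() for r in records]


def run_streaming(records):
    # A streaming runner retries a failed work item indefinitely,
    # re-executing the whole bundle each time.
    while True:
        try:
            return process_bundle(records)
        except TransientError:
            continue  # re-dispatch the same bundle


print(run_streaming(["a", "b"]))  # succeeds on the third attempt
```

Because the bundle only commits once it completes successfully, the 503 in the stack trace does not translate into data loss; it translates into a retried bundle.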