Tags: google-cloud-platform, google-cloud-run, smart-open

Connection broken: IncompleteRead in Google Cloud Run


I have a Google Cloud Run service that is triggered via Pub/Sub, pulls data from an Azure API, and writes it to a GCS bucket using smart_open.

    # destination is the gs:// URI of the target object in the GCS bucket
    with smart_open.open(destination, "wb") as fout:
        schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
        data_frame.to_parquet(
            path=fout,
            engine="pyarrow",
            compression="gzip",
            schema=schema,
            use_compliant_nested_type=False,
        )
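
For reference, smart_open can also be handed a pre-configured google-cloud-storage client via transport_params, which at least makes the upload path explicit. A minimal sketch, assuming the write otherwise looks like the block above (the gs:// URI is illustrative):

    import smart_open
    from google.cloud import storage

    # Reuse a single storage client across requests instead of letting
    # smart_open construct its own per call.
    gcs_client = storage.Client()

    with smart_open.open(
        "gs://my-bucket/metrics/resource.parquet",  # illustrative destination
        "wb",
        transport_params={"client": gcs_client},
    ) as fout:
        # data_frame is the DataFrame built from the Azure response, as above
        data_frame.to_parquet(path=fout, engine="pyarrow", compression="gzip")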

Edit: This is the error I get in google-resumable-media-python

https://github.com/googleapis/google-resumable-media-python/blob/main/google/resumable_media/_upload.py#L50-L53

This saves hundreds of thousands of files in a few hours' time. Each day there are a few failures with this error, which I cannot figure out how to fix.

Retrieving Azure Metrics for Resource ID: xxx Configuration is: xxx Error retrieving metrics: Connection broken: IncompleteRead(15773 bytes read, 125469 more expected)', IncompleteRead(15773 bytes read, 125469 more expected))

And sometimes I get a very similar error like this:

Retrieving Azure Metrics for Resource ID: xxx Configuration is: xxx Error retrieving metrics: Bytes stream is in unexpected state. The local stream has had 8887 bytes read from it while 0 bytes have already been updated (they should match).

The Cloud Run metrics look good, and I have no idea how to debug this further.
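
One way to narrow this down is to wrap the Azure fetch and the GCS write in separate try/except blocks and log the full traceback, so the logs show which side of the pipeline the IncompleteRead actually comes from. A minimal sketch; fetch_azure_metrics and write_parquet_to_gcs are hypothetical stand-ins for the two halves of the existing handler:

    import logging
    import traceback

    logger = logging.getLogger(__name__)

    def process_message(resource_id, configuration):
        # fetch_azure_metrics / write_parquet_to_gcs are placeholders for the
        # existing Azure call and the smart_open/to_parquet block.
        try:
            data_frame = fetch_azure_metrics(resource_id, configuration)
        except Exception:
            # Log the concrete exception type and stack, not just the message,
            # so "Connection broken: IncompleteRead" can be pinned to the Azure HTTP call.
            logger.error("Azure fetch failed for %s:\n%s", resource_id, traceback.format_exc())
            raise

        try:
            write_parquet_to_gcs(data_frame)
        except Exception:
            # Failures here point at the GCS resumable upload instead.
            logger.error("GCS write failed for %s:\n%s", resource_id, traceback.format_exc())
            raise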

Steps to replicate.

  1. Set up a Cloud Run service that runs a Flask app with a single endpoint that picks up events from an Eventarc trigger:

    @app.route("/", methods=["POST"])
    def index():
      data = request.get_json()
      if not data:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    
      if not isinstance(data, dict) or "message" not in data:
        msg = "invalid Pub/Sub message formaty"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    
      pubsub_message = data["message"]
    
  2. Run this with gunicorn; the relevant part of the Dockerfile:

    ...
    CMD ./azu_metrics_extractor_pex.pex \
        azu_metrics_extractor.main:app \
        -b :${PORT} \
        -w 3 \
        --threads=8 \
        --timeout=0
    
  3. The service uses the info in the Pub/Sub message to grab resource data from Azure, converts it to Parquet, and writes it to a GCS bucket using smart_open:

    with smart_open.open(destination, "wb") as fout:
        schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
        data_frame.to_parquet(
            path=fout,
            engine="pyarrow",
            compression="gzip",
            schema=schema,
            use_compliant_nested_type=False,
        )
    
  4. Trigger the Pub/Sub topic with ~10k messages in one hour.

  5. Notice that roughly 2 or 3 of those Parquet files fail to save to GCS with the errors above.

Potentially related to this open issue for smart_open https://github.com/piskvorky/smart_open/issues/784


Solution

  • We had this "connection broken" error in GCP Workflows calling a Cloud Function, with 2-3 errors per 10k as you describe. We checked this with GCP support, and their solution was to build a retry function. They admitted that it was network related (and maybe still is).

    So we came up with a custom retry in GCP Workflows and never saw this error again:

        retry:
          predicate: $${custom_retry}
          max_retries: 5
          backoff:
            initial_delay: 5
            max_delay: 60
            multiplier: 2
    

    ....

    custom_retry:
      params: [e]
      steps:
        - retry_always:
            return: true
    

    Retrying on every error sounds crude, but it works in our case.

    So in your Cloud Run code, you would need to catch the error and do a retry. Do you need help with that?
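
    For completeness, here is what an in-process retry could look like in Python. This is a sketch, not your actual code: the backoff numbers mirror the Workflows policy above, upload_parquet is a hypothetical wrapper around your smart_open/to_parquet block, and in practice you would narrow the except clause to the concrete exception type you see in your logs.

        import time

        def write_with_retry(write_fn, max_retries=5, initial_delay=5, max_delay=60, multiplier=2):
            # Exponential backoff mirroring the Workflows policy above:
            # delays of 5, 10, 20, 40 seconds between up to 5 attempts.
            delay = initial_delay
            for attempt in range(1, max_retries + 1):
                try:
                    return write_fn()
                except Exception as exc:  # narrow to the concrete error from your logs
                    if attempt == max_retries:
                        raise
                    print(f"attempt {attempt} failed with {exc!r}; retrying in {delay}s")
                    time.sleep(delay)
                    delay = min(delay * multiplier, max_delay)

        def upload_parquet():
            # The existing smart_open.open(...) / data_frame.to_parquet(...)
            # block from the question goes here.
            ...

        write_with_retry(upload_parquet)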