I have a Google Cloud Run service which is triggered via Pub/Sub, grabs data from an Azure API, and places it in a GCS bucket using smart_open.
with smart_open.open(destination, "wb") as fout:
    schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
    data_frame.to_parquet(
        path=fout,
        engine="pyarrow",
        compression="gzip",
        schema=schema,
        use_compliant_nested_type=False,
    )
Edit: this is the error I get in google-resumable-media-python.
This saves hundreds of thousands of files in a few hours' time; each day there are a few failures with this error, which I cannot figure out how to fix.
Retrieving Azure Metrics for Resource ID: xxx Configuration is: xxx Error retrieving metrics: Connection broken: IncompleteRead(15773 bytes read, 125469 more expected)', IncompleteRead(15773 bytes read, 125469 more expected))
And also sometimes a very similar error like this
Retrieving Azure Metrics for Resource ID: xxx Configuration is: xxx Error retrieving metrics: Bytes stream is in unexpected state. The local stream has had 8887 bytes read from it while 0 bytes have already been updated (they should match).
The metrics for Cloud Run look good, and I have no idea how to debug this further.
Steps to replicate:
Set up a Cloud Run service that runs a Flask app with a single endpoint that picks up events from an Eventarc trigger.
@app.route("/", methods=["POST"])
def index():
    data = request.get_json()
    if not data:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    if not isinstance(data, dict) or "message" not in data:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    pubsub_message = data["message"]
Use gunicorn to run this. Dockerfile:
...
CMD ./azu_metrics_extractor_pex.pex \
azu_metrics_extractor.main:app \
-b :${PORT} \
-w 3 \
--threads=8 \
--timeout=0
The service should use info in the Pub/Sub message to grab resource data from Azure, convert it to Parquet, and write it to a GCS bucket using smart_open:
with smart_open.open(destination, "wb") as fout:
    schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
    data_frame.to_parquet(
        path=fout,
        engine="pyarrow",
        compression="gzip",
        schema=schema,
        use_compliant_nested_type=False,
    )
Trigger the Pub/Sub topic with ~10k messages in one hour.
Notice that roughly 2 or 3 of the Parquet files that were to be saved to GCS fail to save with the above errors.
Potentially related to this open issue for smart_open https://github.com/piskvorky/smart_open/issues/784
We had this "connection broken" error in GCP Workflows calling a Cloud Function. As you describe, 2-3 errors in 10k. We checked this with GCP support, and their solution was to build a retry function. They admitted that it was network related (and maybe still is).
So we came up with a custom retry in GCP Workflows and never saw this error again:
retry:
  predicate: $${custom_retry}
  max_retries: 5
  backoff:
    initial_delay: 5
    max_delay: 60
    multiplier: 2
....
custom_retry:
  params: [e]
  steps:
    - retry_always:
        return: true
Retry always sounds stupid, but it works in our case.
So in your Cloud Run code, you would need to catch the error and do a retry. Do you need help with that?
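A minimal sketch of what that could look like in Python, mirroring the backoff parameters from the Workflows config above. The helper name and the `write_parquet` callable in the usage comment are assumptions, not from your code; catching bare `Exception` imitates the "retry always" predicate, and in practice you'd narrow it to the specific connection errors from your logs:

```python
import time

def retry_with_backoff(func, max_retries=5, initial_delay=5, max_delay=60, multiplier=2):
    """Call func(), retrying with exponential backoff on any exception."""
    delay = initial_delay
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            print(f"attempt {attempt} failed ({exc!r}); retrying in {delay}s")
            time.sleep(delay)
            delay = min(delay * multiplier, max_delay)

# Usage (write_parquet is hypothetical; it would wrap your
# smart_open.open(...) + data_frame.to_parquet(...) call):
# retry_with_backoff(lambda: write_parquet(destination, data_frame))
```

Since the failed write leaves no partial object behind (the resumable upload aborts), re-running the whole `smart_open` write from scratch is safe and idempotent for a fixed destination path.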