Tags: google-app-engine, google-cloud-python, google-cloud-tasks

Google Cloud Tasks 'Create Task' request is throwing ServiceUnavailable: 503


I'm converting my tasks from App Engine Task Queues to Google Cloud Tasks.

The one having problems is an hourly cron job that checks an S3 bucket for new files. The cron job launches one task per file found; each of those tasks downloads its file and launches one task per record in it.

It is during this fan-out that some of the create_task() calls fail with ServiceUnavailable: 503 (https://googleapis.dev/python/cloudtasks/latest/gapic/v2/api.html#google.cloud.tasks_v2.CloudTasksClient.create_task).

Here's one:

Traceback (most recent call last):
  ...
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/src/utils/gc_tasks.py", line 72, in _gc_create_task
    _ = _tasks_client.create_task(parent=_queue_path(DEFAULT_QUEUE), task=task)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/cloud/tasks_v2/gapic/cloud_tasks_client.py", line 1512, in create_task
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 273, in retry_wrapped_func
    on_error=on_error,
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 182, in retry_target
    return target()
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/ebb3af67a06047b6/python27/python27_lib/versions/third_party/six-1.12.0/six/__init__.py", line 737, in raise_from
    raise value
ServiceUnavailable: 503 {
    "created":"@1583436423.131570193",
    "description":"Delayed close due to in-progress write",
    "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/ext/transport/chttp2/transport/chttp2_transport.c",
    "file_line":412,
    "grpc_status":14,
    "referenced_errors":[{
        "created":"@1583436423.131561040",
        "description":"OS Error",
        "errno":32,
        "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/iomgr/tcp_posix.c",
        "file_line":393,
        "os_error":"Broken pipe",
        "syscall":"sendmsg"}
    ]}

Here's another:

Traceback (most recent call last):
  ...
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/src/utils/pt_gc_tasks.py", line 72, in _gc_create_task
    _ = _tasks_client.create_task(parent=_queue_path(DEFAULT_QUEUE), task=task)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/cloud/tasks_v2/gapic/cloud_tasks_client.py", line 1512, in create_task
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 273, in retry_wrapped_func
    on_error=on_error,
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/retry.py", line 182, in retry_target
    return target()
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/base/data/home/apps/s~my_project/dev.XXXXXXXXXXXXXXXXXXX/lib/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/ebb3af67a06047b6/python27/python27_lib/versions/third_party/six-1.12.0/six/__init__.py", line 737, in raise_from
    raise value
ServiceUnavailable: 503 {
    "created":"@1583407622.505288938",
    "description":"Endpoint read failed",
    "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/ext/transport/chttp2/transport/chttp2_transport.c",
    "file_line":1807,
    "grpc_status":14,
    "occurred_during_write":0,
    "referenced_errors":[{
        "created":"@1583407622.505108366",
        "description":"Secure read failed",
        "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/security/transport/secure_endpoint.c",
        "file_line":158,
        "referenced_errors":[{
            "created":"@1583407622.505106550",
            "description":"Socket closed",
            "file":"third_party/apphosting/python/grpcio/v1_0_0/src/core/lib/iomgr/tcp_posix.c",
            "file_line":259}
        ]}
    ]}

Am I enqueuing too many tasks at the same time? What can I do to deal with this?


Solution

  • After quite a bit of digging, it seems that "503 Service Unavailable" is a fairly common error across the google-cloud SDKs for all of GCP's services.

    The solution is to enable retry logic. google-api-core (which google-cloud-tasks depends on, and which appears in the tracebacks as google/api_core/retry.py) has an existing retry mechanism, but it isn't configured to retry task creation.

    In the generated client config, CreateTask's retry_codes_name is set to non_idempotent instead of idempotent:

                "CreateTask": {
                    "timeout_millis": 10000,
                    "retry_codes_name": "non_idempotent",
                    "retry_params_name": "default",
                },
    

    https://github.com/googleapis/python-tasks/blob/100e9c709383848498e1e6a747fb819520a7d8c1/google/cloud/tasks_v2/gapic/cloud_tasks_client_config.py#L87
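In these generated configs, retry_codes_name points at a named list of gRPC status codes in the same file's retry_codes section. Assuming that section looks like the usual generated config (idempotent lists DEADLINE_EXCEEDED and UNAVAILABLE, non_idempotent is empty), a CreateTask call that fails with UNAVAILABLE (gRPC status 14, the grpc_status in both tracebacks, which google-api-core raises as ServiceUnavailable / 503) is never retried. The sketch below models that lookup with plain dicts; RETRY_CODES, METHOD_CONFIG and should_retry are hypothetical stand-ins, not the library's own objects.

```python
# Hypothetical model of how a GAPIC client config decides whether to retry.
# Assumption: "retry_codes" resembles the usual generated config, where
# "idempotent" lists transient codes and "non_idempotent" is empty.
RETRY_CODES = {
    "idempotent": ["DEADLINE_EXCEEDED", "UNAVAILABLE"],
    "non_idempotent": [],
}

METHOD_CONFIG = {
    "CreateTask": {"timeout_millis": 10000,
                   "retry_codes_name": "non_idempotent",
                   "retry_params_name": "default"},
}

# Numeric gRPC status codes, as they appear in "grpc_status" in the tracebacks.
GRPC_STATUS_NAMES = {4: "DEADLINE_EXCEEDED", 14: "UNAVAILABLE"}


def should_retry(method_name, grpc_status, method_config=METHOD_CONFIG):
    """Return True if the method's configured retry codes include this status."""
    codes_name = method_config[method_name]["retry_codes_name"]
    status_name = GRPC_STATUS_NAMES.get(grpc_status)
    return status_name in RETRY_CODES[codes_name]


# With the shipped config, the 503 above (grpc_status 14) is not retried.
print(should_retry("CreateTask", 14))  # False

# Pointing CreateTask at the "idempotent" codes would make it retryable.
patched = {"CreateTask": dict(METHOD_CONFIG["CreateTask"],
                              retry_codes_name="idempotent")}
print(should_retry("CreateTask", 14, patched))  # True
```

Patching the shipped config is one option; passing a Retry object per call, as done here, avoids touching library files.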

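    My guess is that this is deliberate, because retrying a create request could enqueue duplicate tasks. But if you give each task a name, Cloud Tasks rejects a second task with the same name (raising AlreadyExists), which should prevent those duplicates from getting enqueued.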
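For the file and record fan-out described in the question, a deterministic name can be derived from whatever uniquely identifies each unit of work, so a retried create_task() for the same record reuses the same name. This is a minimal sketch, assuming the S3 object key plus a record index identifies a record; task_name_for and the example queue path are hypothetical. The Cloud Tasks API reference restricts task IDs to letters, digits, hyphens and underscores, and recommends hashed rather than sequential IDs because the duplicate-name check adds latency, so the identifying string is hashed instead of used verbatim.

```python
import hashlib


def task_name_for(queue_path, *parts):
    """Build a deterministic task name from identifying parts (hypothetical helper).

    The same parts always produce the same name, so a re-sent create_task()
    for the same unit of work collides with the existing task.
    """
    # Join with a unit separator so ("a", "bc") and ("ab", "c") stay distinct.
    key = u"\x1f".join(u"{}".format(p) for p in parts)
    task_id = hashlib.sha256(key.encode("utf-8")).hexdigest()  # only [0-9a-f]
    return "{}/tasks/{}".format(queue_path, task_id)


queue = "projects/my-project/locations/us-central1/queues/default"  # example path

file_task = task_name_for(queue, "file", "incoming/2020-03-05.csv")
record_task = task_name_for(queue, "record", "incoming/2020-03-05.csv", 17)
```

The resulting string would go in the task's name field (for a dict-style task, task["name"] = record_task) before calling create_task().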

    So I passed a Retry object to .create_task() without a predicate argument, which makes it default to if_transient_error. That predicate retries on the following exceptions: exceptions.InternalServerError, exceptions.TooManyRequests and exceptions.ServiceUnavailable.
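To make Retry's initial, multiplier, maximum and deadline arguments concrete, here is a simplified, stdlib-only model of a predicate-driven retry loop with capped exponential backoff. It is not google-api-core's actual code (the real implementation randomizes delays and differs in other details). The exception classes are local stand-ins named after those in google.api_core.exceptions, and sleep/clock are injectable so the sketch runs without waiting.

```python
import time


class InternalServerError(Exception):
    """Stand-in for google.api_core.exceptions.InternalServerError (500)."""


class TooManyRequests(Exception):
    """Stand-in for google.api_core.exceptions.TooManyRequests (429)."""


class ServiceUnavailable(Exception):
    """Stand-in for google.api_core.exceptions.ServiceUnavailable (503)."""


class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists (409)."""


TRANSIENT = (InternalServerError, TooManyRequests, ServiceUnavailable)


def if_transient_error(exc):
    """Predicate: retry only the errors treated as transient."""
    return isinstance(exc, TRANSIENT)


def retry_call(func, predicate=if_transient_error, initial=0.1, maximum=60.0,
               multiplier=1.3, deadline=600.0, sleep=time.sleep, clock=time.time):
    """Call func() until it succeeds, fails non-transiently, or the deadline passes."""
    start = clock()
    delay = initial
    while True:
        try:
            return func()
        except Exception as exc:
            if not predicate(exc):
                raise  # e.g. AlreadyExists propagates immediately
            if clock() - start + delay > deadline:
                raise  # the next sleep would overrun the deadline: give up
            sleep(delay)
            delay = min(delay * multiplier, maximum)  # exponential backoff, capped


# Usage: a call that fails twice with 503, then succeeds.
attempts = []
sleeps = []


def flaky_create():
    attempts.append(1)
    if len(attempts) < 3:
        raise ServiceUnavailable("503 Broken pipe")
    return "created"


result = retry_call(flaky_create, sleep=sleeps.append)
```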

    Below is a snippet of my code that creates tasks:

    import logging

    from google.api_core import retry
    from google.api_core.exceptions import AlreadyExists
    from google.cloud import tasks

    _tasks_client = tasks.CloudTasksClient()


    def my_create_task_function(my_queue_path, task_object):
        try:
            _tasks_client.create_task(
                parent=my_queue_path,
                task=task_object,
                # Copies the default retry config from retry_params in
                # google.cloud.tasks_v2.gapic.cloud_tasks_client_config.
                # predicate is omitted, so it defaults to retry.if_transient_error.
                retry=retry.Retry(
                    initial=.1,
                    maximum=60,
                    multiplier=1.3,
                    deadline=600))
        except AlreadyExists:
            # Only raised when task_object has a name that already exists,
            # e.g. a retried request whose first attempt actually succeeded.
            logging.warning("found existing task")
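Catching AlreadyExists matters once retries are on: if an attempt may have reached the service but the connection broke before the response arrived (as in the "Broken pipe" traceback), the retried request with the same task name is rejected as a duplicate, and that rejection means the task is already enqueued. The sketch below simulates this with a hypothetical in-memory FakeQueue and local stand-in exceptions; none of it is the Cloud Tasks API.

```python
import logging


class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists (409)."""


class ServiceUnavailable(Exception):
    """Stand-in for google.api_core.exceptions.ServiceUnavailable (503)."""


class FakeQueue(object):
    """Hypothetical in-memory queue that enforces unique task names."""

    def __init__(self, lose_first_response=False):
        self.tasks = {}
        self.lose_first_response = lose_first_response

    def create_task(self, name, payload):
        if name in self.tasks:
            raise AlreadyExists(name)
        self.tasks[name] = payload  # the task is stored server-side...
        if self.lose_first_response:
            self.lose_first_response = False
            raise ServiceUnavailable("response lost")  # ...but the client sees a 503
        return name


def create_once(queue, name, payload, attempts=3):
    """Retry on 503 and treat AlreadyExists as 'already enqueued'."""
    for attempt in range(attempts):
        try:
            return queue.create_task(name, payload)
        except ServiceUnavailable:
            if attempt == attempts - 1:
                raise  # out of attempts
        except AlreadyExists:
            logging.warning("found existing task %s", name)
            return name


q = FakeQueue(lose_first_response=True)
result = create_once(q, "tasks/abc", {"record": 17})
```

Even though the first response was lost, exactly one task ends up stored.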
    

    google.api_core.retry also has its own logger; raising its level lets you see a log entry each time a retry actually happens.

    If you do the following:

    import logging

    logging.getLogger('google.api_core.retry').setLevel(logging.DEBUG)
    

    You should then see DEBUG messages from that logger (starting with "Retrying due to ...") in your logs whenever a retry kicks in.
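Setting the level on that one named logger is enough, as long as the attached handlers don't filter DEBUG themselves: a child logger's records propagate to the root logger's handlers without being re-checked against the root logger's own level, so the rest of the application can stay at WARNING. A stdlib-only sketch that checks this; ListHandler is a hypothetical helper, and the debug message is emitted by the sketch itself as a stand-in, not by google-api-core.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical helper: keeps (logger name, level, message) tuples in memory."""

    def __init__(self):
        logging.Handler.__init__(self)  # level NOTSET: the handler filters nothing
        self.records = []

    def emit(self, record):
        self.records.append((record.name, record.levelname, record.getMessage()))


handler = ListHandler()
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.WARNING)  # the rest of the app stays at WARNING

retry_logger = logging.getLogger("google.api_core.retry")
retry_logger.setLevel(logging.DEBUG)  # the one-line change from the answer

# Emitted by this sketch to stand in for library messages.
retry_logger.debug("Retrying due to 503, sleeping 0.1s ...")
logging.getLogger("some.other.module").debug("not shown")
```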