amazon-s3 celery boto3 eventlet

Celery with Eventlet fails when uploading to S3 (boto3)


I have been trying to use Celery with Eventlet to upload files to S3 with boto3, but the upload hangs. When I run Celery without Eventlet, the upload works fine. Whether or not I apply Eventlet's monkey patching doesn't seem to change the result.

Here's the code:

import os

from boto3.session import Session


def getS3():
    s3_access_key, s3_secret_key = os.getenv('S3_ACCESS_KEY'), os.getenv('S3_SECRET_KEY')

    if not all([s3_access_key, s3_secret_key]):
        raise ValueError('Keys')

    session = Session(aws_access_key_id=s3_access_key,
                      aws_secret_access_key=s3_secret_key,
                      region_name='us-east-1')
    return session.resource('s3')


def upload_to_awss3(zip_io, zip_name, bucket, endpoint='s3-sa-east-1.amazonaws.com'):
    s3 = getS3()

    try:
        s3_object = s3.Bucket(bucket).put_object(Key=zip_name, Body=zip_io, ACL='public-read')

        return 'http://{}.{}/{}'.format(bucket, endpoint, zip_name)
    except Exception as e:
        print('upload error: ', e)
        raise

Here's the Celery log output:

[2016-05-24 11:43:06,878: DEBUG/MainProcess] Sending http request: <PreparedRequest [PUT]>
[2016-05-24 11:43:06,879: INFO/MainProcess] Resetting dropped connection: pontotel-docs.s3.amazonaws.com
[2016-05-24 11:43:07,069: DEBUG/MainProcess] Waiting for 100 Continue response.
[2016-05-24 11:43:47,176: DEBUG/MainProcess] 100 Continue response seen, now sending request body.
[2016-05-24 11:43:47,294: DEBUG/MainProcess] ConnectionError received when sending HTTP request.

Here's the error:

[2016-05-24 12:30:04,800: DEBUG/MainProcess] Response headers: {'x-amz-id-2': 'VwcX2j4FmBoE2oUyH+08V0bh+ZW74vGOF0IkSP2h5KUp07ANcw8qOwexZyv5yupmaXOxiyYbiCg=', 'x-amz-request-id': '2F527FED26157010', 'content-type': 'application/xml', 'date': 'Tue, 24 May 2016 15:29:45 GMT', 'transfer-encoding': 'chunked', 'server': 'AmazonS3', 'connection': 'close'}
[2016-05-24 12:30:04,801: DEBUG/MainProcess] Response body:
b'<?xml version="1.0" encoding="UTF-8"?>\n<Error><Code>RequestTimeout</Code><Message>Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed.</Message><RequestId>2F527FED26157010</RequestId><HostId>VwcX2j4FmBoE2oUyH+08V0bh+ZW74vGOF0IkSP2h5KUp07ANcw8qOwexZyv5yupmaXOxiyYbiCg=</HostId></Error>'
[2016-05-24 12:30:04,802: DEBUG/MainProcess] Event needs-retry.s3.PutObject: calling handler <botocore.retryhandler.RetryHandler object at 0x7fd7480694a8>
[2016-05-24 12:30:04,802: DEBUG/MainProcess] retry needed: matching HTTP status and error code seen: 400, RequestTimeout

Any ideas would be greatly appreciated! :-D


Solution

  • This is not related to Celery.

    It is caused by a known bug in Eventlet, triggered by a PUT request with the Expect: 100-continue header over an HTTPS connection: https://github.com/eventlet/eventlet/issues/315

    Sorry, a proper fix is not ready yet.

    Temporary workaround (also posted in https://github.com/eventlet/eventlet/issues/313):

    import eventlet.green.ssl
    
    def _green_ssl_recv_into (self, buffer, nbytes=None, flags=0):
        if self._sslobj:
            if flags != 0:
                raise ValueError(
                    "non-zero flags not allowed in calls to recv_into() on %s" %
                    self.__class__)
            if nbytes is None:
                if buffer:
                    nbytes = len(buffer)
                else:
                    nbytes = 1024
            read = self.read(nbytes, buffer)
            return read
        else:
            while True:
                try:
                    return eventlet.green.ssl.socket.recv_into(self, buffer, nbytes, flags)
                except eventlet.green.ssl.orig_socket.error as e:
                    if self.act_non_blocking:
                        raise
                    erno = eventlet.green.ssl.get_errno(e)
                    if erno in eventlet.green.ssl.greenio.SOCKET_BLOCKING:
                        try:
                            eventlet.green.ssl.trampoline(
                                self, read=True,
                                timeout=self.gettimeout(), timeout_exc=eventlet.green.ssl.timeout_exc('timed out'))
                        except eventlet.green.ssl.IOClosed:
                            return b''
                    elif erno in eventlet.green.ssl.greenio.SOCKET_CLOSED:
                        return b''
                    raise
    
    eventlet.green.ssl.GreenSSLSocket.recv_into = _green_ssl_recv_into
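The hang sits in the Expect: 100-continue handshake that the log traces ("Waiting for 100 Continue response." followed, about 40 seconds later, by "100 Continue response seen"). As an illustration only, the sketch below replays that handshake by hand over a plain socket (no SSL, no Eventlet, no boto3), so the function name send_put_with_expect and the example host and path are hypothetical. It shows why a stalled read of the interim response blocks the whole upload: the client withholds the body until it has read the "100 Continue" status line.

```python
import socket


def send_put_with_expect(sock, host, path, body):
    """Hypothetical helper: send a PUT using the Expect: 100-continue handshake.

    The headers go out first; the body is sent only after the server
    answers with an interim "100 Continue" status line. Returns the
    server's first status line (without the trailing CRLF).
    """
    head = (
        f"PUT {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Expect: 100-continue\r\n"
        "\r\n"
    )
    sock.sendall(head.encode("ascii"))

    # Block until the interim response (status line + blank line) arrives.
    # In the question's log, this wait is the step that stalls.
    interim = b""
    while not interim.endswith(b"\r\n\r\n"):
        chunk = sock.recv(1)  # one byte at a time: never read past the headers
        if not chunk:
            raise ConnectionError("connection closed before the interim response")
        interim += chunk

    status_line = interim.split(b"\r\n", 1)[0]
    parts = status_line.split(b" ", 2)
    if len(parts) >= 2 and parts[1] == b"100":
        # The server agreed to receive the body, so send it now.
        sock.sendall(body)
    # Otherwise the server answered early (e.g. with an error) and the body is withheld.
    return status_line
```

Reading one byte at a time keeps the sketch simple and guarantees no bytes beyond the interim response are consumed; real HTTP clients buffer more carefully. To try it offline, create a socket.socketpair(), write b"HTTP/1.1 100 Continue\r\n\r\n" on one end, and pass the other end to the function.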