scrapy, python-asyncio, twisted

How can I access the return value from twisted deferred callbacks using AsyncioSelectorReactor?


Using Python 3.7.7, Twisted 20.3.0 (and Scrapy 2.1.0), when I try...

doc_link = await self.upload_reseller_document(doc_request, self.create_id(contract))

I get a deferred instead of a string. Also my callbacks are not awaited.

Expected: https://s3.amazonaws.com/some-bucket/some_file.csv or None

Received: <Deferred at 0x11ae61dd0 current result: None>

    async def conditional_upload(request):
        docs_bucket = 'some-bucket'
        key = f'some-prefix/some_file.csv'
        url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'
        async def cb(obj):
            print('found key, returning url')
            return defer.success(url)

        async def upload_doc():
            print('called upload_doc')
            response = await self.crawler.engine.download(request, self)
            if response.status != 200:
                # Error happened, return item.
                print('could not download reseller csv')
                return defer.error(None)
            print('uploading to', docs_bucket, key)
            return threads.deferToThread(
                self.s3client.put_object,
                Bucket=docs_bucket,
                Key=key,
                Body=response.body)

        async def eb(failure):
            print('did not find key')
            if failure.type != ClientError:
                raise failure.value
            return upload_doc()

        return ensureDeferred(threads.deferToThread(
                self.s3client.head_object,
                Bucket=docs_bucket,
                Key=key).addCallbacks(cb, eb))

Solution

  • Internally, Twisted deals only with Deferreds and functions that return them. You can't pass async functions as callbacks to a Deferred: when called, an async function only returns a coroutine object, so the callback body never runs, and when the reactor stops you get the warning "coroutine x was never awaited".
    When using async functions, just await the Deferreds and handle their results instead of attaching callbacks and returning them. The point of async functions is to avoid callback hell.

    defer.ensureDeferred wraps a coroutine in a Deferred so that Twisted can schedule it to run. Use it when you need to call an async function from a function that is not async.

    Use try/except to handle exceptions (it is the equivalent of an errback, except that the exception is not wrapped in Twisted's Failure):

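    # Assumed imports: ClientError as raised by boto3 clients.
    from botocore.exceptions import ClientError
    from twisted.internet import threads

    async def conditional_upload(request):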
        docs_bucket = 'some-bucket'
        key = f'some-prefix/some_file.csv'
        url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'
    
        async def upload_doc():
            print('called upload_doc')
            response = await self.crawler.engine.download(request, self)
            if response.status != 200:
                # Error happened, return item.
                print('could not download reseller csv')
                raise Exception('could not download reseller csv')
            print('uploading to', docs_bucket, key)
            return await threads.deferToThread(
                self.s3client.put_object, Bucket=docs_bucket, Key=key, Body=response.body
            )
    
        # first check whether the key already exists
        try:
            await threads.deferToThread(self.s3client.head_object, Bucket=docs_bucket, Key=key)
            print('found key, returning url')
            return url
        except ClientError:
            print('did not find key, going to upload_doc ...')
    
        # if it does not exist, then create it
        retry_attempts = 10 # avoid infinite loop
        for _ in range(retry_attempts):
            try:
                await upload_doc()
                print('Uploaded the key, returning url')
                return url
            except ClientError:
                print('Failed to upload the key, retrying...')
    
        print('Failed to upload the key, max attempts reached.')
        return None