Tags: interop, twisted, python-asyncio, python-3.6

How can I run asyncio library code on top of Twisted's asyncioreactor?


I've managed to import/install Twisted's asyncioreactor and execute a trivial asynchronous function:

from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import ensureDeferred

async def sleepy(reactor):
    print("SLEEPING")
    await task.deferLater(reactor, 3.0, lambda: None)
    print("done sleep")
    return 42

@task.react
def main(reactor):
    d = ensureDeferred(sleepy(reactor))
    d.addCallback(print)
    return d

I'd like to intermix an asyncio library in said code, for instance asyncio.sleep. I've tried the following:

from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import ensureDeferred

import asyncio

async def sleepy(reactor):
    print("SLEEPING")
    await asyncio.sleep(3)
    print("done sleep")
    return 42

@task.react
def main(reactor):
    d = ensureDeferred(sleepy(reactor))
    d.addCallback(print)
    return d

which produces the following error:

 $ python test.py
SLEEPING
main function encountered error
Traceback (most recent call last):
  File "test.py", line 16, in <module>
    @task.react
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/task.py", line 908, in react
    finished = main(_reactor, *argv)
  File "test.py", line 18, in main
    d = ensureDeferred(sleepy(reactor))
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/defer.py", line 823, in ensureDeferred
    return _inlineCallbacks(None, coro, Deferred())
--- <exception caught here> ---
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/defer.py", line 1301, in _inlineCallbacks
    result = g.send(result)
  File "test.py", line 11, in sleepy
    await asyncio.sleep(3)
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/asyncio/tasks.py", line 476, in sleep
    return (yield from future)
builtins.AssertionError: yield from wasn't used with future

Fair enough, thought I, so I tried swapping await asyncio.sleep(3) with await ensureDeferred(asyncio.sleep(3)) and await asyncio.ensure_future(asyncio.sleep(3)), but I get exactly the same error.

How can I schedule an aio coroutine (and/or Future) to run on the same event loop as is used by asyncioreactor?


Solution

  • so I tried swapping await asyncio.sleep(3) with await ensureDeferred(asyncio.sleep(3)) and await asyncio.ensure_future(asyncio.sleep(3))

    You were almost there. Combine the two, using Deferred.fromFuture instead of ensureDeferred:

    await Deferred.fromFuture(asyncio.ensure_future(asyncio.sleep(3)))
    

    The rule is that an async def function running in a Twisted context (started with ensureDeferred) can await only Deferreds, while one running in an asyncio context (started with ensure_future) can await only asyncio Futures. Either kind can always await another coroutine object (the result of calling an async def function), but the chain must ultimately end in a Deferred or a Future, respectively. To convert an asyncio Future to a Deferred, use Deferred.fromFuture(future); to convert a Deferred to an asyncio Future, call its asFuture(loop) method.
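
The error in the question follows from this rule. Below is a minimal, standard-library-only sketch of the mechanism: a pending asyncio Future yields itself to whatever is driving the coroutine and expects to be resumed only once it is done. The driver `resume_without_waiting` is a hypothetical stand-in for any driver that does not understand Futures; it is not Twisted's actual code.

```python
import asyncio


async def wait_on(future):
    # Awaiting a pending asyncio Future suspends the coroutine and hands
    # the Future object itself to whoever is driving the coroutine.
    return await future


def resume_without_waiting(coro):
    """Hypothetical driver that ignores what the coroutine yields."""
    yielded = coro.send(None)  # run up to the first suspension point
    try:
        coro.send(None)  # resume immediately; the Future is still pending
    except (RuntimeError, AssertionError) as exc:
        # Python 3.6 raises AssertionError here, 3.7+ raises RuntimeError.
        return yielded, exc
    return yielded, None


def demo():
    loop = asyncio.new_event_loop()
    try:
        pending = loop.create_future()
        yielded, error = resume_without_waiting(wait_on(pending))
        return pending, yielded, error
    finally:
        loop.close()


if __name__ == "__main__":
    pending, yielded, error = demo()
    print("yielded the future itself:", yielded is pending)
    print("error:", type(error).__name__, error)
```

An asyncio Task avoids the error by resuming the coroutine only after the yielded Future completes. Wrapping the Future with Deferred.fromFuture gives a Twisted-side driver something it knows how to wait on.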

    You can switch from one context to the other and back. In this (contrived) example, sleepy_twisted starts in the Twisted context and does a Twisted sleep, then switches to the asyncio context to run sleepy_asyncio, which does an asyncio sleep and then switches back to the Twisted context for another Twisted sleep:

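    import asyncio

    from twisted.internet import asyncioreactor
    # Install the asyncio-based reactor before anything imports the default one.
    asyncioreactor.install()

    from twisted.internet import task
    from twisted.internet.defer import Deferred, ensureDeferred

    async def sleepy_asyncio(reactor):
        # Runs as an asyncio Task, so it may await asyncio Futures directly.
        print("Sleep 2")
        await asyncio.sleep(1)
        print("Sleep 3")
        # A Deferred must be converted to a Future before asyncio code awaits it.
        # get_running_loop() needs Python 3.7+; on 3.6 use asyncio.get_event_loop().
        await task.deferLater(reactor, 1, lambda: None).asFuture(asyncio.get_running_loop())

    async def sleepy_twisted(reactor):
        # Driven by ensureDeferred, so it may await Deferreds directly.
        print("Sleep 1")
        await task.deferLater(reactor, 1, lambda: None)
        # An asyncio coroutine must be wrapped in a Future, then in a Deferred.
        await Deferred.fromFuture(asyncio.ensure_future(sleepy_asyncio(reactor)))
        print("done")

    @task.react
    def main(reactor):
        return ensureDeferred(sleepy_twisted(reactor))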