Search code examples
pythonpython-3.xautobahnpython-asyncio

Calling a remote procedure from a subscriber and resolving the asyncio promise


I'm having problems getting asyncio based autobahn RPCs to work in an event handler:

from autobahn.asyncio import wamp
from autobahn.wamp import register, subscribe

class Foo(wamp.ApplicationSession):
    @subscribe('wamp.metaevent.session.on_join')
    def bar(self):
        baz = yield from self.call('baz')

    @register('baz')
    def baz(self):
        return 'baz'

Reading the documentation, I'm under the impression that this should work. However, if I'm using yield inside Foo.bar none of its code is being executed at all. I've tried decorating with asyncio.coroutine in various patterns, but can't get it to run at all.

The only way I've found to make it work is by resolving the returned future "manually":

@subscribe('wamp.metaevent.session.on_join')
def bar(self):
    def do_something(f):
        print(f.result())

    f = self.call('baz')
    f.add_done_callback(do_something)

I'm sure I'm just not understanding asyncio programming correctly yet, so what do I have to do to be able to write baz = self.call('baz') and get the result immediately (meaning without additional explicitly stated callbacks)?


Solution

  • The presence of the yield keyword in the body of a def statement makes the defined function into a generator function.

    When you call a generator function (as autobahn would), the body of the function is not executed. Instead, a generator object is created. When you call next on the generator, control advances to the next yield statement. Generators have been discussed extensively in other Stack Overflow posts, as well as in the docs and elsewhere on the web.

    The API of asyncio makes extensive use of coroutines and yield from. (For most purposes, you can consider 'coroutine' and 'generator' to be synonymous.) The event loop tracks a collection of generators and calls next, send and throw on them as appropriate.*

    It looks like autobahn.subscribe expects a regular callback function, not a coroutine, and that's why your code is not getting executed. One way to work around this is to write a callback function that schedules your coroutine, using asyncio.async.

    class Foo(wamp.ApplicationSession):
        @subscribe('wamp.metaevent.session.on_join')
        def schedule_bar(self):
            coro = self.bar()  # create the coroutine object
    
            # wrap the coroutine in a Task and register it with the event loop.
            # the loop argument is optional and defaults to asyncio.get_event_loop()
            asyncio.async(coro, loop=my_event_loop)  
    
        @coroutine
        def bar(self):
            baz = yield from self.call('baz')
    
        @register('baz')
        def baz(self):
            return 'baz'
    

    In the absence of a function in autobahn to do this, you can write your own reusable decorator to subscribe coroutines to WAMP topics.

    from functools import wraps
    
    def subscribe_coro(uri, loop=None):
        def decorator(f):
            @subscribe(uri)
            @wraps(f)
            def wrapper(*args, **kwargs):
                coro = f(*args, **kwargs)
                asyncio.async(coro, loop=loop)
            return wrapper
        return decorator
    

    Now your class will look like this:

    class Foo(wamp.ApplicationSession):
        @subscribe_coro('wamp.metaevent.session.on_join')
        @coroutine
        def bar(self):
            baz = yield from self.call('baz')
    
        @register('baz')
        def baz(self):
            return 'baz'
    

    * This is a simplification. The event loop in fact tracks Futures, not coroutines. The algorithm to call the appropriate methods on a generator is implemented by Task, which wraps a coroutine up into a Future.