Tags: python, python-asyncio, reactivex, rx-py

How to call an async coroutine periodically using an RxPY interval observable?


I need to create an Observable stream which emits the result of an async coroutine at regular intervals.

intervalRead is a function which returns an Observable, and takes as parameters the interval rate and an async coroutine function fun, which needs to be called at the defined interval.

My first approach was to create an observable with the interval factory method, then use map to call the coroutine, wrapping it in an Observable with from_future so that I could get the value returned by the coroutine.

import asyncio

import rx
from rx import Observable
from rx.operators import map

async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next= lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

Yet the output I get is not the result of the coroutine, but the Observable returned by from_future, emitted at the specified interval:

output: <rx.core.observable.observable.Observable object at 0x033B5650>

How can I get the actual value emitted by that Observable? I would expect 42.

My second approach was to create a custom observable:


def intervalRead(rate, fun) -> rx.Observable:
    interval = rx.interval(rate)
    def subs(observer: Observer, scheduler = None):
        loop = asyncio.get_event_loop()
        def on_timer(i):
            task = loop.create_task(fun())
            from_future(task).subscribe(
                on_next= lambda i: observer.on_next(i),
                on_error= lambda e: observer.on_error(e),
                on_completed= lambda: print('coro completed')
            )
        interval.subscribe(on_next= on_timer, on_error= lambda e: print(e))        
    return rx.create(subs)

However, from_future(task) never emits a value once subscribed. Why does this happen?

Yet if I write intervalRead like this:

def intervalRead(rate, fun):
    loop = asyncio.get_event_loop()
    task = loop.create_task(fun())
    return from_future(task)

I get the expected result: 42. Obviously this doesn't solve my issue, but it leaves me confused about why my second approach doesn't work.

Finally, I experimented with a third approach, using the CurrentThreadScheduler from rx.concurrency to schedule an action periodically with the schedule_periodic method. Yet I'm facing the same issue as with the second approach.

def funWithScheduler(rate, fun):
    loop = asyncio.get_event_loop()
    scheduler = CurrentThreadScheduler()
    subject = rx.subjects.Subject()
    def action(param):
        obs = rx.from_future(loop.create_task(fun())).subscribe(
            on_next= lambda item: subject.on_next(item),
            on_error= lambda e: print(f'error in action {e}'),
            on_completed= lambda: print('action completed')
        )     
        obs.dispose()   
    scheduler.schedule_periodic(rate,action)
    return subject

I would appreciate any insight into what I am missing, or any other suggestions to accomplish what I need. This is my first project with asyncio and RxPY; I have only used RxJS in the context of an Angular project, so any help is welcome.


Solution

  • Your first example almost works. There are only two changes needed to get it working:

    First, the result of from_future is an observable that emits a single item (the value of the future when it completes). So the output of map is a higher-order observable (an observable that emits observables). These child observables can be flattened by using the merge_all operator after map, or by using flat_map instead of map.

    Second, the interval operator must schedule its timer on the AsyncIO loop, which is not the case by default: the default scheduler is the TimeoutScheduler, and it spawns a new thread. So in the original code, the task cannot be scheduled on the AsyncIO event loop because create_task is called from another thread. Passing the scheduler parameter to subscribe sets the default scheduler for the whole operator chain.

    The following code works (42 is printed every 5 seconds):

    import asyncio
    import rx
    import rx.operators as ops
    from rx.scheduler.eventloop import AsyncIOScheduler
    
    
    async def foo():
        await asyncio.sleep(1)
        return 42
    
    
    def intervalRead(rate, fun) -> rx.Observable:
        loop = asyncio.get_event_loop()
        return rx.interval(rate).pipe(
            ops.map(lambda i: rx.from_future(loop.create_task(fun()))),
            ops.merge_all()
        )
    
    
    async def main(loop):
        obs = intervalRead(5, foo)
        obs.subscribe(
            on_next=lambda item: print(item),
            scheduler=AsyncIOScheduler(loop)
        )
    
    loop = asyncio.get_event_loop()
    loop.create_task(main(loop))
    loop.run_forever()
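
The threading point above can be illustrated without RxPY. The following is a minimal stdlib-only sketch, not what RxPY does internally: threading.Timer stands in for a timer that fires on its own thread, and call_from_timer_thread is a hypothetical helper name chosen for this example. Since loop.create_task is not thread-safe, the sketch hands the coroutine to the loop with asyncio.run_coroutine_threadsafe, which is the documented way to submit a coroutine from another thread.

```python
import asyncio
import threading


async def foo():
    await asyncio.sleep(0.01)
    return 42


async def call_from_timer_thread(delay=0.05):
    """Hypothetical helper: run foo() on the loop, triggered from a timer thread."""
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    outcome = {}
    done = asyncio.Event()

    def on_timer():
        # This callback runs on the Timer's own thread, not the loop's thread.
        outcome["same_thread"] = threading.get_ident() == loop_thread
        # loop.create_task() is not thread-safe, so hand the coroutine over
        # with run_coroutine_threadsafe() and wait for its result here.
        future = asyncio.run_coroutine_threadsafe(foo(), loop)
        outcome["value"] = future.result(timeout=5)
        # asyncio.Event is not thread-safe either: set it from the loop thread.
        loop.call_soon_threadsafe(done.set)

    threading.Timer(delay, on_timer).start()
    await asyncio.wait_for(done.wait(), timeout=5)
    return outcome


if __name__ == "__main__":
    print(asyncio.run(call_from_timer_thread()))
```

With the AsyncIOScheduler used in the answer, the timer runs on the event loop itself, so no such cross-thread hand-off should be needed there.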