Search code examples
python-asynciopy-shiny

Shiny for Python: Implementing an asynchronous iterator (almost there)


The endgame is making an app reactive to a non-blocking stream of information (in my particular case a MongoDB ChangeSteam; it could also be a Kafka consumer).

For the sake of reproducibility, in the example below I implement a generic asynchronous iterator AsyncIteratorDummy that mimics the behaviour of a data stream:


import asyncio
from shiny import reactive, ui, Inputs, Outputs, Session, App, render

class AsyncIteratorDummy:
''' Iterate over an asynchronous source n Iterations.'''

    def __init__(self, n):
        self.current = 0
        self.n = n
    def __aiter__(self):
        return self
    async def __anext__(self):
        await asyncio.sleep(1)
        print(f"get next element {self.current}")
        self.current += 1
        if self.current > self.n:
            raise StopAsyncIteration
        return self.current - 1

async def watch_changes(rval: reactive.Value):
    async for i in AsyncIteratorDummy(5):
        print(f"next element {i}")
        rval.set(i)
           

app_ui = ui.page_fluid(
    "This should update automatically",
    ui.output_text_verbatim("async_text"),
)

def server(input: Inputs, output: Outputs, session: Session):

    triggered_val = reactive.Value(-1)
    
    asyncio.create_task(watch_changes(triggered_val))
    
    @output(id="async_text")
    @render.text()
    async def _():
        return triggered_val.get()
    
    # un/commenting this makes makes the invalidation
    # of `triggered_val` effective or not:
    @reactive.Effect
    def _():
        reactive.invalidate_later(0.1)

app = App(app_ui, server)

The app works because of the presence of


    @reactive.Effect
    def _():
        reactive.invalidate_later(0.1)

Else, async_text greys out (indicating it has been invalidated) but does not update.

Is it possible to implement the asynchronous iteration without the "hack" of the reactive.Effect invalidating on loop?

My supposition is that I have to "flush" or "execute" invalidated variables in the context of watch_changes() (after rval.set(i)), using a low-level py-shiny function that I cannot figure out.


Solution

  • I think you are looking for reactive.flush().

    async def watch_changes(rval: reactive.Value):
        async for i in AsyncIteratorDummy(5):
            print(f"next element {i}")
            rval.set(i)
            reactive.flush()