The end goal is to make an app reactive to a non-blocking stream of information (in my particular case a MongoDB ChangeStream; it could also be a Kafka consumer).
For the sake of reproducibility, in the example below I implement a generic asynchronous iterator, AsyncIteratorDummy, that mimics the behaviour of a data stream:

import asyncio

from shiny import reactive, ui, Inputs, Outputs, Session, App, render


class AsyncIteratorDummy:
    '''Iterate over an asynchronous source for n iterations.'''

    def __init__(self, n):
        self.current = 0
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(1)
        print(f"get next element {self.current}")
        self.current += 1
        if self.current > self.n:
            raise StopAsyncIteration
        return self.current - 1


async def watch_changes(rval: reactive.Value):
    async for i in AsyncIteratorDummy(5):
        print(f"next element {i}")
        rval.set(i)


app_ui = ui.page_fluid(
    "This should update automatically",
    ui.output_text_verbatim("async_text"),
)


def server(input: Inputs, output: Outputs, session: Session):
    triggered_val = reactive.Value(-1)

    # Run the watcher in the background, alongside the Shiny session.
    asyncio.create_task(watch_changes(triggered_val))

    @output(id="async_text")
    @render.text()
    async def _():
        return triggered_val.get()

    # un/commenting this makes the invalidation
    # of `triggered_val` effective or not:
    @reactive.Effect
    def _():
        reactive.invalidate_later(0.1)


app = App(app_ui, server)

The app works because of the presence of

    @reactive.Effect
    def _():
        reactive.invalidate_later(0.1)

Otherwise, async_text greys out (indicating it has been invalidated) but does not update.
Is it possible to implement the asynchronous iteration without the "hack" of the reactive.Effect invalidating in a loop?
My supposition is that I have to "flush" or "execute" invalidated variables in the context of watch_changes() (after rval.set(i)), using a low-level py-shiny function that I cannot figure out.
I think you are looking for reactive.flush(). Note that it is a coroutine function, so it has to be awaited:

    async def watch_changes(rval: reactive.Value):
        async for i in AsyncIteratorDummy(5):
            print(f"next element {i}")
            rval.set(i)
            await reactive.flush()
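
Since the watcher runs in its own asyncio task (started with asyncio.create_task), it may also be worth holding the reactive lock while setting the value and flushing. The following is only a sketch, assuming that reactive.lock() and an awaitable reactive.flush() are available in your py-shiny version. The helper names consume and push_to_shiny are hypothetical, and the Shiny import is deferred so that the consuming loop can be tried without Shiny installed:

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable


async def consume(source: AsyncIterator[Any],
                  on_item: Callable[[Any], Awaitable[None]]) -> None:
    """Forward every element of an async source to an async callback."""
    async for item in source:
        await on_item(item)


def push_to_shiny(rval) -> Callable[[Any], Awaitable[None]]:
    """Build a callback that sets a reactive.Value and flushes the graph.

    Hypothetical helper; assumes shiny's reactive.lock() and reactive.flush().
    """
    from shiny import reactive  # deferred: only needed inside a Shiny app

    async def _push(item: Any) -> None:
        # Hold the reactive lock because this runs outside Shiny's own task.
        async with reactive.lock():
            rval.set(item)
            await reactive.flush()

    return _push
```

Inside server(), this would replace the original task with something like asyncio.create_task(consume(AsyncIteratorDummy(5), push_to_shiny(triggered_val))), after which the invalidate_later effect should no longer be needed.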