Tags: python, plot, real-time, dask, plotly-dash

Dask load JSON (for realtime plots)


I am trying to load JSON from an HTTP address using Dask and then put it into a dataframe in order to plot some experiment data with Dash. The goal is to fetch the data in realtime and show realtime plots of the machines (example data can be found here: http://aav.rz-berlin.mpg.de:17668/retrieval/data/getData.json?pv=FHIMP%3AHeDrop%3AForepressure_Droplet_Src)

This is what I tried:

import json
import dask.bag as db
mybag = db.read_text("http://aav.rz-berlin.mpg.de:17668/retrieval/data/getData.json?pv=FHIMP%3AHeDrop%3AForepressure_Droplet_Src").map(json.loads)
mybag.to_dataframe()

but mybag.to_dataframe() freezes my code.

I also tried:

import dask.dataframe as dd
dd.read_json('url')

which returned "ValueError: Expected object or value". So according to the error message, there's no JSON at all. Does the problem derive from the JSON consisting of a meta and a data field?

Side question: does my setup even make sense like this if I want to provide a web app for monitoring? It's my first time working with Dash and Dask. If I understood it right, Dask basically does the work of a backend here, and there's no need for it to stand on its own if I have an API that's sending me JSON data.


Solution

  • Dask is not, generally, a realtime/streaming analysis engine. Tasks are mostly expected to be functional: running the same task with the same arguments is guaranteed to produce the same output, which is clearly not the case here.

    Realtime analysis can be achieved with the client.submit API, which creates arbitrary tasks at the time of invocation. However, each task must still be finite in order for other tasks to take its result and operate on it. Reading from the given URL never ends.
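
    For illustration, here is a minimal sketch of that finite-task pattern with client.submit, assuming the endpoint (or some variant of it) can return a finite snapshot per request and that you poll it with requests; fetch_snapshot and the URL placeholder are hypothetical, not part of Dask's API:

    from dask.distributed import Client
    import requests

    client = Client()  # local cluster by default

    def fetch_snapshot(url):
        # one finite task: fetch the current state of the endpoint and parse it
        return requests.get(url, timeout=10).json()

    # each submit creates a fresh task; call it on a timer to poll for new data
    # (pure=False stops dask from deduplicating repeated calls with the same URL)
    future = client.submit(fetch_snapshot, "http://...", pure=False)
    result = future.result()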

    If you want to use Dask in conjunction with streaming data, or generally want to work on streaming data in Python, you might want to try streamz. The sources listed there are mostly polling (repeat some action on a timer to check for new events) or driven by inbound events (like a server waiting for connections). You could easily make a source for the HTTP endpoint, though:

    from streamz import Source, Stream
    import aiohttp

    @Stream.register_api(staticmethod)
    class from_http(Source):
        """Source node that streams an HTTP response body chunk by chunk."""

        def __init__(self, url, chunk_size=1024, **kwargs):
            self.url = url
            self.chunk_size = chunk_size
            super().__init__(**kwargs)

        async def run(self):
            # keep one session open for the lifetime of the source
            async with aiohttp.ClientSession() as session:
                async with session.get(self.url) as resp:
                    # emit each chunk of the (possibly never-ending) body downstream
                    async for chunk in resp.content.iter_chunked(self.chunk_size):
                        await self.emit(chunk, asynchronous=True)


    The output of this streaming node is chunks of binary data - it would be up to you to write downstream nodes which can parse these into JSON (since the chunk boundaries won't respect the JSON record terminators).
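
    As a rough sketch of such a downstream parser, assuming the records arrive newline-delimited (you would need to adapt the framing to whatever the endpoint actually sends, and the URL placeholder stands in for your endpoint), you could buffer incoming chunks and emit complete records with accumulate:

    import json

    def split_records(buffer, chunk):
        # carry any incomplete trailing bytes over to the next chunk
        *lines, rest = (buffer + chunk).split(b"\n")
        records = [json.loads(line) for line in lines if line.strip()]
        return rest, records  # (new state, value to emit)

    source = Stream.from_http("http://...")  # the source defined above
    source.accumulate(split_records, returns_state=True, start=b"").flatten().sink(print)
    source.start()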