python, python-3.x, dask, directed-acyclic-graphs, dask-distributed

Dask and persistence of data on the cluster


I am working on a project that uses historical data and also incoming data for analysis. I would like to learn how to manage updating incoming data on dask while not having to dispatch all the historical data every time.

I gather time series data for analysis, but the series grow as new data arrives, and the incoming data for each stream needs to be sent to the appropriate worker for things like ARMA analysis. If I do ARMA analysis for weather, I would want to keep barometric pressure separate from temperature and compare pressure to pressure and temperature to temperature. I don't want to append the new temperature data to the prior temperature data and then dispatch the now larger series to a new worker. I would like to send only the new temperature data to the dask worker that already holds all the prior temperature data, and so on. How can I ensure the prior temperature data persists on the worker, and how do I dispatch (only) the new temperature data to the worker that has the prior data?

I have done some basic things with dask, but the basic lessons don't address keeping the history and the methods on the worker; they only cover persistence of the results.

In addition, this data is not stored in Dask series or dataframes, but in classes that hold different data and methods related to the analysis method. So I cannot effectively use a dask series or dataframe.

Any help would be appreciated.


Solution

  • This might not be the right solution, but one possibility is to designate specific workers to perform specific computations. For example, let's separate the workers into two groups:

    # instantiate workers
    from distributed import Client
    client = Client(n_workers=5)
    
    # list the worker addresses once
    workers = list(client.scheduler_info()['workers'])
    
    # here the separation is done based on order
    # but custom logic can be implemented instead
    workers_pressure = workers[3:]
    workers_temperature = workers[:3]
    

    Now, tasks that are related to pressure can be restricted to the workers designated for pressure, and likewise for temperature:

    data_pressure = [4, 5, 6]
    data_temperature = [1, 2, 3]
    
    # scatter data to pressure/temperature workers
    # (scattering a list returns one future per element)
    d_p = client.scatter(data_pressure, workers=workers_pressure)
    d_t = client.scatter(data_temperature, workers=workers_temperature)
    
    # submit computations to specific workers
    function_pressure = lambda x: x**2
    function_temperature = lambda x: x**2
    
    # each task may only run on the workers listed in `workers`
    f_p = client.map(function_pressure, d_p, workers=workers_pressure)
    f_t = client.map(function_temperature, d_t, workers=workers_temperature)
    

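    In the above snippet, workers designated to handle pressure data will be used to run the pressure computations. The scattered data and the results stay in the memory of those workers for as long as the client holds the corresponding futures (d_p, f_p and so on), so nothing has to be re-sent while those references are alive.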
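
    The same idea can be applied to the incremental update asked about in the question. The following is only a minimal sketch under some assumptions: `extend_history` is a hypothetical helper (in the question's setting it might call a method on the analysis class instead), plain lists stand in for the real data, and a small in-process cluster is used to keep the example short. Only the new chunk is scattered; the history is referenced through its future and never leaves the worker.

```python
from distributed import Client


def extend_history(history, new_points):
    # Hypothetical helper: combine the stored series with the new points.
    # This runs on the worker, so the history is not sent over the network.
    return history + new_points


# Assumption: a small in-process cluster, just for illustration.
client = Client(n_workers=2, threads_per_worker=1, processes=False)
workers = list(client.scheduler_info()["workers"])
temperature_worker = [workers[0]]

# Wrap the series in an outer list so it is scattered as ONE object
# (scattering a flat list would create one future per element).
[history] = client.scatter([[1.0, 2.0, 3.0]], workers=temperature_worker)

# Later, new readings arrive: send only those to the same worker.
[new_chunk] = client.scatter([[4.0, 5.0]], workers=temperature_worker)

# Build the updated history on that worker and keep its future.
history = client.submit(
    extend_history, history, new_chunk, workers=temperature_worker
)

result = history.result()

# Which workers currently hold the updated history?
holders = {w for ws in client.who_has([history]).values() for w in ws}
print(result, holders)

client.close()
```

    Keeping a reference to the `history` future is what keeps the data alive on the worker; once the last reference is dropped, the scheduler is free to release it.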

    This won't scale well if you have a very heterogeneous set of tasks. If that is your situation, I'd first construct the task graph (DAG) and then let dask handle the most efficient assignment of workers to tasks.
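
    For the graph-based alternative mentioned above, a rough sketch could look like the following. It assumes `dask.delayed` is used to describe the tasks; `square` and `total` are placeholder functions standing in for the real analysis, and the in-process cluster is again only for illustration. No `workers=` argument is given, so the scheduler decides where each task runs.

```python
from dask import delayed
from distributed import Client


def square(x):
    # Placeholder for a per-point computation.
    return x ** 2


def total(values):
    # Placeholder for a per-stream aggregation.
    return sum(values)


# Assumption: a small in-process cluster, just for illustration.
client = Client(n_workers=2, threads_per_worker=1, processes=False)

# Describe the work lazily; nothing runs yet.
pressure = [delayed(square)(x) for x in [4, 5, 6]]
temperature = [delayed(square)(x) for x in [1, 2, 3]]

pressure_total = delayed(total)(pressure)
temperature_total = delayed(total)(temperature)

# Hand both graphs to the scheduler, which assigns tasks to workers itself.
futures = client.compute([pressure_total, temperature_total])
pressure_result, temperature_result = client.gather(futures)
print(pressure_result, temperature_result)

client.close()
```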