Tags: python, parallel-processing, time-series, forecasting, dask

How to efficiently parallelize time series forecasting using dask?


I'm trying to parallelize time series forecasting in Python using Dask. Each time series is a column, and all columns share a common index of monthly dates. I have a custom forecasting function that returns a time series object with the fitted and forecasted values. I want to apply this function to every column of a dataframe (i.e. every time series) and return a new dataframe containing all of these series, to be uploaded to a DB. I've gotten the code to work by running:

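import dask.dataframe as dd

data = pandas_df.copy()
# npartitions=1: the whole frame is one partition, i.e. a single task
ddata = dd.from_pandas(data, npartitions=1)
# apply forecast_func to every column; older Dask used get=dask.multiprocessing.get
res = ddata.map_partitions(lambda df: df.apply(forecast_func, axis=0)).compute(
    scheduler='processes')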

My question is: is there a way in Dask to partition by column instead of by row? In this use case I need to keep the ordered time index intact for the forecasting function to work correctly.

If not, how would I reformat the data to make efficient large-scale forecasting possible, while still returning the data in the format I need to push to a DB?

[Image: example of data format]
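One way to get column-wise partitions without `dask.dataframe` is to slice the columns into batches yourself and wrap each batch in `dask.delayed`. The sketch below is not from the question: `forecast_func` is a hypothetical stand-in (3-month rolling mean as the fit, last fitted value repeated for 3 future months as the forecast), the data are random, and `n_batches` is an illustrative choice. Putting several columns in one task can reduce scheduling overhead compared with one task per column when there are many short series.

```python
import numpy as np
import pandas as pd
import dask

def forecast_func(series: pd.Series) -> pd.Series:
    # Hypothetical stand-in for the real model: fitted values plus 3 forecast months.
    fitted = series.rolling(3, min_periods=1).mean()
    future = pd.date_range(series.index[-1], periods=4, freq="MS", name="Date")[1:]
    return pd.concat([fitted, pd.Series(fitted.iloc[-1], index=future)])

def forecast_batch(batch: pd.DataFrame) -> pd.DataFrame:
    # Each batch holds complete columns, so every series keeps its full ordered index.
    return batch.apply(forecast_func, axis=0)

dates = pd.date_range("2020-01-01", periods=24, freq="MS", name="Date")
rng = np.random.default_rng(0)
wide = pd.DataFrame(rng.normal(size=(24, 10)), index=dates,
                    columns=[f"series_{k}" for k in range(10)])

# Split the column labels into contiguous batches ("partition by column").
n_batches = 3
cols = list(wide.columns)
size = -(-len(cols) // n_batches)  # ceiling division
col_batches = [cols[i:i + size] for i in range(0, len(cols), size)]

# One delayed task per batch, computed in parallel, then glued back side by side.
tasks = [dask.delayed(forecast_batch)(wide[batch]) for batch in col_batches]
results = dask.compute(*tasks)
forecasts = pd.concat(results, axis=1)
```

A reasonable starting point is a batch count around the number of workers; the result keeps the original wide layout (one column per series, dates as the index).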


Solution

  • I've used the dask.delayed solution and it's working really well: it takes about 1/3 of the time, even just using a local cluster.

    For anyone interested, here is the solution I've implemented:

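    from dask.distributed import Client, LocalCluster
    import pandas as pd
    import dask

    # 3 worker processes with 3 threads each (older releases accepted ncores=3)
    cluster = LocalCluster(n_workers=3, threads_per_worker=3)
    client = Client(cluster)  # becomes the default scheduler for .compute()

    # small_df: one column per time series, Date as the index
    # custom_forecast_func: takes one series, returns fitted + forecast values

    # build one delayed task per column, so each task gets a full, ordered series
    output = []
    for col in small_df.columns:
        forecasted_series = dask.delayed(custom_forecast_func)(small_df[col])
        output.append(forecasted_series)

    # run all tasks in parallel; returns a list of series in column order
    total = dask.delayed(output).compute()

    # combine list of series into 1 dataframe (long format)
    full_df = pd.concat(total, ignore_index=False, keys=small_df.columns,
                        names=['time_series_names', 'Date'])
    final_df = full_df.to_frame().reset_index()
    final_df.columns = ['time_series_names', 'Date', 'value_variable']
    final_df.head()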
    

    This gives you the melted (long-format) dataframe structure. If you want the series to be the columns again, you can transform it with:

    pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')
    

    small_df is a pandas DataFrame in the format shown in the question: one column per time series, with Date as the index.
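For readers who want to try the solution end to end, here is a self-contained sketch of the same steps. `custom_forecast_func` is a hypothetical toy (3-month rolling mean as the fit plus a flat 3-month forecast) and `small_df` is made-up data. The cluster setup is left out, so `dask.compute` uses Dask's default threaded scheduler; with a `Client` created as in the solution above, the same call would run on the cluster. `dask.compute(*tasks)` is equivalent to `dask.delayed(output).compute()` but returns a tuple.

```python
import numpy as np
import pandas as pd
import dask

def custom_forecast_func(series: pd.Series) -> pd.Series:
    # Hypothetical toy model: fitted = 3-month rolling mean,
    # forecast = last fitted value repeated for 3 future months.
    fitted = series.rolling(3, min_periods=1).mean()
    future = pd.date_range(series.index[-1], periods=4, freq="MS", name="Date")[1:]
    return pd.concat([fitted, pd.Series(fitted.iloc[-1], index=future)])

# Made-up wide data: one column per series, monthly Date index.
dates = pd.date_range("2020-01-01", periods=12, freq="MS", name="Date")
small_df = pd.DataFrame({"sales": np.arange(12.0), "visits": np.arange(12.0) * 2},
                        index=dates)

# One delayed task per column; each task receives a full, ordered series.
tasks = [dask.delayed(custom_forecast_func)(small_df[col]) for col in small_df.columns]
total = dask.compute(*tasks)  # tuple of pandas Series, in column order

# Stack into long format: one row per (series name, Date).
full = pd.concat(total, keys=small_df.columns, names=["time_series_names", "Date"])
final_df = full.rename("value_variable").reset_index()

# Back to wide format: one column per series.
pivoted_df = final_df.pivot(index="Date", columns="time_series_names",
                            values="value_variable")
```

The long `final_df` is usually the easier shape to insert into a DB table, while `pivoted_df` matches the original one-column-per-series layout.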