python, pandas, dask, facebook-prophet

Dask and fbprophet


I'm trying to use dask and the fbprophet library together and I'm either doing something wrong or having unexpected performance problems.

import dask.dataframe as dd
import datetime as dt
import multiprocessing as mp 
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None
from fbprophet import Prophet
import time
ncpu = mp.cpu_count()

def parallel_pd(fun, vec, pool=ncpu - 1):
    # Map fun over vec using a pool of worker processes.
    with mp.Pool(pool) as p:
        res = p.map(fun, vec)
    return res

def forecast1dd(ts):
    time.sleep(0.1)
    return ts["y"].max()

def forecast1mp(key):
    ts = df[df["key"]==key]
    time.sleep(0.1)
    return ts["y"].max()

def forecast2dd(ts):
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
                                                  periods=7, freq="D")})
    key = ts.name
    model = Prophet(yearly_seasonality=True)
    model.fit(ts)
    forecast = model.predict(future)
    future["yhat"] = forecast["yhat"]
    future["key"] =  key
    return future.values  # as_matrix() was removed in pandas 1.0

def forecast2mp(key):
    ts = df[df["key"]==key]
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1),
                                                  periods=7, freq="D")})
    model = Prophet(yearly_seasonality=True)
    model.fit(ts)
    forecast = model.predict(future)
    future["yhat"] = forecast["yhat"]
    future["key"] =  key
    return future.values  # as_matrix() was removed in pandas 1.0

On one hand, I have a custom function that runs in about 0.1 s; forecast1dd and forecast1mp simulate it. For the following dataframe

N = 2*365
key_n = 5000
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
                   "y":np.random.normal(100,20,N),
                  "key":np.repeat(str(k),N)}) for k in range(key_n)])
keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)

I obtain (respectively)

%%time
grp = ddf.groupby("key").apply(forecast1dd, meta=pd.Series(name="s"))
df1dd = grp.to_frame().compute()
CPU times: user 7.7 s, sys: 400 ms, total: 8.1 s
Wall time: 1min 8s

%%time
res = parallel_pd(forecast1mp,keys)
CPU times: user 820 ms, sys: 360 ms, total: 1.18 s
Wall time: 10min 36s

In the first case the cores are not used at 100%, but the performance is in line with my real situation. A line profiler shows that the culprit for the slow performance in the second case is ts = df[df["key"]==key], and things get worse as the number of keys grows.
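
The filter df[df["key"]==key] scans the whole frame once for every key, so its total cost grows with the number of keys times the number of rows. Below is a minimal sketch of one way around this, assuming it is acceptable to split the frame once with groupby and hand each group to the workers. The names split_by_key, forecast1_group and run_parallel are hypothetical and not part of the original code.

```python
import multiprocessing as mp
import time

import pandas as pd


def split_by_key(df):
    """Split the frame once into (key, sub-frame) pairs instead of filtering per key."""
    return [(key, ts) for key, ts in df.groupby("key", sort=False)]


def forecast1_group(item):
    """Stand-in for the real per-series function; receives one pre-split group."""
    key, ts = item
    time.sleep(0.001)  # placeholder for the real work (0.1 s in the question)
    return key, ts["y"].max()


def run_parallel(df, pool_size):
    """Map forecast1_group over the pre-split groups in a process pool."""
    groups = split_by_key(df)
    with mp.Pool(pool_size) as p:
        return dict(p.map(forecast1_group, groups))


# Small demo frame with the same columns as in the question.
demo = pd.DataFrame({
    "ds": pd.date_range("2015-01-01", periods=3, freq="D").tolist() * 2,
    "y": [1.0, 5.0, 3.0, 2.0, 4.0, 6.0],
    "key": ["0"] * 3 + ["1"] * 3,
})

# Sequential run of the same per-group function, to show the data flow.
sequential = dict(forecast1_group(item) for item in split_by_key(demo))
```

Each group is pickled and sent to a worker, so this trades the repeated scan for serialization cost. On platforms that spawn rather than fork worker processes, run_parallel should be called under an `if __name__ == "__main__":` guard.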

So far I'm happy with dask. But as soon as I use fbprophet, things change. Here I use fewer keys but, unlike the previous case, dask's performance is always worse than multiprocessing.

N = 2*365
key_n = 200
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"),
                   "y":np.random.normal(100,20,N),
                  "key":np.repeat(str(k),N)}) for k in range(key_n)])

keys = df.key.unique()
df = df.sample(frac=1).reset_index(drop=True)
ddf = dd.from_pandas(df, npartitions=ncpu*2)

%%time
grp = ddf.groupby("key").apply(forecast2dd, 
meta=pd.Series(name="s")).to_frame().compute()
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
CPU times: user 3min 42s, sys: 15 s, total: 3min 57s
Wall time: 3min 30s

%%time
res = parallel_pd(forecast2mp,keys)
df2mp = pd.concat([pd.DataFrame(a) for a in res])
CPU times: user 76 ms, sys: 160 ms, total: 236 ms
Wall time: 39.4 s

Now my questions are:

  • How can I improve the performance of prophet with dask?
  • What should I do to have dask using cores at 100%?

Solution

  • I suspect that Prophet is holding the GIL, so when computing ddf.groupby("key").apply(forecast2dd, meta=pd.Series(name="s")), only one thread can run Python code at a time. Using multiprocessing can sidestep this, at the cost of having to copy your data ncpu times. This should have a runtime similar to that of your parallel_pd function.

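    %%time
    import dask
    import dask.multiprocessing
    # Run the graph in worker processes so each has its own GIL.
    # (Newer Dask releases spell this dask.config.set(scheduler="processes").)
    with dask.set_options(get=dask.multiprocessing.get):
        grp = ddf.groupby("key").apply(forecast2dd, 
            meta=pd.Series(name="s")).to_frame().compute()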
    
    df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values])
    
    CPU times: user 2.47 s, sys: 251 ms, total: 2.72 s
    Wall time: 1min 27s
    

    You could try asking the Prophet developers whether they need to hold the GIL. I suspect that the issue is in PyStan, and that the GIL probably isn't needed while the actual Stan solvers are running. There's a GitHub issue here
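
To illustrate why a held GIL matters, here is a small standard-library sketch that runs the same CPU-bound pure-Python function on a thread pool and on a process pool. It is only a stand-in: cpu_bound, timed_map and compare are made-up names, and whether Prophet or PyStan actually holds the GIL is the suspicion stated above, not something this code demonstrates.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def cpu_bound(n):
    """Pure-Python loop that holds the GIL for its whole duration."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed_map(executor_cls, func, args, workers):
    """Run func over args with the given executor class; return (results, seconds)."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as ex:
        results = list(ex.map(func, args))
    return results, time.perf_counter() - start


def compare(n=2_000_000, tasks=4, workers=4):
    """Time the same workload on threads and on processes."""
    args = [n] * tasks
    thread_res, thread_s = timed_map(ThreadPoolExecutor, cpu_bound, args, workers)
    proc_res, proc_s = timed_map(ProcessPoolExecutor, cpu_bound, args, workers)
    assert thread_res == proc_res  # same answers, only the wall time differs
    return thread_s, proc_s
```

Calling compare() under an `if __name__ == "__main__":` guard on a multi-core machine with a standard (GIL-enabled) CPython build would typically show the threaded run taking about as long as running the tasks one after another, while the process pool spreads them across cores.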


    Side note: since your sample forecast1dd is an aggregation, it can be run much more quickly using a dd.Aggregation:

    %%time
    
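    # Called once per partition on the grouped column.
    def forecast1dd_chunk(ts):
        time.sleep(0.1)
        return ts.max()
    
    # Combines the per-partition results for each key.
    def forecast1dd_agg(ts):
        return ts.max()
    
    f1dd = dd.Aggregation("forecast1dd", forecast1dd_chunk, forecast1dd_agg)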
    
    grp = ddf.groupby("key")[['y']].agg(f1dd)
    x = grp.compute()
    
    CPU times: user 59.5 ms, sys: 5.13 ms, total: 64.7 ms
    Wall time: 355 ms
    

    Though this doesn't fit your actual problem, which isn't an aggregation.
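
The speed-up in the side note comes from where the chunk function runs: Dask calls it once per partition on the grouped column, not once per key, so the 0.1 s sleep is paid only a handful of times. The following plain-pandas sketch emulates that chunk/agg pattern. It is not Dask's actual implementation, and two_stage_max, the call counter and the demo frame are made up for illustration.

```python
import numpy as np
import pandas as pd

calls = {"chunk": 0}  # counts how often the chunk step runs


def chunk(grouped):
    """Runs once per partition on the grouped column."""
    calls["chunk"] += 1
    return grouped.max()


def agg(grouped):
    """Combines the per-partition results for each key."""
    return grouped.max()


def two_stage_max(df, npartitions):
    """Emulate the chunk/agg steps of a grouped aggregation in plain pandas."""
    bounds = np.linspace(0, len(df), npartitions + 1).astype(int)
    partitions = [df.iloc[a:b] for a, b in zip(bounds[:-1], bounds[1:])]
    partials = [chunk(part.groupby("key")["y"]) for part in partitions]
    combined = pd.concat(partials)  # one row per (partition, key)
    return agg(combined.groupby(level=0))


demo = pd.DataFrame({
    "key": ["a", "b", "a", "b", "a", "b"],
    "y": [1.0, 9.0, 7.0, 2.0, 3.0, 4.0],
})
result = two_stage_max(demo, npartitions=3)
```

Here the chunk step runs three times (once per partition) regardless of how many keys there are, which is why this pattern only helps when the per-group work can be expressed as a reduction that combines across partitions.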