Tags: python, forecasting, rpy2, dask-distributed, dask-delayed

How to use Dask Delayed with rpy2?


I'm attempting to use Dask, specifically dask.delayed, to generate time series forecasts in parallel using rpy2 and the forecast package in R. My process works when using only 1 core, but I get a

NotImplementedError: Conversion 'py2ri' not defined for objects of type '<class 'pandas.core.series.Series'>'

when using dask delayed with more than 1 core. The code used to reproduce this issue is shown below:

from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects as robjects
#get ts object as python object
ts=robjects.r('ts')
pandas2ri.activate()

import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster
import dask

#start cluster:

cluster = LocalCluster()
client = Client(cluster)

#define R function to generate time series in R from python series
def r_vecs(time_series):

    rdata=ts(time_series,frequency=12)

    return rdata

#Generate DataFrame of time series
rows = 24
ncolumns = 5
column_names = ['ts1','ts2','ts3','ts4','ts5']
df = pd.DataFrame(np.random.randint(0,10000,size=(rows, ncolumns)), columns=column_names)
df_date_index = pd.date_range(end='2018-04-01', periods=rows, freq='MS')
df.index = df_date_index 

Loop through each column of the DataFrame and convert it to an R time series object, first serially and then with dask.delayed.

Works:

output_fc_R = []
for i in df:
    forecasted_series = r_vecs(df[i])
    output_fc_R.append(forecasted_series)

output_fc_R

Doesn't work:

#Try to forecast in parallel with Dask
output_fc_R = []
for i in df:
    forecasted_series = dask.delayed(r_vecs)(df[i])
    output_fc_R.append(forecasted_series)

total = dask.delayed(output_fc_R).compute()

Solution

  • I'm still not sure exactly what causes the issue, but when I first explicitly convert the time series to an R IntVector object, things seem to work correctly.

    def r_vecs(time_series):
    
        time_series = robjects.IntVector(time_series)
        rdata=ts(time_series,frequency=12)
    
        return rdata
    

    In my original post, there were also other issues related to fitting an R model from the forecast package by evaluating a Python string. For the full thread, see: https://github.com/dask/distributed/issues/1939
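
A possible explanation, which the thread above does not confirm, is that pandas2ri.activate() only changes conversion state in the process that calls it, while LocalCluster typically runs tasks in separate worker processes. Under that assumption, a minimal sketch of the explicit-conversion approach that does not depend on any client-side activation could look like the following. The helper name series_to_int_list and the frequency parameter are illustrative, not part of the original code, and rpy2 is imported inside the function so that each worker sets up its own R session.

```python
def series_to_int_list(time_series):
    """Convert a pandas Series (or any iterable of numbers) to plain Python ints.

    Hypothetical helper: plain ints can be handed to IntVector without any
    pandas-to-R conversion rules being active.
    """
    return [int(value) for value in time_series]


def r_vecs(time_series, frequency=12):
    """Build an R `ts` object without relying on pandas2ri.activate().

    rpy2 is imported here, inside the task, on the assumption that each
    Dask worker process needs to initialise its own embedded R session.
    """
    import rpy2.robjects as robjects  # requires rpy2 and R on every worker

    r_ts = robjects.r("ts")  # look up R's ts() function in this process
    values = robjects.IntVector(series_to_int_list(time_series))
    return r_ts(values, frequency=frequency)
```

With the DataFrame from the question, this version can be used the same way as before, for example `dask.delayed(r_vecs)(df[i])` inside the loop. Note that the conversion truncates non-integer values, so a series of floats would need robjects.FloatVector instead.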