I'm attempting to use Dask, specifically `dask.delayed`, to generate time series forecasts in parallel using rpy2 and the R forecast package. My process works when only using 1 core, but when I use `dask.delayed` with more than 1 core I get:

```
NotImplementedError: Conversion 'py2ri' not defined for objects of type '<class 'pandas.core.series.Series'>'
```

The code used to reproduce this issue is shown below:
```python
from rpy2.robjects.packages import importr
from rpy2.robjects import pandas2ri
import rpy2.robjects as robjects

# Get R's ts() function as a Python callable
ts = robjects.r('ts')
pandas2ri.activate()

import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster
import dask

# Start a local cluster
cluster = LocalCluster()
client = Client(cluster)

# Wrap R's ts() to build a monthly R time series from a Python series
def r_vecs(time_series):
    rdata = ts(time_series, frequency=12)
    return rdata

# Generate a DataFrame of time series
rows = 24
ncolumns = 5
column_names = ['ts1', 'ts2', 'ts3', 'ts4', 'ts5']
df = pd.DataFrame(np.random.randint(0, 10000, size=(rows, ncolumns)), columns=column_names)
df_date_index = pd.date_range(end='2018-04-01', periods=rows, freq='MS')
df.index = df_date_index
```
The goal is to use `dask.delayed` to loop through each time series in the DataFrame and turn it into an R time series.
Works (plain serial loop):

```python
output_fc_R = []
for i in df:
    forecasted_series = r_vecs(df[i])
    output_fc_R.append(forecasted_series)
output_fc_R
```
Doesn't work:

```python
# Try to forecast in parallel with Dask
output_fc_R = []
for i in df:
    forecasted_series = dask.delayed(r_vecs)(df[i])
    output_fc_R.append(forecasted_series)
total = dask.delayed(output_fc_R).compute()
```
I'm still not sure exactly what causes the issue, but when I first explicitly convert the time series to an R `IntVector` object, things seem to work correctly:
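```python
def r_vecs(time_series):
    # Convert to an R integer vector before calling ts(); the random data
    # above is integer-valued (float data would call for robjects.FloatVector)
    time_series = robjects.IntVector(time_series)
    rdata = ts(time_series, frequency=12)
    return rdata
```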
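The thread does not pin down the cause, so the following is only a plausible explanation, offered as an assumption: `pandas2ri.activate()` registers the pandas conversion rules in the Python process that calls it, while `LocalCluster` runs its workers in separate processes by default, which (depending on the multiprocessing start method) may start without those rules. Passing an `IntVector` likely sidesteps this because the value handed to `ts` is already an R object and needs no pandas conversion on the worker. The stdlib-only sketch below illustrates just the process-isolation part; the fake module `fake_pandas_rules` in `sys.modules` is a hypothetical stand-in for rpy2's conversion rules, and nothing here calls rpy2 or Dask.

```python
import subprocess
import sys
import types

# Hypothetical stand-in for rpy2's conversion rules: a fake module registered
# in sys.modules, i.e. state that lives only in the current interpreter.
RULES = "fake_pandas_rules"


def activate_rules():
    # Mimics an in-process registration such as pandas2ri.activate().
    sys.modules[RULES] = types.ModuleType(RULES)


def run_in_fresh_process(code):
    # Run `code` in a brand-new interpreter (loosely like a freshly started
    # worker process) and return what it printed.
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


CHECK = f"import sys; print({RULES!r} in sys.modules)"
# Setup done inside the "task" itself, i.e. in the worker's own process.
SETUP = f"import sys, types; sys.modules[{RULES!r}] = types.ModuleType({RULES!r}); "

activate_rules()
in_client = RULES in sys.modules                              # True
in_worker = run_in_fresh_process(CHECK)                       # 'False'
in_worker_after_setup = run_in_fresh_process(SETUP + CHECK)   # 'True'
print(in_client, in_worker, in_worker_after_setup)            # True False True
```

If this explanation holds, other options worth trying would be running the activation on every worker (for example `client.run(pandas2ri.activate)`, using Dask's `Client.run`) or calling it inside `r_vecs` itself; neither was tested in the original thread.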
In my original post, there were also other issues related to fitting an R model from the forecast package by evaluating a Python string. To follow the full thread, see: https://github.com/dask/distributed/issues/1939