Search code examples
pandas apache-spark facebook-prophet

Forecasting with facebook prophet using Pandas udf in spark


I am trying to use Facebook Prophet in Spark in a Zeppelin environment, and I have tried to follow the exact steps from https://github.com/facebook/prophet/issues/517. However, I get errors like the one below. I am not sure what I need to correct here or how to debug this.

My data contains a datetime feature called ds, the volume that I want to predict (y), and the segment; I am trying to build a model for each segment.

File"/hadoop14/yarn/nm/usercache/khasbab/appcache/application_1588090646020_2412/container_e168_1588090646020_2412_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o3737.showString.

%livycd.pyspark

from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd


# Output schema of the forecasting UDF: the segment key, the 'ds' timestamp
# field, and one array-of-doubles field per Prophet forecast component.
result_schema = StructType(
    [
        StructField('segment', StringType(), True),
        StructField('ds', TimestampType(), True),
    ]
    + [
        StructField(column, ArrayType(DoubleType()), True)
        for column in (
            'trend',
            'trend_upper',
            'trend_lower',
            'yearly',
            'yearly_upper',
            'yearly_lower',
            'yhat',
            'yhat_upper',
            'yhat_lower',
            'multiplicative_terms',
            'multiplicative_terms_upper',
            'multiplicative_terms_lower',
            'additive_terms',
            'additive_terms_upper',
            'additive_terms_lower',
        )
    ]
)

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(history_pd):
    """Fit a Prophet model to one segment's history and return its forecast.

    Runs as a grouped-map pandas UDF, so ``history_pd`` holds the rows of a
    single group and must provide the ``ds``, ``y`` and ``segment`` columns.

    Returns a one-row DataFrame holding the group's segment value plus, for
    ``ds`` and every forecast component, a list with one entry per row of
    the frame produced by ``make_future_dataframe(periods=20, freq='W')``.
    """

    # instantiate the model, configure the parameters
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=False,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    #history_pd['ds'] = pd.to_datetime(history_pd['ds'], errors = 'coerce', format = '%Y-%m-%d')
    #.apply(lambda x: datetime.strptime(x,'%Y-%m-%d')) 

    # fit the model on the two columns Prophet expects
    model.fit(history_pd.loc[:, ['ds', 'y']])

    # configure predictions
    future_pd = model.make_future_dataframe(
        periods=20,
        freq='W')

    # make predictions
    results_pd = model.predict(future_pd)

    # Forecast components copied verbatim from Prophet's output. Building
    # them in a loop keeps every output column tied to the source column of
    # the same name (the original filled 'trend' from the 'ds' column).
    component_columns = [
        'trend',
        'trend_upper',
        'trend_lower',
        'yearly',
        'yearly_upper',
        'yearly_lower',
        'yhat',
        'yhat_upper',
        'yhat_lower',
        'multiplicative_terms',
        'multiplicative_terms_upper',
        'multiplicative_terms_lower',
        'additive_terms',
        'additive_terms_upper',
        'additive_terms_lower',
    ]

    # NOTE(review): result_schema declares 'ds' as TimestampType, but this
    # column holds a list of values like the other columns -- confirm that
    # the schema and this column agree.
    forecast = {
        'segment': history_pd['segment'].values[0],
        'ds': [results_pd.loc[:, 'ds'].values.tolist()],
    }
    for column in component_columns:
        # Wrap each list in another list so the frame has exactly one row.
        forecast[column] = [results_pd.loc[:, column].values.tolist()]

    # return predictions
    return pd.DataFrame(forecast)
    #return pd.concat([pd.DataFrame(results_pd),pd.DataFrame(history_pd[['segment']].values[0])], axis = 1)




# Group the input by segment and run the forecasting UDF once per group.
per_segment = df3.groupBy('segment')
results = per_segment.apply(forecast_loans)

# Display the resulting forecasts.
results.show()

Solution

  • I have tweaked my code to the following and downgraded to pyarrow 0.14, as suggested in the Stack Overflow question "Pandas scalar UDF failing, IllegalArgumentException", and it all worked! I believe downgrading pyarrow to 0.14 was the key for Spark 2.x versions, as commented on Stack Overflow.

    The comment says the following "The issue is not with pyarrow's new release, it is spark which has to upgrade and become compatible with pyarrow.(i am afraid we have to wait for spark 3.0 to use the latest pyarrow)"

    %livycd.pyspark
    
    from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
    from pyspark.sql.functions import current_date
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from fbprophet import Prophet
    from datetime import datetime
    import pandas as pd
    
    
    # Output schema of the forecasting UDF: the segment key, the 'ds'
    # timestamp, and one double field per Prophet forecast component.
    result_schema = StructType(
        [
            StructField('segment', StringType(), True),
            StructField('ds', TimestampType(), True),
        ]
        + [
            StructField(column, DoubleType(), True)
            for column in (
                'trend',
                'trend_upper',
                'trend_lower',
                'yearly',
                'yearly_upper',
                'yearly_lower',
                'yhat',
                'yhat_upper',
                'yhat_lower',
                'multiplicative_terms',
                'multiplicative_terms_upper',
                'multiplicative_terms_lower',
                'additive_terms',
                'additive_terms_upper',
                'additive_terms_lower',
            )
        ]
    )
    
    
    @pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
    def forecast_loans(df):
        """Fit a Prophet model to one segment and return its forecast rows.

        Runs as a grouped-map pandas UDF, so ``df`` holds the rows of a
        single group and must provide the ``ds``, ``y`` and ``segment``
        columns. Rows dated before the cut-off train the model; the number
        of rows on or after it sets how many weekly periods are forecast.

        Returns a DataFrame with one row per forecast date, restricted to
        (and ordered by) the field names of ``result_schema``.
        """

        def prophet_model(df, test_start_date):
            # Parse the dates on a copy so the frame handed in by Spark is
            # left untouched.
            df = df.assign(ds=pd.to_datetime(df['ds']))
            cutoff = pd.Timestamp(test_start_date)

            # train: everything strictly before the cut-off date
            ts_train = df[df['ds'] < cutoff].sort_values('ds')

            # test: only its size is needed, as the forecast horizon
            n_test_rows = int((df['ds'] >= cutoff).sum())

            # instantiate the model, configure the parameters
            model = Prophet(
                interval_width=0.95,
                growth='linear',
                daily_seasonality=False,
                weekly_seasonality=False,
                yearly_seasonality=True,
                seasonality_mode='multiplicative'
            )

            # fit the model
            model.fit(ts_train.loc[:, ['ds', 'y']])

            # configure predictions
            future_pd = model.make_future_dataframe(
                periods=n_test_rows,
                freq='W')

            # make predictions
            results_pd = model.predict(future_pd)

            # Every row of a group shares one segment value, so broadcast
            # it as a scalar. Concatenating df['segment'] column-wise aligns
            # on the index and leaves NaN segments (or extra all-NaN rows)
            # whenever the forecast and input frames differ in length or
            # index.
            results_pd['segment'] = df['segment'].iloc[0]

            # Keep only the schema's columns, in the schema's order.
            return results_pd.reindex(columns=result_schema.fieldNames())

        # return predictions
        return prophet_model(df, test_start_date='2019-03-31')
    
    
    
    
    # Group the input by segment and run the forecasting UDF once per group.
    per_segment = df3.groupBy('segment')
    results = per_segment.apply(forecast_loans)