Tags: time-series, stl-decomposition

"How can I code for seasonal decomposing for many monthly time series in same time"


I want to decompose many monthly time series into their seasonal factors. I first tried the code below on a single series (bmix_e), and it works:

import statsmodels.api as sm

# note: newer statsmodels versions use period=12 instead of freq=12
decomposed = sm.tsa.seasonal_decompose(df.bmix_e.values, model='multiplicative', freq=12)

However, after I added a second time series (bmix_s), the code did not work, even though I used the same call as above.

So, I would like to know:

  1. How can I run seasonal decomposition on two or more time series?

  2. After decomposing each series, how can I get the average monthly seasonal factor of each series as a data frame? (With the code above, decomposed.seasonal returns the result as an array.)


Solution

  • When forecasting periodic data, it is useful to normalize the seasonality out of a dataset. This has become easier to do with the Seasonal Autoregressive Integrated Moving Average (SARIMA) model; with well-chosen hyperparameters, an accurate model can be built. The code below grid-searches those hyperparameters.

    Code pasted below (source)

    # grid search sarima hyperparameters
    from math import sqrt
    from itertools import product
    from multiprocessing import cpu_count
    from joblib import Parallel
    from joblib import delayed
    from warnings import catch_warnings
    from warnings import filterwarnings
    from statsmodels.tsa.statespace.sarimax import SARIMAX
    from sklearn.metrics import mean_squared_error
    
    # one-step sarima forecast
    def sarima_forecast(history, config):
        order, sorder, trend = config
        # define model
        model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
        # fit model
        model_fit = model.fit(disp=False)
        # make one step forecast
        yhat = model_fit.predict(len(history), len(history))
        return yhat[0]
    
    # root mean squared error or rmse
    def measure_rmse(actual, predicted):
        return sqrt(mean_squared_error(actual, predicted))
    
    # split a univariate dataset into train/test sets
    def train_test_split(data, n_test):
        return data[:-n_test], data[-n_test:]
    
    # walk-forward validation for univariate data
    def walk_forward_validation(data, n_test, cfg):
        predictions = list()
        # split dataset
        train, test = train_test_split(data, n_test)
        # seed history with training dataset
        history = [x for x in train]
        # step over each time-step in the test set
        for i in range(len(test)):
            # fit model and make forecast for history
            yhat = sarima_forecast(history, cfg)
            # store forecast in list of predictions
            predictions.append(yhat)
            # add actual observation to history for the next loop
            history.append(test[i])
        # estimate prediction error
        error = measure_rmse(test, predictions)
        return error
    
    # score a model, return None on failure
    def score_model(data, n_test, cfg, debug=False):
        result = None
        # convert config to a key
        key = str(cfg)
        # show all warnings and fail on exception if debugging
        if debug:
            result = walk_forward_validation(data, n_test, cfg)
        else:
            # one failure during model validation suggests an unstable config
            try:
                # never show warnings when grid searching, too noisy
                with catch_warnings():
                    filterwarnings("ignore")
                    result = walk_forward_validation(data, n_test, cfg)
            except Exception:
                # treat any failure as an unusable config; result stays None
                result = None
        # check for an interesting result
        if result is not None:
            print(' > Model[%s] %.3f' % (key, result))
        return (key, result)
    
    # grid search configs
    def grid_search(data, cfg_list, n_test, parallel=True):
        scores = None
        if parallel:
            # execute configs in parallel
            executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
            tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
            scores = executor(tasks)
        else:
            scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
        # drop configs that failed (their result is None)
        scores = [r for r in scores if r[1] is not None]
        # sort configs by error, asc
        scores.sort(key=lambda tup: tup[1])
        return scores
    
    # create a set of sarima configs to try
    def sarima_configs(seasonal=[0]):
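        # 'seasonal' lists the candidate seasonal periods m; 0 disables the seasonal part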
        models = list()
        # define config lists
        p_params = [0, 1, 2]
        d_params = [0, 1]
        q_params = [0, 1, 2]
        t_params = ['n','c','t','ct']
        P_params = [0, 1, 2]
        D_params = [0, 1]
        Q_params = [0, 1, 2]
        m_params = seasonal
        # create config instances, one per combination of hyperparameters
        for p, d, q, t, P, D, Q, m in product(p_params, d_params, q_params, t_params,
                                              P_params, D_params, Q_params, m_params):
            models.append([(p, d, q), (P, D, Q, m), t])
        return models
    
    if __name__ == '__main__':
        # define dataset
        data = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
        print(data)
        # data split
        n_test = 4
        # model configs
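        # the default m=0 suits this trend-only toy series; for monthly data,
        # pass sarima_configs(seasonal=[12]) to include a 12-month seasonal period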
        cfg_list = sarima_configs()
        # grid search
        scores = grid_search(data, cfg_list, n_test)
        print('done')
        # list top 3 configs
        for cfg, error in scores[:3]:
            print(cfg, error)
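
    Returning to the original two questions (looping the decomposition over several monthly series, and averaging the seasonal factors per calendar month), here is a minimal sketch using statsmodels' seasonal_decompose. The column names bmix_e and bmix_s come from the question, but the DataFrame itself is a synthetic stand-in, and period=12 assumes monthly data (older statsmodels versions call this argument freq).

    import numpy as np
    import pandas as pd
    from statsmodels.tsa.seasonal import seasonal_decompose

    # synthetic stand-in for the question's DataFrame: two monthly series
    # with trend + seasonality (column names taken from the question)
    idx = pd.date_range('2015-01-01', periods=60, freq='MS')
    rng = np.random.default_rng(0)
    df = pd.DataFrame({
        'bmix_e': 100 + np.arange(60) + 10 * np.sin(2 * np.pi * idx.month / 12) + rng.normal(0, 1, 60),
        'bmix_s': 200 + np.arange(60) + 15 * np.cos(2 * np.pi * idx.month / 12) + rng.normal(0, 1, 60),
    }, index=idx)

    # question 1: apply the same decomposition to every column
    seasonal = {}
    for col in df.columns:
        result = seasonal_decompose(df[col], model='multiplicative', period=12)
        seasonal[col] = result.seasonal  # a Series indexed like df

    # question 2: collect the seasonal factors into a DataFrame and
    # average them per calendar month (one row per month, 1-12)
    seasonal_df = pd.DataFrame(seasonal)
    monthly_avg = seasonal_df.groupby(seasonal_df.index.month).mean()
    print(monthly_avg)

    Because the decomposition is multiplicative, the seasonal factors should average to roughly 1.0 over a full year, so each entry of monthly_avg can be read as that month's typical multiplicative effect on its series.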