python, python-3.x, python-multiprocessing, ta-lib

Unable to return a specific output from multiprocessing pool


I am trying to use Python's multiprocessing library to call a function (calc_indicator) that takes a list of names of technical indicators available in ta-lib. For each name in that list, it calls a second function (technical_indicators) to calculate the indicator's values. This is what I would like the output to look like:

Required Output

When I run the below code:

import multiprocessing as mp
import pandas as pd
import numpy as np
from talib import abstract

dataset = pd.read_csv('Data/Currencies/COST.csv')
working_frame = dataset.drop(['Date', 'Adj Close'],axis=1)

def technical_indicators(currency_dataframe, indicator):
    nothing_found = 'Indicator Not Found'

    inputs = {
            'open':currency_dataframe['Open'],
            'high':currency_dataframe['High'],
            'low':currency_dataframe['Low'],
            'close':currency_dataframe['Close'],
            'volume':currency_dataframe['Volume']
    }

    DEMA = abstract.DEMA(inputs, timeperiod=20)
    EMA = abstract.EMA(inputs, timeperiod=20)
    KAMA = abstract.KAMA(inputs, timeperiod=20)
    MA = abstract.MA(inputs, timeperiod=20, matype=0)

    ATR = abstract.ATR(inputs, timeperiod=20)
    NATR = abstract.NATR(inputs, timeperiod=20)
    TRANGE = abstract.TRANGE(inputs)

    if(indicator == 'DEMA'):
        return DEMA
    elif(indicator == 'EMA'):
        return EMA
    elif(indicator == 'KAMA'):
        return KAMA
    elif(indicator == 'MA'):
        return MA
    elif(indicator == 'ATR'):
        return ATR
    elif(indicator == 'NATR'):
        return NATR
    elif(indicator == 'TRANGE'):
        return TRANGE
    else:
        return nothing_found

list0 = ['DEMA', 'EMA', 'KAMA', 'MA']
list1 = ['ATR', 'NATR', 'TRANGE']

calc_frame = pd.DataFrame()

def calc_indicator(data_list):
    for i in range(len(data_list)):
        tindicator = technical_indicators(working_frame, data_list[i])
        calc_frame[data_list[i]] = tindicator

    return calc_frame

cal_ = calc_indicator(list0)

pool = mp.Pool(mp.cpu_count())
res0 = pool.map(calc_indicator, list0)
res1 = pool.map(calc_indicator, list1)

I get this output:

D
E
K
M
M
A
E
A
M
A
M
A
A
A
T
N
T
R
A
A
N
R
G
T
E
R

Link for the data I am using: daily prices


Solution

  • The first problem is that calc_indicator() expects a list of strings, but pool.map() iterates over the list and calls calc_indicator() once per element, passing an individual string each time (e.g. calc_indicator('DEMA')). calc_indicator() therefore indexes into the characters of the string instead of into the list.

    The second problem is that you are trying to update a single object, calc_frame, from multiple subprocesses. Each subprocess has its own memory space, so the calc_frame in the main process is not affected by anything the subprocesses do to their copies.

    Instead, have the subprocesses return the technical_indicators() result through pool.map(), then iterate over the returned results in the main process and update calc_frame with each one in turn:

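    def one_calc_indicator(indicator):
        # Runs in a worker process: return the name together with its values.
        return indicator, technical_indicators(working_frame, indicator)

    # The guard is needed on platforms that start workers by re-importing this module.
    if __name__ == '__main__':
        # The context manager shuts the pool down once the results are in.
        with mp.Pool(mp.cpu_count()) as pool:
            for indicator, result in pool.map(one_calc_indicator, list0):
                calc_frame[indicator] = result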
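
If you want to see both problems and the fix without ta-lib or the CSV file, here is a minimal sketch that uses only the standard library. The names fake_indicator, KNOWN_INDICATORS and results are hypothetical stand-ins (for technical_indicators(), the set of supported names, and calc_frame respectively); they are not part of your code or of ta-lib, and the placeholder values are meaningless.

```python
import multiprocessing as mp

# Hypothetical stand-in for the names technical_indicators() understands.
KNOWN_INDICATORS = {"DEMA", "EMA", "KAMA", "MA"}

# Plays the role of calc_frame: a module-level object the workers try to update.
results = {}


def fake_indicator(name):
    """Hypothetical stand-in for technical_indicators(): placeholder values
    for a known name, otherwise the not-found message."""
    if name in KNOWN_INDICATORS:
        return [len(name)] * 3
    return "Indicator Not Found"


def calc_indicator(data_list):
    """Same shape as the question's function: index into the argument and
    write into the module-level dict. Returns the keys it looked up."""
    looked_up = []
    for i in range(len(data_list)):
        results[data_list[i]] = fake_indicator(data_list[i])
        looked_up.append(data_list[i])
    return looked_up


def one_calc_indicator(name):
    """Fixed shape: take one name and return (name, values) to the caller."""
    return name, fake_indicator(name)


if __name__ == "__main__":
    names = ["DEMA", "EMA", "KAMA", "MA"]
    with mp.Pool(2) as pool:
        # Problem 1: map() hands each call a single string,
        # so the loop walks over its characters.
        print(pool.map(calc_indicator, names))
        # Problem 2: the writes above landed in the workers' copies only,
        # so the parent's dict is still empty here.
        print(results)
        # Fix: collect the returned pairs and update the dict in the parent.
        for name, values in pool.map(one_calc_indicator, names):
            results[name] = values
    print(sorted(results))
```

The first print shows each name split into single characters, which is the same effect as the letter-by-letter output in the question. The second shows that nothing written by the workers reached the parent process. Only the last loop, which runs in the parent, actually fills the shared object.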