Tags: python, multithreading, numpy, concurrency, multiprocessing

Make a function containing a for loop execute concurrently


I want to incorporate either process-based or thread-based concurrency in the snippet of code below (it contains file-read, calculation, and file-write sections). This would be critical when (i) operating on a larger number of trading instruments, and/or (ii) additional time-frame-based signal lines need to be written into the .npy files. If possible, incorporating multithreading within multiprocessing would be a great bonus, but I'll be happy to take either on its own.

import numpy as np
import pandas as pd 
import MetaTrader5 as mt5

marketFeedArrayLength = 364320
timeFrame = mt5.TIMEFRAME_M1

symbols_MultiTFMACD = ['AUDCAD', 'AUDJPY', 'AUDNZD', 'AUDUSD', 'CADJPY', 'EURAUD', 'EURCAD']

m1_macdLine_fastEMA_period_1 = 1
m1_macdLine_slowEMA_period_480 = 480
m1_macdLine_slowEMA_period_840 = 840

def multiTF_MACD():
 
    # Using for loop to iterate over each selected currency symbol.
    for i in symbols_MultiTFMACD:
        
        "Assign the relevant data column ('Close' price i.e. index 4) to a variable."
        closeDataForEMA = np.load("%s.npy" % (i))[:,4]
        "The tailored MACD data:"
        m1_closeDataForEMA = np.flip(closeDataForEMA[-1::-timeFrame])
        
        "Calculate the MACDs." 
        weightsFast_1 = np.exp(np.linspace(-1., 0., m1_macdLine_fastEMA_period_1))
        weightsSlow_480 = np.exp(np.linspace(-1., 0., m1_macdLine_slowEMA_period_480))
        weightsSlow_840 = np.exp(np.linspace(-1., 0., m1_macdLine_slowEMA_period_840))
            # Normalize the weights.
        weightsFast_1 /= weightsFast_1.sum()
        weightsSlow_480 /= weightsSlow_480.sum()
        weightsSlow_840 /= weightsSlow_840.sum()
            # Third - Calculate the EMAs.
        m1_fastEMA_1 = np.pad(np.convolve(weightsFast_1, m1_closeDataForEMA, mode='valid'), (m1_macdLine_fastEMA_period_1 - 1, 0))
        m1_slowEMA_480 = np.pad(np.convolve(weightsSlow_480, m1_closeDataForEMA, mode='valid'), (m1_macdLine_slowEMA_period_480 - 1, 0))
        m1_slowEMA_840 = np.pad(np.convolve(weightsSlow_840, m1_closeDataForEMA, mode='valid'), (m1_macdLine_slowEMA_period_840 - 1, 0))
            # Get the MACD lines: 
        m1_macdLine_1_480 = m1_fastEMA_1 - m1_slowEMA_480
        m1_macdLine_1_840 = m1_fastEMA_1 - m1_slowEMA_840      
         
        "Append the MACD lines to columns in the .npy rate files"
            # Create a file path capturing all the names.
        file_path = "%s.npy" % (i)
            # (i) Use numpy.load() to call the .npy rate files into Pandas.DataFrame().            
        npyToPandas = pd.DataFrame(np.load(file_path)) 
            # (ii) Add the MACD lines as columns into their respective currency data frames.
        npyToPandas['8'] = m1_macdLine_1_480
        npyToPandas['9'] = m1_macdLine_1_840
            # (iii) Convert the DF to an array and save as .npy format.  
        stackedFile_MultiTFMACD = np.save(i, 
                                          (pd.DataFrame(npyToPandas).to_numpy()), 
                                          allow_pickle=True, fix_imports=False)       
                
multiTF_MACD()


Solution

  • NumPy and pandas already do a fair bit of threading under the hood, so you're unlikely to get much benefit from adding more threads on top of that. You might get more out of running each symbol in a separate process, but it's quite possible that's not your bottleneck.

    If you move the body of your for i in symbols_MultiTFMACD: loop into a function such as def process_symbol(symbol):, you can pretty easily farm the symbols out to separate processes with concurrent.futures:

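    import concurrent.futures
    
    # The __main__ guard is needed wherever worker processes are spawned (e.g. Windows).
    if __name__ == "__main__":
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(process_symbol, symbol) for symbol in symbols_MultiTFMACD]
            for future in concurrent.futures.as_completed(futures):
                future.result()  # re-raises any exception raised in a worker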
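As a rough illustration of the extraction described above, process_symbol could look something like the sketch below. This is one possible shape, not the asker's code verbatim. The helper weighted_average, the data_dir parameter, and the upper-case constants are hypothetical names introduced here. The sketch assumes the .npy files hold purely numeric 2-D arrays, so it appends the new columns with np.column_stack instead of the pandas round-trip. It also omits the flip/stride step from the question, which is a no-op when the stride is 1.

```python
from pathlib import Path

import numpy as np

# Periods and column index copied from the question; the constant names are hypothetical.
FAST_PERIOD = 1
SLOW_PERIODS = (480, 840)
CLOSE_COLUMN = 4


def weighted_average(close, period):
    """Exponentially weighted average, computed the same way as in the question."""
    weights = np.exp(np.linspace(-1.0, 0.0, period))
    weights /= weights.sum()
    # Left-pad with zeros so the output has the same length as the input.
    return np.pad(np.convolve(weights, close, mode="valid"), (period - 1, 0))


def process_symbol(symbol, data_dir="."):
    """Load <symbol>.npy, append one MACD column per slow period, save it back."""
    path = Path(data_dir) / f"{symbol}.npy"
    rates = np.load(path)
    close = rates[:, CLOSE_COLUMN]
    fast = weighted_average(close, FAST_PERIOD)
    macd_lines = [fast - weighted_average(close, period) for period in SLOW_PERIODS]
    # Assumption: the array is purely numeric, so columns can be stacked directly.
    np.save(path, np.column_stack([rates, *macd_lines]))
    return symbol
```

Because each call touches only its own file, the calls are independent of each other and can be submitted to the executor in any order. Returning the symbol makes it easy to log which task finished when iterating over the completed futures.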
    

    If that doesn't work, I would suggest you profile the code to find your real bottleneck or look into using a distributed processing framework like Spark.
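For the profiling suggestion, a minimal sketch using the standard library's cProfile and pstats modules might look like the following. The helper name profile_call and its top parameter are hypothetical, chosen here for illustration. It runs one function under the profiler and returns a text report sorted by cumulative time, which should show whether the file reads, the convolutions, or the file writes dominate.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top=10, **kwargs):
    """Run func under cProfile; return (result, report sorted by cumulative time)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    # Show only the `top` most expensive entries by cumulative time.
    stats.sort_stats("cumulative").print_stats(top)
    return result, stream.getvalue()
```

For example, `_, report = profile_call(multiTF_MACD)` followed by `print(report)` would profile the original single-process function. If most of the time turns out to be in np.load and np.save, extra processes may help less than expected, since they would all compete for the same disk.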