Tags: python, multithreading, multiprocessing, pool

Am I using python pooling properly?


I have a very simple python script that reads in ticker symbols from a list (6K+ long) and gets some data to flag unusual volume during the trading day.

If I just run a loop through each of the lines in the ticker file it takes hours to run.

Based on some googling I found a crude example of multiprocessing and decided to try to implement it.

When I run the script it runs WAY faster but also causes some really bizarre issues I can't seem to figure out. Sometimes I'll get a redis circuitbreaker error, or sometimes it'll just stop and hang near the end of the ticker file.

Any thoughts?

import yfinance as yf
import multiprocessing
import time
import logging

file = open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w')


def main():
    read_ticker_file()


def read_ticker_file():
    file1 = open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r')
    lines = file1.readlines()

    count = 0

    ticker_arr = []

    for line in lines:
        count += 1
        line = line.strip('\n')
        line = line.strip()
        ticker_arr.append(line)

    return ticker_arr


def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            write_to_file(unusual_volume)
    except Exception as e:
        print(e)


def write_to_file(data):
    file.writelines(data + "\n")


if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()

    pool = multiprocessing.Pool(processes=20)
    pool.map(get_historical_data, inputs)
    pool.close()
    pool.join()
    # end = time.time()
    # print(start - end)

Solution

  • As I mentioned in my comment above, I do not believe you are handling your output to unusual.txt correctly: the module-level open runs in every pool process, so each worker opens its own handle to the file in write mode and they all write to it concurrently. The following should at least correct that issue by having your worker function just return the record (or None) back to the main process for writing. I am using method imap instead of map so that I can lazily process the return values as they are returned; they will also now be in the order of the symbols as they appeared in the input file. In case the input file has a large number of symbols, we should not use imap's default chunksize of 1, so I have provided a function to calculate a suitable value (a worked example follows the code below).

    import yfinance as yf
    import multiprocessing
    import time
    
    def read_ticker_file():
        with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as f:
            return [line.strip() for line in f]
    
    def get_historical_data(symbol):
        yahoo_ticker = yf.Ticker(symbol)
        historical = yf.download(symbol, period="max", interval="1d")
        average_volume_arr = historical['Volume']
        try:
            current_volume = yahoo_ticker.info['volume']
            sum_volume = 0
            for volume in average_volume_arr:
                sum_volume += volume
            average_volume = sum_volume / len(average_volume_arr)
            if current_volume > average_volume:
                volume_over_average = (current_volume - average_volume) / average_volume
                volume_over_average = "{:.2%}".format(volume_over_average)
                unusual_volume = (symbol + " - " + str(volume_over_average))
                print(unusual_volume)
                return unusual_volume
            else:
                return None
        except Exception as e:
            print(e)
            return None
    
    def compute_chunksize(iterable_size, pool_size):
        # Heuristic: aim for roughly 4 chunks per pool process, rounding up
        # so that every symbol is covered.
        chunksize, remainder = divmod(iterable_size, 4 * pool_size)
        if remainder:
            chunksize += 1
        return chunksize
    
    if __name__ == '__main__':
        # start = time.time()
        inputs = read_ticker_file()
        pool = multiprocessing.Pool(processes=20)
        chunksize = compute_chunksize(len(inputs), 20)
        results = pool.imap(get_historical_data, inputs, chunksize=chunksize)
        with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
            for result in results:
                if result:
                    print(result, file=f)
        pool.close()
        pool.join()
        # end = time.time()
        # print(end - start)
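
    To put concrete numbers on the chunksize heuristic, assume a round 6,000-symbol file (the question says "6K+") and the pool size of 20 used here; the figures below are just that worked example, not output from a real run:

    # compute_chunksize(6000, 20): divmod(6000, 4 * 20) == (75, 0),
    # so each task handed to a worker covers 75 symbols rather than
    # imap's default chunksize of 1.
    print(compute_chunksize(6000, 20))   # -> 75
    # Any remainder rounds the chunksize up:
    print(compute_chunksize(6001, 20))   # -> 76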
    

    An Alternate Approach

    Again, this is not necessarily going to solve your problems, other than the one you have with writing to unusual.txt, which the above code should also handle. But it is the way that I would code the solution and work from there:

    I am going "out on a limb" here, not knowing how large the tickers.txt file is or that much about the yfinance package. But it seems fairly obvious that the call to yf.download and the file writing to unusual.txt, which I have already indicated in my comment above I don't believe is being handled correctly, are I/O-bound "processes" that could just as well be handled by a multithreading pool. It's not clear that what remains, i.e. the calculations and the comparison of current_volume with average_volume, is CPU-intensive enough to justify the overhead of multiprocessing.

    The following splits up what was a single function, get_historical_data, which did all the downloading and computations, into two functions: load_historical_data_and_process and process_data. Both a large multithreading pool and a multiprocessing pool are created. Worker function load_historical_data_and_process is called for each symbol in tickers.txt using the multithreading pool with function imap, which is a "lazier" version of map. That is, in case the file is large, it is not necessary to first read all the symbols into memory to build the list required by map; a generator function can be used. Even if the file is small, there is no real disadvantage to using imap.

    load_historical_data_and_process does all the downloading. For the calculations it uses the multiprocessing pool that was passed to it, invoking worker function process_data with blocking method apply. It would be interesting to also get an alternate timing by calling process_data directly instead of going through the multiprocessing pool (a timing sketch for that comparison follows the code below). There will, of course, be very little concurrency achieved across the threads in the execution of process_data in that case, because of contention for the Global Interpreter Lock. But depending on how much actual CPU is involved in the execution of process_data (I have no way of knowing), the CPU you save by not having to pass arguments and results across process boundaries may be offsetting.

    import yfinance as yf
    from multiprocessing.pool import ThreadPool, Pool
    from functools import partial
    import time
    
    def get_symbols():
        with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as file1:
            for line in file1:
                yield line.strip()
    
    def load_historical_data_and_process(multiprocessing_pool, symbol):
        """ What I believe is I/O-intensive and so this runs in a multithreading pool: """
        try:
            historical = yf.download(symbol, period="max", interval="1d")
            yahoo_ticker = yf.Ticker(symbol)
            current_volume = yahoo_ticker.info['volume']
            # To call directly:
            #return process_data(symbol, historical, current_volume)
            return multiprocessing_pool.apply(process_data, args=(symbol, historical, current_volume))
        except Exception as e:
            print(e)
            return None
    
    
    def process_data(symbol, historical, current_volume):
        """ What I believe may warrant running in a multiprocessing pool: """
        average_volume_arr = historical['Volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume_record = (symbol + " - " + str(volume_over_average))
            print(unusual_volume_record, flush=True)
            return unusual_volume_record
        else:
            return None
    
    if __name__ == '__main__':
        # start = time.time()
        # 100 threads, or some other suitable thread pool size:
        with Pool(processes=20) as multiprocessing_pool, ThreadPool(processes=100) as thread_pool:
            # Pass the multiprocessing pool to thread pool worker
            # load_historical_data_and_process for the CPU-intensive processing:
            worker = partial(load_historical_data_and_process, multiprocessing_pool)
            results = thread_pool.imap(worker, get_symbols())
            with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
                for result in results:
                    if result:
                        print(result, file=f)
        # end = time.time()
        # print(end - start)
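
    For the alternate timing suggested above, here is a minimal sketch, reusing get_symbols and load_historical_data_and_process from the code above; the run_trial helper is my own scaffolding, not part of the solution:

    import time
    from multiprocessing.pool import ThreadPool, Pool
    from functools import partial

    def run_trial(worker):
        # Drain the thread pool's imap iterator and report elapsed wall time.
        start = time.time()
        with ThreadPool(processes=100) as thread_pool:
            for _ in thread_pool.imap(worker, get_symbols()):
                pass
        return time.time() - start

    if __name__ == '__main__':
        with Pool(processes=20) as multiprocessing_pool:
            # Variant 1: process_data runs in the multiprocessing pool via apply.
            print("pool:", run_trial(partial(load_historical_data_and_process,
                                             multiprocessing_pool)))
        # Variant 2: switch load_historical_data_and_process to the
        # commented-out direct call of process_data, re-run run_trial
        # (the worker then simply ignores the pool argument), and compare.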