I have a very simple Python script that reads ticker symbols from a list (6K+ long) and gets some data to flag unusual volume during the trading day.
If I just loop through each of the lines in the ticker file it takes hours to run.
Based on some googling I found a crude example of multiprocessing and decided to try and implement it.
When I run the script it runs WAY faster, but it has also caused some really bizarre issues I can't seem to figure out. Sometimes I'll get a redis circuitbreaker error, or sometimes it'll just stop and hang near the end of the ticker file.
Any thoughts?
import yfinance as yf
import multiprocessing
import time
import logging

file = open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w')

def main():
    read_ticker_file()

def read_ticker_file():
    file1 = open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r')
    lines = file1.readlines()
    count = 0
    ticker_arr = []
    for line in lines:
        count += 1
        line = line.strip('\n')
        line = line.strip()
        ticker_arr.append(line)
    return ticker_arr

def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            write_to_file(unusual_volume)
    except Exception as e:
        print(e)

def write_to_file(data):
    file.writelines(data + "\n")

if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()
    pool = multiprocessing.Pool(processes=20)
    pool.map(get_historical_data, inputs)
    pool.close()
    pool.join()
    # end = time.time()
    # print(start - end)
As I mentioned in my comment above, I do not believe you are handling your output to unusual.txt correctly. The following should at least correct that issue by having your worker function just return the record (or None) back to the main process for writing. I am using method imap instead of map so that I can lazily process the return values as they are returned; they will also now arrive in the same order as the symbols appeared in the input file. In case the input file has a large number of symbols, we should not use the default chunksize argument, so I have provided a function to calculate a suitable value.
import yfinance as yf
import multiprocessing
import time

def read_ticker_file():
    with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as f:
        return [line.strip() for line in f]

def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            return unusual_volume
        else:
            return None
    except Exception as e:
        print(e)
        return None

def compute_chunksize(iterable_size, pool_size):
    # Aim for roughly 4 chunks per pool process, rounding up:
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()
    pool = multiprocessing.Pool(processes=20)
    chunksize = compute_chunksize(len(inputs), 20)
    results = pool.imap(get_historical_data, inputs, chunksize=chunksize)
    with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
        for result in results:
            if result:
                print(result, file=f)
    # end = time.time()
    # print(start - end)
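For a sense of the scale involved (assuming roughly 6,000 symbols, which is just a guess based on the "6K+" list size), the chunksize calculation works out like this:

# Illustrative only; 6,000 symbols is an assumption based on the "6K+" figure.
# Each worker then receives symbols in batches rather than one at a time,
# which cuts down on inter-process communication overhead.
print(compute_chunksize(6000, 20))   # divmod(6000, 4 * 20) == (75, 0)  -> 75
print(compute_chunksize(6001, 20))   # any remainder bumps the result   -> 76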
An Alternate Approach
Again, this is not necessarily going to solve your problem, other than the one you have with writing to unusual.txt, which the above code should also handle. But it is the way I would code the solution and work from there:
I am going "out on a limb" here not knowing how large a file tickers.txt is or that much about the yfinance
package. But it seems fairly obvious that the call to yf.download
and the file writing to unusual.txt, which I have already indicated in my comment above I don't believe is being handled correctly, are I/O bound "processes" that couldn't just as well be handled by a multithreading pool. It's not clear that what remains, i.e. the calculations and comparison of current_volume
with average_volume
are CPU-intensive enough to justify the use of the overhead of using multiprocessing for performing these calculations.
The following splits up what was a single function, get_historical_data
, which did all the downloading and computations, into two functions, load_historical_data_and_process
and process_data
. Both a large multithreading pool and multiprocessing pool are created. Worker function load_historical_data_and_process
is called for each symbol in tickers.txt using the mutithreading pool with function imap
, which is a "lazier" version of map
. That is, in case the file is large, it is not necessary to read into memory all the symbols and first build a list required by map
; a generator function can be used. Even if the file is small, there is no real disadvantage to using imap
. load_historical_data_and_process
will do all the downloading necessary. For doing calculations it will used the multithreading pool that was passed to it with blocking method apply
to invoke worker function process_data
. It would be interesting to also get an alternate timing achieved by directly calling function process_data
instead of using the multiprocessing pool. There will, of course, be very little concurrency achieved across the threads in the execution of process_data
in this case because of contention for the Global Interpreter Lock. But depending on how much actual CPU is involved in the execution of process_data
(I have no way of knowing), the CPU you will have saved by not having to pass arguments and results across process boundaries may be offsetting.
import yfinance as yf
from multiprocessing.pool import ThreadPool, Pool
from functools import partial
import time

def get_symbols():
    with open("C:\\Users\\miner\\Desktop\\tickers.txt", 'r') as file1:
        for line in file1:
            yield line.strip()

def load_historical_data_and_process(multiprocessing_pool, symbol):
    """ What I believe is I/O-intensive and so this runs in a multithreading pool: """
    try:
        historical = yf.download(symbol, period="max", interval="1d")
        yahoo_ticker = yf.Ticker(symbol)
        current_volume = yahoo_ticker.info['volume']
        # To call directly:
        #return process_data(symbol, historical, current_volume)
        return multiprocessing_pool.apply(process_data, args=(symbol, historical, current_volume))
    except Exception as e:
        print(e)
        return None

def process_data(symbol, historical, current_volume):
    """ What I believe may warrant running in a multiprocessing pool: """
    average_volume_arr = historical['Volume']
    sum_volume = 0
    for volume in average_volume_arr:
        sum_volume += volume
    average_volume = sum_volume / len(average_volume_arr)
    if current_volume > average_volume:
        volume_over_average = (current_volume - average_volume) / average_volume
        volume_over_average = "{:.2%}".format(volume_over_average)
        unusual_volume_record = (symbol + " - " + str(volume_over_average))
        print(unusual_volume_record, flush=True)
        return unusual_volume_record
    else:
        return None

if __name__ == '__main__':
    # start = time.time()
    # 100 threads, or some suitable thread pool size:
    with Pool(processes=20) as multiprocessing_pool, ThreadPool(processes=100) as thread_pool:
        # Pass the multiprocessing pool to thread pool worker
        # load_historical_data_and_process for the CPU-intensive processing:
        worker = partial(load_historical_data_and_process, multiprocessing_pool)
        results = thread_pool.imap(worker, get_symbols())
        with open("C:\\Users\\miner\\Desktop\\unusual.txt", 'w') as f:
            for result in results:
                if result:
                    print(result, file=f)
    # end = time.time()
    # print(start - end)
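If you do want that alternate timing, one option (purely a sketch, assuming the definitions from the listing above are in scope; load_and_process is my own hypothetical variant, not part of the code above) is to make the worker's pool argument optional, so the same driver can be timed with and without the multiprocessing pool. The subtraction order in the timing lines is end minus start so the elapsed time prints as a positive number.

# Sketch only: load_and_process is a hypothetical variant of
# load_historical_data_and_process whose pool argument may be None, in which
# case process_data runs directly on the calling thread.
def load_and_process(multiprocessing_pool, symbol):
    try:
        historical = yf.download(symbol, period="max", interval="1d")
        current_volume = yf.Ticker(symbol).info['volume']
        if multiprocessing_pool is None:
            return process_data(symbol, historical, current_volume)
        return multiprocessing_pool.apply(process_data,
                                          args=(symbol, historical, current_volume))
    except Exception as e:
        print(e)
        return None

# Timing one variant; to time the other, create a Pool under the
# if __name__ == '__main__' guard (as above) and pass it instead of None.
start = time.time()
with ThreadPool(processes=100) as thread_pool:
    worker = partial(load_and_process, None)
    results = [r for r in thread_pool.imap(worker, get_symbols()) if r]
end = time.time()
print(end - start)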