*Resolved by using pool.map() instead of map_async() with multiprocessing.
Python 2.7 - How do I get gevent or multiprocessing to process multiple text files at the same time with the following code?
I've pasted both gevent and multiprocessing pool versions
From the log output it shows the files are being processed synchronously and using 'lsof' on Linux confirms only one file is being read at once time.
The files are stored on a enterprise class disk shelf containing an array of ultra320 drives.
I can open 4 files at once with the following function that just sleeps, just not when I try the process the open file line by line. Is the 'for line in file' loop preventing the next file from being opened somehow?
from time import sleep
from multiprocessing import Pool
def hold_open(log):
with open(log) as fh:
sleep(60)
pool = Pool(processes=4)
pool.map(hold_open, ['file1', 'file2', 'file3', 'file4'])
pool.join()
What am I doing wrong and what do I change to fix it?
2014-10-07 13:51:51,088 - __main__ - INFO - Found 23 files, duration: 0:00:00.000839
2014-10-07 13:51:51,088 - __main__ - INFO - Now analysing using 8 threads.....
2014-10-07 13:51:51,089 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Analysing...
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Finished analysing 41943107 bytes duration: 0:00:00.381875
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Analysing...
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Finished analysing 4017126 bytes duration: 0:00:01.725641
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Analysing...
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Finished analysing 4970479 bytes duration: 0:00:01.753434
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.46.05.txt.gz - Analysing...
from gevent import monkey; monkey.patch_all()
import os
import re
import gzip
import gevent
import logging
from gevent import pool
from datetime import datetime
log_level = logging.INFO
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
ch = logging.StreamHandler()
ch.setLevel(log_level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def get_time_range(log):
if not os.path.isfile(log):
logging.error("\x1b[31m%s - Something went wrong analysing\x1b[0m" % log)
return
date_regex = re.compile('^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}:\d{3})')
def process(lh):
start, end = str(), str()
logger.info("\x1b[33m%s - Analysing...\x1b[0m" % os.path.basename(log))
for line in lh:
date = date_regex.match(line)
if date:
if not start:
start = date.group(1)
end = date.group(1)
return start, end
start_time = datetime.now()
size = os.path.getsize(log)
if os.path.splitext(log)[1] == '.txt':
with open(log, 'r') as lh:
start, end = process(lh)
elif os.path.splitext(log)[1] == '.gz':
with gzip.open(log, 'r') as lh:
start, end = process(lh)
else:
return
meta = (log, size, start, end)
duration = datetime.now() - start_time
logger.info("\x1b[32m%s - Finished analysing %s bytes duration: %s\x1b[0m" % (os.path.basename(log), size, duration))
def run(directory, pool_size=8, cur=None):
start = datetime.now()
worker_pool = gevent.pool.Pool(int(pool_size))
files = list()
while True:
for log in os.listdir(directory):
if 'XSLog' and 'txt' in log:
files.append(os.path.join(directory, log))
logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
for log in files:
worker_pool.spawn(get_time_range, log)
worker_pool.join()
duration = datetime.now() - start
logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)
if __name__ == '__main__':
run('/path/to/log/files')
With multiprocessing:
def run(directory, pool_size=8, cur=None):
start = datetime.now()
worker_pool = gevent.pool.Pool(int(pool_size))
files = list()
pool = Pool(processes=pool_size, maxtasksperchild=2)
while True:
for log in os.listdir(directory):
if 'XSLog' and 'txt' in log:
files.append(os.path.join(directory, log))
logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
# pool.map_async(get_time_range, files)
pool.map(get_time_range, files) # This fixed it.
pool.join()
duration = datetime.now() - start
logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)
The amount of benefit you get from parallelism is limited here, because a significant chunk of your time is spent reading from disk. Disk I/O is sequential; it doesn't matter how many processes/greenlets you have, only one of them is going to be able to read from disk at a time. Now, aside from the time spent reading from disk, the rest of the time is spent doing a regular expression match on the lines being read. gevent
will not help you at all for this. It's a CPU-bound operation, and gevent
can't be used to parallelize CPU-bound operations. gevent
is useful for making blocking I/O operations non-blocking, which enables parallel I/O, but there is no blocking I/O going on here.
multiprocessing
can make the regex operations run in parallel, so I would expect it to perform a little bit better than the gevent
version. But in either case, you're probably not going to be much (if any) faster than a sequential version, because so much of your time is spent reading the file from disk.