I am having problems with code in the following format and assume the error has to do with how I am trying to access the elements in each tuple.
```python
from numberer import numberify
from sys import argv
infile = argv[1]
from multiprocessing import Pool
pool = Pool(15)
import os

def chunker(fob):
    chunkbegin = 0
    filesize = os.stat(fob.name).st_size
    while chunkbegin < filesize:
        chunkend = chunkbegin + 100000
        fob.seek(chunkend)
        fob.readline()
        chunkend = fob.tell()
        yield (chunkbegin, chunkend)
        chunkbegin = chunkend

def run(tup, fob):
    fob.seek(tup[0])
    length = int(tup[1]) - int(tup[0])
    lines = fob.readlines(length)
    for line in lines:
        print(line)

fob = open(infile)
chunks = [x for x in chunker(fob)]
pool.map(run, (chunks, fob))
```
The exact error is:

```
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'run' on <module '__main__' from 'pretonumber.py'>
```
1) When the `map` function maps the tuples to the function, I assume these elements should be accessed like ordinary tuples, i.e. with a single index?

2) The `chunks` argument that I am passing to the function `run` is a list of tuples, `chunks = [(0, 100000), (100000, 200000), ...]`, as created by the generator `chunker`.
Thank you.
The `map` method takes an iterable of arguments. Each element of the iterable is passed to one instance of `run`. Since your iterable is the tuple `(chunks, fob)`, this is going to run two tasks, calling `run(chunks)` in one task, and `run(fob)` in another.
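A minimal sketch of that behavior (the `show` function and the `"name"` placeholder are illustrative, not part of your code):

```python
from multiprocessing import Pool

def show(arg):
    # Each task receives exactly one element of the iterable.
    return type(arg).__name__

if __name__ == '__main__':
    chunks = [(0, 100000), (100000, 200000)]
    with Pool(2) as pool:
        # The iterable (chunks, "name") has two elements, so map runs
        # two tasks: show(chunks) and show("name").
        print(pool.map(show, (chunks, "name")))  # ['list', 'str']
```

Note that the whole list of chunks arrives as one argument to one task, which is not what you want.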
What I think you want to do is to run one task for each `chunk` in `chunks`, calling `run(chunk, fob)`.
So, first, you need an iterable that yields `(chunk, fob)` once per chunk, e.g., `((chunk, fob) for chunk in chunks)`.
But this still isn't going to work, because it's going to call `run` with a single argument, the 2-tuple `(chunk, fob)`, not with two arguments. You can fix this by rewriting or wrapping `run` to take a single 2-tuple instead of two separate arguments, or you can just use `starmap` instead of `map`, which does that wrapping for you.
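A small sketch of the difference, using a throwaway `add` function:

```python
from multiprocessing import Pool

def add(a, b):
    return a + b

if __name__ == '__main__':
    pairs = [(1, 2), (3, 4)]
    with Pool(2) as pool:
        # map would call add((1, 2)) -- one tuple argument -- and raise
        # a TypeError; starmap unpacks each tuple into add(1, 2).
        print(pool.starmap(add, pairs))  # [3, 7]
```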
But this still isn't going to work. You're trying to pass an open file object between processes, and `multiprocessing` can't do that.

Since you're using the `fork` start method, you can sometimes get away with inheriting the file object from the parent rather than passing it, but the details are complicated, and you really need to read the Programming guidelines for `multiprocessing` and understand how file descriptor inheritance works on Unix.
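You can see the underlying problem directly: `multiprocessing` ships task arguments to workers by pickling them, and an open file object refuses to be pickled. A quick demonstration (using a temporary file rather than your input file):

```python
import pickle
import tempfile

with tempfile.TemporaryFile('w+') as fob:
    try:
        # This is essentially what multiprocessing does to each argument.
        pickle.dumps(fob)
    except TypeError as e:
        print('cannot pickle:', e)
```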
Since you want each child to have its own independent copy of the file object so they can all `seek` around in it, the easiest solution is to just pass the filename and have them `open` it themselves:
```python
def run(tup, path):
    with open(path) as fob:
        fob.seek(tup[0])
        length = int(tup[1]) - int(tup[0])
        lines = fob.readlines(length)
        for line in lines:
            print(line)

fob = open(infile)
chunks = [x for x in chunker(fob)]
args = ((chunk, infile) for chunk in chunks)
pool.starmap(run, args)
```
Meanwhile, now that we're sure we're not relying on `fork` behavior, it's probably a good idea to write the code to work with any start method. This means putting the top-level code into a `__main__` block. And, while we're at it, let's make sure we close the file once we're done with it:
```python
# imports
# function definitions

if __name__ == '__main__':
    infile = argv[1]
    pool = Pool(15)
    with open(infile) as fob:
        chunks = [x for x in chunker(fob)]
    args = ((chunk, infile) for chunk in chunks)
    pool.starmap(run, args)
```
You may still have other errors in your code, but I think this exhausts the `multiprocessing` ones.