
Mapping a list of tuples to the multiprocessing Pool object in Python


I am having problems with code in the following format, and I assume the error has to do with how I am trying to access the elements in each tuple.

from numberer import numberify
from sys import argv
infile=argv[1]
from multiprocessing import Pool
pool=Pool(15)
import os

def chunker(fob):
    chunkbegin=0
    filesize=os.stat(fob.name).st_size
    while chunkbegin < filesize:
        chunkend=chunkbegin+100000
        fob.seek(chunkend)
        fob.readline()
        chunkend=fob.tell()
        yield (chunkbegin,chunkend)
        chunkbegin=chunkend

def run(tup, fob):
    fob.seek(tup[0])
    length=int(tup[1])-int(tup[0])
    lines=fob.readlines(length)
    for line in lines:
        print(line)

fob=open(infile)
chunks=[x for x in chunker(fob)]
pool.map(run, (chunks, fob))

The exact error is:

Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'run' on <module '__main__' from 'pretonumber.py'>

1) So when the map function maps the tuples to the function, should these elements be accessed as if they were ordinary tuples, i.e. with a single index?

2) The chunks argument that I am passing to the function run is a list of tuples, chunks=[(0,100000),(100000,200000),...], as created by the generator chunker.

Thank you.


Solution

  • The map method takes an iterable of arguments. Each element of the iterable is passed to one instance of run. Since your iterable is the tuple (chunks, fob), this is going to run two tasks, calling run(chunks) in one task and run(fob) in the other.
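
    For example, the one-call-per-element behaviour of map is easy to see with a toy function (a minimal sketch; describe is a made-up name for illustration):

    from multiprocessing import Pool

    def describe(arg):
        # map calls this once per element of the iterable it is given
        return type(arg).__name__

    if __name__ == '__main__':
        with Pool(2) as pool:
            # a 2-element iterable means exactly two tasks:
            # describe([1, 2, 3]) and describe('hello')
            print(pool.map(describe, ([1, 2, 3], 'hello')))  # ['list', 'str']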


    What I think you want to do is to run one task for each chunk in chunks, calling run(chunk, fob).

    So, first, you need an iterable that yields (chunk, fob) once per chunk, e.g., ((chunk, fob) for chunk in chunks).


    But this still isn't going to work, because it's going to call run with a single argument, the 2-tuple (chunk, fob), not with two arguments. You can fix this by rewriting or wrapping run to take a single 2-tuple instead of two separate arguments, or you can just use starmap instead of map, which does that wrapping for you.
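
    To see the difference concretely, here is a minimal sketch (add and add_tuple are made-up names for illustration):

    from multiprocessing import Pool

    def add(a, b):
        return a + b

    def add_tuple(tup):
        # the manual wrapping that starmap would otherwise do for you
        return add(*tup)

    if __name__ == '__main__':
        pairs = [(1, 2), (3, 4)]
        with Pool(2) as pool:
            print(pool.map(add_tuple, pairs))  # calls add_tuple((1, 2)), add_tuple((3, 4)) -> [3, 7]
            print(pool.starmap(add, pairs))    # calls add(1, 2), add(3, 4) -> [3, 7]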


    But this still isn't going to work. You're trying to pass an open file object between processes, and multiprocessing can't do that: file objects can't be pickled, so they can't be sent to a worker process.
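
    You can see the underlying problem without multiprocessing at all, since task arguments travel to the workers through a pickle-based queue (a quick sketch; some_file.txt stands in for any readable file):

    import pickle

    with open('some_file.txt') as f:
        # raises TypeError: cannot pickle '_io.TextIOWrapper' object
        # (the exact message varies between Python versions)
        pickle.dumps(f)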

    Since you're using the fork method, you can sometimes get away with inheriting the file object from the parent rather than passing it, but the details are complicated, and you really need to read the Programming guidelines for multiprocessing and understand how file descriptor inheritance works on Unix.

    Since you want each child to have its own independent copy of the file object so they can all seek around in it, the easiest solution is to just pass the filename and have them open it themselves:

    def run(tup, path):
        # each worker opens its own copy of the file, so nothing
        # unpicklable has to cross the process boundary
        with open(path) as fob:
            fob.seek(tup[0])                    # jump to the start of this chunk
            length = int(tup[1]) - int(tup[0])  # chunk size in bytes
            lines = fob.readlines(length)       # read whole lines, roughly length bytes in total
            for line in lines:
                print(line)

    fob = open(infile)
    chunks = list(chunker(fob))                   # list of (start, end) byte offsets
    args = ((chunk, infile) for chunk in chunks)  # pass the path, not the file object
    pool.starmap(run, args)
    

    Meanwhile, now that we're sure we're not relying on fork behavior, it's probably a good idea to write the code to work with any start method. This means putting the top-level code into a __main__ block and creating the pool only after the function definitions. That last point also explains the AttributeError in your traceback: the workers are forked at Pool(15), which your script runs before run is even defined, so when a task arrives they can't find run in their copy of __main__. And, while we're at it, let's make sure we close the file once we're done with it:

    # imports
    # function definitions
    if __name__ == '__main__':
        infile = argv[1]
        pool = Pool(15)              # created after run is defined, and only in the main process
        with open(infile) as fob:    # the file is closed as soon as the chunks are computed
            chunks = list(chunker(fob))
        args = ((chunk, infile) for chunk in chunks)
        pool.starmap(run, args)
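
    If you also want the pool itself cleaned up deterministically, Pool can be used as a context manager as well; a minor, optional variation on the code above:

    if __name__ == '__main__':
        infile = argv[1]
        with open(infile) as fob:
            chunks = list(chunker(fob))
        args = ((chunk, infile) for chunk in chunks)
        with Pool(15) as pool:  # terminates the worker processes on exit
            pool.starmap(run, args)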
    

    You may still have other errors in your code, but I think this exhausts the multiprocessing ones.