
how do I generate multiprocessing jobs inside a list without making duplicates?


How do I make a multiprocessing setup work that creates new jobs inside a list? I keep getting:

assert self._popen is None, 'cannot start a process twice'
AttributeError: 'Worker' object has no attribute '_popen'

which makes sense, because I'm basically making multiple instances of the same job... so how do I fix that? Do I need to set up a multiprocessing pool?

let me know if I need to clarify things more.

here is my multiprocessing class:

class Worker(multiprocessing.Process):

    def __init__(self, output_path, source, file_name):
        self.output_path = output_path
        self.source = source
        self.file_name = file_name

    def run(self):

        t = HTML(self.source)
        output = open(self.output_path+self.file_name+'.html','w')
        word_out = open(self.output_path+self.file_name+'.txt','w')  
        try:
            output.write(t.tokenized)

            for w in word_list:
                if w:
                    word_out.write(w+'\n')

            word_out.close()
            output.close()
            word_list = []

        except IndexError: 
            output.write(s[1])
            output.close()
            word_out.close()

        except UnboundLocalError:
            output.write(s[1])
            output.close()
            word_out.close()   

here is the class that implements this whole thing.

class implement(HTML):

    def __init__(self, input_path, output_path):
        self.input_path = input_path
        self.output_path = output_path

    def ensure_dir(self, directory):
        if not os.path.exists(directory):
            os.makedirs(directory)
        return directory    

    def prosses_epubs(self):
        for root, dirs, files in os.walk(self.input_path+"\\"):
            epubs = [root+file for file in files if file.endswith('.epub')]
            output_file = [self.ensure_dir(self.output_path+"\\"+os.path.splitext(os.path.basename(e))[0]+'_output\\') for e in epubs]

        count = 0 
        for e in epubs:
            epub = epubLoader(e)

            jobs = []

            # this is what's breaking everything right here. I'm not sure how to fix it. 
            for output_epub in epub.get_html_from_epub():                
                worker = Worker(output_file[count], output_epub[1], output_epub[0])
                jobs.append(worker)
                worker.start()

            for j in jobs:
                j.join()

            count += 1
        print "done!"


if __name__ == '__main__':
    test = implement('some local directory', 'some local directory')    
    test.prosses_epubs()

Any help on this would be greatly appreciated. Also, let me know if something I'm doing in my code can be done better... I'm always trying to learn how to do things the best way.


Solution
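
    First, the immediate cause of your traceback: Worker.__init__ overrides multiprocessing.Process.__init__ without calling it, so the attributes that start() depends on (including _popen) are never created. A minimal sketch of that one-line fix, keeping your class-based design:

    class Worker(multiprocessing.Process):

        def __init__(self, output_path, source, file_name):
            # Let Process set up its own state (including _popen) before
            # storing our arguments; skipping this call is exactly what
            # produces "'Worker' object has no attribute '_popen'".
            multiprocessing.Process.__init__(self)
            self.output_path = output_path
            self.source = source
            self.file_name = file_name

    That said, there is a simpler structure that avoids subclassing Process entirely: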

    • Don't use classes when functions suffice. In this case, your classes each have essentially one meaty method, and the __init__ method is simply holding arguments used in the meaty method. You can make your code sleeker by just making the meaty method a function and passing the arguments directly to it.
    • Separate the idea of "jobs" (i.e. tasks) from the idea of "workers" (i.e. processes). Your machine has a limited number of processors, but the number of jobs could be far greater. You don't want to open a new process for each job since that might swamp your CPUs -- essentially fork-bombing yourself.
    • Use the with statement to guarantee that your file handles get closed. I see output.close() and word_out.close() getting called in three different places each. You can eliminate all those lines by using the with statement, which will automatically close those file handles once Python leaves the with-suite. (There is a short sketch of this pattern just after this list.)
    • I think a multiprocessing Pool would work well with your code. Jobs can be sent to the workers in the pool using pool.apply_async. Each call queues a job, which waits until a worker in the pool is available to handle it. pool.close() signals that no more jobs will be submitted, and pool.join() causes the main process to wait until all the jobs are done. (A self-contained toy example follows the with-statement sketch below.)
    • Use os.path.join instead of joining directories with '\\'. This will make your code compatible with non-Windows machines.
    • Use enumerate instead of manually implementing/incrementing counters. It's less typing and will make your code a bit more readable.
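
    As a quick illustration of the with-statement point (the file names here are made up):

    # Both files are closed automatically when the block exits,
    # even if an exception is raised inside it.
    with open('page.html', 'w') as output, open('words.txt', 'w') as word_out:
        output.write('<html></html>\n')
        word_out.write('example\n')

    And a self-contained toy sketch of the job/worker separation with a Pool (the square function and the job count are invented for illustration):

    import multiprocessing as mp

    def square(x):
        return x * x

    if __name__ == '__main__':
        pool = mp.Pool(processes=4)   # 4 workers, no matter how many jobs we queue
        results = [pool.apply_async(square, args=(n,)) for n in range(100)]
        pool.close()                  # no more jobs will be submitted
        pool.join()                   # block until every queued job has finished
        print sum(r.get() for r in results)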

    The following code will not run since epubLoader, HTML, word_list, and s are not defined, but it may give you a clearer idea of what I am suggesting above:

    import os
    import multiprocessing as mp
    
    def worker(output_path, source, filename):
        t = HTML(source)
        output_path = os.path.join(output_path, filename)
        with open(output_path+'.html', 'w') as output, \
             open(output_path+'.txt', 'w') as word_out:
            try:
                output.write(t.tokenized)
    
                for w in word_list:
                    if w:
                        word_out.write(w+'\n')
    
                word_list = []
    
            except IndexError: 
                output.write(s[1])
    
            except UnboundLocalError:
                output.write(s[1])
    
    
    def ensure_dir(directory):
        if not os.path.exists(directory):
            os.makedirs(directory)
        return directory    
    
    
    def process_epubs(input_path, output_path):
        pool = mp.Pool()
    
        for root, dirs, files in os.walk(input_path):
            epubs = [os.path.join(root, file) for file in files
                     if file.endswith('.epub')]
        output_file = [ensure_dir(os.path.join(
                           output_path,
                           os.path.splitext(os.path.basename(e))[0] + '_output'))
                       for e in epubs]
    
        for count, e in enumerate(epubs):
            epub = epubLoader(e)
            for filename, source in epub.get_html_from_epub():
                pool.apply_async(
                    worker,
                    args=(output_file[count], source, filename))
        pool.close()
        pool.join()
    
        print "done!"
    
    
    if __name__ == '__main__':
        process_epubs('some local directory', 'some local directory')
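
    One last note: keep the if __name__ == '__main__' guard. On Windows, multiprocessing starts each child process by re-importing the main module, so without the guard every child would try to spawn children of its own.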