Search code examples
pythonmultiprocessing

Python multiprocessing IOError: [Errno 232] The pipe is being closed


I am trying to implement this tutorial on mutliprocessing in python, but when I tried to do my own task I get the following error:

Traceback (most recent call last):
>>>   File "C:\Python27\lib\multiprocessing\queues.py", line 262, in _feed
    send(obj)
IOError: [Errno 232] The pipe is being closed

Here is a reproducible example of what I am trying to do which gives the same error message:

from multiprocessing import Lock, Process, Queue, current_process
import time

class Testclass(object):
    def __init__(self, x):
        self.x = x

def toyfunction(testclass):
    testclass.product = testclass.x * testclass.x
    return testclass


def worker(work_queue, done_queue):
    try:
        for testclass in iter(work_queue.get, 'STOP'):
            print(testclass.counter)
            newtestclass = toyfunction(testclass)
            done_queue.put(newtestclass)

    except:
        print('error')

    return True

def main():

    counter = 1

    database = []
    while counter <= 1000:
        database.append(Testclass(3))
        counter += 1
        print(counter)



    workers = 8
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    start = time.clock()
    counter = 1

    for testclass in database:
        testclass.counter = counter
        work_queue.put(testclass)
        counter += 1
        print(counter)


    print('items loaded')
    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    for p in processes:
        p.join()

    done_queue.put('STOP')

    print(time.clock()-start)
    print("Done")

if __name__ == '__main__':
    main()    

Solution

  • When I add code that processes the done queue I no longer get the error. Here is working code:

    from multiprocessing import Lock, Process, Queue, current_process
    import time
    
    class Testclass(object):
        def __init__(self, x):
            self.x = x
    
    def toyfunction(testclass):
        testclass.product = testclass.x * testclass.x
        return testclass
    
    
    def worker(work_queue, done_queue):
        try:
            for testclass in iter(work_queue.get, 'STOP'):
                print(testclass.counter)
                newtestclass = toyfunction(testclass)
                done_queue.put(newtestclass)
    
        except:
            print('error')
    
        return True
    
    def main():
    
        counter = 1
    
        database = []
        while counter <= 100:
            database.append(Testclass(10))
            counter += 1
            print(counter)
    
    
    
        workers = 8
        work_queue = Queue()
        done_queue = Queue()
        processes = []
    
        start = time.clock()
        counter = 1
    
        for testclass in database:
            testclass.counter = counter
            work_queue.put(testclass)
            counter += 1
            print(counter)
    
    
        print('items loaded')
        for w in range(workers):
            p = Process(target=worker, args=(work_queue, done_queue))
            p.start()
            processes.append(p)
            work_queue.put('STOP')
    
        for p in processes:
            p.join()
    
        done_queue.put('STOP')
    
        # added: process the done queue
        newdatabase = []
        for testclass in iter(done_queue.get, 'STOP'):
            newdatabase.append(testclass)
    
        print(time.clock()-start)
        print("Done")
        return(newdatabase)
    
    if __name__ == '__main__':
        database = main()