Tags: python, queue, multiprocessing, python-multiprocessing

Issues with Multiprocessing and Queues


Situation: I have a file processor written in Python. The files are "walked" and put into a queue, and then processed using multiprocessing.

Problem: Refer to the code below

fileA.py
==========
import Queue
import os
def walker():
    filelist = Queue.Queue()
    queue_end = object()
    for root, dirs, files in os.walk('/'):
        for f in files:
            path = os.path.join(root,f)
            if not os.path.islink(path):
                filelist.put(path)
    filelist.put(queue_end)

fileB.py
===========
import fileA
import shutil
import multiprocessing as mp

def processor(queuelock):
    while True:
        with queuelock:
            filepath = fileA.filelist.get()

            if filepath is fileA.queue_end:
                # put the end marker back so the other processes also stop
                fileA.filelist.put(fileA.queue_end)
                break
        # example of a job
        shutil.move(filepath, "/home/newuser" + filepath)
        print filepath + " has been moved!"

if __name__ == '__main__':
    fileA.walker()
    queuelock = mp.Lock()
    jobs = []
    for i in range(0, mp.cpu_count()):
        # pass the function itself; processor(queuelock) would run it in the parent
        process = mp.Process(target=processor, args=(queuelock,))
        jobs.append(process)
        process.start()

The problem is that when the files are being moved, all processes attempt to move the exact same file, even though it has supposedly already been removed from the queue.

Example output:

randomFile has been moved!
Error: ......... randomFile not found
Error: ......... randomFile not found
Error: ......... randomFile not found

This implies that every spawned process dequeued the exact same file and tried to perform the same operation on it.
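The effect can be reproduced in isolation with a minimal Python 3 sketch (the question's code is Python 2). It assumes a Unix system, because it explicitly requests the `fork` start method; the file names and the `worker`/`main` helpers are hypothetical. Each forked child inherits its own copy of an ordinary `queue.Queue`, so every child dequeues the same first item while the parent's queue is left untouched. A `multiprocessing.Queue` is used only to send the results back to the parent.

```python
import multiprocessing as mp
import queue


def worker(local_q, results):
    # local_q is this child's private copy of the parent's queue.Queue,
    # so taking an item here does not remove it anywhere else.
    results.put(local_q.get())


def main():
    ctx = mp.get_context("fork")  # Unix-only: children inherit a copy of memory
    local_q = queue.Queue()  # thread-safe, but NOT shared between processes
    for name in ["a.txt", "b.txt", "c.txt"]:
        local_q.put(name)

    results = ctx.Queue()  # a real inter-process queue, used only for reporting
    procs = [ctx.Process(target=worker, args=(local_q, results)) for _ in range(3)]
    for p in procs:
        p.start()
    seen = [results.get() for _ in procs]  # one item from each child
    for p in procs:
        p.join()
    return seen, local_q.qsize()


if __name__ == "__main__":
    seen, left = main()
    print(seen)  # ['a.txt', 'a.txt', 'a.txt']
    print(left)  # 3
```

Every child reports `a.txt`, and the parent still holds all three items, which matches the "has been moved" / "not found" pattern above.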

Question: Is there something I am doing wrong that causes the filelist queue to be copied to every process (what is happening now), instead of a single filelist queue being shared by all processes (my intended result)?


Solution

    1. filelist (like queue_end) is only a local variable inside walker(), so the queue object isn't visible to any other part of the code, including fileB.py. At a minimum, walker() needs to return filelist so the caller can hand it to the worker processes.

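    2. To share the same queue among multiple processes, a multiprocessing.Queue is needed. A Queue.Queue (queue.Queue in Python 3) only synchronizes threads within a single process: each child process ends up with its own copy of it (a forked child, for example, inherits a snapshot of the parent's memory), so it becomes a new, independent queue in every process, and an item taken out in one process is still present in all the others.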
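Putting both points together, a corrected version might look like the following minimal Python 3 sketch. Instead of returning the queue from walker(), it creates a `multiprocessing.Queue` in a hypothetical `run()` helper and passes it explicitly to walker() and to the workers through `args`. The `done` results queue, the `None` sentinel and `run()` itself are assumptions for illustration, and the file move is replaced by a placeholder that just reports each path. The sentinel is `None` because an `object()` end marker is pickled on its way through a `multiprocessing.Queue`, so the copy a worker receives is never `is`-identical to the original. The extra lock is dropped because `multiprocessing.Queue.get()` is already process-safe.

```python
import multiprocessing as mp
import os


def walker(root, filelist, n_workers):
    """Queue every non-symlink file under root, then one sentinel per worker.

    Returns the number of paths queued.
    """
    count = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if not os.path.islink(path):
                filelist.put(path)
                count += 1
    # None is the end marker: a bare object() would be pickled in transit,
    # so an `is` check against the original would never match.
    for _ in range(n_workers):
        filelist.put(None)
    return count


def processor(filelist, done):
    """Take paths from the shared queue until the sentinel arrives."""
    while True:
        filepath = filelist.get()  # process-safe; no extra Lock is needed
        if filepath is None:
            break
        # Placeholder job: report the path instead of moving the file.
        done.put(filepath)


def run(root, n_workers=None, ctx=None):
    """Hypothetical driver: walk root and process its files in parallel."""
    ctx = ctx or mp.get_context()
    n_workers = n_workers or mp.cpu_count()
    filelist = ctx.Queue()  # ONE queue shared by all worker processes
    done = ctx.Queue()
    workers = [ctx.Process(target=processor, args=(filelist, done))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    count = walker(root, filelist, n_workers)
    # Drain results before join() so no child blocks while flushing its queue.
    processed = [done.get() for _ in range(count)]
    for w in workers:
        w.join()
    return processed
```

Call `run("/some/dir")` from inside an `if __name__ == '__main__':` guard; with the spawn or forkserver start methods, `processor` must also be importable from a module. Each path is now handled by exactly one worker.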