Situation: I have a file processor written in Python. The files are "walked" and put into a queue, which is then consumed using multiprocessing.
Problem: Refer to the code below
fileA.py
==========
import Queue
import os
def walker():
    filelist = Queue.Queue()
    queue_end = object()
    for root, dirs, files in os.walk('/'):
        for f in files:
            path = os.path.join(root, f)
            if not os.path.islink(path):
                filelist.put(path)
    filelist.put(queue_end)
fileB.py
===========
import fileA
import os
import shutil
import multiprocessing as mp

def processor(queuelock):
    while True:
        with queuelock:
            filepath = fileA.filelist.get()
            if filepath is fileA.queue_end:
                fileA.filelist.put(fileA.queue_end)
                break
        # example of a job
        shutil.move(filepath, "/home/newuser" + filepath)
        print filepath + " has been moved!"

if __name__ == '__main__':
    fileA.walker()
    queuelock = mp.Lock()
    jobs = []
    for i in range(0, mp.cpu_count()):
        process = mp.Process(target=processor, args=(queuelock,))
        jobs.append(process)
        process.start()
The problem is that when the files are being moved, every process attempts to move the EXACT same file, even though it has supposedly already been removed from the queue.
Example output:
randomFile has been moved!
Error: ......... randomFile not found
Error: ......... randomFile not found
Error: ......... randomFile not found
This implies that every process spawned has dequeued the exact same file and tried to perform the same job on it.
Question: Is there something I am doing wrong that causes the `filelist` queue to be copied into every process (what is happening now), instead of the `filelist` queue being shared by all processes (my intended result)?
`filelist` is currently only a local variable of `walker()`, and the queue object isn't shared with other parts of the code, so at the very least a `return filelist` is needed in `walker()`.
To share the same queue among multiple processes, a `multiprocessing.Queue` is needed. A `queue.Queue` is copied when a process is forked (or spawned), so it becomes a new, independent queue in each process.
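A minimal Python 3 sketch of the corrected structure, under a few stated assumptions: the actual job (moving files) is stubbed out by reporting each path back on a second `results` queue, and `None` is used as the end-of-queue sentinel because it is a singleton that survives pickling, whereas a private `object()` instance would be unpickled as a *different* object in each worker, making `is` comparisons fail. Note that `multiprocessing.Queue.get()` is already process-safe, so the explicit `Lock` from the original code is no longer needed:

```python
import multiprocessing as mp
import os

# Sentinel marking the end of the queue (None survives pickling unchanged).
QUEUE_END = None

def walker(filelist, top):
    """Enqueue every non-symlink file found under `top`."""
    for root, dirs, files in os.walk(top):
        for f in files:
            path = os.path.join(root, f)
            if not os.path.islink(path):
                filelist.put(path)
    filelist.put(QUEUE_END)

def processor(filelist, results):
    """Drain `filelist`; the real job (e.g. shutil.move) is stubbed out
    by reporting each path back on `results`."""
    while True:
        filepath = filelist.get()      # process-safe: no extra Lock needed
        if filepath is QUEUE_END:
            filelist.put(QUEUE_END)    # re-queue so sibling workers also stop
            results.put(QUEUE_END)     # tell the parent this worker is done
            break
        results.put(filepath)          # stand-in for the actual job

def run(top):
    filelist = mp.Queue()              # shared across processes,
    results = mp.Queue()               # unlike queue.Queue
    walker(filelist, top)
    jobs = [mp.Process(target=processor, args=(filelist, results))
            for _ in range(mp.cpu_count())]
    for p in jobs:
        p.start()
    # Drain `results` BEFORE joining, so no worker blocks while flushing
    # its queue buffer; stop once every worker has sent its sentinel.
    moved, done = set(), 0
    while done < len(jobs):
        item = results.get()
        if item is QUEUE_END:
            done += 1
        else:
            moved.add(item)
    for p in jobs:
        p.join()
    return moved

if __name__ == '__main__':
    print(len(run('.')), "files seen")
```

Passing the queue explicitly as an argument to `mp.Process` (rather than relying on a module-level global) also keeps the sketch working under the "spawn" start method, where child processes do not inherit the parent's memory.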