So I am trying to learn multiprocessing module and have written a code(below) where 4 processes are generated and are assigned 8 jobs (in the processor function) and each job contains just a sleep function (in the example job function).Now I have written the similar code in multithreading module there is was working fine but here it is not outputting any thing.Please help
from multiprocessing import Process, Lock
import multiprocessing
import time
print_lock = Lock()
def exampleJob(worker): # function simulating some computation
time.sleep(.5)
print_lock.acquire()
print(multiprocessing.current_process.pid,worker)
print_lock.release()
def processor(): #function where process pick up the job
while True:
worker = q.get()
exampleJob(worker)
q.task_done()
q = multiprocessing.JoinableQueue()
process = []
for x in range(4):
p = multiprocessing.Process(target=processor)
process.append(p)
for i in range(0,len(process)):
process[i].start
start = time.time()
for worker in range(8):
q.put(worker)
q.join()
print('Entire job took:',time.time() - start)
The first problem is start
needs to be start()
.
Also, separate processes have separate global variables, so print_lock = Lock()
is a different lock in each process. You have to create the lock once and pass it to the individual processes. This goes for the queue as well.
A JoinableQueue
isn't really needed. What's needed is a sentinel flag to tell the processes to exit, and join the processes.
Working example with other fixes:
import multiprocessing as mp
import time
def exampleJob(print_lock,worker): # function simulating some computation
time.sleep(.5)
with print_lock:
print(mp.current_process().name,worker)
def processor(print_lock,q): # function where process pick up the job
while True:
worker = q.get()
if worker is None: # flag to exit the process
break
exampleJob(print_lock,worker)
# This "if" required for portability in some OSes.
# Windows for example creates new Python processes and imports the original script.
# Without this the below code would run again in each child process.
if __name__ == '__main__':
print_lock = mp.Lock()
q = mp.Queue()
processes = [mp.Process(target=processor,args=(print_lock,q)) for _ in range(4)]
for process in processes:
process.start() # OP code didn't *call* the start method.
start = time.time()
for worker in range(8):
q.put(worker)
for process in processes:
q.put(None) # quit indicator
for process in processes:
process.join()
print('Entire job took:',time.time() - start)
Output:
Process-2 2
Process-1 0
Process-3 1
Process-4 3
Process-3 6
Process-1 5
Process-2 4
Process-4 7
Entire job took: 1.1350018978118896