python, python-3.x, python-multiprocessing

Multiprocessing Synchronization


Let's suppose I need to run 5 processes in parallel, but processes 2 to 5 depend on process 1. How can I make sure process 1 runs before the others? Should I use Python's multiprocessing Event() or Lock(), or both?

Example 1:

process 1
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5

Example 2:

process 3
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5

Example 3 with 2 dependencies:

process 1
process 2 or 3 (run in parallel after 1)
process 4
process 5 or 6 (run in parallel after 1 and after 4)

All processes call the same function(msg), but each returns a different value.

I need some guidance, not necessarily code, but if you can provide some, thanks.

Pseudo-code:

import multiprocessing as mp

def function(msg):
    # each message maps to a different return value
    if msg == "one":
        return 1
    if msg == "two":
        return 2
    if msg == "three":
        return 3
    if msg == "four":
        return 4
    if msg == "five":
        return 5

if __name__ == '__main__':
    msgs = ['one', 'two', 'three', 'four', 'five']

    jobs = []
    for msg in msgs:
        p = mp.Process(target=function, args=(msg,))
        p.start()
        jobs.append(p)

    for job in jobs:
        job.join()

In this case the processes run in no guaranteed order.

If I want process 1 to run first, I could do this:

Possible solution:

import multiprocessing as mp

def function(msg):
    if msg == "one":
        return 1
    if msg == "two":
        return 2
    if msg == "three":
        return 3
    if msg == "four":
        return 4
    if msg == "five":
        return 5

if __name__ == '__main__':
    # run process 1 on its own and wait for it to finish
    p1 = mp.Process(target=function, args=('one',))
    p1.start()
    p1.join()

    # then run processes 2 to 5 in parallel
    msgs = ['two', 'three', 'four', 'five']

    jobs = []
    for msg in msgs:
        p = mp.Process(target=function, args=(msg,))
        p.start()
        jobs.append(p)

    for job in jobs:
        job.join()

Is there a better solution, or is this fine? It works, but that doesn't mean it can't be done in a better way (less code repetition, for example).
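
For example (just a sketch of what I mean; run_stage is a name I'm making up for this question), a small helper that starts and joins one batch of processes at a time would remove the repetition:

import multiprocessing as mp

def function(msg):
    # same mapping as above
    values = {'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5}
    return values.get(msg)

def run_stage(msgs):
    # start every process in this stage, then wait for all of them
    # before the caller moves on to the next stage
    jobs = [mp.Process(target=function, args=(msg,)) for msg in msgs]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()

if __name__ == '__main__':
    run_stage(['one'])                           # process 1 first
    run_stage(['two', 'three', 'four', 'five'])  # then 2-5 in parallel

Example 3 would then be four calls (['one'], ['two', 'three'], ['four'], ['five', 'six']), at the cost of over-synchronizing if process 4 doesn't actually need to wait for 2 and 3.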


Solution

  • Not sure what you ended up doing, but you can use Events for this purpose after all:

    import multiprocessing as mp
    
    def function(msg, events):
      # events[0] is set once process 1 has finished, events[1] once process 3 has finished
      if msg == "one":
        print(1)
        events[0].set()
      if msg == "two":
        print("2 waiting")
        events[0].wait()   # wait for process 1
        events[1].wait()   # wait for process 3
        print("2 done")
      if msg == "three":
        print(3)
        events[1].set()
      if msg == "four":
        print(4)
      if msg == "five":
        print("5 waiting")
        events[0].wait()   # wait for process 1
        print("5 done")
    
    if __name__ == '__main__':
      events = [mp.Event(),mp.Event()]
      jobs = []
      for item in ['one','two','three','four','five']:
        job = mp.Process(target=function, args=(item,events))
        job.start()
        jobs.append(job)
      for job in jobs:
        job.join()
    

    Here I deliberately introduced a second dependency: p2 depends on both p1 and p3 (and p5 still depends on p1). That way, if you run it a couple of times, it shows more variation than with a single dependency:

    python procy.py
    2 waiting
    4
    1
    5 waiting
    5 done
    3
    2 done
    
    python procy.py
    1
    5 waiting
    2 waiting
    4
    5 done
    3
    2 done
    
    python procy.py
    1
    4
    3
    5 waiting
    5 done
    2 waiting
    2 done
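
    The same idea extends to your Example 3: one Event per prerequisite, and each dependent process waits on the Events it needs before doing its work. Here is a sketch (the names done_1 and done_4 are mine, and I'm assuming process 4 has no prerequisite of its own, since your example doesn't say), with prints standing in for the real work:

    import multiprocessing as mp
    
    def function(msg, done_1, done_4):
      # done_1 is set once process 1 has finished, done_4 once process 4 has finished
      if msg == "one":
        print(1)
        done_1.set()
      if msg in ("two", "three"):
        done_1.wait()          # 2 and 3 only need process 1
        print(msg)
      if msg == "four":
        print(4)
        done_4.set()
      if msg in ("five", "six"):
        done_1.wait()          # 5 and 6 need both process 1
        done_4.wait()          # and process 4
        print(msg)
    
    if __name__ == '__main__':
      done_1, done_4 = mp.Event(), mp.Event()
      jobs = []
      for item in ['one', 'two', 'three', 'four', 'five', 'six']:
        job = mp.Process(target=function, args=(item, done_1, done_4))
        job.start()
        jobs.append(job)
      for job in jobs:
        job.join()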