Easy way to tell apart python multiprocessing's OS processes


Summary

I'd like to use the Python multiprocessing module to run multiple jobs in parallel on a Linux server. Further, I'd like to be able to look at the running processes with top or ps and kill one of them but let the others run.

However, every process launched by the Python multiprocessing module looks identical in the output of ps -f.

All I'm seeing is this:

fermion:workspace ross$ ps -f
  UID   PID  PPID   C STIME   TTY           TIME CMD
  501 32257 32256   0  8:52PM ttys000    0:00.04 -bash
  501 32333 32257   0  9:05PM ttys000    0:00.04 python ./parallel_jobs.py
  501 32334 32333   0  9:05PM ttys000    0:00.00 python ./parallel_jobs.py
  501 32335 32333   0  9:05PM ttys000    0:00.00 python ./parallel_jobs.py
  501 32336 32333   0  9:05PM ttys000    0:00.00 python ./parallel_jobs.py
  501 32272 32271   0  8:53PM ttys001    0:00.05 -bash

Is there any way to get something more descriptive in the CMD column? Do I need to just keep track of PIDs in log files? Or is there another option?

Background

I am doing some batch processing where some jobs can run for hours. I need to run some of those jobs in parallel to save time, and all of the parallel jobs need to complete successfully before I can run another job that depends on them. However, if one job is misbehaving, I want to be able to kill it while letting the others complete. The pattern then repeats: one job, then parallel jobs, then a few more jobs in sequence, then some more parallel jobs...

Example code

This is some dummy code that outlines the concept of what I'm trying to do.

#!/usr/bin/env python
import multiprocessing
import sys  # needed for sys.exit() in main()
import time

def open_zoo_cages():
    print('Opening zoo cages...')

def crossing_road(animal, sleep_time):
    print('An ' + animal + ' is crossing the road')
    for i in range(5):
        print("It's a wide road for " + animal + " to cross...")
        time.sleep(sleep_time)

    print('The ' + animal + ' is across.')

def aardvark():
    crossing_road('aardvark', 2)

def badger():
    crossing_road('badger', 4)

def cougar():
    crossing_road('cougar', 3)

def clean_the_road():
    print('Cleaning off the road of animal droppings...')

def print_exit_code(process):
    print(process.name + " exit code: " + str(process.exitcode))

def main():
    # Run a single job that must finish before running some jobs in parallel
    open_zoo_cages()

    # Run some jobs in parallel
    amos = multiprocessing.Process(name='aardvark Amos', target=aardvark)
    betty = multiprocessing.Process(name='badger Betty', target=badger)
    carl = multiprocessing.Process(name='cougar Carl', target=cougar)

    amos.start()
    betty.start()
    carl.start()

    amos.join()
    betty.join()
    carl.join()

    print_exit_code(amos)
    print_exit_code(betty)
    print_exit_code(carl)

    # Run another job (clean_the_road) only if all the parallel jobs finished
    # successfully. Otherwise exit with an error.
    if amos.exitcode == 0 and betty.exitcode == 0 and carl.exitcode == 0:
        clean_the_road()
    else:
        sys.exit('Not all animals finished crossing')

if __name__ == '__main__':
    main()

Also, I noticed that moving one of the functions into another Python module doesn't change what appears in the CMD column of ps for the associated process.

Output

fermion:workspace ross$ ./parallel_jobs.py 
Opening zoo cages...
An aardvark is crossing the road
It's a wide road for aardvark to cross...
An badger is crossing the road
It's a wide road for badger to cross...
An cougar is crossing the road
It's a wide road for cougar to cross...
It's a wide road for aardvark to cross...
It's a wide road for cougar to cross...
It's a wide road for aardvark to cross...
It's a wide road for badger to cross...
It's a wide road for cougar to cross...
It's a wide road for aardvark to cross...
It's a wide road for badger to cross...
It's a wide road for aardvark to cross...
It's a wide road for cougar to cross...
The aardvark is across.
It's a wide road for badger to cross...
It's a wide road for cougar to cross...
The cougar is across.
It's a wide road for badger to cross...
The badger is across.
aardvark Amos exit code: 0
badger Betty exit code: 0
cougar Carl exit code: 0
Cleaning off the road of animal droppings...

Solution

  • The nice, easy answer: have each process open a descriptively named file handle, and then use lsof.

    f = open('/tmp/hippo.txt','w')
    

    This will give you the PID of your process:

    lsof | grep "hippo"
    

    It's not the most Pythonic answer, but so what : )
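
A minimal sketch of the marker-file idea, applied to multiprocessing workers. The function names (hold_marker, run_jobs), the ".marker" naming scheme, and the temporary directory are illustrative assumptions, not part of the original answer. It also assumes Linux and explicitly asks for the "fork" start method so the example behaves the same regardless of the interpreter's default.

```python
import multiprocessing
import os
import tempfile
import time


def hold_marker(job_name, marker_dir, seconds):
    # Hypothetical naming scheme: one marker file per job, named after the job.
    marker_path = os.path.join(marker_dir, job_name + ".marker")
    with open(marker_path, "w") as marker:
        # Recording the PID is optional; lsof reports it for any open file.
        marker.write(str(os.getpid()))
        marker.flush()
        # The real job would run here, while the file handle stays open.
        time.sleep(seconds)


def run_jobs(job_names, seconds=0.2):
    # Assumption: Linux with the "fork" start method.
    ctx = multiprocessing.get_context("fork")
    marker_dir = tempfile.mkdtemp(prefix="jobs_")
    workers = [
        ctx.Process(name=n, target=hold_marker, args=(n, marker_dir, seconds))
        for n in job_names
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # Read back the PID each worker wrote into its own marker file.
    recorded = {}
    for n in job_names:
        with open(os.path.join(marker_dir, n + ".marker")) as f:
            recorded[n] = int(f.read())
    return marker_dir, recorded, {w.name: w.pid for w in workers}


if __name__ == "__main__":
    print(run_jobs(["aardvark", "badger", "cougar"]))
```

While a worker is running, something like lsof | grep "badger.marker" should list the process holding that file, and the file itself contains the same PID.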

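    My initial answer was the easy way. Here is an incomplete, tiny example of a larger concept: adding a signal handler to the class that runs as a subprocess lets you send it a signal (something like kill -6 ...) to dump out info. You can even use it to dump out, on demand, how much work is left in a given subprocess. Note that the handler only fires for the signal it was registered for; the example below registers SIGTERM, which is what a plain kill (kill -15) sends, whereas kill -6 sends SIGABRT.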

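    import logging
    import os
    import queue
    import signal

    class Foo:
        def __init__(self, name):
            self.myname = name
            self.myqueue = queue.Queue()
            # Register the handler for the signal you intend to send with kill.
            signal.signal(signal.SIGTERM, self.my_callback)

        def my_callback(self, signum, frame):
            # Handlers receive the signal number and the current stack frame.
            # Log the job name, its PID, and the approximate queue size.
            logging.error("%s %s %s", self.myname, os.getpid(), self.myqueue.qsize())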
    

    Or you can do this, which I think may be what you really want: give each process a name and print its name together with its PID.

    import multiprocessing
    import time
    def foo():
        time.sleep(60)
    if __name__ == "__main__":
        process = [
            multiprocessing.Process(name="a",target=foo),
            multiprocessing.Process(name="b",target=foo),
            multiprocessing.Process(name="c",target=foo),
        ]
        for p in process:
            p.start()
        for p in process:
            print(p.name, p.pid)
        for p in process:
            p.join()
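
Building on the name and PID listing above, here is a hedged sketch of killing one named job while the others finish, and of what the exit codes then look like. The helper names (job, run_and_kill_one) and the sleep durations are made up for illustration, and the sketch assumes a Unix system with the "fork" start method. On Unix, Process.terminate() sends SIGTERM, which is also what a plain kill <pid> sends from the shell.

```python
import multiprocessing
import signal
import time


def job(seconds):
    # Stand-in for a long-running batch job.
    time.sleep(seconds)


def run_and_kill_one(victim="b"):
    # Assumption: Unix with the "fork" start method.
    ctx = multiprocessing.get_context("fork")
    workers = {
        "a": ctx.Process(name="a", target=job, args=(0.2,)),
        "b": ctx.Process(name="b", target=job, args=(30,)),
        "c": ctx.Process(name="c", target=job, args=(0.2,)),
    }
    for w in workers.values():
        w.start()
    # Keep a name -> PID map; this is what you would write to a log file.
    pids = {name: w.pid for name, w in workers.items()}
    # Kill only the misbehaving job; the others keep running.
    workers[victim].terminate()
    for w in workers.values():
        w.join()
    return pids, {name: w.exitcode for name, w in workers.items()}


if __name__ == "__main__":
    pids, exit_codes = run_and_kill_one()
    print(pids)
    print(exit_codes)
```

A process ended by a signal reports a negative exit code (the negated signal number), so a check like the question's exitcode == 0 test correctly treats the killed job as a failure.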