Tags: python, multiprocessing, python-multiprocessing, ram, monitor

How to pause processes in case they are consuming too much memory?


Background: I process planetary imagery using a set of command-line utilities provided by the US Geological Survey. Some of them are extreme RAM hogs (tens of GB). USGS says that's just the way they run and has no plans to manage the RAM better. I built a Python wrapper that manipulates file lists to call the different processing steps on the data in parts (such as all images taken in one color filter, all taken in another, and so on). Because things are done to multiple lists and multiple images, I run it in parallel, using all the CPUs I can, which turns something that might otherwise take two months into a week. At the moment I don't use native Python methods to do this; instead I use GNU Parallel (calling parallel and then the function via os.system("")) or Pysis, which is a Python way to call and multithread the USGS software.
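
For illustration, a stripped-down version of such a wrapper call might look like the following; the program name and file-list path are placeholders, not my actual pipeline:

    import os

    def run_step_with_parallel(file_list, n_jobs=8):
        """Hand a list file of images to GNU Parallel, which runs one
        instance of the (placeholder) USGS program per input line."""
        # `::::` tells parallel to read its arguments from a file;
        # `{}` is replaced by each input line.
        os.system(f'parallel -j {n_jobs} some_usgs_step from={{}} :::: {file_list}')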

Problem: As noted, some steps take a huge amount of RAM for some files, and there is no way of knowing ahead of time which files those might be. So I can get into a situation where each process uses around 200 MB and runs fine on a 16 GB machine with 8 cores, but then it starts processing other files, RAM usage creeps up to several GB per process, and with 8 concurrent processes on a 16 GB machine that means RAM gets compressed, swap space gets used ... and that's if I'm lucky and the machine doesn't just lock up.

Solution? What I'm looking for is a way to monitor RAM usage, say once a minute, by process name, and if I start to see RAM creep (e.g., 8 instances of a process each using over 2 GB of RAM), I can pause all but one of those, let that one finish, unpause another, let that finish, and so on until those 8 are done, then continue with the rest of whatever needs to run for that step. Obviously, all of this would be done in Python, not manually.

Is it possible to do that? If so, how?


Solution

  • You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.

    In the example below, culprit processes are suspended until the rest have finished, and the suspended processes are then resumed one by one. This is meant to be a simplistic approach that shows the general mechanism.

    import time
    import random
    from threading import Thread
    from multiprocessing import Process, active_children
    
    import psutil
    
    
    def format_mib(mem_bytes):
        """Format bytes into mebibyte-string."""
        return f'{mem_bytes / 2 ** 20:.2f} MiB'
    
    
    def f(append_length):
        """Main function in child-process. Appends random floats to list."""
        p = psutil.Process()
        li = []
        for i in range(10):
            li.extend([random.random() for _ in range(append_length)])
            print(f'i: {i} | pid: {p.pid} | '
                  f'{format_mib(p.memory_full_info().rss)}')
            time.sleep(2)
    
    
    def monitored(running_processes, max_mib):
        """Monitor memory usage for running processes.
        Suspend execution for processes surpassing `max_mib` and complete
        one by one after behaving processes have finished.
        """
        running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
        suspended_processes = []
    
        while running_processes:
            active_children()  # Joins all finished processes.
            #  Without it, p.is_running() below on Unix would not return `False`
            #  for finished processes.
            actual_processes = running_processes.copy()
            for p in actual_processes:
                if not p.is_running():
                    running_processes.remove(p)
                    print(f'removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > max_mib:
                        print(f'suspending process: {p}')
                        p.suspend()
                        running_processes.remove(p)
                        suspended_processes.append(p)
    
            time.sleep(1)
    
        for p in suspended_processes:
            print(f'\nresuming process: {p}')
            p.resume()
            p.wait()
    
    
    if __name__ == '__main__':
    
        MAX_MiB = 200
    
        append_lengths = [100000, 500000, 1000000, 2000000, 300000]
        processes = [Process(target=f, args=(append_length,))
                     for append_length in append_lengths]
    
        for p in processes:
            p.start()
    
        m = Thread(target=monitored, args=(processes, MAX_MiB))
        m.start()
        m.join()
    

    Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:

    i: 0 | pid: 17997 | 13.53 MiB
    i: 0 | pid: 18001 | 19.70 MiB
    i: 0 | pid: 17998 | 25.88 MiB
    i: 0 | pid: 17999 | 41.32 MiB
    i: 0 | pid: 18000 | 72.21 MiB
    ...
    i: 2 | pid: 17997 | 20.84 MiB
    i: 2 | pid: 18001 | 42.02 MiB
    i: 2 | pid: 17998 | 60.56 MiB
    i: 2 | pid: 17999 | 103.36 MiB
    i: 2 | pid: 18000 | 215.70 MiB
    suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
    i: 3 | pid: 17997 | 23.93 MiB
    i: 3 | pid: 18001 | 47.75 MiB
    i: 3 | pid: 17998 | 76.00 MiB
    i: 3 | pid: 17999 | 141.59 MiB
    ...
    i: 5 | pid: 17997 | 30.11 MiB
    i: 5 | pid: 18001 | 68.24 MiB
    i: 5 | pid: 17998 | 107.23 MiB
    i: 5 | pid: 17999 | 203.52 MiB
    suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
    i: 6 | pid: 17997 | 33.19 MiB
    i: 6 | pid: 18001 | 77.49 MiB
    i: 6 | pid: 17998 | 122.59 MiB
    ...
    i: 9 | pid: 17997 | 42.47 MiB
    i: 9 | pid: 18001 | 105.68 MiB
    i: 9 | pid: 17998 | 168.96 MiB
    removed finished process: psutil.Process(pid=17997, status='terminated')
    removed finished process: psutil.Process(pid=17998, status='terminated')
    removed finished process: psutil.Process(pid=18001, status='terminated')
    
    resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
    i: 3 | pid: 18000 | 277.46 MiB
    i: 4 | pid: 18000 | 339.22 MiB
    i: 5 | pid: 18000 | 400.84 MiB
    ...
    i: 9 | pid: 18000 | 648.00 MiB
    
    resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
    i: 6 | pid: 17999 | 234.55 MiB
    ...
    i: 9 | pid: 17999 | 327.31 MiB
    
    
    Process finished with exit code 0
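
    The example above watches its own child processes, but the same check also works for external programs looked up by name, which is closer to the scenario described in the question (checking once a minute and pausing instances that grow past roughly 2 GB). Below is a rough sketch of that variant; the process name is a placeholder, and the threshold and polling interval are simply the values from the question:

    import time
    import psutil

    MAX_MiB = 2048      # ~2 GiB per instance, as described in the question
    POLL_SECONDS = 60   # check memory usage once a minute

    def suspend_hungry_processes(name, max_mib=MAX_MiB):
        """Suspend every running process whose name matches `name` and whose
        resident set size exceeds `max_mib`; return the suspended ones."""
        suspended = []
        for proc in psutil.process_iter(['name', 'memory_info']):
            mem = proc.info['memory_info']
            if proc.info['name'] != name or mem is None:
                continue
            if mem.rss / 2 ** 20 > max_mib:
                try:
                    proc.suspend()
                    suspended.append(proc)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass  # process exited in the meantime or isn't ours
        return suspended

    # while the step is running:
    #     suspended += suspend_hungry_processes('some_usgs_step')
    #     time.sleep(POLL_SECONDS)

    Resuming the suspended processes one by one afterwards (or always leaving one of them running) would be layered on top of this, exactly as in the scheduling logic above.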
    

    EDIT:

    I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?

    I expanded the code above so that new processes are started as old ones finish, with the number of running processes capped at the number of cores. I also refactored it into a class, since it would otherwise start to get messy with all the necessary state to manage. In the code below I use the names "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.

    import time
    import random
    from threading import Thread
    from collections import deque
    from multiprocessing import Process, active_children, set_start_method
    
    import psutil
    
    # `def format_mib` and `def f` from above unchanged...
    
    class TaskProcessor(Thread):
        """Processor class which monitors memory usage for running
        tasks (processes). Suspends execution for tasks surpassing
        `max_mib` and completes them one by one, after behaving
        tasks have finished.
        """
        def __init__(self, n_cores, max_mib, tasks):
            super().__init__()
            self.n_cores = n_cores
            self.max_mib = max_mib  # memory threshold
            self.tasks = deque(tasks)
    
            self._running_tasks = []
            self._suspended_tasks = []
    
        def run(self):
            """Main-function in new thread."""
            self._update_running_tasks()
            self._monitor_running_tasks()
            self._process_suspended_tasks()
    
        def _update_running_tasks(self):
            """Start new tasks if we have less running tasks than cores."""
            while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
                p = self.tasks.popleft()
                p.start()
                # for further process-management we here just need the
                # psutil.Process wrapper
                self._running_tasks.append(psutil.Process(pid=p.pid))
                print(f'Started process: {self._running_tasks[-1]}')
    
        def _monitor_running_tasks(self):
            """Monitor running tasks. Replace completed tasks and suspend tasks
            which exceed the memory threshold `self.max_mib`.
            """
            # loop while we have running or non-started tasks
            while self._running_tasks or self.tasks:
                active_children()  # Joins all finished processes.
                # Without it, p.is_running() below on Unix would not return
                # `False` for finished processes.
                self._update_running_tasks()
                actual_tasks = self._running_tasks.copy()
    
                for p in actual_tasks:
                    if not p.is_running():  # process has finished
                        self._running_tasks.remove(p)
                        print(f'Removed finished process: {p}')
                    else:
                        if p.memory_info().rss / 2 ** 20 > self.max_mib:
                            p.suspend()
                            self._running_tasks.remove(p)
                            self._suspended_tasks.append(p)
                            print(f'Suspended process: {p}')
    
                time.sleep(1)
    
        def _process_suspended_tasks(self):
            """Resume processing of suspended tasks."""
            for p in self._suspended_tasks:
                print(f'\nResuming process: {p}')
                p.resume()
                p.wait()
    
    
    if __name__ == '__main__':
    
        # Forking (default on Unix-y systems) an already multithreaded process is
        # error-prone. Since we intend to start processes after we are already
        # multithreaded, we switch to another start-method.
        set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available
    
        MAX_MiB = 200
        N_CORES = 2
    
        append_lengths = [100000, 500000, 1000000, 2000000, 300000]
        tasks = [Process(target=f, args=(append_length,))
                 for append_length in append_lengths]
    
        tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
        tp.start()
        tp.join()
    

    Example Output (shortened):

    Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
    Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
    i: 0 | pid: 9422 | 18.95 MiB
    i: 0 | pid: 9423 | 31.45 MiB
    ...
    i: 9 | pid: 9422 | 47.36 MiB
    i: 9 | pid: 9423 | 175.41 MiB
    Removed finished process: psutil.Process(pid=9422, status='terminated')
    Removed finished process: psutil.Process(pid=9423, status='terminated')
    Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
    Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
    i: 0 | pid: 9445 | 46.86 MiB
    i: 0 | pid: 9446 | 77.74 MiB
    ...
    i: 2 | pid: 9445 | 117.41 MiB
    i: 2 | pid: 9446 | 220.99 MiB
    Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
    Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
    i: 0 | pid: 9450 | 25.16 MiB
    i: 3 | pid: 9445 | 148.29 MiB
    i: 1 | pid: 9450 | 36.47 MiB
    i: 4 | pid: 9445 | 179.17 MiB
    i: 2 | pid: 9450 | 45.74 MiB
    i: 5 | pid: 9445 | 211.14 MiB
    Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
    i: 3 | pid: 9450 | 55.00 MiB
    ...
    i: 9 | pid: 9450 | 110.62 MiB
    Removed finished process: psutil.Process(pid=9450, status='terminated')
    
    Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
    i: 3 | pid: 9446 | 282.75 MiB
    ...
    i: 9 | pid: 9446 | 655.35 MiB
    
    Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
    i: 6 | pid: 9445 | 242.12 MiB
    ...
    i: 9 | pid: 9445 | 334.88 MiB
    
    Process finished with exit code 0
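
    In the question's actual pipeline the child processes would not build Python lists but launch external USGS programs, so the target function would wrap a command-line call instead of f. A minimal sketch, with the program name and its arguments as placeholders:

    import subprocess
    from multiprocessing import Process

    def run_usgs_step(input_file, output_file):
        """Child-process target that runs one external (placeholder)
        USGS command and waits for it to finish."""
        subprocess.run(
            ['some_usgs_step', f'from={input_file}', f'to={output_file}'],
            check=True,
        )

    # tasks = [Process(target=run_usgs_step, args=(src, dst))
    #          for src, dst in pairs_of_files]

    Keep in mind that memory_info() and suspend() on such a wrapper process do not cover the external program it spawns: the monitor would either have to walk p.children(recursive=True) and check/suspend those as well, or find the external processes directly by name as sketched earlier.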