python, python-3.x, python-multiprocessing

Listen to a list and do something whenever it is appended to


I have an API that listens for incoming requests and appends them to a list. In a separate process, I'd like to trigger some kind of operation whenever something is appended to the list. I tried starting a new Process (from multiprocessing), but the new process doesn't see changes made to the list after it starts.

from multiprocessing import Process
import time

procs = []

def separateProcess(start, counter):
    while True:
        time.sleep(1)
        print("length of the list from separate Process: "+str(len(procs)))


if __name__ == '__main__':
    print("program started")

    counter = 0
    Process(target=separateProcess, args=(counter, counter)).start()
    print("program end")
    while True:
        counter += 1
        newstring = "string "+str(counter)
        procs.append(newstring)
        print("length of the list from main: " + str(len(procs)))
        time.sleep(2)

Here's the output:

length of the list from main: 1
length of the list from separate Process: 0
length of the list from main: 2
length of the list from separate Process: 0
length of the list from separate Process: 0
length of the list from main: 3
length of the list from separate Process: 0
length of the list from separate Process: 0

Solution

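  • When a new child process is created with the fork start method, it gets a copy of the parent's address space; with the spawn start method, it re-imports the module and so starts with a fresh, empty list. Either way, any subsequent changes (by the parent or the child) are not reflected in the memory of the other process. Each process has its own private address space.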

    You can create a Manager() and use its shared list object instead:

    import time

    from multiprocessing import Manager, Process

    def separateProcess(procs):
        # procs is a proxy to the list held by the Manager server process
        while True:
            time.sleep(1)
            print("length of the list from separate Process: "+str(len(procs)))


    if __name__ == '__main__':
        m = Manager()
        procs = m.list()

        print("program started")

        counter = 0
        # Pass the proxy explicitly: with the spawn start method (the default
        # on Windows and macOS) the child does not inherit names that are
        # assigned under the __main__ guard.
        Process(target=separateProcess, args=(procs,)).start()
        print("program end")
        while True:
            counter += 1
            newstring = "string "+str(counter)
            procs.append(newstring)
            print("length of the list from main: " + str(len(procs)))
            time.sleep(2)


    There is some overhead with this approach, as it will spawn a child process to host the Manager server, and every operation on the shared list is forwarded to that process.

    If you can adjust your worker process logic to use a queue instead, here is an example:

    import random
    import time
    
    from multiprocessing import cpu_count, Process, Queue
    
    
    def worker(q):
        for item in iter(q.get, 'STOP'):
            t = random.uniform(1, 5)
            print(f'START item: {item}')
            time.sleep(t)
            print(f'END item: {item}, ({t:.3f}s)')
    
    
    def main():
        cpus = cpu_count()
        q = Queue()
    
        for i in range(5):
            q.put(i)
    
        for i in range(cpus):
            Process(target=worker, args=(q,)).start()
    
        for i in range(cpus):
            q.put('STOP')
    
    
    if __name__ == '__main__':
        main()
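
Applied to the setup in the question, the same queue idea might look like the sketch below. The main process puts each new string on the queue instead of appending it to a list, and the separate process blocks on get(), so it reacts as soon as an item arrives. The names handle_item and consume, and the use of None as a stop marker, are assumptions made for illustration; they are not part of the original code.

```python
import time
from multiprocessing import Process, Queue

SENTINEL = None  # assumed stop marker; any value never sent as real data works


def handle_item(item):
    """Hypothetical per-item operation; replace with the real work."""
    return f"handled {item}"


def consume(q):
    """Block on the queue and react to every item until SENTINEL arrives."""
    handled = 0
    # iter(callable, sentinel) keeps calling q.get() until it returns SENTINEL
    for item in iter(q.get, SENTINEL):
        print(handle_item(item))
        handled += 1
    return handled


if __name__ == '__main__':
    q = Queue()
    consumer = Process(target=consume, args=(q,))
    consumer.start()

    for counter in range(1, 4):
        q.put(f"string {counter}")  # takes the place of procs.append(...)
        time.sleep(0.5)

    q.put(SENTINEL)  # tell the consumer to finish
    consumer.join()
```

Because get() blocks, the consumer wakes up only when something is put on the queue. With the Manager list, the worker would still have to poll the list to notice new items.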