Search code examples
pythonpython-3.xmultithreadingmultiprocessingpython-multiprocessing

How to init Pool() inside a daemonic child, but only once?


On Python 3.11 and Ubuntu I have a task that starts asynchronous calls at a regular time interval (not asyncio), and each child then runs a multiprocessing task. I have 36 cores / 72 logical processors. The problem is that creating a new Pool(72) takes 0.3 seconds, which is too much for my task because performance matters. From the question "Python Process Pool non-daemonic?" I found out how to create a new pool inside a pool (using NoDaemonProcess). But how do I initialise the child pool only once? concurrent.futures is not suitable for me: I tested it and it is slower than multiprocessing.

Here is a working example; I need to modify it somehow so that the pool inside the child is initialised only once.

parent pid=907058

2024-06-01 19:16:44.856839 start
2024-06-01 19:16:44.861229 sleep 4 sec
2024-06-01 19:16:44.861777 [907059] on_message(): 1
2024-06-01 19:16:44.866430 [907059] starting pool..
2024-06-01 19:16:44.867275 worker_function(), a=907059_1
2024-06-01 19:16:44.867373 worker_function(), a=907059_2
2024-06-01 19:16:44.867410 worker_function(), a=907059_3

2024-06-01 19:16:48.861738 start
2024-06-01 19:16:48.864965 sleep 4 sec
2024-06-01 19:16:48.865581 [907070] on_message(): 2
2024-06-01 19:16:48.870826 [907070] starting pool..
2024-06-01 19:16:48.871544 worker_function(), a=907070_1
2024-06-01 19:16:48.871638 worker_function(), a=907070_2
2024-06-01 19:16:48.871695 worker_function(), a=907070_3

2024-06-01 19:16:52.865456 long sleep..
2024-06-01 19:16:56.867489 end worker_function(), a=907059_1
2024-06-01 19:16:56.867657 end worker_function(), a=907059_3
2024-06-01 19:16:56.867666 end worker_function(), a=907059_2
2024-06-01 19:16:56.868269 [907059] pool ended
2024-06-01 19:16:56.870487 [907059] finished on_message(): 1
2024-06-01 19:17:00.871746 end worker_function(), a=907070_1
2024-06-01 19:17:00.871896 end worker_function(), a=907070_2
2024-06-01 19:17:00.871903 end worker_function(), a=907070_3
2024-06-01 19:17:00.872659 [907070] pool ended
2024-06-01 19:17:00.874545 [907070] finished on_message(): 2
2024-06-01 19:17:12.865676 finished

Code:

import os
import time
import traceback
from datetime import datetime
from multiprocessing import Pool
import multiprocessing.pool

# https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
class NoDaemonProcess(multiprocessing.Process):
    """Process whose ``daemon`` flag always reads ``False``.

    Assignments to the flag are silently ignored, so code that tries to mark
    the process as daemonic (as ``multiprocessing.Pool`` does for its
    workers) has no effect.
    """

    def _get_daemon(self):
        # Always report a non-daemonic process.
        return False

    def _set_daemon(self, value):
        # Discard the assignment on purpose.
        return None

    daemon = property(_get_daemon, _set_daemon)

class NoDaemonContext(multiprocessing.get_context().__class__):
    """Default multiprocessing context with ``NoDaemonProcess`` as its Process class."""

    Process = NoDaemonProcess

class NestablePool(multiprocessing.pool.Pool):
    """Pool that always uses a ``NoDaemonContext`` as its context."""

    def __init__(self, *args, **kwargs):
        # A caller-supplied ``context`` keyword is replaced, never honoured.
        forced = {**kwargs, 'context': NoDaemonContext()}
        super().__init__(*args, **forced)


class Message():
    """Handler whose ``on_message`` runs three ``worker_function`` calls in a pool."""

    def __init__(self):
        # Creating the pool once here was tried and is left disabled:
        # self.pool_3 = Pool(3)
        pass

    @staticmethod
    def _stamp():
        """Return the current local time formatted for the log lines."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

    def worker_function(self, a):
        """Print a start line, sleep 12 seconds, print an end line; return None."""
        print(f"{Message._stamp()} worker_function(), a={a}")
        time.sleep(12)
        print(f"{Message._stamp()} end worker_function(), a={a}")
        return None

    def on_message(self, message):
        """Handle one message by running three worker calls in a new ``Pool(3)``.

        Any exception is caught and printed (traceback first, then the
        exception itself) instead of being propagated to the caller.
        """
        try:
            pid = os.getpid()
            print(f"{Message._stamp()} [{pid}] on_message(): {message}")

            # Goal: do not create a new Pool() on every call.  The target
            # server has 72 logical processors and pool start-up takes about
            # 300 ms, which is far too long for this task; the pool should be
            # created once, not each time on_message() is called.
            #
            # A possible solution that does not work, because the Pool(3) in
            # __init__() does not get initialised:
            # res = self.pool_3.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
            # Creating the Pool on self here instead fails with
            # "Pool objects cannot be passed between processes or pickled"

            jobs = [(f"{pid}_{n}",) for n in (1, 2, 3)]
            with Pool(3) as workers:
                print(f"{Message._stamp()} [{pid}] starting pool..")
                workers.starmap_async(self.worker_function, jobs).get()
                print(f"{Message._stamp()} [{pid}] pool ended")

            print(f"{Message._stamp()} [{pid}] finished on_message(): {message}")
            # os.kill(pid, 9)
        except Exception as e:
            print(traceback.format_exc())
            print(e)



if __name__ == "__main__":
    # Demo driver: every 4 seconds hand one message to a new single-worker
    # NestablePool, then sleep long enough for the background work to finish.

    TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%f'

    print(f"parent pid={os.getpid()}")

    # https://stackoverflow.com/a/44719580/1802225 process.terminate()

    handler = Message()

    for message_id in (1, 2):
        print()
        print(f"{datetime.now().strftime(TIME_FORMAT)} start")
        # The outer pool is non-daemonic so that on_message() can start a
        # pool of its own inside the worker.
        outer_pool = NestablePool(1)
        outer_pool.starmap_async(handler.on_message, [(message_id,)])

        print(f"{datetime.now().strftime(TIME_FORMAT)} sleep 4 sec")
        time.sleep(4)

    print()
    print(f"{datetime.now().strftime(TIME_FORMAT)} long sleep..")
    time.sleep(20)
    print(f"{datetime.now().strftime(TIME_FORMAT)} finished")
       

Code from another answer (modified) that does not work as expected:

from multiprocessing.pool import Pool, ThreadPool
from datetime import datetime
import traceback
import time
import os

def convert_char_to_integer(ch):
    """Log a start line, sleep 12 s, log an end line and return ``ch`` unchanged."""
    worker_pid = os.getpid()
    time_format = '%Y-%m-%d %H:%M:%S.%f'

    started = datetime.now().strftime(time_format)
    print(f"{started} [{worker_pid}] convert_char_to_integer(), ch={ch}")

    time.sleep(12)  # Simulate real processing

    finished = datetime.now().strftime(time_format)
    print(f"{finished} [{worker_pid}] end convert_char_to_integer(), ch={ch}")

    return ch

def on_message(multiprocessing_pool, message):
    """Run three ``convert_char_to_integer`` tasks on the given pool and wait for them.

    Progress lines are printed before submitting and after all three tasks
    have returned; nothing is returned.
    """
    caller_pid = os.getpid()

    def log(text):
        # Prefix every line with a timestamp and the pid of this process.
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{caller_pid}] {text}")

    log(f"on_message(): {message}")

    log("starting pool..")
    tasks = [(f"{caller_pid}_{n}",) for n in (1, 2, 3)]
    multiprocessing_pool.starmap_async(convert_char_to_integer, tasks).get()
    log("pool ended")
    

def await_next_message():
    """Yield 'Message 1' to 'Message 3', pausing 0.1 s before the 2nd and 3rd."""
    for number in (1, 2, 3):
        if number > 1:
            time.sleep(.1)
        yield f'Message {number}'

def main():
    """Submit each message to ``on_message`` on a worker thread, 4 s apart."""

    def timestamp():
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

    # Pool(3) must stay at 3 (assume that is the maximum number of CPUs
    # available).  Pool(9) would make the output look correct, but only
    # because it adds 3 more usable workers per message; that is no good
    # here because the real task uses all CPUs, and the pools need to
    # execute independently of each other.
    with Pool(3) as process_pool, ThreadPool(3) as thread_pool:
        for message in await_next_message():
            print()
            print(f"{timestamp()} start")
            thread_pool.apply_async(on_message, args=(process_pool, message))
            print(f"{timestamp()} sleep 4 sec")
            time.sleep(4)
        # Block until every submitted on_message() call has returned.
        thread_pool.close()
        thread_pool.join()


if __name__ == '__main__':
    main()

Result:

2024-06-03 20:41:26.731560 start
2024-06-03 20:41:26.731670 sleep 4 sec
2024-06-03 20:41:26.731726 [27211] on_message(): Message 1
2024-06-03 20:41:26.731838 [27211] starting pool..
2024-06-03 20:41:26.732411 [27212] convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:26.732582 [27213] convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:26.732634 [27214] convert_char_to_integer(), ch=27211_3

2024-06-03 20:41:30.832071 start
2024-06-03 20:41:30.832175 sleep 4 sec
2024-06-03 20:41:30.832264 [27211] on_message(): Message 2
2024-06-03 20:41:30.832343 [27211] starting pool..

# (!) This is NOT the expected behavior. The output here should be:
# convert_char_to_integer(), ch=27211_1
# convert_char_to_integer(), ch=27211_2
# convert_char_to_integer(), ch=27211_3
# but that does not happen: the new batch waits for the first 3 processes to finish ("end convert_char_to_integer()"), whereas I need the batches to run independently of each other.

2024-06-03 20:41:34.932409 start
2024-06-03 20:41:34.932532 sleep 4 sec
2024-06-03 20:41:34.932644 [27211] on_message(): Message 3
2024-06-03 20:41:34.932712 [27211] starting pool..
2024-06-03 20:41:38.732708 [27212] end convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:38.732865 [27213] end convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:38.732895 [27214] end convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:38.733189 [27212] convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:38.733256 [27214] convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:38.733299 [27213] convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:38.733613 [27211] pool ended
2024-06-03 20:41:50.733313 [27212] end convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:50.733415 [27214] end convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:50.733449 [27213] end convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:50.733624 [27212] convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:50.733642 [27214] convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:50.733668 [27213] convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:50.733868 [27211] pool ended
2024-06-03 20:42:02.733748 [27212] end convert_char_to_integer(), ch=27211_1
2024-06-03 20:42:02.733746 [27214] end convert_char_to_integer(), ch=27211_2
2024-06-03 20:42:02.733757 [27213] end convert_char_to_integer(), ch=27211_3
2024-06-03 20:42:02.734315 [27211] pool ended

Solution

  • You haven't really shown how on_message gets invoked in your actual application. The name of the function suggests that it is invoked when a new message is received from somewhere. Why can't on_message be executed in a multithreading pool if we pass to it additionally a multiprocessing pool instance that has been pre-allocated and which does the CPU-intensive processing? For example:

    from multiprocessing.pool import Pool, ThreadPool
    import time
    import os
    
    def convert_char_to_integer(ch):
        """Sleep 0.3 s, print a trace line for ``ch`` and return ``ord(ch)``."""
        time.sleep(.3)  # Simulate real processing
        worker_pid = os.getpid()
        now = time.time()
        print(f'ch = {ch}, pid = {worker_pid}, time = {now}')
        code_point = ord(ch)
        return code_point
    
    def on_message(multiprocessing_pool, message):
        """Print ``message`` with the sum of its characters' integer values.

        Each character is converted by ``convert_char_to_integer`` through
        ``multiprocessing_pool.map``.
        """
        char_codes = multiprocessing_pool.map(convert_char_to_integer, message)
        total = sum(char_codes)
        print(f'{message!r}: {total}')
    
    def await_next_message():
        """Yield 'Message 1' to 'Message 3', pausing 0.1 s before the 2nd and 3rd."""
        for number in (1, 2, 3):
            if number > 1:
                time.sleep(.1)
            yield f'Message {number}'
    
    def main():
        """Dispatch every incoming message to ``on_message`` on a worker thread.

        A process pool and a thread pool are each created once.  Every message
        is handed to ``on_message`` on the thread pool, together with the
        shared process pool.

        Raises:
            Exception: whatever an ``on_message`` call raised, re-raised here
                once all submitted calls have finished.
        """
        # The pools are started only once using the
        # with Pool() .... statement
        print('starting pools')
        with Pool(10) as multiprocessing_pool, \
                ThreadPool(3) as multithreading_pool:
            # Keep the AsyncResult handles so failures can be reported below.
            pending = [
                multithreading_pool.apply_async(on_message, args=(multiprocessing_pool, message))
                for message in await_next_message()
            ]
            # Wait for all submitted tasks to complete
            multithreading_pool.close()
            multithreading_pool.join()
            # apply_async() stores an exception raised by the task inside its
            # AsyncResult; without get() it would be lost silently.
            for result in pending:
                result.get()
        # Both pools are now terminated
        print('pools ended')

    if __name__ == '__main__':
        main()
    

    Prints:

    starting pools
    ch = M, pid = 18776, time = 1717442160.0104358
    ch = e, pid = 17232, time = 1717442160.0540285
    ch = s, pid = 24404, time = 1717442160.0580287
    ch = s, pid = 10336, time = 1717442160.0828772
    ch = a, pid = 19348, time = 1717442160.126844
    ch = g, pid = 14968, time = 1717442160.126844
    ch = e, pid = 5600, time = 1717442160.1278431
    ch =  , pid = 23884, time = 1717442160.1293561
    ch = 1, pid = 3900, time = 1717442160.1298823
    ch = M, pid = 15160, time = 1717442160.1299446
    'Message 1': 790
    ch = e, pid = 18776, time = 1717442160.3112478
    ch = s, pid = 17232, time = 1717442160.3551002
    ch = s, pid = 24404, time = 1717442160.3591855
    ch = a, pid = 10336, time = 1717442160.3840768
    ch = e, pid = 14968, time = 1717442160.4282424
    ch =  , pid = 5600, time = 1717442160.4282424
    ch = g, pid = 19348, time = 1717442160.4282424
    ch = 2, pid = 23884, time = 1717442160.4301693
    ch = e, pid = 15160, time = 1717442160.4309397
    ch = M, pid = 3900, time = 1717442160.4309397
    'Message 2': 791
    ch = s, pid = 18776, time = 1717442160.6133676
    ch = s, pid = 17232, time = 1717442160.6570253
    ch = a, pid = 24404, time = 1717442160.6604424
    ch = g, pid = 10336, time = 1717442160.6856165
    ch = e, pid = 14968, time = 1717442160.7294924
    ch = 3, pid = 19348, time = 1717442160.7315066
    ch =  , pid = 5600, time = 1717442160.7315066
    'Message 3': 792
    pools ended