On python 3.11
and Ubuntu
I have a task that starts asynchronous calls at a fixed time interval (not asyncio), and inside each child I then need to run a multiprocessing task. I have 36 cores / 72 logical processors. The problem is that creating a new Pool(72)
takes 0.3 seconds, which is too long for my task, because performance matters. From the article Python Process Pool non-daemonic? I found out how to create a pool inside a pool (using NoDaemonProcess
). But how do I initialize the child pool only once? concurrent.futures
is not good for me, because I tested it and it is slower than multiprocessing
.
Here is a working example; I need to modify it somehow so that the pool inside the child is initialized only once.
parent pid=907058
2024-06-01 19:16:44.856839 start
2024-06-01 19:16:44.861229 sleep 4 sec
2024-06-01 19:16:44.861777 [907059] on_message(): 1
2024-06-01 19:16:44.866430 [907059] starting pool..
2024-06-01 19:16:44.867275 worker_function(), a=907059_1
2024-06-01 19:16:44.867373 worker_function(), a=907059_2
2024-06-01 19:16:44.867410 worker_function(), a=907059_3
2024-06-01 19:16:48.861738 start
2024-06-01 19:16:48.864965 sleep 4 sec
2024-06-01 19:16:48.865581 [907070] on_message(): 2
2024-06-01 19:16:48.870826 [907070] starting pool..
2024-06-01 19:16:48.871544 worker_function(), a=907070_1
2024-06-01 19:16:48.871638 worker_function(), a=907070_2
2024-06-01 19:16:48.871695 worker_function(), a=907070_3
2024-06-01 19:16:52.865456 long sleep..
2024-06-01 19:16:56.867489 end worker_function(), a=907059_1
2024-06-01 19:16:56.867657 end worker_function(), a=907059_3
2024-06-01 19:16:56.867666 end worker_function(), a=907059_2
2024-06-01 19:16:56.868269 [907059] pool ended
2024-06-01 19:16:56.870487 [907059] finished on_message(): 1
2024-06-01 19:17:00.871746 end worker_function(), a=907070_1
2024-06-01 19:17:00.871896 end worker_function(), a=907070_2
2024-06-01 19:17:00.871903 end worker_function(), a=907070_3
2024-06-01 19:17:00.872659 [907070] pool ended
2024-06-01 19:17:00.874545 [907070] finished on_message(): 2
2024-06-01 19:17:12.865676 finished
Code:
import os
import time
import traceback
from datetime import datetime
from multiprocessing import Pool
import multiprocessing.pool
# https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
class NoDaemonProcess(multiprocessing.Process):
    """Process whose daemon flag is always False, so it is allowed to have children."""

    @property
    def daemon(self):
        # Always report non-daemonic: only non-daemonic processes may spawn children.
        return False

    @daemon.setter
    def daemon(self, value):
        # Silently ignore Pool's attempt to mark its workers as daemons.
        pass
class NoDaemonContext(type(multiprocessing.get_context())):
    """Multiprocessing context (platform default class) that builds NoDaemonProcess workers."""
    Process = NoDaemonProcess
class NestablePool(multiprocessing.pool.Pool):
    """A Pool whose workers are non-daemonic, so each worker may start a Pool of its own.

    The standard Pool marks its workers as daemons, and daemonic processes are
    not allowed to have children; forcing NoDaemonContext lifts that restriction.
    """

    def __init__(self, *args, **kwargs):
        # Always use the non-daemonic context, overriding any caller-supplied one.
        kwargs['context'] = NoDaemonContext()
        # Python 3 zero-argument super() instead of the legacy super(Cls, self) form.
        super().__init__(*args, **kwargs)
class Message():
    """Demo consumer: every on_message() call fans work out to a process pool."""

    def __init__(self):
        # self.pool_3 = Pool(3)
        pass

    def worker_function(self, a):
        # Simulates 12 seconds of CPU-bound work inside a grandchild process.
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} worker_function(), a={a}")
        time.sleep(12)
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} end worker_function(), a={a}")
        return None

    def on_message(self, message):
        # Runs inside a non-daemonic child process; starts a pool of 3 workers.
        try:
            pid = os.getpid()
            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] on_message(): {message}")
            # I need code such that no new Pool() is created here,
            # because my server has 72 logical processors and creating one takes 300 ms.
            # For my task that is far too long, so I want to create the Pool() once,
            # not every time on_message() is called.
            # This could be a possible solution,
            # but it does not work: the Pool(3) from __init__() is not initialized here.
            # res = self.pool_3.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
            # If I create a Pool stored on self here instead, I get the error
            # "Pool objects cannot be passed between processes or pickled".
            with Pool(3) as pool:
                print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] starting pool..")
                res = pool.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] pool ended")
            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] finished on_message(): {message}")
            # os.kill(pid, 9)
        except Exception as e:
            print(traceback.format_exc())
            print(e)
if __name__ == "__main__":
    print(f"parent pid={os.getpid()}")
    # https://stackoverflow.com/a/44719580/1802225 process.terminate()
    me = Message()
    # BUG FIX: the original rebound `pool` each iteration and never closed either
    # NestablePool, leaking their worker processes. Keep references and shut
    # every pool down cleanly at the end.
    pools = []
    for i in range(1, 3):
        print()
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} start")
        # starting pool non-daemonic to start pool inside
        pool = NestablePool(1)
        pools.append(pool)
        pool.starmap_async(me.on_message, [(i,)])
        print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} sleep 4 sec")
        time.sleep(4)
    print()
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} long sleep..")
    time.sleep(20)
    # Release the non-daemonic worker processes before exiting.
    for pool in pools:
        pool.close()
        pool.join()
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} finished")
Another answer's code (modified), which does not work as expected:
from multiprocessing.pool import Pool, ThreadPool
from datetime import datetime
import traceback
import time
import os
def convert_char_to_integer(ch, delay=12):
    """Simulated CPU-bound worker: log start and end around `delay` seconds of work.

    Args:
        ch: the value being "converted"; echoed back unchanged in this demo.
        delay: seconds of simulated processing (default 12, matching the
            original hard-coded sleep; now parameterized for reuse/testing).

    Returns:
        ch, unchanged.
    """
    pid = os.getpid()
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] convert_char_to_integer(), ch={ch}")
    time.sleep(delay)  # Simulate real processing
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] end convert_char_to_integer(), ch={ch}")
    return ch
def on_message(multiprocessing_pool, message):
    """Handle one message by fanning three tasks out to the shared process pool and waiting."""
    pid = os.getpid()

    def now():
        # One timestamp helper instead of repeating the strftime call inline.
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

    print(f"{now()} [{pid}] on_message(): {message}")
    print(f"{now()} [{pid}] starting pool..")
    task_args = [(f"{pid}_{n}",) for n in range(1, 4)]
    result = multiprocessing_pool.starmap_async(convert_char_to_integer, task_args).get()
    print(f"{now()} [{pid}] pool ended")
def await_next_message():
    """Yield three demo messages, pausing 0.1 s between consecutive ones."""
    for index, text in enumerate(('Message 1', 'Message 2', 'Message 3')):
        if index:
            time.sleep(.1)
        yield text
def main():
    """Demo driver: one shared Pool(3) plus a ThreadPool dispatching the messages."""
    # Pool(3) must be 3 (say that is the maximum number of CPUs I have).
    # With Pool(9) it would work correctly, but that is wrong: it just adds
    # +3 +3 ... spare workers. That is no good for me, because in the real
    # task I use all CPUs and the pools must run independently of each other.
    with Pool(3) as process_pool, ThreadPool(3) as thread_pool:
        for message in await_next_message():
            print()
            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} start")
            thread_pool.apply_async(on_message, args=(process_pool, message))
            print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} sleep 4 sec")
            time.sleep(4)
        # Wait for all submitted tasks to complete
        thread_pool.close()
        thread_pool.join()

if __name__ == '__main__':
    main()
Result:
2024-06-03 20:41:26.731560 start
2024-06-03 20:41:26.731670 sleep 4 sec
2024-06-03 20:41:26.731726 [27211] on_message(): Message 1
2024-06-03 20:41:26.731838 [27211] starting pool..
2024-06-03 20:41:26.732411 [27212] convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:26.732582 [27213] convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:26.732634 [27214] convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:30.832071 start
2024-06-03 20:41:30.832175 sleep 4 sec
2024-06-03 20:41:30.832264 [27211] on_message(): Message 2
2024-06-03 20:41:30.832343 [27211] starting pool..
# (!) HERE the behavior is not as expected. It should be:
# convert_char_to_integer(), ch=27211_1
# convert_char_to_integer(), ch=27211_2
# convert_char_to_integer(), ch=27211_3
# but that does not happen, because it waits for the 3 running processes to finish ("end convert_char_to_integer()"), while I need them to run independently
2024-06-03 20:41:34.932409 start
2024-06-03 20:41:34.932532 sleep 4 sec
2024-06-03 20:41:34.932644 [27211] on_message(): Message 3
2024-06-03 20:41:34.932712 [27211] starting pool..
2024-06-03 20:41:38.732708 [27212] end convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:38.732865 [27213] end convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:38.732895 [27214] end convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:38.733189 [27212] convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:38.733256 [27214] convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:38.733299 [27213] convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:38.733613 [27211] pool ended
2024-06-03 20:41:50.733313 [27212] end convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:50.733415 [27214] end convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:50.733449 [27213] end convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:50.733624 [27212] convert_char_to_integer(), ch=27211_1
2024-06-03 20:41:50.733642 [27214] convert_char_to_integer(), ch=27211_2
2024-06-03 20:41:50.733668 [27213] convert_char_to_integer(), ch=27211_3
2024-06-03 20:41:50.733868 [27211] pool ended
2024-06-03 20:42:02.733748 [27212] end convert_char_to_integer(), ch=27211_1
2024-06-03 20:42:02.733746 [27214] end convert_char_to_integer(), ch=27211_2
2024-06-03 20:42:02.733757 [27213] end convert_char_to_integer(), ch=27211_3
2024-06-03 20:42:02.734315 [27211] pool ended
You haven't really shown how on_message
gets invoked in your actual application. The name of the function suggests that it is invoked when a new message is received from somewhere. Why can't on_message
be executed in a multithreading pool if we additionally pass it a pre-allocated multiprocessing pool instance that does the CPU-intensive processing? For example:
from multiprocessing.pool import Pool, ThreadPool
import time
import os
def convert_char_to_integer(ch):
    """Pretend to do heavy work, then return the Unicode code point of ch."""
    time.sleep(.3)  # Simulate real processing
    code_point = ord(ch)
    print(f'ch = {ch}, pid = {os.getpid()}, time = {time.time()}')
    return code_point
def on_message(multiprocessing_pool, message):
    """Print the sum of the integer codes of message's characters, computed in the pool."""
    # Each character is converted in a worker process; sum the results here.
    codes = multiprocessing_pool.map(convert_char_to_integer, message)
    total = sum(codes)
    print(f'{message!r}: {total}')
def await_next_message():
    """Yield three demo messages with a 0.1 s gap between consecutive ones."""
    messages = ('Message 1', 'Message 2', 'Message 3')
    for position, msg in enumerate(messages):
        if position:
            time.sleep(.1)
        yield msg
def main():
    """Create both pools exactly once, then hand every message to a thread worker."""
    # The pools are started only once, by the with-statement below.
    print('starting pools')
    with Pool(10) as process_pool, ThreadPool(3) as thread_pool:
        for message in await_next_message():
            thread_pool.apply_async(on_message, args=(process_pool, message))
        # Wait for all submitted tasks to complete
        thread_pool.close()
        thread_pool.join()
    # Both pools are now terminated
    print('pools ended')

if __name__ == '__main__':
    main()
Prints:
starting pools
ch = M, pid = 18776, time = 1717442160.0104358
ch = e, pid = 17232, time = 1717442160.0540285
ch = s, pid = 24404, time = 1717442160.0580287
ch = s, pid = 10336, time = 1717442160.0828772
ch = a, pid = 19348, time = 1717442160.126844
ch = g, pid = 14968, time = 1717442160.126844
ch = e, pid = 5600, time = 1717442160.1278431
ch = , pid = 23884, time = 1717442160.1293561
ch = 1, pid = 3900, time = 1717442160.1298823
ch = M, pid = 15160, time = 1717442160.1299446
'Message 1': 790
ch = e, pid = 18776, time = 1717442160.3112478
ch = s, pid = 17232, time = 1717442160.3551002
ch = s, pid = 24404, time = 1717442160.3591855
ch = a, pid = 10336, time = 1717442160.3840768
ch = e, pid = 14968, time = 1717442160.4282424
ch = , pid = 5600, time = 1717442160.4282424
ch = g, pid = 19348, time = 1717442160.4282424
ch = 2, pid = 23884, time = 1717442160.4301693
ch = e, pid = 15160, time = 1717442160.4309397
ch = M, pid = 3900, time = 1717442160.4309397
'Message 2': 791
ch = s, pid = 18776, time = 1717442160.6133676
ch = s, pid = 17232, time = 1717442160.6570253
ch = a, pid = 24404, time = 1717442160.6604424
ch = g, pid = 10336, time = 1717442160.6856165
ch = e, pid = 14968, time = 1717442160.7294924
ch = 3, pid = 19348, time = 1717442160.7315066
ch = , pid = 5600, time = 1717442160.7315066
'Message 3': 792
pools ended