This is an offshoot of this question. The code in python runs fine. When I tried the cythonized version, I started getting "Can't pickle <cyfunction init_worker_processes at 0x7fffd7da5a00>" even though I defined the init_worker_processes at top level. So, I moved it to another module and used the imported init_worker_processes. Now, I get the below error:
error: unrecognized arguments: -s -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16) --multiprocessing-fork
Python3/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak.
warnings.warn('resource_tracker: process died unexpectedly, '
I'm not explicitly using -s
or -c
as reported in the error. The error is coming from the code below in the multiprocessing library (method: ensure_running).
warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
How to resolve this issue?
# updated Python code
# ---------------------- mp_app.py ------------------
import argparse
import logging
import signal
import sys
import time
import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn
import numpy as np
from mp_utils import init_worker_processes
@dataclass
class TmpData:
    """Per-key payload handed to pool workers (must be picklable)."""

    # key identifying the work item
    name: str
    # sleep duration in seconds that the worker simulates
    value: int
def worker(name: str, data: "TmpData") -> None:
    """Simulated unit of work: log the payload, then sleep for ``data.value`` seconds.

    Args:
        name: key identifying this work item.
        data: payload carrying the sleep duration in its ``value`` field.

    Returns:
        None.  (The original annotated this ``NoReturn``, but ``NoReturn``
        means "never returns"; a function that finishes normally is ``None``.)
    """
    # Use the multiprocessing logger so records are tagged with the worker process.
    logger_obj = mp.get_logger()
    # Lazy %-style args avoid formatting when the level is disabled.
    logger_obj.info("processing : %s; value: %s", name, data.value)
    time.sleep(data.value)
def get_args(logger: logging.Logger) -> argparse.Namespace:
    """Parse command-line options for the MP test app.

    Args:
        logger: logger used to report parsing failures.

    Returns:
        Namespace with ``max_time`` (required, seconds) and ``num_workers``
        (hidden ``-j`` tuning knob, default 1).
    """
    parser = argparse.ArgumentParser(description="test MP app")
    parser.add_argument(
        "-m",
        "--max-time",
        type=int,
        dest="max_time",
        required=True,
        help="max timeout in seconds",
    )
    # Hidden option: pool size; suppressed from --help output.
    parser.add_argument(
        "-j",
        dest="num_workers",
        type=int,
        default=1,
        required=False,
        help=argparse.SUPPRESS,
    )
    try:
        args = parser.parse_args()
    except argparse.ArgumentError as err:
        # NOTE(review): parse_args() normally reports errors via SystemExit,
        # not ArgumentError, so this handler rarely fires — confirm intent.
        # format_help() returns the help text; the original logged the return
        # value of print_help(), which is always None.
        logger.exception(parser.format_help())
        raise err
    return args
def mp_app(options: argparse.Namespace, logger: logging.Logger) -> None:
    """Fan out ``worker`` calls over a fork-based process pool.

    Args:
        options: parsed CLI options (``max_time``, ``num_workers``).
        logger: logger used to report per-task failures.
    """
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(1, options.max_time))
        for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
    }
    with mp.get_context("fork").Pool(
        processes=options.num_workers,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        # The try wraps the whole submit loop: the original caught
        # KeyboardInterrupt per-iteration, terminated the pool, and then kept
        # looping — submitting to an already-terminated pool.
        try:
            for key in map_data:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(key, map_data[key]),
                    )
                )
        except KeyboardInterrupt:
            # Stop outstanding work immediately; submit nothing further.
            pool.terminate()
        pool.close()
        pool.join()
    # All tasks have completed (or been terminated); surface failures per task.
    for result in results:
        try:
            result.get()
        except Exception as err:
            logger.error(f"{err}")
if __name__ == "__main__":
    # Script entry point: parse options, run the pool app, and translate any
    # failure into a non-zero exit status.
    main_logger = logging.getLogger()
    try:
        cli_args = get_args(main_logger)
        mp_app(options=cli_args, logger=main_logger)
    except Exception as exc:
        main_logger.error(exc)
        raise SystemExit(1) from exc
    sys.exit(0)
# --------------------- mp_utils.py --------------------------
import multiprocessing
import logging
import signal
from typing import NoReturn
def init_worker_processes() -> None:
    """Initialize each pool worker: route logs to stderr and ignore SIGINT.

    Returns:
        None.  (``NoReturn`` was wrong here — it means the function never
        returns, e.g. always raises; an initializer that completes is ``None``.)
    """
    # log_to_stderr() tags records with the worker process name.
    this_process_logger = multiprocessing.log_to_stderr()
    this_process_logger.setLevel(logging.INFO)
    # Let the parent process handle Ctrl-C; workers must not die mid-task.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
Please note the main issue seems to be "-s" and "-c" being unrecognized options; I am not sure where those are coming from.
While I'm still trying to decipher the cython build process (it happens through a complex make system in our environment), I believe I have traced the root cause of the -s and -c options. -s
seems to be coming from the _args_from_interpreter_flags
method in subprocess.py (module subprocess).
In my python shell I see sys.flags as following -
>>> sys.flags sys.flags(debug=0, inspect=0, interactive=0, optimize=0, dont_write_bytecode=0, no_user_site=1, no_site=0, ignore_environment=0, verbose=0, bytes_warning=0, quiet=0, hash_randomization=1, isolated=0, dev_mode=False, utf8_mode=0, int_max_str_digits=-1)
Since sys.flags.no_user_site is 1, -s
seems to get appended.
get_command_line
in spawn.py seems to be adding -c
. Since this branch comes from the else of if getattr(sys, 'frozen', False)
, is the spawn approach not supposed to work with a cythonized binary?
I tried with both "fork" and "spawn". Both work in Python. But with the cythonized build of the "spawn"-based app I get the "UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak" message along with "unrecognized arguments" for -s and -c. The cythonized version of the "fork"-based app simply hangs at launch, as if it is waiting on some lock. I tried pstack on the process id, but could not spot anything -
# top 20 frames from pstack
#0 0x00007ffff799675d in read () from /usr/lib64/libpthread.so.0
#1 0x00007ffff70c3996 in _Py_read (fd=fd@entry=3, buf=0x7fffbabfdbf0, count=count@entry=4) at Python/fileutils.c:1707
#2 0x00007ffff70ce872 in os_read_impl (module=<optimized out>, length=4, fd=3) at ./Modules/posixmodule.c:9474
#3 os_read (module=<optimized out>, nargs=<optimized out>, args=<optimized out>) at ./Modules/clinic/posixmodule.c.h:5012
#4 os_read (module=<optimized out>, args=<optimized out>, nargs=<optimized out>) at ./Modules/clinic/posixmodule.c.h:4977
#5 0x00007ffff6fc444f in cfunction_vectorcall_FASTCALL (func=0x7ffff7f4aa90, args=0x7fffbae68f10, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/methodobject.c:430
#6 0x00007ffff6f303ec in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=0x7ffff7f4aa90, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#7 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbae68f10, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#8 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#9 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3520
#10 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbae68d60, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#11 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac0b750, kwcount=0, kwstep=1, defs=0x7fffbae95298, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae8d230, qualname=0x7fffbae8c210) at Python/ceval.c:4329
#12 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#13 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=0x7fffbabefe50, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
#14 PyObject_Vectorcall (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac0b740, callable=<optimized out>) at ./Include/cpython/abstract.h:127
#15 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x419cb0) at Python/ceval.c:5077
#16 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>) at Python/ceval.c:3506
#17 0x00007ffff7066ea4 in _PyEval_EvalFrame (throwflag=0, f=0x7fffbac0b5b0, tstate=0x419cb0) at ./Include/internal/pycore_ceval.h:40
#18 _PyEval_EvalCode (tstate=tstate@entry=0x419cb0, _co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, args=<optimized out>, argcount=2, kwnames=0x0, kwargs=0x7fffbac08f58, kwcount=0, kwstep=1, defs=0x7fffbae83b08, defcount=1, kwdefs=0x0, closure=0x0, name=0x7fffbae84830, qualname=0x7fffbae8c3f0) at Python/ceval.c:4329
#19 0x00007ffff6f7baba in _PyFunction_Vectorcall (func=<optimized out>, stack=<optimized out>, nargsf=<optimized out>, kwnames=<optimized out>) at Objects/call.c:396
#20 0x00007ffff6f311aa in _PyObject_VectorcallTstate (kwnames=0x0, nargsf=<optimized out>, args=0x7fffbac08f48, callable=0x7fffbabeff70, tstate=0x419cb0) at ./Include/cpython/abstract.h:118
I checked the cython build process prints something like following:
cython --3str --embed --no-docstrings -o mp_app.c mp_app.py
gcc -Os -Loss/Python3/lib -DNDEBUG -Wl,--strip-all -IPython-3.9.15/include/python3.9 -LPython-3.9.15/lib/python3.9/config-3.9-x86_64-linux-gnu -LPython-3.9.15/lib -lcrypt -lpthread -ldl -lutil -lm -lm -B /binutils/bin -static-libgcc -static-libstdc++ -fPIC -lpython3.9 mp_app.c -o mp_app.pex
PS: I've also edited the source code example
I was able to resolve the issue by switching to the fork
start method from the spawn
start method. Additionally, I had to move the worker
method from mp_app.py
to mp_utils.py
as otherwise in the cythonized version it was throwing PicklingError
for cythonized function worker
.
I am still not sure why the spawn
start method did not work for me on a CentOS 7 machine.
The final code is approximately following:
# ------------- mp_app.py ------------------
import argparse
import logging
import signal
import sys
import multiprocessing as mp
from typing import Dict, NoReturn
import numpy as np
from mp_utils import (init_worker_processes, worker_task, InterProcessData)
def get_args(logger: logging.Logger) -> argparse.Namespace:
    """Parse command-line options for the MP test app.

    Args:
        logger: logger used to report parsing failures.

    Returns:
        Namespace with ``max_time`` (required, seconds) and ``num_workers``
        (hidden ``-j`` tuning knob, default 1).
    """
    parser = argparse.ArgumentParser(description="test MP app")
    parser.add_argument(
        "-m",
        "--max-time",
        type=int,
        dest="max_time",
        required=True,
        help="max timeout in seconds",
    )
    # Hidden option: pool size; suppressed from --help output.
    parser.add_argument(
        "-j",
        dest="num_workers",
        type=int,
        default=1,
        required=False,
        help=argparse.SUPPRESS,
    )
    try:
        args = parser.parse_args()
    except argparse.ArgumentError as err:
        # NOTE(review): parse_args() normally reports errors via SystemExit,
        # not ArgumentError, so this handler rarely fires — confirm intent.
        # format_help() returns the help text; the original logged the return
        # value of print_help(), which is always None.
        logger.exception(parser.format_help())
        raise err
    return args
def mp_app(options: argparse.Namespace, logger: logging.Logger) -> None:
    """Fan out ``worker_task`` calls over a fork-based process pool.

    Args:
        options: parsed CLI options (``max_time``, ``num_workers``).
        logger: logger used to report per-task failures.
    """
    map_data: Dict[str, InterProcessData] = {
        key: InterProcessData(name=key, value=np.random.randint(1, options.max_time))
        for key in ["ABC", "DEF", "GHI", "JKL", "PQR", "STU", "XYZ"]
    }
    with mp.get_context("fork").Pool(
        processes=options.num_workers,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        # The try wraps the whole submit loop: the original caught
        # KeyboardInterrupt per-iteration, terminated the pool, and then kept
        # looping — submitting to an already-terminated pool.
        try:
            for key in map_data:
                results.append(
                    pool.apply_async(
                        worker_task,
                        args=(key, map_data[key]),
                    )
                )
        except KeyboardInterrupt:
            # Stop outstanding work immediately; submit nothing further.
            pool.terminate()
        pool.close()
        pool.join()
    # All tasks have completed (or been terminated); surface failures per task.
    for result in results:
        try:
            result.get()
        except Exception as err:
            logger.error(f"{err}")
if __name__ == "__main__":
    # Script entry point: parse options, run the pool app, and translate any
    # failure into a non-zero exit status.
    main_logger = logging.getLogger()
    try:
        cli_args = get_args(main_logger)
        mp_app(options=cli_args, logger=main_logger)
    except Exception as exc:
        main_logger.error(exc)
        raise SystemExit(1) from exc
    sys.exit(0)
# ---------- mp_utils.py -----------
import time
import logging
import signal
import multiprocessing
from dataclasses import dataclass
from typing import NoReturn
@dataclass
class InterProcessData:
    """Per-key payload sent from the parent to pool workers (must be picklable)."""

    # key identifying the work item
    name: str
    # sleep duration in seconds that the worker simulates
    value: int
def worker_task(name: str, data: "InterProcessData") -> None:
    """Simulated unit of work: log the payload, then sleep for ``data.value`` seconds.

    Args:
        name: key identifying this work item.
        data: payload carrying the sleep duration in its ``value`` field.

    Returns:
        None.  (The original annotated this ``NoReturn``, but ``NoReturn``
        means "never returns"; a function that finishes normally is ``None``.)
    """
    # Use the multiprocessing logger so records are tagged with the worker process.
    logger_obj = multiprocessing.get_logger()
    # Lazy %-style args avoid formatting when the level is disabled.
    logger_obj.info("processing : %s; value: %s", name, data.value)
    time.sleep(data.value)
def init_worker_processes() -> None:
    """Initialize each pool worker: route logs to stderr and ignore SIGINT.

    Returns:
        None.  (``NoReturn`` was wrong here — it means the function never
        returns, e.g. always raises; an initializer that completes is ``None``.)
    """
    # log_to_stderr() tags records with the worker process name.
    this_process_logger = multiprocessing.log_to_stderr()
    this_process_logger.setLevel(logging.INFO)
    # Let the parent process handle Ctrl-C; workers must not die mid-task.
    signal.signal(signal.SIGINT, signal.SIG_IGN)