A related question came up at "Why I can't use multiprocessing.Queue with ProcessPoolExecutor?". I provided a partial answer along with a workaround, but admitted that the question raises another question, namely why a multiprocessing.Queue instance can be passed as an argument to a multiprocessing.Process worker function.
For example, the following code fails on platforms that use either the spawn or fork method of creating new processes:
from multiprocessing import Pool, Queue

def worker(q):
    print(q.get())

with Pool(1) as pool:
    q = Queue()
    q.put(7)
    pool.apply(worker, args=(q,))
The above raises:
RuntimeError: Queue objects should only be shared between processes through inheritance
Yet the following program runs without a problem:
from multiprocessing import Process, Queue

def worker(q):
    print(q.get())

q = Queue()
q.put(7)
p = Process(target=worker, args=(q,))
p.start()
p.join()
It appears that arguments to a multiprocessing pool worker function ultimately get put on the pool's input queue, which is implemented as a multiprocessing.SimpleQueue, and you cannot put a multiprocessing.Queue instance to a multiprocessing.SimpleQueue instance, which uses a ForkingPickler for serialization.
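You can reproduce this directly: pickling a Queue with the same ForkingPickler the pool uses raises the error immediately. A minimal sketch, assuming CPython's multiprocessing.reduction module:

from multiprocessing import Queue
from multiprocessing.reduction import ForkingPickler

q = Queue()
try:
    ForkingPickler.dumps(q)  # the same serializer the pool's SimpleQueue uses
except RuntimeError as e:
    print(e)  # Queue objects should only be shared between processes through inheritance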
So how is the multiprocessing.Queue serialized when passed as an argument to a multiprocessing.Process that allows it to be used in this way?
I wanted to expand on the accepted answer, so I added my own, which also details a way to make queues, locks, etc. picklable and able to be sent through a pool.
Basically, it's not that Queue objects cannot be serialized; it's that multiprocessing is only equipped to serialize them when it knows sufficient information about the target process they will be sent to (whether that be the current process or some other process). This is why it works when you are spawning a process yourself (using the Process class) but not when you are simply putting one in a queue (like when using a Pool).
Look over the source code for multiprocessing.queues.Queue (or other connection objects like Condition). You'll find that in their __getstate__ method (the method called when a Queue instance is being pickled), there is a call to the function multiprocessing.context.assert_spawning. This "assertion" will only pass if the current thread is spawning a process. If that is not the case, multiprocessing raises the error you see and quits.
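For reference, the relevant CPython code looks roughly like this (paraphrased from multiprocessing/queues.py and multiprocessing/context.py; exact details may vary between Python versions):

# multiprocessing/queues.py (paraphrased)
class Queue(object):
    def __getstate__(self):
        context.assert_spawning(self)  # raises unless a process is being spawned
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

# multiprocessing/context.py (paraphrased)
def assert_spawning(obj):
    if get_spawning_popen() is None:
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(obj).__name__
        )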
Now the reason multiprocessing does not even bother to pickle the queue when the assertion fails is that it does not have access to the Popen object created when a thread spawns a subprocess (on Windows, you can find this at multiprocessing.popen_spawn_win32.Popen). This object stores data about the target process, including its pid and process handle. Multiprocessing requires this information because a Queue contains mutexes, and to successfully pickle and later rebuild them, multiprocessing must call DuplicateHandle through winapi with the information from the Popen object. Without this object present, multiprocessing does not know what to do and raises an error. So this is where our problem lies, but it is fixable if we can teach multiprocessing a different approach: steal the duplicated handles from inside the target process itself, without ever requiring the target's information in advance.
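To see why the Popen object matters, here is roughly how the stock SemLock pickles itself on Windows (a sketch paraphrased from multiprocessing/synchronize.py and multiprocessing/popen_spawn_win32.py, not the exact source):

# multiprocessing/synchronize.py (paraphrased)
class SemLock(object):
    def __getstate__(self):
        context.assert_spawning(self)
        sl = self._semlock
        if sys.platform == 'win32':
            # Needs the spawning Popen object to duplicate the handle
            # directly into the child process being created.
            h = context.get_spawning_popen().duplicate_for_child(sl.handle)
        else:
            h = sl.handle
        return (h, sl.kind, sl.maxvalue, sl.name)

# multiprocessing/popen_spawn_win32.py (paraphrased)
class Popen(object):
    def duplicate_for_child(self, handle):
        assert self is get_spawning_popen()
        return reduction.duplicate(handle, self.sentinel)  # sentinel: child process handle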
Pay attention to the class multiprocessing.synchronize.SemLock. It's the base class for all multiprocessing locks, so its objects are subsequently present in queues, pipes, etc. The way it's currently pickled is as I described above: it requires the target process's handle to create a duplicate handle. However, we can instead define a __reduce__ method for SemLock where we create a duplicate handle using the current process's handle, and then, from the target process, duplicate the previously created handle, which will now be valid in the target process's context. It's quite a mouthful, but a similar approach is actually used to pickle PipeConnection objects as well; instead of a __reduce__ method, it uses a dispatch table to do so.
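If you have not used __reduce__ before, the protocol is simple: pickle calls it to get back a callable plus arguments, and unpickling calls that callable to rebuild the object. A minimal standalone illustration (the Token class and rebuild_token helper are hypothetical, purely for demonstration):

import pickle

def rebuild_token(value):
    # Called on the unpickling side to reconstruct the object
    return Token(value)

class Token:
    def __init__(self, value):
        self.value = value
    def __reduce__(self):
        # Tells pickle: call rebuild_token(self.value) to recreate me
        return (rebuild_token, (self.value,))

t2 = pickle.loads(pickle.dumps(Token(42)))
assert t2.value == 42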
After this is done, we can then subclass Queue and remove the call to assert_spawning, since it will no longer be required. This way, we will be able to successfully pickle locks, queues, pipes, etc. Here's the code with examples:
import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context
import multiprocessing.queues
import _winapi

def work(q):
    print("Worker: Main says", q.get())
    q.put('haha')

class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection
    objects are pickled using the appropriate api
    """
    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)

def reduce_lock_connection(self):
    sl = self._semlock
    dh = DupSemLockHandle(sl.handle)
    return rebuild_lock_connection, (dh, type(self), (sl.kind, sl.maxvalue, sl.name))

def rebuild_lock_connection(dh, t, state):
    handle = dh.detach()  # Duplicated handle valid in current process's context
    # Create a new instance without calling __init__ because we'll supply the state ourselves
    lck = t.__new__(t)
    lck.__setstate__((handle,) + state)
    return lck

# Add our own reduce function to pickle SemLock and its child classes
synchronize.SemLock.__reduce__ = reduce_lock_connection

class PicklableQueue(multiprocessing.queues.Queue):
    """
    A picklable Queue that skips the call to context.assert_spawning
    because it's no longer needed
    """
    def __init__(self, *args, **kwargs):
        ctx = get_context()
        super().__init__(*args, **kwargs, ctx=ctx)

    def __getstate__(self):
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

def is_locked(l):
    """
    Returns whether the given lock is acquired or not.
    """
    locked = l.acquire(block=False)
    if locked is False:
        return True
    else:
        l.release()
        return False

if __name__ == '__main__':
    # Example that shows that you can now pickle/unpickle locks and they'll
    # still point towards the same object
    l1 = Lock()
    p = pickle.dumps(l1)
    l2 = pickle.loads(p)
    print('before acquiring, l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
    l2.acquire()
    print('after acquiring l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))

    # Example that shows how you can pass a queue to Pool and it will work
    with Pool() as pool:
        q = PicklableQueue()
        q.put('laugh')
        pool.map(work, (q,))
        print("Main: Worker says", q.get())
Output
before acquiring, l1 locked: False l2 locked False
after acquiring l1 locked: True l2 locked True
Worker: Main says laugh
Main: Worker says haha
Disclaimer: The above code will only work on Windows. If you are on UNIX, you may try using @Booboo's modified code below (reported working but not adequately tested; full code link here):
import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context, Process
import multiprocessing.queues
import sys

_is_windows = sys.platform == 'win32'
if _is_windows:
    import _winapi

.
.
.

class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection
    objects are pickled using the appropriate api
    """
    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        if _is_windows:
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
            finally:
                _winapi.CloseHandle(proc)
        else:
            self._handle = handle
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        if not _is_windows:
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)
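Assuming the elided portions (work, reduce_lock_connection, PicklableQueue, etc.) are unchanged from the Windows version above, usage should then be identical on both platforms; on POSIX the semaphore handle is simply passed through and relied upon to remain valid in the forked child. A hypothetical sketch, untested beyond the report above:

# Hypothetical usage, mirroring the Windows demo; assumes the elided code
# (work, PicklableQueue, etc.) matches the Windows version above.
if __name__ == '__main__':
    with Pool() as pool:
        q = PicklableQueue()
        q.put('laugh')
        pool.map(work, (q,))
        print("Main: Worker says", q.get())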