Search code examples
pythonpython-3.xqueuepython-multiprocessingpython-multithreading

Multiprocessing.Queue with hugh data causes _wait_for_tstate_lock


An Exception is raised in threading._wait_for_tstate_lock when I transfere hugh data between a Process and a Thread via multiprocessing.Queue.

My minimal working example looks a bit complex first - sorry. I will explain. The original application loads a lot of (not so important) files into RAM. This is done in a separate process to save ressources. The main gui thread shouldn't freez.

  1. The GUI start a separate Thread to prevent the gui event loop from freezing.

  2. This separate Thread then starts one Process which should does the work.

a) This Thread instanciates a multiprocess.Queue (be aware that this is a multiprocessing and not threading!)

b) This is givin to the Process for sharing data from Process back to the Thread.

  1. The Process does some work (3 steps) and .put() the result into the multiprocessing.Queue.

  2. When the Process ends the Thread takes over again and collect the data from the Queue, store it to its own attribute MyThread.result.

  3. The Thread tells the GUI main loop/thread to call a callback function if it has time for.

  4. The callback function (MyWindow::callback_thread_finished()) get the results from MyWindow.thread.result.

The problem is if the data put to the Queue is to big something happen I don't understand - the MyThread never ends. I have to cancle the application via Strg+C.

I got some hints from the docs. But my problem is I did not fully understand the documentation. But I have the feeling that the key of my problems can be found there. Please see the two red boxex in "Pipes and Queues" (Python 3.5 docs). That is the full output

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

This is the minimal working example

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()

Possible duplicate but quite different and IMO without an answer for my situation: Thread._wait_for_tstate_lock() never returns.

Workaround

Using a Manager by modifing line 22 to queue = multiprocessing.Manager().Queue() solve the problem. But I don't know why. My intention of this question is to understand the things behind and not only to make my code work. Even I don't really know what a Manager() is and if it has other (problem causing) implications.


Solution

  • According to the second warning box in the documentation you are linking to you can get a deadlock when you join a process before processing all items in the queue. So starting the process and immediately joining it and then processing the items in the queue is the wrong order of steps. You have to start the process, then receive the items, and then only when all items are received you can call the join method. Define some sentinel value to signal that the process is finished sending data through the queue. None for example if that can't be a regular value you expect from the process.

    class MyThread(threading.Thread):
        """This thread just starts the process."""
    
        def __init__(self, callback):
            threading.Thread.__init__(self)
            self._callback = callback
            self.result = []
    
        def run(self):
            print('Running MyThread...')
            queue = multiprocessing.Queue()
            process = MyProcess(queue)
            process.start()
            while True:
                process_result = queue.get()
                if process_result is None:
                    break
                self.result.append(process_result)
            process.join()
            print('MyThread stoppd.')
            GLib.idle_add(self._callback)
    
    
    class MyProcess(multiprocessing.Process):
    
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)
            self.queue = queue
    
        def run(self):
            print('Running MyProcess...')
            for i in range(3):
                self.queue.put((i, 'x' * 102048))
            self.queue.put(None)
            print('MyProcess stoppd.')