Search code examples
pythonpyqtgstreamerglibpygobject

Messages are not received when GStreamer pipeline is created in a QThread


I have a PyQt application that creates a GStreamer pipeline when the user presses a button and listens for messages on that pipeline's bus.

import gi

gi.require_version("Gst", "1.0")

from gi.repository import Gst, GLib
from PyQt5.QtWidgets import QApplication, QPushButton


Gst.init()

pipeline = None


def on_pipeline_message(bus, message):
    print("Got a message from pipeline:", message.type)
    return True


def on_button_press():
    global pipeline

    pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
    pipeline.bus.add_watch(GLib.PRIORITY_DEFAULT, on_pipeline_message)
    pipeline.set_state(Gst.State.PLAYING)


app = QApplication([])

playback_button = QPushButton("Press to Start Playback", None)
playback_button.clicked.connect(on_button_press)
playback_button.show()

app.exec()

The above code works as expected and my on_pipeline_message callback function is called. However, if I decide to move the pipeline creation code into a separate QThread:

class MakePipelineThread(QThread):
    def run(self):
        global pipeline

        pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
        pipeline.bus.add_watch(GLib.PRIORITY_DEFAULT, on_pipeline_message)
        pipeline.set_state(Gst.State.PLAYING)

... and start that QThread when the button is pressed instead:

make_pipeline_thread = MakePipelineThread()


def on_button_press():
    make_pipeline_thread.start()

My on_pipeline_message callback is no longer run. Why does it matter if I create the pipeline in a separate QThread? How can I continue to receive messages?


Solution

  • GStreamer and Qt both use the GLib.MainContext type to handle transporting messages between senders and receivers asynchronously1. By default, both GStreamer and Qt transport messages through the global default MainContext instance, accessible via GLib.MainContext.default(). When messages are sent, be them from user input, from a pipeline, or anywhere else, they are initially stored in a message queue. Qt routinely iterates the MainContext, which pulls messages from the queue and sends them to any listeners. This is why you are able to receive messages from your GStreamer pipeline when the pipeline is started on the UI thread.

    However, when Qt starts a new QThread, it also creates a new MainContext object and sets it as the default context for that thread. When you create a GStreamer pipeline in a QThread, your pipeline and watcher register with that context instead of the global default. Qt does not automatically iterate a QThread's MainContext for you, so no messages will be received unless you iterate the context yourself. This can be done by calling QCoreApplication.processEvents() in the QThread.

    class MakePipelineThread(QThread):
        def run(self):
            global pipeline
    
            pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
            pipeline.bus.add_watch(GLib.PRIORITY_DEFAULT, on_pipeline_message)
            pipeline.set_state(Gst.State.PLAYING)
    
            # Process events until the pipeline reaches the null state
            _, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
            while state != Gst.State.NULL:
                QCoreApplication.processEvents()
                _, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
    

    This of course means that the QThread will run for as long as the pipeline runs, instead of stopping as soon as the pipeline is constructed.

    Alternatively, you can use set_sync_handler instead of add_watch. This tells the bus to run your callback immediately on the same thread that sent the message instead of sending the message asynchronously through a MainContext.

    class MakePipelineThread(QThread):
        def run(self):
            global pipeline
    
            pipeline = Gst.parse_launch("videotestsrc ! xvimagesink")
            pipeline.bus.set_sync_handler(on_pipeline_message)
            pipeline.set_state(Gst.State.PLAYING)
    

    This removes the need for iterating the MainContext entirely, but this means that your callback will be run in one of GStreamer's "streaming threads" and will block that thread from doing other work while your callback is running.


    1 Some platforms may compile Qt without GLib support, in which case Qt will use its own event handling system. In that case, this problem won't occur as long as the application iterates the global default context itself. You can set the QT_NO_GLIB environment variable to 1 to force Qt to not use GLib at runtime.