Tags: python, gstreamer, python-multiprocessing, glib, gobject

Calling GMainLoop.quit() for mainloop running in child process, from parent process


I need to run a GStreamer pipeline to perform video streaming. The pipeline requires a GObject.MainLoop object, whose run() method does not return until quit() is called. To handle this, I create a process (P2) from my main application process (P1); P2 runs the GObject.MainLoop instance in its main thread. The problem is that the loop runs indefinitely inside P2, and I am unable to quit it from P1.

The following code illustrates the scenario.

'''
start() spawns a new process P2 that runs Mainloop within its main thread.
stop() is called from P1, but does not quit the Mainloop. This is probably because 
processes do not have shared memory
'''
from multiprocessing import Process
import gi

from gi.repository import GObject

class Main:
    def __init__(self):
        self.process = None
        self.loop = GObject.MainLoop()

    def worker(self):
        self.loop.run()

    def start(self):
        self.process=Process(target=self.worker, args=())
        self.process.start()

    def stop(self):
        self.loop.quit()

Next, I tried using a multiprocessing Queue to share the 'loop' variable between the processes, but I am still unable to quit the main loop.

'''
start() spawns a new process and puts the loop object in a multiprocessing Queue
stop() calls get() on the queue and then calls quit() on the received object, though it still does not quit the mainloop.
'''
from multiprocessing import Process, Queue
import gi

from gi.repository import GObject

class Main:
    def __init__(self):
        self.p=None
        self.loop = GObject.MainLoop()
        self.queue = Queue()

    def worker(self):
        self.queue.put(self.loop)
        self.loop.run()


    def start(self):
        self.p=Process(target=self.worker, args=())
        self.p.start()

    def stop(self):
        # receive loop instance shared by Child Process
        loop=self.queue.get()
        loop.quit()
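
A `multiprocessing.Queue` does not share objects either: it pickles whatever is put on it and rebuilds a new object on the receiving side. So even if a main loop object can be pickled at all (which is not verified here), `stop()` would only receive a copy, and calling `quit()` on that copy would leave the loop running in P2 untouched. A minimal single-process sketch of that copy behaviour, with a dict as a hypothetical stand-in for the loop:

```python
import multiprocessing as mp


def demo():
    q = mp.Queue()
    original = {"running": True}   # hypothetical stand-in for the loop object
    q.put(original)                # pickled by the queue's feeder thread
    received = q.get(timeout=10)   # a new object rebuilt from the pickle

    received["running"] = False    # analogous to calling quit() on the copy
    return original["running"], received["running"], received is original
```

Here `demo()` returns `(True, False, False)`: the received object is a distinct copy, and changing it does not affect the original.
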

How do I call the quit method for the MainLoop object which is only accessible within the child Process P2?


Solution

  • I subclassed multiprocessing.Process in my class Main and overrode its run() method so that the GObject.MainLoop instance runs in a separate thread (T1) instead of the main thread of P2. The main thread of P2 then sits in a wait-notify loop: P1 puts a message on a multiprocessing.Queue and notifies P2 through a multiprocessing.Condition, and P2 wakes up, reads the message, and dispatches it to a handler defined in the overridden run() method. For example, stop() sends the 'quit' message, whose handler calls quit() on the main loop. The class can be extended to send any number of messages to the child process, provided a handler for each one is defined in run().

    This is the code I used.

    from multiprocessing import Process, Condition, Queue
    from threading import Thread
    import gi
    
    from gi.repository import GObject
    
    # Module-level loop: with the fork start method, P2 inherits its own copy.
    loop = GObject.MainLoop()
    
    def worker():
        # Blocks until loop.quit() is called inside the same process.
        loop.run()
    
    class Main(Process):
    
        def __init__(self, target=None, args=()):
            Process.__init__(self)
            self.target = target
            self.args = tuple(args)
            self.message_queue = Queue()
            self.cond = Condition()
            self.thread = None
    
        def run(self):
            # Executed in P2: run the main loop in thread T1.
            if self.target:
                self.thread = Thread(target=self.target, args=self.args)
                print("running target method")
                self.thread.start()
    
            # The main thread of P2 handles messages sent by P1.
            while True:
                # Caveat: a notify sent before wait() is reached is lost;
                # message_queue.get() already blocks until a message arrives.
                with self.cond:
                    self.cond.wait()
                msg = self.message_queue.get()
                if msg == 'quit':
                    print(loop.is_running())
                    loop.quit()
                    print(loop.is_running())
                    break
                else:
                    print('message received', msg)
    
        def send_message(self, msg):
            # Called from P1: enqueue the message, then wake up P2.
            self.message_queue.put(msg)
            with self.cond:
                self.cond.notify_all()
    
        def stop(self):
            self.send_message("quit")
            self.join()
    
        def func1(self):
            self.send_message("msg 1") # handler is defined in the overridden run method
    
        # A few other functions send their own messages to the process; their
        # handlers are defined in the overridden run method above.
    

    This approach works fine for my scenario, but suggestions are welcome if there is a better way to do it.
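
One possible simplification, offered as a sketch rather than a drop-in replacement: `Queue.get()` already blocks until a message arrives, so the child can wait on the queue alone and a separate Condition is not needed. Creating the loop inside `run()` also makes it explicit that the loop belongs to the child process. In the sketch below, `LoopProcess`, `FakeLoop` and the `loop_factory` parameter are hypothetical names; `FakeLoop` only imitates the blocking `run()` / `quit()` behaviour of a main loop so that the example runs without PyGObject, and the fork start method is assumed (POSIX only).

```python
import multiprocessing as mp
import threading

# Assumption: POSIX system; the fork start method is requested explicitly.
ctx = mp.get_context("fork")


class FakeLoop:
    """Hypothetical stand-in for a main loop: run() blocks until quit()."""

    def __init__(self):
        self._quit = threading.Event()

    def run(self):
        self._quit.wait()

    def quit(self):
        self._quit.set()


class LoopProcess(ctx.Process):
    """Child process that owns the loop and obeys messages from the parent."""

    def __init__(self, loop_factory=FakeLoop):
        super().__init__()
        self.loop_factory = loop_factory
        self.messages = ctx.Queue()

    def run(self):
        # Executed in the child: create the loop here so it belongs to P2.
        loop = self.loop_factory()
        runner = threading.Thread(target=loop.run)
        runner.start()

        while True:
            msg = self.messages.get()   # blocks until the parent sends something
            if msg == "quit":
                loop.quit()             # called in the process that runs the loop
                break
            print("message received", msg)

        runner.join()

    def send_message(self, msg):
        # Called from the parent process.
        self.messages.put(msg)

    def stop(self):
        self.send_message("quit")
        self.join()
```

With PyGObject installed, passing `loop_factory=GObject.MainLoop` should give the same structure around a real main loop, though that combination has not been tested here.
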