Tags: python, gstreamer, python-gstreamer

Seamless video loop in gstreamer


I'm trying to loop video playback using GStreamer and its Python bindings. My first attempt was to hook the EOS message and generate a seek event for the pipeline:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(video_sink)

######################################################

def main():
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        if message_bus.have_pending():  # Working without glib mainloop
            message = message_bus.pop()
            if message.type == Gst.MessageType.EOS:  # End-Of-Stream: loop the video, seek to beginning
                pipeline0.seek(1.0,
                              Gst.Format.TIME,
                              Gst.SeekFlags.FLUSH,
                              Gst.SeekType.SET, 0,
                              Gst.SeekType.NONE, 0)
            elif message.type == Gst.MessageType.ERROR:
                print("ERROR", message)
                break
        time.sleep(0.01) # Tried 0.001 - same result

if __name__ == "__main__":
    main()

And it actually works quite well, except for one thing: the seek to the beginning is not really seamless and I can see a tiny glitch. Because the video is an infinite animation, this tiny glitch becomes noticeable. My second attempt was to use a queue for decoded frames and hook the EOS event:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import time

if not Gst.init_check()[0]:
    print("gstreamer initialization failed")

source0 = Gst.ElementFactory.make("filesrc", "source0")
assert source0 is not None
source0.set_property("location", "video0.mp4")

qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
assert qtdemux0 is not None

decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
assert decoder0 is not None

def demux0_pad_added(demux, pad):
    if pad.name == 'video_0':  # We expect exactly one video stream
        pad.link(decoder0.get_static_pad("sink"))

qtdemux0.connect("pad-added", demux0_pad_added)

queue = Gst.ElementFactory.make("queue", "queue")
assert queue is not None
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
assert video_sink is not None

pipeline0 = Gst.Pipeline()
assert pipeline0 is not None
pipeline0.add(source0)
pipeline0.add(qtdemux0)
pipeline0.add(decoder0)
pipeline0.add(queue)
pipeline0.add(video_sink)

source0.link(qtdemux0)
"""qtdemux0 -> decoder0 dynamic linking"""
decoder0.link(queue)
queue.link(video_sink)

######################################################

def cb_event(pad, info, *user_data):
    event = info.get_event()
    if event is not None and event.type == Gst.EventType.EOS:
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.PASS

def main():
    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, cb_event)

    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)

    while True:
        # do nothing
        time.sleep(1)

if __name__ == "__main__":
    main()

After the first EOS event the playback just stalls. I've tried several different things, like passing the EOS event through, dropping EOS and adding an offset to the decoder's source pad, sending the seek event to the pipeline itself, and others. But I can't get it to work.

In an effort to understand, I also tried enabling debug mode and writing my own simple logger of pipeline activity using pad probes. Debug mode was not very useful: the log is very bulky and still misses some details. My own log includes upstream/downstream events and buffer timing information. However, I still cannot understand what is wrong and how to get it to work.
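
For reference, such a probe-based logger boils down to something like the sketch below (a minimal illustration of the pattern, not my exact logger):

def probe_logger(pad, info):
    # Identify the pad being observed, e.g. "video_decoder0:src"
    name = "%s:%s" % (pad.get_parent_element().get_name(), pad.get_name())
    if info.type & Gst.PadProbeType.BUFFER:
        buf = info.get_buffer()
        print(name, "buffer pts=%s duration=%s" % (buf.pts, buf.duration))
    else:
        event = info.get_event()
        if event is not None:
            print(name, "event", event.type)
    return Gst.PadProbeReturn.OK

# Watch buffers plus downstream and upstream events on the decoder's src pad
decoder0.get_static_pad("src").add_probe(
    Gst.PadProbeType.BUFFER
    | Gst.PadProbeType.EVENT_DOWNSTREAM
    | Gst.PadProbeType.EVENT_UPSTREAM,
    probe_logger)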

Obviously I'm not just missing something; I don't understand something fundamental about how a GStreamer pipeline works.

So, the question is: what should I do with the second version of the code to get it to work?
Additional question: are there tools or techniques to get a clear idea of what is happening inside the pipeline and the elements it contains?

I would very much appreciate detailed answers. It is more important for me to understand what I am doing wrong than to simply get the program working.

P.S. The program runs under GNU/Linux on the NanoPi S2 board. The video is stored in an MP4 container (without audio) and encoded with H.264. Please feel free to post code samples in any language, not necessarily Python.


Solution

  • Well, okay. I didn't get an answer, so I continued the research and finally found a solution. Below I'll show two different approaches. The first is a direct answer to the question, with a working code sample. The second is a different approach that seems more native to GStreamer and is definitely simpler. Both give the desired result: a seamless video loop.

    Corrected code (the answer, but not the best approach)

    Changes:

    1. Added a video duration query. On every loop we increase the time offset by the video duration, which makes it possible to emulate an infinite contiguous stream.
    2. The seek event is now emitted from a separate thread. According to this post, we cannot emit a seek event from the streaming thread. Also, have a look at this file (link from the mentioned post).
    3. The event callback now drops FLUSH events (a contiguous stream should not contain FLUSH events).
    4. The video decoder changed from nxvideodec to avdec_h264. This is not relevant to the initial question and was done for an unrelated, very specific reason.

    Code:

    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import Gst
    
    import time
    import threading
    
    if not Gst.init_check()[0]:
        print("gstreamer initialization failed")
    
    source0 = Gst.ElementFactory.make("filesrc", "source0")
    assert source0 is not None
    source0.set_property("location", "video0.mp4")
    
    qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
    assert qtdemux0 is not None
    
    decoder0 = Gst.ElementFactory.make("avdec_h264", "video_decoder0")
    assert decoder0 is not None
    
    def demux0_pad_added(demux, pad):
        if pad.name == 'video_0':  # We expect exactly one video stream
            pad.link(decoder0.get_static_pad("sink"))
    
    qtdemux0.connect("pad-added", demux0_pad_added)
    
    queue = Gst.ElementFactory.make("queue", "queue")
    assert queue is not None
    video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
    assert video_sink is not None
    
    pipeline0 = Gst.Pipeline()
    assert pipeline0 is not None
    pipeline0.add(source0)
    pipeline0.add(qtdemux0)
    pipeline0.add(decoder0)
    pipeline0.add(queue)
    pipeline0.add(video_sink)
    
    source0.link(qtdemux0)
    """qtdemux0 -> decoder0 dynamic linking"""
    decoder0.link(queue)
    queue.link(video_sink)
    
    # UPD: Get video duration
    pipeline0.set_state(Gst.State.PAUSED)
    assert pipeline0.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.PAUSED
    duration_ok, duration = pipeline0.query_duration(Gst.Format.TIME)
    assert duration_ok
    
    ######################################################
    
    seek_requested = threading.Event()
    # UPD: Seek thread. Wait for seek request from callback and generate seek event
    def seek_thread_func(queue_sink_pad):
        cumulative_offset = 0
        while True:
            seek_requested.wait()
            seek_requested.clear()
            decoder0.seek(1.0,
                          Gst.Format.TIME,
                          Gst.SeekFlags.FLUSH,
                          Gst.SeekType.SET, 0,
                          Gst.SeekType.NONE, 0)
            # Add offset. This is an important step: it ensures that downstream elements 'see' an infinite contiguous stream
            cumulative_offset += duration
            queue_sink_pad.set_offset(cumulative_offset)
    
    def cb_event(pad, info):
        event = info.get_event()
        if event is not None:
            if event.type == Gst.EventType.EOS:  # UPD: Set 'seek_requested' flag
                seek_requested.set()
                return Gst.PadProbeReturn.DROP
            elif event.type == Gst.EventType.FLUSH_START or event.type == Gst.EventType.FLUSH_STOP:  # UPD: Drop FLUSH
                return Gst.PadProbeReturn.DROP
        return Gst.PadProbeReturn.OK
    
    def main():
        queue_sink_pad = queue.get_static_pad("sink")
    
        # UPD: Create separate 'seek thread'
        threading.Thread(target=seek_thread_func, daemon=True, args=(queue_sink_pad,)).start()
    
        dec0_src_pad = decoder0.get_static_pad("src")
        dec0_src_pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM | Gst.PadProbeType.EVENT_FLUSH,
                               cb_event)
    
        pipeline0.set_state(Gst.State.PLAYING)
    
        while True:
            # do nothing
            time.sleep(1)
    
    if __name__ == "__main__":
        main()
    

    This code works. The seek is effectively performed while buffers from the queue are still playing. However, I believe it may contain flaws or even bugs. For example, SEGMENT events are passed downstream with the RESET flag, which doesn't seem right. A much cleaner (and probably more correct and reliable) way to implement this approach would be to create a GStreamer plugin that manages the events and adjusts event and buffer timestamps.
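
    For illustration only, a skeleton of such an element in Python could look like the sketch below. It relies on the GstBase Python bindings; the element name timestampshift and its fixed-offset logic are hypothetical, and a real looping element would also have to rewrite SEGMENT events and advance the offset on every loop iteration.

    import gi
    gi.require_version("Gst", "1.0")
    gi.require_version("GstBase", "1.0")
    from gi.repository import Gst, GObject, GstBase
    
    Gst.init(None)
    
    class TimestampShift(GstBase.BaseTransform):
        """Toy in-place element that adds a constant offset to buffer timestamps."""
        __gstmetadata__ = ("Timestamp shift", "Filter/Video",
                           "Adds a constant offset to buffer timestamps", "example")
        __gsttemplates__ = (
            Gst.PadTemplate.new("sink", Gst.PadDirection.SINK,
                                Gst.PadPresence.ALWAYS, Gst.Caps.new_any()),
            Gst.PadTemplate.new("src", Gst.PadDirection.SRC,
                                Gst.PadPresence.ALWAYS, Gst.Caps.new_any()),
        )
    
        def __init__(self):
            super().__init__()
            self.offset = 0  # nanoseconds to add to every buffer timestamp
    
        def do_transform_ip(self, buf):
            # BaseTransform hands us a writable buffer in in-place mode
            if buf.pts != Gst.CLOCK_TIME_NONE:
                buf.pts += self.offset
            return Gst.FlowReturn.OK
    
    GObject.type_register(TimestampShift)
    Gst.Element.register(None, "timestampshift", Gst.Rank.NONE, TimestampShift)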

    But there is a simpler and more native solution:

    Using segment seek and SEGMENT_DONE message

    According to the documentation:

    Segment seeking (using the GST_SEEK_FLAG_SEGMENT) will not emit an EOS at the end of the playback segment but will post a SEGMENT_DONE message on the bus. This message is posted by the element driving the playback in the pipeline, typically a demuxer. After receiving the message, the application can reconnect the pipeline or issue other seek events in the pipeline. Since the message is posted as early as possible in the pipeline, the application has some time to issue a new seek to make the transition seamless. Typically the allowed delay is defined by the buffer sizes of the sinks as well as the size of any queues in the pipeline.

    The SEGMENT_DONE message is indeed posted before the queue becomes empty. This gives more than enough time to perform the next seek. So all we need to do is issue a segment seek at the very beginning of playback, then wait for the SEGMENT_DONE message and send the next non-flushing segment seek. Here is a working example:

    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import Gst
    
    import time
    
    if not Gst.init_check()[0]:
        print("gstreamer initialization failed")
    
    source0 = Gst.ElementFactory.make("filesrc", "source0")
    assert source0 is not None
    source0.set_property("location", "video0.mp4")
    
    qtdemux0 = Gst.ElementFactory.make("qtdemux", "demux0")
    assert qtdemux0 is not None
    
    decoder0 = Gst.ElementFactory.make("nxvideodec", "video_decoder0")
    assert decoder0 is not None
    
    def demux0_pad_added(demux, pad):
        if pad.name == 'video_0':  # We expect exactly one video stream
            pad.link(decoder0.get_static_pad("sink"))
    
    qtdemux0.connect("pad-added", demux0_pad_added)
    
    queue = Gst.ElementFactory.make("queue", "queue")
    assert queue is not None
    video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
    assert video_sink is not None
    
    pipeline0 = Gst.Pipeline()
    assert pipeline0 is not None
    pipeline0.add(source0)
    pipeline0.add(qtdemux0)
    pipeline0.add(decoder0)
    pipeline0.add(queue)
    pipeline0.add(video_sink)
    
    source0.link(qtdemux0)
    """qtdemux0 -> decoder0 dynamic linking"""
    decoder0.link(queue)
    queue.link(video_sink)
    
    ######################################################
    
    def main():
        message_bus = pipeline0.get_bus()
        pipeline0.set_state(Gst.State.PLAYING)
        pipeline0.get_state(Gst.CLOCK_TIME_NONE)
        pipeline0.seek(1.0,
                       Gst.Format.TIME,
                       Gst.SeekFlags.SEGMENT,
                       Gst.SeekType.SET, 0,
                       Gst.SeekType.NONE, 0)
    
        while True:
            if message_bus.have_pending():  # Working without glib mainloop
                message = message_bus.pop()
                if message.type == Gst.MessageType.SEGMENT_DONE:
                    pipeline0.seek(1.0,
                                  Gst.Format.TIME,
                                  Gst.SeekFlags.SEGMENT,
                                  Gst.SeekType.SET, 0,
                                  Gst.SeekType.NONE, 0)
                elif message.type == Gst.MessageType.ERROR:
                    print("bus ERROR", message)
                    break
            time.sleep(0.01)
    
    if __name__ == "__main__":
        main()
    

    With the default queue configuration, the SEGMENT_DONE message is posted approximately one second before the last video frame is played. The non-flushing seek ensures that none of the frames are lost. Together this gives the perfect result: a truly seamless video loop.
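
    If more headroom is ever needed, the queue limits can be raised. These are the standard properties of the queue element; the values below are only an illustration:

    # Let the queue hold more decoded data so SEGMENT_DONE arrives even earlier
    # relative to the last played frame (values are illustrative)
    queue.set_property("max-size-time", 3 * Gst.SECOND)  # up to ~3 s of data
    queue.set_property("max-size-buffers", 0)            # 0 = no buffer-count limit
    queue.set_property("max-size-bytes", 0)              # 0 = no byte limit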

    Note: I switch the pipeline to the PLAYING state and then perform the initial non-flushing seek. Alternatively, we can switch the pipeline to the PAUSED state, perform a flushing segment seek, and then switch the pipeline to the PLAYING state.
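
    That alternative start-up sequence would look roughly like this (same pipeline as above; only the start-up order changes):

    # Alternative start-up: pre-roll in PAUSED, do a flushing segment seek,
    # then start playback
    pipeline0.set_state(Gst.State.PAUSED)
    pipeline0.get_state(Gst.CLOCK_TIME_NONE)  # wait until the pipeline is pre-rolled
    pipeline0.seek(1.0,
                   Gst.Format.TIME,
                   Gst.SeekFlags.FLUSH | Gst.SeekFlags.SEGMENT,
                   Gst.SeekType.SET, 0,
                   Gst.SeekType.NONE, 0)
    pipeline0.set_state(Gst.State.PLAYING)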

    Note 2: Different sources suggest slightly different solutions. See the links below.


    Related topics and sources:

    1. http://gstreamer-devel.966125.n4.nabble.com/Flushing-the-data-in-partial-pipeline-tp4681893p4681899.html
      • https://cgit.freedesktop.org/gstreamer/gst-editing-services/tree/plugins/nle/nlesource.c
    2. http://gstreamer-devel.966125.n4.nabble.com/Loop-a-file-using-playbin-without-artefacts-td4671952.html