I'm trying to loop video playback using GStreamer and its Python bindings. My first attempt was to hook the EOS message and issue a seek on the pipeline:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
# Initialise GStreamer before any element is created. argv is passed
# explicitly as None: no command-line options are forwarded to GStreamer.
if not Gst.init_check(None)[0]:
    # Nothing below can work without an initialised library, so stop here
    # instead of carrying on after merely printing the message.
    raise SystemExit("gstreamer initialization failed")


def _make_element(factory_name, element_name):
    """Create a named element from the given element factory.

    Args:
        factory_name: name of the element factory (e.g. "filesrc").
        element_name: name given to the new element.

    Returns:
        The element returned by Gst.ElementFactory.make.

    Raises:
        RuntimeError: if Gst.ElementFactory.make returned None. A real
            exception is used instead of ``assert`` so that the check is
            not stripped when Python runs with ``-O``.
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if element is None:
        raise RuntimeError(
            "could not create GStreamer element {!r} ({!r})".format(
                factory_name, element_name))
    return element


# File source -> demuxer -> video decoder.
source0 = _make_element("filesrc", "source0")
source0.set_property("location", "video0.mp4")
qtdemux0 = _make_element("qtdemux", "demux0")
decoder0 = _make_element("nxvideodec", "video_decoder0")
def demux0_pad_added(demux, pad):
    """Link a newly added demuxer pad to the decoder's sink pad.

    Used as the handler for the demuxer's "pad-added" signal.

    Args:
        demux: the element that emitted the signal (unused).
        pad: the pad that has just been added.
    """
    # We expect exactly one video stream (the first); ignore every other pad.
    if pad.name != 'video_0':
        return
    decoder_sink_pad = decoder0.get_static_pad("sink")
    pad.link(decoder_sink_pad)
# The demuxer -> decoder link is made dynamically by the "pad-added" handler.
qtdemux0.connect("pad-added", demux0_pad_added)

video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
if video_sink is None:
    # Explicit exception instead of ``assert``: asserts vanish under -O.
    raise RuntimeError("could not create GStreamer element 'nxvideosink'")

pipeline0 = Gst.Pipeline()
if pipeline0 is None:
    raise RuntimeError("could not create the GStreamer pipeline")
for element in (source0, qtdemux0, decoder0, video_sink):
    pipeline0.add(element)

# Element.link() reports failure through its boolean result; surface it
# instead of silently continuing with a half-linked pipeline.
if not source0.link(qtdemux0):
    raise RuntimeError("could not link source0 -> demux0")
# qtdemux0 -> decoder0: dynamic linking (see demux0_pad_added).
if not decoder0.link(video_sink):
    raise RuntimeError("could not link video_decoder0 -> video_sink")
######################################################
def main():
    """Play the pipeline and seek back to the start on every EOS message.

    The bus is polled by hand (no GLib main loop). On an ERROR message the
    parsed error and its debug string are printed and the loop ends. The
    pipeline is set to the NULL state before returning, whether the loop
    ended because of a bus error or because of an exception.
    """
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)
    try:
        while True:
            if message_bus.have_pending():  # Working without glib mainloop
                message = message_bus.pop()
                if message.type == Gst.MessageType.EOS:
                    # End-Of-Stream: loop the video, seek to beginning
                    pipeline0.seek(1.0,
                                   Gst.Format.TIME,
                                   Gst.SeekFlags.FLUSH,
                                   Gst.SeekType.SET, 0,
                                   Gst.SeekType.NONE, 0)
                elif message.type == Gst.MessageType.ERROR:
                    # Print the actual error text, not the message object.
                    error, debug_info = message.parse_error()
                    print("ERROR", error.message, debug_info)
                    break
            time.sleep(0.01)  # Tried 0.001 - same result
    finally:
        # Shut the pipeline down so its elements release their resources.
        pipeline0.set_state(Gst.State.NULL)


if __name__ == "__main__":
    main()
It actually works quite well except for one thing: the seek to the beginning is not really seamless. I can see a tiny glitch, and because the video is an infinite animation, this glitch becomes noticeable. My second attempt was to add a queue for decoded frames and hook the EOS event:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
# Initialise GStreamer before any element is created. argv is passed
# explicitly as None: no command-line options are forwarded to GStreamer.
if not Gst.init_check(None)[0]:
    # Nothing below can work without an initialised library, so stop here
    # instead of carrying on after merely printing the message.
    raise SystemExit("gstreamer initialization failed")


def _make_element(factory_name, element_name):
    """Create a named element from the given element factory.

    Args:
        factory_name: name of the element factory (e.g. "filesrc").
        element_name: name given to the new element.

    Returns:
        The element returned by Gst.ElementFactory.make.

    Raises:
        RuntimeError: if Gst.ElementFactory.make returned None. A real
            exception is used instead of ``assert`` so that the check is
            not stripped when Python runs with ``-O``.
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if element is None:
        raise RuntimeError(
            "could not create GStreamer element {!r} ({!r})".format(
                factory_name, element_name))
    return element


# File source -> demuxer -> video decoder.
source0 = _make_element("filesrc", "source0")
source0.set_property("location", "video0.mp4")
qtdemux0 = _make_element("qtdemux", "demux0")
decoder0 = _make_element("nxvideodec", "video_decoder0")
def demux0_pad_added(demux, pad):
    """Link a newly added demuxer pad to the decoder's sink pad.

    Used as the handler for the demuxer's "pad-added" signal.

    Args:
        demux: the element that emitted the signal (unused).
        pad: the pad that has just been added.
    """
    # We expect exactly one video stream (the first); ignore every other pad.
    if pad.name != 'video_0':
        return
    decoder_sink_pad = decoder0.get_static_pad("sink")
    pad.link(decoder_sink_pad)
# The demuxer -> decoder link is made dynamically by the "pad-added" handler.
qtdemux0.connect("pad-added", demux0_pad_added)

# Explicit exceptions instead of ``assert``: asserts vanish under -O.
queue = Gst.ElementFactory.make("queue", "queue")
if queue is None:
    raise RuntimeError("could not create GStreamer element 'queue'")
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
if video_sink is None:
    raise RuntimeError("could not create GStreamer element 'nxvideosink'")

pipeline0 = Gst.Pipeline()
if pipeline0 is None:
    raise RuntimeError("could not create the GStreamer pipeline")
for element in (source0, qtdemux0, decoder0, queue, video_sink):
    pipeline0.add(element)

# Element.link() reports failure through its boolean result; surface it
# instead of silently continuing with a half-linked pipeline.
if not source0.link(qtdemux0):
    raise RuntimeError("could not link source0 -> demux0")
# qtdemux0 -> decoder0: dynamic linking (see demux0_pad_added).
if not decoder0.link(queue):
    raise RuntimeError("could not link video_decoder0 -> queue")
if not queue.link(video_sink):
    raise RuntimeError("could not link queue -> video_sink")
######################################################
def cb_event(pad, info, *user_data):
    """Pad probe callback: swallow EOS and seek the decoder back to 0.

    Args:
        pad: the pad the probe is attached to (unused).
        info: probe info; the event is read through ``info.get_event()``.
        *user_data: extra probe arguments (unused).

    Returns:
        Gst.PadProbeReturn.DROP for an EOS event (after issuing a flushing
        seek to position 0 on the decoder), Gst.PadProbeReturn.PASS for
        anything else.

    NOTE: this is the version the question is about -- according to the
    accompanying text, playback stalls after the first EOS.
    """
    event = info.get_event()
    is_eos = event is not None and event.type == Gst.EventType.EOS
    if not is_eos:
        return Gst.PadProbeReturn.PASS

    decoder0.seek(1.0,
                  Gst.Format.TIME,
                  Gst.SeekFlags.FLUSH,
                  Gst.SeekType.SET, 0,
                  Gst.SeekType.NONE, 0)
    return Gst.PadProbeReturn.DROP
def main():
    """Attach the event probe to the decoder's source pad and start playback.

    The probe (cb_event) is installed as a blocking probe for downstream
    events. After the pipeline is set to PLAYING this function only sleeps;
    it never returns. The bus was fetched but never used in this version,
    so that unused local has been removed.
    """
    dec0_src_pad = decoder0.get_static_pad("src")
    dec0_src_pad.add_probe(
        Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM,
        cb_event)
    pipeline0.set_state(Gst.State.PLAYING)
    while True:
        # do nothing: all the work happens in the probe callback
        time.sleep(1)


if __name__ == "__main__":
    main()
After the first EOS event the playback simply stalls. I've tried several different things: passing the EOS event through, dropping EOS and adding an offset to the decoder's source pad, sending the seek event to the pipeline itself, and others. But I can't get it to work.
In an effort to understand, I also tried enabling debug mode and writing my own logger of pipeline activity using pad probes. Debug mode was not very useful: the log is very bulky and is missing some details. My own log includes upstream/downstream events and buffer timing information. However, I still cannot understand what is wrong or how to get it to work.
Obviously I am not just missing a detail but fail to understand something fundamental about how a GStreamer pipeline works.
So, the question is: what should I change in the second version of the code to get it to work?
Additional question: are there tools or techniques that give a clear picture of what is happening inside the pipeline and the elements it contains?
I would greatly appreciate detailed answers. It is more important for me to understand what I am doing wrong than just to get the program working.
P.S. The program runs under GNU/Linux on a NanoPi S2 board. The video is stored in an MP4 container (without audio) and compressed with H.264. Please feel free to post code samples in any language, not necessarily Python.
Well, okay. I didn't get an answer, so I continued my research and finally found a solution. Below I'll show two different approaches. The first is a direct answer to the question, with a working code sample. The second is a different approach, which seems to be more native to GStreamer and is definitely simpler. Both give the desired result: a seamless video loop.
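Changes (marked with `UPD` comments in the code):
- The video duration is queried up front (the pipeline is set to PAUSED, then `query_duration` is called).
- A separate seek thread waits for a seek request from the callback and generates the seek event.
- On EOS the callback only sets the `seek_requested` flag and drops the event.
- The callback drops `FLUSH` events (a contiguous stream should not contain `FLUSH` events).

Code: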
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
import threading
# Initialise GStreamer before any element is created. argv is passed
# explicitly as None: no command-line options are forwarded to GStreamer.
if not Gst.init_check(None)[0]:
    # Nothing below can work without an initialised library, so stop here
    # instead of carrying on after merely printing the message.
    raise SystemExit("gstreamer initialization failed")


def _make_element(factory_name, element_name):
    """Create a named element from the given element factory.

    Args:
        factory_name: name of the element factory (e.g. "filesrc").
        element_name: name given to the new element.

    Returns:
        The element returned by Gst.ElementFactory.make.

    Raises:
        RuntimeError: if Gst.ElementFactory.make returned None. A real
            exception is used instead of ``assert`` so that the check is
            not stripped when Python runs with ``-O``.
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if element is None:
        raise RuntimeError(
            "could not create GStreamer element {!r} ({!r})".format(
                factory_name, element_name))
    return element


# File source -> demuxer -> video decoder.
source0 = _make_element("filesrc", "source0")
source0.set_property("location", "video0.mp4")
qtdemux0 = _make_element("qtdemux", "demux0")
decoder0 = _make_element("avdec_h264", "video_decoder0")
def demux0_pad_added(demux, pad):
    """Link a newly added demuxer pad to the decoder's sink pad.

    Used as the handler for the demuxer's "pad-added" signal.

    Args:
        demux: the element that emitted the signal (unused).
        pad: the pad that has just been added.
    """
    # We expect exactly one video stream (the first); ignore every other pad.
    if pad.name != 'video_0':
        return
    decoder_sink_pad = decoder0.get_static_pad("sink")
    pad.link(decoder_sink_pad)
# The demuxer -> decoder link is made dynamically by the "pad-added" handler.
qtdemux0.connect("pad-added", demux0_pad_added)

# Explicit exceptions instead of ``assert``: asserts vanish under -O.
queue = Gst.ElementFactory.make("queue", "queue")
if queue is None:
    raise RuntimeError("could not create GStreamer element 'queue'")
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
if video_sink is None:
    raise RuntimeError("could not create GStreamer element 'nxvideosink'")

pipeline0 = Gst.Pipeline()
if pipeline0 is None:
    raise RuntimeError("could not create the GStreamer pipeline")
for element in (source0, qtdemux0, decoder0, queue, video_sink):
    pipeline0.add(element)

# Element.link() reports failure through its boolean result; surface it
# instead of silently continuing with a half-linked pipeline.
if not source0.link(qtdemux0):
    raise RuntimeError("could not link source0 -> demux0")
# qtdemux0 -> decoder0: dynamic linking (see demux0_pad_added).
if not decoder0.link(queue):
    raise RuntimeError("could not link video_decoder0 -> queue")
if not queue.link(video_sink):
    raise RuntimeError("could not link queue -> video_sink")

# UPD: Get video duration
pipeline0.set_state(Gst.State.PAUSED)
# The wait for the state change must not live inside an ``assert``: under
# ``python -O`` the whole statement, including the wait, would be removed.
reached_state = pipeline0.get_state(Gst.CLOCK_TIME_NONE).state
if reached_state != Gst.State.PAUSED:
    raise RuntimeError("pipeline did not reach the PAUSED state")
duration_ok, duration = pipeline0.query_duration(Gst.Format.TIME)
if not duration_ok:
    raise RuntimeError("could not query the video duration")
######################################################
# Set by the pad probe callback when EOS is seen; consumed by the seek thread.
seek_requested = threading.Event()


def seek_thread_func(queue_sink_pad):
    """Seek thread body: on every seek request, rewind and shift timestamps.

    UPD: waits for a seek request from the callback and generates the seek
    event. Runs forever.

    Args:
        queue_sink_pad: pad whose offset is increased by the video duration
            after each seek.
    """
    total_offset = 0
    # Event.wait() without a timeout blocks until the flag is set and then
    # returns True, so this loops once per request.
    while seek_requested.wait():
        seek_requested.clear()
        decoder0.seek(1.0,
                      Gst.Format.TIME,
                      Gst.SeekFlags.FLUSH,
                      Gst.SeekType.SET, 0,
                      Gst.SeekType.NONE, 0)
        # Add offset. It is an important step to ensure that downstream
        # elements will 'see' an infinite contiguous stream.
        total_offset = total_offset + duration
        queue_sink_pad.set_offset(total_offset)
def cb_event(pad, info):
    """Pad probe callback: turn EOS into a seek request and drop FLUSH events.

    Args:
        pad: the pad the probe is attached to (unused).
        info: probe info; the event is read through ``info.get_event()``.

    Returns:
        Gst.PadProbeReturn.DROP for EOS, FLUSH_START and FLUSH_STOP events,
        Gst.PadProbeReturn.OK otherwise.
    """
    event = info.get_event()
    if event is None:
        return Gst.PadProbeReturn.OK

    event_type = event.type
    if event_type == Gst.EventType.EOS:
        # UPD: Set 'seek_requested' flag; the seek thread issues the seek.
        seek_requested.set()
        return Gst.PadProbeReturn.DROP
    if event_type in (Gst.EventType.FLUSH_START, Gst.EventType.FLUSH_STOP):
        # UPD: Drop FLUSH
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK
def main():
    """Start the seek thread, install the event probe and start playback.

    After the pipeline is set to PLAYING this function only sleeps; it
    never returns.
    """
    # UPD: Create separate 'seek thread'
    seek_thread = threading.Thread(
        target=seek_thread_func,
        daemon=True,
        args=(queue.get_static_pad("sink"),),
    )
    seek_thread.start()

    probe_mask = (Gst.PadProbeType.EVENT_DOWNSTREAM
                  | Gst.PadProbeType.EVENT_FLUSH)
    decoder0.get_static_pad("src").add_probe(probe_mask, cb_event)

    pipeline0.set_state(Gst.State.PLAYING)
    while True:
        # do nothing: the probe callback and the seek thread do the work
        time.sleep(1)


if __name__ == "__main__":
    main()
This code works. The seek is effectively performed while buffers from the queue are still being played. However, I believe it may contain some flaws or even bugs. For example, `SEGMENT` events are passed downstream with the `RESET` flag, which doesn't seem right. A much cleaner (and probably more correct and reliable) way to implement this approach is to write a GStreamer plugin that manages the events and adjusts the timestamps of events and buffers.
But there is a simpler and more native solution: segment seeking and the `SEGMENT_DONE` message.

According to the documentation:
Segment seeking (using the `GST_SEEK_FLAG_SEGMENT`) will not emit an `EOS` at the end of the playback segment but will post a `SEGMENT_DONE` message on the bus. This message is posted by the element driving the playback in the pipeline, typically a demuxer. After receiving the message, the application can reconnect the pipeline or issue other seek events in the pipeline. Since the message is posted as early as possible in the pipeline, the application has some time to issue a new seek to make the transition seamless. Typically the allowed delay is defined by the buffer sizes of the sinks as well as the size of any queues in the pipeline.
The `SEGMENT_DONE` message is indeed posted before the queue becomes empty, which leaves more than enough time to perform the next seek. So all we need to do is issue a segment seek at the very beginning of playback, then wait for the `SEGMENT_DONE` message and send the next non-flushing seek event.
Here is a working example:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import time
# Initialise GStreamer before any element is created. argv is passed
# explicitly as None: no command-line options are forwarded to GStreamer.
if not Gst.init_check(None)[0]:
    # Nothing below can work without an initialised library, so stop here
    # instead of carrying on after merely printing the message.
    raise SystemExit("gstreamer initialization failed")


def _make_element(factory_name, element_name):
    """Create a named element from the given element factory.

    Args:
        factory_name: name of the element factory (e.g. "filesrc").
        element_name: name given to the new element.

    Returns:
        The element returned by Gst.ElementFactory.make.

    Raises:
        RuntimeError: if Gst.ElementFactory.make returned None. A real
            exception is used instead of ``assert`` so that the check is
            not stripped when Python runs with ``-O``.
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if element is None:
        raise RuntimeError(
            "could not create GStreamer element {!r} ({!r})".format(
                factory_name, element_name))
    return element


# File source -> demuxer -> video decoder.
source0 = _make_element("filesrc", "source0")
source0.set_property("location", "video0.mp4")
qtdemux0 = _make_element("qtdemux", "demux0")
decoder0 = _make_element("nxvideodec", "video_decoder0")
def demux0_pad_added(demux, pad):
    """Link a newly added demuxer pad to the decoder's sink pad.

    Used as the handler for the demuxer's "pad-added" signal.

    Args:
        demux: the element that emitted the signal (unused).
        pad: the pad that has just been added.
    """
    # We expect exactly one video stream (the first); ignore every other pad.
    if pad.name != 'video_0':
        return
    decoder_sink_pad = decoder0.get_static_pad("sink")
    pad.link(decoder_sink_pad)
# The demuxer -> decoder link is made dynamically by the "pad-added" handler.
qtdemux0.connect("pad-added", demux0_pad_added)

# Explicit exceptions instead of ``assert``: asserts vanish under -O.
queue = Gst.ElementFactory.make("queue", "queue")
if queue is None:
    raise RuntimeError("could not create GStreamer element 'queue'")
video_sink = Gst.ElementFactory.make("nxvideosink", "video_sink")
if video_sink is None:
    raise RuntimeError("could not create GStreamer element 'nxvideosink'")

pipeline0 = Gst.Pipeline()
if pipeline0 is None:
    raise RuntimeError("could not create the GStreamer pipeline")
for element in (source0, qtdemux0, decoder0, queue, video_sink):
    pipeline0.add(element)

# Element.link() reports failure through its boolean result; surface it
# instead of silently continuing with a half-linked pipeline.
if not source0.link(qtdemux0):
    raise RuntimeError("could not link source0 -> demux0")
# qtdemux0 -> decoder0: dynamic linking (see demux0_pad_added).
if not decoder0.link(queue):
    raise RuntimeError("could not link video_decoder0 -> queue")
if not queue.link(video_sink):
    raise RuntimeError("could not link queue -> video_sink")
######################################################
def _segment_seek_to_start():
    """Issue a non-flushing segment seek to position 0 on the pipeline.

    Returns:
        The boolean result of ``pipeline0.seek``. A failed seek is reported
        on stdout instead of being silently ignored.
    """
    seek_ok = pipeline0.seek(1.0,
                             Gst.Format.TIME,
                             Gst.SeekFlags.SEGMENT,
                             Gst.SeekType.SET, 0,
                             Gst.SeekType.NONE, 0)
    if not seek_ok:
        print("segment seek to the beginning failed")
    return seek_ok


def main():
    """Loop the video using segment seeks and SEGMENT_DONE bus messages.

    The pipeline is set to PLAYING, an initial segment seek is issued once
    the state change has completed, and every SEGMENT_DONE message triggers
    the next segment seek. On an ERROR message the parsed error is printed
    and the loop ends. The pipeline is set to the NULL state before
    returning, whether the loop ended because of a bus error or because of
    an exception.
    """
    message_bus = pipeline0.get_bus()
    pipeline0.set_state(Gst.State.PLAYING)
    try:
        # Wait for the state change to finish before the first seek.
        pipeline0.get_state(Gst.CLOCK_TIME_NONE)
        _segment_seek_to_start()

        # Working without glib mainloop: block on the bus for the two
        # message types of interest instead of polling and sleeping. The
        # finite timeout lets the loop return to Python regularly.
        wanted_types = Gst.MessageType.SEGMENT_DONE | Gst.MessageType.ERROR
        while True:
            message = message_bus.timed_pop_filtered(100 * Gst.MSECOND,
                                                     wanted_types)
            if message is None:
                # Timed out with no matching message; keep waiting.
                continue
            if message.type == Gst.MessageType.SEGMENT_DONE:
                _segment_seek_to_start()
            elif message.type == Gst.MessageType.ERROR:
                # Print the actual error text, not the message object.
                error, debug_info = message.parse_error()
                print("bus ERROR", error.message, debug_info)
                break
    finally:
        # Shut the pipeline down so its elements release their resources.
        pipeline0.set_state(Gst.State.NULL)


if __name__ == "__main__":
    main()
With the default queue configuration, the `SEGMENT_DONE` message is posted approximately 1 second before the last video frame is played. The non-flushing seek ensures that no frames are lost. Together this gives a perfect result: a truly seamless video loop.
Note: I switch the pipeline to the PLAYING state and then perform the initial non-flushing seek. Alternatively, we can switch the pipeline to the PAUSED state, perform a flushing segment seek, and then switch the pipeline to the PLAYING state.
Note 2: Different sources suggest a slightly different solution. See the links below.
Related topics and sources:
http://gstreamer-devel.966125.n4.nabble.com/Flushing-the-data-in-partial-pipeline-tp4681893p4681899.html
https://cgit.freedesktop.org/gstreamer/gst-editing-services/tree/plugins/nle/nlesource.c