Search code examples
pythonvideo-streaminggstreamerqos

How te retrieve stream statistics in Gstreamer?


First of all I would like to start by saying that I'm really new to Gstreamer and its capabilities so pardon my ignorance if my understanding or implementation was wrong, I am still learning .

I would like to build a small streaming application in PYTHON or JAVA (as I am not proficient in C) with QOS integrated in it, especially for the package drop count statistics, and RTP and RTCP seem the perfect match.

For this purpose I implemented a SERVER

#! /usr/bin/env python
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

#gst-launch -v rtpbin name=rtpbin audiotestsrc ! audioconvert ! alawenc ! rtppcmapay ! rtpbin.send_rtp_sink_0 \
#                rtpbin.send_rtp_src_0 ! udpsink port=10000 host=xxx.xxx.xxx.xxx \
#                rtpbin.send_rtcp_src_0 ! udpsink port=10001 host=xxx.xxx.xxx.xxx sync=false async=false \
#                udpsrc port=10002 ! rtpbin.recv_rtcp_sink_0

DEST_HOST = '127.0.0.1'

AUDIO_SRC = 'audiotestsrc'
AUDIO_ENC = 'alawenc'
AUDIO_PAY = 'rtppcmapay'

RTP_SEND_PORT = 5002
RTCP_SEND_PORT = 5003
RTCP_RECV_PORT = 5007

GObject.threads_init()
Gst.init(sys.argv)

# the pipeline to hold everything
pipeline = Gst.Pipeline.new('rtp_server')

# the pipeline to hold everything
audiosrc = Gst.ElementFactory.make(AUDIO_SRC, 'audiosrc')
audioconv = Gst.ElementFactory.make('audioconvert', 'audioconv')
audiores = Gst.ElementFactory.make('audioresample', 'audiores')

# the pipeline to hold everything
audioenc = Gst.ElementFactory.make(AUDIO_ENC, 'audioenc')
audiopay = Gst.ElementFactory.make(AUDIO_PAY, 'audiopay')

# add capture and payloading to the pipeline and link
pipeline.add(audiosrc)
pipeline.add(audioconv)
pipeline.add(audiores)
pipeline.add(audioenc)
pipeline.add(audiopay)


audiosrc.link(audioconv)
audioconv.link(audiores)
audiores.link(audioenc)
audioenc.link(audiopay)

# the rtpbin element
rtpbin = Gst.ElementFactory.make('rtpbin', 'rtpbin')

pipeline.add(rtpbin)

# the udp sinks and source we will use for RTP and RTCP
rtpsink = Gst.ElementFactory.make('udpsink', 'rtpsink')
rtpsink.set_property('port', RTP_SEND_PORT)
rtpsink.set_property('host', DEST_HOST)

rtcpsink = Gst.ElementFactory.make('udpsink', 'rtcpsink')
rtcpsink.set_property('port', RTCP_SEND_PORT)
rtcpsink.set_property('host', DEST_HOST)
# no need for synchronisation or preroll on the RTCP sink
rtcpsink.set_property('async', False)
rtcpsink.set_property('sync', False)

rtcpsrc = Gst.ElementFactory.make('udpsrc', 'rtcpsrc')
rtcpsrc.set_property('port', RTCP_RECV_PORT)

pipeline.add(rtpsink)
pipeline.add(rtcpsink)
pipeline.add(rtcpsrc)

# now link all to the rtpbin, start by getting an RTP sinkpad for session 0
sinkpad = Gst.Element.get_request_pad(rtpbin, 'send_rtp_sink_0')
srcpad = Gst.Element.get_static_pad(audiopay, 'src')
lres = Gst.Pad.link(srcpad, sinkpad)

# get the RTP srcpad that was created when we requested the sinkpad above and
# link it to the rtpsink sinkpad
srcpad = Gst.Element.get_static_pad(rtpbin, 'send_rtp_src_0')
sinkpad = Gst.Element.get_static_pad(rtpsink, 'sink')
lres = Gst.Pad.link(srcpad, sinkpad)

# get an RTCP srcpad for sending RTCP to the receiver
srcpad = Gst.Element.get_request_pad(rtpbin, 'send_rtcp_src_0')
sinkpad = Gst.Element.get_static_pad(rtcpsink, 'sink')
lres = Gst.Pad.link(srcpad, sinkpad)

# we also want to receive RTCP, request an RTCP sinkpad for session 0 and
# link it to the srcpad of the udpsrc for RTCP
srcpad = Gst.Element.get_static_pad(rtcpsrc, 'src')
sinkpad = Gst.Element.get_request_pad(rtpbin, 'recv_rtcp_sink_0')
lres = Gst.Pad.link(srcpad, sinkpad)

# set the pipeline to playing
Gst.Element.set_state(pipeline, Gst.State.PLAYING)

# we need to run a GLib main loop to get the messages
mainloop = GObject.MainLoop()
mainloop.run()
Gst.Element.set_state(pipeline, Gst.State.NULL)

and a Client

#! /usr/bin/env python

import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

#
# A simple RTP receiver
#
#  receives alaw encoded RTP audio on port 5002, RTCP is received on  port 5003.
#  the receiver RTCP reports are sent to port 5007
#
#             .-------.      .----------.     .---------.   .-------.   .--------.
#  RTP        |udpsrc |      | rtpbin   |     |pcmadepay|   |alawdec|   |alsasink|
#  port=5002  |      src->recv_rtp recv_rtp->sink     src->sink   src->sink      |
#             '-------'      |          |     '---------'   '-------'   '--------'
#                            |          |
#                            |          |     .-------.
#                            |          |     |udpsink|  RTCP
#                            |    send_rtcp->sink     | port=5007
#             .-------.      |          |     '-------' sync=false
#  RTCP       |udpsrc |      |          |               async=false
#  port=5003  |     src->recv_rtcp      |
#             '-------'      '----------'

AUDIO_CAPS = 'application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)PCMA'
AUDIO_DEPAY = 'rtppcmadepay'
AUDIO_DEC = 'alawdec'
AUDIO_SINK = 'autoaudiosink'

DEST = '127.0.0.1'

RTP_RECV_PORT = 5002
RTCP_RECV_PORT = 5003
RTCP_SEND_PORT = 5007

GObject.threads_init()
Gst.init(sys.argv)

#gst-launch -v rtpbin name=rtpbin                                                \
#       udpsrc caps=$AUDIO_CAPS port=$RTP_RECV_PORT ! rtpbin.recv_rtp_sink_0              \
#             rtpbin. ! rtppcmadepay ! alawdec ! audioconvert ! audioresample ! autoaudiosink \
#           udpsrc port=$RTCP_RECV_PORT ! rtpbin.recv_rtcp_sink_0                              \
#         rtpbin.send_rtcp_src_0 ! udpsink port=$RTCP_SEND_PORT host=$DEST sync=false async=false

def pad_added_cb(rtpbin, new_pad, depay):
    sinkpad = Gst.Element.get_static_pad(depay, 'sink')
    lres = Gst.Pad.link(new_pad, sinkpad)

# the pipeline to hold eveything
pipeline = Gst.Pipeline.new('rtp_client')

# the udp src and source we will use for RTP and RTCP
rtpsrc = Gst.ElementFactory.make('udpsrc', 'rtpsrc')
rtpsrc.set_property('port', RTP_RECV_PORT)

# we need to set caps on the udpsrc for the RTP data
caps = Gst.caps_from_string(AUDIO_CAPS)
rtpsrc.set_property('caps', caps)

rtcpsrc = Gst.ElementFactory.make('udpsrc', 'rtcpsrc')
rtcpsrc.set_property('port', RTCP_RECV_PORT)

rtcpsink = Gst.ElementFactory.make('udpsink', 'rtcpsink')
rtcpsink.set_property('port', RTCP_SEND_PORT)
rtcpsink.set_property('host', DEST)

# no need for synchronisation or preroll on the RTCP sink
rtcpsink.set_property('async', False)
rtcpsink.set_property('sync', False)

pipeline.add(rtpsrc)
pipeline.add(rtcpsrc)
pipeline.add(rtcpsink)

# the depayloading and decoding
audiodepay = Gst.ElementFactory.make(AUDIO_DEPAY, 'audiodepay')
audiodec = Gst.ElementFactory.make(AUDIO_DEC, 'audiodec')

# the audio playback and format conversion
audioconv = Gst.ElementFactory.make('audioconvert', 'audioconv')
audiores = Gst.ElementFactory.make('audioresample', 'audiores')
audiosink = Gst.ElementFactory.make(AUDIO_SINK, 'audiosink')

# add depayloading and playback to the pipeline and link
pipeline.add(audiodepay)
pipeline.add(audiodec)
pipeline.add(audioconv)
pipeline.add(audiores)
pipeline.add(audiosink)


audiodepay.link(audiodec)
audiodec.link(audioconv)
audioconv.link(audiores)
audiores.link(audiosink)

# the rtpbin element
rtpbin = Gst.ElementFactory.make('rtpbin', 'rtpbin')

pipeline.add(rtpbin)

# now link all to the rtpbin, start by getting an RTP sinkpad for session 0
srcpad = Gst.Element.get_static_pad(rtpsrc, 'src')
sinkpad = Gst.Element.get_request_pad(rtpbin, 'recv_rtp_sink_0')
lres = Gst.Pad.link(srcpad, sinkpad)

# get an RTCP sinkpad in session 0
srcpad = Gst.Element.get_static_pad(rtcpsrc, 'src')
sinkpad = Gst.Element.get_request_pad(rtpbin, 'recv_rtcp_sink_0')
lres = Gst.Pad.link(srcpad, sinkpad)

# get an RTCP srcpad for sending RTCP back to the sender
srcpad = Gst.Element.get_request_pad(rtpbin, 'send_rtcp_src_0')
sinkpad = Gst.Element.get_static_pad(rtcpsink, 'sink')
lres = Gst.Pad.link(srcpad, sinkpad)

rtpbin.connect('pad-added', pad_added_cb, audiodepay)



# def newManager():
    # rtpbin.connect('on-ssrc-active', onSSRCActive)

# def onSSRCActive(self):
    # print (self)

# rtpsrc.connect('new-manager', newManager)

Gst.Element.set_state(pipeline, Gst.State.PLAYING)

mainloop = GObject.MainLoop()
mainloop.run()

Gst.Element.set_state(pipeline, Gst.State.NULL)

While the server and client works without problems I cannot find a way or information on how to retrieve the dropped package statistics from RTCP for this RTP stream even if it is in command line.

And any help in that regard would be appreciated!


Solution

  • Here is a mock up, on Linux, just to illustrate a method of listening to the bus messages.
    I haven't the time to sort out the QOS Gst.Message, so I hacked something in there to force a QOS, so that you at least see it.
    Hopefully this will be enough to get you going in the right direction.

    #!/usr/bin/python3
    
    from os import path
    import time
    import gi
    gi.require_version('Gst', '1.0')
    gi.require_version('Gtk', '3.0')
    gi.require_version('GstVideo', '1.0')
    from gi.repository import GObject, Gst, Gtk
    from gi.repository import GdkX11, GstVideo
    
    
    GObject.threads_init()
    Gst.init(None)
    filename = path.join(path.dirname(path.abspath(__file__)), '../H.mkv')
    uri = 'file://' + filename
    
    
    class Player(object):
        def __init__(self):
            self.window = Gtk.Window()
            self.window.connect('destroy', self.quit)
            self.window.set_default_size(800, 450)
    
            self.drawingarea = Gtk.DrawingArea()
            self.window.add(self.drawingarea)
    
            # Create GStreamer pipeline
            self.pipeline = Gst.Pipeline()
    
            # Create bus to get events from GStreamer pipeline
            self.bus = self.pipeline.get_bus()
            self.bus.add_signal_watch()
            self.bus.connect('message::eos', self.on_eos)
            self.bus.connect('message::error', self.on_error)
            self.bus.connect('message::qos', self.on_quality_of_service)
    
            self.bus.enable_sync_message_emission()
            self.bus.connect('sync-message::element', self.on_sync_message)
    
            # Create GStreamer elements
            self.playbin = Gst.ElementFactory.make('playbin', None)
    
            # Add playbin to the pipeline
            self.pipeline.add(self.playbin)
    
            # Set properties
            self.playbin.set_property('uri', uri)
    
        def run(self):
            self.window.show_all()
            self.xid = self.drawingarea.get_property('window').get_xid()
            self.pipeline.set_state(Gst.State.PLAYING)
            time.sleep(2)
            self.bus.emit('message::qos',Gst.Message('xxxxx'))
            Gtk.main()
    
        def quit(self, window):
            self.pipeline.set_state(Gst.State.NULL)
            Gtk.main_quit()
    
        def on_sync_message(self, bus, msg):
            if msg.get_structure().get_name() == 'prepare-window-handle':
                print('prepare-window-handle')
                msg.src.set_window_handle(self.xid)
    
        def on_eos(self, bus, msg):
            print('End of Service')
    
        def on_error(self, bus, msg):
            print('Error', msg.parse_error())
    
        def on_quality_of_service(self, bus, msg):
            print('Qos Message:', msg.parse_qos())
    
    p = Player()
    p.run()