Search code examples
pythongstreamerpython-gstreamer

Sink restart on failure without stopping the pipeline


today I decided to transform my little script, based on gst-launch, into a real Python/GStreamer application, in order to add some features.

I developed a little program which send the audio from my microphone to both Icecast (shout2send) and local storage (filesink) thanks to tee.

Sometimes shout2send can halt, due to network problems. I would like to restart this element every N seconds until the connection is back, without stopping the pipeline, because the local audio file should not be impacted from network conditions.

Here's what I tried:

  1. stopping/starting the pipeline one second after the network error (result: streaming works, local file is truncated)
  2. unlink from tee, set shout2send state to NULL and removing it from pipeline (result: GStreamer critical errors like Trying to dispose element ... but it is in PLAYING instead of the NULL state)
  3. Trying to understand how to use pads in this case (result: same as above, but with more code involved)

What should I do?

Here's how my code looks like :

import gi
gi.require_version("Gst", "1.0")
from gi.repository import GLib
from gi.repository import Gst
# [...]

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pass # TODO: restart the element
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

pipeline = Gst.Pipeline()
message_bus = pipeline.get_bus()
message_bus.add_signal_watch()
message_bus.connect('message', message_handler)

# [...]
tee.link(queue0)
queue0.link(filesink)
tee.link(queue1)
queue1.link(shout2send)

Update (9/12/15): non-working code added + log

I tried to follow "Dynamically changing the pipeline" fro GStreamer doc, but my code doesn't work.

def event_probe(pad, info, *args):
    Gst.Pad.remove_probe(pad, info)
    queue1.unlink(shout2send)
    tee.unlink(queue1)
    pipeline.remove(shout2send)
    pipeline.remove(queue1)
    return Gst.PadProbeReturn.OK

def message_handler(bus, message):
    if message.type == Gst.MessageType.ERROR:
        if message.src == shout2send:
            pad = queue1.get_static_pad('src')
            pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
        else:
            print(message.parse_error())
            pipeline.set_state(Gst.State.NULL)
            exit(1)
    else:
        print(message.type)

Here's what I see if I run my script with GST_DEBUG=3 and I restart Icecast while streaming:

[...]
0:00:02.142033258  5462 0x55e414d900a0 WARN                  shout2 gstshout2.c:674:gst_shout2send_render:<shout2send> error: shout_send() failed: Socket error
0:00:02.658137998  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: Internal data flow error.
0:00:02.658169752  5462 0x55e414d90140 WARN                 basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: streaming task paused, reason error (-5)
(GLib.Error('Internal data flow error.', 'gst-stream-error-quark', 1), 'gstbasesrc.c(2943): gst_base_src_loop (): /GstPipeline:pipeline0/GstPulseSrc:pulsesrc:\nstreaming task paused, reason error (-5)')
0:00:02.658628129  5462 0x7f6ba8002a30 WARN                audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment

Solution

  • Thanks to otopolsky's comments I did it :)

    What I did wrong:

    1. elements must be set to NULL: this is very important
    2. oggmux must stay after tee, on both sub-pipelines: otherwise Icecast will list the stream without be able to serve it. Do the same for opusenc

    Advice:

    1. Is not necessary to unlink every element you don't need: just break where needed
    2. Is not necessary to remove from the pipeline every element you don't need: keep them if you think to reuse them

    Final code (reconnection works correctly and independently from local encoding/recording):

    def event_probe2(pad, info, *args):
        Gst.Pad.remove_probe(pad, info.id)
        tee.link(opusenc1)
        opusenc1.set_state(Gst.State.PLAYING)
        oggmux1.set_state(Gst.State.PLAYING)
        queue1.set_state(Gst.State.PLAYING)
        shout2send.set_state(Gst.State.PLAYING)
        return Gst.PadProbeReturn.OK
    
    def reconnect():
        pad = tee.get_static_pad('src_1')
        pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)
    
    def event_probe(pad, info, *args):
        Gst.Pad.remove_probe(pad, info.id)
        tee.unlink(opusenc1)
        opusenc1.set_state(Gst.State.NULL)
        oggmux1.set_state(Gst.State.NULL)
        queue1.set_state(Gst.State.NULL)
        shout2send.set_state(Gst.State.NULL)
        GLib.timeout_add_seconds(interval, reconnect)
        return Gst.PadProbeReturn.OK
    
    def message_handler(bus, message):
        if message.type == Gst.MessageType.ERROR:
            if message.src == shout2send:
                pad = tee.get_static_pad('src_1')
                pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
            else:
                print(message.parse_error())
                pipeline.set_state(Gst.State.NULL)
                exit(1)
        else:
            print(message.type)
    

    Minor problems:

    1. I use tee.get_static_pad('src_1'), but I think I could get the src id somewhere, instead of using a fixed value
    2. Probably the whole thing could be written in a better form (but this is my first program with Python+Gstreamer and it works, so I'm fine with it)
    3. In order to avoid data loss I call pipeline.set_state(Gst.State.NULL) one second after pipeline.send_event(Gst.Event.new_eos()), but I still get messages like WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment

    Code: https://github.com/ViGLug/libre-streaming