today I decided to transform my little script, based on gst-launch
, into a real Python/GStreamer application, in order to add some features.
I developed a little program which send the audio from my microphone to both Icecast (shout2send
) and local storage (filesink
) thanks to tee
.
Sometimes shout2send
can halt, due to network problems. I would like to restart this element every N seconds until the connection is back, without stopping the pipeline, because the local audio file should not be impacted from network conditions.
Here's what I tried:
tee
, set shout2send
state to NULL
and removing it from pipeline (result: GStreamer critical errors like Trying to dispose element ... but it is in PLAYING instead of the NULL state
)What should I do?
Here's how my code looks like :
import gi
gi.require_version("Gst", "1.0")
from gi.repository import GLib
from gi.repository import Gst
# [...]
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pass # TODO: restart the element
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
pipeline = Gst.Pipeline()
message_bus = pipeline.get_bus()
message_bus.add_signal_watch()
message_bus.connect('message', message_handler)
# [...]
tee.link(queue0)
queue0.link(filesink)
tee.link(queue1)
queue1.link(shout2send)
Update (9/12/15): non-working code added + log
I tried to follow "Dynamically changing the pipeline" fro GStreamer doc, but my code doesn't work.
def event_probe(pad, info, *args):
Gst.Pad.remove_probe(pad, info)
queue1.unlink(shout2send)
tee.unlink(queue1)
pipeline.remove(shout2send)
pipeline.remove(queue1)
return Gst.PadProbeReturn.OK
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pad = queue1.get_static_pad('src')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
Here's what I see if I run my script with GST_DEBUG=3
and I restart Icecast while streaming:
[...]
0:00:02.142033258 5462 0x55e414d900a0 WARN shout2 gstshout2.c:674:gst_shout2send_render:<shout2send> error: shout_send() failed: Socket error
0:00:02.658137998 5462 0x55e414d90140 WARN basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: Internal data flow error.
0:00:02.658169752 5462 0x55e414d90140 WARN basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: streaming task paused, reason error (-5)
(GLib.Error('Internal data flow error.', 'gst-stream-error-quark', 1), 'gstbasesrc.c(2943): gst_base_src_loop (): /GstPipeline:pipeline0/GstPulseSrc:pulsesrc:\nstreaming task paused, reason error (-5)')
0:00:02.658628129 5462 0x7f6ba8002a30 WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
Thanks to otopolsky's comments I did it :)
What I did wrong:
NULL
: this is very importantoggmux
must stay after tee
, on both sub-pipelines: otherwise Icecast will list the stream without be able to serve it. Do the same for opusenc
Advice:
Final code (reconnection works correctly and independently from local encoding/recording):
def event_probe2(pad, info, *args):
Gst.Pad.remove_probe(pad, info.id)
tee.link(opusenc1)
opusenc1.set_state(Gst.State.PLAYING)
oggmux1.set_state(Gst.State.PLAYING)
queue1.set_state(Gst.State.PLAYING)
shout2send.set_state(Gst.State.PLAYING)
return Gst.PadProbeReturn.OK
def reconnect():
pad = tee.get_static_pad('src_1')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)
def event_probe(pad, info, *args):
Gst.Pad.remove_probe(pad, info.id)
tee.unlink(opusenc1)
opusenc1.set_state(Gst.State.NULL)
oggmux1.set_state(Gst.State.NULL)
queue1.set_state(Gst.State.NULL)
shout2send.set_state(Gst.State.NULL)
GLib.timeout_add_seconds(interval, reconnect)
return Gst.PadProbeReturn.OK
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pad = tee.get_static_pad('src_1')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
Minor problems:
tee.get_static_pad('src_1')
, but I think I could get the src id somewhere, instead of using a fixed valuepipeline.set_state(Gst.State.NULL)
one second after pipeline.send_event(Gst.Event.new_eos())
, but I still get messages like WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment