Search code examples
c++cgstreamer

Gstreamer custom-built pipeline not playing/hanging out


The context

I'm trying to build a small video cutter to work with video files having multiple audio tracks, as for example, one for the vocal chat, one for the microphone, and one for the in-game audio. The final goal is to be able to exports clips with all the audio tracks mixed together, and eventually being able to change the volume of each track.

I'm building the UI with Qt.

Before building the rendering/export part, I try to build a small multi-track audio video viewer in my app, using GStreamer.

The pipeline

Through looking at GStreamer tutorials, I've built a PlayerPipeline class inheriting from QObject (code below in the post). This class takes a file path and a QWidget as parameters, and builds the pipeline from a UriDecodeBin, to reach this shape (here with 2 audio tracks in the video) :

|              | -> video  ->  | Queue | ---------------------------------------> | PlaySink |
| UriDecodeBin | -> audio1 ->  | Queue | -> | AudioConvert | -> | AudioMixer | -> |          |
|              | -> audio2 ->  | Queue | -> | AudioConvert | -> |            |

(I'm not sure queues are necessary here, as UriDecodeBin is already creating queues for each of its src pads if I'm not mistaken)

With following steps :

  • Create pipeline and bus
  • Create UriDecodebin, add it to pipeline and play the pipeline
  • Setup no-more-pad callback on UriDecodeBin, and once it is fired, query each src pad and block them with a probe
  • Once all pads are blocked, pause pipeline and build the rest of the pipeline
  • Then perform a flushing seek (supposedly triggering the probes callback and removing the probes, which somehow doesn't happen) to set pipeline ready to play

The problem

Two things happen.

If I try to gst_element_get_state after uridecodebin is prerolled, with no timeout limit, it sometimes hangs out indefinitely.

If I don't call it, but try to set pipeline in playing state, nothing happens, and I don't receive a state change message on the bus either.

I tried to add queues (according to another SO post), but I admit I'm a bit out of ideas now. Any help, even ideas of what to try to solve the problem/get insights of it, would be highly appreciated.

The code

Please note there is no memory management done here, so the code is likely full of leaks, that's normal, I want to have a working version of the player before adding cleanup/clean error management code.

This file, gstmanips.h, defines few helpers, which are likely trivial to implement, so I'm not detailing them here.

bool isPadSrc(GstPad* pad, std::string name);

void printPipelineState(GstPipeline* pipeline);

void printElementState(GstElement* elem);

void flushAndPrintBus(GstBus* bus, GstPipeline* pipe);

Only showing implem of PrintPipelineState and flushAndPrintBus:


void printPipelineState(GstPipeline* pipeline){
    GstStateChangeReturn ret;
    GstState state;
    GstState pending;

    ret = gst_element_get_state(GST_ELEMENT(pipeline), &state, &pending, GST_CLOCK_TIME_NONE);
    if(ret == GST_STATE_CHANGE_FAILURE){
        std::cout << "failed querying state" << std::endl;
        return;
    }
    if(ret == GST_STATE_CHANGE_SUCCESS){
        std::cout << "successfully queried the state of pipeline" << std::endl;
        std::string str = "pipeline ";
        str += std::string(gst_element_state_get_name(state));
        std::cout << str << std::endl;
    }
}

void flushAndPrintBus(GstBus* bus, GstPipeline* pipe){
    if(!bus){
        return;
    }
    GstMessage* gm = gst_bus_pop(bus);
    while(gm){
        GError *err;
        gchar *debug_info;

        switch (GST_MESSAGE_TYPE (gm)) {
            case GST_MESSAGE_ERROR:
                gst_message_parse_error (gm, &err, &debug_info);
                 std::cout << "## BUS ## Error received from element " << GST_OBJECT_NAME (gm->src) <<  " : " <<  err->message << std::endl;;
                 std::cout << "## BUS ## Debugging information: " << (debug_info ? debug_info : "none")<< std::endl;
                 g_clear_error (&err);
                 g_free (debug_info);
                 break;
            case GST_MESSAGE_EOS:
                 std::cout << "## BUS ## End-Of-Stream reached." << std::endl;
                 break;
            case GST_MESSAGE_STATE_CHANGED:
            /* We are only interested in state-changed messages from the pipeline */
                if (GST_MESSAGE_SRC (gm) == GST_OBJECT (pipe)) {
                    GstState old_state, new_state, pending_state;
                    gst_message_parse_state_changed (gm, &old_state, &new_state, &pending_state);
                    std::cout << "## BUS ## Pipeline state changed from " <<
                        gst_element_state_get_name (old_state) << " to " << gst_element_state_get_name (new_state) << std::endl;
                }
                break;
            case GST_MESSAGE_STREAM_STATUS:
                GstStreamStatusType type;
                GstElement *owner;
                gst_message_parse_stream_status(gm, &type, &owner);
                std::cout << "Stream status : " << type << std::endl;
                break;
            default:
                /* We should not reach here */
                std::cout <<  "## BUS ## Unexpected message received: " << std::to_string(GST_MESSAGE_TYPE(gm)) << std::endl;

                break;
              }
        gm = gst_bus_pop(bus);
    }
}

And now we have implementation of the pipeline, in playerpipeline.h and playerpipeline.cpp :

#ifndef PLAYERPIPELINE_H
#define PLAYERPIPELINE_H

#include <QObject>
#include <glib.h>
#include <gst/gst.h>
#include <gst/video/videooverlay.h>
#include "gstmanips.h"
#include <vector>
#include <QTimer>
#include <QWidget>


class PlayerPipeline : public QObject
{
    Q_OBJECT
public:
    explicit PlayerPipeline(QString file, QWidget* display, QObject *parent = nullptr);
    void triggerNoMorePadSignal();
    void increasePreroll();
    void decreasePreroll();


signals:
    void noMorePadSignal();
    void decoderPrerolledSignal();
    void decoderBuiltSignal();
    void pipelineBuiltSignal();


public slots:
    void onNoMorePad();
    void onBusRefresh();
    void onDecoderPrerolled();
    void onPipelineBuilt();
    void play();
    void pause();

private:
    void changePipelineState(GstState state);
    void buildAndLinkConverters();
    void buildAndLinkMixer();
    void buildAndLinkPlaySink();

public:
    gint prerollCounter;
    bool prerolled;

private:
    //misc
    QString file;

    //pipeline
    GstBus* bus;
    GstPipeline* pipeline;
    QTimer* timer;
    unsigned timerStop;

    //decoder
    GstElement* decoder;
    std::vector<GstPad*> decoderAudioSrcs;
    GstPad* decoderVideoSrc;


    //audio convert (TODO add volume after)
    std::vector<GstElement*> audioConverters;
    std::vector<GstPad*> audioConvertersSrc;
    std::vector<GstPad*> audioConvertersSink;
    std::vector<GstElement*> audioQueues;

    //mixer
    GstElement* audioMixer;
    std::vector<GstPad*> audioMixerSinks;
    GstPad* audioMixerSrc;

    //sink
    GstElement* videoQueue;
    GstElement* playSink;
    GstPad* playSinkVideoSink;
    GstPad* playSinkAudioSink;
    GstElement* xvideo;
    QWidget* display;
};

#endif // PLAYERPIPELINE_H

#include "playerpipeline.h"
#include <assert.h>
#include <iostream>

void pp_no_more_pads_cb(GstElement* self, gpointer udata){
    (void)self;
    PlayerPipeline* db = (PlayerPipeline*)udata;
    std::cout << "no more pad :" <<std::endl;
    db->triggerNoMorePadSignal();
}

PlayerPipeline::PlayerPipeline(QString file, QWidget* display, QObject *parent)
    : QObject{parent}, prerollCounter(0), prerolled(false), file(file), display(display)
{
    pipeline = (GstPipeline*)gst_pipeline_new("PlayerPipeline");
    assert(pipeline);
    bus = gst_pipeline_get_bus(pipeline);
    assert(bus);
    decoder = gst_element_factory_make("uridecodebin", "decoder");
    assert(decoder);
    assert(gst_bin_add(GST_BIN(pipeline), decoder));
    assert(gst_element_sync_state_with_parent(decoder));
    std::string uri = std::string("file://")+file.toStdString();
    g_object_set(decoder, "uri", uri.c_str(), nullptr);

    QObject::connect(this, &PlayerPipeline::noMorePadSignal, this, &PlayerPipeline::onNoMorePad, Qt::QueuedConnection);
    g_signal_connect(decoder, "no-more-pads", G_CALLBACK(pp_no_more_pads_cb), this);
    QObject::connect(this, &PlayerPipeline::decoderPrerolledSignal, this, &PlayerPipeline::onDecoderPrerolled, Qt::QueuedConnection);
    QObject::connect(this, &PlayerPipeline::pipelineBuiltSignal, this, &PlayerPipeline::onPipelineBuilt);
    changePipelineState(GST_STATE_PLAYING);

    timer = new QTimer(this);
    QObject::connect(timer, &QTimer::timeout, this, &PlayerPipeline::onBusRefresh);
    timer->start(1);
    timerStop = 0;
}


void PlayerPipeline::triggerNoMorePadSignal()
{
    emit noMorePadSignal();
}


void PlayerPipeline::increasePreroll()
{
    g_atomic_int_inc(&prerollCounter);
}

void PlayerPipeline::decreasePreroll()
{
    if(g_atomic_int_dec_and_test(&prerollCounter)){
        this->prerolled = true;
        emit decoderPrerolledSignal();
    }
}

// FIXME there exists probably a better way to implem this
bool pp_caps_is_audio(GstCaps* caps){
    std::string description(gst_caps_to_string(caps));
    return description.rfind("audio/", 0) == 0;
}

// FIXME there exists probably a better way to implem this
bool pp_caps_is_video(GstCaps* caps){
    std::string description(gst_caps_to_string(caps));
    return description.rfind("video/", 0) == 0;
}

// FIXME maybe we need to use it to trigger when the pipeline is actually prerolled ?
GstPadProbeReturn blocked_cb(GstPad* pad, GstPadProbeInfo* info, gpointer udata){
    (void)pad;
    (void)info;
    std::cout << "blocked cb"<< std::endl;
    PlayerPipeline* pp = (PlayerPipeline*)udata;
    if(pp->prerolled){
        std::cout << "removing blocking probe" << std::endl;
        return GST_PAD_PROBE_REMOVE;
    } else {
    std::cout << "blocking probe installed !" << std::endl;
    pp->decreasePreroll();
    std::cout << "preroll decreased, exiting blocking_cb" << std::endl;
    return GST_PAD_PROBE_OK;
    }
}

void PlayerPipeline::onNoMorePad(){
    std::cout << "### LISTING PADS ###" << std::endl;
    changePipelineState(GST_STATE_PAUSED);
    GstIterator *it = gst_element_iterate_src_pads(decoder);
    GValue padV = G_VALUE_INIT;
    while(gst_iterator_next(it, &padV) == GST_ITERATOR_OK){
        std::cout << "PAD : " << std::endl;
        GstPad* pad = (GstPad*)g_value_get_object(&padV);
        assert(pad);
        GstCaps* caps = gst_pad_get_current_caps(pad);
        assert(caps);
        std::string description(gst_caps_to_string(caps));
        std::cout << description << std::endl;
        //FIXME some code duplication here, could likely be factorized ?
        if(pp_caps_is_audio(caps)){
            decoderAudioSrcs.push_back(pad);
        } else if (pp_caps_is_video(caps)){

            decoderVideoSrc = pad;
        } else {
            std::cout << "UNKNOWN PAD FOUND" << std::endl;
        }
        //probing the pad.
        this->increasePreroll();
        gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, blocked_cb, (gpointer)this, NULL);
    }
    std::cout << "waiting for pipeline to be prerolled" << std::endl;
}



void PlayerPipeline::onBusRefresh()
{
    //std::cout << "### timer timeouted ###" << std::endl;
    flushAndPrintBus(bus, pipeline);
    //timerStop += 1;
    //if (timerStop > 130){
    //    timer->stop();
    //}
}


void PlayerPipeline::onDecoderPrerolled()
{
    //here we can build the rest of the pipeline
    std::cout << "printing pipeline state after prerolling" << std::endl;
    printPipelineState(pipeline);
    buildAndLinkConverters();
    buildAndLinkMixer();
    buildAndLinkPlaySink();
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-prerolled");
    emit pipelineBuiltSignal();
}

void PlayerPipeline::buildAndLinkConverters(){
    std::cout << "BUILDING AND LINKING CONVERTERS" << std::endl;
    unsigned pRank = 0;
    for(auto dpad : decoderAudioSrcs){
        std::cout << "building audio queue" << std::endl;
        GstElement* queue = gst_element_factory_make("queue", (std::string("audioqueue_") + std::to_string(pRank)).c_str());
        audioQueues.push_back(queue);
        assert(queue);
        assert(gst_bin_add(GST_BIN(pipeline), queue));
        assert(gst_element_sync_state_with_parent(queue));
        GstPad* qsink = gst_element_get_static_pad(queue, "sink");
        GstPad* qsrc = gst_element_get_static_pad(queue, "src");
        assert(gst_pad_link(dpad, qsink) == GST_PAD_LINK_OK);
        std::cout << "building audio converter" << std::endl;
        GstElement* convert = gst_element_factory_make("audioconvert", (std::string("converter_") + std::to_string(pRank)).c_str());
        assert(convert);
        assert(gst_bin_add(GST_BIN(pipeline), convert));
        assert(gst_element_sync_state_with_parent(convert));
        GstPad* convsink = gst_element_get_static_pad(convert, "sink");
        assert(convsink);
        assert(gst_pad_link(qsrc, convsink) == GST_PAD_LINK_OK);
        audioConverters.push_back(convert);
        audioConvertersSink.push_back(convsink);
        //GstPad* convsrc = gst_element_get_static_pad(convert, "src");
        //assert(convsrc);
        //audioConvertersSrc.push_back(convsrc);
        pRank++;
    }
    std::cout << "printing pipeline state after converters" << std::endl;
    printPipelineState(pipeline);
   std::cout << "CONVERTERS : DONE" << std::endl;
}

void PlayerPipeline::buildAndLinkMixer(){
    std::cout << "BUILDING AND LINKING MIXER" << std::endl;
    audioMixer = gst_element_factory_make("audiomixer", "audioMixer");
    assert(audioMixer);
    assert(gst_bin_add(GST_BIN(pipeline), audioMixer));
    assert(gst_element_sync_state_with_parent(audioMixer));
    for(auto converter : audioConverters){
        assert(gst_element_link(converter, audioMixer));
    }
    std::cout << "printing pipeline state after adding mixer" << std::endl;
    printPipelineState(pipeline);
    std::cout << "MIXER : DONE" << std::endl;
}


GstPad* pp_requestPad(GstElement* elem, const char* tname, const char* name){
    GstPadTemplate* ptempl = gst_element_get_pad_template(elem, tname);
    if(ptempl == nullptr){
        std::cout << "failed to retreive pad template " << std::string(tname) << std::endl;
    }
    GstPad* pad = gst_element_request_pad(elem, ptempl, name, nullptr);
    if(pad == nullptr){
        std::cout << "pad request for " << std::string(name) << " returned nullptr" << std::endl;
    }
    return pad;
}

void PlayerPipeline::buildAndLinkPlaySink(){
    std::cout << "BUILDING AND LINKING PLAYSINK" << std::endl;
    playSink = gst_element_factory_make("playsink", "playSink");
    assert(playSink);
    WId xwinid = display->winId();
    gst_video_overlay_set_window_handle(GST_VIDEO_OVERLAY(playSink), xwinid);
    assert(gst_bin_add(GST_BIN(pipeline), playSink));
    assert(gst_element_sync_state_with_parent(playSink));
    assert(gst_element_link(audioMixer, playSink));
    playSinkVideoSink = pp_requestPad(playSink, "video_raw_sink", "video_raw_sink");
    assert(decoderVideoSrc);
    assert(playSinkVideoSink);


    std::cout << "building video queue" << std::endl;
    GstElement* queue = gst_element_factory_make("queue", "videoqueue");
    videoQueue = queue;
    assert(queue);
    assert(gst_bin_add(GST_BIN(pipeline), queue));
    assert(gst_element_sync_state_with_parent(queue));
    GstPad* qsink = gst_element_get_static_pad(queue, "sink");
    assert(qsink);
    GstPad* qsrc = gst_element_get_static_pad(queue, "src");
    assert(qsrc);
    assert(gst_pad_link(decoderVideoSrc, qsink) == GST_PAD_LINK_OK);
    assert(gst_pad_link(qsrc, playSinkVideoSink) == GST_PAD_LINK_OK);

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-psink");
    std::cout << "PLAYSINK : DONE" << std::endl;
}

void PlayerPipeline::onPipelineBuilt(){
    //TODO implement here
    //here we can remove blocking probes. (by playing ?)
    std::cout << "UNPREROLLING THE PIPELINE" << std::endl;
    //printPipelineState(pipeline);
   // std::cout << "printing pipeline state before seeking" << std::endl;
   // printPipelineState(pipeline);
    assert(gst_element_seek(GST_ELEMENT(pipeline), 1.0,
                     GST_FORMAT_TIME,
                     GstSeekFlags(GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE),
                     GST_SEEK_TYPE_SET, 0,
                     GST_SEEK_TYPE_NONE, 0));
    // play();
    std::cout << "DONE" << std::endl;
}

void PlayerPipeline::play()
{
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-play");

}

void PlayerPipeline::pause()
{
    std::cout << "paused ..." << std::endl;
    changePipelineState(GST_STATE_PAUSED);
}


void PlayerPipeline::changePipelineState(GstState state)
{
    GstStateChangeReturn ret = gst_element_set_state(GST_ELEMENT(pipeline), state);
    if(ret == GST_STATE_CHANGE_FAILURE){
        std::cout << "could not set pipeline to state : " << gst_element_state_get_name (state) << std::endl;
        exit(124);
    }
}

Methods and functions should be executed in following order :

  • PlayerPipeline::PlayerPipeline
  • pp_no_more_pad_cb
  • triggerNoMorePadSignal and then onNoMorePad
  • several times (one per audio track) : blocked_cb and then decreasePreroll
  • OnDecoderPrerolled, calling, in order :
    • ```buildAndLinkConverters``
    • buildAndLinkMixer
    • buildAndLinkPlaySink
  • onPipelineBuilt

In parallel (not exactly in parallel, events in the Qt event loop) on timer every 1ms, onBusRefresh is called. It uses following implementation :

If at this point, the program isn't stuck in a printPipelineState call, and I'm calling play() or pause() method, nothing happens (except std output of the method). Also, I checked, and pads are still blocked at this point.

EDIT : the pipeline at the time of calling play method looks like (I recommend opening in new tab for zooming): pipeline_dump


Solution

  • Finally I found the solution, which is to manually remove the probes instead of relying on flushing seek and callback black magic.

    So, add probe returns a gulong :

    gulong
    gst_pad_add_probe (GstPad * pad,
                       GstPadProbeType mask,
                       GstPadProbeCallback callback,
                       gpointer user_data,
                       GDestroyNotify destroy_data)
    

    So I stored it in a vector alongside the pads and then manually removed the probes in onPipelineBuilt, using :

    gst_pad_remove_probe (GstPad * pad,
                          gulong id)
    

    I don't know why the callback+flushing didn't worked, despite being shown in this tutorial : https://gstreamer.freedesktop.org/documentation/application-development/advanced/pipeline-manipulation.html?gi-language=c I probably had done something wrong.

    EDIT : I did the answer quite quickly, feel free to ask (through comment) if you want more details, or the full code of the working pipeline.