Search code examples
c++ vector thread-safety queue pipeline

Pipeline with thread vectors and queues C++


Here is my code. It works, but after a few iterations it stops without any error, probably because of a race condition or a deadlock.

The goal of the code is to model an encoding application: after some fake random frames have been created, the first stage of my pipeline assigns a type to each frame, and the second stage then encodes it with some random operation.

To do that I've used two different vectors of threads (one for each stage) that run concurrently and communicate through shared queues: once a thread in the first vector has pushed a frame, it is popped by a thread in the other vector and "encoded".

 #include <algorithm>
 #include <cstddef>
 #include <cstdlib>
 #include <ctime>
 #include <functional>
 #include <iostream>
 #include <mutex>
 #include <random>
 #include <thread>
 #include <utility>
 #include <vector>

 #include "SafeQueue.h"

 using namespace std;

 // Exclusive upper bound of the random sample values stored in a frame grid
 // (samples are generated as rand() % DATA_MAG).
 constexpr int DATA_MAG = 256;

 // One frame travelling through the pipeline.
 //
 // Every member has a default value so that a default-constructed Frame can be
 // copied safely; previously num, type and encoded were left indeterminate.
 struct Frame
 {
    // Sequence number of the frame; also used as an index into the parameter vector.
    int num = 0;

    // Frame type tag; the typing stage stores 'I' or 'B' here. '\0' means "not typed yet".
    char type = '\0';

    // Encoding flag. No code visible here ever sets it, so it stays false.
    bool encoded = false;

    // Sample values of the frame, stored row by row.
    std::vector<std::vector<int>> grid;
};

// Waits for `t` to finish.
//
// std::thread::join() throws std::system_error when the thread is not joinable
// (default-constructed, moved-from or already joined), so such threads are
// skipped instead of aborting the program.
void do_join(std::thread& t)
{
    if (t.joinable())
    {
        t.join();
    }
}

// Waits for every thread stored in `v`.
//
// Threads that are not joinable (default-constructed, moved-from or already
// joined) are skipped, because calling join() on them would throw
// std::system_error. This also makes a second call on the same vector harmless.
void join_all(std::vector<std::thread>& v)
{
    for (std::thread& worker : v)
    {
        if (worker.joinable())
        {
            worker.join();
        }
    }
}

// Generates `num_frames` frames and pushes them onto `stream` in order.
//
// Each frame gets num = 0, 1, 2, ... and a height x width grid filled with
// rand() % DATA_MAG samples (row-major draw order). Non-positive dimensions
// yield an empty grid instead of wrapping around to a huge unsigned size.
void create_input (Queue<Frame>& stream, int num_frames, int height, int width)
{
    const std::size_t rows = height > 0 ? static_cast<std::size_t>(height) : 0;
    const std::size_t cols = width > 0 ? static_cast<std::size_t>(width) : 0;

    for (int i = 0; i < num_frames; i++)
    {
        // Value-initialise so that type/encoded are never indeterminate when
        // the frame is copied into the queue.
        Frame frame{};
        frame.num = i;

        // Fill the frame's own grid directly; no temporary grid to copy from.
        frame.grid.assign(rows, std::vector<int>(cols, 0));

        for (std::vector<int>& row : frame.grid)
        {
            for (int& sample : row)
            {
                sample = rand() % DATA_MAG;
            }
        }

        stream.push(frame);
    }
}


// Stage 1 worker: pops `num_frames` frames from `stream`, tags each one as
// 'I' or 'B', records the tag in `param[frame.num]` and pushes the frame onto
// `typed`.
//
// preset     - threshold on a 0..9 draw: a draw below it yields 'I', otherwise 'B'.
// stream     - input queue; pop() blocks until a frame is available.
// typed      - output queue feeding the next stage.
// param      - per-frame type table indexed by frame number.
// num_frames - number of frames this worker consumes before returning.
void decide_type(int preset, Queue<Frame>& stream, Queue<Frame>& typed, std::vector<char>& param, int num_frames)
{
    // Several of these workers run at once and rand() is not guaranteed to be
    // thread-safe, so every thread draws from its own generator.
    thread_local std::mt19937 generator{std::random_device{}()};
    std::uniform_int_distribution<int> draw(0, 9);

    std::cout<<"hello from decide"<<" "<<std::endl;

    for (int i = 0; i < num_frames; i++)
    {
        Frame tmp = stream.pop();

        tmp.type = (draw(generator) < preset) ? 'I' : 'B';

        // Workers write distinct elements, which is race-free; the bounds
        // check keeps a bad frame number from writing outside the table.
        if (tmp.num >= 0 && static_cast<std::size_t>(tmp.num) < param.size())
        {
            param[tmp.num] = tmp.type;
        }

        typed.push(tmp);
    }
}

void decode_flow(int preset, Queue<Frame>& typed, vector<Frame>& encoded,
                    vector<char>& parameters, int num_frames, int height, int width)
{
    cout<<"hello from decode"<<" "<<endl;

    for(int i = 0; i < num_frames; i++)
    {
        Frame f = typed.pop();

        if (f.type == 'I')
        {
            cout<<"hi from I"<<" "<<endl;
            for (int j = 0; j < height; j++)
            {
                for (int k = 0; k < width; k++)
                {
                    f.grid[j][k] = f.grid[j][k] * 2;
                }
            }
        }

        else cout<<"hi from B"<<" "<<endl;

        encoded.push_back(f);
    }
}




int main()
{
    srand(time(NULL));

    int num_threadsXstage = 2;

    int width = 500;
    int height = 500;

    int num_frames = 100;

    int frames_thread = num_frames/num_threadsXstage;

    int preset = 3;

    vector<Frame> final;

    //Vectors of threads
    vector<thread> typer;
    vector<thread> encoder;

    //Vector of parameters
    vector<char> parameters(num_frames);

    //Working queues
    Queue<Frame> created;
    Queue<Frame> typed;

    //Final vector
    vector<Frame> encoded(num_frames);

    //Movie creation

    create_input(created, num_frames, height, width);



for (int i = 0; i < num_threadsXstage; i++)
    {
        //stage 1
        typer.push_back(thread(bind(&decide_type, preset, ref(created),
                                    ref(typed), ref(parameters), frames_thread)));

        //stage 2
        encoder.push_back(thread(bind(&decode_flow, preset, ref(typed), ref(encoded),
                                      ref(parameters), frames_thread, height, width)));
    }


    // JOIN

    join_all(typer);

    join_all(encoder);


    for (int i = 0; i < num_frames; i++)
    {
        Frame k = typed.pop();

        cout<<k.type<<" ";
    }

    cout<<endl<<endl;

    for (int i = 0; i < num_frames; i++)
    {
        cout<<parameters[i]<<" ";
    }
}

And this is the code of my thread-safe queue, or at least it is supposed to be thread-safe.

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

using namespace std;

// Blocking FIFO queue shared between threads.
//
// Every operation takes the internal mutex. Both pop overloads wait on the
// condition variable until the queue holds at least one element; push wakes
// one waiting thread after releasing the lock. The queue cannot be copied.
template <typename T>
class Queue
{
public:
    Queue() = default;
    Queue(const Queue&) = delete;            // disable copying
    Queue& operator=(const Queue&) = delete; // disable assignment

    // Removes the oldest element and returns a copy of it, blocking while the
    // queue is empty.
    T pop()
    {
        std::unique_lock<std::mutex> guard(mutex_);
        // The predicate form re-checks after every wake-up, so spurious
        // wake-ups are handled.
        cond_.wait(guard, [this] { return !queue_.empty(); });

        T head = queue_.front();
        queue_.pop();
        return head;
    }

    // Same as pop(), but copy-assigns the removed element into `item`.
    void pop(T& item)
    {
        std::unique_lock<std::mutex> guard(mutex_);
        cond_.wait(guard, [this] { return !queue_.empty(); });

        item = queue_.front();
        queue_.pop();
    }

    // Appends a copy of `item` and wakes one waiting consumer.
    void push(const T& item)
    {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            queue_.push(item);
        }
        // Notify after the lock is released so the woken thread can take it at once.
        cond_.notify_one();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

Solution

  • After all threads have finished, you extract all the queued frames from the typed queue - but this is the intermediate queue between the processing stages, and is now empty. The call to typed.pop() will block forever.

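    You should extract the frames from the output container `encoded` instead. Note that `encoded` is a `vector<Frame>`, not a queue, so iterate over it rather than calling `pop()`.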