Search code examples
c++multithreadingopenmptbbtbb-flow-graph

Employing parallelism in pipelined execution


I am trying to develop a pipeline in which data is first read and processed, manipulated once, manipulated in a different way, and then displayed. I have a design in mind in which the data IO feeds into a buffer that is read by the first manipulator. Subsequently that first manipulator writes to another buffer which is read when possible by the second manipulator. Lastly, the output of the second manipulator is written to a display buffer which is read by the visualizer and displayed using OpenGL.

In my mind this is a fairly straightforward parallel problem in which each task has its own thread and they communicate via data buffers. However, all the tutorials I've come across for threaded programs seem to suggest that multithreading is something to be left up to some middleware (like OpenMP) that decides how to divide the workload.

I'm new to developing multithreaded applications, so this may be a dumb question, but is what I've described feasible and can it be done with middleware like OpenMP? I realize the obvious answer is "try it," and I want to, but the tutorials don't shed any light on *how* to try it.


Solution

  • OpenMP is better suited for algorithms that easily span to multiple cores (SIMD). Other scenarios are possible but in your case I think a direct use of threads will work better and will be easier to code and maintain.

    I'm dividing my answer in two parts: a general solution without OpenMP, and some specific changes to use OpenMP.

    As mentioned in a comment, you're facing the producer/consumer problem, but twice: one thread is filling a buffer (producing an item), which then must be read (and modified) by a second one (consumed). The particularity of your problem is that this second thread is also a producer (the image to be drawn) and a third thread is the one in charge of consuming it (the visualizer).

    As you already know, the P/C problem is solved using a buffer (probably a circular buffer or a queue of produced items), where each element of the buffer is marked as produced or consumed, and where threads have exclusive access when adding or taking items from it.


    Lets use the queue approach with your problem in following example program.

    • Produced items will be stored in a queue. The front of the queue contains the oldest elements, those that must be consumed first.
    • There are two queues: one for data produced by the first manipulator (and to be consumed by the second manipulator), and another one for data produced by the second manipulator (and that is going to be visualized by another thread).
    • The production phase is simple: gain exclusive access to the corresponding queue and insert the element at the end.
    • Consumption is similar but must wait for the queue to have at least one element (be not empty).
    • I've added some sleeps to simulate other operations.
    • The stop condition is for illustration purposes.

    Note: I'm assuming you have access to a C++11 compiler for the sake of simplicity. Implementations using other APIs are relatively similar.

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <atomic>
    #include <chrono>
    #include <list>
    
    using namespace std::chrono_literals;
    
    std::mutex g_data_produced_by_m1_mutex;
    std::list<int> g_data_produced_by_m1;
    
    std::mutex g_data_produced_by_m2_mutex;
    std::list<int> g_data_produced_by_m2;
    
    std::atomic<bool> stop = false;
    
    void manipulator1_kernel()
    {
      while (!stop) {
        // Producer 1: generate data
        {
          std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
          g_data_produced_by_m1.push_back(rand());
        }
        std::this_thread::sleep_for(100ms);
      }
    }
    
    void manipulator2_kernel()
    {
      int data;
    
      while (!stop) {
        // Consumer 1
        while (!stop) { // wait until there is an item to be consumed
          {
            std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
            if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
              data = g_data_produced_by_m1.front(); // consume
              g_data_produced_by_m1.pop_front();
              break;
            }
          }
          std::this_thread::sleep_for(100ms);
        }
    
        // Producer 2: modify and send to the visualizer
        {
          std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
          g_data_produced_by_m2.push_back(5 * data);
        }
    
        std::this_thread::sleep_for(100ms);
      }
    }
    
    void visualizer_kernel()
    {
      int data;
    
      while (!stop) {
        // Consumer 2
        while (!stop) { // wait until there is an item to be visualized
          {
            std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
            if (!g_data_produced_by_m2.empty()) {
              data = g_data_produced_by_m2.front();
              g_data_produced_by_m2.pop_front();
              break;
            }
          }
          std::this_thread::sleep_for(100ms);
        }
    
        std::cout << data << std::endl; // render to display
        std::this_thread::sleep_for(100ms);
    
        if (data % 8 == 0) stop = true; // some stop condition for the example
      }
    }
    
    int main()
    {
      std::thread manipulator1(manipulator1_kernel);
      std::thread manipulator2(manipulator2_kernel);
      std::thread visualizer(visualizer_kernel);
    
      visualizer.join();
      manipulator2.join();
      manipulator1.join();
    
      return 0;
    }
    

    If you still want to use OpenMP, probably the closest thing you can find are tasks (since OpenMP 3.0 I think). I haven't used them very much, but above program can be rewritten like:

    int main()
    {
      #pragma omp parallel
      {
        #pragma omp task
        manipulator1_kernel();
        #pragma omp task
        manipulator2_kernel();
        #pragma omp task
        visualizer_kernel();
    
        #pragma omp taskwait
      }    
    
      return 0;
    }
    

    The rest of the code could be changed to use OpenMP features too, but I think this answers your question.

    The main problem with this approach is that you have to create a code-block for tasks to live within the OpenMP parallel, easily complicating the rest of your application logic and structure.