c++ multithreading boost-thread deque

boost::threads based queue algorithm


Assuming there is a std::deque queue of pointers to tasks to be performed, what is the best way to ensure that the number of threads running at any one time is limited to the number of CPU cores? That is, as each task completes, the remaining tasks are launched in turn.

I wrote the following code for an earlier post. The problem I have now is that I am unsure of the best strategy to implement what I have described, and thought it would be worth canvassing opinion.

Note that the "std::deque queue of pointers" I mentioned above is NOT referring to deque mtasks in the code. I do not wish to pop tasks from this deque as I'm using this to store previously completed tasks.

In the program, type something like "task p1 p2 p3 p4 p5" at the prompt, and then "info" to check the current status of each. Note that currently all 5 tasks complete at around the same time. However, what I actually want is for the first 2 to complete (on a dual-core machine), then the next 2, and so on.

The reason for this is that in practice these tasks may take hours, and therefore I want to get the first lot of results sooner so that I may load them into MATLAB or whatever.

I hope I have made myself and the example code clear. I guess boost::thread::hardware_concurrency() covers the number-of-cores part.

Thanks A.

#include <deque>
#include <iostream>
#include <string>
#include <sstream>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

using namespace std;

class task {
public:
    string mname;
    bool completed;
    void start()
    {
        int a = 0;
        for (int i=0 ; i<10000; i++)
        {
            for (int j=0 ; j<100000; j++)
            {
                a= i*2;
            }
        }
        this->completed = true;
    }
    task(string name)
    {
        mname = name;
        completed = false; 
    }
};

class taskManager{
    public:
        boost::thread_group threads;
        void startTask( string name )
        {
            // add new task to the deque
            mtasks.push_back( task(name) );
            // execute start() on a new thread
            threads.create_thread( boost::bind( &task::start, &mtasks.back()) );
        }
        int tasksTotal()
        {
            return mtasks.size();
        }
        string taskInfo(int i)
        {
            string compstr("Not Completed");
            if ( mtasks.at(i).completed == true )
            {
                compstr = "Completed";
            }
            return mtasks.at(i).mname + " " + compstr; 
        }
    private:
        deque<task> mtasks; 
};

int main(int argc, char* argv[])  
{
    string cmd, temp;
    stringstream os;
    bool quit = false;
    taskManager mm;

    cout << "PROMPT>";

    while (quit == false)
    {
        //Wait for a valid command from user
        getline(cin,cmd);

        // Reset stringstream and assign new cmd string
        os.clear();
        os.str(cmd);
        //parse input string
        while (os >> temp) 
        {               
            if ( temp.compare("task") == 0 )
            {
                while (os >> temp) { mm.startTask( temp ); }                     
            }
            if ( temp.compare("info") == 0 )
            { 
                // Returns a list of all completed and not completed tasks
                for (int i = 0; i<mm.tasksTotal(); i++)
                {
                    cout << mm.taskInfo(i).c_str() << endl;
                }                           
            }
            if ( temp.compare("quit") == 0 ){ quit = true; }
        }

        cout << "PROMPT>";
    }

    mm.threads.join_all();      

    return 0;  
}

Solution

  • What you're describing is the thread pool pattern: a fixed number of threads works through a larger group of tasks (# tasks > # threads), with each thread picking up the next pending task as soon as it finishes its current one.

    The Wikipedia entry on the thread pool pattern has more information. You can write your own, or use something like the unofficial boost threadpool library.
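
    To make the "write your own" option concrete, here is a minimal sketch of a fixed-size pool. It is an illustration under assumptions rather than a drop-in implementation: it uses the C++11 standard library (std::thread, std::mutex, std::condition_variable) instead of boost::thread so that it builds without Boost, and the names SimplePool, submit, RunStats and runDemo are hypothetical and appear nowhere in the question's code.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: a fixed set of workers drains a shared job queue.
class SimplePool {
public:
    explicit SimplePool(std::size_t workers) {
        assert(workers > 0);
        for (std::size_t i = 0; i < workers; ++i)
            threads_.emplace_back([this] { workerLoop(); });
    }

    // Lets the workers finish every queued job, then joins them.
    ~SimplePool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();
        for (std::thread& t : threads_) t.join();
    }

    SimplePool(const SimplePool&) = delete;
    SimplePool& operator=(const SimplePool&) = delete;

    // Queue a job; it starts as soon as a worker becomes free.
    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push_back(std::move(job));
        }
        wake_.notify_one();
    }

private:
    void workerLoop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and nothing left to run
                job = std::move(jobs_.front());
                jobs_.pop_front();
            }
            job();  // run outside the lock so other workers can take jobs
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::deque<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> threads_;
};

// Demonstration helper (hypothetical): counts finished jobs and the largest
// number of jobs that were ever running at the same moment.
struct RunStats { int completed; int peak; };

RunStats runDemo(std::size_t workers, int jobCount) {
    std::atomic<int> running{0}, peak{0}, completed{0};
    {
        SimplePool pool(workers);
        for (int i = 0; i < jobCount; ++i) {
            pool.submit([&] {
                int now = ++running;
                int seen = peak.load();
                while (now > seen && !peak.compare_exchange_weak(seen, now)) {}
                std::this_thread::sleep_for(std::chrono::milliseconds(5));
                --running;
                ++completed;
            });
        }
    }  // the pool destructor returns only after all jobs have run
    return {completed.load(), peak.load()};
}
```

    In the question's taskManager, startTask would then hand the bound task::start call to something like submit instead of calling threads.create_thread, with the pool sized from hardware_concurrency(). That function may return 0 when the core count cannot be determined, so a fallback of at least 1 is advisable. The completed flag is written by a worker and read by the main thread, so it would also need to be guarded by a mutex or made atomic.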