I have a single-threaded application that sends files to another server, one at a time, by calling send_new_file:
void send_new_file_command::start_sending_file()
{
    m_thread = thread(&send_new_file_command::execute_file, this);
}

void send_new_file_command::execute_file()
{
    for (auto it = files_need_to_send.begin(); it != files_need_to_send.end() && !is_complete(); ++it)
    {
        {
            std::unique_lock<spinning_lock> guard(lock_obj);
            m_current_file = *it;
        }
        // send a file.
        // I want to call this in parallel
        send_new_file(*it);
    }
}
Is there any way I can have multiple threads, each sending one file at a time? For example, with 4 threads, threads 1, 2, 3 and 4 would each send a different file in parallel. In other words, I want to call send_new_file in parallel.
I am using std::thread. I was looking at thread examples to see how to do this in C++, but I got confused about how to divide the files among the threads and make sure each thread works on its own subset of files.
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i)
    threads.push_back(std::thread(send_new_file(*it)));
My background is in Java, so I am slightly confused about how to do this in C++ using std::thread.
This is a fairly simple approach using a work queue. You can concatenate the code snippets into a self-contained program. We will use the following standard library headers.
#include <fstream>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
First, we define a function that takes a single file name and sends that file wherever it should go. I'll simulate this by simply copying its contents to /dev/null.
void
send_file(const std::string& filename)
{
    std::ifstream istr {};
    std::ofstream ostr {};
    std::string line {};
    istr.exceptions(std::ifstream::badbit);
    ostr.exceptions(std::ofstream::badbit);
    istr.open(filename);
    ostr.open("/dev/null");
    while (std::getline(istr, line))
        ostr << line << '\n';
}
Next, we define a function that takes a pointer to a std::vector of files that still need to be sent and another pointer to a std::mutex that protects that vector. I'm using pointers instead of references because this makes it simpler to create the std::threads later on (references would have to be wrapped in std::ref). You don't need to do this if you don't like it.
int
send_files(std::vector<std::string> *const files_p, std::mutex *const mutex_p)
{
    auto count = 0;
    while (true)
    {
        std::string next {};
        {
            const std::unique_lock<std::mutex> lck {*mutex_p};
            if (files_p->empty())  // nothing left to do
                return count;
            next = std::move(files_p->back());
            files_p->pop_back();
        }
        send_file(next);
        count += 1;
    }
}
The important thing is that we don't hold the lock while the actual work of sending the file is performed. Otherwise, we would completely kill concurrency. I was also careful not to allocate any memory while holding the lock. Usually, you will see std::lists used as work queues and std::condition_variables to signal when a change to the queue has occurred. I have posted code showing this in another answer some time ago.
However, in this simple case, items are only ever removed from the queue, so a std::vector is a perfect fit.
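For comparison, here is a minimal sketch of what the std::list plus std::condition_variable variant might look like when items can also be added while the workers are running. The class work_queue and its members push, pop and close are hypothetical names chosen for this illustration; they are not taken from the other answer mentioned above. List nodes are allocated outside the lock and moved in with splice, in the same spirit as avoiding allocations while holding the lock.

```cpp
#include <condition_variable>
#include <list>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical blocking work queue (illustration only).
class work_queue
{
public:
    // Add one item and wake a single waiting consumer.
    void push(std::string item)
    {
        std::list<std::string> node {};
        node.push_back(std::move(item));  // allocate outside the lock
        {
            const std::lock_guard<std::mutex> lck {mutex_};
            items_.splice(items_.end(), node);  // relinks the node, no allocation
        }
        cond_.notify_one();
    }

    // Announce that no more items will arrive and wake all consumers.
    void close()
    {
        {
            const std::lock_guard<std::mutex> lck {mutex_};
            closed_ = true;
        }
        cond_.notify_all();
    }

    // Block until an item is available or the queue is closed and drained.
    // Returns false in the latter case and leaves `out` untouched.
    bool pop(std::string& out)
    {
        std::list<std::string> node {};
        {
            std::unique_lock<std::mutex> lck {mutex_};
            cond_.wait(lck, [this] { return closed_ || !items_.empty(); });
            if (items_.empty())
                return false;
            node.splice(node.end(), items_, items_.begin());
        }
        out = std::move(node.front());  // node is destroyed outside the lock
        return true;
    }

private:
    std::mutex mutex_ {};
    std::condition_variable cond_ {};
    std::list<std::string> items_ {};
    bool closed_ {false};
};
```

A worker would then loop with `while (queue.pop(next)) send_file(next);`, and the producer would call close() once it has pushed the last file name.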
Finally, we use what we have in a simple program that creates one thread per hardware concurrency unit and asks these threads to send all files named in the command line arguments. Note that, as written, this will process the list in reverse order. This is trivial to change, though, if it is an issue for you.
int
main(int argc, char * * argv)
{
    auto nthreads = std::thread::hardware_concurrency();
    if (nthreads == 0)  // may be 0 if the value cannot be determined
        nthreads = 1;
    std::mutex mutex {};
    std::vector<std::thread> threads {};
    std::vector<std::string> files {};
    files.reserve(argc - 1);
    for (auto i = 1; i < argc; ++i)
        files.push_back(argv[i]);
    threads.reserve(nthreads);
    for (auto t = 0U; t < nthreads; ++t)
        threads.emplace_back(send_files, &files, &mutex);
    for (auto t = 0U; t < nthreads; ++t)
        threads[t].join();
}
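If the order matters, one possible way to hand out the files front to back is to replace the mutex and pop_back with a shared atomic index. The sketch below is only an illustration under a few assumptions: send_files_in_order and send_all are hypothetical names, the send parameter stands in for send_file, and it must be safe to call from several threads at once. It also shows the reference-based alternative to the pointers used above, with std::ref and std::cref at the point where the threads are created. Files are claimed in ascending order, but they may still finish in a different order.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Hypothetical worker: claims indices in ascending order until none are left.
void
send_files_in_order(const std::vector<std::string>& files,
                    std::atomic<std::size_t>& next,
                    const std::function<void(const std::string&)>& send)
{
    while (true)
    {
        const std::size_t i = next.fetch_add(1);  // claim the next index
        if (i >= files.size())                    // nothing left to do
            return;
        send(files[i]);  // the vector is never modified, so no lock is needed
    }
}

// Hypothetical driver: starts the workers and waits for all of them.
void
send_all(const std::vector<std::string>& files,
         const std::function<void(const std::string&)>& send)
{
    auto nthreads = std::thread::hardware_concurrency();
    if (nthreads == 0)  // may be 0 if the value cannot be determined
        nthreads = 1;
    std::atomic<std::size_t> next {0};
    std::vector<std::thread> threads {};
    threads.reserve(nthreads);
    for (auto t = 0U; t < nthreads; ++t)
        threads.emplace_back(send_files_in_order,
                             std::cref(files), std::ref(next), std::cref(send));
    for (auto& thread : threads)
        thread.join();
}
```

With the send_file function from above, this could be called as `send_all(files, send_file);`.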