I need some help with concurrent C++ programming.
I have a file of names, called "names.txt", in this format:
0 James
1 Sara
2 Isaac
And I have another file, named "op.txt", that contains operations on the names file, in this format:
0 1 + // this means append Sara to James and store the result at position 0
1 2 $ // this means swap the values at position 1 and position 2
and a file "output.txt" that holds the result of the operations, in this format:
0 JamesSara
1 Isaac
2 Sara
The problem says to create one thread that reads names.txt and op.txt and stores their contents. Next, create a variable number of threads that perform the operations concurrently, and finally write output.txt in a separate thread.
Here is my code for this problem. It works correctly when the number of concurrent threads is greater than 2, but the output for 1 and 2 threads is incorrect. What did I miss in this code?
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <fstream>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
// Kept on purpose: the rest of this file refers to std names (string, thread,
// ifstream, ...) without qualification.
using namespace std;

// Mutex protecting the shared name table and the shared operation queue.
std::mutex _opMutex;
// Condition variable associated with the availability of queued operations.
std::condition_variable _initCondition;
// Condition variable notified by doOperations() after it has taken an operation.
std::condition_variable _operationCondition;

// These counters are incremented and compared by several threads, not always
// while _opMutex is held. Plain ints would make that a data race, so they are
// atomic; ++ and < keep working unchanged for existing users.
std::atomic<int> _counter{0};            // records written by doOutputFile()
std::atomic<int> _initCounter{0};        // operations read from op.txt
std::atomic<int> _doOperationCounter{0}; // operations applied so far
// One operation record: two operand ids and the operator character.
// The '$' operator is dispatched to the swap routine; any other operator is
// handed to the math routine.
struct OperationStruct
{
    // Members are value-initialised so a default-constructed record never
    // carries indeterminate values.
    int firstOperand = 0;   // id of the first (and destination) entry
    int secondOperand = 0;  // id of the second entry
    char cOperator = '\0';  // operator character read from the operations file
};
// Number of worker threads that main() starts to run doOperations().
constexpr int THREADS = 5;

// Shared name table: one (id, name) pair per record read from names.txt.
std::deque<std::pair<int, std::string>> _nameVector;

// Shared queue of operations read from op.txt, consumed from the front.
std::deque<OperationStruct> _opStructVec;
void initNamesAndOperations()
{
ifstream infile;
std::pair<int, string> namePair;
infile.open("names.txt");
if (!infile)
{
cout << "Unable to open file";
exit(-1);
}
int id;
string value;
while (infile >> id >> value)
{
namePair.first = id;
namePair.second = value;
_nameVector.push_back(namePair);
}
infile.close();
infile.open("op.txt");
if (!infile)
{
cout << "Unable to open file";
exit(-1);
}
int firstOperand;
int secondOperand;
char cOperator;
while (infile >> firstOperand >> secondOperand >> cOperator)
{
OperationStruct opSt;
opSt.firstOperand = firstOperand;
opSt.secondOperand = secondOperand;
opSt.cOperator = cOperator;
_opStructVec.push_back(opSt);
++_initCounter;
}
infile.close();
return;
}
// Applies a "math" operation to the shared name table.
//
// For '+', every entry whose id equals firstIndex gets its name replaced by
// firstValue followed by secondValue. Any other operator leaves the table
// untouched. In all cases _doOperationCounter is incremented once.
//
// This function takes no lock itself.
void doOperationMath(int firstIndex, string firstValue, string secondValue, char cOp)
{
    if (cOp == '+')
    {
        for (auto& entry : _nameVector)
        {
            if (entry.first == firstIndex)
            {
                entry.second = firstValue + secondValue;
            }
        }
    }
    ++_doOperationCounter;
}
// Swaps the names stored under two ids in the shared name table.
//
// Every entry with id firstIndex receives secondValue, and every entry with
// id secondIndex receives firstValue. The second assignment is checked after
// the first, so it wins when both ids are equal. _doOperationCounter is
// incremented once.
//
// This function takes no lock itself.
void doOperationSwap(int firstIndex, int secondIndex, string firstValue, string secondValue)
{
    for (auto it = _nameVector.begin(); it != _nameVector.end(); ++it)
    {
        const int id = it->first;
        if (id == firstIndex)
        {
            it->second = secondValue;
        }
        if (id == secondIndex)
        {
            it->second = firstValue;
        }
    }
    ++_doOperationCounter;
}
void doOperations()
{
while (_doOperationCounter < _initCounter)
{
std::unique_lock<mutex> locker(_opMutex);
_initCondition.wait(locker, [](){return !_opStructVec.empty(); });
OperationStruct opSt = _opStructVec.front();
_opStructVec.pop_front();
locker.unlock();
_operationCondition.notify_one();
int firstId = opSt.firstOperand;
int secondId = opSt.secondOperand;
char cOp = opSt.cOperator;
string firstValue = "";
string secondValue = "";
for (int j = 0; j < _nameVector.size(); ++j)
{
std::pair<int, string> acc = _nameVector[j];
if (firstId == acc.first)
firstValue = acc.second;
if (secondId == acc.first)
secondValue = acc.second;
}
if (cOp == '$')
{
doOperationSwap(firstId, secondId, firstValue, secondValue);
}
else
{
doOperationMath(firstId, firstValue, secondValue, cOp);
}
}
return;
}
void doOutputFile()
{
ofstream outfile;
outfile.open("sampleOutput.txt", std::ios::out | std::ios::app);
if (!outfile)
{
cout << "Unable to open the file";
exit(-1);
}
while (_counter < _initCounter)
{
std::unique_lock<mutex> locker(_opMutex);
_operationCondition.wait(locker, [](){return !_nameVector.empty(); });
auto accPair = _nameVector.front();
_nameVector.pop_front();
locker.unlock();
outfile << accPair.first << " " << accPair.second << endl;
++_counter;
}
return;
}
int main()
{
thread th1(initNamesAndOperations);
std::vector<thread> operationalThreads;
for (int i = 0; i < THREADS; ++i)
{
operationalThreads.push_back(thread(doOperations));
}
thread th3(doOutputFile);
th1.join();
for (auto& opthread : operationalThreads)
opthread.join();
th3.join();
return 0;
}
If a variable is modified from multiple threads, you need some synchronisation to ensure that the proper value is read. The simplest way would probably be to use std::atomic for your variables to ensure that operations are properly sequenced.
Also, there is nothing in your code to ensure that your doOperations threads won't finish before you have read the whole file.
Obviously, you need to either read the whole data first or have a way to wait for some data to become available (or to reach the end of data). If reading the initial data is fast but processing is slow, then the easier solution is to read the data before starting processing threads.
What is probably happening is that if you create a lot of threads, then by the time you create the last thread, initNamesAndOperations will already have read the whole file.
I highly recommend that you buy and read C++ Concurrency in Action by Anthony Williams. By reading such a book, you will get a good understanding of modern C++ multithreading, and it will help you a lot in writing correct code.