I'm using a hardware simulator that relies on Pin tools to execute the workload, and the code below is that workload. Although it works natively on Ubuntu when linked with the -lpthread flag, it freezes on the simulator when it reaches the point of joining the threads.
I suspect something in this code is unsafe in a way that the native OS tolerates but the simulator does not. What is the most appropriate way to write this?
main.h:
#include <stdlib.h>
#include <iostream>
#include <fstream>
#include <string>
#include <pthread.h>
#include <stdint.h>
#include <getopt.h>
#include <set>
#include <vector>
#include <algorithm>
#include <iterator>
// Number of payload texts (vertices) that are compared against each other.
#define NUM_OF_VERTICES 4
// Number of unordered vertex pairs, n * (n - 1) / 2; execute_parallel() starts one thread per pair.
#define NUM_OF_PTHREADS (NUM_OF_VERTICES*(NUM_OF_VERTICES-1)/2)
// Payload text of each vertex; filled in by payload_text_initialize().
std::string payload_texts[NUM_OF_VERTICES];
void payload_text_initialize();
// Pairwise similarity matrix; jaccard_visit() writes both [i][j] and [j][i] for its pair.
double indices [NUM_OF_VERTICES][NUM_OF_VERTICES];
// Argument record handed to a worker thread through pthread_create's void* parameter.
//
// NOTE: valuePointer defaults to the address of this object's own `value`. The
// implicitly generated copy operations copy that pointer verbatim, so a copy keeps
// pointing at the source object's `value`; re-point valuePointer after copying.
class thread_args {
public:
    // Indices of the two payload texts the thread compares. Zero-initialised so a
    // default-constructed record never carries indeterminate indices.
    uint index1 = 0;
    uint index2 = 0;
    // Result slot for the worker thread.
    unsigned long value = 0;
    // Where the worker stores its result; by default this object's own `value`.
    unsigned long * valuePointer = &value;
};
main.cc:
#include "main.h"
// C-linkage declarations; none of these functions is defined in the code shown here.
// NOTE(review): presumably supplied by the simulator runtime at link time — confirm.
extern "C" {
// Called by main() immediately before and after payload_text_initialize().
extern void mcsim_skip_instrs_begin();
extern void mcsim_skip_instrs_end();
// The remaining declarations are not called anywhere in the code shown here.
extern void mcsim_spinning_begin();
extern void mcsim_spinning_end();
int32_t log_2(uint64_t);
}
using namespace std;
// Collects the distinct characters of a string.
//
// str: text to scan; may be empty.
// Returns an ordered set that contains every character of str exactly once.
std::set<char> find_uniques(std::string str){
    // The range constructor de-duplicates on insertion, which replaces the manual
    // find-then-insert loop and its signed/unsigned index comparison.
    return std::set<char>(str.begin(), str.end());
}
void * jaccard_visit(void *arg){
thread_args * args = (thread_args *) arg;
set<char> setunion;
set<char> intersect;
set<char> set1 = find_uniques(payload_texts[args->index1]);
set<char> set2 = find_uniques(payload_texts[args->index2]);
std::set_intersection(set1.begin(),set1.end(),set2.begin(),set2.end(),std::inserter(intersect,intersect.begin()));
std::set_union(set1.begin(),set1.end(),set2.begin(),set2.end(),std::inserter(setunion,setunion.begin()));
double similarity = ((double) intersect.size()) / ((double) setunion.size());
indices[args->index1][args->index2] = similarity;
indices[args->index2][args->index1] = similarity;
unsigned long a = 1;
unsigned long b = 1;
unsigned long c = a + b;
for (int i = 3 ; i < 100000 * (similarity - 0.9) ; i++){
a = b;
b = c;
c = a + b;
}
*(args->valuePointer) = c;
return NULL;
}
void execute_parallel(){
pthread_t threads[NUM_OF_PTHREADS]; //array to hold thread information
thread_args *th_args = (thread_args*) malloc(NUM_OF_PTHREADS * sizeof(thread_args));
cout << "NUM_OF_PTHREADS is " << NUM_OF_PTHREADS << endl;
uint k = 0 ;
for (int i = 0 ; i < NUM_OF_VERTICES ; i++){
for (int j = i+1 ; j < NUM_OF_VERTICES ; j++){
th_args[k].index1 = i;
th_args[k].index2 = j;
th_args[k].value = i+j;
th_args[k].valuePointer = &(th_args[k].value);
pthread_create(&threads[k], NULL, jaccard_visit, (void*) &th_args[k]);
cout << "Thread " << k << " is started" << endl;
k++;
}
}
cout << "k is " << k << endl;
for(int i = 0; i < NUM_OF_PTHREADS; i++){
cout << "Thread " << i << " is joined" << endl;
pthread_join(threads[i], NULL);
}
cout << "Free threads" << endl ;
free(th_args);
}
void manual_schedule(){
pthread_t th0, th1, th2, th3, th4, th5;
thread_args arg0, arg1, arg2, arg3, arg4, arg5;
arg0.index1 = 0; arg0.index2 = 1; arg0.value = 0; arg0.valuePointer = &arg0.value;
arg1.index1 = 0; arg1.index2 = 2; arg1.value = 1; arg1.valuePointer = &arg1.value;
arg2.index1 = 0; arg2.index2 = 3; arg2.value = 2; arg2.valuePointer = &arg2.value;
arg3.index1 = 1; arg3.index2 = 2; arg3.value = 3; arg3.valuePointer = &arg3.value;
arg4.index1 = 1; arg4.index2 = 3; arg4.value = 4; arg4.valuePointer = &arg4.value;
arg5.index1 = 2; arg5.index2 = 3; arg5.value = 5; arg5.valuePointer = &arg5.value;
cout << "Arguments are done ";
pthread_create(&th0, NULL, jaccard_visit, (void*) &arg0);
pthread_create(&th1, NULL, jaccard_visit, (void*) &arg1);
pthread_create(&th2, NULL, jaccard_visit, (void*) &arg2);
pthread_create(&th3, NULL, jaccard_visit, (void*) &arg3);
pthread_create(&th4, NULL, jaccard_visit, (void*) &arg4);
pthread_create(&th5, NULL, jaccard_visit, (void*) &arg5);
cout << "Threads are created" << endl;
cout << "Join starts here" << endl;
pthread_join(th0, NULL);
pthread_join(th1, NULL);
pthread_join(th2, NULL);
pthread_join(th3, NULL);
pthread_join(th4, NULL);
pthread_join(th5, NULL);
cout << "Fibonaccis: " <<endl;
cout << *(arg0.valuePointer) << endl;
cout << *(arg1.valuePointer) << endl;
cout << *(arg2.valuePointer) << endl;
cout << *(arg3.valuePointer) << endl;
cout << *(arg4.valuePointer) << endl;
cout << *(arg5.valuePointer) << endl;
}
// Program entry: initialises the payload texts (bracketed by the mcsim skip
// markers), runs the six-thread schedule, then prints the similarity matrix
// one row per line.
int main(int argc, const char * argv[]){
    std::cout << "Jaccard process is started" << std::endl;

    mcsim_skip_instrs_begin();
    payload_text_initialize();
    mcsim_skip_instrs_end();

    std::cout << "Parallel part begins" << std::endl;
    manual_schedule();

    std::cout << "Calculated results are being logged" << std::endl;
    for (const auto &row : indices) {
        for (const double cell : row) {
            std::cout << cell << " ";
        }
        std::cout << std::endl;
    }
}
// Fills payload_texts[0..3] with four fixed alphanumeric payload strings.
void payload_text_initialize(){
    static const char * const kTexts[] = {
        "l5IC5uC9AzcROkE3YkDJ2lEzLts8XP8a9WqDgDLWjg1M7HysAUfDFwzLWjc7875PnZVUHLzi6nQaUMQDNUeG4Wn2UkiOB79tOlE1t6LaKYbYiCJwJ34CAOFZCIbFSmcLTAAoB1rvPfeA6oM3kV3C8BDvraGvXjUORLGFAcBRQCerb3WD0qhrrM0MVW0t93bBqlTsrkxg",
        "tILKwAhbUkoqouKZ1G1VrZRmKwQnwzBgQirLkdedsYIAplKdEfk8oSmqdJmCJd5g0Q3VcJ8RYoxtIwA7jL1L01DcagIOuld0whcyM0yvSP0pMWO2yVTwOQPGkW2k7AHqzSEvb5BWkKsTexBsCUepjbG50T6vKsEHXGJ9aZwn2274Ekhnu1hlvuTqsS8jgwr0kQwhbwxN",
        "LNyQgx3mox3szmRNn1tSB4ibVuLsTr7MfANlj41Y0hKStx3NJx1O52XxNiqTMDCu4eGwWYcBvFMEC5tl1E7Rsm0Q9NZsPAJIwuiPYQuXeUyhMmbFiwRk6PlziXne0QaFJ3TrncsHsL3LxIDyaDPScSRdEvX72IJmi2gQTHgASi0KkKH4Sr6VJV3FjdNjKwY2ncT5oSXZ",
        "UxynTAvEWF4CcY9wUJRFnrX7sgrvvubcXUqH5DXK12UjSHDUME397S3BdB38FeMQJq8r7P7RILAY0qkw7OxUhGsZHRPmuY7VwKULqb6fx0Oy2McW2u07yqdAEMCN6AkQ1jTn2sXB4uWH21uLbjCf9i2V7W9tyw3cx6piE7XJb3vfbLI34OG5LKQXmVAGT0D6nbibaN8M"
    };
    const unsigned kCount = sizeof(kTexts) / sizeof(kTexts[0]);

    // Copy the table entry by entry, in index order.
    for (unsigned slot = 0; slot < kCount; ++slot) {
        payload_texts[slot] = kTexts[slot];
    }
}
execute_parallel() contains two for loops that create and join the pthreads. manual_schedule() is the unrolled version of the same code. When executing on Pin, both work well until the join call for the first thread. At that point the program freezes and stays that way forever, without any signal or error. When executing on Ubuntu with the -lpthread flag, it works perfectly and generates the results.
What would be the safest and most appropriate way to use pthreads in this situation?
Thanks in advance
Edit
I noticed that the program freezes before reading payload_texts[args->index1]. Adding a mutex let it proceed past that point, and it even ran to completion once. It is now non-deterministic: across multiple executions of the same binary, it only rarely finishes properly. I suspect there is a cause of deadlock inside the jaccard_visit function. I changed it as follows:
// Instrumented thread entry point: computes the Jaccard similarity of the two
// payload texts selected by arg (a thread_args*), printing a progress message
// under cout_mutex after each step. Returns NULL.
// NOTE(review): cout_mutex and payload_mutex are not declared in this snippet —
// presumably file-scope pthread_mutex_t objects; confirm.
void * jaccard_visit(void *arg){
thread_args * args = (thread_args *) arg;
set<char> setunion;
set<char> intersect;
// Tag used in the log lines, built from the two indices (e.g. 1 and 3 give 13).
int id = args->index1 * 10 + args->index2;
pthread_mutex_lock(&cout_mutex); cout << "Thread "<< id << " started with indices: " << args->index1 << " " << args->index2 << endl; pthread_mutex_unlock(&cout_mutex);
// Both payload reads (and the unique-character extraction) happen under payload_mutex.
pthread_mutex_lock(&payload_mutex);
set<char> set1 = find_uniques(payload_texts[args->index1]);
set<char> set2 = find_uniques(payload_texts[args->index2]);
pthread_mutex_unlock(&payload_mutex);
pthread_mutex_lock(&cout_mutex); cout << id << " : payload_texts were read" << endl; pthread_mutex_unlock(&cout_mutex);
pthread_mutex_lock(&cout_mutex); cout << id << " : intersect was created, scan begins" << endl; pthread_mutex_unlock(&cout_mutex);
// Intersection: for every character of set1, scan set2 and keep the character on the first match.
for (set<char>::iterator i = set1.begin(); i != set1.end(); i++) {
char c1 = *i;
for (set<char>::iterator j = set2.begin(); j != set2.end(); j++) {
char c2 = *j;
if (c1 == c2){
intersect.insert(c1);
pthread_mutex_lock(&cout_mutex); cout << id << " : char" << c1 << " was inserted to intersection" << endl; pthread_mutex_unlock(&cout_mutex);
break;
}
}
}
pthread_mutex_lock(&cout_mutex); cout << id << " : intersection is calculated" << endl; pthread_mutex_unlock(&cout_mutex);
// Union, part 1: every character of set1.
for (set<char>::iterator i = set1.begin(); i != set1.end(); i++) {
setunion.insert(*i);
}
// Union, part 2: the characters of set2 that do not occur in set1.
for (set<char>::iterator i = set2.begin(); i != set2.end(); i++) {
char c = *i;
bool exists = false;
for (set<char>::iterator j = set1.begin(); j != set1.end(); j++) {
if (c == *j)
exists = true;
}
if (exists == false)
setunion.insert(c);
}
pthread_mutex_lock(&cout_mutex); cout << id << " : union is calculated" << endl; pthread_mutex_unlock(&cout_mutex);
// Jaccard similarity = |intersection| / |union|.
// NOTE: this is 0.0 / 0.0 (NaN) when both sets are empty.
double similarity = ((double) intersect.size()) / ((double) setunion.size());
pthread_mutex_lock(&cout_mutex);
cout << id << " : similarity is calculated as " << similarity << endl;
pthread_mutex_unlock(&cout_mutex);
// Store the result symmetrically; these writes are not guarded by a mutex.
indices[args->index1][args->index2] = similarity;
indices[args->index2][args->index1] = similarity;
unsigned long a = 1;
unsigned long b = 1;
unsigned long c = a + b;
pthread_mutex_lock(&cout_mutex);
cout << id << " : fibonacci starts" << endl;
pthread_mutex_unlock(&cout_mutex);
// Fibonacci-style loop; the body runs only when 100000 * (similarity - 0.9) exceeds 3.
for (int i = 3 ; i < 100000 * (similarity - 0.9) ; i++){
a = b;
b = c;
c = a + b;
}
// Publish the final value through the pointer supplied in the arguments.
*(args->valuePointer) = c;
//pthread_exit(args->valuePointer);
return NULL;
}
Finally, I made it work by making the following modifications to the function that each thread executes (jaccard_visit).
The following code works well:
void * jaccard_visit(void *arg){
thread_args * args = (thread_args *) arg;
int id = args->id;
pthread_mutex_lock(&cout_mutex); cout << id << " : thread started" << endl; pthread_mutex_unlock(&cout_mutex);
pthread_mutex_lock(&payload_mutex);
string str1 = my_graph[args->index1].payload_text;
string str2 = my_graph[args->index2].payload_text;
pthread_mutex_unlock(&payload_mutex);
pthread_mutex_lock(&cout_mutex); cout << id << " : payload texts are read" << endl; pthread_mutex_unlock(&cout_mutex);
int stringLength = str1.length() - 1;
for (int i = 0; i < stringLength; i++) {
for (int j = i + 1; j < stringLength;) {
if (str1[i] == str1[j])
str1[j] = str1[--stringLength];
else
j++;
}
}
string set1 = str1.substr(0, stringLength);
pthread_mutex_lock(&cout_mutex); cout << id << " : unique chars of first node were extracted" << endl; pthread_mutex_unlock(&cout_mutex);
stringLength = str2.length() - 1;
for (int i = 0; i < stringLength; i++) {
for (int j = i + 1; j < stringLength;) {
if (str2[i] == str2[j])
str2[j] = str2[--stringLength];
else
j++;
}
}
string set2 = str2.substr(0, stringLength);
pthread_mutex_lock(&cout_mutex); cout << id << " : unique chars of second node were extracted" << endl; pthread_mutex_unlock(&cout_mutex);
int intersection_index = 0;
int union_index = 0;
for (int i = 0 ; i < set1.length() ; i++){
bool exists_in_set2 = false;
for (int j = 0 ; j < set2.length() && exists_in_set2 == false; j++){
if (set1[i] == set2[j]) {
intersection_index ++;
exists_in_set2 = true;
}
}
if (!exists_in_set2) {
union_index ++;
}
}
union_index += set2.length();
pthread_mutex_lock(&cout_mutex); cout << id << " : set1={" << set1 << "}, set2={" << set2 << "}" << endl; pthread_mutex_unlock(&cout_mutex);
pthread_mutex_lock(&cout_mutex); cout << id << " : |n|=" << intersection_index << ", |u|=" << union_index << endl; pthread_mutex_unlock(&cout_mutex);
double similarity = ((double) intersection_index / union_index);
pthread_mutex_lock(&cout_mutex); cout<<id<<" : similarity is: " << similarity << endl; pthread_mutex_unlock(&cout_mutex);
pthread_mutex_lock(&indices_mutex);
my_graph[args->index1].jaccardList[args->index2] = similarity;
my_graph[args->index2].jaccardList[args->index1] = similarity;
pthread_mutex_unlock(&indices_mutex);
return NULL;
}