I have a producer thread that reads each char and its offset from a source file, then writes them to a shared circular buffer.
I also have a consumer thread that reads the oldest element in the buffer, then writes it to a copy file.
The number of producer and consumer threads is given as a command-line argument. Also, every time a producer or consumer thread reads from or writes to the buffer, or reads from or writes to a file, a specific line is written to a log file.
Right now I have a single critical section in each of the producer and consumer threads. How can I structure the code to minimize the time spent in the critical section (it's getting slow)? I was thinking of having multiple critical sections in my producer and consumer threads. For example, in my producer thread I could have one critical section for reading from the source file and writing the specific line (e.g. "read from file") to the log file, and another critical section for writing to the buffer and writing the specific line (e.g. "writing to buffer") to the log file. But how does this make a difference if the critical sections are one after the other?
Here is my producer thread:
void *INthread(void *arg)
{
    printf("INSIDE INthread\n");
    FILE *srcFile = (FILE*)arg;
    FILE *lp; // Log file pointers.
    int t_id = INid++; // Thread number.
    int curOffset;
    BufferItem result;
    struct timespec t;
    t.tv_sec = 0;
    t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
    nanosleep(&t, NULL);
    fseek(srcFile, 0, SEEK_CUR);
    curOffset = ftell(srcFile); // Save the current byte offset.
    fseek(srcFile, 0, SEEK_END);
    if(len == 0) // If len hasnt been set by the first IN thread yet.
        len = ftell(srcFile); // Save the length of the srcFile (number of chars).
    fseek(srcFile, curOffset, SEEK_SET); // Revert file pointer to curOffset.
    printf("ID %d number of bytes %d\n", t_id, len);
    int offs;
    int ch;
    while(len > 0) // Go through each byte/char in file.
    {
        /*** CRITICAL SECTION ********************************/
        sem_wait(&empty); /* acquire the empty lock */
        pthread_mutex_lock( &pt_mutex );
        if(len > 0){
            fseek(srcFile, 0, SEEK_CUR);
            if((offs = ftell(srcFile)) != -1)
                result.offset = offs; /* get position of byte in file */
            if((ch = fgetc(srcFile)) != EOF)
                result.data = ch; /* read byte from file */
            // Write to log file "read_byte PTn Ox Bb I-1".
            if (!(lp = fopen(log, "a"))) {
                printf("could not open log file for writing");
            }
            if(fprintf(lp, "read_byte PT%d O%d B%d I-1\n", t_id, offs, ch) < 0){
                printf("could not write to log file");
            }
            printf("ID %d --- offset %d char %c len%d\n", t_id, result.offset, result.data, len);
            addItem(&cBuff, &result);
            // Write to log file "produce PTn Ox Bb Ii ".
            if(fprintf(lp, "produce PT%d O%d B%d I%d\n", t_id, offs, ch, cBuff.lastInd) < 0){
                printf("could not write to log file");
            }
            fclose(lp);
            len--;
        }
        pthread_mutex_unlock( &pt_mutex );
        sem_post(&full); /* signal full */
        /*** END CRITICAL SECTION ********************************/
        t.tv_sec = 0;
        t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
        nanosleep(&t, NULL);
    }
    inJoin[t_id] = 1; // This IN thread is ready to be joined.
    printf("EXIT INthread\n");
    pthread_exit(0);
}
Here is my consumer thread:
void *OUTthread(void *arg)
{
    printf("INSIDE OUTthread\n");
    struct timespec t;
    t.tv_sec = 0;
    t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
    nanosleep(&t, NULL);
    int processing = 1;
    FILE *targetFile, *lp;
    BufferItem OUTresult;
    int t_id = OUTid++;
    int offs, ch;
    int numBytes = len;
    while(processing){
        /*** CRITICAL SECTION ********************************/
        sem_wait(&full); /* acquire the full lock */
        pthread_mutex_lock( &pt_mutex );
        cbRead(&cBuff, &OUTresult);
        offs = OUTresult.offset;
        ch = OUTresult.data;
        if (!(lp = fopen(log, "a"))) {
            printf("could not open log file for writing");
        }
        // Write to log file "consume CTn Ox Bb Ii".
        if(fprintf(lp, "consume CT%d O%d B%d I%d\n", t_id, offs, ch, cBuff.lastInd) < 0){
            printf("could not write to log file");
        }
        printf("From buffer: offset %d char %c\n", OUTresult.offset, OUTresult.data);
        if (!(targetFile = fopen(arg, "r+"))) {
            printf("could not open output file for writing");
        }
        if (fseek(targetFile, OUTresult.offset, SEEK_SET) == -1) {
            fprintf(stderr, "error setting output file position to %u\n",
                    (unsigned int) OUTresult.offset);
            exit(-1);
        }
        if (fputc(OUTresult.data, targetFile) == EOF) {
            fprintf(stderr, "error writing byte %d to output file\n", OUTresult.data);
            exit(-1);
        }
        // Write to log file "write_byte CTn Ox Bb I-1".
        if(fprintf(lp, "write_byte CT%d O%d B%d I-1\n", t_id, offs, ch) < 0){
            printf("could not write to log file");
        }
        fclose(lp);
        fclose(targetFile);
        pthread_mutex_unlock( &pt_mutex );
        sem_post(&empty); /* signal empty */
        /*** END CRITICAL SECTION ********************************/
        t.tv_sec = 0;
        t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
        nanosleep(&t, NULL);
    }
    outJoin[t_id] = 1; // This OUT thread is ready to be joined.
    printf("EXIT OUTthread\n");
    pthread_exit(0);
}
I would organize this such that your code takes on the form:
... processing ...
mutex lock
resource read / write
mutex unlock
... continue processing
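around each resource that is to be shared. Thus, you would end up with multiple mutexes: one for the producers reading the file, one for the producers writing to the circular buffer, one for the consumers reading from the circular buffer, etc. Each one would encapsulate a single read/write operation on its respective resource. Even though the critical sections run one after the other within a single thread, different threads can then be inside different sections at the same time (one reading the file while another writes to the buffer), instead of all of them queueing behind one lock.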
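To make the one-mutex-per-resource idea concrete, here is a minimal sketch, not a drop-in replacement for your threads. It assumes POSIX unnamed semaphores (as on Linux) and uses in-memory arrays in place of the source and target files so that it is self-contained. All names (`src_mutex`, `buf_mutex`, `claim_mutex`, `run_copy_demo`, and so on) are made up for illustration. The `claim_mutex` counter is one possible way to let the consumers know when to stop, which your original loop does not show.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>

#define BUF_SIZE 8

typedef struct { int offset; char data; } Item;

/* In-memory stand-ins for the source and target files (assumption). */
static const char *src = "hello, fine-grained locking";
static char dst[64];
static int src_pos, src_len, claimed;

static Item ring[BUF_SIZE];
static int head, tail;
static sem_t empty_slots, full_slots;

/* One mutex per shared resource instead of one giant lock. */
static pthread_mutex_t src_mutex   = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t buf_mutex   = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t claim_mutex = PTHREAD_MUTEX_INITIALIZER;

static void *producer(void *arg) {
    (void)arg;
    for (;;) {
        Item it;
        /* Section 1: only the shared read position is protected. */
        pthread_mutex_lock(&src_mutex);
        if (src_pos >= src_len) { pthread_mutex_unlock(&src_mutex); break; }
        it.offset = src_pos;
        it.data = src[src_pos++];
        pthread_mutex_unlock(&src_mutex);

        /* Section 2: only the buffer indices are protected. */
        sem_wait(&empty_slots);
        pthread_mutex_lock(&buf_mutex);
        ring[head] = it;
        head = (head + 1) % BUF_SIZE;
        pthread_mutex_unlock(&buf_mutex);
        sem_post(&full_slots);
    }
    return NULL;
}

static void *consumer(void *arg) {
    (void)arg;
    for (;;) {
        Item it;
        /* Claim one item before waiting, so no consumer waits forever. */
        pthread_mutex_lock(&claim_mutex);
        int more = claimed < src_len;
        if (more) claimed++;
        pthread_mutex_unlock(&claim_mutex);
        if (!more) break;

        sem_wait(&full_slots);
        pthread_mutex_lock(&buf_mutex);
        it = ring[tail];
        tail = (tail + 1) % BUF_SIZE;
        pthread_mutex_unlock(&buf_mutex);
        sem_post(&empty_slots);

        /* Each offset is written by exactly one thread: no lock needed. */
        dst[it.offset] = it.data;
    }
    return NULL;
}

/* Runs 3 producers and 3 consumers; returns 1 if dst matches src. */
int run_copy_demo(void) {
    pthread_t p[3], c[3];
    src_len = (int)strlen(src);
    src_pos = claimed = head = tail = 0;
    memset(dst, 0, sizeof dst);
    int rc = sem_init(&empty_slots, 0, BUF_SIZE);
    assert(rc == 0);
    rc = sem_init(&full_slots, 0, 0);
    assert(rc == 0);
    (void)rc;
    for (int i = 0; i < 3; i++) pthread_create(&p[i], NULL, producer, NULL);
    for (int i = 0; i < 3; i++) pthread_create(&c[i], NULL, consumer, NULL);
    for (int i = 0; i < 3; i++) pthread_join(p[i], NULL);
    for (int i = 0; i < 3; i++) pthread_join(c[i], NULL);
    sem_destroy(&empty_slots);
    sem_destroy(&full_slots);
    return strcmp(src, dst) == 0;
}
```

Calling `run_copy_demo()` from `main` (built with `-pthread`) should return 1 once every byte has been copied. The point is that a producer holding `src_mutex` never blocks a consumer holding `buf_mutex`.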
I would also make it so that your circular buffer cannot overwrite itself. Otherwise you will not be able to read from and write to the buffer at the same time, which forces a single mutex for both read and write operations and effectively increases both consumer and producer wait times.
This might not be the 'best' solution, but it should run faster than having a giant lock around the majority of your code.
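As a rough sketch of a buffer that cannot overwrite itself: the two counting semaphores below guarantee that a producer never reuses a slot before a consumer has read it, so the write index and the read index can each get their own mutex. `Ring`, `ring_put`, `ring_get`, and `run_ring_demo` are hypothetical names, not your `addItem`/`cbRead`, and the sketch again assumes POSIX unnamed semaphores.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define CAP 4
#define PER_THREAD 1000

typedef struct {
    int slots[CAP];
    int head, tail;              /* next write index, next read index */
    sem_t empty, full;           /* free slots, filled slots */
    pthread_mutex_t head_mutex;  /* taken by producers only */
    pthread_mutex_t tail_mutex;  /* taken by consumers only */
} Ring;

static Ring rb;

static void ring_init(Ring *r) {
    r->head = r->tail = 0;
    int rc = sem_init(&r->empty, 0, CAP);
    assert(rc == 0);
    rc = sem_init(&r->full, 0, 0);
    assert(rc == 0);
    (void)rc;
    pthread_mutex_init(&r->head_mutex, NULL);
    pthread_mutex_init(&r->tail_mutex, NULL);
}

static void ring_put(Ring *r, int v) {
    sem_wait(&r->empty);                 /* blocks instead of overwriting */
    pthread_mutex_lock(&r->head_mutex);
    r->slots[r->head] = v;
    r->head = (r->head + 1) % CAP;
    pthread_mutex_unlock(&r->head_mutex);
    sem_post(&r->full);
}

static int ring_get(Ring *r) {
    sem_wait(&r->full);                  /* blocks while the buffer is empty */
    pthread_mutex_lock(&r->tail_mutex);
    int v = r->slots[r->tail];
    r->tail = (r->tail + 1) % CAP;
    pthread_mutex_unlock(&r->tail_mutex);
    sem_post(&r->empty);
    return v;
}

static void *producer(void *arg) {
    (void)arg;
    for (int i = 1; i <= PER_THREAD; i++) ring_put(&rb, i);
    return NULL;
}

static void *consumer(void *arg) {
    long *sum = arg;                     /* each consumer has its own sum */
    for (int i = 0; i < PER_THREAD; i++) *sum += ring_get(&rb);
    return NULL;
}

/* Two producers each push 1..PER_THREAD; returns the total consumed. */
long run_ring_demo(void) {
    pthread_t p[2], c[2];
    long sums[2] = {0, 0};
    ring_init(&rb);
    for (int i = 0; i < 2; i++) pthread_create(&p[i], NULL, producer, NULL);
    for (int i = 0; i < 2; i++) pthread_create(&c[i], NULL, consumer, &sums[i]);
    for (int i = 0; i < 2; i++) pthread_join(p[i], NULL);
    for (int i = 0; i < 2; i++) pthread_join(c[i], NULL);
    return sums[0] + sums[1];
}
```

With two producers each pushing the values 1 through 1000, the consumed total is 2 * (1000 * 1001 / 2) = 1001000. A producer in `ring_put` and a consumer in `ring_get` take different mutexes, so they only wait on each other when the buffer is completely full or completely empty.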
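One more thing that keeps the giant lock slow in the posted code is that the log file is opened, written, and closed for every byte while the mutex is held. A possible way to shorten that section, sketched below under my own assumptions, is to open the log once, format each line into a local buffer outside the lock, and hold a dedicated log mutex only for the actual write. `log_mutex`, `log_fp`, `log_event`, and `run_log_demo` are illustrative names, and `tmpfile()` stands in for your real log file.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
static FILE *log_fp;   /* opened once, not once per byte */

/* Formats the line on the caller's stack, then locks only for the write. */
static void log_event(const char *what, int id, int offset, int byte, int index) {
    char line[96];
    int n = snprintf(line, sizeof line, "%s%d O%d B%d I%d\n",
                     what, id, offset, byte, index);
    if (n < 0 || (size_t)n >= sizeof line) return;  /* drop malformed lines */

    pthread_mutex_lock(&log_mutex);
    fputs(line, log_fp);
    pthread_mutex_unlock(&log_mutex);
}

static void *worker(void *arg) {
    int id = *(int *)arg;
    for (int i = 0; i < 100; i++)
        log_event("read_byte PT", id, i, 'a', -1);
    return NULL;
}

/* Four threads log 100 lines each; returns the number of lines written. */
int run_log_demo(void) {
    pthread_t t[4];
    int ids[4];
    log_fp = tmpfile();
    if (!log_fp) return -1;
    for (int i = 0; i < 4; i++) {
        ids[i] = i;
        pthread_create(&t[i], NULL, worker, &ids[i]);
    }
    for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);

    rewind(log_fp);
    int lines = 0, ch;
    while ((ch = fgetc(log_fp)) != EOF)
        if (ch == '\n') lines++;
    fclose(log_fp);
    return lines;
}
```

Note that with a separate log mutex the order of lines in the log may no longer match the exact order of buffer operations. If your assignment requires that ordering, the log write has to stay inside the same lock as the operation it describes.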