Search code examples
cmultithreadinggetposixconsumer

Posix thread consumer's get not printing properly


My program is supposed to use the producer thread to read in lines from a file to a buffer that will hold up to numlines at a time and then use the consumer thread to add the words of the file to a linked list depending on if the character count of the word is odd or even, appending the word to the list if it isnt there already and if the word is already in the list it will add it to the count. The output will be an odd list and even list with words and how often they appear in the file. I am running this on Mint Linux

The program is used like ./exec -b numlines -t maxcounters -d filedelay -D threaddelay file, so I am using ./exec -b 4 -t 1 -d 0 -D 0 file for now. When the program with the file attached, I see that my producer thread correctly puts in 4 lines of the file, but then the consumer's get seems to return different outputs each time. Im very new to pthreads, can someone please tell me what's wrong with my get?

#define _GNU_SOURCE 
#include <features.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <limits.h>
#include <time.h>

//globals
int numlines;
int maxcounters;
int filedelay;
int threaddelay;
#define MAXSIZE 2048
#define OVER (-1)
char* lastLine;

/* Circular buffer of integers. */
struct prodcons
{
  char** data;  /* the actual data */
  pthread_mutex_t lock;     /* mutex ensuring exclusive access to buffer */
  int readpos, writepos;    /* positions for reading and writing */
  pthread_cond_t notempty;  /* signaled when buffer is not empty */
  pthread_cond_t notfull;   /* signaled when buffer is not full */
};

//linked lists for odd and even
struct node  
{
  char *word;
  int timesAppeared;
  struct node *next;
};

int checkargs(int argc, char* argv[]);
static void init (struct prodcons *b);
static void put (struct prodcons *b, char* line);
static char *get (struct prodcons *b);
static void *producer (void *args);
static void *consumer (void *args);
static void addWord (char *word, struct node **head);

//buffer and lists
struct prodcons buffer;
struct node *odd = NULL;
struct node *even = NULL;

//mutexes for list
pthread_mutex_t oddlock;
pthread_mutex_t evenlock;

int main(int argc, char* argv[]){
    if (checkargs(argc, argv) == 1){
        printf("CORRECT ARGS!\n");
    }
    else{
        exit(1);
    }
    int filecount;
    filecount = argc - 9;
    pthread_t th_a, th_b;
    void *retval;
    init(&buffer);

    //create threads
    if(pthread_create (&th_a, NULL, producer, argv) != 0){
        fprintf(stderr, "Error: producer create problem\n");
        exit(1);
    }
    if(pthread_create (&th_b, NULL, consumer, argv) != 0){
        fprintf(stderr, "Error: consumer create problem\n");
        exit(1);
    }
    /* Wait until producer and consumer finish. */
    if(pthread_join(th_a, &retval) != 0){
        fprintf(stderr, "ERROR: producer join problem\n");
        exit(1);
    }
    printf("hey121342343\n");
    if(pthread_join(th_b, &retval) != 0){
        fprintf(stderr, "ERROR: consumer join problem\n");
        exit(1);
    }   
    printf("hey123\n");
    struct node *ptr;
    printf("\nODD LIST BEGIN\n");
    printf("%s %s\n", "Word", "# appeared");
    for( ptr = odd; ptr != NULL; ptr = ptr->next){
        printf("%s %d\n", ptr->word, ptr->timesAppeared);
    }

    printf("\nEVEN LIST BEGIN\n");
    printf("%s %s\n", "Word", "# appeared");
    for( ptr = even; ptr != NULL; ptr = ptr->next){
        printf("%s %d\n", ptr->word, ptr->timesAppeared);
    }

    return 0;
} 

static void addWord (char* word, struct node **head){


    struct node *headnode;
    struct node *addnode;
    struct node *lastnode;

    //check for word
    for(headnode = *head; headnode != NULL; headnode = headnode->next){
        if(strcmp(headnode->word, word) == 0){
            //same
            printf("SAME WORD!\n");
            headnode -> timesAppeared++;
            return;
        }
    }

    //not in list? make a new node
    addnode = (struct node *) malloc(sizeof(struct node));
    addnode -> word = malloc(strlen(word) + 1);
    strcpy(addnode->word, word);
    printf("the word %s has been added!\n", addnode->word);
    addnode-> timesAppeared = 1;
    addnode->next = NULL;

    //insert into linked list
    headnode = *head;
    lastnode = NULL;

//possible debug, try taking out the strcmp condition
//(strcmp(addnode->word, headnode->word) > 0)
    //traverse to end
    while(( headnode != NULL)){
        lastnode = headnode;
        headnode = headnode->next;
    }
    addnode->next = headnode;

    //in case of empty list
    if(lastnode == NULL){
        *head = addnode;
    }
    else{
        lastnode ->next = addnode;
    }
    return;
}

/* Initialize a buffer */
static void init (struct prodcons *b)
{
    //make a buffer of correct size and check
    b->data = malloc(sizeof(char *) * numlines);
    if (b->data == NULL){
        perror("failed buffer malloc");
        exit(1);
    }
    pthread_mutex_init (&b->lock, NULL);
    pthread_cond_init (&b->notempty, NULL);
    pthread_cond_init (&b->notfull, NULL);
    b->readpos = 0;
    b->writepos = 0;
}

/* Store an integer in the buffer */
static void put (struct prodcons *b, char* line)
{
    pthread_mutex_lock (&b->lock);
    /* Wait until buffer is not full */
    while ((b->writepos + 1) % numlines == b->readpos)
    {
      pthread_cond_wait (&b->notfull, &b->lock);
      /* pthread_cond_wait reacquired b->lock before returning */
    }
    /* Write the data and advance write pointer */
    printf("THIS IS THE READLINE in PUT %s\n", line);
    b->data[b->writepos] = line;
    printf("AFTER PUT command: %s is at WRITEPOS %d\n", b->data[b->writepos], b->writepos);
    b->writepos++;
    if (b->writepos >= numlines){
        b->writepos = 0;
    }


    /* Signal that the buffer is now not empty */
    pthread_cond_signal (&b->notempty);
    pthread_mutex_unlock (&b->lock);
}


/* Read and remove an integer from the buffer */
static char* get (struct prodcons *b)
{
    char* line;
    pthread_mutex_lock (&b->lock);

    /* Wait until buffer is not empty */
    while (b->writepos == b->readpos)
    {
      pthread_cond_wait (&b->notempty, &b->lock);
    }
    /* Read the data and advance read pointer */
    printf("THIS IS THE LINE in GET%s\n", b->data[b->readpos]);
    line = b->data[b->readpos];
    printf("AFTER GET command: %s is at READPOS %d\n", b->data[b->readpos], b->readpos);
    b->readpos++;
    if (b->readpos >= numlines){
        b->readpos = 0;
    }

    /* Signal that the buffer is now not full */
    pthread_cond_signal (&b->notfull);
    pthread_mutex_unlock (&b->lock);

    return line;
}

/* A test program: one thread inserts integers from 1 to 10000,
   the other reads them and prints them. */



static void *producer (void *args)
{
    FILE *file;
    char readline[MAXSIZE];
    char **argv = args;
    int filePos = 9;

    //go through every file
    while (argv[filePos] != NULL){
        printf("This is the file opened: %s", argv[filePos]);
        file = fopen(argv[filePos], "r");

        //read file lines
        while(fgets(readline, MAXSIZE, file) == readline){

            printf("THIS IS THE READLINE in PRODUCER%s", readline);
            put(&buffer, readline);

        }
        // lastLine = &readline;
        // printf("THIS IS THE LAST READLINE: %s\n", lastLine);

        fclose(file);
        filePos+= 1;
    }//endwhile
    // need a way for the consumer to know when the thread has finished
    put(&buffer, "endofile");
    // int a;
    // for(a = 0; a < ccount; a++){
    //  put(NULL);
    // }

  printf("-------------PRODUCER FINISHED FILLING BUFFER-------------\n");
    return NULL;  
}

static void *consumer (void *args)
{
    char name[2] = "a";
    char *removedLine;
    char* token;
    //get line
    removedLine = get(&buffer);
    while(removedLine != "endofile"){
        printf("This is the removedline: %s", removedLine);
        token = strtok_r(removedLine, " ", &removedLine);
        printf("This is the TOKEN %s and its length: %zu\n", token, strlen(token));
        //process each word
        while(token != NULL){

            //odd
            if( (strlen(token) - 1) % 2 == 1){
                pthread_mutex_lock(&oddlock);
                printf("adding the word in odd !\n");
                addWord(token, &odd);
                pthread_mutex_unlock(&oddlock);
            }
            //even
            if( (strlen(token) - 1) % 2 == 0){
                pthread_mutex_lock(&evenlock);
                printf("adding the word in even!\n");
                addWord(token, &even);
                pthread_mutex_unlock(&evenlock);
            }
            //subsequent call, has to make the first arg null from the man page
            token = strtok_r(NULL, " ", &removedLine);
        }//inner while
        // printf("This is thread A!\n");
        removedLine = get(&buffer);
        printf("This is the next line, after the while: %s", removedLine);
    }//outer while
    printf("-------------CONSUMER FINISHED FILLING BUFFER-------------\n");

    return;
}
// //need to make 2 lists for odd and even
// struct linkedlist{
//   struct node *start;
//   struct node *end;
// };

// struct list odd;
// struct list even;

int checkargs(int argc, char* argv[]){
    //have checkers to see if the program went through all the args properly 
    int checknumlines;
    int checkmaxcounters;
    int checkfiledelay;
    int checkthreaddelay;
    printf("ARGC %d\n", argc);
    //10 args or more (depending on files)
    if(argc <= 9){
        fprintf(stderr, "Error: Please enter correct number of args. \n");
        return 0;
    }
    //grab data
    int i;

    for(i = 0; i < (argc - 1); i++){
        //travel through until -b, get that info of the next
        if(strcmp(argv[i], "-b") == 0){
            numlines = atoi(argv[i+1]);
            if (numlines <= 0){
                fprintf(stderr, "Error: numlines is not a positive non-zero integer. \n");
                return 0;
            }
            checknumlines = -1;
        }
        //travel through until -t, get that info of the next
        if(strcmp(argv[i], "-t") == 0){
            maxcounters = atoi(argv[i +1]);
            if((maxcounters <= 0) || (maxcounters > 26)){
                fprintf(stderr, "Error: maxcounters needs to be a positive non-zero integer less than or equal to 26. \n");
                return 0;
            }
            checkmaxcounters = -1;
        }

        //travel through until -d, get that info of the next
        if(strcmp(argv[i], "-d") == 0){
            filedelay = atoi(argv[i+1]);
            if(filedelay < 0 ){
                fprintf(stderr, "Error: filedelay needs to be a positive integer. \n");
                return 0;
            }
            checkfiledelay = -1;
        }

        //travel through until -D, get that info of the next
        if(strcmp(argv[i], "-D") == 0){
            threaddelay = atoi(argv[i+1]);
            if(threaddelay < 0 ){
                fprintf(stderr, "Error: threaddelay needs to be a positive integer. \n");
                return 0;
            }
            checkthreaddelay = -1;
        }
    }
    printf("CHECKS: This is numlines: %d, This is maxcounters: %d, This is filedelay: %d, This is threaddelay: %d \n", checknumlines, checkmaxcounters, checkfiledelay, checkthreaddelay);

    if(checknumlines != -1 || checkmaxcounters != -1|| checkfiledelay != -1|| checkthreaddelay!= -1){
        fprintf(stderr, "Error: incorrect args.\n");
        return 0;
    }

    printf("This is numlines: %d, This is maxcounters: %d, This is filedelay: %d, This is threaddelay: %d \n", numlines, maxcounters, filedelay, threaddelay);


    return 1;
}

The file Im using to check:

a
aa
aaa
aaaa
aaaaa
aaaaaa
aaaaaaa
aaaaaaaa
aaaaaaaaa
aaaaaaaaaa
aaaaaaaaaaa
aaaaaaaaaaaa
aaaaaaaaaaaaa
aaaaaaaaaaaaaa
aaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaa

The output (minus the debugging statements) after 3 runs 1.

 ODD LIST BEGIN
    Word # appeared
    aaaaa
     3
    aaaaaaaaa
     1
    aaaaaaaaaaa
     1
    a
     1
    aaaaaaaaaaaaa
     2
    aaaaaaaaaaaaaaa
     1
    aaaaaaaaaaaaaaaaa
     4

    EVEN LIST BEGIN
    Word # appeared
    aaaaa
     1

     2
    aaaaaa
     1
    aaaaaaaaaaaa
     2
    aaaaaaaaaaaaaaaa
     1

2.

ODD LIST BEGIN
Word # appeared
aaaaa
 4
aaaaaaaaa
 3
aaaaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaaaa
 4

EVEN LIST BEGIN
Word # appeared

 3
aaaaaaaa
 1
aaaaaaaaaaaa
 1
aaaaaaaaaaaaaa
 2
aaaaaaaaaaaaaaaa
 1

3.

ODD LIST BEGIN
Word # appeared
aaaaa
 2
aaaaaaa
 1
aaaaaaaaa
 1
aaaaaaaaaaa
 1
aaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaaaa
 4

EVEN LIST BEGIN
Word # appeared

 2
aaaaaa
 1
aaaaaaaa
 1
aaaaaaaaaa
 1
aaaaaaaaaaaa
 1
aaaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaaa
 2

Thank you for your time! I tried to be as descriptive as possible, let me know if this is bad (new to stackoverflow)


Solution

  • The way you add lines to your buffer isn't correct:

    static void *producer (void *args)
    {
        char readline[MAXSIZE];
        // ...
            while(fgets(readline, MAXSIZE, file) == readline){
    
                printf("THIS IS THE READLINE in PRODUCER%s", readline);
                put(&buffer, readline);
            }
        // ...
    }
    

    Notice that you are reading lines into your local variable readline, and adding readline to your buffer. Then you loop around and overwrite the contents of readline with the next line. Therefore, the previous contents added to the buffer will be overwritten. What you need to be doing is making a copy of each line. For example, if you have a strdup() function:

            while(fgets(readline, MAXSIZE, file) == readline){
    
                printf("THIS IS THE READLINE in PRODUCER%s", readline);
                put(&buffer, strdup(readline));
            }
    

    Now each string you add to the buffer is an allocated copy. You need to remember to free this copy later after you consume it.