Search code examples
clinuxmultithreadingmutexreaderwriterlock

What's wrong with this first readers-writers solution implementation in C using mutex?


I am trying to implement the first readers writers problem (reader's preference) in C. I am using mutex locks and unlocks to make sure that no writer can access the thread if a reader has a lock and any reader can access the thread if the first reader has a lock. Here is my code. I am unable to get my code till the end i.e., it is not reaching the thread join part. I guess I am getting a deadlock somewhere or maybe I am placing my mutex locks and unlocks in wrong place.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <fcntl.h>
#include <sys/types.h>

#define FALSE 0
#define TRUE  1
#define SLOWNESS 30000
#define INVALID_ACCNO -99999
#define SIZE  100
#define WRITE_ITR 100000
#define READ_ITR  100000
#define MAX_BALANCE 1000000

typedef struct {
        int accno;
        float balance;
} account;

// sleep function
void rest() 
{
    usleep(100);
}

//Global shared data structure
account account_list[SIZE];             /* this is the data structure that the readers and writers will be accessing concurrently.*/

pthread_mutex_t rw_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t r_lock = PTHREAD_MUTEX_INITIALIZER;


/* Writer thread - will update the account_list data structure. 
   Takes as argument the seed for the srand() function. 
*/
void * writer_thr(void * arg) 
{
    printf("Writer thread ID %ld\n", pthread_self());   
    srand(*((unsigned int *) arg));     /* set random number seed for this writer */

    int i, j;
    int r_idx;
    unsigned char found;            /* For every update_acc[j], set to TRUE if found in account_list, else set to FALSE */
    account update_acc[WRITE_ITR];

    /* first create a random data set of account updates */
    for (i = 0; i < WRITE_ITR;i++) 
    {
        r_idx = rand() % SIZE;      /* a random number in the range [0, SIZE) */ 
        update_acc[i].accno = account_list[r_idx].accno;
        update_acc[i].balance = 1000.0 + (float) (rand() % MAX_BALANCE);
    }//end for

    /* open a writer thread log file */
    char thr_fname[64];
    snprintf(thr_fname, 64, "writer_%ld_thr.log", pthread_self());
    FILE* fd = fopen(thr_fname, "w");
    if (!fd) 
    {
        fprintf(stderr,"Failed to open writer log file %s\n", thr_fname);
        pthread_exit(&errno);
    }//end if

    /* The writer thread will now try to update the shared account_list data structure. 
       For each entry 'j' in the update_acc[] array, it will find the corresponding 
       account number in the account_list array and update the balance of that account
       number with the value stored in update_acc[j].   
    */

    int temp_accno;
    for (j = 0; j < WRITE_ITR;j++) {
        found = FALSE;
        for (i = 0; i < SIZE;i++) {
            if (account_list[i].accno == update_acc[j].accno) {
                found = 1;
                temp_accno = account_list[i].accno;
                pthread_mutex_lock(&rw_lock);
                account_list[i].accno = INVALID_ACCNO;
                account_list[i].balance = update_acc[j].balance;
                account_list[i].accno = temp_accno;
                rest();                 /* makes the write long duration - SO AS TO INTRODUCE LATENCY IN WRITE before going for next 'j' */
                pthread_mutex_unlock(&rw_lock);
                fprintf(fd, "Account number = %d [%d]: old balance = %6.2f, new balance = %6.2f\n",
                        account_list[i].accno, update_acc[j].accno, account_list[i].balance, update_acc[j].balance);

        }//end if
        if (!found)
            fprintf(fd, "Failed to find account number %d!\n", update_acc[j].accno);

    }   // end test-set for-loop
}
    fclose(fd);
    return NULL;
}

/* Reader thread - will read the account_list data structure. 
   Takes as argument the seed for the srand() function. 
*/
void * reader_thr(void *arg) 
{
    printf("Reader thread ID %ld\n", pthread_self());
    srand(*((unsigned int *) arg));   /* set random number seed for this reader */

    int i, j;
    int r_idx;
    unsigned char found;            /* For every read_acc[j], set to TRUE if found in account_list, else set to FALSE */
    account read_acc[READ_ITR];

    /* first create a random data set of account updates */
    for (i = 0; i < READ_ITR;i++) 
    {
        r_idx = rand() % SIZE;      /* a random number in the range [0, SIZE) */
        read_acc[i].accno = account_list[r_idx].accno;
        read_acc[i].balance = 0.0;      /* we are going to read in the value */
    }//end for

    /* open a reader thread log file */
    char thr_fname[64];
    snprintf(thr_fname, 64, "reader_%ld_thr.log", pthread_self());
    FILE *fd = fopen(thr_fname, "w");
    if (!fd) 
    {
        fprintf(stderr,"Failed to reader log file %s\n", thr_fname);
        pthread_exit(&errno);
    }//end if

    /* The reader thread will now try to read the shared account_list data structure.
       For each entry 'j' in the read_acc[] array, the reader will fetch the 
       corresponding balance from the account_list[] array and store in
       read_acc[j].balance. */
    for (j = 0; j < READ_ITR;j++) {
        /* Now read the shared data structure */
        found = FALSE;
        for (i = 0; i < SIZE;i++) {
            rest();
            if (account_list[i].accno == read_acc[j].accno) {
                found = TRUE;
                fprintf(fd, "Account number = %d [%d], balance read = %6.2f\n",
                            account_list[i].accno, read_acc[j].accno, read_acc[j].balance); 
                pthread_mutex_lock(&r_lock);
                if(j == 1)
                {
                    pthread_mutex_lock(&rw_lock);
                }
                pthread_mutex_unlock(&r_lock);
                read_acc[j].balance = account_list[i].balance;
                pthread_mutex_lock(&r_lock);
                if(j == READ_ITR -  1)
                {
                    pthread_mutex_unlock(&rw_lock);
                }
                pthread_mutex_unlock(&r_lock);

        }

        if (!found)
            fprintf(fd, "Failed to find account number %d!\n", read_acc[j].accno);
    }   // end test-set for-loop
}

    fclose(fd);
    return NULL;
}

/* populate the shared account_list data structure */
void create_testset() {
    time_t t;
    srand(time(&t));
    int i;  
    for (i = 0;i < SIZE;i++) {
        account_list[i].accno = 1000 + rand() % RAND_MAX;
        account_list[i].balance = 100 + rand() % MAX_BALANCE;
    }   
    return;
}


void usage(char *str) {
    printf("Usage: %s -r <NUM_READERS> -w <NUM_WRITERS>\n", str);
    return;
}


int main(int argc, char *argv[]) 
{
    time_t t;
    unsigned int seed;
    int i;

    int READ_THREADS;           /* number of readers to create */
    int WRITE_THREADS;          /* number of writers to create */

    if(argc <= 3)
    {
        usage("./rw");
        exit(EXIT_FAILURE);
    }   
    int opt;
    while((opt = getopt(argc, argv, "r:w:")) != -1)
    {
        switch(opt)     
        {
            case 'r':
                READ_THREADS = atoi(optarg);
                break;
            case 'w':
                WRITE_THREADS = atoi(optarg);
                break;
            default: 
                usage("./rw");
                exit(EXIT_FAILURE);
        }
    }



    pthread_t* reader_idx = (pthread_t *) malloc(sizeof(pthread_t) * READ_THREADS);     /* holds thread IDs of readers */
    pthread_t* writer_idx  = (pthread_t *) malloc(sizeof(pthread_t) * WRITE_THREADS);       /* holds thread IDs of writers */

    /* create readers */
    for (i = 0;i < READ_THREADS;i++) 
        {
            seed = (unsigned int) time(&t);
            if((pthread_create(&reader_idx[i], NULL, reader_thr, &seed)) != 0)
            {
                perror("pthread reader create");
                exit(-1);
            }
        }
    printf("Done creating reader threads!\n");

    /* create writers */ 
    for (i = 0;i < WRITE_THREADS;i++) 
    {
        seed = (unsigned int) time(&t);
        /* YOUR CODE GOES HERE */
        if((pthread_create(&writer_idx[i], NULL, writer_thr, &seed)) != 0)
        {
            perror("pthread writer create");
            exit(-1);
        }

    }
    printf("Done creating writer threads!\n");

    /* Join all reader and writer threads. 
    */
    for(i = 0; i < READ_THREADS; i++)
    {
        pthread_join(reader_idx[i], NULL);
    }
    for(i = 0; i < WRITE_THREADS; i++)
    {
        pthread_join(writer_idx[i], NULL);
    }

    printf("Reader threads joined.\n");

    printf("Writer threads joined.\n");

    pthread_mutex_destroy(&r_lock);
    pthread_mutex_destroy(&rw_lock);

    return 0;
}

Solution

  • Your code is a mess. There are several things that are wrong with it and each one of them breaks the RW locking mechanism that you are trying to implement.

    • Both your reader threads and writer threads need to deal with reader exclusion and writer exclusion. Your current code completely ignores the reader exclusion in writer thread.
    • Your writer thread is reading from the shared structure (if (account_list[i].accno == update_acc[j].accno)) without excluding other writers.
    • I do not think this is implementable with just mutexes as you seem to be trying to do. E.g., last reader thread out of the critical section needs to be able to let waiting writers go. You probably need at least conditional variables or semaphores to do this.

    My suggestion is to use the POSIX pthread_rwlock_init and friends instead.

    If you insist on doing this yourself then please read at least this Concurrent Control with "Readers" and "Writers" paper for inspiration on how this can be implemented.