Search code examples
cmultithreadingsocketspthreadsshared-variable

How to share variables among threads?


I am working on a server-client project which allows the server to exchange messages with a client individually. However, I have to modify the server so that when the server sends a message, it is sent to all connected clients.

I know this involves sharing variables among threads but I am confused on how to go about do this?

Any tips/guidence will be appreciated!

Server Code:

#include<stdio.h>
#include<string.h>    //strlen
#include<stdlib.h>    //strlen
#include<sys/socket.h>
#include<arpa/inet.h> //inet_addr
#include<unistd.h>    //write
#include<pthread.h> //for threading , link with lpthread
#define MAX 80

#define PORT 6543
#define SA struct sockaddr
// Function designed for chat between client and server.
void join( pthread_t *thread_id)
{
    if(!pthread_join( *thread_id , NULL))
        printf("thread %ld complted\n",*thread_id);
    pthread_exit(0);//to exit the current thread
    
}

void func(int *sockfd)
{
    char buff[MAX];
    int n;
    // infinite loop for chat
    for (;;) {
        bzero(buff, MAX);
        // read the message from client and copy it in buffer
        read(*sockfd, buff, sizeof(buff));
        // print buffer which contains the client contents
        printf("From client: %s\t To client : ", buff);
        bzero(buff, MAX);
        n = 0;
        // copy server message in the buffer
        while ((buff[n++] = getchar()) != '\n');
        // and send that buffer to client
        write(*sockfd, buff, sizeof(buff));
        // if msg contains "Exit" then server exit and chat ended.
        if (strncmp("exit", buff, 4) == 0) {
            printf("Server Exit...\n");
            break;
        }
        
    }
}

// Driver function

int main()
{
    int sockfd, connfd, len;
    struct sockaddr_in servaddr, cli;
    pthread_t thread_id,jointhread_id;
    // socket create and verification
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        printf("socket creation failed...\n");
        exit(0);
    }
    else
        printf("Socket successfully created..\n");
    bzero(&servaddr, sizeof(servaddr));
    // assign IP, PORT
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);
    // Binding newly created socket to given IP and verification
    if ((bind(sockfd, (SA*)&servaddr, sizeof(servaddr))) != 0) {
        printf("socket bind failed...\n");
        exit(0);
    }
    else
        printf("Socket successfully binded..\n");
    // Now server is ready to listen and verification
    if ((listen(sockfd, 5)) != 0) {
        printf("Listen failed...\n");
        exit(0);
    }
    else
        printf("Server listening..\n");
    len = sizeof(cli);
    // Accept the data packet from client and verification
    while(connfd = accept(sockfd, (SA*)&cli, &len))
    {
        if (connfd < 0) {
            printf("server acccept failed...\n");
            exit(0);
        }
        else
            printf("server acccept the client..%d.\n",connfd);
        if( pthread_create( &thread_id , NULL ,  func , (void*) &connfd) < 0)
        {
            perror("could not create thread");
            return 1;
        }
        
        if( pthread_create( &jointhread_id , NULL ,  join , (void*) &thread_id) < 0)
        {
            perror("could not create thread");
            return 1;
        }
        
    }
    
    // After chatting close the socket
    close(sockfd);
}

Solution

  • Unfortunately, there were a number of bugs ...

    Wrong prototype for thread functions

    Race condition for connfd as mentioned in my top comments (passing connfd to func as a pointer).

    Doing getchar destroys the message data from the read of sockfd

    I've produced a version below that illustrates this.

    But, to actually get the code closer to what is required for your stated purpose, required a fair amount of refactoring. There's a second version further down that illustrates my take on that.


    Here's an annotated version that shows the bugs and some fixes [mostly what it took to get it to compile cleanly].

    It wraps original code with #if ORIG and new code with #if FIX and each place has a comment about the bug it's fixing

    #include <stdio.h>
    #include <string.h>                     // strlen
    #include <stdlib.h>                     // strlen
    #include <sys/socket.h>
    #include <arpa/inet.h>                  // inet_addr
    #include <unistd.h>                     // write
    #include <pthread.h>                    // for threading , link with lpthread
    
    #define MAX 80
    #define PORT 6543
    #define SA struct sockaddr
    
    #define ORIG    0
    #define FIX     1
    
    // Function designed for chat between client and server.
    // NOTE/BUG -- the main thread has to join the thread
    #if ORIG
    void
    join(pthread_t *thread_id)
    {
        if (!pthread_join(*thread_id, NULL))
            printf("thread %ld complted\n", *thread_id);
    
        // to exit the current thread
        pthread_exit(0);
    }
    #endif
    
    // NOTE/BUG: this is the _wrong_ signature for a thread function
    #if ORIG
    void
    func(int *sockfd)
    #else
    void *
    func(void *ptr)
    #endif
    {
    #if FIX
        int sockfd = (long) ptr;
    #endif
        char buff[MAX];
        int n;
    
        // infinite loop for chat
        for (;;) {
            bzero(buff, MAX);
    
            // read the message from client and copy it in buffer
    // NOTE/BUG: this has a race condition
    // NOTE/BUG: we need the actual length
    #if ORIG
            read(*sockfd, buff, sizeof(buff));
    #else
            int rlen = read(sockfd, buff, sizeof(buff));
    #endif
    
            // print buffer which contains the client contents
            printf("From client: %s\t To client : ", buff);
            bzero(buff, MAX);
    
            // copy server message in the buffer
    // NOTE/BUG: this is destroying the data that was
    #if ORIG
            n = 0;
            while ((buff[n++] = getchar()) != '\n');
    #endif
    
            // and send that buffer to client
    #if ORIG
            write(*sockfd, buff, sizeof(buff));
    #else
            write(sockfd, buff, rlen);
    #endif
    
            // if msg contains "Exit" then server exit and chat ended.
            if (strncmp("exit", buff, 4) == 0) {
                printf("Server Exit...\n");
                break;
            }
        }
    
    // NOTE/BUG: we must return the error code
    #if FIX
        return (void *) 0;
    #endif
    }
    
    // Driver function
    int
    main()
    {
        int sockfd, connfd, len;
        struct sockaddr_in servaddr, cli;
        pthread_t thread_id, jointhread_id;
    
        // socket create and verification
        sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd == -1) {
            printf("socket creation failed...\n");
            exit(0);
        }
        else
            printf("Socket successfully created..\n");
    
        // assign IP, PORT
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = INADDR_ANY;
        servaddr.sin_port = htons(PORT);
    
        // Binding newly created socket to given IP and verification
        if ((bind(sockfd, (SA *) & servaddr, sizeof(servaddr))) != 0) {
            printf("socket bind failed...\n");
            exit(0);
        }
        else
            printf("Socket successfully binded..\n");
    
        // Now server is ready to listen and verification
        if ((listen(sockfd, 5)) != 0) {
            printf("Listen failed...\n");
            exit(0);
        }
        else
            printf("Server listening..\n");
    
        // Accept the data packet from client and verification
        len = sizeof(cli);
    
    // NOTE/BUG: connfd can be zero for a valid connection
    #if ORIG
        while (connfd = accept(sockfd, (SA *) &cli, &len)) {
    #else
        while (1) {
            connfd = accept(sockfd, (SA *) &cli, &len);
    #endif
            if (connfd < 0) {
                printf("server acccept failed...\n");
                exit(0);
            }
            else
                printf("server acccept the client..%d.\n", connfd);
    
    #if ORIG
            if (pthread_create(&thread_id, NULL, func, (void *) &connfd) < 0) {
                perror("could not create thread");
                return 1;
            }
    #else
            if (pthread_create(&thread_id, NULL, func, (void *) ((long) connfd)) < 0) {
                perror("could not create thread");
                return 1;
            }
    #endif
    
    // NOTE/BUG -- creating a separate thread just to join the above thread does
    // not help
    #if ORIG
            if (pthread_create(&jointhread_id, NULL, join, (void *) &thread_id) < 0) {
                perror("could not create thread");
                return 1;
            }
    #endif
        }
    
        // After chatting close the socket
        close(sockfd);
    
        return 0;
    }
    

    Here's a refactored version that implements the multiple client message echo that you wanted.

    It uses a per-thread task block to control things. Each client, when it receives a message, sends it all other clients.

    It also does a bit of interthread locking using mutexes.

    Having a given client thread do all the echo to the other clients is but one way to do this. There are others (e.g. all client threads queue the message to the master/main thread and it does the echo)

    This is a lot closer but you'll have more work to do in func to receive/send the client message.

    Anyway, here is the code:

    #include <stdio.h>
    #include <string.h>                     // strlen
    #include <stdlib.h>                     // strlen
    #include <sys/socket.h>
    #include <arpa/inet.h>                  // inet_addr
    #include <unistd.h>                     // write
    #include <pthread.h>                    // for threading , link with lpthread
    
    #define MAX 80
    #define PORT 6543
    #define SA struct sockaddr
    
    #define ORIG    0
    #define FIX     1
    
    enum {
        TSKSTATE_IDLE,                      // task slot free/available
        TSKSTATE_PENDING,                   // task is being created
        TSKSTATE_RUNNING,                   // task is alive and running
        TSKSTATE_DONE                       // task has completed (but not reaped)
    };
    
    typedef struct tsk tsk_t;
    struct tsk {
        tsk_t *tsk_next;                    // chain pointer
        pthread_t tsk_tid;                  // thread id
        long tsk_xid;                       // sequential task id
        int tsk_sockfd;                     // client socket descriptor
        int tsk_state;                      // current task state
        pthread_mutex_t tsk_mutex;          // per-thread mutex
        void *tsk_rtn;                      // thread's return value
    };
    
    // NOTE: using an array obviates the need for a master lock if we used a
    // linked list here instead -- by passing TSKMAX to listen below [in main],
    // we guarantee that main can always find a free task slot when it needs one
    #define TSKMAX      5
    tsk_t tsklist[TSKMAX];                  // active task list
    
    #define TSKFORALL(_tsk) \
        _tsk = &tsklist[0];  _tsk < &tsklist[TSKMAX];  ++_tsk
    
    long tskxid;                            // sequential task id
    __thread tsk_t *tskcur;                 // current thread's tsk block
    
    pthread_mutex_t master_lock = PTHREAD_MUTEX_INITIALIZER;
    
    // lockall -- lock all threads
    void
    lockall(void)
    {
    
        pthread_mutex_lock(&master_lock);
    }
    
    // unlockall -- lock all threads
    void
    unlockall(void)
    {
    
        pthread_mutex_unlock(&master_lock);
    }
    
    // tsklock -- lock single thread
    void
    tsklock(tsk_t *tsk)
    {
    
        pthread_mutex_lock(&tsk->tsk_mutex);
    }
    
    // tskunlock -- unlock single thread
    void
    tskunlock(tsk_t *tsk)
    {
    
        pthread_mutex_unlock(&tsk->tsk_mutex);
    }
    
    // tskreapall -- release all completed threads
    void
    tskreapall(void)
    {
        tsk_t *tsk;
    
        lockall();
    
        for (TSKFORALL(tsk)) {
            tsklock(tsk);
    
            if (tsk->tsk_state == TSKSTATE_DONE) {
                pthread_join(tsk->tsk_tid,&tsk->tsk_rtn);
                tsk->tsk_state = TSKSTATE_IDLE;
            }
    
            tskunlock(tsk);
        }
    }
    
    // tsksendall -- send message to all other clients
    void
    tsksendall(char *msg,int len)
    {
        tsk_t *tsk;
    
        lockall();
    
        for (TSKFORALL(tsk)) {
            if (tsk == tskcur)
                continue;
    
            tsklock(tsk);
            if (tsk->tsk_state == TSKSTATE_RUNNING)
                write(tsk->tsk_sockfd,msg,len);
            tskunlock(tsk);
        }
    }
    
    void *
    func(void *ptr)
    {
        tskcur = ptr;
        char buff[MAX];
    #if ORIG
        int n;
    #endif
    
        tsklock(tskcur);
        tskcur->tsk_state = TSKSTATE_RUNNING;
        tskunlock(tskcur);
    
        // infinite loop for chat
    // NOTE: this loop still needs work ...
        for (;;) {
            bzero(buff, MAX);
    
            // read the message from client and copy it in buffer
            int rlen = read(tskcur->tsk_sockfd, buff, sizeof(buff));
    
            // print buffer which contains the client contents
            printf("From client: %s\t To client : ", buff);
            bzero(buff, MAX);
    
            // copy server message in the buffer
    // NOTE/BUG: this is destroying the data that was
    #if ORIG
            n = 0;
            while ((buff[n++] = getchar()) != '\n');
    #endif
    
            // and send that buffer to client
            write(tskcur->tsk_sockfd, buff, rlen);
    
            // if msg contains "Exit" then server exit and chat ended.
            if (strncmp("exit", buff, 4) == 0) {
                printf("Server Exit...\n");
                break;
            }
    
            // echo message to all other clients
            tsksendall(buff,rlen);
        }
    
        tsklock(tskcur);
        tskcur->tsk_state = TSKSTATE_DONE;
        close(tskcur->tsk_sockfd);
        tskcur->tsk_sockfd = -1;
        tskunlock(tskcur);
    
        return (void *) 0;
    }
    
    // Driver function
    int
    main(void)
    {
        int sockfd, connfd;
        socklen_t len;
        struct sockaddr_in servaddr, cli;
    
        int state;
        tsk_t *tsktry;
        tsk_t *tsknew;
    
        // socket create and verification
        sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd == -1) {
            printf("socket creation failed...\n");
            exit(0);
        }
        else
            printf("Socket successfully created..\n");
    
        // assign IP, PORT
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = INADDR_ANY;
        servaddr.sin_port = htons(PORT);
    
        // Binding newly created socket to given IP and verification
        if ((bind(sockfd, (SA *) & servaddr, sizeof(servaddr))) != 0) {
            printf("socket bind failed...\n");
            exit(0);
        }
        else
            printf("Socket successfully binded..\n");
    
        // Now server is ready to listen and verification
        if ((listen(sockfd, TSKMAX)) != 0) {
            printf("Listen failed...\n");
            exit(0);
        }
        else
            printf("Server listening..\n");
    
        // Accept the data packet from client and verification
        len = sizeof(cli);
    
        for (TSKFORALL(tsktry)) {
            pthread_mutex_init(&tsktry->tsk_mutex,NULL);
            tsktry->tsk_state = TSKSTATE_IDLE;
        }
    
        while (1) {
            connfd = accept(sockfd, (SA *) &cli, &len);
            if (connfd < 0) {
                printf("server acccept failed...\n");
                exit(0);
            }
            else
                printf("server acccept the client..%d.\n", connfd);
    
            // reap all completed threads
            tskreapall();
    
            // find an idle slot
            tsknew = NULL;
            for (TSKFORALL(tsktry)) {
                tsklock(tsktry);
                state = tsktry->tsk_state;
    
                if (state == TSKSTATE_IDLE) {
                    tsknew = tsktry;
                    tsknew->tsk_state = TSKSTATE_PENDING;
                }
    
                tskunlock(tsktry);
    
                if (tsknew != NULL)
                    break;
            }
            tsknew->tsk_xid = ++tskxid;
            tsknew->tsk_sockfd = connfd;
    
            if (pthread_create(&tsknew->tsk_tid, NULL, func, tsknew) < 0) {
                perror("could not create thread");
                return 1;
            }
        }
    
        // After chatting close the socket
        close(sockfd);
    
        return 0;
    }