Search code examples
c mutex shared-memory condition-variable

Condition variables and shared memory


I tried to use a condition variable (with a mutex) inside shared memory to synchronize a parent and its child after exec. At first everything seems fine: the child and parent are synchronized. But at a random point during execution, I always get the error "Invalid argument" from the pthread_cond_wait function.

File test1.c

The code below is the parent, which creates the mutex and the condition variable (inside shared memory) and another block of shared memory for data, and then tries to share data with the child.

#include <errno.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <unistd.h>

#define PNUM 17500
#define PLENGHT 60
#define MAXLINE 4096


void err_sys(const char *fmt, ...);
void err_exit(int error, const char *fmt, ...);
static void err_doit(int errnoflag, int error, const char *fmt, va_list ap);

int main (int argc , char *argv[]){

    /* creating new mutex inside shared memory area */
    void *shmem_mu_ptr;             /* pointer to mutex shared memory */         
    pthread_mutex_t *mu;            /* pointer to mutex */
    int shmem_mu_id;                /* shared memory mutex id */        
    int mu_err;
    size_t size = sizeof(pthread_mutex_t);  /* shared m. mutex size (size was already declared )*/
    char packet[PLENGHT];

    for(int i = 0; i < PLENGHT; i++) packet[i] = 'a';

    /* getting shared m. id for mutex*/
    if ( (shmem_mu_id =  shmget((key_t)1111, size , 0666 | IPC_CREAT )) == -1) err_sys("shared memory error ");

    /* getting shared m. pointer for mutex */
    if ((shmem_mu_ptr = shmat(shmem_mu_id, 0, 0)) == (void *)-1) err_sys("shmat error");

    /* make mu, that expects a memory area made like pthread_mutex_t , to point the shared memory. */
    mu = (pthread_mutex_t *)shmem_mu_ptr;

    /* initialize the attribute to prepare mutex to be placed in shared memory */
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED);

    /* initialize mutex */
    pthread_mutex_init(mu,&attr);
    pthread_mutexattr_destroy(&attr);


    /* creating new condition variable inside shared memory area */
    void *shmem_cond_ptr;           /* pointer to cond shared memory */
    int shmem_cond_id;              /* shared memory cond id */     
    pthread_cond_t *cond;
    size = sizeof(pthread_cond_t);  /* shared m. cond size (size was already declared )*/

    /* getting shared m. id for cond*/
    if ( (shmem_cond_id =  shmget((key_t)2222, size , 0666 | IPC_CREAT )) == -1) err_sys("shared memory error ");

    /* getting shared m. pointer for cond */
    if ((shmem_cond_ptr = shmat(shmem_cond_id, 0, 0)) == (void *)-1) err_sys("shmat error");

    /* make cond, that expects a memory area made like pthread_cond_t , to point the shared memory. */
    cond = (pthread_cond_t *)shmem_cond_ptr;


    /* * initialize the attribute to prepare cond to be placed in shared memory */
    pthread_condattr_t attrcond;
    pthread_condattr_init(&attrcond);
    pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);

    /* Initialize condition. */
    pthread_cond_init(cond, &attrcond);
    pthread_condattr_destroy(&attrcond);

    /* Clean up. */
    pthread_condattr_destroy(&attrcond); 


    /* creating new shared memory area to share only PLENGHT (60) byte at time */
    void *shmem_ptr60;
    int shmem60;
    char *shmem_char;
    size = sizeof(char) * PLENGHT;   /* shared m. data size (size was already declared )*/

    /* getting shared m. id */
    if ( (shmem60 =  shmget(3333, size , 0666 | IPC_CREAT )) == -1) err_sys("shared memory error ");

    /* getting shared m. pointer */
    if ((shmem_ptr60 = shmat(shmem60, 0, 0)) == (void *)-1) err_sys("shmat error");

    shmem_char = shmem_ptr60;

    /* locking mutex before creating new process to avoid child reading void memory area */
    if( (mu_err = pthread_mutex_lock(mu)) != 0 ) err_exit(mu_err,"lock mutex error");

    /* calling child */
    pid_t pid;
    if( (pid = vfork()) == -1 )err_sys("fork error");
    else if(pid == 0){
        if (execl("./test2","test2",(void*)0) == -1) err_sys("exec error");
        exit(0);
    }
    else if (pid == -1 ) err_sys("fork error");


    /*  copying data inside shared memory */
    for( int i = 0; i < PNUM; i++){
        strcpy(shmem_char, packet);
        printf("data written, iteration number :%d\n",i);
        if ( (mu_err = pthread_cond_signal(cond)) != 0) err_exit(mu_err,"cond signal error: ");
        if ( (mu_err = pthread_cond_wait(cond, mu)) != 0) err_exit(mu_err,"cond wait error"); 
    }
    if ( (mu_err = pthread_mutex_unlock(mu)) != 0) err_exit(mu_err,"mutex unlock error");

    waitpid(pid,NULL,(int)NULL);

    if (shmctl(shmem_mu_id, IPC_RMID, 0) < 0) err_sys("shmctl error");
    if (shmctl(shmem_cond_id, IPC_RMID, 0) < 0) err_sys("shmctl error");
    if (shmctl(shmem60, IPC_RMID, 0) < 0) err_sys("shmctl error");

    return 0;
}


/*
 * Error helper (after APUE): print the formatted message followed by the
 * description of the current errno value, then terminate with status 1.
 */
void err_sys(const char *fmt, ...){
    const int saved_errno = errno;  /* snapshot before any further library call */
    va_list args;

    va_start(args, fmt);
    err_doit(1, saved_errno, fmt, args);
    va_end(args);

    exit(1);
}

/*
 * Error helper for calls that return the error number instead of setting
 * errno (e.g. the pthread functions): print the formatted message followed
 * by the description of `error`, then terminate with status 1.
 */
void err_exit(int error, const char *fmt, ...){
    va_list args;

    va_start(args, fmt);
    err_doit(1, error, fmt, args);
    va_end(args);

    exit(1);
}

/*
 * Build one diagnostic line and write it to stderr.
 *
 *   errnoflag  non-zero to append ": <strerror(error)>" to the message
 *   error      error number to describe when errnoflag is set
 *   fmt, ap    printf-style format and its already-started argument list
 *
 * The size limits passed to the formatting calls always leave one byte spare,
 * so the trailing newline added by strcat() fits inside the buffer.
 */
static void err_doit(int errnoflag, int error, const char *fmt, va_list ap){
    char line[MAXLINE];

    vsnprintf(line, MAXLINE-1, fmt, ap);
    if (errnoflag) {
        size_t used = strlen(line);

        snprintf(line + used, MAXLINE - used - 1, ": %s", strerror(error));
    }
    strcat(line, "\n");

    fflush(stdout);     /* keep ordering sane when stdout and stderr share a destination */
    fputs(line, stderr);
    fflush(NULL);       /* flush every stdio output stream */
}

File test2.c

Then the child attaches to all the shared memory areas containing the mutex, the condition variable and the data, and tries to read. The code below is the child.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <stdarg.h> 

#define PNUM 17500  
#define MAXLINE 4096
#define PLENGHT 60


void err_sys(const char *fmt, ...);
void err_exit(int error, const char *fmt, ...);
static void err_doit(int errnoflag, int error, const char *fmt, va_list ap);



int main (int argc , char *argv[]){
    char packets[PNUM][PLENGHT];


    /* -----recovering mutex inside shared memory */
    pthread_mutex_t *mu;
    int shmem_mu_id;
    void* shmem_mu_ptr;
    int mu_err;

    /* getting shared m. id for data*/
    if ( (shmem_mu_id =  shmget((key_t)1111, 0 , 0666 | IPC_CREAT )) == -1) err_sys("shared memory error ");

    /* getting shared m. pointer for data */
    if ((shmem_mu_ptr = shmat(shmem_mu_id, 0, 0)) == (void *)-1) err_sys("shmat error");

    /* make mu, that expects a memory area made like pthread_mutex_t *, to point the shared memory. */
    mu = (pthread_mutex_t *)shmem_mu_ptr;


    /* -----recovering condition variable inside shared memory */
    void *shmem_cond_ptr;           /* pointer to cond shared memory */
    int shmem_cond_id;              /* shared memory cond id */     
    pthread_cond_t *cond;

    /* getting shared m. id for cond*/
    if ( (shmem_cond_id =  shmget((key_t)2222, 0 , 0666 | IPC_CREAT )) == -1) err_sys("shared memory error ");

    /* getting shared m. pointer for cond */
    if ((shmem_cond_ptr = shmat(shmem_cond_id, 0, 0)) == (void *)-1) err_sys("shmat error");

    /* make cond, that expects a memory area made like pthread_cond_t , to point the shared memory. */
    cond = (pthread_cond_t *)shmem_cond_ptr;

    /* recovering shared memory aread with data */
    int shmem;
    char *shmem_ptr;

    if ( (shmem =  shmget(3333, 0, 0666 )) == -1) err_sys("shared memory error ");
    if ((shmem_ptr = shmat(shmem, 0, 0)) == (void *)-1) err_sys("shmat error child");


    if( (mu_err = pthread_mutex_lock(mu)) != 0 ) err_exit(mu_err,"lock mutex error");

    /* try to read data inside shared memory */
    for (int i = 0; i < PNUM; i++){ 
        strcpy(packets[i],shmem_ptr);
        printf("data read, iteration number :%d\n",i);
        if ( (mu_err = pthread_cond_signal(cond)) != 0) err_exit(mu_err,"cond signal error: ");
        if ( (mu_err = pthread_cond_wait(cond, mu)) != 0) err_exit(mu_err,"cond wait error"); 

    }

    if ( (mu_err = pthread_mutex_unlock(mu)) != 0) err_exit(mu_err,"mutex unlock error");
    return 0;

}




/*
 * Error helper (after APUE): print the formatted message followed by the
 * description of the current errno value, then terminate with status 1.
 */
void err_sys(const char *fmt, ...){
    const int saved_errno = errno;  /* snapshot before any further library call */
    va_list args;

    va_start(args, fmt);
    err_doit(1, saved_errno, fmt, args);
    va_end(args);

    exit(1);
}

/*
 * Error helper for calls that return the error number instead of setting
 * errno (e.g. the pthread functions): print the formatted message followed
 * by the description of `error`, then terminate with status 1.
 */
void err_exit(int error, const char *fmt, ...){
    va_list args;

    va_start(args, fmt);
    err_doit(1, error, fmt, args);
    va_end(args);

    exit(1);
}

/*
 * Build one diagnostic line and write it to stderr.
 *
 *   errnoflag  non-zero to append ": <strerror(error)>" to the message
 *   error      error number to describe when errnoflag is set
 *   fmt, ap    printf-style format and its already-started argument list
 *
 * The size limits passed to the formatting calls always leave one byte spare,
 * so the trailing newline added by strcat() fits inside the buffer.
 */
static void err_doit(int errnoflag, int error, const char *fmt, va_list ap){
    char line[MAXLINE];

    vsnprintf(line, MAXLINE-1, fmt, ap);
    if (errnoflag) {
        size_t used = strlen(line);

        snprintf(line + used, MAXLINE - used - 1, ": %s", strerror(error));
    }
    strcat(line, "\n");

    fflush(stdout);     /* keep ordering sane when stdout and stderr share a destination */
    fputs(line, stderr);
    fflush(NULL);       /* flush every stdio output stream */
}

The problem appears inside the for loop in the parent: after a random number of iterations, pthread_cond_wait fails with an "Invalid argument" error.

This is the output:

[...]
data written, iteration number :85
data read, iteration number :85
data written, iteration number :86
data read, iteration number :86
data written, iteration number :87
data read, iteration number :87
data written, iteration number :88
data read, iteration number :88
data written, iteration number :89
data read, iteration number :89
data written, iteration number :90
data read, iteration number :90
cond wait error: Invalid argument

The pthread_cond_wait function man page says:

These functions atomically release mutex and cause the calling thread to block on the condition variable cond; atomically here means "atomically with respect to access by another thread to the mutex and then the condition variable".

I'm working with processes, not with threads: can this explain the issue or did I make an error?


Solution

  • Actually, there is a bug in macOS when a condition variable lives in shared memory, because the condition variable may hold a pointer to the mutex. Since shmat() may return a different address in each process, at some point the condition variable's internals may hold a pointer to "logically" the same mutex, but at a different address. These differing addresses confuse pthread_cond_wait(), and it returns EINVAL.

    I fought with this while working on the Enduro/X middleware. Basically, I was also getting random errors when different processes used a shared condition variable.

    The solution was to hack the opaque macOS type pthread_cond_t (extracting the header information from the Darwin pthreads library) and to reset the condition variable's internals on entry to pthread_cond_wait(), which makes it more or less functional.

    The sources for a working solution (wrappers) are here: https://github.com/endurox-dev/endurox/blob/master/libnstd/sys_emqueue.c. Some additional research notes are here: https://www.endurox.org/issues/512

    To get an explanation for the error, take a look at the Darwin source:

    https://github.com/apple/darwin-libpthread/blob/main/src/pthread_cond.c

    int _pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex,
            const struct timespec *abstime, int isRelative,
            pthread_conformance_t conforming)
    ...
    if (!ulock && cond->busy != NULL && cond->busy != mutex) { //<<< Here it compares your mutex pointer with the "busy" field. "busy" is set elsewhere by another process, to that process's mutex pointer. Thus the pointers may differ, even though the other process is logically using the same mutex, as in a normal pthread_cond_wait() application.
        // TODO: PTHREAD_STRICT candidate
        return EINVAL;
    }
    

    The "busy" field is set at some stage in _pthread_psynch_cond_wait(), once a call has first passed this check.

    Thus the solution is to reset the internal busy field to NULL and let _pthread_psynch_cond_wait() do the actual wait logic, as it seems that _pthread_cond_wait() is more or less just an entry wrapper/checker.

    To get access to the internals of the pthread_cond_t, I used the snippet:

    enter code here
    
    /*
     * Overlay for the leading fields of the opaque macOS pthread_cond_t, used
     * only to reach the internal `busy` pointer. The layout is the one the
     * author extracted from the Darwin pthreads headers and is reported to
     * work on macOS 10 -- NOTE(review): verify field order and sizes against
     * the libpthread version actually targeted before relying on it.
     */
    typedef struct {
        /* first three members: meaning not described here; kept only so that
         * `busy` lands at the right offset */
        long sig;
        uint32_t lock;
        uint32_t unused;
        /* compared against the caller's mutex pointer by _pthread_cond_wait(),
         * which returns EINVAL on a non-NULL mismatch; the workaround resets
         * it to NULL just before calling pthread_cond_wait() */
        char *busy;
    } ndrx_osx_pthread_cond_t;
    

    Then, just before entering pthread_cond_wait(), cast the condition variable to ndrx_osx_pthread_cond_t and reset the busy field to NULL. Also, because several processes might set the busy field back to some invalid value, my solution was to do this over several attempts.

    All of the above works with macOS 10; I am not sure about macOS 11, as I have not yet had access to such a machine :)