Search code examples
csynchronizationsemaphorebarrier

How to synchronize child processes with each other using semaphores?


I have N number of childs that needs to do some work in a loop while being synchronized with each other at the same time. Namely, if a child process is at its i'th iteration, all the other childs should be at i'th iteration. I need to synchronize them with semaphores but I can't find how to do it. This is the code I wrote:

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/sem.h>

void sem_signal(int semid, int val) {
    struct sembuf semaphore;
    semaphore.sem_num = 0;
    semaphore.sem_op = val;
    semaphore.sem_flg = 0;
    semop(semid, &semaphore, 1);
}

void sem_wait(int semid, int val) {
    struct sembuf semaphore;
    semaphore.sem_num = 0;
    semaphore.sem_op = (-1 * val);
    semaphore.sem_flg = 0;
    semop(semid, &semaphore, 1);
}

int main() {
    int sem_worker = semget(1, 1, 0700 | IPC_CREAT);
    semctl(sem_worker, 0, SETVAL, 0);
    int process_index = 0;
    int N = 4, pid;

    for (process_index = 0; process_index < N; process_index++) {
        pid = fork();
        if (pid == -1) {
            printf("ERROR: cannot fork!\n");
            return EXIT_FAILURE;
        }
        if (pid == 0)
            break;
    }
    if (pid!=0) // parent
        pause();

    else {
        int i = 0;
        while (i < 3) {
            printf("process %d: i: %d\n", process_index, i);
            sem_signal(sem_worker, 1); // increase the semaphore by one
            sem_wait(sem_worker, N);   // wait for all the other childs
            i += 1;
        }
    }
}

But when I run it, it can't continue after the first iteration.

process 0: i: 0
process 1: i: 0
process 3: i: 0
process 2: i: 0
process 0: i: 1

I understand why this happens. It's because one of the processes makes the semaphore 0 and continue to next iteration but all the other ones still waits. So how should I write my code to solve this problem?

P.S: I have taken sem_signal and sem_wait functions from somewhere else so I'm not sure how it works but I'm sure that they are working correctly. For example, if I write sem_wait(my_sem, num_of_children) in parent to wait all the child processes and increase my_sem by 1 in childs when they finish, it works.


Solution

  • As it is mentioned in the comments, you can create a barrier using semaphores and use it to synchronize your processes. You need to create your barrier in a shared memory and set a non-zero value for your semaphores' pshared parameter to share it among processes:

    #include <semaphore.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/shm.h>
    #include <signal.h>
    #include <unistd.h>
    
    
    typedef struct {
        int n;
        int count;
        sem_t mutex;
        sem_t turnstile;
        sem_t turnstile2;
    } barrier_t;
    
    void init_barrier(barrier_t *barrier, int n)
    {
        barrier->n = n;
        barrier->count = 0;
        sem_init(&barrier->mutex, 1, 1); // second parameter is pshared
        sem_init(&barrier->turnstile, 1, 0);
        sem_init(&barrier->turnstile2, 1, 0);
    }
    
    void phase1(barrier_t *barrier)
    {
        sem_wait(&barrier->mutex);
        if (++barrier->count == barrier->n) {
            int i;
            for (i = 0; i < barrier->n; i++) {
                sem_post(&barrier->turnstile);
            }
        }
        sem_post(&barrier->mutex);
        sem_wait(&barrier->turnstile);
    }
    
    void phase2(barrier_t *barrier)
    {
        sem_wait(&barrier->mutex);
        if (--barrier->count== 0) {
            int i;
            for (i = 0; i < barrier->n; i++) {
                sem_post(&barrier->turnstile2);
            }
        }
        sem_post(&barrier->mutex);
        sem_wait(&barrier->turnstile2);
    }
    
    void wait_barrier(barrier_t *barrier)
    {
        phase1(barrier);
        phase2(barrier);
    }
    
    int shmid, KEYSHM=123456;
    
    int main(int argc, char const* argv[]) {
        barrier_t* barrier;
        shmid = shmget(KEYSHM, sizeof(barrier_t), 0700 | IPC_CREAT);
        barrier = (barrier_t*) shmat(shmid, 0, 0);
        int N = 4;
        init_barrier(barrier, N);
        shmdt(barrier);
    
    
        int process_index, pid;
    
        for (process_index = 0; process_index < N; process_index++) {
            pid = fork();
            if (pid == -1) {
                printf("ERROR: cannot fork!\n");
                return EXIT_FAILURE;
            }
            if (pid == 0)
                break;
        }
        if (pid != 0) // parent
            pause();
        else {
            int i = 0;
            while (i < 3) {
                barrier = (barrier_t*) shmat(shmid, 0, 0);
                printf("process %d: i: %d\n", process_index, i);
                i += 1;
                wait_barrier(barrier);
                shmdt(barrier);
            }
    
            if (process_index == 3){
                kill(getppid(), SIGKILL);
            }
        }
    }
    
    process 0: i: 0
    process 1: i: 0
    process 2: i: 0
    process 3: i: 0
    process 2: i: 1
    process 3: i: 1
    process 0: i: 1
    process 1: i: 1
    process 3: i: 2
    process 2: i: 2
    process 0: i: 2
    process 1: i: 2