
[Thread sync] Make a print thread wait for n threads to finish a single cycle of execution, then signal the n threads to complete another cycle, and repeat


How can I have a single thread wait for n threads to complete a single execution cycle, and then repeat this for i cycles? The solution needs to use semaphores to signal the single thread that all n threads have completed an execution cycle. The single thread should then signal all n threads that they can carry on with one more execution cycle, and so on.

Some challenges I can't fix:

  • Controlling the semaphores so that one of the n threads doesn't eat up more than one cycle (semaphore post/wait).
  • The n number of workers is only known at run time, so we can't initialize a sem_t array[n].

Code:

#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>

sem_t execute;  // printer -> workers: "run one more cycle"
sem_t reports;  // workers -> printer: "one cycle finished"

// these would have been read in at run time
int n = 4;
int control = 4;

void *worker(void *args) {
    int num = (int)(intptr_t)args;
    printf("Worker id: %d\n", num);
    int proceed = 5; // simulates "5" jobs in this worker's queue
    while (proceed > 0) {
        sem_wait(&execute);  // any worker can take a token, even several per cycle
        printf("Report from: %d\n", num);
        sem_post(&reports);
        proceed--;
    }
    control--;  // unsynchronized update, left as in my attempt
    return NULL;
}

void *print(void *args) {
    (void)args;
    while (control > 0) {
        for (int i = 0; i < n; i++) {
            sem_wait(&reports);
        }
        printf("All reported\n");
        for (int i = 0; i < n; i++) {
            sem_post(&execute);
        }
    }
    return NULL;
}

int main(void) {
    // control/n would have been scanned and passed to threads; the global
    // control var would be set after reading in n
    sem_init(&execute, 0, n);    // initialization of the first semaphore
    sem_init(&reports, 0, 0);    // initialization of the second semaphore

    pthread_t printer;
    pthread_create(&printer, NULL, &print, NULL);

    pthread_t workers[n];
    for (int i = 0; i < n; i++) {
        // pass the worker id by value inside the pointer argument
        pthread_create(&workers[i], NULL, &worker, (void *)(intptr_t)i);
    }
    for (int i = 0; i < n; i++) {
        pthread_join(workers[i], NULL);
    }

    pthread_join(printer, NULL);
    return 0;
}

Output:

[rnitz]$ gcc -Wall -Werror -w -g example2.c -std=gnu99 -lpthread 
[rnitz]$ ./a.out
Worker id: 0
Worker id: 3
Report from: 3
Report from: 3
Worker id: 2
Worker id: 1
Report from: 3
Report from: 0
All reported
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 0
Report from: 1
Report from: 3
Report from: 2
All reported
Report from: 0
Report from: 0
Report from: 1
Report from: 2
All reported
Report from: 2
Report from: 1
Report from: 1
Report from: 2
All reported
[rnitz]$ 

The issue is pretty obvious: some workers eat up more than their share of the available "cycle" room in the semaphore. Worker 3, for example, reports three times before the first "All reported".

Note: This question is based on simulating a CPU, where n is the number of CPU threads that each work on their own independent job queue, while a single print thread handles printing the current job being processed for a given CPU cycle. The print thread will:

  1. Wait for all cpus to finish a cycle
  2. Print the current job on each cpu
  3. Signal the n cpus to complete another cycle.

Solution

  • Solution: a sem_t* array of semaphores that is malloc'd at run time. I couldn't find an example of this anywhere, but I tried it and it worked :)

    I'm keeping the question because I can't find anything similar on Stack Overflow; understandably, waiting for n threads to sync on every cycle probably isn't common.

    For this, you can use arrays of semaphores, one semaphore per worker, instead of incrementing/decrementing a single shared semaphore. You need at least one semaphore array; this solution uses two.

    This solution can be extended to worker threads that each have a different number of jobs to complete: keep a global int that is decremented when a thread finishes, and use it in the for-loops of the printer thread. That way, if 1 of the n threads is done early due to less work, the printer only calls sem_wait()/sem_post() n-1 times. With per-worker semaphores the printer also has to know which worker finished, so that it skips that worker's index.

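    #include <pthread.h>
    #include <semaphore.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    sem_t* execute_arr;
    sem_t* reports_arr;

    // these both need to be the same value
    int n = 4;
    int control = 4;
    pthread_mutex_t control_lock = PTHREAD_MUTEX_INITIALIZER;

    void *worker(void* args){
        int num = (int)(intptr_t)args;
        int proceed = 5;
        while(proceed > 0){
            sem_wait(&execute_arr[num]);   // only this worker waits on slot num
            printf("Report from: %d\n", num);
            proceed--;
            if(proceed == 0){
                // count this worker as finished before its last report,
                // so the printer sees the update once it has that report
                pthread_mutex_lock(&control_lock);
                control--;
                pthread_mutex_unlock(&control_lock);
            }
            sem_post(&reports_arr[num]);
        }
        return NULL;
    }

    void *print(void* args){
        (void)args;
        for(;;){
            for(int i = 0; i < n; i++) {
                sem_wait(&reports_arr[i]);
            }
            printf("All reported\n");
            // every worker has reported and is idle, so control is stable here
            if(control == 0) break;
            for(int i = 0; i < n; i++) {
                sem_post(&execute_arr[i]);
            }
        }
        return NULL;
    }

    int main(void){
        execute_arr = malloc(n * sizeof(sem_t));
        reports_arr = malloc(n * sizeof(sem_t));
        for(int i = 0; i < n; i++) {
            sem_init(&execute_arr[i], 0, 1);   // each worker may run its first cycle at once
            sem_init(&reports_arr[i], 0, 0);
        }

        pthread_t printer;
        pthread_create(&printer, NULL, &print, NULL);

        pthread_t workers[n];
        for(int i = 0; i < n; i++) {
            // pass the worker index by value inside the pointer argument
            pthread_create(&workers[i], NULL, &worker, (void*)(intptr_t)i);
        }
        for(int i = 0; i < n; i++) {
            pthread_join(workers[i], NULL);
        }

        pthread_join(printer, NULL);

        // destroy the semaphores and free the arrays
        for(int i = 0; i < n; i++) {
            sem_destroy(&execute_arr[i]);
            sem_destroy(&reports_arr[i]);
        }
        free(execute_arr);
        free(reports_arr);
        return 0;
    }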
    

    And the fixed output:

    [rnitz]$ ./a.out
    Report from: 1
    Report from: 0
    Report from: 2
    Report from: 3
    All reported
    Report from: 1
    Report from: 3
    Report from: 0
    Report from: 2
    All reported
    Report from: 1
    Report from: 3
    Report from: 2
    Report from: 0
    All reported
    Report from: 1
    Report from: 3
    Report from: 2
    Report from: 0
    All reported
    Report from: 1
    Report from: 3
    Report from: 0
    Report from: 2
    All reported
    [rnitz]$