c linux pthreads

Periodically trigger pthread workers and wait for completion


I would like to create a set of N pthreads under control of the original process. I'd like to control them as in this pseudocode:

create_n_threads();

While(1) {
    main task modifies a global variable "phase" to control the type of work
    trigger the N threads to wake up and do work based on the global "phase" variable
    wait until all threads have completed their tasks
    main task does meta-calculation on the partial results of all the workers
}

I've tried pthread_barrier_wait(). It works well to trigger a compute cycle, but it doesn't give me a way to know when every task has completed. How do I know when all the threads are done so that I can safely do my meta-calculation on the result? I don't want to use pthread_join, because these work cycles will be in a tight loop and I don't want the overhead of killing and recreating the tasks on each cycle.

#include <stdio.h>
#include <stdlib.h> 
#include <pthread.h> 
#include <unistd.h>     // for sleep()

#define NTHREADS 4
pthread_barrier_t b;

typedef struct threadargs {
int id;             // thread ID 0->N
int phase;          // either 0 or non zero to set blk/red phase
} THREADARGS;

int phase=0;
int cycle=0;

// worker function
// gets called with a pointer to THREADARGS structure
// which tells worker their id, starting, ending column to relax, and the red/black phase

void *thread_func(void *x)
{
int tid;                    // thread id

    int *retval = (int *) malloc(sizeof(int));   // so it persists across thread death
    tid = ((THREADARGS *) x)->id;
    
    while(1) {                  // wait to be triggered
        printf("%d: %d %d\n", cycle, tid, phase);
        pthread_barrier_wait(&b);
    }
    
    *retval = 2*tid;
    pthread_exit((void *)retval);

}

int main(int argc, char *argv[])
{
pthread_t threadids[NTHREADS];      // store os thread ids
THREADARGS thread_args[NTHREADS];           // arguments to thread
int rc, i;

    // initialize the multiprocess barrier 
    pthread_barrier_init(&b, NULL, NTHREADS+1);
    
    /* spawn the threads */
    for (i = 0; i < NTHREADS; ++i) {
        thread_args[i].id = i;
        printf("spawning thread %d\n", i);
        if((rc=pthread_create(&threadids[i], NULL, thread_func, (void *) &thread_args[i]))!=0) {
            fprintf(stderr, "cannot create thread %d\n",i);
            exit(8);
        };
    }
    
    for (i=0; i<10; i++) {              // do ten iterations
        printf("cycle %d\n", i);
        phase=(phase+1)%3;
        cycle++;
        pthread_barrier_wait(&b);       // trigger all the workers and wait for all to complete
    }
    
    exit(2);    // just kill everything

}

This example produces output like:

!) pthread
spawning thread 0
spawning thread 1
spawning thread 2
spawning thread 3
0: 0 0
0: 1 0
cycle 0
1: 2 1
1: 3 1
1: 3 1
1: 1 1
1: 2 1
1: 0 1
cycle 1
cycle 2
3: 1 0
3: 3 0
3: 2 0
3: 0 0
3: 0 0
3: 2 0
3: 3 0
cycle 3
3: 1 0
4: 1 1
4: 2 1
4: 0 1
4: 3 1

You can see that some workers are running multiple times per cycle and that the "phase" variable doesn't count properly from cycle to cycle. What I want is something like:

cycle 1
1: 0 0
1: 1 0
1: 2 0
1: 3 0
cycle 2
2: 0 1
2: 1 1
2: 2 1
2: 3 1
cycle 3
3: 0 2
...

Of course the print statements from each task will be scrambled but I want to trigger all 4 pthreads to perform a task "0,1,2" and for them all to finish so that I can work with their results and safely change global variables for the next cycle.


Solution

  • From the pthread_barrier_wait documentation:

    DESCRIPTION

    The calling thread shall block until the required number of threads have called pthread_barrier_wait() specifying the barrier.

    When the required number of threads have called pthread_barrier_wait() specifying the barrier, the constant PTHREAD_BARRIER_SERIAL_THREAD shall be returned to one unspecified thread and zero shall be returned to each of the remaining threads.

    RETURN VALUE

    Upon successful completion, the pthread_barrier_wait() function shall return PTHREAD_BARRIER_SERIAL_THREAD for a single (arbitrary) thread synchronized at the barrier and zero for each of the other threads. Otherwise, an error number shall be returned to indicate the error.

    Therefore:

    int res = pthread_barrier_wait(barrier);   //barrier is a pthread_barrier_t *
    if (res == PTHREAD_BARRIER_SERIAL_THREAD) {
        //exactly one (arbitrary) thread reaches this point,
        //and only after all other threads have reached the barrier
    } else if (res == 0) {
        //all the other threads end up here
    } else {
        //res is an error number
    }
    

    With two barriers, you can do the following:

    pthread_barrier_t start_barrier, stop_barrier;  //each initialized for N workers + main
    int done = 0;   //written by main only, before it waits on start_barrier

    void *thread_func(void *arg) {

        for (;;) {

            //wait until all threads have the signal to go
            pthread_barrier_wait(&start_barrier);

            //test the flag only after the barrier, so that every worker sees
            //the same value and nobody exits while the others are still waiting
            if (done)
                break;

            //all green, let's do the work ...
            //as the last operation, store the result in a dedicated global variable;
            //as long as only this thread writes to it, there is no need to
            //protect it with a mutex

            //after the work is done, wait until all the others are done too
            pthread_barrier_wait(&stop_barrier);

        }

        return NULL;
    }

    int main()
    {
        //all the init stuff (including barrier init and thread creation)

        for (;;) {

            //do preparations before the threads start doing their work
            //set global vars etc.
            //e.g. done = 1; on the last pass

            //all threads are waiting for the last participant (main) to arrive
            //give the green signal
            pthread_barrier_wait(&start_barrier);

            if (done)
                break;      //the workers are exiting, nothing more to collect

            //wait while the threads are doing their job
            pthread_barrier_wait(&stop_barrier);

            //at this point, each task has done its job
            //and it is guaranteed that the data is no longer
            //accessed by any thread, therefore no protection is needed

            //do the main calculation

        }

        //join the workers and destroy the barriers
    }
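
For reference, the two-barrier outline above can be filled in along the following lines. This is only a sketch under some assumptions: the function name `run_cycles`, the `partial` array and the formula used as "work" are made up for illustration and are not from the question; both barriers are initialized for NTHREADS workers plus the main thread; and the `done` flag is tested right after the start barrier so that all workers agree on whether to exit.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NTHREADS 4

static pthread_barrier_t start_barrier, stop_barrier;
static int phase = 0;            /* written by main only, between cycles */
static int done = 0;             /* written by main only, before the start barrier */
static int partial[NTHREADS];    /* one slot per worker, so no mutex is needed */
static int ids[NTHREADS];        /* worker ids handed to pthread_create */

static void *worker(void *arg)
{
    int tid = *(int *)arg;

    for (;;) {
        pthread_barrier_wait(&start_barrier);     /* wait for the go signal */
        if (done)                                 /* checked after the barrier */
            break;
        partial[tid] = (tid + 1) * (phase + 1);   /* placeholder for real work */
        pthread_barrier_wait(&stop_barrier);      /* report completion */
    }
    return NULL;
}

/* Runs ncycles trigger/wait cycles and returns the sum of all cycle totals. */
long run_cycles(int ncycles)
{
    pthread_t tids[NTHREADS];
    long grand_total = 0;
    int i, c;

    phase = 0;
    done = 0;
    if (pthread_barrier_init(&start_barrier, NULL, NTHREADS + 1) != 0 ||
        pthread_barrier_init(&stop_barrier, NULL, NTHREADS + 1) != 0) {
        fprintf(stderr, "cannot initialize barriers\n");
        exit(EXIT_FAILURE);
    }

    for (i = 0; i < NTHREADS; i++) {
        ids[i] = i;
        if (pthread_create(&tids[i], NULL, worker, &ids[i]) != 0) {
            fprintf(stderr, "cannot create thread %d\n", i);
            exit(EXIT_FAILURE);
        }
    }

    for (c = 0; c < ncycles; c++) {
        long total = 0;

        phase = c % 3;                            /* safe: workers are parked */
        pthread_barrier_wait(&start_barrier);     /* trigger the workers */
        pthread_barrier_wait(&stop_barrier);      /* wait until all are done */

        for (i = 0; i < NTHREADS; i++)            /* meta-calculation */
            total += partial[i];
        printf("cycle %d phase %d total %ld\n", c, phase, total);
        grand_total += total;
    }

    done = 1;                                     /* ask the workers to exit */
    pthread_barrier_wait(&start_barrier);         /* release them one last time */
    for (i = 0; i < NTHREADS; i++)
        pthread_join(tids[i], NULL);

    pthread_barrier_destroy(&start_barrier);
    pthread_barrier_destroy(&stop_barrier);
    return grand_total;
}
```

Calling `run_cycles(10)` from `main()` (built with `-pthread`) reproduces the ten iterations of the question. Because the main thread only touches `phase`, `done` and `partial` while every worker is blocked on `start_barrier`, none of these variables needs a mutex.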