Search code examples
cmultithreadingsegmentation-faultpthreadspython-c-api

Weird error of Python C Extension using Pthreads that RARELY happens


I am writing a Python C Extension for Graph-related computations.

One routine computes the edge list of a certain class of Graphs and since this is "embarassingly" parallelizable, I rewrote it to use a Threadpool.

At first I used OpenMP which was fine, however, I couldn't continue using it since its Threadpool didn't persist between Python calls of the routine.

So, I started to roll my own using the pthreads library.

Now, it almost always works, except:

  • almost all the jobs are almost always executed by a single thread, however, not always, I have encountered a fair distribution a couple of times, so I am not sure if this is an actual problem or if I have to fiddle with the scheduler to get more consistent distribution of workload

  • from time to time I get a segfault. Now, this doesn't happen in the multi-threaded(MT) code, this happens in the single-threaded(ST) code after the MT code. The MT code basically computes offsets into an array which the ST code uses for further processing. And like in 1% of cases, the memory that holds the offsets is not fully initialized, therefore the ST code goes out-of-bounds. I have no idea what causes this rare error. I have a unit test which compares the result to a Python computed result and it passes like 99% of the time, so it can't be that uninitialized memory is more often correct than wrong. Sometimes, the ST code doesn't go even out of bounds, but then creates a huge numpy array so that the interpreter takes quite a long time executing the function call

Well, I used some printfs to see that the offsets memory simply doesn't get fully initialized in the cases where the segfault occurs. Some offsets are then some garbage values in the range of a million or so, hence the out of bound memory access. In the 99% cases the memory is initialized exactly as expected.

The MT code also doesn't execute any Python code, however, it does read from the memory of a numpy array. But as far as I can tell from Python C Extension documentation that shouldn't cause any problems.

I am not quite sure what else to do, the code looks correct, it also works almost always.

I suspect there has to be something wrong with my Threadpool implementation since I didn't have this problem single-threaded or with OpenMP.

The scheduling behaviour also seems strange to me, I would have expected the workload to either be fairly equally distributed all the time OR it would simply be executed by one thread all the time and I messed up a mutex lock somewhere, but again, it changes from time time and it doesn't seem to be related with the error, I have seen the error occur in cases of fair distribution and single distribution.

I am not sure if code would help much at this point, I will provide some if requested.

Thank you.

UPDATE: Here is the code that deals with Job execution and synchronization between worker threads and main thread (I left out some struct declarations since I think they would only clutter up the code)

typedef struct{
    volatile uint capacity;
    volatile void** volatile data;

    volatile uint cursor;
    pthread_mutex_t mutex;
    pthread_cond_t empty_condition;
} Queue;

typedef struct{
    Queue* job_queue;
    void (*job_process)(void*, int ID);

    pthread_mutex_t* volatile barrier_mutex;

    pthread_mutex_t progress_mutex;
    pthread_cond_t progress_condition;
    volatile uint work_in_progress;

    volatile bool terminate;
} SynchronizationHandle;

/* after locking the queue, thread either:
    - executes job from queue asynchronously
    -- this also involves updating and signaling to main thread when job execution is in progress or finished
    - or signals to main thread that queue is empty*/

static bool WorkerThread_do_job(SynchronizationHandle* sync_handle, int ID){  
    Queue* job_queue = sync_handle->job_queue;
    pthread_mutex_lock(&(job_queue->mutex));

    if(job_queue->cursor > 0){
        // grab data from queue and give it free for other threads
        void* job_data = (void*)job_queue->data[job_queue->cursor-1];
        job_queue->cursor--;
        pthread_mutex_unlock(&(job_queue->mutex));

        pthread_mutex_lock(&(sync_handle->progress_mutex));
        sync_handle->work_in_progress++;
        pthread_mutex_unlock(&(sync_handle->progress_mutex));

        sync_handle->job_process(job_data, ID);

        pthread_mutex_lock(&(sync_handle->progress_mutex));
        sync_handle->work_in_progress--;
        if(sync_handle->work_in_progress == 0)
            pthread_cond_signal(&(sync_handle->progress_condition));
        pthread_mutex_unlock(&(sync_handle->progress_mutex));

        return true;
    }
    else{
        pthread_cond_signal(&(job_queue->empty_condition));
        pthread_mutex_unlock(&(job_queue->mutex));
    
        return false;
    }
}

// function for worker threads
// the 2 nested loops correspond to a waiting phase and a work phase
static void* worker_thread(void* arguments){
    WorkerThread* worker_thread = (WorkerThread*) arguments;
    SynchronizationHandle* sync_handle = worker_thread->sync_handle;

    // the outer loop deals with waiting on the main thread to start work or terminate
    while(true){
        // adress is saved to local variable so that main thread can change adress to other mutex without race condition
        pthread_mutex_t* const barrier_mutex = (pthread_mutex_t* const)sync_handle->barrier_mutex;
        // try to lock mutex and go to sleep and wait for main thread to unlock it
        pthread_mutex_lock(barrier_mutex);
        // unlock it for other threads
        pthread_mutex_unlock(barrier_mutex);

        // the inner loop executes jobs from the work queue and checks inbetween if it should terminate
        while(true){
            if(sync_handle->terminate)
                pthread_exit(0);
            if(!WorkerThread_do_job(sync_handle, worker_thread->ID))
                break;
        }
    }
}

typedef struct{
    pthread_t* thread_handles;
    WorkerThread* worker_threads;
    uint worker_threads_count;

    SynchronizationHandle* sync_handle;
    pthread_mutex_t barrier_mutexes[2];
    uint barrier_mutex_cursor;
} ThreadPool;

static void ThreadPool_wakeup_workers(ThreadPool* pool){
    // ASSUMPTION: thread executing already owns pool->barrier_mutexes + pool->barrier_mutex_cursor

    // compute which mutex to switch to next
    uint offset = (pool->barrier_mutex_cursor + 1)%2;

    // lock next mutex before wake up so that worker threads can't get hold of it before main thread
    pthread_mutex_lock(pool->barrier_mutexes + offset);

    // change adress to the next mutex before unlocking previous mutex, otherwise race condition
    pool->sync_handle->barrier_mutex = pool->barrier_mutexes + offset;

    // unlocking the previous mutex "wakes up" the worker threads since they are trying to lock it
    // hence why the assumption needs to hold
    pthread_mutex_unlock(pool->barrier_mutexes + pool->barrier_mutex_cursor);
    
    pool->barrier_mutex_cursor = offset;
}

static void ThreadPool_participate(ThreadPool* pool){
    while(WorkerThread_do_job(pool->sync_handle, 0));
}

static void ThreadPool_waiton_workers(ThreadPool* pool){
    // wait until queue is empty
    pthread_mutex_lock(&(pool->sync_handle->job_queue->mutex));
    while(pool->sync_handle->job_queue->cursor > 0)
        pthread_cond_wait(&(pool->sync_handle->job_queue->empty_condition), &(pool->sync_handle->job_queue->mutex));
    pthread_mutex_unlock(&(pool->sync_handle->job_queue->mutex));

    // wait until all work in progress is finished
    pthread_mutex_lock(&(pool->sync_handle->progress_mutex));
    while(pool->sync_handle->work_in_progress > 0)
        pthread_cond_wait(&(pool->sync_handle->progress_condition), &(pool->sync_handle->progress_mutex));
    pthread_mutex_unlock(&(pool->sync_handle->progress_mutex));
}

Solution

  • The code presented has good bones, but it nevertheless has numerous synchronization issues.

    It contains some probable data races:

    • In worker_thread():

      • pointer sync_handle->barrier_mutex is accessed (read) without mutex protection or other effective synchronization. It looks like this pointer will be written by ThreadPool_wakeup_workers(), so a data race occurs if ThreadPool_wakeup_workers() is called while any worker threads are alive.

      • sync_handle->terminate is accessed (read) without mutex protection or other effective synchronization. I don't see any code that writes to this object, but I presume that such writes do take place somewhere, while worker threads are alive, which creates a data race. Maybe you just want to make this one _Atomic.

      I assume that *worker_thread is not accessed by other threads (via a different pointer) while worker_thread() is running, else there would be more.

    Also some other possible data races:

    • WorkerThread_do_job() reads pointer sync_handle->job_queue without mutex protection or any other effective synchronization. If the pointer is also written while any of the worker threads are alive, then that creates a data race.

    • WorkerThread_do_job() reads pointer sync_handle->job_process without mutex protection or any other effective synchronization. If the pointer is also written while any of the worker threads are alive, then that creates a data race.

    • ThreadPool_wakeup_workers() accesses pool->barrier_mutex_cursor and pool->barrier_mutexes, apparently without a consistent choice of mutex to protect that access. If any other thread writes to these members during the lifetime of the thread pool management thread then that may create a data race.

    • ThreadPool_waiton_workers() reads pointers pool->sync_handle and pool->sync_handle->job_queue without mutex protection or any other effective synchronization. If either of these is also written by another thread while the pool management thread is alive, then that creates a data race.

    Additionally,

    • Your double-mutex barrier is potentially racy (but does not in this way have a data race per se) in that it assumes that all the worker threads are in fact blocked trying to acquire the current barrier mutex. It's not clear who calls this function or when, but if it doesn't do anything to ensure that the workers have in fact all reached the barrier mutex first, then the workers can fall out of step with each other.

      I don't particularly like the double-mutex approach in general. I think a better and more reliable barrier could be constructed from one mutex, a condition variable, and a couple of int variables. Basically, you want the threads involved to be able to determine whether to proceed based on data, rather than based on acquiring a mutex. Or at least you want the pool manager to be able to make such determination.

    • WorkerThread_do_job() is racy with respect to ThreadPool_waiton_workers() in that the decrement of job_queue->cursor and increment of sync_handle->work_in_progress are not performed as an atomic unit. If a thread in ThreadPool_waiton_workers() runs between those events, then it can see the queue cursor and progress counter both at zero, and therefore think that all the workers are done, when in fact, one or more workers are instead about to increment the progress counter.

    • In worker_thread(), a pointer of type pthread_mutex_t * const is passed to pthread_mutex_lock() and pthread_mutex_unlock(), which expect instead a parameter of type pthread_mutex_t *. Your compiler should be warning about that. This is semantically incorrect because one must suppose that these two functions modify the mutex to which the argument points, so a pointer to const data is not a suitable argument.

    • As I wrote in comments, volatile has no role to play in multithreaded programming.


    None of that directly addresses the specific issues you raised, that one thread usually takes all the jobs, and that occasionally the single-threaded code running after completion of the multi-threaded workload fails with a segfault.

    With respect to the former, I speculate that the job_process() function does very little work per call, in which case one thread might be able to cycle the inner loop in worker_thread() many times while mostly blocking other threads by repeatedly acquiring (and releasing) the various mutexes needed. Pthreads mutexes do not ensure fair scheduling by default, so if a thread is quick enough, it can release a mutex and then be the next thread to acquire it, even though other threads are also contending for it. This is in fact reasonably likely when the time between release and attempted reacquisition is very short, and such mutex-hogging is a known behavior in such cases.

    If your job_process() is indeed so fine grained, however, then that's a deeper problem. Fair scheduling for the mutex would probably subject you to high overhead from all the mutex operations. One possible mitigation in that case would be to make job_process() do more work per call.

    With respect to the segfaults, I speculate that the memory accesses by job_process() are not (reliably) synchronized relative to those of the thread that continues past the parallel section, so that there is a data race there, too.