
OpenMP lock reacquired by the same thread while another thread is waiting for it


I'm using an inverted lock as a semaphore to signal a queue update (note the commented-out Sleep(1); it will be used later):

#include <stdio.h>
#include <omp.h>
#include <queue>
#include <stdint.h>
#include <windows.h>

class ThreadLock
{
protected:
  omp_lock_t lock;

public:
  ThreadLock() {
    omp_init_lock(&lock);
  }
  ~ThreadLock() {
    omp_destroy_lock(&lock);
  }

  void acquire() {
    omp_set_lock(&lock);
  }

  void release() {
    omp_unset_lock(&lock);
  }
};

std::queue< uint32_t > g_queue;
ThreadLock g_lock;

void producer()
{
  uint32_t seq = 0;
  g_lock.acquire();
  while (true) {
    Sleep(200);
    #pragma omp critical
      g_queue.push(++seq);
    printf("Produced %u\n", seq);

    g_lock.release();
    //Sleep(1);
    g_lock.acquire();
  }
  g_lock.release();
}

void consumer()
{
  while (true) {
    // Lock if empty
    if (g_queue.empty()) {
      printf("[Consumer] Acquiring lock\n");
      g_lock.acquire();
      g_lock.release();
      printf("[Consumer] Released lock\n");
      if (g_queue.empty()) {
        printf("Still empty\n");
        Sleep(100);
        continue;
      }
    }

    #pragma omp critical
    {
      printf("Consumed %u\n", g_queue.front());
      g_queue.pop();
    }
  }
}

int main(int argc, char* argv[])
{
  #pragma omp parallel sections
  {
    #pragma omp section
      consumer();
    #pragma omp section
      producer();
  }

  return 0;
}

This code contains a race condition, which stalls the consumer after a while like this:

[Consumer] Acquiring lock
Produced 1
Produced 2
[Consumer] Released lock
Consumed 1
Consumed 2
[Consumer] Acquiring lock
Produced 3
Produced 4
Produced 5
Produced 6
Produced 7
Produced 8
Produced 9
Produced 10
Produced 11
Produced 12
Produced 13
Produced 14
Produced 15
Produced 16
Produced 17
Produced 18
Produced 19

It appears the producer thread is rushing through release/acquire without a context switch. Fine, let's force one by uncommenting Sleep(1):

[Consumer] Acquiring lock
Produced 1
[Consumer] Released lock
Consumed 1
[Consumer] Acquiring lock
[Consumer] Released lock
Still empty
[Consumer] Acquiring lock
Produced 2
[Consumer] Released lock
Consumed 2
[Consumer] Acquiring lock
[Consumer] Released lock
Still empty
[Consumer] Acquiring lock
Produced 3
[Consumer] Released lock
Consumed 3

Notice those Still empty lines? It looks like the consumer manages to squeeze an extra processing round in between the producer's release and acquire calls.

I know that adding another Sleep(1), in the consumer thread, solves the problem. But these fixed, artificial delays feel wrong (Sleep(200) doesn't count; it serves a demonstration purpose only).

How can this be done the right way with OpenMP, without requiring OpenMP versions higher than 2.0?


Solution

  • There are a couple of issues in your code. You are mixing #pragma omp critical with the lock, which doesn't make much sense. What you actually want is a combination of a lock, to protect all operations on the queue, and a condition variable, to get notified when elements are inserted. Unfortunately, OpenMP does not provide condition-variable primitives. You could also use a counting semaphore on the number of elements in the queue, but that is not available in OpenMP either.

    Then there is the issue of starvation, which you try to tackle with Sleep: whatever hint you give the OS to switch tasks, it will never be reliable. You might consider OpenMP tasks with taskyield, but that is not available in OpenMP 2.0 (tasking was introduced in OpenMP 3.0).

    At the end of the day, OpenMP is not well suited for this kind of work. OpenMP is focused on a one-thread-per-core mapping and on distributing parallel loops. You can combine OpenMP threads with C++11 std::mutex / std::condition_variable: while it will probably work in practice, it is not officially supported by either standard.

    Note: When you protect operations on the queue, you must protect all calls, including g_queue.empty().
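    To make the lock + condition variable suggestion concrete, here is a minimal sketch using C++11 std::mutex and std::condition_variable. It uses std::thread in place of the OpenMP sections purely so the example is self-contained (the same pattern applies inside #pragma omp section, with the caveat about standards support above). The bounded loop counts and the consumed_values vector are illustration-only additions, not part of the original code:

    ```cpp
    #include <condition_variable>
    #include <cstdint>
    #include <cstdio>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    std::queue<uint32_t> g_queue;
    std::mutex g_mutex;                    // protects every access to g_queue, including empty()
    std::condition_variable g_cv;          // signals the consumer that an element was pushed
    std::vector<uint32_t> consumed_values; // illustration only: records what the consumer saw

    void producer() {
      for (uint32_t seq = 1; seq <= 5; ++seq) {
        {
          std::lock_guard<std::mutex> guard(g_mutex);
          g_queue.push(seq);
        }                  // unlock before notifying so the consumer can proceed immediately
        g_cv.notify_one(); // wakes the consumer; no Sleep() or busy-waiting required
        std::printf("Produced %u\n", seq);
      }
    }

    void consumer() {
      for (int consumed = 0; consumed < 5; ++consumed) {
        std::unique_lock<std::mutex> guard(g_mutex);
        // Blocks until notified AND the predicate holds; immune to spurious wakeups
        // and to the "still empty" race in the question.
        g_cv.wait(guard, [] { return !g_queue.empty(); });
        std::printf("Consumed %u\n", g_queue.front());
        consumed_values.push_back(g_queue.front());
        g_queue.pop();
      }
    }

    int main() {
      std::thread c(consumer);
      std::thread p(producer);
      p.join();
      c.join();
      return 0;
    }
    ```

    In real code both loops would run until a shutdown flag is set rather than for a fixed count; the fixed counts here only keep the sketch terminating.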