Search code examples
c++multithreadingmutexace

How do ACE mutexes work and why does the ACE thread mutex work differently?


This question is about the use of ACE mutexes. Below is a rather lengthy minimal example, which works as follows:

The code starts several threads and stops them after 10 seconds. Each of the threads tries to acquire a mutex and keeps it until its termination. There are two kinds of threads. The first kind uses a global variable while the second kind uses a local variable. All mutexes have the same name. Since the threads share a single process, I expect that one acquires the mutex while the others must wait until this thread terminates.

Observation: While the global mutex always works as expected, the ACE_Thread_Mutex and ACE_Recursive_Thread_Mutex fail if used as a local variable. The local objects seem to be distinct mutexes despite the shared name. Why is that? Is there a thread mutex which works similar to a process mutex. I should mention that I use ACE on Windows.

My second question is whether a mutex should be an object or a pointer to an object. Is there any difference in behavior? Of course the pointer can be shared, while the object is easier to handle. Hence I prefer objects, but then thread mutexes do not seem to work like process mutexes.

Thank you very much for some insight.

PS: I am aware that I can always emulate a thread mutex using a process mutex with prepended process ID or something similar. This is not what my questions aim at.

#include <iostream>
#include <sstream>

#include <ace/Process_Mutex.h>
#include <ace/Task.h>
#include <ace/Semaphore.h>

#define LOG(msg) std::cout << msg << std::endl;

#define NAME_MUTEX_THREAD "MY_UNIQUE_MUTEX"

// switch between different mutex types
// #define MUTEX_DECL(var, name) ACE_Thread_Mutex var(name);
// #define MUTEX_DECL(var, name) ACE_Recursive_Thread_Mutex var(name);
#define MUTEX_DECL(var, name) ACE_Process_Mutex var(name);
// #define MUTEX_DECL(var, name) ACE_Semaphore var(1, USYNC_THREAD, name);
// #define MUTEX_DECL(var, name) ACE_Semaphore var(1, USYNC_PROCESS, name);

ACE_Thread_Manager * g_pThreadManager;

MUTEX_DECL(g_oMutex, NAME_MUTEX_THREAD);

using namespace std;

std::string getThreadLogPrefix(void)
{
  ACE_thread_t thisThread = ACE_OS::thr_self();
  const int iProcessID = ACE_OS::getpid();
  const int iThreadID = thisThread;

  ostringstream convert;
  convert <<"P"<<iProcessID<<"T"<<iThreadID;

  return convert.str();
}


ACE_THR_FUNC_RETURN threadLocalMutexRace(void * par)
{
  const ACE_thread_t thisThread = ACE_OS::thr_self();
  const string strLog = getThreadLogPrefix() + "_threadLocalMutexRace: ";

  MUTEX_DECL(oMutex, NAME_MUTEX_THREAD);

  LOG(strLog<<"Start");

  LOG(strLog<<"Try to acquire mutex");
  int irc = oMutex.acquire();
  if (irc == -1) {
    LOG(strLog<<"Mutex not acquired (code "<<irc<<").");
    return 0;
  }

  LOG(strLog<<"Mutex acquired (code "<<irc<<").");

  while(ACE_Thread_Manager::instance()->testcancel(thisThread) == 0){
    ;
  }

  LOG(strLog<<"Stop");
  oMutex.release();
  LOG(strLog<<"Mutex released.");

  return 0;
}


ACE_THR_FUNC_RETURN threadMutexRace(void * par)
{
  const ACE_thread_t thisThread = ACE_OS::thr_self();
  const string strLog = getThreadLogPrefix() + "_threadMutexRace: ";

  LOG(strLog<<"Start");

  LOG(strLog<<"Try to acquire mutex");
  int irc = g_oMutex.acquire();
  if (irc == -1) {
    LOG(strLog<<"Mutex not acquired (code "<<irc<<").");
    return 0;
  }

  LOG(strLog<<"Mutex acquired (code "<<irc<<").");

  while(ACE_Thread_Manager::instance()->testcancel(thisThread) == 0){
    ;
  }

  LOG(strLog<<"Stop");
  g_oMutex.release();
  LOG(strLog<<"Mutex released.");

  return 0;
}


int main(int argc, char * argv[])
{
  ACE::init(); 
  g_pThreadManager = new ACE_Thread_Manager();

  const unsigned int uiNumThreadMutexRace = 3;


  if (g_pThreadManager)
  {
    LOG("*******************************************************************");
    LOG("Start threads...");

    for (unsigned int i = 0; i < uiNumThreadMutexRace; ++i)
    {
      g_pThreadManager->spawn((ACE_THR_FUNC)threadMutexRace, nullptr);
    }

    for (unsigned int i = 0; i < uiNumThreadMutexRace; ++i)
    {
      g_pThreadManager->spawn((ACE_THR_FUNC)threadLocalMutexRace, nullptr);
    }

    ACE_OS::sleep(10);

    LOG("Stop threads...");
    g_pThreadManager->cancel_all();
    g_pThreadManager->wait();
    LOG("All threads stopped.");
    LOG("*******************************************************************");
  }

  delete g_pThreadManager;
  g_pThreadManager = nullptr;

  return 0;
}

Solution

  • The name is not an unique identifier for the ACE thread mutexes. You have to make sure that both threads use the same ACE thread mutex instance, by using a local variable both have its own instance which both can lock independently. Normally in C++ you put the mutex as class member so that you can use it in the various operations of that class. I would recommend to read chapter 10 of the C++ Network Programming volume 1 book.