Tags: c++, boost-thread, active-objects

Threading-related active object design questions (C++, Boost)


I would like some feedback on the IService class listed below. As far as I know, this type of class is related to the "active object" pattern; please correct me if I use any of the terminology incorrectly. The idea is that a class used with this active object wrapper must provide a start and a stop method that control some event loop. The event loop could be implemented with a while loop, with boost::asio, etc.
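
To make the while-loop variant concrete, here is a minimal sketch of a type that satisfies the start/stop interface. It is only an illustration: the names LoopService and runLoopOnce are hypothetical, and it uses std::atomic and std::thread from C++11 instead of Boost so that it stands alone.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical service type: start() blocks in a polling loop until stop() is called.
class LoopService
{
public:
  LoopService() : m_stopRequested(false), m_iterations(0) {}

  // Blocking event loop; intended to run on a dedicated thread.
  void start()
  {
    while (!m_stopRequested)
    {
      ++m_iterations;   // placeholder for real event handling
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  // Asks the loop to exit; may be called from another thread, even before start().
  void stop() { m_stopRequested = true; }

  unsigned long iterations() const { return m_iterations; }

private:
  std::atomic<bool> m_stopRequested;
  std::atomic<unsigned long> m_iterations;
};

// Runs the loop on a second thread, waits for at least one pass, then stops it.
unsigned long runLoopOnce()
{
  LoopService service;
  std::thread worker([&service] { service.start(); });
  while (service.iterations() == 0)
    std::this_thread::yield();   // wait until the loop has made progress
  service.stop();
  worker.join();
  assert(service.iterations() >= 1);
  return service.iterations();
}
```

Because the flag records a stop request rather than a "running" state, a stop() that arrives before start() is not lost: start() then returns immediately.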

This class is responsible for starting a new thread without blocking the caller, so that events can be handled in the new thread. It must also handle all clean-up. I first tried an OO approach in which subclasses overrode methods to control the event loop, but the cleanup was messy: calling the stop method from the base class destructor resulted in a pure virtual function call whenever the owning code had not called stop manually. The templated solution seems to be a lot cleaner:

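#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

template <typename T>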
class IService : private boost::noncopyable
{
    typedef boost::shared_ptr<boost::thread> thread_ptr;
public:

  IService()
  {
  }

  ~IService()
  {
    /// try stop the service in case it's running
    stop();
  }

  void start()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);

    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      // already running
      return;
    }

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this)));

    // need to wait for thread to start: else if destructor is called before thread has started

    // Wait for condition to be signaled and then
    // try timed wait since the application could deadlock if the thread never starts?
    //if (m_startCondition.timed_wait(m_threadMutex, boost::posix_time::milliseconds(getServiceTimeoutMs())))
    //{
    //}
    m_startCondition.wait(m_threadMutex);

    // notify main to continue: it's blocked on the same condition var
    m_startCondition.notify_one();
  }

  void stop()
  {
    // trigger the stopping of the event loop
    m_serviceObject.stop();

    if (m_pServiceThread)
    {
      if (m_pServiceThread->joinable())
      {
        m_pServiceThread->join();
      }
      // the service is stopped so we can reset the thread
      m_pServiceThread.reset();
    }
  }

private:
  /// entry point of thread
  void main()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);
    // notify main thread that it can continue
    m_startCondition.notify_one();

    // Try Dummy wait to allow 1st thread to resume???
    m_startCondition.wait(m_threadMutex);

    // call template implementation of event loop
    m_serviceObject.start();
  }

  /// Service thread
  thread_ptr m_pServiceThread;
  /// Thread mutex
  mutable boost::mutex m_threadMutex;
  /// Condition for signaling start of thread
  boost::condition m_startCondition;

  /// T must satisfy the implicit service interface and provide a start and a stop method
  T m_serviceObject;
};

The class could be used as follows:

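#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

class TestObject3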
{
public:
  TestObject3()
      :m_work(m_ioService),
      m_timer(m_ioService, boost::posix_time::milliseconds(200))
  {
      m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
  }

  void start()
  {
      // simple event loop
      m_ioService.run();
  }

  void stop()
  {
      // signal end of event loop
      m_ioService.stop();
  }

  void doWork(const boost::system::error_code& e)
  {
      // Do some work here
      if (e != boost::asio::error::operation_aborted)
      {
        m_timer.expires_from_now(boost::posix_time::milliseconds(200));
        m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
      }
  }

private:
  boost::asio::io_service m_ioService;
  boost::asio::io_service::work m_work;
  boost::asio::deadline_timer m_timer;
};
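
As background to the pure virtual function call mentioned earlier: while a base class destructor runs, the derived part of the object has already been destroyed, so a virtual call made there resolves to the base class version. The sketch below illustrates this with hypothetical classes (ServiceBase, DerivedService) that are not part of the code above. The base stop() is deliberately given a body; if it were pure virtual, the call from the destructor would be undefined behaviour, which typically shows up as the "pure virtual function call" abort.

```cpp
#include <cassert>
#include <string>

// Hypothetical classes illustrating virtual dispatch during destruction.
struct ServiceBase
{
  explicit ServiceBase(std::string& log) : m_log(log) {}

  virtual ~ServiceBase()
  {
    // The derived part no longer exists here, so this calls ServiceBase::stop().
    stop();
  }

  virtual void stop() { m_log += "base-stop;"; }

protected:
  std::string& m_log;
};

struct DerivedService : ServiceBase
{
  explicit DerivedService(std::string& log) : ServiceBase(log) {}
  virtual void stop() { m_log += "derived-stop;"; }
};

// Records which stop() overrides run for an explicit call and during destruction.
std::string destroyAndLog()
{
  std::string log;
  {
    DerivedService service(log);
    service.stop();   // normal call: dispatches to DerivedService::stop()
  }                   // ~ServiceBase() runs here and calls ServiceBase::stop()
  assert(log == "derived-stop;base-stop;");
  return log;
}
```

Holding the implementation as a member of a wrapper, as the templated class does, sidesteps this because the member is still fully alive while the wrapper's destructor body runs.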

Now to my specific questions:

1) Is the use of the boost condition variable correct? It seems like a bit of a hack to me: I wanted to wait for the thread to be launched so I waited on the condition variable. Then once the new thread has launched in the main method, I again wait on the same condition variable to allow the initial thread to continue. Then once the start method of the initial thread is exited, the new thread can continue. Is this ok?

2) Are there any cases in which the thread would not get launched successfully by the OS? I remember reading somewhere that this can occur. If this is possible, should I rather do a timed wait on the condition variable (as in the commented-out code in the start method)?

3) I am aware that the class passed as the template argument might not implement the stop method "correctly", i.e. if the event loop fails to stop, the code will block on the join (either in stop or in the destructor), but I see no way around this. I guess it is up to the user of the class to make sure that the start and stop methods are implemented correctly?

4) I would appreciate hearing about any other design mistakes, possible improvements, etc.

Thanks!


Solution

  • I finally settled on the following:

    1) After much testing, the use of the condition variable seems fine.

    2) This issue hasn't cropped up (yet).

    3) The class used as the template argument must meet the requirements; unit tests are used to check for correctness.

    4) Improvements

    • Added a join method protected by a lock
    • Catching exceptions in the spawned thread and rethrowing them in the main thread, to avoid crashes and to not lose exception info
    • Using boost::system::error_code to communicate error codes back to the caller
    • The implementation object is settable
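
The exception hand-off listed among the improvements can be shown in isolation. This is a minimal sketch, not the class below: it uses std::exception_ptr, std::current_exception and std::rethrow_exception from C++11, which correspond to the boost::exception_ptr facilities, and the helper names runAndRethrow and demoRethrow are hypothetical.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <thread>

// Runs work() on a new thread, captures anything it throws,
// and rethrows it on the calling thread after the join.
template <typename Work>
void runAndRethrow(Work work)
{
  std::exception_ptr captured;
  std::thread worker([&captured, work]() mutable {
    try
    {
      work();
    }
    catch (...)
    {
      captured = std::current_exception();
    }
  });
  worker.join();   // join synchronises, so 'captured' is safe to read afterwards
  if (captured)
    std::rethrow_exception(captured);
}

// Throws inside the worker and reports the message seen by the caller.
std::string demoRethrow()
{
  std::string message = "no exception";
  try
  {
    runAndRethrow([] { throw std::runtime_error("service failed"); });
  }
  catch (const std::runtime_error& e)
  {
    message = e.what();
  }
  assert(message == "service failed");
  return message;
}
```

The exception type and message survive the trip, so the caller can handle the failure as if it had been thrown locally.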

    Code:

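    #include <cassert>
    #include <boost/bind.hpp>
    #include <boost/exception_ptr.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/noncopyable.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/system/error_code.hpp>
    #include <boost/thread.hpp>

    template <typename T>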
    class IService : private boost::noncopyable
    {
      typedef boost::shared_ptr<boost::thread> thread_ptr;
      typedef T ServiceImpl;
    public:
      typedef boost::shared_ptr<IService<T> > ptr;
    
      IService()
        :m_pServiceObject(&m_serviceObject)
      {
      }
    
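      ~IService()
      {
        /// try to stop the service in case it's running
        if (m_pServiceThread && m_pServiceThread->joinable())
        {
          try
          {
            stop();
          }
          catch (...)
          {
            // stop() may rethrow an exception from the service thread;
            // it must not escape from a destructor
          }
        }
      }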
    
      static ptr create()
      {
        return boost::make_shared<IService<T> >();
      }
    
      /// Accessor to service implementation. The handle can be used to configure the implementation object
      ServiceImpl& get() { return m_serviceObject; }
      /// Mutator for the service implementation: replaces the internal implementation object with a copy of rServiceImpl
      void set(ServiceImpl rServiceImpl)
      {
        // the implementation object cannot be modified once the thread has been created
        assert(m_pServiceThread == 0);
        m_serviceObject = rServiceImpl;
        m_pServiceObject = &m_serviceObject;
      }
    
      void set(ServiceImpl* pServiceImpl)
      {
        // the implementation object cannot be modified once the thread has been created
        assert(m_pServiceThread == 0);
    
        // make sure service object is valid
        if (pServiceImpl)
          m_pServiceObject = pServiceImpl; 
      }
    
      /// if the service implementation reports an error from the start or stop method call, it can be accessed via this method
      /// NB: only the last error can be accessed
      boost::system::error_code getServiceErrorCode() const { return m_ecService; }
    
      /// The join method allows the caller to block until thread completion
      void join()
      {
        // protect this method from being called twice (e.g. by user and by stop)
        boost::mutex::scoped_lock lock(m_joinMutex);
        if (m_pServiceThread && m_pServiceThread->joinable())
        {
          m_pServiceThread->join();
          m_pServiceThread.reset();
        }
      }
    
      /// This method launches the non-blocking service
      boost::system::error_code start()
      {
        boost::mutex::scoped_lock lock(m_threadMutex);
    
        if (m_pServiceThread && m_pServiceThread->joinable())
        {
          // already running
          return boost::system::error_code(SHARED_INVALID_STATE, shared_category);
        }
    
        m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this)));
        // Wait for condition to be signaled
        m_startCondition.wait(m_threadMutex);
    
        // notify main to continue: it's blocked on the same condition var
        m_startCondition.notify_one();
        // No error
        return boost::system::error_code();
      }
    
      /// This method stops the non-blocking service
      boost::system::error_code stop()
      {
        // trigger the stopping of the event loop
        //boost::system::error_code ec = m_serviceObject.stop();
        assert(m_pServiceObject);
        boost::system::error_code ec = m_pServiceObject->stop();
        if (ec)
        {
          m_ecService = ec;
          return ec;
        }
    
        // The service implementation can return an error code here for more information
        // However it is the responsibility of the implementation to stop the service event loop (if running)
        // Failure to do so, will result in a block
        // If this occurs in practice, we may consider a timed join?
        join();
    
        // If exception was thrown in new thread, rethrow it.
        // Should the template implementation class want to avoid this, it should catch the exception
        // in its start method and then return an error code instead
        if( m_exception )
          boost::rethrow_exception(m_exception);
    
        return ec;
      }
    
    private:
      /// runs in its own thread
      void main()
      {
        try
        {
          boost::mutex::scoped_lock lock(m_threadMutex);
          // notify main thread that it can continue
          m_startCondition.notify_one();
          // Try Dummy wait to allow 1st thread to resume
          m_startCondition.wait(m_threadMutex);
    
          // call implementation of event loop
          // This will block
          // In scenarios where the service fails to start, the implementation can return an error code
          m_ecService = m_pServiceObject->start();
    
          m_exception = boost::exception_ptr();
        } 
        catch (...)
        {
          m_exception = boost::current_exception();
        }
      }
    
      /// Service thread
      thread_ptr m_pServiceThread;
      /// Thread mutex
      mutable boost::mutex m_threadMutex;
      /// Join mutex
      mutable boost::mutex m_joinMutex;
      /// Condition for signaling start of thread
      boost::condition m_startCondition;
    
      /// T must satisfy the implicit service interface and provide a start and a stop method
      T m_serviceObject;
      T* m_pServiceObject;
      // Error code for service implementation errors
      boost::system::error_code m_ecService;
    
      // Exception ptr to transport exception across different threads
      boost::exception_ptr m_exception;
    };
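
One caveat on the start handshake: condition variables are allowed to wake up spuriously, and a wait without a predicate cannot tell such a wakeup from a real notification. If the wait in start() returned early, the notify that follows could be sent before main() is waiting, and main() could then block forever. A common alternative is to pair the wait with a flag. The sketch below assumes the C++11 equivalents (std::mutex, std::condition_variable, std::thread) and a hypothetical class named StartGate; it is not a drop-in replacement for the class above.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical launcher: start() returns only after the worker thread has begun running.
class StartGate
{
public:
  StartGate() : m_started(false), m_ran(false) {}
  ~StartGate() { join(); }

  void start()
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    if (m_thread.joinable())
      return;   // already running
    m_started = false;
    m_thread = std::thread(&StartGate::run, this);
    // The predicate makes the wait robust against spurious wakeups.
    m_cond.wait(lock, [this] { return m_started; });
  }

  void join()
  {
    if (m_thread.joinable())
      m_thread.join();
  }

  bool ran() const { return m_ran; }

private:
  void run()
  {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_started = true;   // published under the mutex that start() waits on
    }
    m_cond.notify_one();
    m_ran = true;         // placeholder for the blocking event loop
  }

  std::mutex m_mutex;
  std::condition_variable m_cond;
  bool m_started;
  std::atomic<bool> m_ran;
  std::thread m_thread;
};

// Starts the worker, waits for it to finish, and reports whether it ran.
bool demoStartGate()
{
  StartGate gate;
  gate.start();   // returns once run() has set m_started
  gate.join();
  assert(gate.ran());
  return gate.ran();
}
```

With the flag in place a single notification is enough, so the second "dummy" wait in the worker is no longer needed in this sketch.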
    

    Further feedback/criticism would of course be welcome.
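
Regarding the "timed join" idea mentioned in the comments of stop(): Boost.Thread provides timed_join (and try_join_for in later versions) on boost::thread. The same effect can be sketched with the C++11 standard library by letting the worker signal completion through a std::promise and waiting on the matching future with a timeout. The helper names finishedWithin and demoTimedJoin are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Returns true if 'done' becomes ready within 'timeout'.
bool finishedWithin(std::future<void>& done, std::chrono::milliseconds timeout)
{
  return done.wait_for(timeout) == std::future_status::ready;
}

// Starts a short-lived worker and waits for it with a generous timeout.
bool demoTimedJoin()
{
  std::promise<void> finished;
  std::future<void> done = finished.get_future();
  std::thread worker([&finished] {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));   // placeholder for the event loop
    finished.set_value();                                         // signal completion
  });
  bool inTime = finishedWithin(done, std::chrono::seconds(5));
  // The thread must still be joined; if the wait timed out, this call would block.
  worker.join();
  assert(inTime);
  return inTime;
}
```

A timeout only lets the caller detect and report a service that fails to stop; the thread itself still has to finish before it can be joined safely.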