Search code examples
c++multithreadingtimerpthreads

C++ callback timer implementation


I have found the following implementation for a callback timer to use in my c++ application. However, this implementation requires me to "join" the thread from the start caller, which effectively blocks the caller of the start function.

What I really like to do is the following.

  1. someone can call foo(data) multiple times and store them in a db.
  2. whenever foo(data) is called, it initiates a timer for few seconds.
  3. while the timer is counting down, foo(data) can be called several times and multiple items can be stored, but doesn't call erase until timer finishes
  4. whenever the timer is up, the "remove" function is called once to remove all the records from the db.

Bascially I want to be able to do a task, and wait a few seconds and batch do a single batch task B after a few seconds.

class CallBackTimer {

public:

    /**
     * Constructor of the CallBackTimer
     */
    CallBackTimer() :_execute(false) { }

    /**
     * Destructor
     */
    ~CallBackTimer() {
        if (_execute.load(std::memory_order_acquire)) {
            stop();
        };
    }

    /**
     * Stops the timer
     */
    void stop() {
        _execute.store(false, std::memory_order_release);
        if (_thd.joinable()) {
            _thd.join();
        }
    }

    /**
     * Start the timer function
     * @param interval Repeating duration in milliseconds, 0 indicates the @func will run only once
     * @param delay Time in milliseconds to wait before the first callback
     * @param func Callback function
     */
    void start(int interval, int delay, std::function<void(void)> func) {
        if(_execute.load(std::memory_order_acquire)) {
            stop();
        };
        _execute.store(true, std::memory_order_release);


        _thd = std::thread([this, interval, delay, func]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(delay));
            if (interval == 0) {
                func();
                stop();
            } else {
                while (_execute.load(std::memory_order_acquire)) {
                    func();
                    std::this_thread::sleep_for(std::chrono::milliseconds(interval));
                }
            }
        });

    }

    /**
     * Check if the timer is currently running
     * @return bool, true if timer is running, false otherwise.
     */
    bool is_running() const noexcept {
        return ( _execute.load(std::memory_order_acquire) && _thd.joinable() );
    }


private:
    std::atomic<bool> _execute;
    std::thread _thd;

};

I have tried modifying the above code using the thread.detach(). However, I am running issues in detached thread not being able to write (erase) from the database..

Any help and suggestions are appreciated!


Solution

  • Rather than using threads you could use std::async. The following class will process the queued strings in order 4 seconds after the last string is added. Only 1 async task will be launched at a time and std::aysnc takes care of all the threading for you.

    If there are unprocessed items in the queue when the class is destructed then the async task stops without waiting and these items aren't processed (but this would be easy to change if its not your desired behaviour).

    #include <iostream>
    #include <string>
    #include <future>
    #include <mutex>
    #include <chrono>
    #include <queue>
    
    class Batcher
    {
    public:
      Batcher()
        : taskDelay( 4 ),
          startTime( std::chrono::steady_clock::now() ) // only used for debugging
      {
      }
    
      void queue( const std::string& value )
      {
        std::unique_lock< std::mutex > lock( mutex );
        std::cout << "queuing '" << value << " at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms\n";
        work.push( value );
        // increase the time to process the queue to "now + 4 seconds"
        timeout = std::chrono::steady_clock::now() + taskDelay;
        if ( !running )
        {
          // launch a new asynchronous task which will process the queue
          task = std::async( std::launch::async, [this]{ processWork(); } );
          running = true;
        }
      }
    
      ~Batcher()
      {
        std::unique_lock< std::mutex > lock( mutex );
        // stop processing the queue
        closing = true;
        bool wasRunning = running;
        condition.notify_all();
        lock.unlock();
        if ( wasRunning )
        {
          // wait for the async task to complete
          task.wait();
        }
      }
    
    private:
      std::mutex mutex;
      std::condition_variable condition;
      std::chrono::seconds taskDelay;
      std::chrono::steady_clock::time_point timeout;
      std::queue< std::string > work;
      std::future< void > task;
      bool closing = false;
      bool running = false;
      std::chrono::steady_clock::time_point startTime;
    
      void processWork()
      {
        std::unique_lock< std::mutex > lock( mutex );
        // loop until std::chrono::steady_clock::now() > timeout
        auto wait = timeout - std::chrono::steady_clock::now();
        while ( !closing && wait > std::chrono::seconds( 0 ) )
        {
          condition.wait_for( lock, wait );
          wait = timeout - std::chrono::steady_clock::now();
        }
        if ( !closing )
        {
          std::cout << "processing queue at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms\n";
          while ( !work.empty() )
          {
            std::cout << work.front() << "\n";
            work.pop();
          }
          std::cout << std::flush;
        }
        else
        {
          std::cout << "aborting queue processing at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms with " << work.size() << " remaining items\n";
        }
        running = false;
      }
    };
    
    int main()
    {
      Batcher batcher;
      batcher.queue( "test 1" );
      std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
      batcher.queue( "test 2" );
      std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
      batcher.queue( "test 3" );
      std::this_thread::sleep_for( std::chrono::seconds( 2 ) );
      batcher.queue( "test 4" );
      std::this_thread::sleep_for( std::chrono::seconds( 5 ) );
      batcher.queue( "test 5" );
    }