I have a simple class that handles execution of a callback function on another thread, with a delay, using boost::asio. The delay can be reset, so the class can also work like a watchdog.
The class works, however every so often (~10% of the time), it crashes with a SIGSEGV on class destruction. I can't seem to figure out why. My first though was that maybe this
is no longer valid (as noticed by the this=0x0
in the error trace) but since we're waiting on the thread to join before finishing destroying the class, it should be valid for the whole duration. Or am I mission something?
Thanks for all the help! Below is the class .cpp code & GDB stack trace. Variable types are as follows.
boost::asio::chrono::milliseconds delay_ms_;
boost::asio::io_context io_;
boost::asio::steady_timer timer_;
boost::thread thread_;
std::mutex mutable mutex_;
bool is_running = false;
AsyncExecutor::AsyncExecutor(std::chrono::milliseconds const &delay_ms, Callback callback) :
delay_ms_(delay_ms),
callback_(callback),
io_(),
timer_(io_, delay_ms_)
{
}
AsyncExecutor::~AsyncExecutor() {
// Make sure the timer stops when the object is destroyed & callback is not called.
stop();
}
void AsyncExecutor::start() {
std::lock_guard<std::mutex> lock(mutex_);
if (is_running)
return;
is_running = true;
if (io_.stopped()) {
io_.reset(); // Reset io_service if it has stopped
}
timer_.expires_after(delay_ms_);
timer_.async_wait(boost::bind(&AsyncExecutor::timer_ran, this, boost::asio::placeholders::error));
// Start the io context in a separate thread.
// Once the timer fires, the thread will exit.
thread_ = boost::thread(boost::bind(&boost::asio::io_service::run, &io_));
}
void AsyncExecutor::stop() {
{
std::lock_guard<std::mutex> lock(mutex_);
if (!is_running)
return;
// Cancel the timer.
boost::asio::post(io_, [this]() {
this->timer_.cancel();
});
}
// Finaly wait for the thread to finish.
if (this->thread_.joinable())
this->thread_.join();
std::lock_guard<std::mutex> lock(mutex_);
is_running = false;
}
void AsyncExecutor::reset() {
std::lock_guard<std::mutex> lock(mutex_);
if (!is_running)
return;
// Reset the timer.
boost::asio::post(io_, [this]() {
this->timer_.expires_after(this->delay_ms_);
this->timer_.async_wait(boost::bind(&AsyncExecutor::timer_ran, this, boost::asio::placeholders::error));
});
}
void AsyncExecutor::timer_ran(const boost::system::error_code &ec) {
{
std::lock_guard<std::mutex> lock(mutex_);
is_running = false;
}
// If the timer was cancelled, we don't want to execute the callback or detach the thread.
// Reset also calls this function with ec, so we want to keep the thread attached.
// This complicates things & means that join() must be called in stop() to wait for the thread to finish.
// But if the thread fires the callback, it will detach itself.
if (ec) {
return;
}
if (this->callback_)
this->callback_();
io_.stop();
// Detach the thread, as it is no longer needed. It will be cleaned up by the OS.
this->thread_.detach();
}
bool AsyncExecutor::is_timer_running() const {
std::lock_guard<std::mutex> lock(mutex_);
return is_running;
}
[gdb-7] [New Thread 0x7fff1a7fc000 (LWP 385144)]
[gdb-7] [Thread 0x7fff1a7fc000 (LWP 385075) exited]
[gdb-7]
[gdb-7] Thread 171 "scout_control_u" received signal SIGSEGV, Segmentation fault.
[gdb-7] [Switching to Thread 0x7fff2253c000 (LWP 385067)]
[gdb-7] boost::asio::detail::epoll_reactor::run (this=0x0, usec=<optimized out>, ops=...) at /usr/include/boost/asio/detail/impl/epoll_reactor.ipp:462
[gdb-7] 462 if (timer_fd_ == -1)
[gdb-7] #0 boost::asio::detail::epoll_reactor::run(long, boost::asio::detail::op_queue<boost::asio::detail::scheduler_operation>&)
[gdb-7] (this=0x0, usec=<optimized out>, ops=...)
[gdb-7] at /usr/include/boost/asio/detail/impl/epoll_reactor.ipp:462
[gdb-7] #1 0x00007ffff6f0aef3 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&)
[gdb-7] (this=this@entry=0x7fff1412e5f0, lock=..., this_thread=..., ec=...)
[gdb-7] at /usr/include/boost/asio/detail/impl/scheduler.ipp:465
[gdb-7] #2 0x00007ffff6f0bb01 in boost::asio::detail::scheduler::run(boost::system::error_code&) (this=0x7fff1412e5f0, ec=...)
[gdb-7] at /usr/include/boost/asio/detail/impl/scheduler.ipp:204
[gdb-7] #3 0x00007ffff6f0d552 in boost::asio::io_context::run() (this=<optimized out>)
[gdb-7] at /usr/include/boost/asio/impl/io_context.ipp:63
[gdb-7] #4 0x00007ffff1f570cb in ()
[gdb-7] at /lib/x86_64-linux-gnu/libboost_thread.so.1.74.0
[gdb-7] #5 0x00007ffff7094ac3 in start_thread (arg=<optimized out>)
[gdb-7] at ./nptl/pthread_create.c:442
[gdb-7] #6 0x00007ffff7126850 in clone3 ()
[gdb-7] at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
I was expecting the class to cancel the background task & not throw.
You're mixing io_service (!?) with io_context, using boost::thread instead of std::thread, using a full blown mutex to synchronize on a bool (make it atomic?) that is redundant since the timer knows everything that it needs to know. And with all that, reset()
and stop()
invoke Undefined Behaviour because steady_timer
instances are not thread-safe.
You're joining and detaching the thread for little reason (there's never a reason not to join it here). All in all you're basically implementing a very expensive version of
void on_timeout(Duration d, Callback cb) {
std::thread{[d, cb] {
std::this_thread::sleep_for(d);
cb();
}}.detach();
}
With perhaps the only extra feature of postponing it by calling reset()
while it is still waiting. Consider not worrying about the thread at all:
struct AsyncExecutor {
using Timer = asio::steady_timer;
AsyncExecutor(Duration delay_ms, Callback callback) : delay_ms_(delay_ms), callback_(callback) {}
~AsyncExecutor() {
trace("~dtor()");
auto f = timer_.async_wait(asio::use_future);
do_stop();
f.wait(); // ensure the timer is done
}
void start() { trace("start()"); do_start(false); }
void reset() { trace("reset()"); do_start(true); }
void stop() { trace("stop()"); do_stop(); }
private:
static constexpr Timer::time_point STOPPED = Timer::time_point::min();
asio::steady_timer timer_{make_strand(asio::system_executor{}), STOPPED};
Duration delay_ms_;
Callback callback_;
void do_start(bool require_running) {
dispatch(timer_.get_executor(), [this, require_running] {
if (require_running == (timer_.expiry() != STOPPED)) {
auto num_canceled = timer_.expires_after(delay_ms_);
trace(" [setting delay, canceled " + std::to_string(num_canceled) + "]");
timer_.async_wait(std::bind(&AsyncExecutor::timer_ran, this, _1));
}
});
}
std::future<size_t> do_stop() {
return dispatch(timer_.get_executor(),
asio::use_future([this] { return timer_.expires_at(STOPPED); }));
}
void timer_ran(boost::system::error_code ec) {
trace(" [timer_ran: " + ec.message() + "]");
if (ec != asio::error::operation_aborted) {
timer_.expires_at(STOPPED);
if (callback_)
callback_();
}
}
};
Note that all operations except construction/destruction are thread safe.
Demo program:
#include <boost/asio.hpp>
#include <fmt/core.h>
namespace asio = boost::asio;
using Callback = std::function<void()>;
using Duration = asio::steady_timer::duration;
using namespace std::placeholders;
using namespace std::chrono_literals;
namespace { // logging
constexpr auto now = std::chrono::steady_clock::now;
auto start = now();
void trace(std::string const& msg) { fmt::print("at {:4}ms {}\n", (now() - start) / 1ms, msg); }
} // namespace
struct AsyncExecutor {
using Timer = asio::steady_timer;
AsyncExecutor(Duration delay_ms, Callback callback) : delay_ms_(delay_ms), callback_(callback) {}
~AsyncExecutor() {
trace("~dtor()");
auto f = timer_.async_wait(asio::use_future);
do_stop();
f.wait(); // ensure the timer is done
}
void start() { trace("start()"); do_start(false); }
void reset() { trace("reset()"); do_start(true); }
void stop() { trace("stop()"); do_stop(); }
private:
static constexpr Timer::time_point STOPPED = Timer::time_point::min();
asio::steady_timer timer_{make_strand(asio::system_executor{}), STOPPED};
Duration delay_ms_;
Callback callback_;
void do_start(bool require_running) {
dispatch(timer_.get_executor(), [this, require_running] {
if (require_running == (timer_.expiry() != STOPPED)) {
auto num_canceled = timer_.expires_after(delay_ms_);
trace(" [setting delay, canceled " + std::to_string(num_canceled) + "]");
timer_.async_wait(std::bind(&AsyncExecutor::timer_ran, this, _1));
}
});
}
std::future<size_t> do_stop() {
return dispatch(timer_.get_executor(),
asio::use_future([this] { return timer_.expires_at(STOPPED); }));
}
void timer_ran(boost::system::error_code ec) {
trace(" [timer_ran: " + ec.message() + "]");
if (ec != asio::error::operation_aborted) {
timer_.expires_at(STOPPED);
if (callback_)
callback_();
}
}
};
using std::this_thread::sleep_for;
int main() {
{
trace("--- Main started");
AsyncExecutor job(1500ms, [] {trace("~~~RUNNING TIMED JOB~~~"); });
sleep_for(500ms);
job.reset(); // no effect, not pending
job.start();
sleep_for(200ms);
job.start(); // no effect, already pending
trace("--- Main keeping the job alive");
for (auto i = 0; i < 5; ++i) {
job.reset(); // restarts the wait of 1500ms
sleep_for(1.0s);
}
trace("--- Main loop done");
sleep_for(1.0s); // this allows job to run
job.reset(); // no effect, not pending
job.start();
sleep_for(200ms);
} // destructor cancels the job
trace("--- Main exit");
}
Printing e.g.: