Search code examples
Tags: c++, multithreading, memory, timeout, valgrind

C++ detached thread memory access errors


I'm trying to implement cooperative function timeout cancellation in C++, following the recommendations in this answer. The idea is simple: if a function executing in a separate thread exceeds its allowed time limit, it terminates without writing to the shared resource. Since its thread won't be used anymore, it gets detached.

Here's the code:

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <stop_token>
#include <thread>

// Stand-in for a resource that is slow to build: constructing one blocks the
// calling thread for 150 ms.
class Inner
{
      public:
    Inner()
    {
        using namespace std::chrono_literals;
        // Longer than the 100 ms limit used in main(), so construction
        // always overruns it.
        std::this_thread::sleep_for(150ms);
    }

    ~Inner() {}
};

// Thread entry point: builds a fresh Inner and, unless a stop has been
// requested in the meantime, swaps it into `obj`.
//
// NOTE: `obj` is received by value, so the swap only replaces this function's
// own copy of the shared_ptr; the pointer held by the caller is not changed.
//
// stoken: stop token checked once, after the Inner has been constructed.
// obj:    by-value copy of the caller's shared pointer.
// Returns nullptr on success, or a heap-allocated std::runtime_error carrying
// the message of a std::runtime_error caught during the attempt.
static std::unique_ptr<std::runtime_error>
func(std::stop_token stoken, std::shared_ptr<Inner> obj)
{
    try {
        // make_shared instead of a naked `new`: one allocation, no raw
        // owning pointer at any point.
        auto new_obj = std::make_shared<Inner>();
        if (!stoken.stop_requested()) {
            obj.swap(new_obj);
        }
        return nullptr;
    } catch (const std::runtime_error &err) {
        return std::make_unique<std::runtime_error>(err.what());
    }
}

class Wrapper
{
      private:
    std::shared_ptr<Inner> obj = nullptr;

      public:
    Wrapper(){};
    ~Wrapper(){};

    void
    cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
    {
        std::packaged_task<std::unique_ptr<std::runtime_error>(std::stop_token, std::shared_ptr<Inner>)> task(
            func);

        auto         result = task.get_future();
        std::jthread thr(std::ref(task), obj);

        if (result.wait_for(timeout) == std::future_status::ready) {
            auto error = result.get();
            if (error != nullptr)
                throw *error;
        } else {
            thr.request_stop();
#ifdef DETACH
            thr.detach();
#else
            thr.join();
#endif
            throw std::runtime_error("motor creation timeout");
        }
    }
};

int
main(void)
{
    auto cls = std::shared_ptr<Wrapper>(new Wrapper());
    try {
        cls->cancellable_function(std::chrono::milliseconds(100));
        assert(false);
    } catch (const std::exception &) {
        assert(true);
    }

    // wait for detached thread to finish
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}

In this code, construction of the Inner object times out. When it does, thread thr receives a stop signal via stoken and leaves without modifying the shared resource obj.

When built with join instead of detach, this code does not raise valgrind errors:

g++ -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function

However, the version with detach shows a lot of memory access errors (cannot insert all due to question size limitation):

g++ -D DETACH -Wall -Wextra -pedantic -Werror -std=gnu++20 cancel_function.cxx -o cancel_function
valgrind -q --tool=memcheck --leak-check=full --error-exitcode=69 --track-origins=yes --show-reachable=yes ./cancel_function
==95200== Thread 2:
==95200== Invalid write of size 8
==95200==    at 0x110BEA: std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >::_Tuple_impl(std::_Tuple_impl<0ul, std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x110C12: std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >::tuple(std::tuple<std::runtime_error*, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x110C41: std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >::__uniq_ptr_impl(std::__uniq_ptr_impl<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x10F58A: std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>::__uniq_ptr_data(std::__uniq_ptr_data<std::runtime_error, std::default_delete<std::runtime_error>, true, true>&&) (in /tmp/cancel_function)
==95200==    by 0x10F5B0: std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >::unique_ptr(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x1157E5: std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::_M_set(std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> >&&) (in /tmp/cancel_function)
==95200==    by 0x115484: std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >::operator()() const (in /tmp/cancel_function)
==95200==    by 0x115271: std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter> std::__invoke_impl<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&>(std::__invoke_other, std::__future_base::_Task_setter<std::unique_ptr<std::__future_base::_Result<std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >, std::__future_base::_Result_base::_Deleter>, std::__future_base::_Task_state<Wrapper::cancellable_function(std::chrono::duration<unsigned int, std::ratio<1l, 1000000l> >)::{lambda(std::stop_token)#1}, std::allocator<int>, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > (std::stop_token)>::_M_run(std::stop_token&&)::{lambda()#1}, std::unique_ptr<std::runtime_error, std::default_delete<std::runtime_error> > >&) (in /tmp/cancel_function)
...

I cannot understand why the detached thread is accessing memory after being cancelled. How could this code be fixed without using join?


Solution

  • The problem is that the task variable is only passed in as a reference to the std::jthread, so that when you detach it, and the function ends, the variable will fall out of scope and the underlying object will be deleted. However, once the function finally ends in the detached thread it will still try to access the task variable you passed in as a reference, but the underlying object has already been deleted.

    What you can do is move the packaged_task into the std::jthread so that it will only be destroyed once the actual thread ends:

        std::jthread thr(std::move(task), obj);
    

    Then detaching will work.

    However: there are some issues in your code that don't make much sense:

    • You are using std::unique_ptr<std::runtime_error> -- why? There's already std::exception_ptr that you can use if you want to wrap exceptions. Also, why use it explicitly anyway? std::future can hold exceptions just fine, why the explicit handling here?

    • .swap() doesn't do what you want: it will swap the local function variable for the local function parameter - and since both are local variables, this will have effectively no effect... You could of course swap the contents of the shared pointers (std::swap(*obj, *new_obj);), but that's not advisable because:

    • Your abort check has an inherent race condition. What if in the thread you started the check notices that it wasn't aborted, then the main thread aborts at that point, and then the swap happens. That's bad.

    I think a much cleaner (and simpler) way of doing this is simply relying on std::future to return the value that you wanted to create and then use that value. If an error occurs, std::future will also forward any exceptions to you. That'll make your code much simpler:

    // Worker entry point: builds a new Inner and returns it, so the caller
    // receives it through the std::future instead of a shared parameter.
    //
    // token: stop token checked once, after the Inner has been constructed.
    // Throws std::runtime_error("Aborted") if a stop was requested by then,
    // and std::runtime_error("Some error happened") when THROW_EXCEPTION is
    // set (used to demonstrate error forwarding).
    static std::shared_ptr<Inner>
    func(std::stop_token token)
    {
    #if THROW_EXCEPTION
        throw std::runtime_error("Some error happened");
    #endif
        // make_shared instead of a naked `new`: single allocation, no raw
        // owning pointer.
        auto new_obj = std::make_shared<Inner>();
        if (token.stop_requested())
            throw std::runtime_error("Aborted");
        return new_obj;
    }
    
    class Wrapper
    {
          private:
        std::shared_ptr<Inner> obj = nullptr;
    
          public:
        Wrapper(){};
        ~Wrapper(){};
    
        void
        cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
        {
            std::packaged_task<std::shared_ptr<Inner>(std::stop_token)> task(func);
    
            auto result = task.get_future();
            std::jthread thr(std::move(task));
    
            if (result.wait_for(timeout) == std::future_status::ready) {
                // result.get() will throw if there was an exception
                // in the code
                obj = result.get();
            } else {
                thr.request_stop();
    #ifdef DETACH
                thr.detach();
    #else
                thr.join();
    #endif
                throw std::runtime_error("motor creation timeout");
            }
        }
    };
    

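    However, if you can't really interrupt the operation at all in the middle and can only check at the very end if the stop_token was used or not, why bother with this complication? Just use std::async (note that the future returned by std::async blocks in its destructor until the task has finished, so on timeout the function still only returns once func is done, just like the join variant above):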

    static std::shared_ptr<Inner>
    func()
    {
    #if THROW_EXCEPTION
        throw std::runtime_error("Some error happened");
    #endif
        std::shared_ptr<Inner> new_obj(new Inner());
        return new_obj;
    }
    
    // Holds a shared Inner and replaces it with one built asynchronously,
    // provided construction finishes within the given time limit.
    class Wrapper
    {
          private:
        std::shared_ptr<Inner> obj = nullptr;
    
          public:
        Wrapper(){};
        ~Wrapper(){};
    
        // Starts func via std::async and waits at most `timeout` for it.
        // On success the produced Inner is stored in `obj`. Any exception
        // stored in the future is rethrown by get(). If the result is not
        // ready in time, std::runtime_error("motor creation timeout") is
        // thrown.
        void
        cancellable_function(std::chrono::duration<uint32_t, std::micro> timeout)
        {
            // Request std::launch::async explicitly. With the default policy
            // the implementation may defer the call; wait_for would then
            // return future_status::deferred, func would never run, and every
            // call would be reported as a timeout.
            auto result = std::async(std::launch::async, func);
            if (result.wait_for(timeout) == std::future_status::ready) {
                // result.get() will throw if there was an exception
                // in the code
                obj = result.get();
            } else {
                // NOTE: the destructor of a future obtained from std::async
                // blocks until the task has finished, so this exception only
                // leaves the function once func is done.
                throw std::runtime_error("motor creation timeout");
            }
        }
    };
    

    Of course if you do have possible cancellation points in your routine that can be checked periodically, then using the jthread + packaged_task variant is probably better (because as far as I know std::async doesn't have any cancellation support).