Tags: c++, asynchronous, boost, valgrind, boost-signals2

Is it possible to create a boost::signals2 signal with an asynchronous combiner?


For a project, I am trying to create asynchronous Boost signals. It seems to work, but valgrind tells me otherwise.

The following example shows a basic implementation and usage.

In this example I need an asynchronous signal because the signal is triggered in the set function, which locks the mutex, and the slot calls get, which locks the same mutex. Yes, I could call mutex.unlock() before emitting the signal, but my real project is a little more complex: I don't want to risk blocking the process that updates the data with potentially slow slots.

So, is it possible to create an asynchronous signal with Boost? If so, can someone point me in the right direction to make it work without valgrind errors?

I had a look at the Boost source code, but I can't figure out how to solve my problem. The lambda in my combiner takes the iterator by copy, but that is not enough for valgrind.

I tried to make the example as small as possible, but the valgrind errors are pretty big, sorry.

I'm using :

  • g++ compiler version 9.3
  • valgrind version 3.15
  • c++ revision 17
  • boost version 1.71.0
// test_signals.cpp
// Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g -pthread
// Valgrind : valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/signals2/signal.hpp>
#include <iostream>
#include <mutex>

using ThreadPool = std::shared_ptr<::boost::asio::thread_pool>;

struct AsyncSignalCombiner
{
    typedef void result_type;

    AsyncSignalCombiner(ThreadPool pool)
        : thread_pool(pool){};
    AsyncSignalCombiner(const AsyncSignalCombiner &) = default;

    template <typename InputIterator>
    result_type operator()(InputIterator first, InputIterator last) const
    {
        while (first != last)
        {
            ::boost::asio::post(*thread_pool, [=]() { *first; });

            ++first;
        }
    }

    ThreadPool thread_pool{nullptr};
};

class SignalASync : public boost::signals2::signal<void(), AsyncSignalCombiner>
{
public:
    explicit SignalASync(ThreadPool thread_pool)
        : boost::signals2::signal<void(), AsyncSignalCombiner>(
              AsyncSignalCombiner{thread_pool}){};
};

class A
{
    std::mutex mutex_;

public:
    A(ThreadPool pool)
        : changed_{pool} {};
    int get()
    {
        std::lock_guard lock{mutex_};
        return 42;
    }

    void set()
    {
        std::lock_guard lock{mutex_};
        changed_();
    }

    SignalASync changed_;
};

int main()
{
    auto pool = std::make_shared<boost::asio::thread_pool>(1);
    A data{pool};

    auto slot = [&]() { std::cout << "slot: " << data.get() << std::endl; };

    data.changed_.connect(slot);

    data.set();

    pool->join();

    return 0;
}

And the valgrind errors (sorry, I include only one error out of 5 because they are too big for Stack Overflow, but they are all the same: invalid read/write on cache->result at slot_call_iterator.hpp lines 107, 110 and 119):

==69966== Thread 2:
==69966== Invalid read of size 1
==69966==    at 0x126B6A: boost::optional_detail::optional_base<boost::signals2::detail::void_type>::is_initialized() const (optional.hpp:396)
==69966==    by 0x1251EF: boost::optional<boost::signals2::detail::void_type>::operator!() const (optional.hpp:1446)
==69966==    by 0x123759: boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >::dereference() const (slot_call_iterator.hpp:107)
==69966==    by 0x121DEB: boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >::reference boost::iterators::iterator_core_access::dereference<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > const&) (iterator_facade.hpp:550)
==69966==    by 0x11FCC7: boost::iterators::detail::iterator_facade_base<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::void_type, boost::iterators::single_pass_traversal_tag, boost::signals2::detail::void_type const&, long, false, false>::operator*() const (iterator_facade.hpp:656)
==69966==    by 0x11DFE6: void AsyncSignalCombiner::operator()<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >) const::{lambda()#1}::operator()() const (test_signals.cpp:25)
==69966==    by 0x1297DE: void boost::asio::asio_handler_invoke<void AsyncSignalCombiner::operator()<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >) const::{lambda()#1}>(void 
AsyncSignalCombiner::operator()<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >) const::{lambda()#1}&, ...) (handler_invoke_hook.hpp:69)
[...]
==69966==  Address 0x1ffefffb30 is on thread 1's stack
==69966==  144 bytes below stack pointer

Thank you for reading, and for potentially taking the time to answer my first post here.

EDIT 20/01/21: The problem comes from slot_call_iterator_t (in boost/signals2/detail/slot_call_iterator.hpp), which takes a "cache_type" by reference but stores its address (cache_type*). So when a copy of slot_call_iterator_t is handed to thread 2, its cache_type* still refers to a stack address on thread 1.

But I don't know how to fix it, for now.


Solution

  • Erm, what is the code supposed to do anyway?

    *first;
    

    This statement conceptually doesn't do anything. It dereferences an iterator. An iterator to a result of a callback.

    I think you're attempting to dispatch signals asynchronously, but by the time you are in the combiner it is too late. Dereferencing the iterator is what executes the slot, but the iterator isn't valid outside the scope of the combiner.

    So this can't work.

    Some Direct Answers

    To technically answer your question on how to fix the dangling reference:

    while (first != last) {
        auto value = *first++;
        post(*thread_pool, [=] { value; });
    }
    

    Obviously, this just defeats the purpose of your combiner, since the slots are now evaluated on the signaling thread.

    Side Note: Executors

    Instead of passing around the thread pool, use an executor, so you abstract away the type of execution context. The examples below should make clear what I mean.

    Possible Solution #1

    Instead of intercepting the execution, you could intercept the connection:

    Live On Wandbox

    // test_signals.cpp
    // Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g -pthread
    // Valgrind : valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
    #include <boost/signals2/signal.hpp>
    #include <iostream>
    #include <mutex>
    #include <thread>  // std::this_thread::get_id
    #include <tuple>   // std::make_tuple, std::apply
    #include <utility>
    
    #include <boost/asio.hpp>
    
    template <typename Ex>
    struct SignalASync : protected boost::signals2::signal<void()> {
        using Base = boost::signals2::signal<void()>;
    
        template <typename... Args>
        SignalASync(Ex ex, Args&&... args)
         : Base(std::forward<Args>(args)...), _ex(ex) {}
    
        using Base::operator=;
        using Base::operator();
    
        template <typename F, typename... Args>
        auto connect(F&& f, Args&&... other) {
            return Base::connect(
                Binder<F>(_ex, std::forward<F>(f)),
                std::forward<Args>(other)...);
        }
    
      private:
        template <typename F> struct Binder {
            Binder(Ex ex, F&& f) : _ex(ex), _f(std::forward<F>(f)) {}
    
            template <typename... Args>
            void operator()(Args&&... args) const {
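                auto argtup = std::make_tuple(std::forward<Args>(args)...);
                // Copy the callable into the task instead of capturing `this`,
                // so the task does not depend on this Binder's lifetime.
                post(_ex, [f = _f, argtup]() mutable { std::apply(f, argtup); });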
            }
    
            Ex _ex;
            F _f;
        };
    
        Ex _ex;
    };
    
    template <typename Ex>
    class A {
        std::mutex mutex_;
    
      public:
        A(Ex ex) : changed_{ ex } {}
    
        int get() {
            std::lock_guard lock{ mutex_ };
            return 42;
        }
    
        void set() {
            std::lock_guard lock{ mutex_ };
            changed_();
        }
    
        SignalASync<Ex> changed_;
    };
    
    int main() {
        boost::asio::thread_pool pool(2);
        A data{ pool.get_executor() };
    
        auto slot = [&] { std::cout << "thread_id: "  << std::this_thread::get_id() << " slot: " << data.get() << std::endl; };
    
        std::cout << "main thread id: "  << std::this_thread::get_id() << std::endl;
    
        data.changed_.connect(slot);
    
        data.set();
    
        pool.join();
    }
    

    Which prints

    main thread id: 139941975648128
    thread_id: 139941863012096 slot: 42
    

    And runs cleanly under valgrind and ubsan/asan.

    More Suggestions

    1. Perhaps you could change stuff so that you don't require an Executor template argument, by using boost::asio::any_io_executor: Live On Wandbox

      Note that this can be less efficient.

    2. Also, you might do away with the Signal Subclass altogether, because it would require you to facade more interface (like operator+= and so on). Instead, simply make a wrapper function:

      Live On Wandbox 46 lines of code!

      // test_signals.cpp
      // Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g -pthread
      // Valgrind : valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
      #include <boost/signals2/signal.hpp>
      #include <iostream>
      #include <mutex>
      #include <thread>  // std::this_thread::get_id
      #include <utility>
      
      #include <boost/asio.hpp>
      
      using Signal = boost::signals2::signal<void()>;
      
      class A {
          std::mutex mutex_;
      
        public:
          int get() {
              std::lock_guard lock{ mutex_ };
              return 42;
          }
      
          void set() {
              std::lock_guard lock{ mutex_ };
              changed_();
          }
      
          Signal changed_;
      };
      
      int main() {
          boost::asio::thread_pool pool(2);
          auto on_pool = [ex = pool.get_executor()](auto f) { 
              return [ex,f] { post(ex, f); };
          };
      
          A data;
          auto slot = [&] { std::cout << "thread_id: "  << std::this_thread::get_id() << " slot: " << data.get() << std::endl; };
      
          std::cout << "main thread id: "  << std::this_thread::get_id() << std::endl;
      
          data.changed_.connect(on_pool(slot));
          data.set();
      
          pool.join();
      }
      

      Prints

      main thread id: 139851977009024
      thread_id: 139851872274176 slot: 42
      
    3. If that's not what you like, because it is easy to forget to wrap with on_pool, move the wrapping into A. Make changed_ private and add an on_changed method instead:

      Live On Wandbox

      As a bonus, this also makes the signal less racy, by passing the actual value directly to the signal handler.

      // test_signals.cpp
      // Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g -pthread
      // Valgrind : valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
      #include <boost/signals2/signal.hpp>
      #include <functional>  // std::bind
      #include <iostream>
      #include <mutex>
      #include <thread>      // std::this_thread::get_id
      #include <utility>
      #include <boost/asio.hpp>
      
      using Signal = boost::signals2::signal<void(int)>;
      
      class A {
          mutable std::mutex mutex_;
          boost::asio::executor ex_;
          Signal changed_;
          int value_ = 42;
      
        public:
          A(boost::asio::executor ex) : ex_(ex) {}
      
          int get() const {
              std::lock_guard lock{ mutex_ };
              return value_;
          }
      
          void set(int value) {
              {
                  std::lock_guard lock{ mutex_ };
                  value_ = value;
              }
              changed_(value);
          }
      
          boost::signals2::connection on_changed(Signal::slot_type const& f) {
              return changed_.connect([=](auto&&... args) {
                  post(ex_, std::bind(f, std::forward<decltype(args)>(args)...));
              });
          }
      };
      
      int main() {
          boost::asio::thread_pool pool(2);
          A data { pool.get_executor() };
      
          std::cout << "main thread id: "  << std::this_thread::get_id() << std::endl;
      
          data.on_changed([&](int v) { std::cout << " slot: " << v << std::endl; });
          data.set(99);
      
          pool.join();
      }
      

      Prints

      main thread id: 140379793893248
       slot: 99