Search code examples
c++boostboost-asioautoit

How to avoid concurrent callbacks to user defined routine?


I am attempting to modify some Boost code so that it is compatible with Autoit. The original project can be found here. My version can be found here. I could use some help in determining how to prevent multiple concurrent callbacks into the user supplied Autoit routine.

Here is the existing on_read callback --

/// Callback registered by async_read. It calls user registered callback to actually process the data. And then issue another async_read to wait for data from server again.
    /// \param ec instance of error code
    /// \param bytes_transferred
    void
    on_read(
            beast::error_code ec,
            std::size_t bytes_transferred) {
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> in on read" << std::endl;
        }       
        boost::ignore_unused(bytes_transferred);

        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            if(!Is_Connected) {
                return;
            }

        }

        // error occurs
        if (ec) {
            if(on_fail_cb)
                on_fail_cb(L"read");
            return fail(ec, L"read");
        }

        const std::string data = beast::buffers_to_string(buffer_.data());
        const std::wstring wdata(data.begin(), data.end());
        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << L"] " << wdata << std::endl;
        }       

//  The next section is where my issue resides

        if (on_data_cb)
            on_data_cb(wdata.c_str(), wdata.length());

        buffer_.consume(buffer_.size());

        if(EnableVerbose)
        {
            boost::lock_guard<boost::mutex> guard(mtx_);
            std::wcout << L"<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
        }       
        ws_.async_read(
                buffer_,
                beast::bind_front_handler(
                        &session::on_read,
                        shared_from_this()));

        // Close the WebSocket connection
        // ws_.async_close(websocket::close_code::normal,
        //     beast::bind_front_handler(
        //         &session::on_close,
        //         shared_from_this()));
    }

The code if (on_data_cb) on_data_cb(wdata.c_str(), wdata.length()); executes the callback into Autoit, and I need to know how I can prevent this from executing more than once at a time. I am not well versed in C++ / Boost, so please be gentle. ;-)


Solution

  • The gentle answer would be to point to the documentation: Strands: Use Threads Without Explicit Locking

    In reality you don't show enough code. For example, we have no way of knowing

    • what execution context is being used. If you're using a io_context with a single service thread run()-ing it, you already have the implicit strand and a guarantee that no handlers ever run simultaneously

    • what executor the IO object(s) bind to. In your code, the only object visible is ws_ which we'll assume for to be something like

       net::io_context                ctx_;
       websocket::stream<tcp::socket> ws_{ctx_};
      

      Now, in case you want to have multiple threads servicing ctx_ you could bind the ws_ to a strand executor instead:

       websocket::stream<tcp::socket> ws_{make_strand(ctx_)};
      

      Now, as long as you make sure your own accesses (e.g. async_ initiations) are on the proper strand, your code is already safe. If you want - and you don't mind hardcoding the executor type, you can assert this:

      auto strand = ws_.get_executor().targetnet::strand<net::io_context::executor_type>(); assert(strand && strand->running_in_this_thread());

    Pro tip:

    If you really commit to a particular executor type, consider statically binding that type:

    using Context  = net::io_context::executor_type;
    using Executor = net::io_context::executor_type;
    using Strand   = net::strand<net::io_context::executor_type>;
    using Socket   = net::basic_stream_socket<tcp, Strand>;
    
    Context                   ctx_;
    websocket::stream<Socket> ws_{make_strand(ctx_)};
    

    This avoids the overhead of type-erased executors, and you can simplify the assert:

    assert(ws_.get_executor().running_in_this_thread());
    

    Side Notes

    Demo

    Obligatory "live" code:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/beast.hpp>
    #include <iostream>
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using net::ip::tcp;
    
    static std::mutex s_consoleMtx;
    
    static void fail(beast::error_code ec, std::string txt) {
        std::cerr << txt << ": " << ec.message() << " at " << ec.location() << std::endl;
    }
    
    #define ARCH_LABEL "STACKO"
    struct session : std::enable_shared_from_this<session> {
        using Context  = net::io_context::executor_type;
        using Executor = net::io_context::executor_type;
        using Strand   = net::strand<net::io_context::executor_type>;
        using Socket   = net::basic_stream_socket<tcp, Strand>;
    
        Context                   ctx_;
        websocket::stream<Socket> ws_{make_strand(ctx_)};
    
        static bool const  EnableVerbose = true;
        std::atomic_bool   Is_Connected  = false;
        beast::flat_buffer buffer_;
    
        std::function<void(std::string)>         on_fail_cb;
        std::function<void(char const*, size_t)> on_data_cb;
    
        /// Callback registered by async_read. It calls user registered
        /// callback to actually process the data. And then issue another
        /// async_read to wait for data from server again. 
        /// \param ec instance of error code 
        /// \param bytes_transferred
        void on_read(beast::error_code ec, [[maybe_unused]] size_t bytes_transferred) {
            if (EnableVerbose) {
                std::lock_guard<std::mutex> guard(s_consoleMtx);
                std::cout << "<WsDll-" ARCH_LABEL "> in on read" << std::endl;
            }
    
            if (!Is_Connected)
                return;
    
            // error occurs
            if (ec) {
                if (on_fail_cb)
                    on_fail_cb("read");
                return fail(ec, "read");
            }
    
            std::string const data = beast::buffers_to_string(buffer_.data());
            if (EnableVerbose) {
                std::lock_guard<std::mutex> guard(s_consoleMtx);
                std::cout << "<WsDll-" ARCH_LABEL "> received[" << bytes_transferred << "] " << data << std::endl;
            }
    
            if (on_data_cb)
                on_data_cb(data.c_str(), data.length());
    
            buffer_.consume(buffer_.size());
    
            if (EnableVerbose) {
                std::lock_guard<std::mutex> guard(s_consoleMtx);
                std::cout << "<WsDll-" ARCH_LABEL "> issue new async_read in on_read" << std::endl;
            }
    
            assert(ws_.get_executor().running_in_this_thread());
            ws_.async_read(buffer_, beast::bind_front_handler(&session::on_read, shared_from_this()));
        }
    };