Search code examples
c++tcpboost-asio

Boost TCP client to connect to multiple servers


I want my TCP client to connect to multiple servers(each server has a separate IP and port).
I am using async_connect. I can successfully connect to different servers but the read/write fails since the server's corresponding tcp::socket object is not available.
Can you please suggest how I could store each server's socket in some data structure? I tried saving the IP, socket to a std::map, but the first server's socket object is not available in memory and the app crashes.
I tried making the socket static, but it does not help either.

Please help me!!

Also, I hope I am logically correct in making a single TCP client connect to 2 different servers. I am sharing below the simplified header & cpp file.

class TCPClient: public Socket
{
public:
    TCPClient(boost::asio::io_service& io_service,
        boost::asio::ip::tcp::endpoint ep);
    virtual ~TCPClient();
    void Connect(boost::asio::ip::tcp::endpoint ep, boost::asio::io_service &ioService, void (Comm::*SaveClientDetails)(std::string,void*),
        void *pClassInstance);
    
    void TransmitData(const INT8 *pi8Buffer);
    void HandleWrite(const boost::system::error_code& err, 
    size_t szBytesTransferred);
    void HandleConnect(const boost::system::error_code &err, 
        void (Comm::*SaveClientDetails)(std::string,void*),
        void *pClassInstance, std::string sIPAddr);
    static tcp::socket* CreateSocket(boost::asio::io_service &ioService)    
        {   return new tcp::socket(ioService); }
    static tcp::socket *mSocket;
private:
    std::string sMsgRead;
    INT8 i8Data[MAX_BUFFER_LENGTH];
    std::string sMsg;
    boost::asio::deadline_timer mTimer;
};

tcp::socket* TCPClient::mSocket = NULL;

TCPClient::TCPClient(boost::asio::io_service &ioService,
        boost::asio::ip::tcp::endpoint ep) :
        mTimer(ioService)
{
}

void TCPClient::Connect(boost::asio::ip::tcp::endpoint ep, 
        boost::asio::io_service &ioService, 
        void (Comm::*SaveServerDetails)(std::string,void*),
        void *pClassInstance)
{
    mSocket = CreateSocket(ioService);
    std::string sIPAddr = ep.address().to_string();
    /* To send connection request to server*/
    mSocket->async_connect(ep,boost::bind(&TCPClient::HandleConnect, this,
            boost::asio::placeholders::error, SaveServerDetails,
            pClassInstance, sIPAddr));
}

void TCPClient::HandleConnect(const boost::system::error_code &err,
        void (Comm::*SaveServerDetails)(std::string,void*),
        void *pClassInstance, std::string sIPAddr)
{
    if (!err)
    {
        Comm* pInstance = (Comm*) pClassInstance;
        if (NULL == pInstance) 
        {
            break;
        }
        (pInstance->*SaveServerDetails)(sIPAddr,(void*)(mSocket));
    }
    else
    {
        break;
    }
}

void TCPClient::TransmitData(const INT8 *pi8Buffer)
{
    sMsg = pi8Buffer;
    if (sMsg.empty()) 
    {
        break;
    }
    mSocket->async_write_some(boost::asio::buffer(sMsg, MAX_BUFFER_LENGTH),
            boost::bind(&TCPClient::HandleWrite, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}

void TCPClient::HandleWrite(const boost::system::error_code &err,
        size_t szBytesTransferred) 
{
        if (!err) 
        {
            std::cout<< "Data written to TCP Client port! ";
        } 
        else 
        {
            break;
        }
}


Solution

  • You seem to know your problem: the socket object is unavailable. That's 100% by choice. You chose to make it static, of course there will be only one instance.

    Also, I hope I am logically correct in making a single TCP client connect to 2 different servers.

    It sounds wrong to me. You can redefine "client" to mean something having multiple TCP connections. In that case at the very minimum you expect a container of tcp::socket objects to hold those (or, you know, a Connection object that contains the tcp::socket.

    BONUS: Demo

    For fun and glory, here's what I think you should be looking for.

    Notes:

    • no more new, delete
    • no more void*, reinterpret casts (!!!)
    • less manual buffer sizing/handling
    • no more bind
    • buffer lifetimes are guaranteed for the corresponding async operations
    • message queues per connection
    • connections are on a strand for proper synchronized access to shared state in multi-threading environments
    • I added in a connection max idle time timeout; it also limits the time taken for any async operation (connect/write). I assumed you wanted something like this because (a) it's common (b) there was an unused deadline_timer in your question code

    Note the technique of using shared pointers to have Comm manage its own lifetime. Note also that _socket and _outbox are owned by the individual Comm instance.

    Live On Coliru

    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    
    using INT8 = char;
    using boost::asio::ip::tcp;
    using boost::system::error_code;
    //using SaveFunc = std::function<void(std::string, void*)>; // TODO abolish void*
    using namespace std::chrono_literals;
    using duration = std::chrono::high_resolution_clock::duration;
    
    static inline constexpr size_t MAX_BUFFER_LENGTH = 1024;
    
    using Handle  = std::weak_ptr<class Comm>;
    
    class Comm : public std::enable_shared_from_this<Comm> {
      public:
        template <typename Executor>
        explicit Comm(Executor ex, tcp::endpoint ep, // ex assumed to be strand
                      duration max_idle)
            : _ep(ep)
            , _max_idle(max_idle)
            , _socket{ex}
            , _timer{_socket.get_executor()}
        {
        }
    
        ~Comm() { std::cerr << "Comm closed (" << _ep << ")\n"; }
    
        void Start() {
            post(_socket.get_executor(), [this, self = shared_from_this()] {
                _socket.async_connect(
                    _ep, [this, self = shared_from_this()](error_code ec) {
                        std::cerr << "Connect: " << ec.message() << std::endl;
                        if (!ec)
                            DoIdle();
                        else
                            _timer.cancel();
                    });
                DoIdle();
            });
        }
    
        void Stop() {
            post(_socket.get_executor(), [this, self = shared_from_this()] {
                if (not _outbox.empty())
                    std::cerr << "Warning: some messages may be undelivered ("
                              << _ep << ")" << std::endl;
                _socket.cancel();
                _timer.cancel();
            });
        }
    
        void TransmitData(std::string_view msg) {
            post(_socket.get_executor(),
                 [this, self = shared_from_this(), msg = std::string(msg.substr(0, MAX_BUFFER_LENGTH))] {
                     _outbox.emplace_back(std::move(msg));
    
                     if (_outbox.size() == 1) { // no send loop already active?
                         DoSendLoop();
                     }
                 });
        }
    
      private:
        // The DoXXXX functions are assumed to be on the strand
        void DoSendLoop() {
            DoIdle(); // restart max_idle even after last successful send
            if (_outbox.empty())
                return;
    
            boost::asio::async_write(
                _socket, boost::asio::buffer(_outbox.front()),
                [this, self = shared_from_this()](error_code ec, size_t xfr) {
                    std::cerr << "Write " << xfr << " bytes to " << _ep << " " << ec.message() << std::endl;
                    if (!ec) {
                        _outbox.pop_front();
                        DoSendLoop();
                    } else
                        _timer.cancel(); // causes Comm shutdown
                });
        }
    
        void DoIdle() {
            _timer.expires_from_now(_max_idle); // cancels any pending wait
            _timer.async_wait([this, self = shared_from_this()](error_code ec) {
                if (!ec) {
                    std::cerr << "Timeout" << std::endl;
                    _socket.cancel();
                }
            });
        }
    
        tcp::endpoint                      _ep;
        duration                           _max_idle;
        tcp::socket                        _socket;
        boost::asio::high_resolution_timer _timer;
        std::deque<std::string>            _outbox;
    };
    
    class TCPClient {
        boost::asio::any_io_executor _ex;
        std::deque<Handle>           _comms;
    
      public:
        TCPClient(boost::asio::any_io_executor ex) : _ex(ex) {}
    
        void Add(tcp::endpoint ep, duration max_idle = 3s)
        {
            auto pcomm = std::make_shared<Comm>(make_strand(_ex), ep, max_idle);
            pcomm->Start();
            _comms.push_back(pcomm);
    
            // optionally garbage collect expired handles:
            std::erase_if(_comms, std::mem_fn(&Handle::expired));
        }
    
        void TransmitData(std::string_view msg) {
            for (auto& handle : _comms)
                if (auto pcomm = handle.lock())
                    pcomm->TransmitData(msg);
        }
    
        void Stop() {
            for (auto& handle : _comms)
                if (auto pcomm = handle.lock())
                    pcomm->Stop();
        }
    };
    
    int main() {
        using std::this_thread::sleep_for;
    
        boost::asio::thread_pool ctx(1);
        TCPClient                c(ctx.get_executor());
    
        c.Add({{}, 8989});
        c.Add({{}, 8990}, 1s); // shorter timeout for demo
    
        c.TransmitData("Hello world\n");
    
        c.Add({{}, 8991});
    
        sleep_for(2s); // times out second connection
    
        c.TransmitData("Three is a crowd\n"); // only delivered to 8989 and 8991
    
        sleep_for(1s); // allow for delivery
    
        c.Stop();
        ctx.join();
    }
    

    Prints (on Coliru):

    for p in {8989..8991}; do netcat -t -l -p $p& done
    sleep .5; ./a.out
    Hello world
    Connect: Success
    Connect: Success
    Hello world
    Connect: Success
    Write 12 bytes to 0.0.0.0:8989 Success
    Write 12 bytes to 0.0.0.0:8990 Success
    Timeout
    Comm closed (0.0.0.0:8990)
    Write Three is a crowd
    17Three is a crowd
     bytes to 0.0.0.0:8989 Success
    Write 17 bytes to 0.0.0.0:8991 Success
    Comm closed (0.0.0.0:8989)
    Comm closed (0.0.0.0:8991)
    

    The output is a little out of sequence there. Live local demo:

    enter image description here