Tags: c++, network-programming, boost, boost-asio, p2p

c++ boost::asio p2p networking


I want to create a P2P network using boost::asio over TCP. I give each node a port to listen on, and if I also give it a port to connect to, I want the outgoing connection to be made from the listening port to the given port. (All sockets use the localhost IP.)

For example:

listening port - 10 (accepting clients)
connect to port: 20
creates a new socket: 10 -> 20 

When I try to use the function do_connect, it gives me the error:

"Failed to connect to 127.0.0.1:10: An operation was attempted on something that is not a socket"

I guess the problem is with the socket I create from acceptor_.get_executor(). How can I take the listening port and connect it to the other endpoint?

the main:

try
    {
        boost::asio::io_context io_context;
        int port;
        int option;
        std::cout << "enter the port you are listening in: " << std::endl;
        std::cin >> port;
        tcp::endpoint endpoint(tcp::v4(), port);
        P2PNode node(io_context, endpoint, 1);
        std::cout << "do u want to connect to some server? yes - 1 / no - any other";
        std::cin >> option;
        if (option == 1)
        {
            std::cout << "enter the port you will be connecting to: " << std::endl;
            std::cin >> port;
            tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string("127.0.0.1"), port);
            node.do_connect(remote_endpoint);
        }
        io_context.run();
    }
    catch (const std::exception& e)
    {
        std::cout << "Exception was thrown in function: " << e.what() << std::endl;
    }
    catch (...)
    {
        std::cout << "Unknown exception in main !" << std::endl;
    }

P2PNode.h

#pragma once
#include <boost/asio.hpp>
#include <iostream>
#include <string>
#include <vector>
#include "Protocol.h"


#define CONNECTING_NODE 1
#define CLIENT_NODE 2


using boost::asio::ip::tcp;
using namespace std;
class ProtocolMessage;

class P2PNode
{
public:
    P2PNode(boost::asio::io_context& io_context, const tcp::endpoint& endpoint, int nodeType);
    void do_connect(const tcp::endpoint& endpoint);
private:
    void do_accept();
    void do_read(tcp::socket& socket);
    void do_write(tcp::socket& socket, std::size_t length);
    tcp::acceptor acceptor_;
    std::string data_;
    std::vector<tcp::socket> sockets_;
    tcp::socket listeningSocket_;
    int nodeType_;
    ProtocolMessage GenerateResponse(ProtocolMessage message);
    std::string combineSocketsToString();
};

P2PNode.cpp

P2PNode::P2PNode(boost::asio::io_context& io_context, const tcp::endpoint& endpoint, int nodeType)
    : acceptor_(io_context, endpoint), listeningSocket_(io_context) {
    this->nodeType_ = nodeType;
    do_accept();
}

void P2PNode::do_connect(const tcp::endpoint& endpoint)
{
    tcp::socket socket = tcp::socket(acceptor_.get_executor());
    std::cout << "executor: " << socket.get_executor() << std::endl;
    socket.async_connect(endpoint,
        [this, &socket, endpoint](boost::system::error_code ec)
        {
            std::cout << "------------do connect------------";
            if (!ec)
            {
                std::cout << "Connected to " << endpoint << std::endl;

                // Add the connected socket to the list of sockets
                sockets_.emplace_back(std::move(socket));

                // Send a message to the connected server
                std::string message = "Hello from node " + std::to_string(nodeType_);
                data_ = message;
                do_write(sockets_.back(), message.size());
            }
            else
            {
                std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
            }
        });
}

void P2PNode::do_accept() {
    std::cout << "Start Listening" << std::endl;
    acceptor_.async_accept(
        [this](boost::system::error_code ec, tcp::socket socket) {
            if (!ec) {
                std::cout << "Connection established with " << socket.remote_endpoint() << std::endl;
                sockets_.emplace_back(std::move(socket));
                do_read(sockets_.back());
            }
            do_accept();
        });
}

I tried to get the executor from 'listeningSocket_' instead, but that didn't work either.


Solution

  • do_connect has a problem:

    void P2PNode::do_connect(tcp::endpoint endpoint) {
        std::cout << "------------do connect------------";
        tcp::socket socket = tcp::socket(acceptor_.get_executor());
        std::cout << "executor: " << socket.get_executor() << std::endl;
    
        socket.async_connect(endpoint, [this, &socket, endpoint](error_code ec) {
            if (!ec) {
                std::cout << "Connected to server " << socket.remote_endpoint() << std::endl;
    
                sockets_.push_back(std::move(socket));
    
                data_ = "Hello from node " + std::to_string(nodeType_);
                do_write(sockets_.back(), data_.size());
            } else {
                std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
            }
        });
    }
    

    The local variable socket goes out of scope as soon as do_connect returns, long before the async operation (async_connect) can complete. Destroying the socket cancels the operation, and the handler is left holding a dangling reference. Moving from that stale reference into the vector is Undefined Behavior.

    You don't give the implementations of do_read or do_write, but the use of std::vector all but guarantees more UB in the future: if you ever push another connection, the vector may reallocate, which invalidates every reference to the existing sockets and is UB while operations are in flight on them. I suggest a container with suitable reference stability guarantees, such as std::list or std::deque.
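
The difference in reference stability can be seen without any networking. The following is a minimal sketch that uses plain int elements in place of sockets; the helper names (first_element_keeps_address, list_keeps_address, vector_keeps_address) are made up for illustration:

```cpp
#include <cstdint>
#include <list>
#include <vector>

// Appends `extra` elements and reports whether the first element kept its address.
// The address is stored as an integer so that no invalidated pointer is ever used.
template <typename Container>
bool first_element_keeps_address(Container c, int extra) {
    c.emplace_back(0);
    auto const before = reinterpret_cast<std::uintptr_t>(&c.front());
    for (int i = 1; i <= extra; ++i)
        c.emplace_back(i);
    auto const after = reinterpret_cast<std::uintptr_t>(&c.front());
    return before == after;
}

// std::list never relocates existing nodes on insertion.
bool list_keeps_address() { return first_element_keeps_address(std::list<int>{}, 1000); }

// std::vector has to move its elements to a new buffer once capacity is exceeded.
bool vector_keeps_address() { return first_element_keeps_address(std::vector<int>{}, 1000); }
```

With std::list the first element stays where it is, so a reference captured by a pending handler remains valid. With std::vector the elements are expected to move once the capacity is exceeded, which is exactly what would break a handler holding a tcp::socket&.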

    Assuming std::list is chosen, this would be better:

    void P2PNode::do_connect(tcp::endpoint endpoint) {
        std::cout << "------------do connect------------";
    
        auto& conn = sockets_.emplace_back(acceptor_.get_executor());
        std::cout << "executor: " << conn.get_executor() << std::endl;
    
        conn.async_connect(endpoint, [=, this, &conn](error_code ec) {
            if (!ec) {
                std::cout << "Connected to server " << conn.remote_endpoint() << std::endl;
    
                data_ = "Hello from node " + std::to_string(nodeType_);
                do_write(conn, data_.size());
            } else {
                std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
            }
        });
    }
    

    See it Live On Coliru

    // #pragma once
    #include <boost/asio.hpp>
    #include <cstdint>  // uint16_t
    #include <cstdlib>  // atoi
    #include <iostream>
    #include <list>
    #include <string>
    // #include "Protocol.h"
    
    using boost::asio::ip::tcp;
    class ProtocolMessage;
    
    class P2PNode {
        using error_code = boost::system::error_code;
    
      public:
        enum Type { CONNECTING_NODE = 1, CLIENT_NODE = 2 };
        P2PNode(boost::asio::io_context& io_context, tcp::endpoint endpoint, Type nodeType);
        void do_connect(tcp::endpoint endpoint);
    
      private:
        void do_accept();
        void do_read(tcp::socket& ) {}
        void do_write(tcp::socket&, size_t) {}
    
        tcp::acceptor          acceptor_;
        std::list<tcp::socket> sockets_;
        tcp::socket            listeningSocket_;
        std::string            data_;
        Type                   nodeType_;
    
        ProtocolMessage GenerateResponse(ProtocolMessage const& message);
        std::string     combineSocketsToString();
    };
    
    P2PNode::P2PNode(boost::asio::io_context& io_context, tcp::endpoint endpoint, Type nodeType)
        : acceptor_(io_context, endpoint)
        , listeningSocket_(io_context)
        , nodeType_(nodeType) {
        do_accept();
    }
    
    void P2PNode::do_connect(tcp::endpoint endpoint) {
        std::cout << "------------do connect------------";
    
        auto& conn = sockets_.emplace_back(acceptor_.get_executor());
        std::cout << "executor: " << conn.get_executor() << std::endl;
    
        conn.async_connect(endpoint, [=, this, &conn](error_code ec) {
            if (!ec) {
                std::cout << "Connected to server " << conn.remote_endpoint() << std::endl;
    
                data_ = "Hello from node " + std::to_string(nodeType_);
                do_write(conn, data_.size());
            } else {
                std::cout << "Failed to connect to " << endpoint << ": " << ec.message() << std::endl;
            }
        });
    }
    
    void P2PNode::do_accept() {
        std::cout << "Start Listening" << std::endl;
    
        acceptor_.async_accept([this](error_code ec, tcp::socket socket) {
            if (!ec) {
                std::cout << "Connection accepted from " << socket.remote_endpoint() << std::endl;
                sockets_.push_back(std::move(socket));
                do_read(sockets_.back());
            }
            do_accept();
        });
    }
    
    int main(int argc, char** argv) try {
        uint16_t const port = argc > 1 ? atoi(argv[1]) : 8989;
    
        std::cout << "listening port: " << port << std::endl;
        boost::asio::io_context io_context;
        P2PNode node(io_context, {tcp::v4(), port}, P2PNode::CONNECTING_NODE);
    
        if (uint16_t option, port; //
            std::cout << "Do you want to connect? yes - 1 / no - any other" && std::cin >> option &&
            option == 1) {
            std::cout << "port to connect: " << std::endl;
            std::cin >> port;
            node.do_connect({{},port});
        }
        io_context.run();
    } catch (std::exception const& e) {
        std::cout << "Exception was thrown in function: " << e.what() << std::endl;
    } catch (...) {
        std::cout << "Unhandled exception" << std::endl;
    }
    

    Testing with two instances:

    g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp 
    (./a.out 7878 <<< "other")& sleep 1
    (./a.out 8989 <<< "1 7878")&
    

    Prints output like: [screenshot omitted]

    Binding The Local Port

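    The code above does not bind the outgoing socket to the listening port, so the OS picks an ephemeral local port for it. That makes no observable difference unless you inspect the connections using netstat, tcpview, or similar tools.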

    I thought to just add the bind call:

    auto& conn = sockets_.emplace_back(acceptor_.get_executor());
    conn.open(acceptor_.local_endpoint().protocol());
    conn.bind(acceptor_.local_endpoint());
    

    However, this leads to:

    Exception was thrown in function: bind: Address already in use [system:98 at /home/sehe/custom/superboost/boost/asio/detail/reactive_socket_service.hpp:161:5 in function 'boost::system::error_code boost::asio::detail::reactive_socket_service<Protocol>::bind(boost::asio::detail::reactive_socket_service<Protocol>::implementation_type&, const endpoint_type&, boost::system::error_code&)']
    

    Setting the relevant options doesn't fix it:

    void P2PNode::do_accept() {
        std::cout << "Start Listening" << std::endl;
    
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        acceptor_.async_accept([this](error_code ec, tcp::socket socket) {
             // etc.
    

    I'd tend to conclude that it is not (portably) supported to bind to the address that the listener is using.