I have implemented a udp session using a multi-threaded environment.
using RawDataArray=std::array <unsigned char,65000>;
class StaticBuffer
{
private:
RawDataArray m_data;
std::size_t m_n_avail;
public:
StaticBuffer():m_data(),m_n_avail(0){}
StaticBuffer(std::size_t n_bytes){m_n_avail=n_bytes;}
StaticBuffer(const StaticBuffer& other)
{
std::cout<<"ctor cpy\n";
m_data=other.m_data;
m_n_avail=other.m_n_avail;
}
StaticBuffer(const StaticBuffer& other,std::size_t n_bytes)
{
std::cout<<"ctor cpy\n";
m_data=other.m_data;
m_n_avail=n_bytes;
}
StaticBuffer(const RawDataArray& data,std::size_t n_bytes)
{
std::cout<<"ctor static buff\n";
m_data=data;
m_n_avail=n_bytes;
}
void set_size(int n)
{
m_n_avail=n;
}
void set_max_size(){m_n_avail=m_data.size();}
std::size_t max_size()const {return m_data.size();}
unsigned char& operator[](std::size_t i){return m_data[i];}
const unsigned char& operator[] (std::size_t i)const{return m_data[i];}
StaticBuffer& operator=(const StaticBuffer& other)
{
if (this == &other)
return *this;
m_data = other.m_data;
m_n_avail = other.m_n_avail;
return *this;
}
void push_back(unsigned char val)
{
if (m_n_avail<m_data.size())
{
m_data[m_n_avail]=val;
}else
throw "Out of memory";
}
void reset(){m_n_avail=0;}
unsigned char* data(){return m_data.data();}
const unsigned char* data()const {return m_data.data();}
std::size_t size()const{return m_n_avail;}
~StaticBuffer(){}
};
class UDPSeassion;
using DataBuffer = StaticBuffer;
using DataBufferPtr=std::unique_ptr<DataBuffer>;
using ExternalReadHandler=std::function<void(DataBufferPtr)>;
class UDPSeassion:public std::enable_shared_from_this<UDPSeassion>
{
private:
asio::io_context& m_ctx;
asio::ip::udp::socket m_socket;
asio::ip::udp::endpoint m_endpoint;
std::string m_addr;
unsigned short m_port;
asio::io_context::strand m_send_strand;
std::deque<DataBufferPtr> m_dq_send;
asio::io_context::strand m_rcv_strand;
DataBufferPtr m_rcv_data;
ExternalReadHandler external_rcv_handler;
private:
void do_send_data_from_dq()
{
if (m_dq_send.empty())
return;
m_socket.async_send_to(
asio::buffer(m_dq_send.front()->data(),m_dq_send.front()->size()),
m_endpoint,
asio::bind_executor(m_send_strand,[this](const boost::system::error_code& er, std::size_t bytes_transferred){
if (!er)
{
m_dq_send.pop_front();
do_send_data_from_dq();
}else
{
//post to loggger
}
}));
}
void do_read(const boost::system::error_code& er, std::size_t bytes_transferred)
{
if (!er)
{
m_rcv_data->set_size(bytes_transferred);
asio::post(m_ctx,[this,data=std::move(m_rcv_data)]()mutable{ external_rcv_handler(std::move(data));});
m_rcv_data=std::make_unique<DataBuffer>();
m_rcv_data->set_max_size();
async_read();
}
}
public:
UDPSeassion(asio::io_context& ctx,const std::string& addr, unsigned short port):
m_ctx(ctx),
m_socket(ctx),
m_endpoint(asio::ip::address::from_string(addr),port),
m_addr(addr),
m_port(port),
m_send_strand(ctx),
m_dq_send(),
m_rcv_strand(ctx),
m_rcv_data(std::make_unique<DataBuffer>(65000))
{}
~UDPSeassion(){}
const std::string& get_host()const{return m_addr;}
unsigned short get_port(){return m_port;}
template<typename ExternalReadHandlerCallable>
void set_read_data_headnler(ExternalReadHandlerCallable&& handler)
{
external_rcv_handler=std::forward<ExternalReadHandlerCallable>(handler);
}
void start()
{
m_socket.open(asio::ip::udp::v4());
async_read();
}
void async_read()
{
m_socket.async_receive_from(
asio::buffer(m_rcv_data->data(),m_rcv_data->size()),
m_endpoint,
asio::bind_executor(m_rcv_strand,std::bind(&UDPSeassion::do_read,this,std::placeholders::_1,std::placeholders::_2) )
);
}
void async_send(DataBufferPtr pData)
{
asio::post(m_ctx,
asio::bind_executor(m_send_strand,[this,pDt=std::move(pData)]()mutable{
m_dq_send.emplace_back(std::move(pDt));
if (m_dq_send.size()==1)
do_send_data_from_dq();
}));
}
};
void handler_read(DataBufferPtr pdata)
{
// decoding raw_data -> decod_data
// lock mutext
// queue.push_back(decod_data)
// unlock mutext
//for view pdata
std::stringstream ss;
ss<<"thread handler: "<<std::this_thread::get_id()<<" "<<pdata->data()<<" "<<pdata->size()<<std::endl;
std::cout<<ss.str()<<std::endl;
}
int main()
{
asio::io_context ctx;
//auto work_guard = asio::make_work_guard(ctx);
std::cout<<"MAIN thread: "<<std::this_thread::get_id()<<std::endl;
StaticBuffer b{4};
b[0]='A';
b[1]='B';
b[2]='C';
b[4]='\n';
UDPSeassion client(ctx,"127.0.0.1",11223);
client.set_read_data_headnler(handler_read);
client.start();
std::vector<std::thread> threads;
for (int i=0;i<3;++i)
{
threads.emplace_back([&](){
std::stringstream ss;
ss<<"run thread: "<<std::this_thread::get_id()<<std::endl;
std::cout<<ss.str();
ctx.run();
std::cout<<"end thread\n";
}
);
}
client.async_send(std::make_unique<StaticBuffer>(b));
ctx.run();
for (auto& t:threads)
t.join();
return 1;
}
in the code above, the main emphasis is on the UDPSeasion class. Class StaticBuffer is written so that it performs the main functions. I have some questions:
I will be very grateful for detailed answers to my questions.Thank you very much))
I'd simplify a ton.
you "use" enable_shared_from_this
but none of the asynchronous operations capture shared_from_this
. In fact, you don't even allocate UDPSession
shared, so it would be Undefined Behaviour to use shared_from_this
at all.
no-op destructors are implied. If you must declare them, = default
them
the m_rcv_strand
is deprecated - use strand<>
instead
why is there a separate strand for send/receive? Sure, 1 read operation is allowed to overlap 1 write operations, but you still cannot access the shared objects (like m_socket
) without proper synchronization
you have strands but seem to erronously not post to them where relevant (e.g. post(m_ctx, bind_executor(m_send_strand, ....))
is conflicting)
you have a laborious buffer type that /seemingly/ aims to avoid allocation, but you wrap it in a unique_ptr anyways ¯\(ツ)/¯
set_read_data_handler
doesn't need to be a template. Since you're erasing to std::function
anyways, there's zero benefit over just using:
void set_read_data_handler(ExternalReadHandler handler) {
external_rcv_handler = std::move(handler);
}
You have repeated magic constants (e.g. 65000
)
You seem to be missing a socket bind()
call
In short I'd replace the buffer with something sensible:
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);
Since you seem to expect text protocol, chances are your average message is (much) smaller, so I reckon it may be much faster to just use
std::string
or evenboost::container::small_vector<...>
.
Not really required but to allow for elegant, asio-standard use:
using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
See a much simplified version Live On Coliru
#include <boost/asio.hpp>
#include <boost/container/static_vector.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
#include <list>
#include <thread>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
using StaticBuffer = boost::container::static_vector<uint8_t, 65000>;
static_assert(StaticBuffer::static_capacity == 65000);
// not really required but to allow for elegant, asio-standard use:
using asio::buffer;
static inline auto buffer(StaticBuffer const& b) { return boost::asio::buffer(b.data(), b.size()); }
static inline auto buffer(StaticBuffer& b) { return boost::asio::buffer(b.data(), b.size()); }
using ExternalReadHandler = std::function<void(StaticBuffer&&)>;
class UDPSession {
private:
using error_code = boost::system::error_code;
asio::any_io_executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
udp::socket m_socket{make_strand(m_ex)};
std::deque<StaticBuffer> m_dq_send;
StaticBuffer m_rcv_data;
ExternalReadHandler external_rcv_handler;
public:
UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(StaticBuffer data) {
asio::post(m_socket.get_executor(), [this, d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
m_rcv_data.assign(m_rcv_data.static_capacity, '\0');
m_socket.async_receive_from(
buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, this, std::placeholders::_1, std::placeholders::_2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
m_rcv_data.resize(bytes_transferred);
asio::post(m_ex, [this, data = std::move(m_rcv_data)]() mutable {
external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to(buffer(m_dq_send.front()), m_endpoint,
[this](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to loggger */ }
});
}
};
void handler_read(StaticBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
asio::io_context ctx;
auto work_guard = asio::make_work_guard(ctx);
trace("Main thread");
std::list<std::thread> threads;
for (int i = 0; i < 3; ++i)
threads.emplace_back([&]() {
trace("START");
ctx.run();
trace("END");
});
UDPSession client(ctx.get_executor(), "127.0.0.1", 11223);
client.set_read_data_handler(handler_read);
client.start();
client.send({'A', 'B', 'C', '\n'});
work_guard.reset();
for (auto& t : threads)
t.join();
}
The live demo on Coliru "eats" words from main.cpp
. Here's a local dictionary demo:
shared_from_this
You might have noticed I changed to any_io_executor
instead of io_context&
. That way you can easily switch to asio::thread_pool
instead of doing it manually (poorly¹).
Let's also re-instate shared_from_this
, but correctly this time.
For simplicity I've used a static buffer ONLY for the receive buffer (because that's how datagram protocols roll), and just used vector
(or small_vector
) for the DataBuffer
.
#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;
class UDPSession : public std::enable_shared_from_this<UDPSession> {
private:
using error_code = boost::system::error_code;
asio::any_io_executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
udp::socket m_socket{make_strand(m_ex)};
std::deque<DataBuffer> m_dq_send;
std::array<uint8_t, 65000> m_rcv_data;
ExternalReadHandler external_rcv_handler;
public:
UDPSession(asio::any_io_executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(DataBuffer data) {
asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
using namespace std::placeholders;
m_socket.async_receive_from( //
asio::buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
asio::post(
m_ex,
[this, self=shared_from_this(),
data = DataBuffer(m_rcv_data.data(), m_rcv_data.data() + bytes_transferred)]() mutable {
external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to( //
asio::buffer(m_dq_send.front()), m_endpoint,
[this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to loggger */ }
});
}
};
void handler_read(DataBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
trace("Main thread");
asio::thread_pool ctx(4);
{
auto client = std::make_shared<UDPSession>(ctx.get_executor(), "127.0.0.1", 11223);
client->set_read_data_handler(handler_read);
client->start();
client->send({'A', 'B', 'C', '\n'});
} // client stays alive through shared ownership
ctx.join();
}
As icing on the cake, you can template the entire thing on the concrete executor type and avoid type-erasing the executor type:
template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
using socket_t = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
See it Live On Coliru
#include <boost/asio.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
namespace { // user-friendly logging
static std::mutex s_console_mx;
static std::atomic_int t_id_gen = 0;
thread_local int t_id = ++t_id_gen;
template <typename... T> static inline void trace(T const&... args) {
std::lock_guard lk(s_console_mx);
((std::cout << "th:" << t_id << " ") << ... << args) << std::endl;
}
} // namespace
namespace asio = boost::asio;
using asio::ip::udp;
//using DataBuffer = boost::container::small_vector<uint8_t, 320>; // e.g. median length is 320
using DataBuffer = std::vector<uint8_t>;
using ExternalReadHandler = std::function<void(DataBuffer&&)>;
template <typename Executor>
class UDPSession : public std::enable_shared_from_this<UDPSession<Executor> > {
using socket_t = asio::basic_datagram_socket<udp, asio::strand<Executor>>;
using error_code = boost::system::error_code;
Executor m_ex;
std::string m_addr;
uint16_t m_port;
udp::endpoint m_endpoint{asio::ip::address::from_string(m_addr), m_port};
socket_t m_socket{make_strand(m_ex)};
std::deque<DataBuffer> m_dq_send;
std::array<uint8_t, 65000> m_rcv_data;
ExternalReadHandler external_rcv_handler;
using std::enable_shared_from_this<UDPSession>::shared_from_this;
public:
UDPSession(Executor ex, std::string const& addr, uint16_t port)
: m_ex(ex)
, m_addr(addr)
, m_port(port) {}
std::string const& get_host() const { return m_addr; }
uint16_t get_port() { return m_port; }
void set_read_data_handler(ExternalReadHandler handler) { external_rcv_handler = std::move(handler); }
void start() {
m_socket.open(udp::v4());
m_socket.bind(m_endpoint);
do_read();
}
void send(DataBuffer data) {
asio::post(m_socket.get_executor(), [this, self = shared_from_this(), d = std::move(data)]() mutable {
m_dq_send.emplace_back(std::move(d));
if (m_dq_send.size() == 1)
send_loop();
});
}
private:
void do_read() {
using namespace std::placeholders;
m_socket.async_receive_from( //
asio::buffer(m_rcv_data), m_endpoint,
std::bind(&UDPSession::on_read, shared_from_this(), _1, _2));
}
void on_read(error_code er, size_t bytes_transferred) {
if (!er) {
auto f = m_rcv_data.data(), l = f + bytes_transferred;
asio::post(m_ex, [self = shared_from_this(), data = DataBuffer(f, l)]() mutable {
self->external_rcv_handler(std::move(data));
});
do_read();
}
}
void send_loop() {
if (m_dq_send.empty())
return;
m_socket.async_send_to( //
asio::buffer(m_dq_send.front()), m_endpoint,
[this, self = shared_from_this()](error_code er, size_t /*bytes_transferred*/) {
if (!er) {
m_dq_send.pop_front();
send_loop();
} // else { /* post to loggger */ }
});
}
};
void handler_read(DataBuffer&& pdata) {
if (!pdata.empty()) {
std::string msg(pdata.begin(), pdata.end() - 1); // omit '\n'
trace("thread handler: ", pdata.size(), " as text: ", quoted(msg));
}
}
int main() {
trace("Main thread");
using Ex = asio::thread_pool::executor_type;
asio::thread_pool ctx(4);
{
auto client = std::make_shared<UDPSession<Ex> >(ctx.get_executor(), "127.0.0.1", 11223);
client->set_read_data_handler(handler_read);
client->start();
client->send({'A', 'B', 'C', '\n'});
} // client stays alive through shared ownership
ctx.join();
}
Another local demo:
¹ you needed at least exception handling in the runner threads: Should the exception thrown by boost::asio::io_service::run() be caught?