Here is a simple example that makes use of boost::asio::io_context:
https://github.com/unegare/boost-ex/blob/500e46f4d3b41e2abe48e2deccfab39d44ae94e0/main.cpp
#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <thread>
#include <vector>
#include <memory>
#include <mutex>
#include <chrono>
#include <iostream>
#include <exception>
std::mutex m_stdout;
// Functor used as the body of a worker thread: main() passes a copy of it to
// std::thread. It holds shared_ptr copies of the io_context and of the work
// guard and keeps calling io_context::run() until run() returns without error.
class MyWorker {
std::shared_ptr<boost::asio::io_context> io_context;
// Only copied/moved by the constructors; no member function uses it.
std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;
public:
// Stores copies of both shared_ptr arguments.
MyWorker(std::shared_ptr<boost::asio::io_context> &_io_context, std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> &_work_guard):
io_context(_io_context), work_guard(_work_guard) {}
// Copy constructor: shares both pointers with `mw` and logs the call.
// NOTE(review): m_stdout is locked and unlocked by hand here and below; if the
// stream insertion throws, unlock() is skipped. std::lock_guard avoids that.
MyWorker(const MyWorker &mw): io_context(mw.io_context), work_guard(mw.work_guard) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] MyWorker copy constructor" << std::endl;
m_stdout.unlock();
}
// Move constructor: takes over both pointers from `mw` and logs the call.
MyWorker(MyWorker &&mw): io_context(std::move(mw.io_context)), work_guard(std::move(mw.work_guard)) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] MyWorker move constructor" << std::endl;
m_stdout.unlock();
}
~MyWorker() {}
// Thread body: logs "Thread Start", runs the io_context, logs "Thread Finish".
void operator() () {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] Thread Start" << std::endl;
m_stdout.unlock();
// Loop until run() returns with no error code and without throwing.
while(true) {
try {
boost::system::error_code ec;
io_context->run(ec);
if (ec) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] MyWorker: received an error: " << ec << std::endl;
m_stdout.unlock();
// An error code is only logged; run() is then called again.
continue;
}
// run() returned without an error: leave the loop.
break;
} catch (std::exception &ex) {
// Exceptions derived from std::exception are logged; there is no break
// here, so the loop calls run() again afterwards.
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] MyWorker: caught an exception: " << ex.what() << std::endl;
m_stdout.unlock();
}
}
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] Thread Finish" << std::endl;
m_stdout.unlock();
}
};
// Asynchronous HTTP client used to demonstrate the lifetime problem: after the
// connect completes it sends one GET request and prints what one read returns.
// Derives from enable_shared_from_this, but the handlers below are bound to
// the raw `this` pointer, not to shared_from_this().
class Client: public std::enable_shared_from_this<Client> {
std::shared_ptr<boost::asio::io_context> io_context;
std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;
std::shared_ptr<boost::asio::ip::tcp::socket> sock;
// 512-byte buffer: holds the request while sending, then the received data.
std::shared_ptr<std::array<char, 512>> buff;
public:
// Stores copies of the three shared_ptr arguments, allocates the buffer and
// logs the call ("Client with args").
Client(std::shared_ptr<boost::asio::io_context> &_io_context, std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> &_work_guard, std::shared_ptr<boost::asio::ip::tcp::socket> &_sock):
io_context(_io_context), work_guard(_work_guard), sock(_sock) {
buff = std::make_shared<std::array<char,512>>();
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " with args" << std::endl;
m_stdout.unlock();
}
// Copy constructor: the copy shares io_context, work_guard, sock and buff with
// `cl` (all four are shared_ptr copies) and logs "Client copy".
Client(const Client &cl): io_context(cl.io_context), work_guard(cl.work_guard), sock(cl.sock), buff(cl.buff) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " copy" << std::endl;
m_stdout.unlock();
}
// Move constructor: moves all four shared_ptr members out of `cl`, which is
// why the destructor of a moved-from Client prints use_count 0 everywhere.
Client(Client &&cl): io_context(std::move(cl.io_context)), work_guard(std::move(cl.work_guard)), sock(std::move(cl.sock)), buff(std::move(cl.buff)) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " move" << std::endl;
m_stdout.unlock();
}
// Destructor: only logs the use counts of buff, sock and io_context.
~Client() {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << " buff.use_count: " << buff.use_count() << " | sock.use_count: " << sock.use_count() << " | io_context.use_count: " << io_context.use_count() << std::endl;
m_stdout.unlock();
}
// Completion handler for async_connect. On success it copies the request into
// buff and starts the asynchronous write; on failure it logs the error code.
void OnConnect(const boost::system::error_code &ec) {
// NOTE(review): this write to std::cout (and the one at the end of the
// function) is not protected by m_stdout.
std::cout << __FUNCTION__ << std::endl;
if (ec) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << ": " << ec << std::endl;
m_stdout.unlock();
} else {
// buff = std::make_shared<std::array<char, 512>>();
char req[] = "GET / HTTP/1.1\r\nHost: unegare.info\r\n\r\n";
// Copies strlen(req) bytes, i.e. without the terminating NUL.
memcpy(buff->data(), req, strlen(req));
m_stdout.lock();
std::cout << req << std::endl;
m_stdout.unlock();
// The length is taken with strlen() on the buffer, which relies on the byte
// after the request being zero (presumably true because make_shared<...>()
// value-initializes the array - TODO confirm).
// The handler is bound to the raw `this`, so the pending write does not keep
// this Client object alive.
sock->async_write_some(boost::asio::buffer(buff->data(), strlen(buff->data())), std::bind(std::mem_fn(&Client::OnSend), this, std::placeholders::_1, std::placeholders::_2));
}
std::cout << __FUNCTION__ << " use_count: " << buff.use_count() << std::endl;
}
// Completion handler for the write. On success it zeroes the buffer and starts
// one asynchronous read; bytes_transferred is not used.
void OnSend(const boost::system::error_code &ec, std::size_t bytes_transferred) {
// NOTE(review): unsynchronized std::cout writes in this function as well.
std::cout << __FUNCTION__ << " use_count: " << io_context.use_count() << std::endl;
if (ec) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << ": " << ec << std::endl;
m_stdout.unlock();
} else {
std::cout << __FUNCTION__ << " use_count: " << buff.use_count() << std::endl;
buff->fill(0);
std::cout << __FUNCTION__ << std::endl;
// Again bound to the raw `this`: the pending read does not own the Client.
sock->async_read_some(boost::asio::buffer(buff->data(), buff->size()), std::bind(std::mem_fn(&Client::OnRecv), this, std::placeholders::_1, std::placeholders::_2));
}
}
// Completion handler for the read: logs the error code, or prints the buffer.
void OnRecv(const boost::system::error_code &ec, std::size_t bytes_transferred) {
std::cout << __FUNCTION__ << std::endl;
if (ec) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] " << __FUNCTION__ << ": " << ec << std::endl;
m_stdout.unlock();
} else {
m_stdout.lock();
// Printed as a C string; bytes_transferred is ignored, so output stops at the
// first zero byte. NOTE(review): if the read filled all 512 bytes there is no
// terminator inside the array.
std::cout << buff->data() << std::endl;
m_stdout.unlock();
}
}
};
// Creates the io_context and its work guard, starts one worker thread, resolves
// the host, starts an asynchronous connect for a Client, then waits for a
// character on stdin before releasing the work guard and joining the thread.
int main () {
std::shared_ptr<boost::asio::io_context> io_context(std::make_shared<boost::asio::io_context>());
std::shared_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard(
std::make_shared<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> (boost::asio::make_work_guard(*io_context))
);
MyWorker mw(io_context, work_guard);
std::vector<std::thread> vth;
vth.reserve(1);
// The loop body runs exactly once; std::thread is constructed from a copy of
// mw, which is what produces the MyWorker copy/move constructor log lines.
for (int i = 1; i > 0; --i) {
vth.emplace_back(mw);
}
// Starts out as an empty shared_ptr.
std::shared_ptr<Client> cl = 0;
try {
boost::asio::ip::tcp::resolver resolver(*io_context);
boost::asio::ip::tcp::resolver::query query("unegare.info", "80");
// Synchronous resolve; the first result is dereferenced without a check.
boost::asio::ip::tcp::endpoint ep = *resolver.resolve(query);
m_stdout.lock();
std::cout << "ep: " << ep << std::endl;
m_stdout.unlock();
std::shared_ptr<boost::asio::ip::tcp::socket> sock(std::make_shared<boost::asio::ip::tcp::socket>(*io_context));
std::shared_ptr<Client> cl2(std::make_shared<Client>(io_context, work_guard, sock));
cl = cl2->shared_from_this();
m_stdout.lock();
std::cout << "HERE: use_count = " << cl.use_count() << std::endl;
m_stdout.unlock();
// `*cl2->shared_from_this()` dereferences the shared_ptr, so std::bind stores
// a COPY of the Client object (hence the "Client copy" log line), not a
// shared_ptr to it. OnConnect therefore runs on that copy, whose lifetime is
// tied to the bound handler rather than to cl/cl2.
sock->async_connect(ep, std::bind(std::mem_fn(&Client::OnConnect), *cl2->shared_from_this(), std::placeholders::_1));
std::this_thread::sleep_for(std::chrono::duration<double>(1));
m_stdout.lock();
std::cout << "AFTER CALL" << std::endl;
m_stdout.unlock();
// asm volatile ("");
} catch (std::exception &ex) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] Main Thread: caught an exception: " << ex.what() << std::endl;
m_stdout.unlock();
}
try {
// Block until the user types a character, then release the work guard so
// that io_context::run() in the worker thread is allowed to return.
char t;
std::cin >> t;
work_guard->reset();
// std::this_thread::sleep_for(std::chrono::duration<double>(1));
// std::cout << "Running" << std::endl;
// io_context->run();
} catch (std::exception &ex) {
m_stdout.lock();
std::cout << "[" << std::this_thread::get_id() << "] Main Thread: caught an exception: " << ex.what() << std::endl;
m_stdout.unlock();
}
// Joins every thread in vth.
std::for_each(vth.begin(), vth.end(), std::mem_fn(&std::thread::join));
return 0;
}
stdout:
[140487203505984] MyWorker copy constructor
[140487203505984] MyWorker move constructor
[140487185372928] Thread Start
ep: 95.165.130.37:80
[140487203505984] Client with args
HERE: use_count = 2
[140487203505984] Client copy
[140487203505984] Client move
[140487203505984] ~Client buff.use_count: 0 | sock.use_count: 0 | io_context.use_count: 0
[140487185372928] Client move
[140487185372928] ~Client buff.use_count: 0 | sock.use_count: 0 | io_context.use_count: 0
OnConnect
GET / HTTP/1.1
Host: unegare.info
OnConnect use_count: 2
[140487185372928] ~Client buff.use_count: 2 | sock.use_count: 3 | io_context.use_count: 5
Segmentation Fault (core dumped)
However, I have trouble understanding the segfault, which is caused by an invalid reference to the Client
object.
I do not understand why cl2
gets destroyed after the call to
sock->async_connect(ep, std::bind(std::mem_fn(&Client::OnConnect), *cl2->shared_from_this(), std::placeholders::_1));
at line 162.
Also, why was the copy constructor invoked,
as can be seen in the stdout
above?
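It's good that you try to understand. However, start simple!
Your binds bind to:
- `this` (which does NOT keep the shared pointer alive)
- `*cl2->shared_from_this()` - oops, this binds to a COPY of `*cl2` by value¹. That is obviously the reason why that COPY is destructed when you're done.

The invalid reference arose from the combination of 1. and 2.b.: the copy is destroyed as soon as the connect handler has run, while the write handler started from inside it still holds that copy's raw `this`.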
I'd suggest to simplify. A lot!
- Prefer std::unique_ptr for things that do not require shared ownership (shared ownership should be really rare).
- Prefer std::lock_guard instead of manually locking and unlocking (that's not exception safe).
- Many smaller things: work_guard doesn't need to be copied, it has a reset() member already; catch by const-reference, if you're gonna catch; you don't need to use error_code on io_context::run; check the end-point iterator of resolve before dereferencing it; use boost::asio::connect instead, so you don't have to check and iterate over different endpoints, etc.
- Prefer std::string over fixed-size buffers if you're doing dynamic allocation anyways; use non-implementation defined chrono durations (for example 1.0s, not duration<double>(1)); consider using boost::thread_group instead of vector<std::thread>, or at least only join threads that are joinable(), etc.
- Make async_connect part of Client, since then you can simply bind members the same way as all the rest of the binds (using shared_from_this(), instead of writing the bug you had).

Just as a reference, this is what MyWorker
could be:
/// Worker functor: runs the referenced io_context on the calling thread.
/// If run() exits by throwing, the exception is traced and run() is entered
/// again; the functor returns only after run() has returned normally.
class MyWorker {
    ba::io_context& io_context;

public:
    MyWorker(ba::io_context& _io_context) : io_context(_io_context) {}

    //// best is to not mention these at all, because even `default`ing can change what compiler generates
    //MyWorker(const MyWorker& mw) = default;
    //MyWorker(MyWorker&& mw) = default;
    //~MyWorker() = default;

    void operator()() {
        TRACE("Thread Start");
        for (bool finished = false; !finished;) {
            try {
                io_context.run();
                finished = true; // normal end
            } catch (boost::system::system_error const& se) {
                TRACE("MyWorker: received an error: " << se.code().message());
            } catch (std::exception const& ex) {
                TRACE("MyWorker: caught an exception: " << ex.what());
            }
        }
        TRACE("Thread Finish");
    }
};
At this point, you could very well just make it a lambda:
// Worker body as a lambda: runs io_context (captured by reference) and
// re-enters run() after tracing any exception that escaped a handler.
auto mw = [&io_context] {
    TRACE("Thread Start");
    for (bool finished = false; !finished;) {
        try {
            io_context.run();
            finished = true; // normal end
        } catch (boost::system::system_error const& se) {
            TRACE("MyWorker: received an error: " << se.code().message());
        } catch (std::exception const& ex) {
            TRACE("MyWorker: caught an exception: " << ex.what());
        }
    }
    TRACE("Thread Finish");
};
Much simpler.
Just see e.g. the simplicity of the new connection code:
void Start() {
tcp::resolver resolver(io_context);
ba::async_connect(sock,
resolver.resolve({"unegare.info", "80"}),
[self=shared_from_this()](error_code ec, tcp::endpoint) { self->OnConnect(ec); });
}
The endpoint is printed when OnConnect
is called.
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
namespace ba = boost::asio;
using ba::ip::tcp;
using namespace std::literals;
static std::mutex s_stdout;
/// Writes one line of the form "[<thread id>] <expr>" to std::cout while
/// holding s_stdout, so lines from different threads do not interleave.
/// `expr` is spliced into the stream chain and may itself contain `<<`.
/// The do/while(0) wrapper makes the macro a single statement, so
/// `if (c) TRACE(a); else TRACE(b);` compiles; always follow it with `;`.
#define TRACE(expr) do { \
        std::lock_guard<std::mutex> lk(s_stdout); \
        std::cout << "[" << std::this_thread::get_id() << "] " << expr << std::endl; \
    } while (0)
/// Minimal asynchronous HTTP client: connect, send one GET request, then read
/// and print whatever a single read returns.
///
/// Instances must be owned by a std::shared_ptr (e.g. std::make_shared):
/// every pending operation stores shared_from_this() in its completion
/// handler, which keeps the object alive until that handler has run.
class Client : public std::enable_shared_from_this<Client> {
    ba::io_context& io_context;
    tcp::socket sock;
    std::string buf; // request text while sending, receive buffer afterwards

public:
    Client(ba::io_context& _io_context) : io_context(_io_context), sock{io_context}
    { }

    /// Resolves the host synchronously and starts the asynchronous connect.
    void Start() {
        tcp::resolver resolver(io_context);
        ba::async_connect(sock,
                resolver.resolve({"unegare.info", "80"}),
                std::bind(std::mem_fn(&Client::OnConnect), shared_from_this(), std::placeholders::_1));
    }

    /// Connect completion handler: traces the peer and sends the request.
    void OnConnect(const boost::system::error_code& ec) {
        if (ec) {
            // Check the error first: on a failed connect there is no peer and
            // the throwing remote_endpoint() overload would raise instead of
            // letting this branch report the error.
            TRACE(__FUNCTION__ << ": " << ec.message());
            return;
        }

        // Non-throwing overload: the peer may already have gone away again.
        boost::system::error_code ep_ec;
        const tcp::endpoint peer = sock.remote_endpoint(ep_ec);
        if (ep_ec) {
            TRACE(__FUNCTION__ << ": " << ep_ec.message());
            return;
        }
        TRACE(__FUNCTION__ << " ep:" << peer);

        buf = "GET / HTTP/1.1\r\nHost: unegare.info\r\n\r\n";
        TRACE(std::quoted(buf));
        // async_write keeps writing until the whole buffer has been sent;
        // async_write_some may complete after sending only part of it.
        ba::async_write(sock, ba::buffer(buf),
                std::bind(std::mem_fn(&Client::OnSend), shared_from_this(), std::placeholders::_1, std::placeholders::_2));
    }

    /// Write completion handler: starts a single read of up to 512 bytes.
    void OnSend(const boost::system::error_code& ec, std::size_t bytes_transferred) {
        if (ec) {
            TRACE(__FUNCTION__ << ": " << ec.message() << " and bytes_transferred: " << bytes_transferred);
        } else {
            TRACE(__FUNCTION__);
            buf.assign(512, '\0');
            sock.async_read_some(ba::buffer(buf), std::bind(std::mem_fn(&Client::OnRecv), shared_from_this(), std::placeholders::_1, std::placeholders::_2));
        }
    }

    /// Read completion handler: trims the buffer to the received size and
    /// prints it. No further operation is started.
    void OnRecv(const boost::system::error_code& ec, std::size_t bytes_transferred) {
        TRACE(__FUNCTION__);
        if (ec) {
            TRACE(__FUNCTION__ << ": " << ec.message() << " and bytes_transferred: " << bytes_transferred);
        } else {
            buf.resize(bytes_transferred);
            TRACE(std::quoted(buf));
        }
    }
};
/// Runs one worker thread on the io_context, starts a Client, waits for a
/// character on stdin, then releases the work guard and joins the worker.
int main() {
    ba::io_context io_context;
    auto work_guard = make_work_guard(io_context);

    boost::thread_group vth;
    // Worker body: re-enter run() after tracing an exception from a handler.
    auto mw = [&io_context] {
        TRACE("Thread Start");
        while (true) {
            try {
                io_context.run();
                break; // normal end
            } catch (boost::system::system_error const& se) {
                TRACE("MyWorker: received an error: " << se.code().message());
            } catch (std::exception const& ex) {
                TRACE("MyWorker: caught an exception: " << ex.what());
            }
        }
        TRACE("Thread Finish");
    };
    vth.create_thread(mw);

    try {
        std::make_shared<Client>(io_context)->Start();
        char t;
        std::cin >> t;
    } catch (std::exception const& ex) {
        TRACE("Main Thread: caught an exception: " << ex.what());
    }

    // Release the guard on every path. If Start() throws (for example when
    // the host cannot be resolved) and the guard stayed active, run() would
    // never return and join_all() below would block forever.
    work_guard.reset();
    vth.join_all();
}
Prints:
[140095938852608] Thread Start
[140095938852608] OnConnect ep:95.165.130.37:80
[140095938852608] "GET / HTTP/1.1
Host: unegare.info
"
[140095938852608] OnSend
[140095938852608] OnRecv
[140095938852608] "HTTP/1.1 200 OK
Date: Sun, 22 Dec 2019 22:26:56 GMT
Server: Apache/2.4.18 (Ubuntu)
Last-Modified: Sun, 10 Mar 2019 10:17:38 GMT
ETag: \"37-583bac3f3c843\"
Accept-Ranges: bytes
Content-Length: 55
Content-Type: text/html
<html>
<head>
</head>
<body>
It works.
</body>
</html>
"
q
[140095938852608] Thread Finish
std::bind
has largely been superseded by lambdas since C++11 (it is not formally deprecated, but lambdas are usually clearer); consider using lambdas instead. Since Coliru didn't want to co-operate any more, I'll just post the three changed functions in full:
void Start() {
tcp::resolver resolver(io_context);
ba::async_connect(sock,
resolver.resolve({"unegare.info", "80"}),
[self=shared_from_this()](error_code ec, tcp::endpoint) { self->OnConnect(ec); });
}
/// Connect completion handler: on success traces the peer endpoint and sends
/// the GET request; on failure only traces the error message.
void OnConnect(const boost::system::error_code& ec) {
    if (ec) {
        // Check the error first: on a failed connect there is no peer and the
        // throwing remote_endpoint() overload would raise instead of letting
        // this branch report the error.
        TRACE(__FUNCTION__ << ": " << ec.message());
        return;
    }

    // Non-throwing overload: the peer may already have gone away again.
    boost::system::error_code ep_ec;
    const tcp::endpoint peer = sock.remote_endpoint(ep_ec);
    if (ep_ec) {
        TRACE(__FUNCTION__ << ": " << ep_ec.message());
        return;
    }
    TRACE(__FUNCTION__ << " ep:" << peer);

    buf = "GET / HTTP/1.1\r\nHost: unegare.info\r\n\r\n";
    TRACE(std::quoted(buf));
    // async_write keeps writing until the whole buffer has been sent;
    // async_write_some may complete after sending only part of it.
    ba::async_write(sock, ba::buffer(buf),
        [self = shared_from_this()](boost::system::error_code write_ec, std::size_t bytes_transferred) {
            self->OnSend(write_ec, bytes_transferred);
        });
}
/// Write completion handler: on success resizes buf to 512 zero bytes and
/// starts a single asynchronous read into it; on failure traces the error
/// message together with the number of bytes reported as transferred.
void OnSend(const boost::system::error_code& ec, std::size_t bytes_transferred) {
    if (ec) {
        TRACE(__FUNCTION__ << ": " << ec.message() << " and bytes_transferred: " << bytes_transferred);
    } else {
        TRACE(__FUNCTION__);
        buf.assign(512, '\0');
        sock.async_read_some(ba::buffer(buf),
            // error_code is spelled out in full: no using-declaration in the
            // listing brings an unqualified `error_code` into scope. The
            // captured shared_ptr keeps this Client alive until OnRecv has run.
            [self = shared_from_this()](boost::system::error_code read_ec, std::size_t received) {
                self->OnRecv(read_ec, received);
            });
    }
}
¹ Does boost::bind() copy parameters by reference or by value?
² Coliru doesn't allow network access