I'm trying to send messages via async_write, but they are only sent after I shut down the server (Ctrl-C).
For example: as a client I send "test" and "test2", and only after the server is closed does the client receive "testtest2".
I'm making a chat that accepts a message from a user (successfully) and has to broadcast it to everyone
message sending code
void Server::writeHandler(int id, boost::system::error_code error){
if (!error){
std::cout << "[DEBUG] message broadcasted ";
} else {
close_connection(id);
}
}
void Server::broadcast(std::string msg, boost::system::error_code error){
for (auto& user : m_users){ // for every user in unordered map of users
asio::async_write(user.second->socket, asio::buffer(msg, msg.size()),
std::bind(&Server::writeHandler, this, user.first, std::placeholders::_1));
}
}
broadcast calls in onMessage
void Server::onMessage(int id, boost::system::error_code error){
if (!error){
broadcast(m_read_msg, error); // char m_read_msg[PACK_SIZE] // PACK_SIZE = 512
asio::async_read(m_users[id].get()->socket, asio::buffer(m_read_msg, PACK_SIZE), // PACK_SIZE = 512
std::bind(&Server::onMessage, this, id, std::placeholders::_1));
} else {
close_connection(id);
}
}
server run function:
// Opens the acceptor on `port` (IPv4), starts accepting connections and then
// runs the io_service event loop on the calling thread.
//   port - TCP port to listen on
// Returns only when m_io.run() returns.
//
// The previous version ran m_io on a helper thread and then spun in an empty
// `while (true) {}`. That loop has no side effects (undefined behaviour) and
// burned a CPU core for no benefit; running the loop here blocks just as well.
void Server::run(int port){
    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    // acceptor initialization
    m_acceptor = std::make_shared<asio::ip::tcp::acceptor>(m_io, endpoint);
    // Log only after the acceptor has been constructed on the endpoint.
    std::cout << "[DEBUG] binded on " << port << std::endl;
    // start to listen for connections
    listen();
    // process asynchronous events until the io_service stops
    m_io.run();
}
void Server::listen(){
std::shared_ptr<User> pUser(new User(m_io));
m_acceptor->async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
}
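Infinite loops without observable side effects (like your empty while (true) {}) are UB in C++.
Your std::string msg is a local (a by-value parameter); passing a buffer that refers to it to an async operation (async_write) is also UB, because the string is destroyed before the operation completes.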
Starting a thread and then immediately entering an infinite loop is useless, unless the objective is to waste a lot of CPU power. So, replace it with just
m_io.run();
Next up, the code is far from self-contained and lacks all manner of error handling. Here's my imagined minimal completion, fixing the UBs already mentioned:
#include <boost/asio.hpp>
#include <iostream>
#include <map>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;
// One client connection: holds the TCP socket used to talk to that client.
struct User {
    tcp::socket socket;

    // Creates an unconnected socket on the given io_service.
    User(asio::io_service& service) : socket(service) {}
};
// Minimal server: accepts connections in a loop and can write one message to
// every entry of m_users.
struct Server {
    // Opens the acceptor on `port`, starts accepting and runs the event loop on
    // the calling thread until the io_service runs out of work.
    void run(uint16_t port) {
        m_acceptor = {m_io, {{}, port}};
        std::cout << "[DEBUG] bound on " << port << std::endl;
        listen();
        m_io.run();
    }

    // Placeholder: connection teardown is not implemented in this sketch.
    void close_connection(int) {}

    std::map<int, std::shared_ptr<User>> m_users;

    // Completion handler for one async_write. The trailing shared_ptr is not
    // used; it exists only so the bound handler keeps the payload alive.
    void writeHandler(int id, error_code error, std::shared_ptr<void>) {
        if (!error) {
            std::cout << "[DEBUG] message broadcasted ";
        } else {
            close_connection(id);
        }
    }

    // Starts an async_write of `msg` to every user in m_users.
    void broadcast(std::string msg, boost::system::error_code /*error*/) {
        // The payload must outlive this call, so it is moved into shared
        // storage that every completion handler co-owns. (This declaration was
        // missing, leaving `shared_msg` undeclared below.)
        auto shared_msg = std::make_shared<std::string>(std::move(msg));
        for (auto& [id, usr] : m_users) { // for every user in the map of users
            asio::async_write(usr->socket, asio::buffer(*shared_msg),
                              bind(&Server::writeHandler, this, id, _1, shared_msg));
        }
    }

    // Starts one asynchronous accept into a freshly created User.
    void listen() {
        std::shared_ptr<User> pUser(new User(m_io));
        m_acceptor.async_accept(pUser->socket, std::bind(&Server::onAccept, this, pUser));
    }

    // Accept completion: the user is not stored anywhere yet; just accept the next one.
    void onAccept(std::shared_ptr<User> /*user*/) { listen(); }

    asio::io_service m_io;
    tcp::acceptor m_acceptor{m_io};
};
int main() {
Server s;
s.run(8989);
}
Note that nothing ever broadcasts in the first place, so I'm not sure what the problem is.
What I can see is that
- io_service is deprecated (use io_context)
- passing io_service& around is an anti-pattern (use executors)
- the responsibilities of Server and User are not separated. The handlers for each user session need to use the state from User and are much more naturally members of User. In fact, that's the only way to do it correctly, because with an out-of-session broadcast you MUST guarantee the correct sequencing of writes, which means you MUST have a per-session (user) queue
- the session lifetime is best managed with enable_shared_from_this
- m_users can store a weak_ptr so you can observe when connections terminate

Here's what it would look like with those addressed:
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <map>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;
using Message = std::string;
// One connected client. The session is kept alive by the shared_ptr copies
// captured in its pending read/write handlers.
struct UserSession : std::enable_shared_from_this<UserSession> {
    // Takes ownership of an accepted socket. The id is currently unused.
    UserSession(tcp::socket s, int /*id*/) : socket_(std::move(s)) {
        // Cache the peer address now. Previously ep_ was never assigned, so
        // both log lines printed a default-constructed endpoint. The
        // error_code overload is used so a failed lookup cannot throw here;
        // ep_ then stays default-constructed.
        error_code ec;
        ep_ = socket_.remote_endpoint(ec);
        std::cout << "New UserSession from " << ep_ << std::endl;
    }

    ~UserSession() {
        std::cout << "Closed UserSession from " << ep_ << std::endl;
    }

    // Begins reading from the client.
    void start() {
        read_loop();
    }

    // Queues `msg` for delivery. A write chain is started only when the queue
    // was empty, so at most one async_write is in flight per session.
    void send(Message msg) {
        outbox_.push_back(std::move(msg));
        if (outbox_.size()==1)
            write_loop();
    }

  private:
    tcp::socket socket_;
    tcp::endpoint ep_; // cache for use after socket_ becomes invalid
    Message incoming_;
    std::deque<Message> outbox_;

    // Reads into incoming_ and re-arms itself on success. Handling of the
    // received data is still a TODO.
    void read_loop() {
        async_read(socket_, asio::dynamic_buffer(incoming_),
                   [this, self = shared_from_this()](error_code ec, size_t) {
                       if (!ec) {
                           // TODO
                           read_loop();
                       }
                   });
    }

    // Writes the front of outbox_; on success pops it and continues with the
    // next queued message. On error the chain simply stops.
    void write_loop() {
        if (outbox_.empty())
            return;

        // The front element stays in the deque until the write completes, so
        // the buffer remains valid for the whole operation.
        async_write(socket_, asio::buffer(outbox_.front()),
                    [this, self = shared_from_this()](error_code ec, size_t) {
                        if (!ec) {
                            outbox_.pop_front();
                            write_loop();
                        }
                    });
    }
};
// Non-owning reference to a session; expires once the session is destroyed.
using Handle = std::weak_ptr<UserSession>;

// Accepts connections and fans messages out to all live sessions.
struct Server {
    // Opens the acceptor on `port` using executor `ex`.
    explicit Server(asio::any_io_executor ex, uint16_t port) : m_acceptor{ex, {{}, port}} {
        std::cout << "[DEBUG] bound on " << m_acceptor.local_endpoint() << std::endl;
    }

    // Starts accepting connections.
    void listen() { accept_loop(); }

    // Queues `msg` on every session that is still alive.
    void broadcast(Message msg) {
        // Each session stores its own copy in its outbox, so no shared buffer
        // is needed. The previous code moved `msg` into an unused shared_ptr
        // first and then sent the moved-from (empty) string.
        for (auto& [id, handle] : m_users) // for every user in the map of users
            if (auto usr = handle.lock())
                usr->send(msg);
    }

  private:
    int next_user_id = 0;
    std::map<int, Handle> m_users;

    // Accepts one connection, registers a session for it and re-arms itself.
    // The loop stops on the first accept error.
    void accept_loop() {
        m_acceptor.async_accept([this](error_code ec, tcp::socket s) {
            if (!ec) {
                auto id = next_user_id++;
                auto usr = std::make_shared<UserSession>(std::move(s), id);
                m_users.emplace(id, usr);
                usr->start();

                accept_loop();
            }
        });
    }

    tcp::acceptor m_acceptor;
};
int main() {
asio::io_context ioc;
Server s(ioc.get_executor(), 8989);
s.listen();
ioc.run();
}
Local demo:
Of course, still nothing ever calls broadcast. Assuming you might actually want something like a chat server:
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <map>
namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;
using Id = int;
// A chat message plus the id of the session it came from.
// Field order matters: the struct is brace-initialised as {sender, text}.
struct Message {
Id sender; // compared against a session's id so a message is not echoed to its author
std::string text; // the bytes that are written to the receiving sockets
};
// Interface through which a session hands a received message to the server,
// so that the session type does not depend on the concrete Server.
struct IServer {
    // Delivers `msg` to the connected sessions.
    virtual void broadcast(Message msg) = 0;

    virtual ~IServer() = default;
};
// One connected chat client. Reads newline-terminated messages and passes
// them to the server; delivers outgoing messages from a per-session queue.
// The session is kept alive by the shared_ptr copies held in pending handlers.
struct UserSession : std::enable_shared_from_this<UserSession> {
    // Takes ownership of an accepted socket.
    //   id  - identifier assigned by the server
    //   srv - server to broadcast through; must outlive the session
    UserSession(tcp::socket s, Id id, IServer& srv) : socket_(std::move(s)), id_(id), svr_(srv) {
        // Cache the peer address now. Previously ep_ was never assigned, so
        // every log line printed a default-constructed endpoint. The
        // error_code overload is used so a failed lookup cannot throw here.
        error_code ec;
        ep_ = socket_.remote_endpoint(ec);
        std::cout << "New UserSession from " << ep_ << std::endl;
    }

    ~UserSession() {
        std::cout << "Closed UserSession from " << ep_ << std::endl;
    }

    // Begins reading from the client.
    void start() {
        read_loop();
    }

    // Queues `msg` for delivery. A write chain is started only when the queue
    // was empty, so at most one async_write is in flight per session.
    void send(Message msg) {
        outbox_.push_back(std::move(msg));
        if (outbox_.size()==1)
            write_loop();
    }

    Id get_id() const { return id_; }

  private:
    tcp::socket socket_;
    Id id_;
    IServer& svr_;
    tcp::endpoint ep_; // cache for use after socket_ becomes invalid
    Message incoming_;
    std::deque<Message> outbox_;

    // Reads up to and including the next '\n', broadcasts exactly that line
    // and re-arms itself. Stops on the first read error.
    void read_loop() {
        incoming_.sender = id_;
        async_read_until( //
            socket_, asio::dynamic_buffer(incoming_.text), "\n",
            [this, self = shared_from_this()](error_code ec, size_t n) {
                std::cout << ep_ << ": Read " << ec.message() << "\n";
                if (!ec) {
                    // `n` counts the bytes up to and including the delimiter,
                    // but the buffer may already hold the start of the next
                    // line. Broadcast only the first n bytes and keep the
                    // rest. Moving the whole buffer out sent those extra
                    // bytes too and then erased on a moved-from string.
                    svr_.broadcast(Message{incoming_.sender, incoming_.text.substr(0, n)});
                    incoming_.text.erase(0, n);

                    read_loop();
                }
            });
    }

    // Writes the front of outbox_; on success pops it and continues with the
    // next queued message. On error the chain simply stops.
    void write_loop() {
        if (outbox_.empty())
            return;

        // The front element stays in the deque until the write completes, so
        // the buffer remains valid for the whole operation.
        async_write(socket_, asio::buffer(outbox_.front().text),
                    [this, self = shared_from_this()](error_code ec, size_t) {
                        if (!ec) {
                            outbox_.pop_front();
                            write_loop();
                        }
                    });
    }
};
// Non-owning reference to a session; expires once the session is destroyed.
using Handle = std::weak_ptr<UserSession>;

// Chat server: accepts connections, keeps a bounded message history and
// relays every message to all live sessions except its sender.
struct Server : IServer {
    // Opens the acceptor on `port` using executor `ex`.
    explicit Server(asio::any_io_executor ex, uint16_t port) : m_acceptor{ex, {{}, port}} {
        std::cout << "[DEBUG] bound on " << m_acceptor.local_endpoint() << std::endl;
    }

    // Starts accepting connections.
    void listen() { accept_loop(); }

    // Records `msg` in the history (trimmed to MAX_HIST entries) and queues it
    // on every live session whose id differs from msg.sender.
    virtual void broadcast(Message msg) override {
        m_history.push_back(msg);
        while (m_history.size() > MAX_HIST)
            m_history.pop_front();

        for (auto& [id, handle] : m_users) { // for every user in the map of users
            auto usr = handle.lock();
            if (!usr)
                continue; // session already gone
            if (usr->get_id() == msg.sender)
                continue; // do not echo to the author
            usr->send(msg);
        }
    }

  private:
    Id next_user_id = 0;
    std::map<Id, Handle> m_users;

    static size_t constexpr MAX_HIST = 100;
    std::deque<Message> m_history;

    // Accepts one connection, replays the history to it, announces the new
    // user count to everyone, starts the session and re-arms itself.
    // The loop stops on the first accept error.
    void accept_loop() {
        m_acceptor.async_accept([this](error_code ec, tcp::socket s) {
            if (ec)
                return;

            // garbage collect entries whose session has been destroyed
            erase_if(m_users, [](auto& entry) { return entry.second.expired(); });

            auto id = next_user_id++;
            auto usr = std::make_shared<UserSession>(std::move(s), id, *this);
            m_users.emplace(id, usr);

            // Replay the stored history first, then the announcement
            // (sender -1, so it reaches every session including the new one).
            for (auto& msg : m_history)
                usr->send(msg);
            broadcast({-1, "** Now " + std::to_string(m_users.size()) + " users online **\n"});

            usr->start();
            accept_loop();
        });
    }

    tcp::acceptor m_acceptor;
};
int main() {
asio::io_context ioc;
Server s(ioc.get_executor(), 8989);
s.listen();
ioc.run();
}
With a local demo again: