I am trying to create an async server using asio, but when the acceptor calls the async_accept function I get this error: "The I/O operation has been aborted because of either a thread exit or an application request", which stops the process from continuing. I have tried changing the port, but that doesn't help. When I write a synchronous server instead, it works. Also, I use asio as a standalone library, separate from Boost.
Here's the function where I get the error:
void startAsyncAccept()
{
acceptor.async_accept(socket, [&](const asio::error_code& error)
{
if(!error)
{
std::cout << "Client is connected" << "\n";
startAsyncRead();
}
else
{
std::cerr << "It is: " << error.message() << '\n';
return 0;
}
startAsyncAccept();
}
);
}
And here's my async server:
class Server
{
public:
Server(asio::io_context& io_context) : acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::address::from_string("127.0.0.1"), 2000)), socket(io_context)
{
startAsyncAccept();
}
~Server() {}
private:
// Members
asio::ip::tcp::acceptor acceptor;
asio::ip::tcp::socket socket;
asio::streambuf buffer;
std::vector<asio::ip::tcp::socket> sockets;
// Functions
void startAsyncAccept()
{
acceptor.async_accept(socket, [&](const asio::error_code& error)
{
if(!error)
{
std::cout << "Client is connected" << "\n";
startAsyncRead();
}
else
{
std::cerr << "It is: " << error.message() << '\n';
return 0;
}
startAsyncAccept();
}
);
}
void startAsyncRead()
{
asio::async_read_until(socket, buffer, '\n', [&](const asio::error_code& error, size_t length)
{
if(!error)
{
std::string message(asio::buffers_begin(buffer.data()), asio::buffers_begin(buffer.data()) + length);
std::cout << "Received from client: " << message;
for(auto& clients : sockets)
{
asio::async_write(clients, asio::buffer(message), [&](const asio::error_code& error, size_t length)
{
if(error)
{
std::cerr << "Failed to write to client: " << error.message() << "\n";
}
}
);
}
buffer.consume(length);
startAsyncRead();
}
else
{
std::cout << "Client disconnected." << std::endl;
removeClient();
}
}
);
}
void startAsyncWrite()
{
asio::async_write(socket, asio::buffer("Connected to chat Server. \n"), [&](const asio::error_code& error, size_t length)
{
if(error)
{
std::cerr << "Failed to write to the client: " << error.message() << "\n";
removeClient();
}
}
);
}
void removeClient()
{
auto it = std::find_if(sockets.begin(), sockets.end(),
[&](const auto& client_socket)
{
return &client_socket == &socket;
});
if (it != sockets.end())
{
sockets.erase(it);
}
}
};
First of all, there's Undefined Behaviour because the lambda misses a return on some paths. This seems to be a copy-paste error of some sort, so I'll just remove the meaningless return 0;.
The real problem is that you use the same socket on each accept. That means that you reset the socket that you started the first async read on as soon as you call startAsyncAccept again. That causes the asio::error::operation_aborted error code.
Simply don't use the same socket. Move it into a "session" object, for example. Or, as you seem to have wanted to do, put each accepted socket in your sockets collection.
Ironically, it is hard to know how you wanted to use sockets, since it is literally unused in your current code (except for the loops that will never iterate, because sockets is always empty).
What's worse, it's only good that it didn't work, because otherwise you would get more UB: std::vector will reallocate to grow capacity, meaning that any references to existing sockets are invalidated.
The usual way to deal with this is not to have a collection of sockets, but rather a collection of (weak) shared pointers to session objects. For simplicity, let me fix the reference stability here by changing std::vector to std::list and removing the problematic socket variable.
tcp::socket socket;
asio::streambuf buffer;
std::vector<tcp::socket> sockets;
Becomes
asio::streambuf buffer;
std::list<tcp::socket> sockets;
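To illustrate the reference-stability point, here is a minimal sketch using only the standard library (no Asio involved). `Conn`, `vectorReallocates` and `listKeepsAddresses` are hypothetical names made up for this illustration; `Conn` merely stands in for `tcp::socket`.

```cpp
#include <cstddef>
#include <list>
#include <vector>

// Hypothetical stand-in for tcp::socket; only its address matters here.
struct Conn {
    int id;
};

// Returns true if the vector had to reallocate while growing, which
// invalidates every reference previously taken to its elements.
bool vectorReallocates() {
    std::vector<Conn> conns;
    conns.push_back(Conn{0});
    std::size_t const before = conns.capacity();
    for (int i = 1; i <= 1000; ++i)
        conns.push_back(Conn{i});
    return conns.capacity() != before;
}

// Returns true if a reference taken to the first list element still refers
// to that same element after many more insertions.
bool listKeepsAddresses() {
    std::list<Conn> conns;
    conns.push_back(Conn{0});
    Conn& first = conns.back();
    Conn const* const address = &first;
    for (int i = 1; i <= 1000; ++i)
        conns.push_back(Conn{i});
    return address == &conns.front() && first.id == 0;
}
```

A capacity change means the vector moved its elements to new storage, so a handler still holding a reference to an old element would be left dangling. The list never relocates existing nodes on insertion.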
There are big problems as well with async_write using message as the buffer. This is UB again, because its lifetime ends before the write operations complete. On top of that, because a new round of writes is started whenever async_read_until completes (and you might have an arbitrary number of connected clients), you cannot be sure no writes overlap. The only solution there is to have an "outbox" queue, typically per client, for independent operation of each connection.
As a sidenote, it's unclear what startAsyncWrite was supposed to do (it was unused, and doesn't help to solve the problem just described). The same goes for removeClient, which would never do anything: even if you had filled sockets, it would just cause UB due to the invalidation of socket references. I dropped those two for now.
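The "outbox" idea can be sketched without any networking. The class below is a hypothetical stand-in (the names `Outbox`, `completeWrite`, `wire` and `maxInFlight` are assumptions for illustration): instead of calling `asio::async_write` it only records what would be written, and `completeWrite()` plays the role of the completion handler.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-client outbox: at most one write is in flight at a time.
class Outbox {
public:
    // Queue a message; start a write only if the queue was empty before.
    void send(std::string message) {
        queue_.push_back(std::move(message));
        if (queue_.size() == 1)
            startWrite();
    }

    // Simulates the completion handler of the write currently in flight.
    void completeWrite() {
        if (inFlight_ == 0)
            return;
        --inFlight_;
        wire_.push_back(queue_.front()); // the front element was the buffer in use
        queue_.pop_front();
        if (!queue_.empty())
            startWrite(); // chain the next write only after completion
    }

    std::size_t pending() const { return queue_.size(); }
    int maxInFlight() const { return maxInFlight_; }
    std::vector<std::string> const& wire() const { return wire_; }

private:
    void startWrite() {
        ++inFlight_;
        maxInFlight_ = std::max(maxInFlight_, inFlight_);
    }

    std::deque<std::string> queue_;
    std::vector<std::string> wire_; // what "went out", in order
    int inFlight_ = 0;
    int maxInFlight_ = 0;
};
```

Because the queue owns each string until its write completes, this also addresses the buffer lifetime: the element at the front stays valid for as long as the operation using it is outstanding.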
Finally, you use a streambuf where it seems you might just use a dynamic string buffer directly. In any case, put the consume call closer to the point where the data is actually consumed, so that you don't risk missing it under some error condition.
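As a rough analogue of keeping extraction and consumption together, here is a sketch that uses a plain `std::string` instead of `asio::streambuf`. `LineBuffer`, `append` and `takeLine` are hypothetical names for illustration only and assume C++17 for `std::optional`. Like `async_read_until`, the buffer may already hold bytes past the delimiter, so only the extracted line is removed.

```cpp
#include <cstddef>
#include <optional>
#include <string>

// Hypothetical stand-in for a receive buffer: bytes are appended as they
// arrive and removed once a complete line has been extracted.
struct LineBuffer {
    std::string pending;

    void append(std::string const& bytes) { pending += bytes; }

    // Extracts one line including its '\n' and consumes exactly those bytes
    // in the same step; returns nothing if no full line is buffered yet.
    std::optional<std::string> takeLine() {
        std::size_t const pos = pending.find('\n');
        if (pos == std::string::npos)
            return std::nullopt;
        std::string line = pending.substr(0, pos + 1);
        pending.erase(0, pos + 1); // the "consume", right next to the extraction
        return line;
    }
};
```

Since the copy and the erase sit in one function, no later early return or error branch can skip the consume and cause the same line to be delivered twice.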
This simplified version removes the problems that you ask about, as well as the lifetime problem with message:
class Server {
public:
Server(asio::io_context& io_context) //
: acceptor(io_context, {{}, 2000}) {
startAsyncAccept();
}
private:
// Members
tcp::acceptor acceptor;
asio::streambuf buffer;
std::list<tcp::socket> sockets;
void startAsyncAccept() {
acceptor.async_accept([&](error_code error, tcp::socket accepted) {
if (!error) {
auto& client = sockets.emplace_back(std::move(accepted));
std::cout << "Client is connected\n";
startAsyncRead(client);
} else {
std::cerr << "It is: " << error.message() << "\n";
}
startAsyncAccept();
});
}
void startAsyncRead(tcp::socket& client) {
asio::async_read_until(client, buffer, '\n', [&](error_code error, size_t n) {
if (!error) {
auto f = asio::buffers_begin(buffer.data()), l = f + static_cast<ptrdiff_t>(n);
auto message = std::make_shared<std::string>(f, l);
buffer.consume(n);
std::cout << "Received from client: " << *message; // dereference: message is a shared_ptr
for (auto& peer : sockets) {
// SEHE: TODO FIXME serializing writes per peer!
asio::async_write(
peer, asio::buffer(*message), [&, message](error_code error, size_t /*length*/) {
if (error) {
std::cerr << "Failed to write to client: " << error.message() << "\n";
}
});
}
startAsyncRead(client);
} else {
std::cout << "Client disconnected." << std::endl;
// SEHE: TODO ERASE
}
});
}
};
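The `message` capture in the write handler is what keeps the buffer alive until every write has finished. A minimal sketch of that ownership pattern, with a plain vector of callbacks standing in for the handlers an io_context would hold (`Pending` and `queueWrites` are hypothetical names, not Asio API):

```cpp
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the completion handlers pending in an io_context.
using Pending = std::vector<std::function<void()>>;

// Queues one deferred "write" per peer. Each handler shares ownership of the
// message, so the string stays alive after this function returns.
std::weak_ptr<std::string> queueWrites(Pending& pending, std::string& sink, int peers) {
    auto message = std::make_shared<std::string>("hello\n");
    for (int i = 0; i < peers; ++i)
        pending.push_back([message, &sink] { sink += *message; });
    return message; // weak handle, only to observe the lifetime from outside
}
```

The string is destroyed when the last handler holding a copy of the `shared_ptr` is destroyed, which is the guarantee `asio::buffer(*message)` needs, since that call does not copy the data.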
Some more work to fix the other problems:
[after breakfast]
UPDATE: You absolutely need a separate Session, because I missed earlier (before breakfast) that you were also mistakenly using the same streambuf instance for all connections... A Session type makes for a logical place to group the incoming buffer, outgoing buffer queue and socket.
We use enable_shared_from_this together with shared_from_this() to get automatic lifetime management when a connection goes bad or is closed.
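How that lifetime management plays out can be sketched without sockets. In the hypothetical example below, a vector of callbacks stands in for outstanding Asio handlers, and `acceptOne` and `garbageCollect` are made-up helper names. It assumes C++14 or later for the init-capture.

```cpp
#include <functional>
#include <list>
#include <memory>
#include <vector>

struct Session : std::enable_shared_from_this<Session> {
    // Registers a pending "handler" that co-owns this session, the way an
    // Asio completion handler capturing `self` would.
    void start(std::vector<std::function<void()>>& pending) {
        pending.push_back([self = shared_from_this()] { (void)self; });
    }
};

// Hypothetical helper: creates a session, records a weak handle in `clients`,
// starts one pending operation, and drops the local shared_ptr on return.
void acceptOne(std::list<std::weak_ptr<Session>>& clients,
               std::vector<std::function<void()>>& pending) {
    auto session = std::make_shared<Session>();
    clients.push_back(session);
    session->start(pending);
}

// Removes handles whose session has already been destroyed.
void garbageCollect(std::list<std::weak_ptr<Session>>& clients) {
    clients.remove_if(
        [](std::weak_ptr<Session> const& handle) { return handle.expired(); });
}
```

As long as some handler still holds `self`, the session lives. Once the last operation ends without starting a new one, the session is destroyed and its weak handle expires, so nobody has to erase it explicitly at the moment of disconnect.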
Note that it all comes down to correct Separation Of Concerns: The Server manages the listening, the ChatRoom manages connected clients, the Session manages a single client.
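#include <chrono>
#include <deque>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string_view>
#ifndef STANDALONE_ASIO
#include <boost/asio.hpp>
namespace asio = boost::asio;
using boost::system::error_code;
#else
#include <asio.hpp> // standalone Asio, as used in the question
using asio::error_code;
#endif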
using asio::ip::tcp;
struct Session;
using SessionPtr = std::shared_ptr<Session>;
using Handle = std::weak_ptr<Session>;
struct ChatRoom {
std::list<Handle> clients_;
void garbage_collect();
void broadcast(std::string_view message);
// just for example:
void direct_message(std::string recipient, std::string_view message);
};
struct Session : std::enable_shared_from_this<Session> {
Session(tcp::socket s, ChatRoom& room) : socket_(std::move(s)), room_(room) {}
void start() { readLoop(); }
void send(std::string_view message){
outbox_.emplace_back(message);
if (outbox_.size() == 1)
writeLoop();
}
private:
tcp::socket socket_;
ChatRoom& room_;
asio::streambuf incoming_;
std::deque<std::string> outbox_;
void readLoop() {
asio::async_read_until(
socket_, incoming_, '\n', [&, self = shared_from_this()](error_code ec, size_t n) {
if (ec) {
std::cout << "Client disconnect (" << ec.message() << ")" << std::endl;
return;
}
auto f = asio::buffers_begin(incoming_.data()), l = f + static_cast<ptrdiff_t>(n);
room_.broadcast(std::string(f, l));
incoming_.consume(n);
readLoop();
});
}
void writeLoop() {
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()](error_code ec, size_t /*length*/) {
outbox_.pop_front();
if (ec)
std::cerr << "Failed to write to client: " << ec.message() << "\n";
else
writeLoop();
});
}
};
class Server {
public:
Server(asio::any_io_executor ex) : acceptor_(ex, {{}, 2000}) { acceptLoop(); }
private:
tcp::acceptor acceptor_;
ChatRoom room_;
void acceptLoop() {
room_.garbage_collect(); // optionally prevent dead connections piling up
acceptor_.async_accept([&](error_code ec, tcp::socket accepted) {
if (!ec) {
auto sess = std::make_shared<Session>(std::move(accepted), room_);
room_.clients_.push_back(sess);
std::cout << "Client is connected\n";
sess->start();
} else {
std::cerr << "Accept error: " << ec.message() << "\n";
}
acceptLoop();
});
}
};
void ChatRoom::garbage_collect() {
clients_.remove_if(std::mem_fn(&Handle::expired));
}
void ChatRoom::broadcast(std::string_view message) {
for (auto& handle : clients_) {
if (auto peer = handle.lock()) {
peer->send(message);
}
}
}
using namespace std::chrono_literals;
int main() {
asio::io_context io;
Server chat(io.get_executor());
io.run_for(30s);
}
Testing with a number of clients:
for a in {1..10}; do (
sleep 1.$RANDOM
echo "hello from $a"
sleep 1.$RANDOM
echo "bye from $a"
) | nc 127.0.0.1 2000 -w3 | (while read line; do echo "Client $a received '$line'"; done) & done
Prints output like:
Also live on Coliru :)