I'm working on a RS485 communication class and I'm trying to make a function that reads until a certain char is on the line, but with a time out. The problem is that my system timer immediately returns, doesn't matter which time out I enter. I tried changing the timer to be a member variable of the class, so it doesn't go out of scope, but that wasn't the problem. I tried different implementations of timers (deadline_timer mostly) but that didn't help. If I remove the timer from the code, then the read succeeds, but when I add it, even if I give it a timeout of 10 seconds (which should be waay more than enough), it will respond with an immediate timeout.
I tried making a simple version of the class here, but I guess that the options mostly depend on the type of machine you're talking to:
class RS485CommunicationLayer final {
public:
RS485CommunicationLayer(
const std::string& path,
/* options */
): io(), port(io), timer(port.get_io_service()) {
open(/* options */);
};
std::size_t write(const char* const buffer, const size_t size) {
/*impl*/
}
// THIS FUNCTION --v
void readUntil(std::vector<char>& buffer, char delim,std::chrono::microseconds timeout) {
boost::optional<boost::system::error_code> timer_result;
boost::optional<boost::system::error_code> read_result;
port.get_io_service().reset();
timer.expires_from_now(timeout);
boost::asio::async_read_until(port, asio::dynamic_buffer(buffer), delim, [&read_result] (const boost::system::error_code& error, size_t) { read_result.reset(error); });
timer.async_wait([&timer_result] (const boost::system::error_code& error) { timer_result.reset(error); });
while (port.get_io_service().run_one())
{
if (read_result)
timer.cancel();
else if (timer_result) {
port.cancel();
}
}
if (read_result)
throw boost::system::system_error(*read_result);
};
private:
asio::io_context io;
asio::serial_port port;
boost::asio::system_timer timer;
void open(/*args*/) {
port.open(path);
/*set options*/
}
};
Edit:
I also tried the following implementation after finding out that run_for()
exists. But then the buffer stays empty weirdly enough.
void RS485CommunicationLayer::readUntil(std::vector<char>& buffer, char delim, std::chrono::microseconds timeout) {
boost::optional<boost::system::error_code> read_result;
boost::asio::async_read_until(port, asio::dynamic_buffer(buffer), delim, [&read_result] (const boost::system::error_code& error, size_t) { read_result.reset(error); });
port.get_io_service().run_for(timeout);
if (read_result)
throw boost::system::system_error(*read_result);
}
First off, get_io_service()
indicates a Very Old(TM) boost version. Also, it just returns io
.
Secondly, why so complicated? I don't even really have the energy to see whether there is a subtle problem with the run_one()
loop (it looks fine at a glance).
I'd simplify:
size_t readUntil(std::vector<char>& buffer, char delim,
std::chrono::microseconds timeout) {
error_code read_result;
size_t msglen = 0;
io.reset();
asio::system_timer timer(io, timeout);
asio::async_read_until(port, asio::dynamic_buffer(buffer), delim,
[&](error_code ec, size_t n) {
timer.cancel();
read_result = ec;
msglen = n;
});
timer.async_wait([&](error_code ec) { if (!ec) port.cancel(); });
io.run();
if (read_result)
boost::throw_with_location(boost::system::system_error(read_result),
read_result.location());
return msglen;
}
You can just cancel the complementary IO object from the respective completion handlers.
The timer is per-op and local to the readUntil
, so it doesn't have to be a member.
Let's also throw in the write side, which is all of:
size_t write(char const* const data, const size_t size) {
return asio::write(port, asio::buffer(data, size));
}
And I can demo it working:
#include <boost/asio.hpp>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
using boost::system::error_code;
using namespace std::chrono_literals;
class RS485CommunicationLayer final {
public:
RS485CommunicationLayer(std::string const& path) : io(), port(io) { open(path); };
size_t write(char const* const data, const size_t size) {
return asio::write(port, asio::buffer(data, size));
}
size_t readUntil(std::vector<char>& buffer, char delim,
std::chrono::microseconds timeout) {
error_code read_result;
size_t msglen = 0;
io.reset();
asio::system_timer timer(io, timeout);
asio::async_read_until(port, asio::dynamic_buffer(buffer), delim,
[&](error_code ec, size_t n) {
timer.cancel();
read_result = ec;
msglen = n;
});
timer.async_wait([&](error_code ec) { if (!ec) port.cancel(); });
io.run();
if (read_result)
boost::throw_with_location(boost::system::system_error(read_result),
read_result.location());
return msglen;
}
private:
asio::io_context io;
asio::serial_port port;
void open(std::string path) {
port.open(path);
/*set options*/
}
void close();
};
int main(int argc, char** argv) {
RS485CommunicationLayer comm(argc > 1 ? argv[1] : "");
comm.write("Hello world\n", 12);
for (std::vector<char> response_buffer;
auto len = comm.readUntil(response_buffer, '\n', 100ms);) //
{
std::cout << "Received " << response_buffer.size() << " bytes, next "
<< quoted(std::string_view(response_buffer.data(), len - 1))
<< std::endl;
// consume
response_buffer.erase(begin(response_buffer), begin(response_buffer) + len);
}
}
Demo locally with a socat
PTS tunnel:
socat -d -d pty,raw,echo=0 pty,raw,echo=0
And throwing dictionaries at the other end:
while true; do cat /etc/dictionaries-common/words ; done | pv > /dev/pts/10