Search code examples
c++serial-portthread-safetyboost-asio

How to close/end a blocked boost::asio::read from serial port operation in C++ so std::thread can join?


Since it is a synchronous read, it is a blocked operation and t1 never joins the main thread.

Main Questions:

  1. How to convert this code to use a simple deadline timer?
  2. How to use async in as simple way as possible?
  3. Is it possible to simulate an input to the serial port to unblock the read, in codeblock m2? If yes, how?

// #include headers

bool flag = true; 

void f1()
{
   while(flag)
   {
      // BLOCKING READ OPERATION
      boost::asio::read( serial_port, boost::asio::buffer( byte ,1 )) ;
      // codeblock f1 -> do stuff with byte
   }
}

void main()
{
    std::thread t1 (f1); 
    
    // codeblock m1 -> do stuff

    flag = false ; 

    // codeblock m2 -> do more stuff
   
    t1.join(); 

    // codeblock m3 -> do eve more stuff

}


I've looked into different resources that require using async read, but it's not super straightforward. So I'm making this to discuss exactly the strategies to tackle this in the most straightforward and minimal code possible.


Solution

  • The questions are in the wrong order :)

    I'll just start with simplest, and in the end we'll have addressed the options you describe.

    1. Indeed, blocking cannot be canceled full stop. The code fixed up to be self-contained and without global variables and data-races looks like:

      #include <boost/asio.hpp>
      #include <iomanip>
      #include <iostream>
      namespace asio = boost::asio;
      using std::this_thread::sleep_for;
      using namespace std::chrono_literals;
      
      struct Program {
          std::atomic_bool     flag{true};
          std::array<char, 10> buf;
      
          asio::serial_port serial_port;
      
          Program(std::string device) : serial_port{asio::system_executor{}, device} {}
      
          void read_loop() {
              while (flag) {
                  auto n = serial_port.read_some(asio::buffer(buf));
                  f1(n);
              }
          }
      
          void f1(size_t n) {
              std::cout << "f1 do stuff with bytes: " << quoted(std::string_view(buf.data(), n)) << std::endl;
          }
      };
      
      void m1() {
          std::cout << "m1 do stuff" << std::endl;
          sleep_for(6s);
      }
      
      void m2() {
          std::cout << "m2 do more stuff" << std::endl;
          sleep_for(4s);
      }
      
      void m3() { std::cout << "m3 do even more stuff (bye)" << std::endl; }
      
      int main(int argc, char** argv) {
          Program p(argc > 1 ? argv[1] : "/dev/pts/5");
      
          std::thread t1([&p] { p.read_loop(); });
      
          m1();
          p.flag = false;
          m2();
          t1.join(); 
          m3();
      }
      

      Which I can demo locally using socat for the serial port:

      enter image description here

    2. Simplest async:

      using Buffer = std::array<char, 10>;
      
      void read_loop(asio::serial_port& stream, Buffer& buf) {
          stream.async_read_some(asio::buffer(buf), [&](error_code ec, size_t n) {
              std::cout << "f1 do stuff with bytes: " << quoted(std::string_view(buf.data(), n)) << " (" << ec.message() << ")" << std::endl;
              if (!ec.failed())
                  read_loop(stream, buf);
          });
      }
      
      int main(int argc, char** argv) {
          asio::io_context io(1);
      
          asio::serial_port sp{make_strand(io),  argc > 1 ? argv[1] : "/dev/pts/5"};
      
          Buffer buf;
          read_loop(sp, buf);
      
          std::cout << "m1 do stuff" << std::endl;
          io.run_for(6s);
      
          std::cout << "m2 do more stuff" << std::endl;
          sleep_for(4s);
      
          std::cout << "m3 do even more stuff (bye)" << std::endl;
      }
      

      To get "true" cancellation and join, replace sleep_for(4s) with e.g.

      sp.cancel();
      io.run(); // "join"
      

      Or indeed

      sp.cancel();
      io.run_for(4s);
      

      In case you don't trust the cancel to always work

    3. The previous already gives you the simplest form of deadline: just stop the io context.

    4. As an intermediate step, let's encapsulate port and operations into a class again, also putting IO on a thread that is joined, so we can actually do work in main:

      struct Program {
          Program(asio::any_io_executor ex, std::string device) : serial_port{ex, device} { read_loop(); }
      
          void cancel() {
              asio::post(serial_port.get_executor(), [this] { serial_port.cancel(); });
          }
      
        private:
          asio::serial_port serial_port;
          std::array<char, 10> buf;
      
          void read_loop() {
              serial_port.async_read_some(asio::buffer(buf), [this](error_code ec, size_t n) {
                  std::cout << "f1 do stuff with bytes: " << quoted(std::string_view(buf.data(), n)) << "(" << ec.message() << ")" << std::endl;
                  if (!ec.failed())
                      read_loop();
              });
          }
      };
      
      void m1() {
          std::cout << "m1 do stuff" << std::endl;
          sleep_for(6s);
      }
      
      void m2() {
          std::cout << "m2 do more stuff" << std::endl;
          sleep_for(4s);
      }
      
      void m3() { std::cout << "m3 do even more stuff (bye)" << std::endl; }
      
      int main(int argc, char** argv) {
          asio::thread_pool io(1);
          Program p(make_strand(io), argc > 1 ? argv[1] : "/dev/pts/5");
      
          m1();
          p.cancel();
          m2();
          io.join(); 
          m3();
      }
      

      Still working as advertised: enter image description here

    5. Adding an explicit deadline for the specific operation:

      using duration = std::chrono::steady_clock::duration;
      
      struct Program {
          Program(asio::any_io_executor ex, duration dur, std::string device)
              : serial_port{ex, device}
              , deadline{ex, dur} //
          {
              deadline.async_wait([this](error_code ec) {
                  if (!ec) {
                      std::cerr << "Deadline" << std::endl;
                      serial_port.cancel();
                  }
              });
              read_loop();
          }
      
          void cancel() {
              asio::post(serial_port.get_executor(), [this] { serial_port.cancel(); });
          }
      
        private:
          asio::serial_port    serial_port;
          asio::steady_timer   deadline;
          std::array<char, 10> buf;
      
          void read_loop() {
              serial_port.async_read_some(asio::buffer(buf), [this](error_code ec, size_t n) {
                  std::cout << "f1 do stuff with bytes: " << quoted(std::string_view(buf.data(), n)) << "(" << ec.message() << ")" << std::endl;
                  if (!ec.failed())
                      read_loop();
              });
          }
      };
      
      int main(int argc, char** argv) {
          asio::thread_pool io(1);
          Program p(make_strand(io), 4s, argc > 1 ? argv[1] : "/dev/pts/5");
      
          std::cout << "m1 do stuff" << std::endl;
          sleep_for(6s);
          std::cout << "m2 do more stuff" << std::endl;
          sleep_for(4s);
      
          io.join();
          std::cout << "m3 do even more stuff (bye)" << std::endl;
      }
      

      Note how the deadline of 4s now happens independently of the work in main. Note also how you can still cancel() the operation from main instead if you want.

      enter image description here