Search code examples
boostboost-asioboost-beast

How to write content to file asynchronously with boost::beast or boost::asio?


Based on websocket_client_async_ssl.cpp, I modify the function of on_read so that I can save the content into a local file.

class session : public std::enable_shared_from_this<session>
{
    std::ofstream outfile_text;  // outfile_text.open("test.txt", std::ofstream::out);
    const int MAX_LINE_COUNT;    // 10
    int current_line_;
...
}

void on_read_version2( beast::error_code ec, std::size_t)
{
  if(ec)
      return fail(ec, "read");
  else
  {
    ++current_line_;
    const std::string buf_string = beast::buffers_to_string(buffer_.data());
    buffer_.consume(buffer_.size());

    outfile_text.write((char*)buf_string.data(), buf_string.size());
    outfile_text.write("\n", 1);

    if (current_line_ > MAX_LINE_COUNT)
    {
      outfile_text.close();
      return;
    }

    // Re-read a message into our buffer
    ws_.async_read( buffer_, beast::bind_front_handler( &session::on_read, shared_from_this()));
  }
}

void on_read_version3( beast::error_code ec, std::size_t)
{
  if(ec)
      return fail(ec, "read");
  else
  {
    ++current_line_;
    buffer_.consume(buffer_.size());
    
    queue_.push_back(beast::buffers_to_string(buffer_.data()));

    // Are we already writing?
    if (queue_.size() > 1)
      return;
    else
      // async_write to file from queue_

    if (current_line_ > MAX_LINE_COUNT)
    {
      outfile_text.close();
      return;
    }

    // Re-read a message into our buffer
    ws_.async_read( buffer_, beast::bind_front_handler( &session::on_read, shared_from_this()));
  }
}

In version2, I used a blocking method to write the content to file. While in version 3, I list the psedo-code where I like to write this part of logic with async-method.

Question> Does boost::asio or boost::beast support async_write file? If not, what is the best way to write content to file within the on_read function?

Thank you


Solution

  • Assuming POSIX, you could use stream_descriptor:

    net::posix::stream_descriptor  stream_{ex_, ::creat("test.txt", 0755)};
    

    Which models the ASIO AsyncStream concept.

    On Windows, you have similar types (including stream_handle).

    Demo:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/asio/posix/stream_descriptor.hpp>
    #include <boost/beast.hpp>
    
    #include <chrono>
    #include <deque>
    #include <fstream>
    #include <iostream>
    
    namespace net       = boost::asio;
    namespace beast     = boost::beast;
    namespace websocket = beast::websocket;
    using net::ip::tcp;
    using namespace std::chrono_literals;
    
    static inline void fail(beast::error_code ec, std::string_view msg) {
        if (ec) {
            std::cerr << msg << ": " << ec.message() << std::endl;
        }
    }
    
    class session : public std::enable_shared_from_this<session> {
      public:
        session(net::any_io_executor ex) : ex_(ex) {}
    
        void start()
        {
            // assumed on logical strand
            ws_.next_layer().connect({{}, 8989});
            ws_.handshake("localhost", "/");
            do_read();
        }
    
      private:
        const int       MAX_LINE_COUNT = 10;
        int             current_line_  = 0;
        net::streambuf  buffer_;
    
        net::any_io_executor           ex_;
        net::posix::stream_descriptor  stream_{ex_, ::creat("test.txt", 0755)};
        websocket::stream<tcp::socket> ws_{ex_};
        std::deque<std::string>        queue_;
    
        void do_read() {
            // assumed on strand
            ws_.async_read(
                buffer_,
                beast::bind_front_handler(&session::on_read, shared_from_this()));
        }
    
        void on_read(beast::error_code ec, std::size_t)
        {
            if (ec)
                return fail(ec, "read");
    
            ++current_line_; // TODO fixme count `\n` in buffer?
    
            enqueue_output(beast::buffers_to_string(buffer_.data()) + '\n');
    
            do_read();
        }
    
        bool file_full() const { 
            return current_line_ > MAX_LINE_COUNT;
        }
    
        void enqueue_output(std::string msg) {
            if (file_full())
                return;
    
            queue_.push_back(std::move(msg));
            buffer_.consume(buffer_.size());
    
            // Are we already writing?
            if (queue_.size() == 1)
                do_write_loop();
        }
    
        void do_write_loop()
        {
            if (queue_.empty()){
                if (file_full())
                    stream_.close();
    
                return;
            }
    
            // async_write to file from queue_
            net::async_write(
                stream_, net::buffer(queue_.front()),
                [this, self = shared_from_this()](beast::error_code ec, size_t) {
                    if (!ec) {
                        queue_.pop_front();
                        do_write_loop();
                    } // TODO error handling
                });
        }
    };
    
    int main()
    {
        net::io_context io;
    
        std::make_shared<session>(make_strand(io.get_executor())) //
            ->start();
    
        io.run_for(5s);
    }
    

    And a live demo using websocat: https://i.sstatic.net/X18AS.jpg

    enter image description here