Search code examples
c++ c++20 boost-asio c++-coroutine

Returning a value from a Co-routine started via boost::asio::co_spawn


I have some asio code which is used for transferring a file from a client. This code is working correctly and is as follows:

/// Waits for a client connection and receives a file over it, blocking until
/// the io_context has no more work.
///
/// @param _sender   Not used by this function.
/// @param _filePath Forwarded to ReceiveFile.
/// @param _numBytes Forwarded to ReceiveFile.
/// @return The value returned by ReceiveFile, or false when no connection was
///         obtained.
bool FileTransferBehaviour::ConnectAndTransferFile(const std::string& _sender,
                                               const std::string& _filePath,
                                               size_t _numBytes)
{
  // use a coroutine to accept the connection and receive the file.
  bool result = false;

  // The lambda refers to `result` and `_filePath` by reference. Both are still
  // alive while the coroutine runs because run() below is called before this
  // function returns.
  asio::co_spawn(m_ioContext,
     [&result, &_filePath, _numBytes, this]() mutable
      -> asio::awaitable<void> {
        auto maybeSock = co_await CoAwaitClientConnection();
        if (maybeSock.has_value()) {
            result = ReceiveFile(maybeSock.value(), _filePath, _numBytes);
        }
    }, asio::detached);

  // restart() clears a previous "stopped" state so that run() processes the
  // coroutine spawned above.
  m_ioContext.restart();
  m_ioContext.run();

  return result;
}

Currently the result is set inside the lambda expression through a local variable captured by reference. I would prefer to return the result from the coroutine itself instead, i.e.

bool FileTransferBehaviour::ConnectAndTransferFile(const std::string& _sender,
                                               const std::string& _filePath,
                                               size_t _numBytes)
{
  // use a coroutine to accept the connection and receive the file.
  // Attempted variant: the lambda now returns awaitable<bool> and the result
  // of co_spawn is co_awaited directly. This is the statement the compilers
  // reject.
  bool result = co_await asio::co_spawn(m_ioContext,
     [&_filePath, _numBytes, this]() mutable
      -> asio::awaitable<bool> {
        auto maybeSock = co_await CoAwaitClientConnection();
        if (maybeSock.has_value()) {
            co_return ReceiveFile(maybeSock.value(), _filePath, _numBytes);
        }
        co_return false;
    }, asio::detached);

  m_ioContext.restart();
  m_ioContext.run();

  return result;
}

When I change the return type of the lambda the code fails with

FileTransferBehaviour.cpp(101,42): error C2228: left of '.await_ready' must have 
class/struct/union [MyProject.vcxproj]
FileTransferBehaviour.cpp(101,17): error C2440: 'initializing': cannot convert from 
'void' to 'bool' [MyProject.vcxproj]

I am using C++20 with CL version 19.37.32824 (Visual Studio 2022)

With GCC 11.4.0 I get the following error:

FileTransferBehaviour.cpp:101:19: error: unable to find the promise type for this coroutine
  101 |     bool result = co_await asio::co_spawn(m_ioContext,

What am I missing?


Solution

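  • `co_await` can only be used inside a coroutine, and `ConnectAndTransferFile` is a regular function returning `bool`; in addition, `co_spawn(..., asio::detached)` returns `void`, so there is nothing to await. I'd suggest running the io service on some other thread. That way you can set a promise from the completion handler and wait on the future: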

    // Blocking wrapper around the coroutine-based transfer.
    //
    // The coroutine is spawned on `ioc`; its bool result (or the exception it
    // raised) is handed to a promise by the completion handler, and the calling
    // thread waits on the matching future. Because f.get() blocks, `ioc` has to
    // be serviced by a different thread than the caller.
    bool FileTransferBehaviour::ConnectAndTransferFile(         //
        std::string const& sender, std::string const& filePath) //
    {
        std::promise<bool> p;
        std::future<bool>  f = p.get_future();
    
        // use a coroutine to accept the connection and receive the file.
        co_spawn(ioc,
            // `sender` and `filePath` are captured by copy, so the coroutine does
            // not refer to the caller's arguments. The explicit `()` keeps this
            // valid C++20: leaving out the parameter list before a trailing
            // return type is only allowed from C++23 on.
            [=, this]() -> asio::awaitable<bool> {
                if (auto s = co_await CoAwaitClientConnection(sender)) {
                    bool r = co_await ReceiveFile(*s, filePath);
                    co_return r;
                }
                co_return false;
            },
            // Completion handler: forwards either the exception or the result.
            [p = std::move(p)](std::exception_ptr e, bool r) mutable {
                if (e)
                    p.set_exception(e);
                else
                    p.set_value(r);
            });
    
        // Blocks until the handler above ran; rethrows a stored exception.
        return f.get();
    }
    

    But see the much simpler version using asio::use_future below.

    With a working live example:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/endian/arithmetic.hpp>
    #include <filesystem>
    #include <fstream>
    #include <iomanip>
    #include <iostream>
    namespace asio = boost::asio;
    using NetSize  = boost::endian::big_uint64_t;
    using asio::ip::tcp;
    using std::filesystem::path;
    
    /// Receiving side of the demo: connects to a sender and stores the single
    /// file it transmits. Failures are reported through `false` / `nullopt`.
    struct FileTransferBehaviour {
        /// Blocks until the transfer finished.
        /// @param sender   host name or address of the sending side
        /// @param filePath directory the received file is written into
        /// @return true if a file was received and written, false otherwise
        bool ConnectAndTransferFile(std::string const& sender, std::string const& filePath);
    
      private:
        // I/O object types that use the `deferred` completion token by default,
        // which lets their async operations be co_await-ed without a token.
        using Sock     = asio::deferred_t::as_default_on_t<tcp::socket>;
        using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;
    
        /// Resolves `host` (port "8989") and connects to it.
        /// @return the connected socket, or nullopt if any step threw
        asio::awaitable<std::optional<Sock>> CoAwaitClientConnection(std::string host) try {
            Sock s{ioc};
            co_await async_connect(s, co_await Resolver(ioc).async_resolve(host, "8989"));
            co_return s;
        } catch (...) {
            co_return std::nullopt;
        }
    
        /// Reads one file from `s` and writes it into the directory `filePath`.
        ///
        /// Wire format: two big-endian 64-bit integers (content size, name
        /// length), followed by the name bytes and then the content bytes.
        /// @return true on success; false if reading, validating or writing failed
        asio::awaitable<bool> ReceiveFile(Sock& s, path filePath) try {
            NetSize header[2];
            co_await async_read(s, asio::buffer(header));
    
            auto& [content_size, name_len] = header;
    
            // read name and contents
            std::string       name(name_len, '\0');
            std::vector<char> data(content_size);
            co_await async_read(s, std::array{asio::buffer(name), asio::buffer(data)});
    
            // The name is supplied by the peer. Only its last component is used,
            // so an absolute path or "../" sequences cannot place the file
            // outside of filePath.
            path const leaf = path(name).filename();
            if (leaf.empty() || leaf == path(".") || leaf == path(".."))
                co_return false;
            path const target = filePath / leaf;
    
            // write target file
            std::ofstream ofs(target, std::ios::binary);
            ofs.write(data.data(), static_cast<std::streamsize>(data.size()));
            if (!ofs.flush())
                co_return false; // could not create or completely write the file
            std::cout << "Wrote " << target << ", " << content_size << " bytes" << std::endl;
            co_return true;
        } catch (...) {
            co_return false;
        }
    
        // Single-threaded pool that runs all coroutines of this object, so the
        // calling thread is free to block on the result.
        asio::thread_pool ioc{1};
    };
    
    // Runs connect + receive as a coroutine on the object's thread pool and
    // blocks the calling thread until the outcome is known.
    bool FileTransferBehaviour::ConnectAndTransferFile(         //
        std::string const& sender, std::string const& filePath) //
    {
        std::promise<bool> p;
        std::future<bool>  f = p.get_future();
    
        // use a coroutine to accept the connection and receive the file.
        co_spawn(ioc,
            // Arguments are captured by copy. The explicit `()` keeps the lambda
            // valid in C++20 (a trailing return type without a parameter list
            // is a C++23 feature).
            [=, this]() -> asio::awaitable<bool> {
                if (auto s = co_await CoAwaitClientConnection(sender)) {
                    bool r = co_await ReceiveFile(*s, filePath);
                    co_return r;
                }
                co_return false;
            },
            // Completion handler: pass the exception or the result to the future.
            [p = std::move(p)](std::exception_ptr e, bool r) mutable {
                if (e)
                    p.set_exception(e);
                else
                    p.set_value(r);
            });
    
        // Waits for the completion handler; rethrows a stored exception.
        return f.get();
    }
    
    void demoServer(uint16_t port, std::string name) {
        asio::io_context  ioc;
        tcp::acceptor     acc{ioc, {{}, port}};
        tcp::socket client = acc.accept();
    
        std::ifstream     ifs(name, std::ios::binary);
        std::vector<char> payload(std::istreambuf_iterator{ifs}, {});
    
        NetSize content_size(payload.size()), name_len(name.length());
    
        std::cout << "Sending " << quoted(name) << ", " << content_size << " bytes" << std::endl;
        write(client,
              std::array{
                  asio::buffer(&content_size, sizeof(content_size)),
                  asio::buffer(&name_len, sizeof(name_len)),
                  asio::buffer(name),
                  asio::buffer(payload),
              });
    
        std::cout << "Server shutting down" << std::endl;
    }
    
    // Starts the demo sender on a background thread, receives one file from it
    // and reports the outcome.
    int main() {
        std::thread server([] { demoServer(8989, "main.cpp"); });
    
        FileTransferBehaviour ftb;
        bool const ok = ftb.ConnectAndTransferFile("127.0.0.1", "outputdir");
        std::cout << "Received: " << std::boolalpha << ok << std::endl;
    
        // the sender exits after its single transfer
        server.join();
        std::cout << "Bye" << std::endl;
    }
    

    Tested with

    g++ -std=c++2b -O2 -Wall -pedantic -pthread main.cpp
    mkdir -pv outputdir && ./a.out; md5sum main.cpp outputdir/*
    

    Which prints

    mkdir: created directory 'outputdir'
    
    Sending "main.cpp", 3370 bytes
    Server shutting down
    Wrote "outputdir/main.cpp", 3370 bytes
    Received: true
    Bye
    
    a6d1a03fd575856c9180919ebd710c24  main.cpp
    a6d1a03fd575856c9180919ebd710c24  outputdir/main.cpp
    

    Notes

    It seems easier to just set the promise in a void awaitable:

    // Variant that fulfils the promise from inside a void coroutine instead of
    // from a completion handler.
    bool FileTransferBehaviour::ConnectAndTransferFile(         //
        std::string const& sender, std::string const& filePath) //
    {
        std::promise<bool> p;
        std::future<bool>  f = p.get_future();
    
        // use a coroutine to accept the connection and receive the file.
        co_spawn(ioc,
            [=, this, p = std::move(p)]() mutable -> asio::awaitable<void> {
                // A promise can be satisfied only once (a second set_value
                // throws std::future_error), so determine the outcome first and
                // publish it with a single call.
                bool ok = false;
                if (auto s = co_await CoAwaitClientConnection(sender)) {
                    ok = co_await ReceiveFile(*s, filePath);
                }
                p.set_value(ok);
            },
            asio::detached);
    
        // Blocks until the coroutine has published its result.
        return f.get();
    }
    

    In fact, the "bool" and "optional" return values seem like an antipattern here for dealing with exceptional situations. I'd simplify all the way and let errors propagate as exceptions:

    // Exception-based variant: with asio::use_future, co_spawn returns a
    // std::future; get() blocks until the coroutine finished and rethrows any
    // exception that escaped it.
    void FileTransferBehaviour::ConnectAndTransferFile(std::string const& sender, std::string const& filePath) {
        // The explicit `()` keeps the lambda valid in C++20; omitting the
        // parameter list before a trailing return type requires C++23.
        return co_spawn(ioc, [=, this]() -> asio::awaitable<void> {
                       auto s = co_await Connect(sender);
                       co_await ReceiveFile(s, filePath);
                   },
                   asio::use_future)
            .get();
    }
    

    Again with a Live Demo, this time also demonstrating error handling:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/endian/arithmetic.hpp>
    #include <filesystem>
    #include <fstream>
    #include <iomanip>
    #include <iostream>
    namespace asio = boost::asio;
    using NetSize  = boost::endian::big_uint64_t;
    using asio::ip::tcp;
    using std::filesystem::path;
    
    /// Receiving side of the demo: connects to a sender and stores the single
    /// file it transmits. All failures are reported as exceptions.
    struct FileTransferBehaviour {
        /// Blocks until the transfer finished; throws if any step failed.
        /// @param sender   host name or address of the sending side
        /// @param filePath directory the received file is written into
        void ConnectAndTransferFile(std::string const& sender, std::string const& filePath);
    
      private:
        // I/O object types that use the `deferred` completion token by default,
        // which lets their async operations be co_await-ed without a token.
        using Sock     = asio::deferred_t::as_default_on_t<tcp::socket>;
        using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;
    
        /// Resolves `host` (port "8989") and returns a socket connected to it.
        /// Resolve/connect errors leave this coroutine as exceptions.
        asio::awaitable<Sock> Connect(std::string host) {
            Sock s{ioc};
            co_await async_connect(s, co_await Resolver(ioc).async_resolve(host, "8989"));
            co_return s;
        }
    
        /// Reads one file from `s` and writes it into the directory `filePath`.
        ///
        /// Wire format: two big-endian 64-bit integers (content size, name
        /// length), followed by the name bytes and then the content bytes.
        /// @throws boost::system::system_error for an unusable file name or a
        ///         failed file write; read errors propagate from async_read
        asio::awaitable<void> ReceiveFile(Sock& s, path filePath) {
            NetSize header[2];
            co_await async_read(s, asio::buffer(header));
    
            auto& [content_size, name_len] = header;
    
            // read name and contents
            std::string       name(name_len, '\0');
            std::vector<char> data(content_size);
            co_await async_read(s, std::array{asio::buffer(name), asio::buffer(data)});
    
            // The name is supplied by the peer. Only its last component is used,
            // so an absolute path or "../" sequences cannot place the file
            // outside of filePath.
            path const leaf = path(name).filename();
            if (leaf.empty() || leaf == path(".") || leaf == path(".."))
                throw boost::system::system_error(
                    boost::system::errc::make_error_code(boost::system::errc::invalid_argument),
                    "unusable file name received");
            path const target = filePath / leaf;
    
            // write target file
            std::ofstream ofs(target, std::ios::binary);
            ofs.write(data.data(), static_cast<std::streamsize>(data.size()));
            if (!ofs.flush())
                throw boost::system::system_error(
                    boost::system::errc::make_error_code(boost::system::errc::io_error),
                    "cannot write " + target.string());
            std::cout << "Wrote " << target << ", " << content_size << " bytes" << std::endl;
        }
    
        // Single-threaded pool that runs all coroutines of this object, so the
        // calling thread is free to block on the result.
        asio::thread_pool ioc{1};
    };
    
    // Spawns connect + receive as one coroutine on the thread pool and waits
    // for it; an exception escaping the coroutine is rethrown to the caller.
    void FileTransferBehaviour::ConnectAndTransferFile(std::string const& sender, std::string const& filePath) {
        // arguments are copied into the closure, so the coroutine owns them
        auto transfer = [=, this]() mutable -> asio::awaitable<void> {
            auto s = co_await Connect(sender);
            co_await ReceiveFile(s, filePath);
        };
    
        // use_future turns the completion of the coroutine into a future
        auto done = co_spawn(ioc, std::move(transfer), asio::use_future);
        done.get();
    }
    
    void demoServer(uint16_t port, std::string name) {
        asio::io_context  ioc;
        tcp::acceptor     acc{ioc, {{}, port}};
        tcp::socket client = acc.accept();
    
        std::ifstream     ifs(name, std::ios::binary);
        std::vector<char> payload(std::istreambuf_iterator{ifs}, {});
    
        NetSize content_size(payload.size()), name_len(name.length());
    
        std::cout << "Sending " << quoted(name) << ", " << content_size << " bytes" << std::endl;
        write(client,
              std::array{
                  asio::buffer(&content_size, sizeof(content_size)),
                  asio::buffer(&name_len, sizeof(name_len)),
                  asio::buffer(name),
                  asio::buffer(payload),
              });
    
        std::cout << "Server shutting down" << std::endl;
    }
    
    // Demonstrates one successful transfer and then the error path, when no
    // sender is listening any more.
    int main() {
        FileTransferBehaviour ftb;
    
        {
            // sender runs in the background for the duration of this scope
            std::thread server([] { demoServer(8989, "main.cpp"); });
    
            ftb.ConnectAndTransferFile("127.0.0.1", "outputdir");
            std::cout << "Received!" << std::endl;
    
            server.join();
        }
    
        // there is no server anymore, so this attempt is expected to throw
        try {
            ftb.ConnectAndTransferFile("127.0.0.1", "outputdir");
        } catch (boost::system::system_error const& err) {
            std::cout << "Second receive fails: " << err.code().message() << std::endl;
        }
    
        std::cout << "Bye" << std::endl;
    }
    

    Which prints:

    mkdir: created directory 'outputdir'
    
    Sending "main.cpp", 3091 bytes
    Server shutting down
    Wrote "outputdir/main.cpp", 3091 bytes
    Received!
    
    Second receive fails: Connection refused
    Bye
    
    4fe9a130e1e2f7516c0dfbe57a3e962a  main.cpp
    4fe9a130e1e2f7516c0dfbe57a3e962a  outputdir/main.cpp