Search code examples
c++boostfuture

Use Futures with Boost Thread Pool


I'm implementing a TCP client which read and send files and strings and I'm using Boost as my main library. I'd like to continue reading or sending files while I keep sending strings, which in these case are the commands to send to the server. For this purpose I thought about using a Thread Pool in order to not overload the client. My question is, can I use futures to use callbacks when on of the thread in the pool ends? In case I can't, is there any other solution? I was doing something like this, where pool_ is a boost:asio:thread_pool

void send_file(std::string const& file_path){
    boost::asio::post(pool_, [this, &file_path] {
        handle_send_file(file_path);
    });
    // DO SOMETHING WHEN handle_send_file ENDS
}

void handle_send_file(std::string const& file_path) {
    boost::array<char, 1024> buf{};
    boost::system::error_code error;
    std::ifstream source_file(file_path, std::ios_base::binary | std::ios_base::ate);

    if(!source_file) {
        std::cout << "[ERROR] Failed to open " << file_path << std::endl;
        //TODO gestire errore
    }
    size_t file_size = source_file.tellg();
    source_file.seekg(0);

    std::string file_size_readable = file_size_to_readable(file_size);

    // First send file name and file size in bytes to server
    boost::asio::streambuf request;
    std::ostream request_stream(&request);
    request_stream << file_path << "\n"
                   << file_size << "\n\n"; // Consider sending readable version, does it change anything?

    // Send the request
    boost::asio::write(*socket_, request, error);
    if(error){
        std::cout << "[ERROR] Send request error:" << error << std::endl;
        //TODO lanciare un'eccezione? Qua dovrò controllare se il server funziona o no
    }
    if(DEBUG) {
        std::cout << "[DEBUG] " << file_path << " size is: " << file_size_readable << std::endl;
        std::cout << "[DEBUG] Start sending file content" << std::endl;
    }

    long bytes_sent = 0;
    float percent = 0;
    print_percentage(percent);

    while(!source_file.eof()) {
        source_file.read(buf.c_array(), (std::streamsize)buf.size());

        int bytes_read_from_file = source_file.gcount(); //int is fine because i read at most buf's size, 1024 in this case

        if(bytes_read_from_file<=0) {
            std::cout << "[ERROR] Read file error" << std::endl;
            break;
            //TODO gestire questo errore
        }

        percent = std::ceil((100.0 * bytes_sent) / file_size);
        print_percentage(percent);

        boost::asio::write(*socket_, boost::asio::buffer(buf.c_array(), source_file.gcount()),
                           boost::asio::transfer_all(), error);
        if(error) {
            std::cout << "[ERROR] Send file error:" << error << std::endl;
            //TODO lanciare un'eccezione?
        }

        bytes_sent += bytes_read_from_file;
    }

    std::cout << "\n" << "[INFO] File " << file_path << " sent successfully!" << std::endl;
}

Solution

  • The operations posted to the pool end without the threads ending. That's the whole purpose of pooling the threads.

    void send_file(std::string const& file_path){
        post(pool_, [this, &file_path] {
            handle_send_file(file_path);
        });
        // DO SOMETHING WHEN handle_send_file ENDS
    }
    

    This has several issues. The largest one is that you should not capture file_path by reference, as the argument is soon out of scope, and the handle_send_file call will run at an unspecified time in another thread. That's a race condition and dangling reference. Undefined Behaviour results.

    Then the

        // DO SOMETHING WHEN handle_send_file ENDS
    

    is on a line which has no sequence relation with handle_send_file. In fact, it will probably run before that operation ever has a chance to start.

    Simplifying

    Here's a simplified version:

    #include <boost/array.hpp>
    #include <boost/asio.hpp>
    #include <fstream>
    #include <iostream>
    namespace asio = boost::asio;
    using asio::ip::tcp;
        
    static asio::thread_pool pool_;
    
    struct X {
        std::unique_ptr<tcp::socket> socket_;
        
        explicit X(unsigned short port) : socket_(new tcp::socket{ pool_ }) {
            socket_->connect({ {}, port });
        }
    
        asio::thread_pool pool_;
        std::unique_ptr<tcp::socket> socket_{ new tcp::socket{ pool_ } };
    
        void send_file(std::string file_path) {
            post(pool_, [=, this] {
                send_file_implementation(file_path);
                // DO SOMETHING WHEN send_file_implementation ENDS
            });
        }
    
        // throws system_error exception
        void send_file_implementation(std::string file_path) {
            std::ifstream source_file(file_path,
                                      std::ios_base::binary | std::ios_base::ate);
            size_t file_size = source_file.tellg();
            source_file.seekg(0);
    
            write(*socket_,
                    asio::buffer(file_path + "\n" + std::to_string(file_size) + "\n\n"));
    
            boost::array<char, 1024> buf{};
            while (source_file.read(buf.c_array(), buf.size()) ||
                   source_file.gcount() > 0)
            {
                int n = source_file.gcount();
    
                if (n <= 0) {
                    using namespace boost::system;
                    throw system_error(errc::io_error, system_category());
                }
    
                write(*socket_, asio::buffer(buf), asio::transfer_exactly(n));
            }
        }
    };
    

    Now, you can indeed run several of these operations in parallel (assuming several instances of X, so you have separate socket_ connections).

    To do something at the end, just put code where I moved the comment:

    // DO SOMETHING WHEN send_file_implementation ENDS
    

    If you don't know what to do there and you wish to make a future ready at that point, you can:

    std::future<void> send_file(std::string file_path) {
        std::packaged_task<void()> task([=, this] {
            send_file_implementation(file_path);
        });
    
        return post(pool_, std::move(task));
    }
    

    This overload of post magically¹ returns the future from the packaged task. That packaged task will set the internal promise with either the (void) return value or the exception thrown.

    See it in action: Live On Coliru

    int main() {
        // send two files simultaneously to different connections
        X clientA(6868);
        X clientB(6969);
    
        std::future<void> futures[] = {
            clientA.send_file("main.cpp"),
            clientB.send_file("main.cpp"),
        };
    
        for (auto& fut : futures) try {
            fut.get();
            std::cout << "Everything completed without error\n";
        } catch(std::exception const& e) {
            std::cout << "Error occurred: " << e.what() << "\n";
        };
    
        pool_.join();
    }
    

    I tested this while running two netcats to listen on 6868/6969:

    nc -l -p 6868 | head& nc -l -p 6969 | md5sum&
    ./a.out
    wait
    

    The server prints:

    Everything completed without error
    Everything completed without error
    

    The netcats print their filtered output:

    main.cpp
    1907
    
    #include <boost/array.hpp>
    #include <boost/asio.hpp>
    #include <fstream>
    #include <iostream>
    #include <future>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    7ecb71992bcbc22bda44d78ad3e2a5ef  -
    

    ¹ not magic: see https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/async_result.html