Tags: rust, hyper, rust-tokio

Getting multiple URLs concurrently with Hyper


I am trying to adapt the Hyper basic client example to get multiple URLs concurrently.

This is the code I currently have:

extern crate futures;
extern crate hyper;
extern crate tokio_core;

use std::io::{self, Write};
use std::iter;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;

fn get_url() {
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());
    let uris: Vec<_> = iter::repeat("http://httpbin.org/ip".parse().unwrap()).take(50).collect();
    for uri in uris {
        let work = client.get(uri).and_then(|res| {
            println!("Response: {}", res.status());

            res.body().for_each(|chunk| {
                io::stdout()
                    .write_all(&chunk)
                    .map_err(From::from)
            })
        });
        core.run(work).unwrap();
    }
}

fn main() {
    get_url();
}

It doesn't seem to be acting concurrently (it takes a long time to complete). Am I giving the work to the core in the wrong way?


Solution

  • am I giving the work to the core in the wrong way?

    Yes, you are giving one request to Tokio and requiring that it complete before starting the next request. You've taken asynchronous code and forced it to be sequential.

    You need to give the reactor a single future that will perform different kinds of concurrent work.
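
    For illustration, here is a minimal sketch of that idea in the question's own hyper 0.11 / tokio-core setup: build every request future first, then hand the reactor one combined future via futures::future::join_all. Unlike the buffer_unordered versions below, join_all starts all 50 requests at once with no concurrency cap.

    extern crate futures;
    extern crate hyper;
    extern crate tokio_core;

    use futures::{future, Future, Stream};
    use hyper::Client;
    use std::iter;
    use tokio_core::reactor::Core;

    fn main() {
        let mut core = Core::new().unwrap();
        let client = Client::new(&core.handle());

        // Build all 50 request futures up front; nothing runs yet.
        let requests: Vec<_> = iter::repeat("http://httpbin.org/ip")
            .take(50)
            .map(|url| {
                client.get(url.parse().unwrap()).and_then(|res| {
                    println!("Response: {}", res.status());
                    // Collect the whole body and report its size.
                    res.body()
                        .concat2()
                        .map(|body| println!("{} bytes", body.len()))
                })
            })
            .collect();

        // One future for the reactor: all requests are polled concurrently.
        core.run(future::join_all(requests)).unwrap();
    }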

    Hyper 0.14

    use futures::prelude::*;
    use hyper::{body, client::Client};
    use std::{
        io::{self, Write},
        iter,
    };
    use tokio;
    
    const N_CONCURRENT: usize = 1;
    
    #[tokio::main]
    async fn main() {
        let client = Client::new();
    
        let uri = "http://httpbin.org/ip".parse().unwrap();
        let uris = iter::repeat(uri).take(50);
    
        stream::iter(uris)
            .map(move |uri| client.get(uri))
            .buffer_unordered(N_CONCURRENT)
            .then(|res| async {
                let res = res.expect("Error making request");
                println!("Response: {}", res.status());
    
                body::to_bytes(res).await.expect("Error reading body")
            })
            .for_each(|body| async move {
                io::stdout().write_all(&body).expect("Error writing body");
            })
            .await;
    }
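
    buffer_unordered(N_CONCURRENT) is what provides the concurrency here: it keeps up to N_CONCURRENT requests in flight at once and yields each response as soon as it completes, in whatever order the requests finish.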
    

    With N_CONCURRENT set to 1:

    real    1.119   1119085us
    user    0.012   12021us
    sys     0.011   11459us
    

    And set to 10:

    real    0.216   216285us
    user    0.014   13596us
    sys     0.021   20640us
    

    Cargo.toml

    [dependencies]
    futures = "0.3.17"
    hyper = { version = "0.14.13", features = ["client", "http1", "tcp"] }
    tokio = { version = "1.12.0", features = ["full"] }
    

    Hyper 0.12

    use futures::{stream, Future, Stream}; // 0.1.25
    use hyper::Client; // 0.12.23
    use std::{
        io::{self, Write},
        iter,
    };
    use tokio; // 0.1.15
    
    const N_CONCURRENT: usize = 1;
    
    fn main() {
        let client = Client::new();
    
        let uri = "http://httpbin.org/ip".parse().unwrap();
        let uris = iter::repeat(uri).take(50);
    
        let work = stream::iter_ok(uris)
            .map(move |uri| client.get(uri))
            .buffer_unordered(N_CONCURRENT)
            .and_then(|res| {
                println!("Response: {}", res.status());
                res.into_body()
                    .concat2()
                    .map_err(|e| panic!("Error collecting body: {}", e))
            })
            .for_each(|body| {
                io::stdout()
                    .write_all(&body)
                    .map_err(|e| panic!("Error writing: {}", e))
            })
            .map_err(|e| panic!("Error making request: {}", e));
    
        tokio::run(work);
    }
    

    With N_CONCURRENT set to 1:

    real    0m2.279s
    user    0m0.193s
    sys     0m0.065s
    

    And set to 10:

    real    0m0.529s
    user    0m0.186s
    sys     0m0.075s
    
