asynchronous rust future rust-tokio

Performance and memory problems in a Rust async program


I wanted to benchmark requests from Rust to a particular service using an async client, so I wrote an async benchmarker for that.

This function should run the specified number of concurrent threads (actually, parallel chains of futures) for the specified duration and report the number of iterations achieved.

use futures::future;
use futures::prelude::*;
use std::error::Error;
use std::time::{Duration, Instant};
use std::{cell, io, rc};
use tokio::runtime::current_thread::Runtime;
use tokio::timer;

struct Config {
    workers: u32,
    duration: Duration,
}

/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> Box<dyn Future<Item = (), Error = P::Error> + 'a> {
    Box::new(f().and_then(move |_| cycle(f)))
}

fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Error = io::Error> + 'a>(
    config: Config,
    f: F,
) -> impl Future<Item = u32, Error = io::Error> + 'a {
    let counter = rc::Rc::new(cell::Cell::new(0u32));
    let f = rc::Rc::new(f);
    future::select_all((0..config.workers).map({
        let counter = rc::Rc::clone(&counter);
        move |_| {
            let counter = rc::Rc::clone(&counter);
            let f = rc::Rc::clone(&f);
            cycle(move || {
                let counter = rc::Rc::clone(&counter);
                f().map(move |_| {
                    counter.set(counter.get() + 1);
                })
            })
        }
    }))
    .map(|((), _, _)| ())
    .map_err(|(err, _, _)| err)
    .select(
        timer::Delay::new(Instant::now() + config.duration)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description())),
    )
    .map(move |((), _)| counter.get())
    .map_err(|(err, _)| err)
}

fn main() {
    // First command-line argument: benchmark duration in whole seconds.
    let duration: u64 = std::env::args()
        .nth(1)
        .expect("Please provide duration in seconds")
        .parse()
        .expect("Duration must be an integer number of seconds");

    let ms = Duration::from_millis(1);

    let mut rt = Runtime::new().expect("Could not create runtime");

    loop {
        let iters = rt
            .block_on(
                benchmark(
                    Config {
                        workers: 65536,
                        duration: Duration::from_secs(duration),
                    },
                    || {
                        // Stand-in for the actual benchmarked call
                        timer::Delay::new(Instant::now() + ms)
                            .map_err(|err| panic!("Failed to set delay: {:?}", err))
                    },
                )
                .map_err(|err| panic!("Benchmarking error: {:?}", err)),
            )
            .expect("Runtime error");
        println!("{} iters/sec", iters as u64 / duration);
    }
}

However, both the throughput this benchmark reports and its memory consumption get worse as the benchmark duration increases. For example, on my PC:

cargo run --release 1 ~ 900k iterations/sec
cargo run --release 2 ~ 700k iterations/sec
cargo run --release 10 ~ 330k iterations/sec

Memory usage also grows rapidly while the benchmark function runs. I tried using valgrind to find a memory leak, but it only reports that all allocated memory is still reachable.

How can I fix the issue?


Solution

  • It turns out cycle really was the culprit, as Gregory suspected. I found a useful function in the futures crate, loop_fn, and rewrote cycle using it:

    /// Build infinitely repeating future
    fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
        f: F,
    ) -> impl Future<Item = (), Error = P::Error> + 'a {
        future::loop_fn((), move |_| f().map(|_| future::Loop::Continue(())))
    }
    

    The rest of the code remains the same. This now compiles with stable Rust and even reports almost twice as many iterations per second as the proposed nightly futures solution (for what it's worth, given that this is a synthetic test).
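
To make the difference between the two versions of cycle more concrete, here is a rough std-only toy model. It assumes that every completed iteration of the recursive version leaves one more boxed layer wrapped around the next iteration, so each wakeup has to walk through all earlier layers, while the loop_fn version reuses a single state. The names RecursiveCycle, LoopedCycle, recursive_cost and looped_cost are hypothetical and only illustrate the shape of the problem; this is not how the futures crate is implemented.

```rust
/// Toy stand-in for `f().and_then(move |_| cycle(f))`.
/// `First` is an iteration that is still running; once it finishes, the
/// state becomes `Second`, which owns the boxed chain for the next iteration.
enum RecursiveCycle {
    First,
    Second(Box<RecursiveCycle>),
}

impl RecursiveCycle {
    /// One wakeup: descend to the innermost layer and finish one iteration
    /// there. Returns how many layers had to be visited on the way.
    fn poll(&mut self) -> usize {
        match self {
            RecursiveCycle::First => {
                // The finished iteration is replaced by a box holding the next one.
                *self = RecursiveCycle::Second(Box::new(RecursiveCycle::First));
                1
            }
            RecursiveCycle::Second(next) => 1 + next.poll(),
        }
    }
}

/// Toy stand-in for the `loop_fn` version: one state reused by every iteration.
struct LoopedCycle {
    iterations: usize,
}

impl LoopedCycle {
    /// One wakeup: finish one iteration in place, visiting a single layer.
    fn poll(&mut self) -> usize {
        self.iterations += 1;
        1
    }
}

/// Total number of layers visited over `wakeups` wakeups of the recursive model.
fn recursive_cost(wakeups: usize) -> usize {
    let mut chain = RecursiveCycle::First;
    (0..wakeups).map(|_| chain.poll()).sum()
}

/// Total number of layers visited over `wakeups` wakeups of the looped model.
fn looped_cost(wakeups: usize) -> usize {
    let mut state = LoopedCycle { iterations: 0 };
    let visited: usize = (0..wakeups).map(|_| state.poll()).sum();
    assert_eq!(state.iterations, wakeups);
    visited
}

fn main() {
    for &n in &[10, 100, 1000] {
        println!(
            "{} wakeups: recursive model visits {} layers, looped model visits {}",
            n,
            recursive_cost(n),
            looped_cost(n)
        );
    }
}
```

In this model the recursive chain gains one heap allocation per iteration and its total work grows quadratically with the number of iterations, which would be consistent with both the growing memory use and the falling iterations/sec reported in the question. The looped model stays at one layer per wakeup.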