Search code examples
rust jmeter hyper warp

How to get high throughput with warp


I am setting up my first API using the Rust warp crate, and I can't get anywhere close to the performance I was hoping for.

To eliminate issues with my implementation, I added a simple route like this:

/// Ping handler: always succeeds with the reply built by `html("pong")`.
async fn ping() -> Result<impl Reply, Rejection> {
    let body = html("pong");
    Ok(body)
}

/// Builds the filter for `GET /v1/pub/ping`, which is answered by `ping`.
///
/// `app` is accepted but not used by this route.
pub fn routes(app: Arc<App>) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
    let ping_path = path!("v1" / "pub" / "ping");
    let ping_get = ping_path.and(get());
    ping_get.and_then(ping)
}

When I curl this endpoint I get the result I expect, but when I try to test with some load using JMeter, the maximum throughput I can get is about 2000 requests/sec, and for anything over a few hundred requests/sec JMeter records a high ratio of errors (60%+).

The JMeter errors all say "Connection refused". It looks like warp/hyper is refusing connections when handling many requests rather than returning 503 responses, which is one issue. More importantly, during this load test my CPU usage is below 5%, and 2000 requests/sec is quite poor.

The web benchmark site claims 500,000 requests/sec so I must be missing something, but this is a very basic setup.

-- EDIT --

To eliminate JMeter from the equation, I wrote this Rust test client, and was able to get up to 7,000 requests/sec, but it's still not startling.

use core::fmt;
use std::error::Error;
use std::sync::Arc;
use std::time::Instant;

use hyper::{Request, StatusCode};
use hyper_util::rt::TokioIo;
use bytes::Bytes;
use http_body_util::Empty;
use tokio::net::TcpStream;
use tokio::task;

/// Result alias used throughout this client: the error is any boxed error that is `Send + Sync`.
type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync>>;

/// Load-test driver: issues `concurrency * repeat_count` GET requests against the
/// ping endpoint, keeping `concurrency` requests in flight, then prints the
/// elapsed time and the average throughput.
///
/// Every request goes through `fetch`, which opens its own TCP connection.
///
/// # Errors
///
/// Returns an error if the URL cannot be parsed, the request cannot be built,
/// the initial probe request fails, or a spawned task panics. Failures of
/// individual requests during the timed run are counted and reported instead
/// of being silently discarded.
#[tokio::main]
async fn main() -> Result<()> {
    let url = "http://localhost:8000/v1/pub/ping".parse::<hyper::Uri>()?;
    let concurrency: usize = 20;
    let repeat_count: usize = 500;
    let total_requests = concurrency * repeat_count;

    let authority = url
        .authority()
        .expect("the hard-coded URL contains a host and port");

    let addr = Arc::new(String::from(authority.as_str()));
    let req = Arc::new(Request::builder()
        .method("GET")
        .uri(url.path())
        .header(hyper::header::HOST, authority.as_str())
        .header(hyper::header::CONNECTION, "keep-alive")
        .body(Empty::<Bytes>::new())?);

    // Untimed probe request: fail fast if the server is unreachable.
    // Nothing is reused afterwards, because `fetch` connects anew on every call.
    fetch(addr.clone(), req.clone()).await?;

    let start = Instant::now();

    // FIFO of in-flight requests; a deque makes taking the oldest one O(1).
    let mut in_flight = std::collections::VecDeque::with_capacity(concurrency);
    let mut failed: usize = 0;

    for _ in 0..concurrency {
        in_flight.push_back(task::spawn(fetch(addr.clone(), req.clone())));
    }

    // Sliding window: wait for the oldest request, then start a replacement.
    for _ in concurrency..total_requests {
        if let Some(oldest) = in_flight.pop_front() {
            // Outer `?` propagates a panicked/cancelled task; the inner result is the request outcome.
            failed += usize::from(oldest.await?.is_err());
        }
        in_flight.push_back(task::spawn(fetch(addr.clone(), req.clone())));
    }

    for pending in in_flight {
        failed += usize::from(pending.await?.is_err());
    }

    let elapsed = start.elapsed();
    println!("Elapsed: {:.2?} GET {} {} times with {} concurrency", elapsed, url, total_requests, concurrency);
    // Floating-point seconds avoid both millisecond truncation and a division by zero.
    println!("Average {:.0} req/sec", total_requests as f64 / elapsed.as_secs_f64());
    if failed > 0 {
        println!("{} of {} requests failed", failed, total_requests);
    }

    Ok(())
}

/// Performs a single request over a newly opened connection.
///
/// Connects to `addr`, performs the HTTP/1 handshake, spawns a task that drives
/// the connection, sends a clone of `req`, and turns any status other than
/// `200 OK` into an `HttpError`. The response body is not read.
async fn fetch(addr: Arc<String>, req: Arc<Request<Empty<Bytes>>>) -> Result<()> {
    let tcp = TcpStream::connect(addr.as_str()).await?;
    let (mut sender, connection) =
        hyper::client::conn::http1::handshake(TokioIo::new(tcp)).await?;

    // Drive the connection on its own task and log if it ends with an error.
    task::spawn(async move {
        if let Err(err) = connection.await {
            println!("Connection failed: {:?}", err);
        }
    });

    let request: Request<Empty<Bytes>> = (*req).clone();
    let res = sender.send_request(request).await?;

    let status = res.status();
    if status == StatusCode::OK {
        Ok(())
    } else {
        Err(Box::new(HttpError::new(status)))
    }
}

/// Error produced by `fetch` when the server answers with a status other than `200 OK`.
#[derive(Debug)]
struct HttpError {
    /// Status code of the response that triggered the error.
    pub status: StatusCode,
}

impl HttpError {
    pub fn new(status: StatusCode) -> Self {
        Self { status }
    }
}

impl fmt::Display for HttpError {
    /// Renders the error as `Http status code <status>`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let status = &self.status;
        f.write_fmt(format_args!("Http status code {}", status))
    }
}

impl Error for HttpError {
    fn source(&self) -> Option<&(dyn Error + 'static)> { None }
    fn description(&self) -> &str { "" }
    fn cause(&self) -> Option<&dyn Error> { self.source() }
}

-- EDIT --

Just for the hell of it, I decided to write a bare-bones web server using just std. It is slightly better than warp, serving about 7,500 requests/sec. This is the code:

use crate::App;
use std::{
    io::{prelude::*, BufReader},
    net::{Ipv4Addr, TcpListener, TcpStream},
    sync::{atomic::Ordering, mpsc, Arc},
    thread::{self, available_parallelism, Thread},
};

/// Runs a blocking TCP server on `ipv4:port`, handing accepted connections to
/// a fixed pool of worker threads in round-robin order.
///
/// One worker is started per unit of available parallelism (a single worker if
/// that number cannot be determined). Each worker receives its connections
/// over its own channel and runs `process_connections`. The accept loop runs
/// for as long as `listener.incoming()` yields items.
///
/// # Panics
///
/// Panics if the listening socket cannot be bound.
pub fn run(app: Arc<App>, ipv4: Ipv4Addr, port: u16) {
    // `(Ipv4Addr, u16)` is itself a socket address; no string round-trip needed.
    let listener = TcpListener::bind((ipv4, port)).expect("failed to bind listening socket");

    let concurrency = available_parallelism().map(|n| n.get()).unwrap_or(1);
    let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(concurrency);
    let mut senders: Vec<mpsc::Sender<TcpStream>> = Vec::with_capacity(concurrency);

    for _ in 0..concurrency {
        let (sender, receiver) = mpsc::channel();
        senders.push(sender);

        let app = Arc::clone(&app);
        threads.push(thread::spawn(move || process_connections(app, receiver)));
    }

    // Endless round-robin over the workers; `concurrency` is at least 1.
    let mut next_worker = senders.iter().cycle();
    for incoming in listener.incoming() {
        let stream = match incoming {
            Ok(stream) => stream,
            Err(err) => {
                // One failed accept must not take the whole server down.
                eprintln!("Failed to accept connection: {}", err);
                continue;
            }
        };

        if let Some(sender) = next_worker.next() {
            // `send` only fails when the worker's receiver is gone; the
            // connection is dropped (closed) in that case.
            if sender.send(stream).is_err() {
                eprintln!("Worker thread has exited; dropping connection");
            }
        }
    }
}

/// Worker loop: handles each connection received on `receiver`, one at a time.
///
/// Returns when every sender for the channel has been dropped, instead of
/// panicking on the resulting receive error.
fn process_connections(app: Arc<App>, receiver: mpsc::Receiver<TcpStream>) {
    // Iterating a `Receiver` blocks for the next item and ends once the channel is closed.
    for stream in receiver {
        handle_connection(Arc::clone(&app), stream);
    }
}

/// Serves exactly one request on `stream` and then lets the connection close.
///
/// Reads and discards request lines up to the first empty line (or end of
/// stream), writes a bare `200 OK` response and increments
/// `app.request_count`. The stream is dropped, and thereby closed, on return.
///
/// I/O errors are logged and end the connection; they do not panic the worker
/// thread, and a request whose response could not be written is not counted.
fn handle_connection(app: Arc<App>, mut stream: TcpStream) {
    // Skip the request head; its contents are not needed for the fixed response.
    for line in BufReader::new(&stream).lines() {
        match line {
            Ok(line) if line.is_empty() => break,
            Ok(_) => {}
            Err(err) => {
                eprintln!("Failed to read request: {}", err);
                return;
            }
        }
    }

    let response = "HTTP/1.1 200 OK\r\n\r\n";
    if let Err(err) = stream.write_all(response.as_bytes()) {
        eprintln!("Failed to write response: {}", err);
        return;
    }

    // The counter is shared through `app`; no clone is needed to bump it.
    app.request_count.fetch_add(1, Ordering::Relaxed);
}

Maybe the problem is on the client side, or maybe this is just some fundamental limit of my MacBook Pro. It's a mystery.

Maybe I will try streaming multiple requests over a single connection and see how that works.


Solution

  • A couple of days later, I now know exactly what's going on. The problem is that hyper does not support keep-alive correctly. I figured this out by writing a client application that uses low-level sockets. I could see that the server (I tried warp, and also hyper directly) closes the connection after the first request regardless of whether or not I pass the keep-alive header.

    After discovering this, I wrote the server side using sockets as well, and implemented keep-alive properly, and this solution processes 70,000 requests/sec.

    I'm still not sure how the web framework benchmarks claim 500,000 requests/sec unless they are running on very big machines with tens of physical cores, but I am quite satisfied with 70k req/sec on my MacBook Pro.

    I guess the answer to my original question of how to make warp fast is to fix the keep-alive issues in hyper.

    For anyone who wants to try this, here is my code:

    Server:

    use std::{
        io::{
            prelude::*, BufReader, BufWriter
        }, 
        net::{Ipv4Addr, TcpListener, TcpStream}, 
        sync::{atomic::Ordering, mpsc, Arc}, 
        thread::{self, available_parallelism}
    };
    use crate::App;
    
    /// Runs a blocking TCP server on `ipv4:port`, handing accepted connections
    /// to a fixed pool of worker threads in round-robin order.
    ///
    /// One worker is started per unit of available parallelism (a single
    /// worker if that number cannot be determined). Each worker gets its own
    /// channel and a 1-based index that it uses to prefix its log output. The
    /// accept loop runs for as long as `listener.incoming()` yields items.
    ///
    /// # Panics
    ///
    /// Panics if the listening socket cannot be bound.
    pub fn run(app: Arc<App>, ipv4: Ipv4Addr, port: u16) {
        // `(Ipv4Addr, u16)` is itself a socket address; no string round-trip needed.
        let listener = TcpListener::bind((ipv4, port)).expect("failed to bind listening socket");

        let concurrency = available_parallelism().map(|n| n.get()).unwrap_or(1);
        let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(concurrency);
        let mut senders: Vec<mpsc::Sender<TcpStream>> = Vec::with_capacity(concurrency);

        for index in 0..concurrency {
            let (sender, receiver) = mpsc::channel();
            senders.push(sender);

            let app = Arc::clone(&app);
            threads.push(thread::spawn(move || process_connections(app, receiver, index + 1)));
        }

        // Endless round-robin over the workers; `concurrency` is at least 1.
        let mut next_worker = senders.iter().cycle();
        for incoming in listener.incoming() {
            let stream = match incoming {
                Ok(stream) => stream,
                Err(err) => {
                    // One failed accept must not take the whole server down.
                    eprintln!("Failed to accept connection: {}", err);
                    continue;
                }
            };

            if let Some(sender) = next_worker.next() {
                // `send` only fails when the worker's receiver is gone; the
                // connection is dropped (closed) in that case.
                if sender.send(stream).is_err() {
                    eprintln!("Worker thread has exited; dropping connection");
                }
            }
        }
    }
    
    /// Worker loop: handles each connection received on `receiver`, one at a
    /// time, passing `index` along for log prefixes.
    ///
    /// A connection is served until `handle_connection` returns, so a worker
    /// does not pick up its next connection while the current one is still open.
    /// Returns when every sender for the channel has been dropped, instead of
    /// panicking on the resulting receive error.
    fn process_connections(app: Arc<App>, receiver: mpsc::Receiver<TcpStream>, index: usize) {
        // Iterating a `Receiver` blocks for the next item and ends once the channel is closed.
        for stream in receiver {
            handle_connection(Arc::clone(&app), stream, index);
        }
    }
    
    /// Serves requests on `stream` until the client stops asking for keep-alive,
    /// the stream ends, or writing a response fails.
    ///
    /// For every request head read via `extract_request`, an empty `200 OK`
    /// response with `Content-Length: 0` is written and flushed, and
    /// `app.request_count` is incremented. `index` identifies the worker in log
    /// output.
    ///
    /// The connection is kept open only when the request carries a `Connection`
    /// header listing `keep-alive`; a request without one closes the connection
    /// after its response.
    fn handle_connection(app: Arc<App>, stream: TcpStream, index: usize) {
        // A read error is logged and surfaces as an empty line, which
        // `extract_request` treats as the end of the request head.
        let mut request_lines = BufReader::new(&stream)
            .lines()
            .map(|result| match result {
                Ok(line) => line,
                Err(err) => {
                    println!("{}# {}", index, err);
                    String::default()
                },
            });

        let mut response_writer = BufWriter::new(&stream);

        loop {
            let http_request = extract_request(&mut request_lines);
            if http_request.is_empty() {
                // End of stream (or nothing readable): nothing left to answer.
                return;
            }

            let keep_alive = http_request.iter().any(|line| is_keep_alive_header(line));

            let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".as_bytes();
            // The response only reaches the socket on flush, so a flush failure
            // is as fatal for this connection as a write failure.
            let written = response_writer
                .write_all(response)
                .and_then(|()| response_writer.flush());
            if let Err(err) = written {
                println!("{}# Err writing response to stream: {}", index, err);
                return;
            }

            app.request_count.fetch_add(1, Ordering::Relaxed);

            if !keep_alive {
                return;
            }
        }
    }

    /// Returns `true` when `line` is a `Connection` header whose comma-separated
    /// value contains the `keep-alive` token; name and token are matched
    /// case-insensitively and surrounding whitespace is ignored.
    fn is_keep_alive_header(line: &str) -> bool {
        match line.split_once(':') {
            Some((name, value)) => {
                name.trim().eq_ignore_ascii_case("connection")
                    && value
                        .split(',')
                        .any(|token| token.trim().eq_ignore_ascii_case("keep-alive"))
            }
            None => false,
        }
    }
    
    /// Collects lines from `lines` until the first empty line or the end of the
    /// iterator. The terminating empty line is consumed but not returned; any
    /// following items stay in `lines`.
    fn extract_request(lines: &mut impl Iterator<Item = String>) -> Vec<String> {
        lines
            .by_ref()
            .take_while(|line| !line.is_empty())
            .collect()
    }
    

    Client:

    use std::{
        io::{BufRead, BufReader, BufWriter, Write}, 
        net::TcpStream, 
        thread::{self, available_parallelism}, 
        time::Instant
    };
    
    /// Benchmark client: starts one OS thread per unit of available parallelism
    /// (a single thread if that number cannot be determined), has each thread
    /// send `repeat_count` keep-alive requests through `make_request`, and
    /// prints the elapsed time and average throughput.
    ///
    /// Worker threads that panicked are counted and reported, since their
    /// requests are still included in the printed totals. Always returns `Ok(())`.
    ///
    /// NOTE(review): the work is done on blocking threads and `join` blocks the
    /// calling thread, even though this function is `async`.
    pub async fn run_test() -> super::Result<()> {
        let authority = "localhost:8000";
        let path = "/v1/admin/nodes";
        let repeat_count: usize = 100000;
        let concurrency = available_parallelism().map(|n| n.get()).unwrap_or(1);
        let total_requests = concurrency * repeat_count;

        let request_text = format!("GET {} HTTP/1.1\r\nHost: {}\r\nConnection: keep-alive\r\n\r\n", path, authority);

        let start = Instant::now();

        let workers: Vec<thread::JoinHandle<()>> = (0..concurrency)
            .map(|_| {
                let request = request_text.clone();
                thread::spawn(move || make_request(authority, repeat_count, request.as_bytes()))
            })
            .collect();

        // `join` returns `Err` when the worker panicked (e.g. connection failure).
        let failed_workers = workers
            .into_iter()
            .map(|worker| worker.join())
            .filter(|outcome| outcome.is_err())
            .count();

        let elapsed = start.elapsed();
        println!(
            "Elapsed: {:.2?} GET {}{} {} times with {} concurrency",
            elapsed,
            authority,
            path,
            total_requests,
            concurrency
        );
        println!(
            "Average {} req/sec",
            (total_requests as f64 / elapsed.as_secs_f64()).floor()
        );
        if failed_workers > 0 {
            println!(
                "{} of {} worker threads failed; the figures above overstate the real throughput",
                failed_workers, concurrency
            );
        }

        Ok(())
    }
    
    /// Sends `request` to `authority` `count` times over a single connection,
    /// reading one response head (via `extract_response`) after each send.
    ///
    /// # Panics
    ///
    /// Panics if connecting, writing or flushing fails, or if a request gets no
    /// response head back (the server closed the connection or the read
    /// failed), so that unanswered requests are never counted as completed.
    fn make_request(authority: &str, count: usize, request: &[u8]) {
        let stream = TcpStream::connect(authority).expect("failed to connect to server");

        // A read error is logged and surfaces as an empty line, which ends the
        // current response head.
        let mut response_lines = BufReader::new(&stream)
            .lines()
            .map(|result| match result {
                Ok(line) => line,
                Err(err) => {
                    println!("{}", err);
                    String::default()
                },
            });

        let mut request_writer = BufWriter::new(&stream);

        for completed in 0..count {
            request_writer.write_all(request).expect("failed to write request");
            request_writer.flush().expect("failed to flush request");

            let response = extract_response(&mut response_lines);
            assert!(
                !response.is_empty(),
                "no response received after {} of {} requests",
                completed,
                count
            );
        }
    }
    
    /// Gathers lines from `lines` up to the first empty line or the end of the
    /// iterator. The empty line itself is consumed and left out of the result.
    fn extract_response(lines: &mut impl Iterator<Item = String>) -> Vec<String> {
        let mut head = Vec::new();
        for line in lines {
            if line.is_empty() {
                break;
            }
            head.push(line);
        }
        head
    }