How to use Hyper body channel?


I am trying to make an HTTP POST request using the Hyper library and then keep writing data to its body until I close it (from multiple functions/threads). I found in the documentation that I can use Body::channel() for this, but I am using it incorrectly, as I can only write to the channel once. I haven't found any examples; can someone please point me in the right direction?

let (mut sender, body) = Body::channel();

let request = Request::builder()
    .method(Method::POST)
    .uri("http://localhost:3000/")
    .header("content-type", "text")
    .body(body)
    .unwrap();

let client = Client::new();
let response = client.request(request);

// Does get sent
println!("Body: {:?}", sender.send_data(hyper::body::Bytes::from("test\n")).await);
// Gets stuck on this one
println!("Body: {:?}", sender.send_data(hyper::body::Bytes::from("test2\n")).await);
// Debug print
println!("{:?}", response.await);

Solution

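  • You need to wrap the send_data() calls in tokio::spawn(async move { ... });.

    The issue is that the Body/Sender pair has a buffer size of only 1. The first send_data() call fills that single slot and returns immediately. The second call then awaits (internally) until the Sender is ready again, which only happens once the body is read. In your code nothing reads the body yet, because the ResponseFuture is not awaited until after both sends, so the second call "gets stuck".

    Moving the sends into tokio::spawn() resolves this: the spawned task runs concurrently, so execution reaches response.await, the ResponseFuture is polled, and the request is actually performed, draining the body as chunks arrive.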

    let (sender, body) = Body::channel();
    
    let request = Request::builder()
        .method(Method::POST)
        .uri("http://localhost:3000/")
        .header("content-type", "text")
        .body(body)
        .unwrap();
    
    let client = Client::new();
    
    let response = client.request(request);
    
    tokio::spawn(async move {
        let mut sender = sender;
    
        println!("Body: {:?}", sender.send_data(hyper::body::Bytes::from("test\n")).await);
        println!("Body: {:?}", sender.send_data(hyper::body::Bytes::from("test2\n")).await);
    });
    
    println!("{:?}", response.await);
    

    The buffer size isn't mentioned in the Hyper docs (as far as I know). However, you can work it out from the source of Body::channel(), which constructs an MPSC channel using futures_channel::mpsc::channel(0). The futures_channel docs state that the channel's capacity is buffer + num-senders, which in our case is 0 + 1 = 1.
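
The capacity-of-one behavior described above can be reproduced without Hyper. The following is a minimal sketch that uses the standard library's std::sync::mpsc::sync_channel(1) as a stand-in for the 0 + 1 capacity. It is an analogy, not Hyper's actual implementation, and the function names are hypothetical. The first function shows the second send being rejected while nothing consumes the channel; the second mirrors the tokio::spawn fix by letting the producer and the consumer run concurrently.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Tries two sends on a capacity-1 channel while nothing receives.
/// Returns (first_accepted, second_rejected_as_full).
fn two_sends_without_consumer() -> (bool, bool) {
    // Capacity 1 stands in for the effective buffer of 0 + 1 sender (assumption:
    // this only models the capacity, not Hyper's internals).
    let (tx, _rx) = sync_channel::<&'static str>(1);

    // The first chunk fits into the single slot.
    let first = tx.try_send("test\n").is_ok();

    // The slot is still occupied, so a blocking send would wait here forever.
    let second = matches!(tx.try_send("test2\n"), Err(TrySendError::Full(_)));

    (first, second)
}

/// Sends every chunk from a spawned thread while the caller drains the channel,
/// so both sides make progress together.
fn send_with_concurrent_consumer(chunks: Vec<String>) -> Vec<String> {
    let (tx, rx) = sync_channel::<String>(1);

    let producer = thread::spawn(move || {
        for chunk in chunks {
            // Blocks while the single slot is occupied, then resumes.
            tx.send(chunk).expect("receiver dropped");
        }
        // tx is dropped when the closure ends, which terminates rx.iter() below.
    });

    // Plays the role of awaiting the response: it consumes the body chunks.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}
```

Calling two_sends_without_consumer() corresponds to the original code, where the second send cannot complete. Calling send_with_concurrent_consumer(vec!["test\n".to_string(), "test2\n".to_string()]) corresponds to the fixed version: every chunk gets through in order, because the consumer empties the slot while the producer keeps sending.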