Tags: rust, timeout, p2p, rust-tokio, handshake

Using tokio::time::timeout does not trap delayed response from TCPReadStream


I'm writing a PoC P2P node handshake with a Bitcoin server. I send a 'version' message to the target node and it responds with its own version message. So far so good.

However, every now and again, the version message from the target node takes over two minutes to arrive. The server does eventually give a successful response, but since I have applied a 5-second timeout to other network activity, I thought that, for consistency, I should do the same here.

Here's the code without any check for a timeout. This works, albeit sometimes very slowly:

use bitcoin::{
    consensus::Decodable,
    p2p::message::{self},
};
use std::{
    io::BufReader,
    net::{IpAddr, SocketAddr, TcpStream},
    time::{Duration, SystemTime},
};

pub async fn handshake(to_address: IpAddr, port: u16, timeout: u64) -> Result<()> {
    let target_node = SocketAddr::new(to_address, port);

    let mut write_stream = TcpStream::connect_timeout(&target_node, Duration::from_secs(timeout))?;

    // Build and send version message
    send_msg_version(&target_node, &mut write_stream)?;

    let read_stream = write_stream.try_clone()?;
    let mut stream_reader = BufReader::new(read_stream);

    let response1 = message::RawNetworkMessage::consensus_decode(&mut stream_reader)?;

    match response1.payload() {
        // Do stuff here
    }

    // More stuff here...
}

The call to consensus_decode() is what sometimes takes a very long time, so I've tried wrapping it in a future::ready(), then placing that inside a call to tokio::time::timeout() like this:

    let response1 = if let Ok(net_msg) = tokio::time::timeout(
        Duration::from_secs(timeout),
        future::ready(message::RawNetworkMessage::consensus_decode(&mut stream_reader)?),
    )
    .await
    {
        net_msg
    } else {
        return Err(CustomError(format!(
            "TIMEOUT: {} failed to respond with VERSION message within {} seconds",
            to_address, timeout
        )));
    };

The (&mut stream_reader)? part successfully traps any decoding errors, yet if the message arrives successfully, but too slowly, the tokio::time::timeout isn't able to trap it.

What am I missing here?


Solution

  • You can't use future::ready to make a synchronous function call asynchronous; use tokio::task::spawn_blocking instead:

    use std::time::Duration;
    fn takes_long() {
        std::thread::sleep(Duration::from_millis(100));
    }
    
    #[tokio::main]
    async fn main() {
        let x = tokio::time::timeout(Duration::from_millis(10), tokio::task::spawn_blocking(|| {
            takes_long();
        })).await;
        eprintln!("{x:?}");
    }
    

    produces this output:

    Err(Elapsed(()))
    

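    The problem with your approach is that Rust evaluates function arguments before making the call. consensus_decode must therefore have already returned a value before future::ready is called, let alone tokio::time::timeout, which is called later still. By then the slow read is over, the ready future resolves immediately, and the timeout never triggers. Note also that with spawn_blocking the timeout only stops the waiting: the blocking call itself is not interrupted and keeps running on its thread until it returns.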
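To make the evaluation-order point concrete, here is a minimal sketch that needs only the standard library. The function slow_decode is a hypothetical stand-in for a blocking call such as consensus_decode, and time_to_build_ready_future is a name invented for this illustration. It measures how long it takes merely to construct the ready future, with no executor and no .await involved:

```rust
use std::future;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for a blocking call such as consensus_decode().
fn slow_decode() -> u32 {
    thread::sleep(Duration::from_millis(200));
    42
}

// Returns how long it took just to construct the future.
fn time_to_build_ready_future() -> Duration {
    let start = Instant::now();
    // Arguments are evaluated before the call, so slow_decode() runs to
    // completion here, before future::ready() is even entered.
    let _fut = future::ready(slow_decode());
    start.elapsed()
}

fn main() {
    let elapsed = time_to_build_ready_future();
    // The whole delay has been paid before anything could await the future.
    println!("constructing the future took {elapsed:?}");
}
```

The measured time is at least the full 200 ms sleep, so any timeout wrapped around the resulting future starts counting only after the slow work has finished.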