I have a future which wraps a TCP stream in a Framed using the LinesCodec.
When I run this in a test, the future blocks around 20% of the time. Because nothing is listening on the socket I'm trying to connect to, I expect to always get the error:
thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24
note: Run with 'RUST_BACKTRACE=1' for a backtrace.
This is the test code I have used:
    #[macro_use(try_ready)]
    extern crate futures; // 0.1.24
    extern crate tokio; // 0.1.8

    use std::io;
    use std::net::SocketAddr;
    use tokio::codec::{Framed, LinesCodec};
    use tokio::net::TcpStream;
    use tokio::prelude::*;

    struct MyFuture {
        addr: SocketAddr,
    }

    impl Future for MyFuture {
        type Item = Framed<TcpStream, LinesCodec>;
        type Error = io::Error;

        fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
            let strm = try_ready!(TcpStream::connect(&self.addr).poll());
            Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;
        use std::net::Shutdown;

        #[test]
        fn connect() {
            let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
            let fut = MyFuture { addr: addr }
                .and_then(|f| {
                    println!("connected");
                    let cn = f.get_ref();
                    cn.shutdown(Shutdown::Both)
                }).map_err(|e| panic!("error: {:?}", e));
            tokio::run(fut)
        }
    }
I have seen patterns in other languages where the test binary itself offers a mechanism to return results asynchronously, but haven't found a good way of using a similar mechanism in Rust.
A simple way to test async code may be to use a dedicated runtime for each test: start it, wait for the future to complete, and shut down the runtime at the end of the test.
    #[test]
    fn my_case() {
        // setup future f
        // ...
        tokio::run(f);
    }
I don't know if there are established patterns in the Rust ecosystem yet; see this discussion about the evolution of testing support for future-based code.
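Since a future that never resolves makes the test block forever, it can help to turn a hang into a failure. The following is only a sketch using the standard library: `run_with_timeout` is a hypothetical helper, not something Tokio provides. It runs a blocking closure on its own thread and gives up after a deadline.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `work` on its own thread and waits at most `limit`.
/// Returns `None` if no result arrived in time.
fn run_with_timeout<T, F>(limit: Duration, work: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore a send error: the receiver may already have given up.
        let _ = tx.send(work());
    });
    rx.recv_timeout(limit).ok()
}

fn main() {
    // Work that finishes in time yields its result.
    let fast = run_with_timeout(Duration::from_secs(1), || 2 + 2);
    assert_eq!(fast, Some(4));

    // Work that outlives the deadline is reported as `None` instead of hanging.
    let stuck = run_with_timeout(Duration::from_millis(50), || {
        thread::sleep(Duration::from_millis(500));
        0
    });
    assert_eq!(stuck, None);
}
```

In a test, the closure could wrap the blocking call, for example `move || tokio::run(f)`, and the test would assert that the result is not `None`. Note that the worker thread is not cancelled on timeout; it keeps running until it finishes or the test process exits.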
When you invoke poll(), the future is queried to check if a value is available. If a value is not available, an interest is registered so that poll() will be invoked again when something happens that can resolve the future.
When your MyFuture::poll() is invoked:
1. TcpStream::connect creates a new future, TcpStreamNew.
2. TcpStreamNew::poll is invoked immediately, and only once, on the future created at step 1.
3. That future is then dropped, so the next call to MyFuture::poll creates a fresh one and you never resolve the previously created futures.
You have registered an interest for a future that, if it does not resolve the first time you poll it, you never poll again for a resolved value or an error.
The behavior looks nondeterministic because the first poll sometimes resolves immediately with a ConnectionRefused error and sometimes has to wait for a later connection event or failure, which is never retrieved.
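The difference between recreating the inner future on every poll and storing it can be sketched with a small standard-library model. Everything here is hypothetical and only imitates the futures 0.1 contract: `Async`, `SimpleFuture`, `SlowConnect`, `Recreating`, and `Storing` are made-up names, and the driver simply polls in a loop instead of waiting for a wakeup as a real executor would.

```rust
/// Simplified stand-in for the futures 0.1 `Async` type (no error case).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Simplified stand-in for the futures 0.1 `Future` trait.
trait SimpleFuture {
    type Item;
    fn poll(&mut self) -> Async<Self::Item>;
}

/// Stand-in for a connect future that is not ready on the first polls.
struct SlowConnect {
    polls_left: u32,
}

impl SlowConnect {
    fn new() -> Self {
        SlowConnect { polls_left: 2 }
    }
}

impl SimpleFuture for SlowConnect {
    type Item = &'static str;
    fn poll(&mut self) -> Async<&'static str> {
        if self.polls_left == 0 {
            Async::Ready("connected")
        } else {
            self.polls_left -= 1;
            Async::NotReady
        }
    }
}

/// Mirrors the question's MyFuture: a fresh inner future on every poll.
struct Recreating;

impl SimpleFuture for Recreating {
    type Item = &'static str;
    fn poll(&mut self) -> Async<&'static str> {
        // The temporary is dropped right after this call, losing its progress.
        SlowConnect::new().poll()
    }
}

/// Keeps the inner future in a field and polls the same instance each time.
struct Storing {
    inner: SlowConnect,
}

impl SimpleFuture for Storing {
    type Item = &'static str;
    fn poll(&mut self) -> Async<&'static str> {
        self.inner.poll()
    }
}

/// Polls up to `max` times; returns how many polls were needed, if it resolved.
fn polls_until_ready<F: SimpleFuture>(fut: &mut F, max: u32) -> Option<u32> {
    for n in 1..=max {
        if let Async::Ready(_) = fut.poll() {
            return Some(n);
        }
    }
    None
}

fn main() {
    // Never resolves: prints "recreating: None".
    println!("recreating: {:?}", polls_until_ready(&mut Recreating, 10));
    // Resolves on the third poll: prints "storing: Some(3)".
    let mut storing = Storing { inner: SlowConnect::new() };
    println!("storing: {:?}", polls_until_ready(&mut storing, 10));
}
```

Applied to the question's code, the equivalent change would be to keep the future returned by TcpStream::connect in a field of MyFuture and poll that field, so the registered interest belongs to a future that is still alive when the event arrives.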
Look at mio::sys::unix::tcp::TcpStream, used by Tokio:
    impl TcpStream {
        pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
            set_nonblock(stream.as_raw_fd())?;

            match stream.connect(addr) {
                Ok(..) => {}
                // The connection attempt is still in progress; this is not a failure.
                Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
                Err(e) => return Err(e),
            }

            Ok(TcpStream {
                inner: stream,
            })
        }
        // ...
    }
When you connect on a non-blocking socket, the system call may succeed or fail immediately, or it may return EINPROGRESS. In this last case, a poll must be triggered to retrieve the value of the error.