I'm trying to write some Tokio code that creates a TCP listener and automatically turns the JSON it receives into a struct. Codecs seemed like a good fit, but what I've tried so far isn't working.
Below is a minimal program where I create such a listener and just use the provided LinesCodec to try to get it to interpret the connections made to it:
    use std::net::SocketAddr;
    use tokio::codec::{LinesCodec, FramedRead};
    use tokio::net::TcpListener;
    use tokio::prelude::*;

    fn main() {
        let addr = "127.0.0.1:12345".parse::<SocketAddr>().unwrap();
        let listener = TcpListener::bind(&addr).unwrap();
        tokio::run(listener.incoming()
            .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
            .for_each(move |socket| {
                let transport = FramedRead::new(socket, LinesCodec::new());
                transport.for_each(|msg| {
                    println!("Received: {:?}", msg);
                    Ok(())
                });
                Ok(())
            })
        );
    }
This compiles, but when I send a newline-terminated string over telnet or from a quick Python script, the message I expect is never printed.
The compiler does give a helpful warning:
    warning: unused `tokio::prelude::stream::ForEach` that must be used
      --> src/main.rs:18:13
       |
    18 | /             transport.for_each(|msg| {
    19 | |                 println!("Received: {:?}", msg);
    20 | |                 Ok(())
    21 | |             });
       | |_______________^
       |
       = note: #[warn(unused_must_use)] on by default
       = note: streams do nothing unless polled
So I tried altering the transport.for_each as follows:
    let mut stream = transport.for_each(|msg| {
        println!("Received: {:?}", msg);
        Ok(())
    });
    loop {
        match stream.poll() {
            Ok(s) => {
                match s {
                    Async::Ready(t) => println!("Ready: {:?}", t),
                    Async::NotReady => println!("Not ready"),
                }
            }
            <snip>
        }
    }
The warning goes away, but now the terminal just fills with lines saying "Not ready". What am I doing wrong?
I also tried writing my own basic LinesCodec with debug output in it, but as far as I can tell it never gets called. Yet from what I read at the end of https://tokio.rs/docs/going-deeper/frames/ it should receive my messages.
Any help would be greatly appreciated, including answers that say "move from tokio-0.1.18 to 0.3.x, as there's better support for what you're trying to achieve".
With thanks to the Tokio community for helping on this one. The warning was correct and was the right hint, but polling by hand is not the right thing to do here: the loop just keeps polling indefinitely. Instead, transport.for_each gives us a future with item () and error io::Error. We can use map_err to turn that into a future with item () and error (), then pass that future to tokio::spawn, and that works.
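For anyone landing here later, this is roughly what the fix looks like when applied to the program from the question. Treat it as a minimal sketch, assuming tokio 0.1.x as in the question; the names server and connection and the "connection error" log message are my own additions for illustration.

```rust
// Assumes Cargo.toml contains: tokio = "0.1"
use std::net::SocketAddr;
use tokio::codec::{FramedRead, LinesCodec};
use tokio::net::TcpListener;
use tokio::prelude::*;

fn main() {
    let addr = "127.0.0.1:12345".parse::<SocketAddr>().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
        .for_each(|socket| {
            let transport = FramedRead::new(socket, LinesCodec::new());

            // for_each turns the stream of lines into a future with
            // Item = () and Error = io::Error.
            let connection = transport
                .for_each(|msg| {
                    println!("Received: {:?}", msg);
                    Ok(())
                })
                // tokio::spawn needs Error = (), so log the io::Error and drop it.
                .map_err(|e| eprintln!("connection error = {:?}", e));

            // Hand the future to the executor instead of polling it by hand.
            tokio::spawn(connection);
            Ok(())
        });

    tokio::run(server);
}
```

Spawning each connection as its own task also means the accept loop returns straight away, so one slow client should not hold up the others.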