I need to implement a long-running program that receives messages via stdin. The protocol defines that messages are in form of length indicator (for simplicity 1 byte integer) and then string of a length represented by length indicator. Messages are NOT separated by any whitespace. The program is expected to consume all messages from stdin and wait for another messages.
How do I implement such waiting on stdin?
I implemented the iterator in a way that it tries to read from stdin and repeats in case of error. It works, but it is very inefficient. I would like the iterator to read the message when new data comes.
My implementation is using read_exact
:
use std::io::{Read, stdin, Error as IOError, ErrorKind};
pub struct In<R>(R) where R: Read;
pub trait InStream{
fn read_one(&mut self) -> Result<String, IOError>;
}
impl <R>In<R> where R: Read{
pub fn new(stdin: R) -> In<R> {
In(stdin)
}
}
impl <R>InStream for In<R> where R: Read{
/// Read one message from stdin and return it as string
fn read_one(&mut self) -> Result<String, IOError>{
const length_indicator: usize = 1;
let stdin = &mut self.0;
let mut size: [u8;length_indicator] = [0; length_indicator];
stdin.read_exact(&mut size)?;
let size = u8::from_be_bytes(size) as usize;
let mut buffer = vec![0u8; size];
let _bytes_read = stdin.read_exact(&mut buffer);
String::from_utf8(buffer).map_err(|_| IOError::new(ErrorKind::InvalidData, "not utf8"))
}
}
impl <R>Iterator for In<R> where R:Read{
type Item = String;
fn next(&mut self) -> Option<String>{
self.read_one()
.ok()
}
}
fn main(){
let mut in_stream = In::new(stdin());
loop{
match in_stream.next(){
Some(x) => println!("x: {:?}", x),
None => (),
}
}
}
I went trough Read and BufReader documentation, but none method seems to solve my problem as read
doc contains following text:
This function does not provide any guarantees about whether it blocks waiting for data, but if an object needs to block for a read and cannot, it will typically signal this via an Err return value.
How do I implement waiting for data on stdin?
===
Edit: minimum use-case that does not block and loops giving UnexpectedEof error instead of waiting for data:
use std::io::{Read, stdin};
fn main(){
let mut stdin = stdin();
let mut stdin_handle = stdin.lock();
loop{
let mut buffer = vec![0u8; 4];
let res = stdin_handle.read_exact(&mut buffer);
println!("res: {:?}", res);
println!("buffer: {:?}", buffer);
}
I run it on OSX by cargo run < in
where in
is named pipe. I fill the pipe by echo -n "1234" > in
.
It waits for the first input and then it loops.
res: Ok(())
buffer: [49, 50, 51, 52]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
buffer: [0, 0, 0, 0]
res: Err(Error { kind: UnexpectedEof, message: "failed to fill whole buffer" })
...
I would like the program to wait until there is sufficient data to fill the buffer.
As others explained, the docs on Read
are written very generally and don't apply to standard input, which is blocking. In other words, your code with the buffering added is fine.
The problem is how you use the pipe. For example, if you run mkfifo foo; cat <foo
in one shell, and echo -n bla >foo
in another, you'll see that the cat
in the first shell will display foo
and exit. That closing the last writer of the pipe sends EOF to the reader, rendering your program's stdin
useless.
You can work around the issue by starting another program in the background that opens the pipe in write mode and never exits, for example tail -f /dev/null >pipe-filename
. Then echo -n bla >foo
will be observed by your program, but won't cause its stdin to close. The "holding" of the write end of the pipe could probably also be achieved from Rust as well.