
Handling user input


How can I have one part of my application read user input while also listening for a shutdown signal?

According to the tokio docs, for this kind of thing I should use blocking IO in a spawned task:

For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.

So that is what I did, with something like this:

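use std::io::{self, BufRead};
use tokio::sync::watch;

// `rx` is the shutdown channel; nothing in this loop checks it yet.
async fn read_input(mut rx: watch::Receiver<&'static str>) {
    let mut line = String::new();
    let stdin = io::stdin();

    loop {
        // Blocking read: this stalls the async task until a line arrives.
        stdin.lock().read_line(&mut line).expect("Could not read line");

        let op = line.trim_end();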
        if op == "EXIT" {
            break;
        } else if op == "send" {
            // send_stuff();
        }
        line.clear();

    }
}

The thing is, how can I check the receiver channel for a shutdown and break out of this loop? If I await, the code will block.

Am I approaching this with the wrong concept/architecture?


Solution

  • To avoid managing your own thread, you would need a non-blocking OS API for stdin, wrapped for tokio (as of tokio 1.12, tokio::io::Stdin uses blocking reads internally).

    Otherwise, if we follow the advice from the docs and spawn our own thread, this is how it could be done:

    fn start_reading_stdin_lines(
        sender: tokio::sync::mpsc::Sender<String>,
        runtime: tokio::runtime::Handle
    ) {
        std::thread::spawn(move || {
            let stdin = std::io::stdin();
            let mut line_buf = String::new();
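            // read_line returns Ok(0) at end of input (EOF), so stop there
            // instead of looping forever on empty reads.
            while let Ok(bytes_read) = stdin.read_line(&mut line_buf) {
                if bytes_read == 0 {
                    break;
                }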
                let line = line_buf.trim_end().to_string();
                line_buf.clear();
                let sender2 = sender.clone();
    
                runtime.spawn(async move {
                    let result = sender2.send(line).await;
                    if let Err(error) = result {
                        println!("start_reading_stdin_lines send error: {:?}", error);
                    }
                });
            }
        });
    }
    
    fn start_activity_until_shutdown(watch_sender: tokio::sync::watch::Sender<bool>) {
        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
            println!("exiting after a signal...");
            let result = watch_sender.send(true);
            if let Err(error) = result {
                println!("watch_sender send error: {:?}", error);
            }
        });
    }
    
    async fn read_input(
        mut line_receiver: tokio::sync::mpsc::Receiver<String>,
        mut watch_receiver: tokio::sync::watch::Receiver<bool>
    ) {
        loop {
            tokio::select! {
                Some(line) = line_receiver.recv() => {
                    println!("line: {}", line);
                    // process the input
                    match line.as_str() {
                        "exit" => {
                            println!("exiting manually...");
                            break;
                        },
                        "send" => {
                            println!("send_stuff");
                        }
                        unexpected_line => {
                            println!("unexpected command: {}", unexpected_line);
                        }
                    }
                }
                Ok(_) = watch_receiver.changed() => {
                    println!("shutdown");
                    break;
                }
                // both channels are closed, so nothing more can arrive
                else => {
                    break;
                }
            }
        }
    }
    
    #[tokio::main]
    async fn main() {
        let (line_sender, line_receiver) = tokio::sync::mpsc::channel(1);
        start_reading_stdin_lines(line_sender, tokio::runtime::Handle::current());
    
        let (watch_sender, watch_receiver) = tokio::sync::watch::channel(false);
        // this will send a shutdown signal at some point
        start_activity_until_shutdown(watch_sender);
    
        read_input(line_receiver, watch_receiver).await;
    }
    

    Potential improvements:

    • If you are OK with tokio_stream wrappers, this could be combined more elegantly: start_reading_stdin_lines would produce a stream of lines and map them to typed commands, and read_input could then be based on StreamMap instead of select!.
    • Enabling the experimental stdin_forwarders feature makes reading lines easier, with a for loop over lines().
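
To make the "typed commands" idea concrete, here is a minimal std-only sketch. The names `Command`, `parse_command` and `commands` are illustrative assumptions, not part of the answer's code; the command strings mirror the ones matched in read_input. It uses `BufRead::lines()` on a generic reader, so it would work with `stdin.lock()` on the dedicated thread as well as with in-memory input.

```rust
use std::io::BufRead;

/// Typed form of the text commands handled in `read_input`.
/// Illustrative type, not part of the original answer.
#[derive(Debug, PartialEq)]
enum Command {
    Exit,
    Send,
    Unknown(String),
}

/// Maps one line of input to a command, ignoring surrounding whitespace.
fn parse_command(line: &str) -> Command {
    match line.trim() {
        "exit" => Command::Exit,
        "send" => Command::Send,
        other => Command::Unknown(other.to_string()),
    }
}

/// Turns any line-based reader into an iterator of commands.
/// Iteration stops at EOF or at the first read error.
fn commands<R: BufRead>(reader: R) -> impl Iterator<Item = Command> {
    reader
        .lines()
        .map_while(Result::ok)
        .map(|line| parse_command(&line))
}
```

The reader thread could then send `Command` values over the channel instead of raw strings, so the `match` in read_input works on enum variants rather than string literals. The same `parse_command` function could presumably be applied when mapping over a stream of lines.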