Consider the following code, with the input being a simple csv file like the one in https://github.com/gophercises/quiz/blob/master/problems.csv:
use csv::ReaderBuilder;
use std::time::Duration;
use tokio::sync::mpsc;

#[derive(Debug)]
struct Problem {
    q: String,
    a: String,
}

impl Problem {
    pub fn new(q: &str, a: &str) -> Problem {
        Self {
            q: q.to_owned(),
            a: a.to_owned(),
        }
    }
}
const FILE_NAME: &str = "../../input/problems.csv";

#[tokio::main]
async fn main() {
    let probs = parse_lines().expect("Could not parse CSV file");
    println!("Banana quiz about to start. Press enter when ready.");
    let mut buf = String::new();
    match std::io::stdin().read_line(&mut buf).ok() {
        None => {
            println!("Error reading user input");
            std::process::exit(1)
        }
        _ => {}
    }
    let mut correct_ans = 0;
    for p in &probs {
        println!("What banana? {}", p.q);
        let (banana_s, mut banana_r) = mpsc::channel(512);
        tokio::spawn(async move {
            let mut banana = String::new();
            std::io::stdin().read_line(&mut banana);
            banana.pop();
            banana_s.send(banana).await.unwrap();
        });
        let mut banana = String::new();
        match tokio::time::timeout(Duration::from_secs(5), banana_r.recv()).await {
            Ok(opt) => {
                match opt {
                    Some(b) => banana.push_str(&b),
                    None => {},
                }
            },
            Err(_) => {
                println!("Only have 5 seconds to input the answer!");
                return;
            }
        };
        println!("Your Banana: {}, Correct Banana: {}\n", banana, p.a);
        if banana != p.a {
            println!("BAD BANANA!");
            break;
        }
        println!("good banana!");
        println!("------------");
        correct_ans += 1;
    }
    println!("Correct answers: {}/{}\n", correct_ans, probs.len())
}
fn parse_lines() -> Result<Vec<Problem>, csv::Error> {
    let mut builder = ReaderBuilder::new();
    builder.has_headers(false);
    let mut reader = builder.from_path(FILE_NAME)?;
    let mut probs = Vec::new();
    for r in reader.records() {
        let rec = r?;
        probs.push(Problem::new(&rec[0], &rec[1]));
    }
    Ok(probs)
}
Everything works fine except in the case in which the user waits for too long (5 seconds) to enter an answer.
After printing the message "Only have 5 seconds to input the answer!", the program does not return unless one hits enter. Once enter is hit, the program panics with the following message:
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("")', src/main.rs:46:41
Line 46 is the line where the string is sent on the channel:
banana_s.send(banana).await.unwrap();
My understanding is that this happens because the thread spawned with tokio is still being blocked by the read_line() function. The same thing does not happen if one uses std::thread and std::sync::mpsc::channel; in fact, the program exits happily.
Why is that? And how do I get the program to close once the timeout has expired?
I looked at this post (which is fairly similar), "Is it possible to preserve items in a Tokio MPSC when the last Sender is dropped, but the Reciever is still active?", but adapting its solution didn't work for me.
Your understanding seems to be correct. This problem is caused by blocking in an async context.
Let's say the question isn't answered in time: the timeout triggers, the user gets a message, and you return from main(). So the program should exit, right?
Well, not so fast. Since you're actually returning from an async block being executed by a runtime created by tokio::main, that runtime needs to shut down. A standard tokio runtime doesn't need to wait for outstanding tasks to complete, but it does need them to yield execution. The tokio::spawn-ed block is one of those tasks, but unfortunately, std::io::stdin().read_line(&mut banana) is a blocking call, meaning it won't yield until it is done (i.e. when you press enter again).
And even when that does complete, the task doesn't immediately yield; execution flows on to banana_s.send(banana), which returns an error because the receiver, banana_r, has already been destroyed and thus the message will never be received. See Disconnection in the tokio::sync::mpsc docs.
That is why you get the behavior you're seeing.
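The disconnection rule can be seen in isolation. The following is only a minimal sketch, and it assumes std::sync::mpsc instead of tokio's channel so that it runs without a runtime; the std sender likewise returns a SendError carrying the unsent value once the receiver is gone. The helper name try_deliver is hypothetical.

```rust
use std::sync::mpsc;

/// Hypothetical helper: tries to send `answer` and hands the value back
/// if the receiving side has already been dropped.
fn try_deliver(tx: &mpsc::Sender<String>, answer: String) -> Result<(), String> {
    // SendError wraps the value that could not be delivered.
    tx.send(answer).map_err(|err| err.0)
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    // Stands in for main() giving up after the timeout.
    drop(rx);
    match try_deliver(&tx, String::from("late answer")) {
        Ok(()) => println!("delivered"),
        Err(unsent) => println!("receiver gone, answer dropped: {unsent:?}"),
    }
}
```

Calling unwrap() on that error, as the spawned task in the question does, is what turns the late answer into a panic.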
This machinery can work fine with non-async mechanisms (std::thread and std::sync::mpsc) because returning from main() does end the process, meaning other threads are immediately destroyed without requiring their participation.
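For comparison, here is a rough std-only sketch of a single question with a timeout. It is not the asker's actual std version (which isn't shown); the helper wait_for_answer is hypothetical and the 5-second limit is taken from the question.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: waits up to `limit` for one answer.
/// `None` means the timeout elapsed or the sender was dropped.
fn wait_for_answer(rx: &Receiver<String>, limit: Duration) -> Option<String> {
    rx.recv_timeout(limit).ok()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // A plain OS thread: it may stay blocked in read_line() indefinitely.
    thread::spawn(move || {
        let mut line = String::new();
        if std::io::stdin().read_line(&mut line).is_ok() {
            // Ignore the error: the receiver may already be gone.
            let _ = tx.send(line.trim_end().to_owned());
        }
    });

    match wait_for_answer(&rx, Duration::from_secs(5)) {
        Some(answer) => println!("Your answer: {answer}"),
        None => println!("Only have 5 seconds to input the answer!"),
    }
    // Returning from main() ends the process even if the reader
    // thread is still blocked on stdin.
}
```

Nothing here has to wait for the reader thread, which is why the std variant exits as soon as main() returns.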
To fix this, you should use a thread instead of a task for accepting user input. You'll probably want to shift from having a different channel per question to having one channel that behaves like a stream of input lines. Otherwise you'd spawn a new blocking thread for each question, and threads left over from previous timeouts could swallow new input.
let (banana_s, mut banana_r) = mpsc::channel(512);
std::thread::spawn(move || loop {
    let mut banana = String::new();
    std::io::stdin().read_line(&mut banana).unwrap();
    banana.pop();
    banana_s.blocking_send(banana).unwrap(); // use blocking instead of async to send()
});
for p in &probs { // ...
If, however, exiting the program on a timeout is the long-term plan, it doesn't matter whether a previous timeout happened, because the program will have stopped running anyway.
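One caveat about the reader loop above: it unwraps both calls, so it panics if the receiver has been dropped, and it keeps sending empty strings once stdin reaches end of input. A possible variation is sketched below. It assumes std::sync::mpsc in place of tokio's channel (so send() takes the place of blocking_send()), and forward_lines is a hypothetical helper, not part of either library.

```rust
use std::io::BufRead;
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: forwards each line of `input` to `tx` until end of
/// input, a read error, or until the receiver is dropped.
fn forward_lines<R: BufRead>(input: R, tx: Sender<String>) {
    // lines() strips the trailing "\n" or "\r\n" from each line.
    for line in input.lines() {
        match line {
            Ok(text) => {
                if tx.send(text).is_err() {
                    break; // receiver gone: nobody is listening any more
                }
            }
            Err(_) => break, // stop on a read error
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // One long-lived reader thread shared by all questions.
    thread::spawn(move || forward_lines(std::io::stdin().lock(), tx));
    // Consume the stream; stop after 5 idle seconds or at end of input.
    while let Ok(answer) = rx.recv_timeout(Duration::from_secs(5)) {
        println!("got: {answer}");
    }
}
```

Because the helper takes any BufRead, it can be exercised with an in-memory buffer such as std::io::Cursor instead of real keyboard input.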