Tags: multithreading, rust, terminate

What is the standard way to get a Rust thread out of blocking operations?


Coming from Java, I am used to idioms along the lines of

while (true) {
  try {
    someBlockingOperation();
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // re-set the interrupted flag
    cleanup(); // whatever is necessary
    break;
  }
}

This works, as far as I know, across the whole JDK for anything that might block, like reading from files, from sockets, from a queue and even for Thread.sleep().

Reading up on how this is done in Rust, I find lots of seemingly special-purpose solutions mentioned, like mio and tokio. I also found ErrorKind::Interrupted and tried to trigger it by sending SIGINT to the thread, but the thread seems to die immediately without leaving any (back)trace.

Here is the code I used (note: not very well versed in Rust yet, so it might look a bit strange, but it runs):

use std::io;
use std::io::Read;
use std::thread;

pub fn main() {
    let sub_thread = thread::spawn(|| {
        let mut buffer = [0; 10];
        loop {
            let d = io::stdin().read(&mut buffer);
            println!("{:?}", d);
            let n = d.unwrap();
            if n == 0 {
                break;
            }
            println!("-> {:?}", &buffer[0..n]);
        }
    });

    sub_thread.join().unwrap();
}

By "blocking operations", I mean:

  • sleep
  • socket IO
  • file IO
  • queue IO (not sure yet where the queues are in Rust)

What would be the respective means to signal to a thread, like Thread.interrupt() in Java, that it's time to pack up and go home?


Solution

  • There is no such thing. Blocking means blocking.

    Instead, you deliberately use tools that are non-blocking. That's where libraries like mio, Tokio, or futures come in — they handle the architecture of sticking all of these non-blocking, asynchronous pieces together.

    catch (InterruptedException e)

    Rust doesn't have exceptions. If you expect to handle a failure case, that's better represented with a Result.

    Thread.interrupt()

    This doesn't actually do anything beyond setting a flag in the thread that some code may check and then throw an exception for. You could build the same structure yourself. One simple implementation:

    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread,
        time::Duration,
    };
    
    fn main() {
        let please_stop = Arc::new(AtomicBool::new(false));
    
        let t = thread::spawn({
            let should_i_stop = please_stop.clone();
            move || {
                while !should_i_stop.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(100));
                    println!("Sleeping");
                }
            }
        });
    
        thread::sleep(Duration::from_secs(1));
        please_stop.store(true, Ordering::SeqCst);
        t.join().unwrap();
    }
    

    Sleep

    No way of interrupting, as far as I know. The documentation even says:

    On Unix platforms this function will not return early due to a signal
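
If all you need is a pause that can be cut short, one option is to replace thread::sleep with a timed wait on a condition variable. This is a swapped-in technique rather than a way of interrupting sleep itself, and the sketch below is only an illustration: StopSignal, its sleep and stop methods, and demo_interruptible_sleep are hypothetical names; only Mutex, Condvar::wait_timeout_while and the thread functions come from the standard library.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: a stop flag paired with a condition variable.
pub struct StopSignal {
    stopped: Mutex<bool>,
    wakeup: Condvar,
}

impl StopSignal {
    pub fn new() -> Self {
        StopSignal {
            stopped: Mutex::new(false),
            wakeup: Condvar::new(),
        }
    }

    /// Sets the flag and wakes every thread currently waiting in `sleep`.
    pub fn stop(&self) {
        *self.stopped.lock().unwrap() = true;
        self.wakeup.notify_all();
    }

    /// Waits for at most `timeout`; returns true if a stop was requested.
    pub fn sleep(&self, timeout: Duration) -> bool {
        let guard = self.stopped.lock().unwrap();
        // Keeps waiting while the flag is false, so spurious wakeups are ignored.
        let (guard, _timeout_result) = self
            .wakeup
            .wait_timeout_while(guard, timeout, |stopped| !*stopped)
            .unwrap();
        *guard
    }
}

/// Starts a worker that would wait for 60 seconds and stops it after 50 ms.
/// Returns whether the worker saw the stop request and how long it waited.
pub fn demo_interruptible_sleep() -> (bool, Duration) {
    let signal = Arc::new(StopSignal::new());

    let worker = thread::spawn({
        let signal = Arc::clone(&signal);
        move || {
            let start = Instant::now();
            let stopped = signal.sleep(Duration::from_secs(60));
            (stopped, start.elapsed())
        }
    });

    thread::sleep(Duration::from_millis(50));
    signal.stop();
    worker.join().unwrap()
}
```

Calling demo_interruptible_sleep() from main should return well before the 60 second timeout. The worker has to cooperate by calling signal.sleep(..) instead of thread::sleep(..); code that is already inside thread::sleep is not affected.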

    Socket IO

    You put the socket into nonblocking mode using methods like set_nonblocking and then handle ErrorKind::WouldBlock.
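
A minimal sketch of that pattern with a std TcpStream, combined with the stop flag from the earlier example. The function names read_until_stopped and demo_socket, the 64-byte buffer and the 10 ms back-off are assumptions made for illustration; a real program would more likely wait for readiness through mio or Tokio instead of sleeping between polls.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: reads from `stream` until the peer closes the
/// connection or `stop` is set, and returns the bytes received so far.
pub fn read_until_stopped(mut stream: TcpStream, stop: &AtomicBool) -> io::Result<Vec<u8>> {
    stream.set_nonblocking(true)?;
    let mut received = Vec::new();
    let mut buffer = [0u8; 64];

    while !stop.load(Ordering::SeqCst) {
        match stream.read(&mut buffer) {
            Ok(0) => break, // the peer closed the connection
            Ok(n) => received.extend_from_slice(&buffer[..n]),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Nothing to read right now: back off briefly, then re-check the flag.
                thread::sleep(Duration::from_millis(10));
            }
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(received)
}

/// Loopback demo: the client sends 4 bytes and keeps the connection open,
/// so the reader only finishes once the flag is set.
pub fn demo_socket() -> io::Result<Vec<u8>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let stop = Arc::new(AtomicBool::new(false));

    let reader = thread::spawn({
        let stop = Arc::clone(&stop);
        move || -> io::Result<Vec<u8>> {
            let (stream, _peer) = listener.accept()?;
            read_until_stopped(stream, &stop)
        }
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"ping")?;

    thread::sleep(Duration::from_millis(200));
    stop.store(true, Ordering::SeqCst);

    let received = reader.join().unwrap()?;
    drop(client);
    Ok(received)
}
```

The thread never sits inside a blocking read, so it notices the flag within roughly one back-off interval. The cost is the polling itself, which is exactly what readiness-based libraries avoid.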

    File IO

    There isn't really a good cross-platform way of performing asynchronous file IO. Most implementations spin up a thread pool and perform the blocking operations there, sending the data back over something that supports non-blocking access, such as a channel.
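
A scaled-down sketch of that idea, using a single helper thread instead of a pool and a std mpsc channel to hand the data back. The names spawn_reader and demo_file_read, the 4096-byte buffer, the 50 ms timeout and the temporary file name are all assumptions made for illustration.

```rust
use std::fs::{self, File};
use std::io::{self, Read};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: performs the blocking reads on a dedicated thread
/// and forwards every chunk (or the first error) over a channel.
pub fn spawn_reader<R: Read + Send + 'static>(mut source: R) -> Receiver<io::Result<Vec<u8>>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut buffer = [0u8; 4096];
        loop {
            match source.read(&mut buffer) {
                Ok(0) => break, // end of file: dropping `tx` disconnects the channel
                Ok(n) => {
                    if tx.send(Ok(buffer[..n].to_vec())).is_err() {
                        break; // the receiver is gone, nobody wants the data
                    }
                }
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });
    rx
}

/// Writes a small temporary file, then reads it back through the helper
/// thread while the calling thread only ever waits with a timeout.
pub fn demo_file_read() -> io::Result<Vec<u8>> {
    let path = std::env::temp_dir().join(format!("blocking_read_demo_{}.txt", std::process::id()));
    fs::write(&path, b"hello from a file")?;

    let chunks = spawn_reader(File::open(&path)?);
    let mut contents = Vec::new();
    loop {
        match chunks.recv_timeout(Duration::from_millis(50)) {
            Ok(chunk) => contents.extend(chunk?),
            Err(RecvTimeoutError::Timeout) => {
                // No data yet: this is where a stop flag could be checked.
            }
            Err(RecvTimeoutError::Disconnected) => break, // the reader has finished
        }
    }

    fs::remove_file(&path)?;
    Ok(contents)
}
```

The consuming thread stays responsive because it only waits on the channel. The helper thread can still be stuck in a blocking read; dropping the receiver makes it exit at its next send, not before.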

    Queue IO

    Perhaps you mean something like an MPSC channel, in which case you'd use tools like try_recv.
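
A minimal sketch of a consumer that polls a std mpsc channel with try_recv and checks the same kind of stop flag between polls. The names drain_until_stopped and demo_queue, the u32 item type and the 10 ms back-off are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: collects items until `stop` is set or every
/// sender has been dropped.
pub fn drain_until_stopped(queue: Receiver<u32>, stop: &AtomicBool) -> Vec<u32> {
    let mut seen = Vec::new();
    while !stop.load(Ordering::SeqCst) {
        match queue.try_recv() {
            Ok(item) => seen.push(item),
            // The queue is empty: back off briefly, then re-check the flag.
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(10)),
            // All senders are gone, so no further items can arrive.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    seen
}

/// Sends three items, keeps the sender alive, and ends the consumer
/// through the flag alone.
pub fn demo_queue() -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let stop = Arc::new(AtomicBool::new(false));

    let consumer = thread::spawn({
        let stop = Arc::clone(&stop);
        move || drain_until_stopped(rx, &stop)
    });

    for item in 1..=3 {
        tx.send(item).unwrap();
    }

    thread::sleep(Duration::from_millis(200));
    stop.store(true, Ordering::SeqCst);

    let seen = consumer.join().unwrap();
    drop(tx);
    seen
}
```

Receiver::recv_timeout is an alternative to the try_recv plus sleep pairing: it blocks for a bounded time and returns early when an item arrives.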
