Tags: multithreading, rust, threadpool, rust-cargo, rust-tokio

What is the benefit of using tokio instead of OS threads in Rust?


I am trying to write a multithreaded TCP communication program in Rust.

The idea is that a listening socket lives on the main thread and, as connections come in, the work is handled by worker threads.

I previously used the ThreadPool approach from the Rust book, but as I understand it, tokio can 'automatically' assign work to threads from a pool.

I am confused about the difference between OS threads and tokio tasks (mostly because you use spawn to create both).

Here is some code:

use std::net::{IpAddr, Ipv4Addr, SocketAddr};

fn main() {
    println!("Hello World!");
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 103, 7)), 2048);
    println!("socket -> {}", socket);

    // You need to use the tokio runtime to execute async functions
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let listener = StartListen::new(socket).await.unwrap();
    });
}

And I have StartListen defined in another file:

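use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::time::sleep;

// Defines the StartListen struct
pub struct StartListen {
    listener: TcpListener,
}


// Implementation for the StartListen struct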
impl StartListen {
    pub async fn new(socket: SocketAddr) -> Result<StartListen, StartListenError>{
        println!("Attempting to listen");
        let bind_result = TcpListener::bind(socket).await;
        sleep(Duration::from_secs(5)).await;
        match bind_result {
            Ok(listener) => {
                println!("Server is listening on {}", &socket);
                Ok(StartListen { listener })
            }
            Err(err) => Err(StartListenError::BindError(err)),
        }
    }
}

To add some more context, the idea is that this socket expects two types of messages:

// Defines the types of messages to expect
pub enum RequestType {
    RequestWork {
        request_message: String,
        parameter: String,
        sender_timestamp: String,
    },
    CloseConnection {
        initial_timestamp: String,
        final_timestamp: String,
    },
    Invalid(String),
}

I have yet to add a handle_connection method. Would I have to define handle_connection to sit in a loop and spawn tasks?

    pub async fn accept_connections(&self) {
        loop {
            let (mut stream, addr) = self.listener.accept().await.unwrap();
            println!("New connection from {}", addr);

            // Spawn a new task to handle the connection
            tokio::spawn(async move {
                let mut buffer = [0; 1024];
                loop {
                    let n = match stream.read(&mut buffer).await {
                        Ok(0) => return, // Connection closed
                        Ok(n) => n,
                        Err(e) => {
                            eprintln!("Error reading from socket: {}", e);
                            return;
                        }
                    };

                    // Convert the received message into a RequestType
                    let message = String::from_utf8_lossy(&buffer[..n]);

Solution

  • You are very close to understanding already. You are asking the right questions.

    Let me give you a short rundown of possible server implementations:

    • Synchronous. One thread does everything, and every send/recv operation is blocking.
      • The drawback is obvious: only one connection can be served at a time.
    • Multi-threaded. One thread gets spawned per connection.
      • This mitigates the drawback of the previous approach: multiple connections can now be served simultaneously.
      • But: the number of threads on our system now depends on how many clients connect, which we do not control.
      • A large number of threads can overload the system; there are even DoS attacks that use exactly this mechanism to run a system out of memory. They make a lot of connection attempts to force the server to allocate a lot of threads, then just stop responding.
    • Async. The idea is that you basically have a list of tasks, and one single thread jumps back and forth between them whenever it has time.
      • You can view it as a worker queue of coroutines: basically a list of functions that can be executed, but can also be paused and put back into the queue until a certain event happens (like receiving data).
      • Mitigates the DoS problem of multi-threading, because jumping back and forth between tasks in a single thread is a lot cheaper than spawning a lot of threads.
    • Multi-threaded async. Basically the same as async, but with multiple worker threads.
      • Requires a lot of thread safety, which is why this approach is such a good fit for Rust.
      • The default type of runtime in tokio. If you simply use #[tokio::main], this is what you will get.
      • Has a fixed number of worker threads (by default, one per CPU core) and is therefore more robust against system overload.
      • Can process a lot of concurrent connections efficiently.

    There are a lot of subtleties to consider, though. I strongly recommend reading the tokio tutorial, which explains many of these concepts. After that, I recommend reading the async book.

    For example, some important subtleties:

    • Do not block inside a task, for example by waiting on std's synchronization primitives. Blocking a task blocks the worker thread it runs on, and with it every other task scheduled on that thread (on a single-threaded runtime, that is everything), because async scheduling is non-preemptive: the scheduler cannot interrupt a running task. It can only switch tasks at .await points, so whenever you are waiting for something, make sure the wait happens at an .await point. (Exception: a std::sync::Mutex that is held only briefly and never across an .await; the tokio::sync::Mutex documentation discusses when this is appropriate.)
    • Don't use async tasks for heavy computation. While it technically isn't blocking, a heavy computation means a long stretch between two .await points, during which no other task can run on that worker thread. Instead, off-load it to a real thread using spawn_blocking, which runs the computation on a separate thread pool and returns a handle that the task can .await.