
Tokio channel rust


I am trying to implement a Promise-like system with Tokio by simulating a callback and a resolve step. This is the code:

use std::time::Duration;

use tokio::sync::oneshot::{channel};
use tokio::time::sleep;

async fn task_prom(callback: impl Fn(i32)) {
    for x in 1..=10 {
        if x == 10 {
            callback(x);
        }
        sleep(Duration::from_secs(1)).await;
    }
}

async fn handler() -> i32 {
    let (tx, rx) = channel::<i32>();
    task_prom(move |x| tx.send(x).unwrap()).await;
    rx.await.unwrap()
}

#[tokio::main]
async fn main() {
    let res = handler().await;
    println!("{}", res);
}

When I try to run, I get this error:

error[E0507]: cannot move out of `tx`, a captured variable in an `Fn` closure
   --> src\main.rs:17:24
    |
16  |     let (tx, rx) = channel::<i32>();
    |          -- captured outer variable
17  |     task_prom(move |x| tx.send(x).unwrap()).await;
    |               -------- ^^ ------- `tx` moved due to this method call
    |               |        |
    |               |        move occurs because `tx` has type `tokio::sync::oneshot::Sender<i32>`, which does not implement the `Copy` trait
    |               captured by this `Fn` closure
    |
note: `tokio::sync::oneshot::Sender::<T>::send` takes ownership of the receiver `self`, which moves `tx`
   --> C:\Users\titof\.cargo\registry\src\index.crates.io-6f17d22bba15001f\tokio-1.28.1\src\sync\oneshot.rs:594:21
    |
594 |     pub fn send(mut self, t: T) -> Result<(), T> {
    |                     ^^^^

I have read the Tokio documentation, but I don't understand why I get this error. Thanks.


Solution

  • A oneshot channel can send only one value, and Sender::send takes self by value, so calling it consumes the sender. A closure that consumes a captured variable can itself be called only once, so it cannot be an Fn or FnMut closure. It needs to be an FnOnce closure.

    async fn task_prom(callback: impl FnOnce(i32))
    

    Unfortunately, this still doesn't compile.

    error[E0382]: use of moved value: `callback`
     --> src/main.rs:9:13
      |
    6 | async fn task_prom(callback: impl FnOnce(i32)) {
      |                    -------- move occurs because `callback` has type `impl FnOnce(i32)`, which does not implement the `Copy` trait
    ...
    9 |             callback(x);
      |             ^^^^^^^^---
      |             |
      |             `callback` moved due to this call, in previous iteration of loop
      |
    note: this value implements `FnOnce`, which causes it to be moved when called
     --> src/main.rs:9:13
      |
    9 |             callback(x);
      |             ^^^^^^^^
    help: consider further restricting this bound
      |
    6 | async fn task_prom(callback: impl FnOnce(i32) + Copy) {
      |                                               ++++++
    
    For more information about this error, try `rustc --explain E0382`.
    

    The compiler does not reason about runtime values, so it does not know how many times a for loop runs or which branch of an if expression is taken. As far as it can tell, the callback(x) line could be reached on more than one iteration, which would call an already-moved closure. You need to restructure the code so that callback is visibly called at most once. How you do this will depend on the function, but in this case you can just move the call outside the loop.

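    async fn task_prom(callback: impl FnOnce(i32)) {
        // Nine one-second waits, matching iterations 1 through 9 of the original loop.
        for _ in 1..10 {
            sleep(Duration::from_secs(1)).await;
        }
        // The tenth iteration: call the callback once, then wait as before.
        callback(10);
        sleep(Duration::from_secs(1)).await;
    }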
    

    Alternatively, add a return on the same path as the callback call, so the compiler can see that the loop never continues after the closure has been moved.

    async fn task_prom(callback: impl FnOnce(i32)) {
        for x in 1..=10 {
            if x == 10 {
                callback(x);
                sleep(Duration::from_secs(1)).await;
                return;
            }
            sleep(Duration::from_secs(1)).await;
        }
    }
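
When neither restructuring is convenient, another common approach (not part of the answer above) is to store the callback in an Option and take() it at the call site. Taking leaves None behind, so the closure is moved out at most once even though the call sits inside a loop. The sketch below is an assumption-laden stand-in: it is synchronous, it uses std::sync::mpsc in place of the Tokio oneshot channel so that it builds with the standard library alone, and the names task_prom_sync and handler_sync are invented for illustration.

```rust
use std::sync::mpsc;

/// Hypothetical synchronous stand-in for the async `task_prom`.
/// Calls `callback` with 10 from inside the loop.
fn task_prom_sync(callback: impl FnOnce(i32)) {
    // Wrap the closure so it can be moved out exactly once.
    let mut callback = Some(callback);
    for x in 1..=10 {
        if x == 10 {
            // `take()` moves the closure out and leaves `None` behind,
            // so reaching this branch again would simply skip the call.
            if let Some(cb) = callback.take() {
                cb(x);
            }
        }
    }
}

/// Runs the task and returns the value delivered through the callback.
fn handler_sync() -> i32 {
    let (tx, rx) = mpsc::channel::<i32>();
    // Dropping `tx` inside the closure consumes the captured sender,
    // which makes the closure `FnOnce`, like the oneshot sender in the question.
    task_prom_sync(move |x| {
        tx.send(x).unwrap();
        drop(tx);
    });
    rx.recv().unwrap()
}
```

Calling handler_sync() returns 10. The cost of this pattern is a runtime check instead of a compile-time guarantee: if the branch is reached twice, the second call is silently skipped rather than rejected by the compiler.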
    

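    Aside from the iterator in for loops and the conditions in if, match, and while, whose values it does not evaluate, Rust is pretty smart about which paths can be taken: it follows control flow such as return and break when checking whether a value has already been moved.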
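
To illustrate that point about control flow, here is a minimal synchronous sketch in which an FnOnce callback is called inside a loop and the call is followed directly by break. The names first_multiple and find are made up for this example, and it avoids Tokio so that it builds with the standard library alone.

```rust
/// Passes the first multiple of `n` in 1..=100 to `callback`, if there is one.
/// `n` must be non-zero. `callback` is `FnOnce`, yet calling it inside the
/// loop compiles because the `break` ensures no later iteration reaches the call.
fn first_multiple(n: i32, callback: impl FnOnce(i32)) {
    for x in 1..=100 {
        if x % n == 0 {
            callback(x);
            break; // removing this line brings back error E0382
        }
    }
}

/// Collects the value handed to the callback, or `None` if it was never called.
fn find(n: i32) -> Option<i32> {
    let mut found = None;
    first_multiple(n, |x| found = Some(x));
    found
}
```

With this sketch, find(7) yields Some(7), and find(101) yields None because the callback is never called.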