Tags: multithreading, rust, async-await, rust-tokio, iota

Rust tokio: Awaiting an async function in spawned thread


Calling an async function inside a newly spawned tokio task results in a compile error for some functions.

This minimal example uses the tokio and iota-streams crates. The method send_announce() is async and returns an Address. Awaiting it inside the spawned task results in a compile error stating that the std::marker::Send trait is not implemented for:

dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>
dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>

As far as I understand, the problem is that one or more Send/Sync implementations are missing: to pass data between threads, Rust requires every type in the chain to implement Send and Sync.

The documentation states that the types involved implement Send and Sync as auto traits (iota_streams_core::Error, wrap::Context, TangleAddress, BinaryBody, command::sizeof::Context, KeccakF1600 ...).

Calling the same functions inside the main thread works fine.

I tried wrapping the future returned by send_announce() in a Box, unsafely implementing the Send trait, and wrapping the response in a struct, but none of these changed the compile error.

It seems that the dyn Future return type is somehow problematic in this context. I'm new to Rust and would appreciate any help or ideas on how to solve this problem. Is this approach even possible?

My program should be invoked by a call and handle each request in a separate thread. Inside this thread, for example, the announcement link is generated.

The example shown is reduced to the parts relevant to the issue. Tested on Ubuntu with Rust stable and nightly.

// main.rs
use iota_streams::{
    app::transport::tangle::client::Client,
    app_channels::api::tangle::{Author, ChannelType},
    core::Result,
};
use rand::Rng;

#[tokio::main]
async fn main() -> Result<()> {
    //
    // This works fine
    //
    let seed = generate_seed();
    let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
    let mut author = Author::new(&seed, ChannelType::SingleBranch, client.clone());
    //
    // No error occurs here
    //
    let announcement_link = author.send_announce().await?;
    //
    // Spawn new thread
    //
    tokio::spawn(async move {
        let seed = generate_seed();
        let client = Client::new_from_url("https://chrysalis-nodes.iota.org");
        //
        // Error occurs here
        //
        let announcement_link = author.send_announce().await?;
        Ok(())
    });

    Ok(())
}

// Generate a random 81-character seed from ALPH9
const ALPH9: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9";
fn generate_seed() -> String {
    let mut rng = rand::thread_rng();
    (0..81)
        .map(|_| ALPH9.as_bytes()[rng.gen_range(0..27)] as char)
        .collect()
}

# Cargo.toml
[package]
name = "example"
version = "0.1.0"
edition = "2021"

[dependencies]
iota-streams = { git = "https://github.com/iotaledger/streams", branch = "develop" }
tokio = { version = "1.17.0", features = ["full"] }
rand = "0.8.5"

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<GenericMessage<TangleAddress, BinaryBody>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<(), iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::sizeof::Context<KeccakF1600>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: generator cannot be sent between threads safely
   --> src/main.rs:17:5
    |
17  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = Result<&mut iota_streams::iota_streams_ddml::command::wrap::Context<KeccakF1600, &mut [u8]>, iota_streams::iota_streams_core::Error>>`
note: required by a bound in `tokio::spawn`
   --> /home/-/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: could not compile `teststh` due to 4 previous errors

Solution

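  • The future returned by author.send_announce() does not implement Send, so it cannot be awaited inside tokio::spawn(), which requires T: Future + Send + 'static. As the error message shows, the async block holds a dyn Future without a Send bound across an .await, and that makes the whole block non-Send regardless of whether the output types are Send. Calling the method in main works because #[tokio::main] drives the main future with block_on, which has no Send bound.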
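
The missing bound can be reproduced with the standard library alone. This is a minimal sketch: the aliases LocalBoxFuture and SendBoxFuture and the helper is_send are hypothetical names, not part of tokio or iota-streams, and the assumption (taken from the error message) is that the library returns a boxed dyn Future without + Send.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical aliases: the same boxed future, without and with a Send bound.
type LocalBoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
type SendBoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Compiles only when T is Send, mirroring the
// `T: Future + Send + 'static` bound on tokio::spawn.
fn is_send<T: Send>(_value: &T) -> bool {
    true
}

fn boxed_local() -> LocalBoxFuture<u32> {
    Box::pin(async { 42 })
}

fn boxed_send() -> SendBoxFuture<u32> {
    Box::pin(async { 42 })
}

fn main() {
    let sendable = boxed_send();
    assert!(is_send(&sendable));

    let _local = boxed_local();
    // Uncommenting the next line fails to compile, because
    // `dyn Future<Output = u32>` does not implement `Send`,
    // even though the output type u32 is Send:
    // assert!(is_send(&_local));
}
```

The output type being Send is not enough. The trait object itself has to carry the Send bound, which is why the auto trait implementations listed in the documentation do not help here.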

    You could try using tokio::task::LocalSet which lets you spawn non-Send futures with tokio::task::spawn_local. This works by running any futures you spawn on the single OS thread where you created the LocalSet.

    If instead you want to spawn non-Send futures onto a threadpool, you can use tokio_util::task::LocalPoolHandle, which works by distributing work onto a given number of OS threads, each with its own LocalSet.