
Async loop on a new thread in rust: the trait `std::future::Future` is not implemented for `()`


I know this question has been asked many times, but I still can't figure out what to do (more below).

I'm trying to spawn a new thread using std::thread::spawn and then run an async loop inside of it.

The async function I want to run:

#[tokio::main] 
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let mut scheduler = AsyncScheduler::new();

    scheduler.every(10.seconds()).run(move || {
        let arc_pool = pg_pool2.clone();
        let arc_config = config2.clone();
        async {
            pull_from_main(arc_pool, arc_config).await;
        }
    });

    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
}

Spawning a thread to run in:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}

The error:

error[E0277]: `()` is not a future
  --> src/main.rs:89:9
   |
89 |         pull_tweets(pg_pool2, config2).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `()` is not a future
   |
   = help: the trait `std::future::Future` is not implemented for `()`
   = note: required by `poll`

The last comment here does an amazing job of generalizing the problem. It seems that at some point a return value is expected that implements IntoFuture, and I don't have that. I tried adding Ok(()) to both the closure and the function, but no luck.

  • Adding it to the closure changes nothing
  • Adding it to the async function gives me a new, but similar-sounding error:
`Result<(), ()>` is not a future

Then I noticed that the answer specifically talks about extension functions, which I'm not using. This also talks about extension functions.

Some other SO answers:

  • This is caused by a missing async.
  • This and this are reqwest library specific.

So none seem to work. Could someone help me understand 1) why the error exists here and 2) how to fix it?

Note 1: All of this can be easily solved by replacing std::thread::spawn with tokio::task::spawn_blocking. But I'm purposefully experimenting with thread spawn as per this article.

Note 2: Broader context on what I want to achieve: I'm pulling 150,000 tweets from Twitter in an async loop. I want to compare 2 implementations: running on the main runtime vs running on a separate thread. The latter is where I struggle.

Note 3: in my mind threads and async tasks are two different primitives that don't mix, i.e. spawning a new thread doesn't affect how tasks behave, and spawning a new task only adds work on an existing thread. Please let me know if this worldview is mistaken (and what I can read).


Solution

  • #[tokio::main] converts your function into roughly the following:

    pub fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
        // The macro removes `async` and builds a runtime inside the function.
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let mut scheduler = AsyncScheduler::new();
            // ...
        });
    }
    

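    Notice that it is a synchronous function that creates a new runtime and runs the inner future to completion. That is why the compiler reports `()` is not a future: the call returns `()`, not something you can await. You do not await it; you call it like any ordinary function. It is a separate runtime with its own dedicated thread pool and scheduler: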

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        let handle = thread::spawn(move || {
            pull_tweets(pg_pool2, config2);
        });
    }
    

    Note that your original example was wrong in another way:

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        let handle = thread::spawn(move || async {
            pull_tweets(pg_pool2, config2).await;
        });
    }
    

    Even if pull_tweets were an async function, the thread would not do anything: the closure only creates another future from the async block and returns it. That future is never executed, because futures are lazy and nothing polls it (there is no executor in that thread anyway).
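
The laziness described above can be checked without any runtime at all. The following is a minimal sketch using only the standard library; the function name `closure_returning_async_block_runs` and the `ran` flag are made up for illustration and are not part of the original code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Returns true if the async body was executed by the spawned thread.
fn closure_returning_async_block_runs() -> bool {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);

    // The closure only *builds* the future and returns it as the thread's result.
    let handle = thread::spawn(move || async move {
        flag.store(true, Ordering::SeqCst);
    });

    // join() hands back the future, still unpolled; dropping it discards the body.
    let _unpolled = handle.join().unwrap();
    ran.load(Ordering::SeqCst)
}

fn main() {
    // prints "async body ran: false"
    println!("async body ran: {}", closure_returning_async_block_runs());
}
```

The thread starts and finishes, but the flag is never set, because nothing ever polls the future the closure returned.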

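    I would structure the code to create the runtime directly in the new thread and call any async functions you want from there. Keep in mind that the runtime shuts down once block_on returns, so pull_tweets should await its scheduler loop directly rather than hand it to tokio::spawn and return: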

    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        let handle = thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async {
                pull_tweets(pg_pool2, config2).await;
            });
        });
    }
    
    pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
        // ...
    }
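
To make the "executor lives inside the thread" idea concrete without depending on tokio, here is a rough sketch that uses only the standard library. The `block_on` function below is a hand-rolled, hypothetical stand-in for a runtime's `block_on` (it is not tokio's implementation), and `pull_batch` is an invented placeholder for `pull_tweets`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical stand-in for a runtime's `block_on`: polls `fut` on the
/// current thread until it completes, parking between polls.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Invented async job standing in for `pull_tweets`.
async fn pull_batch(n: u32) -> u32 {
    n * 2
}

fn run_on_dedicated_thread() -> u32 {
    // The closure is synchronous; the executor inside it drives the future.
    thread::spawn(|| block_on(async { pull_batch(21).await }))
        .join()
        .unwrap()
}

fn main() {
    // prints "result: 42"
    println!("result: {}", run_on_dedicated_thread());
}
```

The shape is the same as in the tokio version above: the thread's closure is ordinary blocking code, and the `.await` only appears inside the future that the executor is asked to drive.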