I know this question has been asked many times, but I still can't figure out what to do (more below).
I'm trying to spawn a new thread using std::thread::spawn and then run an async loop inside of it.
The async function I want to run:
#[tokio::main]
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let mut scheduler = AsyncScheduler::new();
    scheduler.every(10.seconds()).run(move || {
        let arc_pool = pg_pool2.clone();
        let arc_config = config2.clone();
        async {
            pull_from_main(arc_pool, arc_config).await;
        }
    });
    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
}
Spawning a thread to run in:
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}
The error:
error[E0277]: `()` is not a future
  --> src/main.rs:89:9
   |
89 |         pull_tweets(pg_pool2, config2).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `()` is not a future
   |
   = help: the trait `std::future::Future` is not implemented for `()`
   = note: required by `poll`
The last comment here does an amazing job at generalizing the problem. It seems at some point a return value is expected that implements IntoFuture, and I don't have that. I tried adding Ok(()) to both the closure and the function, but no luck:
`Result<(), ()>` is not a future
Then I noticed that the answer specifically talks about extension functions, which I'm not using. This also talks about extension functions.
Some other SO answers:
So none seem to work. Could someone help me understand 1) why the error exists here and 2) how to fix it?
Note 1: All of this can be easily solved by replacing std::thread::spawn with tokio::task::spawn_blocking. But I'm purposefully experimenting with thread spawn as per this article.
Note 2: Broader context on what I want to achieve: I'm pulling 150,000 tweets from Twitter in an async loop. I want to compare two implementations: running on the main runtime vs. running on a separate thread. The latter is where I struggle.
Note 3: In my mind, threads and async tasks are two different primitives that don't mix. I.e., spawning a new thread doesn't affect how tasks behave, and spawning a new task only adds work to an existing thread. Please let me know if this worldview is mistaken (and what I can read).
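#[tokio::main] converts your function into roughly the following:
pub fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    // The macro builds a fresh runtime on every call...
    let rt = tokio::runtime::Runtime::new().unwrap();
    // ...and blocks the calling thread until the original async body finishes.
    rt.block_on(async {
        let mut scheduler = AsyncScheduler::new();
        // ...
    });
}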
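Notice that it is now a synchronous function that creates a new runtime and runs the inner future to completion. That is why the compiler reports that `()` is not a future: the function returns (), so there is nothing to await. You simply call it; it is a separate runtime with its own dedicated thread pool and scheduler: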
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || {
        pull_tweets(pg_pool2, config2);
    });
}
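The calling convention can be sketched with only the standard library. This is a minimal sketch under stated assumptions: `block_on` here is a toy executor standing in for tokio's `Runtime::block_on`, and `fetch_count`, `pull_sync`, and `run_on_dedicated_thread` are hypothetical names, not part of the code above. It is not how tokio is implemented; it only shows that a function which blocks on a future internally is called like any other synchronous function from the spawned thread.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Unparks the blocked thread when the future signals progress.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor (assumption: a stand-in for a real runtime's `block_on`).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async work.
async fn fetch_count() -> u32 {
    150_000
}

/// Synchronous wrapper, the same shape as a `#[tokio::main]` function
/// after expansion: it blocks on the future itself and returns a plain value.
fn pull_sync() -> u32 {
    block_on(async { fetch_count().await })
}

/// The spawned thread calls the wrapper directly; there is no `.await`.
fn run_on_dedicated_thread() -> u32 {
    let handle = thread::spawn(|| pull_sync());
    handle.join().expect("worker thread panicked")
}
```

Calling `run_on_dedicated_thread()` from an ordinary `main` runs the async work on the new thread and hands the result back through `join`.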
Note that your original example was wrong in another way:
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}
Even if pull_tweets were an async function, the thread would not do anything, as all you do is create another future in the async block. The created future is never executed, because futures are lazy (and there is no executor context in that thread anyway).
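The laziness can be observed in isolation with a minimal sketch that uses only the standard library. The `block_on` helper is a hand-rolled toy executor standing in for a real one such as tokio's `Runtime::block_on`, and `lazy_demo` is a hypothetical name introduced for illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Unparks the blocked thread when the future signals progress.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor (assumption: a stand-in for a real runtime's `block_on`).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Reports whether the async body had run (before polling, after polling).
fn lazy_demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);

    // Same shape as `thread::spawn(move || async { ... })`: the thread only
    // constructs the future and returns it without ever polling it.
    let fut = thread::spawn(move || async move {
        flag.store(true, Ordering::SeqCst);
    })
    .join()
    .unwrap();
    let before = ran.load(Ordering::SeqCst);

    // Only driving the future on an executor runs its body.
    block_on(fut);
    let after = ran.load(Ordering::SeqCst);

    (before, after)
}
```

Here `lazy_demo()` returns `(false, true)`: the flag is still unset after the thread has finished, and it is set only once the returned future is polled to completion.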
I would structure the code to create the runtime directly in the new thread, and call any async functions you want from there:
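#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || {
        // This runtime is owned by the new thread and is independent of actix's.
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            pull_tweets(pg_pool2, config2).await;
        });
    });
}
// No #[tokio::main] here: this is now a plain async fn that returns a future.
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    // ...
    // Await the scheduler loop here rather than handing it to tokio::spawn
    // and returning: block_on returns as soon as this future completes, and
    // dropping the runtime cancels any tasks that are still pending.
}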