The following code tries to asynchronously update a master dataframe df (from the polars package) by concatenating each incoming msg onto it.
I have seen the "duplicate" posts on Stack Overflow but still don't understand what I am doing wrong. I just want to mutably borrow the dataframe and update it, that's all! I tried it with a string, and it worked fine...
pub async fn new_handler(endpoint: &str) -> tokio::task::JoinHandle<()> {
    // Make master df for this handler
    let mut df = DataFrame::empty().lazy();
    // Make a stream for this handler
    let stream = new_stream(endpoint).await;
    let handle = tokio::spawn(async move {
        // let handle = tokio::spawn(async {
        stream
            .for_each(|msg| async move {
                match msg {
                    Ok(msg) => {
                        // Parse the json message into a struct
                        let jsonmsg: AggTrade =
                            serde_json::from_str(&msg.to_string()).expect("Failed to parse json");
                        let s0 = Series::new(
                            "price",
                            vec![jsonmsg.price.parse::<f32>().expect("Failed to parse price")],
                        );
                        let s1 = Series::new(
                            "quantity",
                            vec![jsonmsg
                                .quantity
                                .parse::<f32>()
                                .expect("Failed to parse quantity")],
                        );
                        // Create new dataframe from the json data
                        let df2 = DataFrame::new(vec![s0.clone(), s1.clone()]).unwrap().lazy();
                        // append the new data from df2 to the master df
                        df = polars::prelude::concat([df, df2], false, true)
                            .expect("Failed to concat");
                    }
                    Err(e) => {
                        println!("Error: {}", e);
                    }
                }
            })
            .await
    });
    handle
}
I get the following error:
error: captured variable cannot escape `FnMut` closure body
--> src/websockets.rs:33:29
|
27 | let mut df = DataFrame::empty().lazy();
| ------ variable defined here
...
33 | .for_each(|msg| async {
| ___________________________-_^
| | |
| | inferred to be a `FnMut` closure
34 | | match msg {
35 | | Ok(msg) => {
36 | | // Parse the json message into a struct
... |
58 | | df = polars::prelude::concat([df.clone(), df2.clone()], false, true)
| | -- variable captured here
... |
86 | | }
87 | | })
| |_____________^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
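The problem is that the closure passed to stream.for_each() is an FnMut closure that can be called multiple times, and each call returns an async block that captures df. With async move, df is moved into the future returned by the first call, so nothing is left for the next call. With a plain async block, as in the error above, the returned future holds a reference to df that outlives the closure call, which an FnMut closure is not allowed to hand out.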
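Because for_each runs one future at a time, another way around this is to stop capturing df altogether and pass it through each call by value, which is the shape of the futures crate's StreamExt::fold. A minimal std-only sketch of that idea, assuming a hypothetical fold_seq helper in place of a real stream, a Vec<f32> of prices in place of the polars DataFrame, and a tiny block_on executor in place of tokio:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal single-threaded executor standing in for the tokio runtime.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical helper with the same shape as StreamExt::fold: the accumulator
// is moved into each call and handed back by the future, so the closure
// captures nothing.
async fn fold_seq<T, I, Fut, F>(items: I, init: T, mut f: F) -> T
where
    I: IntoIterator,
    F: FnMut(T, I::Item) -> Fut,
    Fut: Future<Output = T>,
{
    let mut acc = init;
    for item in items {
        acc = f(acc, item).await;
    }
    acc
}

// A Vec<f32> of prices stands in for the polars DataFrame.
fn collect_prices(msgs: &[&str]) -> Vec<f32> {
    block_on(fold_seq(msgs.iter().copied(), Vec::new(), |mut df, msg| async move {
        match msg.parse::<f32>() {
            Ok(price) => df.push(price),
            Err(e) => println!("Error: {}", e),
        }
        df
    }))
}

fn main() {
    let df = collect_prices(&["1.5", "2.0", "oops", "3.25"]);
    println!("final value: {:?}", df);
}
```

Each future owns the accumulator only while it runs and returns it when done, so the closure stays an ordinary FnMut with no captured state to leak.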
Here's a self-contained minimal example that fails for the same reason. It compiles if you delete the last three lines of the function; with them in place, the lines marked // FAIL are rejected:
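async fn fails_moved_into_closure_called_multiple_times() {
    println!("fails_moved_into_closure_called_multiple_times():");
    let mut df = vec![];
    // `async move` moves `df` out of the closure into the returned future,
    // which makes the closure FnOnce: it can only be called once.
    let closure = || async move {
        let new_value = df.len();
        println!("in the closure, pushing {}", new_value);
        df.push(new_value);
    };
    let future = closure();
    future.await;
    let future2 = closure(); // FAIL: `closure` was consumed by the first call
    future2.await;
    println!("final value: {:?}", df); // FAIL: `df` was moved into the closure
}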
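For contrast, here is a sketch of the same two updates written so that it compiles without any shared-ownership types: df stays owned by the caller, and each future only borrows it mutably for as long as it runs. push_len and works_with_reborrow are hypothetical names, and block_on is a minimal std-only executor standing in for tokio:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal single-threaded executor standing in for the tokio runtime.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical helper: the returned future borrows `df` mutably, but only
// until it completes; the caller keeps ownership the whole time.
async fn push_len(df: &mut Vec<usize>) {
    let new_value = df.len();
    println!("pushing {}", new_value);
    df.push(new_value);
}

async fn works_with_reborrow() -> Vec<usize> {
    let mut df = vec![];
    push_len(&mut df).await; // the first mutable borrow ends here
    push_len(&mut df).await; // so a second one is allowed
    df // never moved, so it is still available
}

fn main() {
    let df = block_on(works_with_reborrow());
    println!("final value: {:?}", df);
}
```

This relies on the two awaits happening strictly one after the other. An ordinary loop inside the async block that awaits each message and updates df directly works for the same reason, whereas the FnMut closure signature that for_each expects cannot express that a returned future borrows the closure's own state.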
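In fact, Rust can't be sure from the closure's type that your for_each function won't call the closure again while a future returned by an earlier call is still alive and using df, and in the real code the spawned task may also run on another thread. Here's a solution using Arc<Mutex<T>>, which is thread-safe (as tokio::spawn requires) and fixes the ownership issues: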
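use std::sync::{Arc, Mutex};

async fn fix_using_arc() {
    println!("fix_using_arc():");
    // Shared, lockable state instead of a plain captured variable.
    let df = Arc::new(Mutex::new(vec![]));
    let closure = || {
        // Each call gets its own handle to the shared state; the returned
        // future owns that handle, so no borrow of a captured variable escapes.
        let my_df = Arc::clone(&df);
        async move {
            let mut shared = my_df.lock().unwrap();
            let new_value = shared.len();
            println!("in the closure, pushing {}", new_value);
            shared.push(new_value);
        }
    };
    let future = closure();
    future.await;
    let future2 = closure();
    future2.await;
    println!("final value: {:?}", df.lock().unwrap());
}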
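Applied to the shape of the original handler, the same Arc<Mutex<T>> pattern clones the Arc once per message and moves the clone into the async block. A minimal std-only sketch, assuming hypothetical for_each_seq (standing in for StreamExt::for_each) and handle_messages helpers, (price, quantity) string pairs in place of AggTrade messages, a Vec<(f32, f32)> in place of the DataFrame, and a tiny block_on executor in place of tokio:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal single-threaded executor standing in for the tokio runtime.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical stand-in for StreamExt::for_each: awaits one future per item, in order.
async fn for_each_seq<I, Fut, F>(items: I, mut f: F)
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: Future<Output = ()>,
{
    for item in items {
        f(item).await;
    }
}

// (price, quantity) rows stand in for the polars DataFrame.
fn handle_messages(msgs: &[(&str, &str)]) -> Vec<(f32, f32)> {
    let df = Arc::new(Mutex::new(Vec::new()));
    block_on(for_each_seq(msgs.iter().copied(), |(price, qty)| {
        // Each call gets its own handle; the async block takes ownership of it.
        let df = Arc::clone(&df);
        async move {
            match (price.parse::<f32>(), qty.parse::<f32>()) {
                (Ok(p), Ok(q)) => {
                    df.lock().unwrap().push((p, q));
                }
                _ => println!("Error: could not parse {:?}", (price, qty)),
            }
        }
    }));
    // Every per-message clone was dropped with its future, so this is the last handle.
    Arc::try_unwrap(df).unwrap().into_inner().unwrap()
}

fn main() {
    let df = handle_messages(&[("10.5", "2"), ("bad", "1"), ("11.0", "0.5")]);
    println!("final value: {:?}", df);
}
```

Arc::try_unwrap at the end only succeeds because each per-message clone was dropped when its future completed; in a long-running handler you would instead keep the Arc and lock it whenever you need to read the data.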