Search code examples
rustsmart-pointersdereferencerust-tokioactix-web

Share state between actix-web server and async closure


I want to periodically fetch data (using asynchronous reqwest), which is then served at an http endpoint using actix-web as a server. (I have a data source that has a fixed format, that I want to have read by a service that require a different format, so I need to transform the data.) I've tried to combine actix concepts with the thread sharing state example from the Rust book, but I don't understand the error or how to solve it. This is the code minified as much as I was able:

use actix_web::{get, http, web, App, HttpResponse, HttpServer, Responder};
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

struct AppState {
    status: String,
}

#[get("/")]
async fn index(data: web::Data<Mutex<AppState>>) -> impl Responder {
    let state = data.lock().unwrap();
    HttpResponse::Ok()
        .insert_header(http::header::ContentType::plaintext())
        .body(state.status.to_owned())
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let status_string = get_state().await.unwrap();
    let app_data = Arc::new(Mutex::new(web::Data::new(AppState {
        status: status_string,
    })));

    let app_data1 = Arc::clone(&app_data);
    actix_web::rt::spawn(async move {
        loop {
            println!("I get executed every 2-ish seconds!");
            sleep(Duration::from_millis(2000)).await;

            let res = get_state().await;
            let mut app_data = app_data1.lock().unwrap();
            // Edit 2: this line is not accepted by the compiler
            // Edit 2: *app_data.status = res.unwrap();
            // Edit 2: but this line is accepted
            *app_data = web::Data::new(AppState { status: res });
        }
    });

    let app_data2 = Arc::clone(&app_data);
    // Edit 2: but I get an error here now
    HttpServer::new(move || App::new().app_data(app_data2).service(index))
        .bind(("127.0.0.1", 9090))?
        .run()
        .await
}

async fn get_state() -> Result<String, Box<dyn std::error::Error>> {
    let client = reqwest::Client::new().get("http://ipecho.net/plain".to_string());
    let status = client.send().await?.text().await?;
    println!("got status: {status}");

    Ok(status)
}

But I get the following error:

error[E0308]: mismatched types
  --> src/main.rs:33:32
   |
33 |             *app_data.status = res.unwrap();
   |             ----------------   ^^^^^^^^^^^^ expected `str`, found struct `String`
   |             |
   |             expected due to the type of this binding

error[E0277]: the size for values of type `str` cannot be known at compilation time
  --> src/main.rs:33:13
   |
33 |             *app_data.status = res.unwrap();
   |             ^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `std::marker::Sized` is not implemented for `str`
   = note: the left-hand-side of an assignment must have a statically known size

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.

Why do I suddenly get a str? Is there an easy fix or is my approach to solving this wrong? Edit: Maybe removing the * is the right way to go, as Peter Hall suggests, but that gives me the following error instead:

error[E0594]: cannot assign to data in an `Arc`
  --> src/main.rs:33:13
   |
33 |             app_data.status = res.unwrap();
   |             ^^^^^^^^^^^^^^^ cannot assign
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<AppState>`

error[E0507]: cannot move out of `app_data2`, a captured variable in an `Fn` closure
  --> src/main.rs:38:49
   |
37 |     let app_data2 = Arc::clone(&app_data);
   |         --------- captured outer variable
38 |     HttpServer::new(move || App::new().app_data(app_data2).service(index))
   |                     -------                     ^^^^^^^^^ move occurs because `app_data2` has type `Arc<std::sync::Mutex<Data<AppState>>>`, which does not implement the `Copy` trait
   |                     |
   |                     captured by this `Fn` closure

Some errors have detailed explanations: E0507, E0594.
For more information about an error, try `rustc --explain E0507`.

Edit 2: I now get the following error (code changes commented with 'Edit 2' above):

error[E0507]: cannot move out of `app_data2`, a captured variable in an `Fn` closure
  --> src/main.rs:46:49
   |
45 |     let app_data2 = app_data.clone();
   |         --------- captured outer variable
46 |     HttpServer::new(move || App::new().app_data(app_data2).service(index))
   |                     -------                     ^^^^^^^^^ move occurs because `app_data2` has type `Arc<Mutex<Data<AppState>>>`, which does not implement the `Copy` trait
   |                     |
   |                     captured by this `Fn` closure

For more information about this error, try `rustc --explain E0507`.

My Cargo.toml dependencies:

[dependencies]
actix-web = "4.2.1"
reqwest = "0.11.12"
tokio = "1.21.2"

Solution

  • async solution

    I had my types mixed up a bit, having the app state as Arc<Mutex<T>> seemed to be the way to go, maybe it would be better with Arc<RwLock<T>>.

    use actix_web::{get, http, web, App, HttpResponse, HttpServer, Responder};
    use std::sync::{Arc, Mutex};
    use tokio::time::{sleep, Duration};
    
    struct AppState {
        status: String,
    }
    
    #[get("/")]
    async fn index(data: web::Data<Arc<Mutex<AppState>>>) -> impl Responder {
        let state = data.lock().unwrap();
        HttpResponse::Ok()
            .insert_header(http::header::ContentType::plaintext())
            .body(state.status.to_owned())
    }
    
    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        let status_string = get_state().await.unwrap();
        let app_data = Arc::new(Mutex::new(AppState {
            status: status_string,
        }));
    
        let app_data1 = app_data.clone();
        actix_web::rt::spawn(async move {
            loop {
                println!("I get executed every 2-ish seconds!");
                sleep(Duration::from_millis(2000)).await;
    
                let res = get_state().await.unwrap();
                let mut app_data = app_data1.lock().unwrap();
                *app_data = AppState { status: res };
            }
        });
    
        HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new(app_data.clone()))
                .service(index)
        })
        .bind(("127.0.0.1", 9090))?
        .run()
        .await
    }
    
    async fn get_state() -> Result<String, Box<dyn std::error::Error>> {
        let client = reqwest::Client::new().get("http://ipecho.net/plain".to_string());
        let status = client.send().await?.text().await?;
        println!("got status: {status}");
    
        Ok(status)
    }