Search code examples
rustactix-web

Clone crossbeam channel


How do you access a crossbeam channel "send" from every actix-ws callback?

This is a version of this asked on a specific example with a beautiful MRE.

Specifically, I've made as few changes as possible to the actix-ws example server to keep it neat + simple.

The exact problem is to access "send" on the first commented line ('// use "send"...')


use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_ws::Message;
use futures_util::StreamExt;

async fn ws(req: HttpRequest, body: web::Payload, send: crossbeam::channel::Sender<u32>) -> Result<HttpResponse, Error> {
    // use "send", possible cloned

    let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;

    actix_rt::spawn(async move {
        while let Some(Ok(msg)) = msg_stream.next().await {
            match msg {
                Message::Ping(bytes) => {
                    if session.pong(&bytes).await.is_err() {
                        return;
                    }
                }
                Message::Text(s) => println!("Got text, {}", s),
                _ => break,
            }
        }

        let _ = session.close(None).await;
    });
    
    Ok(response)
}

#[actix_web::main]
async fn main() -> Result<(), anyhow::Error> {
    let (send, recv) = crossbeam::channel::unbounded();

    HttpServer::new(move || {
        App::new()
            .wrap(Logger::default())
            .route("/ws", web::get().to(|req: HttpRequest, body: web::Payload| {
                ws(req, body, send);
            }))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await?;

    Ok(())
}

Error:

error[E0277]: the trait bound `[closure@src/file:35:41: 35:79]: Handler<_>` is not satisfied
   --> src/file:35:41
    |
35  |               .route("/ws", web::get().to(|req: HttpRequest, body: web::Payload| {
    |  ______________________________________--_^
    | |                                      |
    | |                                      required by a bound introduced by this call
36  | |                 ws(req, body, send);
37  | |             }))
    | |_____________^ the trait `Handler<_>` is not implemented for closure `[closure@src/fuck-so.rs:35:41: 35:79]`
    |
note: required by a bound in `Route::to`

Solution

  • A few issues are happening here:

    1. Your posted error is because your closure doesn't return anything and returning () isn't a valid response for a Handler. However, all you need to do is remove the ; after ws(...).

    2. The closure must not reference the local variable send because handlers must be 'static. You can fix that by using the move keyword so any captured variables are moved into your closure:

      web::get().to(move |req: HttpRequest, body:
                 // ^^^^
      
    3. The closure for HttpServer::new() can be called multiple times since its constrained by Fn. In this case, we've already moved send in but we need to also move it out. You can do this by .clone()-ing it within the closure (fortunately crossbeam Senders are cheap to clone):

      HttpServer::new(move || {
          let send = send.clone(); // need to make a new copy to move into the route handler
          App::new(...
      
    4. Again, in your route closure, you can't move the variable by passing it to ws(...) since it needs to be called multiple times. In other situations you could just pass by reference, Sender doesn't require ownership to do anything, but because async functions return Futures that capture their arguments and functions are not allowed to return values that reference their captures, you'll need to .clone() it anyway:

      ws(req, body, send.clone())
                     // ^^^^^^^^
      

    With those changes, your code compiles. Here's the full fix:

    use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
    use actix_ws::Message;
    use futures_util::StreamExt;
    
    async fn ws(
        req: HttpRequest,
        body: web::Payload,
        send: crossbeam::channel::Sender<u32>,
    ) -> Result<HttpResponse, Error> {
        // use "send", possible cloned
    
        let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
    
        actix_rt::spawn(async move {
            while let Some(Ok(msg)) = msg_stream.next().await {
                match msg {
                    Message::Ping(bytes) => {
                        if session.pong(&bytes).await.is_err() {
                            return;
                        }
                    }
                    Message::Text(s) => println!("Got text, {}", s),
                    _ => break,
                }
            }
    
            let _ = session.close(None).await;
        });
    
        Ok(response)
    }
    
    #[actix_web::main]
    async fn main() -> Result<(), anyhow::Error> {
        let (send, recv) = crossbeam::channel::unbounded();
    
        HttpServer::new(move || {
            let send = send.clone();
            App::new().wrap(Logger::default()).route(
                "/ws",
                web::get().to(move |req: HttpRequest, body: web::Payload| ws(req, body, send.clone())),
            )
        })
        .bind("127.0.0.1:8080")?
        .run()
        .await?;
    
        Ok(())
    }