Search code examples
asynchronousruststreamactorrust-tokio

Simple generic stream actor with tokio


Good day, I am trying to write a simple and generic actor trait for streams with tokio. I fail to listen to the stream and the mpsc reciever at the same time in one task (StreamActor::run).

I tried it with the macro tokio::select. Here is my attempt:

#[async_trait]
trait StreamActor<S>
where
    Self: Sized + Sync + Send + 'static,
    S: Stream + Unpin + Send + 'static,
{
    type Message: Send + Debug;

    async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
        info!("started");
        self.initialize(&mut ctx).await?;

        loop {
            tokio::select! {
                Some(msg) = ctx.receiver.recv() => {
                    self.handle_actor_message(msg, &mut ctx).await?
                },
                Some(msg) = ctx.stream.next() => {
                    self.handle_stream_message(msg, &mut ctx).await?
                },
                else => {
                    ctx.receiver.close();
                    break;
                }
            }
        }

        self.finalize(&mut ctx).await?;
        info!("ended");

        Ok(())
    }

    async fn handle_actor_message(
        &mut self,
        msg: Self::Message,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn handle_stream_message(
        &mut self,
        msg: S::Item,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn initialize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }

    async fn finalize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }
}

Full code: Rust Playground (unfortunately not executable due to dependencies)

Compiler error:

error: future cannot be sent between threads safely
  --> src\main.rs:73:70
   |
73 |       async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
   |  ______________________________________________________________________^
74 | |         info!("started");
75 | |         self.initialize(&mut ctx).await?;
76 | |
...  |
95 | |         Ok(())
96 | |     }
   | |_____^ future created by async block is not `Send`
   |
   = help: within `impl futures::Future<Output = Result<(), anyhow::Error>>`, the trait `std::marker::Send` is not implemented for `<S as Stream>::Item`
note: future is not `Send` as this value is used across an await
  --> src\main.rs:83:62
   |
78 | /             tokio::select! {
79 | |                 Some(msg) = ctx.receiver.recv() => {
80 | |                     self.handle_actor_message(msg, &mut ctx).await?
81 | |                 },
82 | |                 Some(msg) = ctx.stream.next() => {
   | |                      --- has type `<S as Stream>::Item` which is not `Send`
83 | |                     self.handle_stream_message(msg, &mut ctx).await?
   | |                                                              ^^^^^^ await occurs here, with `msg` maybe used later
...  |
88 | |                 }
89 | |             }
   | |_____________- `msg` is later dropped here
   = note: required for the cast from `impl futures::Future<Output = Result<(), anyhow::Error>>` to the object type `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`

Here the problem, as far as I understand it, is that the stream item (S::Item) doesn't have the 'Send' trait, but needs it because I need to await the asyncronous message handlers and so it can be sent across threads. I don't know how and if it is possible to restrict an associated type of a trait to 'Send'. If I replace the Generic type S with a concrete type, like in my case for example 'SplitStream<Websocket>' everything works.

But I need the actor trait also for other streams, so my question is, is there a way to make the generic approach work and if so how?


Solution

  • Just add a bound S::Item: Send in the correct place:

    #[async_trait]
    trait StreamActor<S>
    where
        Self: Sized + Sync + Send + 'static,
        S: Stream + Unpin + Send + 'static,
        S::Item: Send,
    { ... }
    

    This causes a few cascaded errors and you need to add S::Item: Send in few other places to fix them.