Good day, I am trying to write a simple, generic actor trait for streams with tokio. I cannot get a single task (StreamActor::run) to listen to the stream and the mpsc receiver at the same time.
I tried the tokio::select! macro. Here is my attempt:
#[async_trait]
trait StreamActor<S>
where
    Self: Sized + Sync + Send + 'static,
    S: Stream + Unpin + Send + 'static,
{
    type Message: Send + Debug;

    async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
        info!("started");
        self.initialize(&mut ctx).await?;
        loop {
            tokio::select! {
                Some(msg) = ctx.receiver.recv() => {
                    self.handle_actor_message(msg, &mut ctx).await?
                },
                Some(msg) = ctx.stream.next() => {
                    self.handle_stream_message(msg, &mut ctx).await?
                },
                else => {
                    ctx.receiver.close();
                    break;
                }
            }
        }
        self.finalize(&mut ctx).await?;
        info!("ended");
        Ok(())
    }

    async fn handle_actor_message(
        &mut self,
        msg: Self::Message,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn handle_stream_message(
        &mut self,
        msg: S::Item,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn initialize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }

    async fn finalize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }
}
Full code: Rust Playground (unfortunately not executable due to dependencies)
Compiler error:
error: future cannot be sent between threads safely
--> src\main.rs:73:70
|
73 | async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
| ______________________________________________________________________^
74 | | info!("started");
75 | | self.initialize(&mut ctx).await?;
76 | |
... |
95 | | Ok(())
96 | | }
| |_____^ future created by async block is not `Send`
|
= help: within `impl futures::Future<Output = Result<(), anyhow::Error>>`, the trait `std::marker::Send` is not implemented for `<S as Stream>::Item`
note: future is not `Send` as this value is used across an await
--> src\main.rs:83:62
|
78 | / tokio::select! {
79 | | Some(msg) = ctx.receiver.recv() => {
80 | | self.handle_actor_message(msg, &mut ctx).await?
81 | | },
82 | | Some(msg) = ctx.stream.next() => {
| | --- has type `<S as Stream>::Item` which is not `Send`
83 | | self.handle_stream_message(msg, &mut ctx).await?
| | ^^^^^^ await occurs here, with `msg` maybe used later
... |
88 | | }
89 | | }
| |_____________- `msg` is later dropped here
= note: required for the cast from `impl futures::Future<Output = Result<(), anyhow::Error>>` to the object type `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`
As far as I understand it, the problem is that the stream item (S::Item) is not known to be Send, but it has to be: it is held across the await on the asynchronous message handler, and the future returned by run must itself be Send so that it can be moved across threads. I don't know whether, or how, an associated type of a trait can be constrained to Send. If I replace the generic type S with a concrete type, in my case SplitStream<Websocket>, everything works.
But I need the actor trait for other streams as well, so my question is: is there a way to make the generic approach work, and if so, how?
Just add a bound S::Item: Send in the correct place:
#[async_trait]
trait StreamActor<S>
where
    Self: Sized + Sync + Send + 'static,
    S: Stream + Unpin + Send + 'static,
    S::Item: Send,
{ ... }
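This causes a few cascading errors, because a where-clause bound on the trait is not implied elsewhere. You need to repeat S::Item: Send in a few other places to fix them, typically on the impl blocks and on any generic function or type that mentions StreamActor<S>.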
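The effect of the bound can be reproduced without tokio or async_trait. The following is a minimal sketch using only the standard library, not the asker's actual code: Iterator stands in for Stream, and the names drain, tick, block_on, NoopWake and BoxSendFuture are hypothetical helpers introduced for illustration. The function returns a boxed dyn Future + Send, which is the same shape of return type that appears in the compiler error above. Removing the I::Item: Send line should bring back a "future cannot be sent between threads safely" error, because item is alive across the await.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context as TaskContext, Poll, Wake, Waker};

// Boxed future that must be `Send` (hypothetical alias for this sketch).
type BoxSendFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Trivial await point standing in for an async message handler.
async fn tick() {}

// `Iterator` is an assumption standing in for `Stream`.
fn drain<I>(mut source: I) -> BoxSendFuture<usize>
where
    I: Iterator + Send + 'static,
    I::Item: Send, // bound on the associated type; needed for the `Send` cast
{
    Box::pin(async move {
        let mut handled = 0;
        while let Some(item) = source.next() {
            tick().await; // `item` is held across this await
            drop(item);
            handled += 1;
        }
        handled
    })
}

// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor so the sketch runs without external crates.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = TaskContext::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let fut = drain(vec![1, 2, 3].into_iter());
    // The future is `Send`, so it may be moved to and driven on another thread.
    let handled = std::thread::spawn(move || block_on(fut)).join().unwrap();
    println!("handled {} items", handled);
}
```

The same where clause works on a trait: write S::Item: Send next to the bounds on S, as in the snippet above.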