I'm trying to move some data around between different threads but am getting the old "`Copy` trait not implemented" error. Here's some code:
use std::future::Future;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
// Start external crates mocked here
/// Mock of an external crate's decode error; it is the error type in the
/// signature of `Message::decode`.
#[derive(Clone, PartialEq, Eq)]
pub struct DecodeError {
// Boxed payload; `Inner` has no fields in this mock.
inner: Box<Inner>,
}
/// Private payload boxed inside `DecodeError`; empty in this mock.
#[derive(Clone, PartialEq, Eq)]
struct Inner {}
/// Mocked connection handle: a field-less, cloneable placeholder.
/// NOTE(review): presumably stands in for a NATS client connection — confirm.
#[derive(Clone)]
pub struct Connection {}
/// Mock of a message trait: anything debuggable that can be sent to and
/// shared between threads.
pub trait Message: core::fmt::Debug + Send + Sync {
    /// Decodes a message from `buf`.
    ///
    /// In this mock the buffer is not inspected: the default-constructed
    /// message is returned and the `Err` branch is never produced.
    fn decode<B>(mut buf: B) -> Result<Self, DecodeError>
    where
        B: bytes::Buf,
        Self: Default,
    {
        // do stuff
        Ok(Self::default())
    }
}
/// Mock request message: an empty struct that relies on the provided
/// `Message::decode` implementation.
#[derive(Clone, Debug, Default)]
pub struct Request {}
impl Message for Request {}
/// Mock response message: an empty struct that relies on the provided
/// `Message::decode` implementation.
#[derive(Clone, Debug, Default)]
pub struct Response {}
impl Message for Response {}
/// Empty placeholder type. It does not implement `Message` and is not
/// referenced by the rest of the code shown here.
pub struct OtherResponse {}
/// Error side of `ReplyResult`, i.e. what handling a request can fail with.
pub enum ReplyError {
/// The incoming payload could not be decoded into the request type.
InvalidData,
}
/// Message yielded by `Subscription::next`.
pub struct EventMessage {
// Raw payload bytes; `handle_request` decodes them into the request type.
data: Vec<u8>,
}
/// Mocked subscription that hands out `EventMessage`s.
pub struct Subscription {}

impl Subscription {
    /// Yields the next message of the subscription.
    ///
    /// This mock never waits and never ends: every call produces `Some`
    /// message carrying an empty payload.
    pub async fn next(&self) -> Option<EventMessage> {
        let data = Vec::new();
        Some(EventMessage { data })
    }
}
// End external crates mocked here
/// Handle for publishing messages of type `T` on one subject.
#[derive(Clone)]
pub struct Publisher<T> {
// Connection handle the publisher was created with.
connection: Connection,
// Subject this publisher is bound to.
subject: String,
// Zero-sized marker that ties the publisher to `T` without storing a `T`.
// NOTE: raw pointers are neither `Send` nor `Sync`, so this marker makes
// `Publisher<T>` neither `Send` nor `Sync` as well.
resource_type: PhantomData<*const T>,
}
/// Errors reported through `PublishResult`. Neither variant is constructed
/// in the code shown here.
#[derive(Debug)]
pub enum PublishError {
// Presumably: the message could not be serialized; carries a description — confirm.
SerializeError(String),
// Presumably: the publish itself failed; carries a description — confirm.
PublishError(String),
}
/// Result alias whose error type is `PublishError`.
pub type PublishResult<T> = std::result::Result<T, PublishError>;
impl<T: Message> Publisher<T> {
    /// Builds a publisher for `subject` that uses `connection`.
    pub fn new(connection: Connection, subject: String) -> Self {
        Self {
            connection,
            subject,
            resource_type: PhantomData,
        }
    }

    /// Publishes `msg`, taking ownership of it.
    ///
    /// The body is a placeholder: nothing is sent yet and the call always
    /// resolves to `Ok(())`.
    pub async fn publish(&self, msg: T) -> PublishResult<()>
    where
        T: Message,
    {
        // do stuff to msg
        Ok(())
    }
}
/// Entry point: creates a `Node`, obtains a `Publisher<Request>` for "TOPIC",
/// and registers a replyer on "request" whose callback re-publishes each
/// incoming request and answers with an empty `Response`.
///
/// NOTE: as written this does not compile. The closure captures `p`, a
/// `Publisher<Request>`, which is not `Copy`, `Send` or `Sync`, while
/// `Node::get_replyer` requires all three of its callback.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let node = Node::new("127.0.0.1", "node".into())
.await
.expect("connecting to NATS");
let p: Publisher<Request> = node.get_publisher("TOPIC".into());
// The closure takes ownership of `p` (`move`), so `p`'s type decides which
// auto traits the closure itself implements.
let _submission_replyer: AsynkReplyer<Request, Response> = node
.get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
let mut req = req.clone().lock().unwrap();
p.clone().publish(*req);
Ok(Response {})
})
.await;
Ok(())
}
/// Named participant that owns a `Connection` and hands out publishers and
/// replyers built on it.
pub struct Node {
// Name given at construction; not read by the code shown here.
name: String,
// Cloned into publishers and lent to replyers.
connection: Connection,
}
/// Result alias whose error type is `ReplyError`.
pub type ReplyResult<T> = std::result::Result<T, ReplyError>;
impl Node {
    /// Creates a node called `name`.
    ///
    /// Initializes `env_logger`, then builds the node around a mock
    /// `Connection`; `_nats_url` is not used by this implementation.
    pub async fn new(_nats_url: &str, name: String) -> std::io::Result<Self> {
        env_logger::init();
        Ok(Self {
            name,
            connection: Connection {},
        })
    }

    /// Returns a publisher for `subject` holding its own clone of this
    /// node's connection.
    pub fn get_publisher<T>(&self, subject: String) -> Publisher<T>
    where
        T: Message + Default,
    {
        let connection = self.connection.clone();
        Publisher::new(connection, subject)
    }

    /// Starts a replyer for `subject` that answers requests with `callback`.
    ///
    /// The callback receives each decoded request wrapped in
    /// `Arc<Mutex<_>>` and returns a future resolving to the reply. It is
    /// forwarded unchanged to `AsynkReplyer::new`, which imposes the same
    /// `Send + Sync + 'static + Copy` bounds.
    pub async fn get_replyer<Req, Resp, Fut>(
        &self,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Req: Message + Default + 'static,
        Resp: Message + Default,
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        let connection = &self.connection;
        AsynkReplyer::new(connection, subject, callback).await
    }
}
/// Handle returned once a request/reply handler has been started for a
/// `Req`/`Resp` pair. It stores no runtime state, only type markers.
pub struct AsynkReplyer<Req, Resp> {
// Marker for the request type.
request_type: PhantomData<Req>,
// Marker for the response type.
response_type: PhantomData<Resp>,
}
impl<Req: Message + Default + 'static, Resp: Message + Default> AsynkReplyer<Req, Resp> {
    /// Starts the background request handler and returns the replyer handle.
    ///
    /// `connection` and `subject` are not used yet: the handler is attached
    /// to a freshly constructed mock `Subscription`.
    pub async fn new<Fut>(
        connection: &Connection,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        let subscription = Subscription {};
        Self::start_subscription_handler(subscription, callback).await;
        Self {
            request_type: PhantomData,
            response_type: PhantomData,
        }
    }

    /// Spawns a Tokio task that feeds every message from `subscription` to
    /// `handle_request` until the subscription yields `None`.
    ///
    /// The task's join handle is dropped, so this function returns as soon
    /// as the task has been spawned. `callback` is handed over by value on
    /// every iteration, which is what the `Copy` bound is needed for.
    pub async fn start_subscription_handler<Fut>(
        subscription: Subscription,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        tokio::spawn(async move {
            while let Some(msg) = subscription.next().await {
                // The per-message result is discarded, as before.
                Self::handle_request(msg, callback).await;
            }
        });
    }

    /// Decodes + spins up another task to handle the request
    ///
    /// Returns `Err(ReplyError::InvalidData)` when the payload cannot be
    /// decoded into `Req`. Otherwise the callback runs on its own spawned
    /// task and `Ok(())` is returned without waiting for it.
    pub async fn handle_request<Fut>(
        msg: EventMessage,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> ReplyResult<()>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        let request = match Req::decode(msg.data.as_slice()) {
            Ok(request) => request,
            Err(_) => return Err(ReplyError::InvalidData),
        };
        tokio::spawn(async move {
            let shared = Arc::new(Mutex::new(request));
            match callback(shared).await {
                Ok(_response) => {
                    // do stuff
                }
                Err(_error) => {}
            }
        });
        Ok(())
    }
}
error:
error[E0277]: the trait bound `Publisher<Request>: std::marker::Copy` is not satisfied in `[closure@src/main.rs:93:40: 97:10]`
--> src/main.rs:93:10
|
93 | .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
| __________^^^^^^^^^^^___________________-
| | |
| | within `[closure@src/main.rs:93:40: 97:10]`, the trait `std::marker::Copy` is not implemented for `Publisher<Request>`
94 | | let mut req = req.clone().lock().unwrap();
95 | | p.clone().publish(*req);
96 | | Ok(Response {})
97 | | })
| |_________- within this `[closure@src/main.rs:93:40: 97:10]`
|
= note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`
error[E0277]: `*const Request` cannot be sent between threads safely
--> src/main.rs:93:10
|
93 | .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
| __________^^^^^^^^^^^___________________-
| | |
| | `*const Request` cannot be sent between threads safely
94 | | let mut req = req.clone().lock().unwrap();
95 | | p.clone().publish(*req);
96 | | Ok(Response {})
97 | | })
| |_________- within this `[closure@src/main.rs:93:40: 97:10]`
|
= help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Send` is not implemented for `*const Request`
= note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
--> src/main.rs:53:12
|
53 | pub struct Publisher<T> {
| ^^^^^^^^^
= note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`
error[E0277]: `*const Request` cannot be shared between threads safely
--> src/main.rs:93:10
|
93 | .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
| __________^^^^^^^^^^^___________________-
| | |
| | `*const Request` cannot be shared between threads safely
94 | | let mut req = req.clone().lock().unwrap();
95 | | p.clone().publish(*req);
96 | | Ok(Response {})
97 | | })
| |_________- within this `[closure@src/main.rs:93:40: 97:10]`
|
= help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Sync` is not implemented for `*const Request`
= note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
--> src/main.rs:53:12
|
53 | pub struct Publisher<T> {
| ^^^^^^^^^
= note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`
I could try adding the `Copy` attribute to the `Publisher` struct (or can I?), but that won't work since not all of its fields implement `Copy`. Despite this, I commented out the fields in `Publisher` that don't implement `Copy` and added the attribute to it just to see, and with that approach I get:
the trait `std::marker::Copy` is not implemented for `Request`
`Request` is a protobuf-based struct compiled using the `prost` library. I'm not able to add the `Copy` attribute to it because some of its fields, such as `String` and `Timestamp`, do not implement `Copy`.
I'm wondering if the design here is just inherently bad or if there's a simple fix.
It seems to me that you've constrained the `Fn` to be `Copy` because you are passing it to multiple `tokio::spawn` calls. You've found that `Copy` is very restrictive; `Clone`, however, is not. You should use it instead and simply call `.clone()` when you handle the new request:
Self::handle_request(msg, callback.clone()).await;
Then the only errors left are of the form "`*const Request` cannot be sent between threads safely". The compiler does not automatically implement `Send` or `Sync` for raw pointers because it doesn't know whether that's safe, but your `Fn` needs to be called from different threads. Fortunately, you don't need to worry about that. Whether your `PhantomData<*const T>` is there simply to satisfy the compiler or to enforce specific variance, you can get the same result like this:
resource_type: PhantomData<fn() -> *const T>
Then, now that we've fixed the type constraint errors, the compiler now produces errors about lifetimes:
`req.clone().lock().unwrap()` doesn't work because the result of `.lock()` is tied to the value from `req.clone()`, but that temporary gets dropped immediately. The fix is simple: the `.clone()` is unnecessary and can be removed.
`p.clone().publish(*req)` doesn't work since dereferencing a `MutexGuard` cannot provide an owned value, only a reference. You can fix this by adding a `.clone()` instead. If instead you think the `Arc` parameter is exclusive, you can get ownership by following the advice here: "How to take ownership of T from Arc<Mutex<T>>?"
The last lifetime error is a bit fuzzy because it has to do with the lifetime of the returned `Future` being tied to the `req` parameter. This can be fixed by using `async move { }`, but then `p` is moved out of the closure into the future, meaning the closure is no longer `Fn`. What you want is to move `req` but move a clone of `p`. You can do this like so:
move |req: Arc<Mutex<Request>>| {
let p = p.clone();
async move {
// ...
}
}
"now I'm trying to resolve errs related to `await`-ing `p.publish`" - the error has to do with the lock now persisting across an `await` point: since the mutex guard doesn't implement `Send`, the `Future` cannot be `Send`. You can fix this by locking and cloning in one step, so the lock isn't held:
let req = req.lock().unwrap().clone();
p.publish(req);
Ok(Response {})
See this compiling on the playground. There are still a number of warnings that should be addressed (unused `Result`s), but I hope this gets you on the right path.