Search code examples
rustprotocol-buffersrust-tokiorust-tonicprost

the trait `std::marker::Copy` is not implemented for Type


I'm trying to move some data around b/w different threads but am getting the ole Copy trait-not-implemented error. Here's some code:

use std::future::Future;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

/// Start external crates mocked here

#[derive(Clone, PartialEq, Eq)]
pub struct DecodeError {
    inner: Box<Inner>,
}
#[derive(Clone, PartialEq, Eq)]
struct Inner {}
#[derive(Clone)]
pub struct Connection {}

pub trait Message: core::fmt::Debug + Send + Sync {
    fn decode<B>(mut buf: B) -> Result<Self, DecodeError>
    where
        B: bytes::Buf,
        Self: Default,
    {
        // do stuff
        let mut message = Self::default();
        Ok(message)
    }
}

#[derive(Clone, Debug, Default)]
pub struct Request {}
impl Message for Request {}
#[derive(Clone, Debug, Default)]
pub struct Response {}
impl Message for Response {}

pub struct OtherResponse {}
pub enum ReplyError {
    InvalidData,
}
pub struct EventMessage {
    data: Vec<u8>,
}

pub struct Subscription {}

impl Subscription {
    pub async fn next(&self) -> Option<EventMessage> {
        Some(EventMessage { data: vec![] })
    }
}
/// End external crates mocked here

#[derive(Clone)]
pub struct Publisher<T> {
    connection: Connection,
    subject: String,
    resource_type: PhantomData<*const T>,
}

#[derive(Debug)]
pub enum PublishError {
    SerializeError(String),
    PublishError(String),
}

pub type PublishResult<T> = std::result::Result<T, PublishError>;

impl<T: Message> Publisher<T> {
    pub fn new(connection: Connection, subject: String) -> Self {
        let resource_type = PhantomData;

        Publisher {
            connection: connection,
            subject,
            resource_type,
        }
    }
    pub async fn publish(&self, msg: T) -> PublishResult<()>
    where
        T: Message,
    {
        // do stuff to msg
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let node = Node::new("127.0.0.1", "node".into())
        .await
        .expect("connecting to NATS");
    let p: Publisher<Request> = node.get_publisher("TOPIC".into());
    let _submission_replyer: AsynkReplyer<Request, Response> = node
        .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
            let mut req = req.clone().lock().unwrap();
            p.clone().publish(*req);
            Ok(Response {})
        })
        .await;

    Ok(())
}

pub struct Node {
    name: String,
    connection: Connection,
}

pub type ReplyResult<T> = std::result::Result<T, ReplyError>;

impl Node {
    pub async fn new(_nats_url: &str, name: String) -> std::io::Result<Self> {
        env_logger::init();

        let connection = Connection {};
        Ok(Node { name, connection })
    }

    pub fn get_publisher<T>(&self, subject: String) -> Publisher<T>
    where
        T: Message + Default,
    {
        Publisher::new(self.connection.clone(), subject)
    }

    pub async fn get_replyer<Req, Resp, Fut>(
        &self,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Req: Message + Default + 'static,
        Resp: Message + Default,
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        AsynkReplyer::new(&self.connection, subject, callback).await
    }
}

pub struct AsynkReplyer<Req, Resp> {
    request_type: PhantomData<Req>,
    response_type: PhantomData<Resp>,
}

impl<Req: Message + Default + 'static, Resp: Message + Default> AsynkReplyer<Req, Resp> {
    pub async fn new<Fut>(
        connection: &Connection,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        Self::start_subscription_handler(Subscription {}, callback).await;

        AsynkReplyer {
            request_type: PhantomData,
            response_type: PhantomData,
        }
    }

    pub async fn start_subscription_handler<Fut>(
        subscription: Subscription,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        tokio::spawn(async move {
            loop {
                match subscription.next().await {
                    Some(msg) => {
                        Self::handle_request(msg, callback).await;
                    }
                    None => {
                        break;
                    }
                }
            }
        });
    }

    /// Decodes + spins up another task to handle the request
    pub async fn handle_request<Fut>(
        msg: EventMessage,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> ReplyResult<()>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        let decoded = Req::decode(msg.data.as_slice()).map_err(|_| ReplyError::InvalidData)?;

        tokio::spawn(async move {
            match callback(Arc::new(Mutex::new(decoded))).await {
                Ok(response) => {
                    // do stuff
                }
                Err(e) => {}
            }
        });
        Ok(())
    }
}

error:

error[E0277]: the trait bound `Publisher<Request>: std::marker::Copy` is not satisfied in `[closure@src/main.rs:93:40: 97:10]`
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          within `[closure@src/main.rs:93:40: 97:10]`, the trait `std::marker::Copy` is not implemented for `Publisher<Request>`
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

error[E0277]: `*const Request` cannot be sent between threads safely
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          `*const Request` cannot be sent between threads safely
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Send` is not implemented for `*const Request`
   = note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
  --> src/main.rs:53:12
   |
53 | pub struct Publisher<T> {
   |            ^^^^^^^^^
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

error[E0277]: `*const Request` cannot be shared between threads safely
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          `*const Request` cannot be shared between threads safely
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Sync` is not implemented for `*const Request`
   = note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
  --> src/main.rs:53:12
   |
53 | pub struct Publisher<T> {
   |            ^^^^^^^^^
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

I can't (or can I) add the Copy attribute on the Publisher struct but that wont work since not all of its fields implement Copy. Despite this I've commented out the fields in Publisher that don't impl Copy and added the attribute to it just to see, and with that approach I get:

the trait `std::marker::Copy` is not implemented for `Request`

Request is a protobuf based struct compiled using the prost lib. I'm not able to add the Copy attribute to that because of some of its fields not implementing Copy such as String and Timestamp.

I'm wondering if the design here is just inherently bad or if there's a simple fix.


Solution

  • It seems to me, you've constrained that the Fn is Copy because you are passing it to multiple tokio::spawn calls. You've found that Copy is very restrictive, however Clone is not. You should use it instead and simply call .clone() when you handle the new request:

    Self::handle_request(msg, callback.clone()).await;
    

    Then the only errors are '*const Request' cannot be sent between threads safely. The compiler does not automatically implement Send or Sync for pointers because it doesn't know if that's safe, but your Fn needs to be called from different threads. Fortunately, you don't need to worry about that. Whether your PhantomData<*const T> is there simply to satisfy the compiler or to enforce specific variance, you can get the same result like this:

    resource_type: PhantomData<fn() -> *const T>
    

    Then, now that we've fixed the type constraint errors, the compiler now produces errors about lifetimes:

    • req.clone().lock().unwrap() doesn't work because the result of .lock() is tied to the value from req.clone(), but that gets dropped immediately. The fix is that the .clone() is unnecessary and can be removed.

    • p.clone().publish(*req) doesn't work since dereferencing a MutexLockGuard cannot provide a owned value, only a reference. You can fix this by adding a .clone() instead. If instead you think the Arc parameter is exclusive, you can get ownership by following the advice here: How to take ownership of T from Arc<Mutex<T>>?

    • The last lifetime error is a bit fuzzy because it has to do with the lifetime of the returned Future being tied to the req parameter. This can be fixed by using async move { } but then p is moved out of the closure into the future, meaning it is no longer Fn. What you want is to move req but move a clone of p. You can do this like so:

      move |req: Arc<Mutex<Request>>| {
          let p = p.clone();
          async move {
              // ...
          }
      }
      
    • "now I'm trying to resolve errs related to await-ing p.publish" - the error has to do with the lock now persisting across an await point, but since the mutex guard doesn't implement Send, the Future cannot be Send. You can fix this by locking and cloning in one step, so the lock isn't held:

      let req = req.lock().unwrap().clone();
      p.publish(req);
      Ok(Response {})
      

    See this compiling on the playground. There are still a number of warnings that should be addressed (unused Results), but I hope this gets you on the right path.