Search code examples
rustvectorrust-tokio

How to send to a tokio oneshot::channel when the Sender is in a Vec?


I want to use a container to manage tokio::oneshot::Senders. I'm using a Vec, but seems that values saved in Vec are references and I need to use self, not a reference, to call it:

use bytes::BytesMut;
use tokio::sync::oneshot;

#[derive(Clone)]
pub enum ChannelData {
    Video { timestamp: u32, data: BytesMut },
    Audio { timestamp: u32, data: BytesMut },
    MetaData {},
}

pub type PlayerPublisher = oneshot::Sender<ChannelData>;

pub struct Channel {
    player_producers: Vec<PlayerPublisher>, // consumers who subscribe this channel.
}

impl Channel {
    fn new() -> Self {
        Self {
            player_producers: Vec::new(),
        }
    }

    async fn transmit(&mut self) {
        let b = BytesMut::new();
        let data = ChannelData::Video {
            timestamp: 234,
            data: b,
        };

        for i in self.player_producers {
            i.send(data);
        }
    }
}

The errors:

error[E0507]: cannot move out of `self.player_producers` which is behind a mutable reference
  --> src/lib.rs:31:18
   |
31 |         for i in self.player_producers {
   |                  ^^^^^^^^^^^^^^^^^^^^^
   |                  |
   |                  move occurs because `self.player_producers` has type `Vec<tokio::sync::oneshot::Sender<ChannelData>>`, which does not implement the `Copy` trait
   |                  help: consider iterating over a slice of the `Vec<_>`'s content: `&self.player_producers`

error[E0382]: use of moved value: `data`
  --> src/lib.rs:32:20
   |
26 |         let data = ChannelData::Video {
   |             ---- move occurs because `data` has type `ChannelData`, which does not implement the `Copy` trait
...
32 |             i.send(data);
   |                    ^^^^ value moved here, in previous iteration of loop

How can I achieve my goal?

pub fn send(mut self, t: T) -> Result<(), T> {
    let inner = self.inner.take().unwrap();

    inner.value.with_mut(|ptr| unsafe {
        *ptr = Some(t);
    });

    if !inner.complete() {
        unsafe {
            return Err(inner.consume_value().unwrap());
        }
    }

    Ok(())
}

Solution

  • Calling send requires ownership of the oneshot channel. To get that ownership, you can take ownership of the container. In this case, the easiest way is to take ownership of the Channel:

    async fn transmit(self) { // Changed to `self`
        for i in self.player_producers {
            let data = ChannelData::Video {
                timestamp: 234,
                data: BytesMut::new(),
            };
            if i.send(data).is_err() {
                panic!("Unable to send data");
            }
        }
    }
    

    Other options are to drain the collection:

    for i in self.player_producers.drain(..) {
    

    Or swap the collection with an empty one:

    use std::mem;
    for i in mem::take(&mut self.player_producers) {
    

    In each case, the data payload has to be constructed (or cloned) each time it is sent.

    See also: