How to read a single value from a tokio::sync::mpsc::Receiver from sync code?


I am trying to receive a stream of bytes from async code into sync code.

The MultiBuf type contains a tokio::sync::mpsc::Receiver and a thread (not shown here) sends Bytes objects to it. In my sync code, I want to call next_chunk to receive bytes from the Receiver, but only when the caller is ready to consume these bytes.

The difficulty is that calling rx.blocking_recv() requires the calling thread to own the Receiver.

    pub struct MultiBuf {
        recv: Receiver<Bytes>,
        curr_chunk: Bytes,
    }
    
    pub fn new_multibuf(recv: Receiver<Bytes>) -> MultiBuf {
        MultiBuf {
            recv,
            curr_chunk: Bytes::new(),
        }
    }
    
    pub fn next_chunk(buf: &mut MultiBuf) -> &[u8] {
        let next = {
            let mut rx = buf.recv.clone();
            let sync_call = std::thread::spawn(move || rx.blocking_recv());
            sync_call.join().unwrap()
        };
        match next {
            Some(v) => {
                buf.curr_chunk = v;
                &buf.curr_chunk[..]
            }
            None => &[],
        }
    }

The problem here is that buf.recv cannot be cloned, and we cannot give ownership of the Receiver to the spawned thread, because it needs to be called repeatedly.

I realize this may not be the right way to do this, but what would a better way be?


Solution

  • let sync_call = std::thread::spawn(move || rx.blocking_recv());
    sync_call.join().unwrap()
    

    This thread does nothing useful; by calling join() immediately, you're blocking on the thread's completion, and the thread is blocking on blocking_recv(), so the current thread is indirectly blocking on blocking_recv(). You might as well do it directly:

    let next = buf.recv.blocking_recv();
    

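    That has the same effect as your code was meant to have, but without the ownership problem you're asking about: blocking_recv() takes &mut self, and next_chunk already holds a &mut MultiBuf, so it can borrow buf.recv mutably without cloning or moving the Receiver.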