I am trying to receive a stream of bytes from async code into sync code.
The MultiBuf type contains a tokio::sync::mpsc::Receiver, and a thread (not shown here) sends Bytes objects to it. In my sync code, I want to call next_chunk to receive bytes from the Receiver, but only when the caller is ready to consume these bytes.
The difficulty is that calling rx.blocking_recv() requires the calling thread to own the Receiver.

    pub struct MultiBuf {
        recv: Receiver<Bytes>,
        curr_chunk: Bytes,
    }

    pub fn new_multibuf(recv: Receiver<Bytes>) -> MultiBuf {
        MultiBuf {
            recv,
            curr_chunk: Bytes::new(),
        }
    }

    pub fn next_chunk(buf: &mut MultiBuf) -> &[u8] {
        let next = {
            let mut rx = buf.recv.clone();
            let sync_call = std::thread::spawn(move || rx.blocking_recv());
            sync_call.join().unwrap()
        };
        match next {
            Some(v) => {
                buf.curr_chunk = v;
                &buf.curr_chunk[..]
            }
            None => &[],
        }
    }

The problem here is that buf.recv cannot be cloned, and we cannot give ownership of the Receiver to the spawned thread, because it needs to be called repeatedly.
I realize this may not be the right way to do this, but what would a better way be?

    let sync_call = std::thread::spawn(move || rx.blocking_recv());
    sync_call.join().unwrap()

This thread does nothing useful; by calling join() immediately, you're blocking on the thread's completion, and the thread is blocking on blocking_recv(), so the current thread is indirectly blocking on blocking_recv(). You might as well do it directly:

    let next = buf.recv.blocking_recv();

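That will have the same effect as your code would have, but without the ownership problem you're asking about: blocking_recv() takes &mut self, so the &mut MultiBuf you already hold is enough, and the Receiver never has to be moved or cloned.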