Search code examples
multithreadingrustclosuresmutex

Is this the idiomatic way to share a closure callback among threads in rust?


I'd like to pass a callback function do_something to a function new which in turn creates several threads that need the callback (here by calling a function in LibThreaded). For instance such a situation occurs if I have a lib that receives socket messages in a thread and then calls the callback to do something with them. The callback may itself call code from another lib OtherLib, for which the Clonetrait must be implement so far.

I came up with a version that appears to work but it looks overly complex. Is this really the correct/best way to share the callback? Would it be possible to lift the Clonetrait requirement on the do_something function in some other way?

#![feature(async_await)]
#![warn(rust_2018_idioms)]
use std::sync::{Arc, Mutex};
use std::error::Error;
use tokio::{runtime::Runtime};

#[derive(Clone)]
struct OtherLib { }

impl OtherLib {
    pub fn do_something(&self, text1: String, text2: String) {
        println!("doing something in other lib: {} + {}", text1, text2);
    }
}

type Callback = Arc<Mutex<Box<dyn 'static + FnMut(String, String) + Send + Sync>>>;

struct LibThreaded {
    something_threaded: String,
    callback: Callback,
}

impl LibThreaded {
    pub fn new(callback: Option<impl 'static + FnMut(String, String) + Send + Sync + Clone>) -> LibThreaded {
        if callback.is_some() {
            LibThreaded { something_threaded: "I am in a thread: ".to_string(), callback: Arc::new(Mutex::new(Box::new(callback.unwrap()))) }
        } else {
            LibThreaded { something_threaded: "I am in a thread: ".to_string(), callback: Arc::new(Mutex::new(Box::new(|_,_| {}))) }
        }

    }

    async fn receiving(&mut self) {
        println!("in receiving loop");
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something_threaded.clone(), "hello world".to_string());
    }
}

struct Lib {
    something: String,
    callback: Callback,
}

impl Lib {
    pub fn new() -> Lib {
        Lib { something: "I am lib: ".to_string(), callback: Arc::new(Mutex::new(Box::new(|_, _| {}))) }
    }

    pub fn set_callback(&mut self, callback: Option<impl 'static + FnMut(String, String) + Send + Sync + Clone>) {
        println!("in lib2");
        if callback.is_some() {
            self.callback = Arc::new(Mutex::new(Box::new(callback.clone().unwrap())));
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something.clone(), "hello world".to_string());
        }
        let mut t = LibThreaded::new(callback);

        tokio::spawn(async move {
            t.receiving().await;
        });
    }
}

fn main() -> Result<(), Box<dyn Error>> {
    let ol = OtherLib {};

    let callback = move |text1: String, text2: String| {
            ol.do_something(text1, text2);
    };

    let rt = Runtime::new()?;
    rt.block_on(async {
        let mut lib = Lib::new();
        lib.set_callback(Some(callback));
    });
    rt.shutdown_on_idle();
    Ok(())
}

Using the program above, I get the correct output:

in lib2
doing something in other lib: I am lib:  + hello world
in receiving loop
doing something in other lib: I am in a thread:  + hello world

I am wondering if there is an easier solution without Arc<Mutex<Box... that does not impose additional requirements on fn do_something. Thanks for your help!

Edited version: Thanks to the help from the comments/answers below, I have the following working code (see comments for line 1 and 2 in answer by rodrigo):

#![feature(async_await)]
#![warn(rust_2018_idioms)]
use std::sync::{Arc, Mutex};
use std::error::Error;
use tokio::{runtime::Runtime};

#[derive(Clone)]
struct OtherLib { }

impl OtherLib {
    pub fn do_something(&self, text1: String, text2: String) {
        println!("doing something in other lib: {} + {}", text1, text2);
    }
}

type Callback = Arc<Mutex<dyn 'static + FnMut(String, String) + Send + Sync>>;

struct LibThreaded {
    something_threaded: String,
    callback: Callback,
}

impl LibThreaded {
    pub fn new(callback: Option<Callback>) -> LibThreaded {
        LibThreaded {
            something_threaded: "I am in a thread: ".to_string(),
            callback: callback.unwrap_or_else(|| Arc::new(Mutex::new(|_,_| {})))
        }
    }

    async fn receiving(&mut self) {
        println!("in receiving loop");
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something_threaded.clone(), "hello world".to_string());
    }
}

struct Lib {
    something: String,
    callback: Callback,
}

impl Lib {
    pub fn new() -> Lib {
        Lib { something: "I am lib: ".to_string(), callback: Arc::new(Mutex::new(|_, _| {})) }
    }

    pub async fn set_callback(&mut self, callback: Option<impl 'static + FnMut(String, String) + Send + Sync>) {
        println!("in lib2");
        let callback = callback.map(|cb| Arc::new(Mutex::new(cb)) as Callback); //line 1
        if let Some(cb) = &callback {  //line 2
            self.callback = cb.clone();
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something.clone(), "hello world".to_string());
        }
        let mut t = LibThreaded::new(callback);

        tokio::spawn(async move {
            t.receiving().await;
        });
    }
}

fn main() -> Result<(), Box<dyn Error>> {
    let ol = OtherLib {};

    let callback = move |text1: String, text2: String| {
            ol.do_something(text1, text2);
    };

    let rt = Runtime::new()?;
    rt.block_on(async {
        let mut lib = Lib::new();
        lib.set_callback(Some(callback)).await;
    });
    rt.shutdown_on_idle();
    Ok(())
}

Solution

  • Let me rewrite the interesting pieces of code, as I understand the idiomatic Rust:

    First, the LibThreaded::new can be easily rewritten with a call to unwrap_or_else:

    pub fn new(callback: Option<Callback>) -> LibThreaded {
        LibThreaded {
            something_threaded: "I am in a thread: ".to_string(),
            callback: callback.unwrap_or_else(|| Arc::new(Mutex::new(|_,_| {})))
        }
    }
    

    You could also use Option::unwrap_or, but this way is nicer because you allocate the Mutex lazily, that is if Option is Some it will cost you nothing.

    Then the Lib::set_callback can be improved with a few changes: first remove the Clone requirement; then use if let Some(...) instead of is_some(); finally convert the callback into a Callback soon so it can be cloned:

    pub async fn set_callback(&mut self, callback: Option<impl FnMut(String, String) + Send + Sync + 'static>) {
        let callback = callback.map(|cb| Arc::new(Mutex::new(cb)) as Callback); //line 1
        if let Some(cb) = &callback {  //line 2
            self.callback = cb.clone();
            let c = &mut *self.callback.lock().unwrap();
            (c)(self.something.clone(), "hello world".to_string());
        }
        let mut t = LibThreaded::new(callback);
    
        //...
    }
    

    There are a couple of lines that deserve additional comments:

    Line 1: The value inside an Option is replaced using Option::map. If we do it naively, callback.map(|cb| Arc::new(Mutex::new(cb))); we would get an Option<impl FnMut...> instead of a Option<dyn FnMut>. Fortunately we can coerce an Arc<impl T> into a Arc<dyn T> so we do just that with help of the handy type alias.

    Line 2: You can do this in several ways. You could also write if let Some(cb) = callback.clone() (Option<T:Clone> is also Clone) or if let Some(ref cb) = callback. Personally I prefer the way I wrote it. The idea is not to consume callback in this block, so that it can be reused later.