
Rust error[E0308]: mismatched types when working on ThreadPool and Worker


I am trying to recreate a multithreaded server example.

I have an impl called FileReader whose purpose is to perform a simple read from a file.

I am attempting to perform this same reading job multiple times with threads.

Following the chapter 20 example, here are the structs, enums, and types I am using:

// Error for ThreadPool
#[derive(Debug)]
pub enum PoolCreationError {
    InvalidSize(i32),
}

// Error for ThreadPool
#[derive(Debug)]
pub enum PoolTerminationError {
    ThreadTerminationError(String),
}

// Defines the ThreadPool class
pub struct ThreadPool{
    workers: Vec<Worker>,
    sender: channel::Sender<Message>,
}

// Job and Job termination message
enum Message {
    NewJob(Job),
    Terminate,
}

// 'Job' type will be a heap allocated space for a one time use function
type Job = Box<dyn FnOnce() + Send + 'static>;

// Defines the Worker class
pub struct Worker {
    id: i32,
    thread: Option<thread::JoinHandle<()>>,
}

The idea is that the ThreadPool sends jobs over a crossbeam channel, and the workers, which hold the receiving end, run those jobs on their own threads. To join the threads afterwards I need to work with each worker's JoinHandle.

impl ThreadPool {
    pub fn new_cpu_cores() -> Result<ThreadPool, PoolCreationError> {
        let num_cores = num_cpus::get();
        let pool_size = &num_cores - 1;
        let mut workers = Vec::with_capacity(pool_size);
        let (sender, receiver) = crossbeam::channel::bounded(5);
        let receiver = Arc::new(receiver);
        if pool_size > 0 {
            println!("pool_size is greater than 0 -> {}", &pool_size);
            for id in 0..pool_size {
                workers.push(Worker::new(id as i32, Arc::clone(&receiver)));
            }
            Ok(ThreadPool{workers, sender})
        } else {
            println!("pool_size is not greater than 0 -> {}", &pool_size);
            Err(PoolCreationError::InvalidSize(pool_size as i32))
        }
    }
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        let job2 = Message::NewJob(job);
        self.sender.send(job2).unwrap();
    }

    pub fn multi_join(&mut self) {
        println!("Sending terminate message to all workers.");

        // Make sure the workers stop listening to the channel for new requests
        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        // Can join the threads now
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

And here is what I am doing on the Worker side

impl Worker {
    fn new(id: i32, receiver: Arc<channel::Receiver<Job>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.recv().unwrap();
                println!("Worker id is -> {}, thread id is -> {:?}", &id, thread::current().id());
                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job.call_box();
                    },
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    },
                }
            }
        });
        Worker{id, thread: Some(thread)}
    }
}

The main issue I am having right now is error[E0308]: mismatched types

--> src/lib.rs:69:36
   |
69 |             Ok(ThreadPool{workers, sender})
   |                                    ^^^^^^ expected enum `Message`, found struct `Box`
   |
   = note: expected struct `crossbeam::crossbeam_channel::Sender<Message>`
              found struct `crossbeam::crossbeam_channel::Sender<Box<(dyn FnOnce() + Send + 'static)>>`

As far as I can tell, the Receiver and Sender should both carry Message, based on the sender field of ThreadPool. Why is the compiler using the type from my Job definition instead?

EDIT:

Tried the following change

let (sender, receiver) = crossbeam::channel::bounded::<channel::Sender<Message>>(5);

But got this error

error[E0308]: mismatched types
   --> src/lib.rs:67:64
    |
67  |                 workers.push(Worker::new(id as i32, Arc::clone(&receiver)));
    |                                                     ---------- ^^^^^^^^^ expected struct `Box`, found struct `crossbeam::crossbeam_channel::Sender`
    |                                                     |
    |                                                     arguments to this function are incorrect
    |
    = note: expected reference `&Arc<crossbeam::crossbeam_channel::Receiver<Box<(dyn FnOnce() + Send + 'static)>>>`
               found reference `&Arc<crossbeam::crossbeam_channel::Receiver<crossbeam::crossbeam_channel::Sender<Message>>>`

Now I am even more confused. Why is it no longer expecting type Message?


Solution

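  • In Worker::new you declared the receiver parameter as Receiver<Job>:

    impl Worker {
        fn new(id: i32, receiver: Arc<channel::Receiver<Job>>) -> Worker {
    ...

    Because the receiver is passed to Worker::new, Rust infers that the channel carries Job, so bounded(5) produces a Sender<Job>. That does not match the Sender<Message> field of ThreadPool, which is the first error. Changing the parameter type to Arc<channel::Receiver<Message>> fixes the problem.

    In your edit, the turbofish names the type of the items sent over the channel, so bounded::<channel::Sender<Message>>(5) creates a channel whose items are senders. The annotation you meant is bounded::<Message>(5), although it is not needed once the parameter type is fixed.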
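For reference, here is a minimal sketch of the pool with the receiver typed as Message. It is not the asker's exact code: to compile without external crates it swaps crossbeam for std::sync::mpsc, which forces the extra Mutex around the receiver, and it leaves out the error enums. The helper run_jobs is a hypothetical name added only to exercise the pool. The type reasoning is the same as with crossbeam: the Sender and the Receiver must agree on Message.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// One-shot, heap-allocated closure, as in the question.
type Job = Box<dyn FnOnce() + Send + 'static>;

// The channel carries `Message`, never a bare `Job`.
enum Message {
    NewJob(Job),
    Terminate,
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    // The fix: the receiver's item type is `Message`, matching `Sender<Message>`.
    // Assumption: std's Receiver is not Sync, so it sits behind a Mutex here;
    // a crossbeam Receiver would not need that wrapper.
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The lock guard is a temporary, released at the end of this statement.
            let message = receiver.lock().unwrap().recv().unwrap();
            match message {
                // A boxed FnOnce can be called directly on current Rust.
                Message::NewJob(job) => job(),
                Message::Terminate => break,
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

struct ThreadPool {
    workers: Vec<Worker>,
    sender: Sender<Message>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        // The turbofish names the item type, not the Sender type.
        let (sender, receiver) = mpsc::channel::<Message>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        ThreadPool { workers, sender }
    }

    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.send(Message::NewJob(Box::new(f))).unwrap();
    }

    fn multi_join(&mut self) {
        // One Terminate per worker; each worker stops after reading one.
        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
                println!("Worker {} joined", worker.id);
            }
        }
    }
}

// Hypothetical helper: runs `jobs` counting closures on `workers` threads
// (at least one worker is required) and returns how many jobs ran.
fn run_jobs(workers: usize, jobs: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut pool = ThreadPool::new(workers);
    for _ in 0..jobs {
        let counter = Arc::clone(&counter);
        pool.execute(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.multi_join();
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("jobs executed: {}", run_jobs(3, 8));
}
```

Because the channel is first-in, first-out and the Terminate messages are sent after every job, each job is picked up and run before any worker shuts down.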