I am trying to recreate a multi threaded server example.
I have an impl
called FileReader
where the point is to perform a simple reading job from a file.
I am attempting to perform this same reading job multiple types with threads.
Following the chapter 20 example here are the structs, enums, types I am using
// Error for ThreadPool
#[derive(Debug)]
pub enum PoolCreationError {
InvalidSize(i32),
}
// Error for ThreadPool
#[derive(Debug)]
pub enum PoolTerminationError {
ThreadTerminationError(String),
}
// Defines the ThreadPool class
pub struct ThreadPool{
workers: Vec<Worker>,
sender: channel::Sender<Message>,
}
// Job and Job termination message
enum Message {
NewJob(Job),
Terminate,
}
// 'Job' type will be a heap allocated space for a one time use function
type Job = Box<dyn FnOnce() + Send + 'static>;
// Defines the Worker class
pub struct Worker {
id: i32,
thread: Option<thread::JoinHandle<()>>,
}
The idea is that the ThreadPool
will send jobs to a crossbeam channel, and the workers will start up threads since they are the receivers. To join the threads afterwards I need to perform work on the JoiHandle
impl ThreadPool {
pub fn new_cpu_cores() -> Result<ThreadPool, PoolCreationError> {
let num_cores = num_cpus::get();
let pool_size = &num_cores - 1;
let mut workers = Vec::with_capacity(pool_size);
let (sender, receiver) = crossbeam::channel::bounded(5);
let receiver = Arc::new(receiver);
if pool_size > 0 {
println!("pool_size is greater than 0 -> {}", &pool_size);
for id in 0..pool_size {
workers.push(Worker::new(id as i32, Arc::clone(&receiver)));
}
Ok(ThreadPool{workers, sender})
} else {
println!("pool_size is not greater than 0 -> {}", &pool_size);
Err(PoolCreationError::InvalidSize(pool_size as i32))
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
let job2 = Message::NewJob(job);
self.sender.send(job2).unwrap();
}
pub fn multi_join(&mut self) {
println!("Sending terminate message to all workers.");
// Need to make sure workers arent listening to channel for new requests
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
// Can join the threads now
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
And here is what I am doing on the Worker
side
impl Worker {
fn new(id: i32, receiver: Arc<channel::Receiver<Job>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.recv().unwrap();
println!("Worker id is -> {}, thread id is -> {:?}", &id, thread::current().id());
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job.call_box();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker{id, thread: Some(thread)}
}
}
The main issue I am having right now is in relation to error[E0308]: mismatched types
--> src/lib.rs:69:36
|
69 | Ok(ThreadPool{workers, sender})
| ^^^^^^ expected enum `Message`, found struct `Box`
|
= note: expected struct `crossbeam::crossbeam_channel::Sender<Message>`
found struct `crossbeam::crossbeam_channel::Sender<Box<(dyn FnOnce() + Send + 'static)>>`
As far as I can tell the Receiver and Sender should be of type Message
based on the ThreadPool
sender field. Why is type being used what I am using for my Job
type definition?
EDIT:
Tried the following change
let (sender, receiver) = crossbeam::channel::bounded:<channel::Sender<Message>>(5);
But got this error
error[E0308]: mismatched types
--> src/lib.rs:67:64
|
67 | workers.push(Worker::new(id as i32, Arc::clone(&receiver)));
| ---------- ^^^^^^^^^ expected struct `Box`, found struct `crossbeam::crossbeam_channel::Sender`
| |
| arguments to this function are incorrect
|
= note: expected reference `&Arc<crossbeam::crossbeam_channel::Receiver<Box<(dyn FnOnce() + Send + 'static)>>>`
found reference `&Arc<crossbeam::crossbeam_channel::Receiver<crossbeam::crossbeam_channel::Sender<Message>>>`
Now I am extra confused, why is it no longer expecting type Message?
In the new
method you specified the type of Receiver is Receiver<Job>
impl Worker {
fn new(id: i32, receiver: Arc<channel::Receiver<Job>>) -> Worker {
...
So Rust will expect Job
in the channel but you pass a Message
into it. Change the parameter type to Receiver<Message>
will fix the problem.