Search code examples
multithreadingrustconcurrencymessage

Control thread from another thread leads to unexpected behavior in Rust


In my example certain events are not handled (messages are not printed) and I don't understand why. To motivate my code a little: The problem I want to solve (currently) is only a simple blinking LED connected to a Raspberry Pi. The main thread is a web server (axum) that handles messages and changes the led-state accordingly. I want to run the LED-control-code in another thread that listens to messages sent over a channel:

enum ThreadControl {
    Quit,
    LedMessage(LedState),
}

enum LedState {
    Off,
    On,
    BlinkOnOff { period_ms: u16 },
    // BreathInOutLinear { period_ms: u16 },
    // BreathInOutLogarithmic { period_ms: u16 },
    // FallingSawtoothLinear { period_ms: u16 },
    // RisingSawtoothLinear { period_ms: u16 },
    // AttackDecay { attack_ms: u16, decay_ms: u16 },
    // ... more mindblowing blink-stuff that might 
    // become complicated and should run independently
}

Next I define a function that controls the LED in a loop and listens to ThreadControl-messages sent over a message-passing-channel and spawn that function in a thread. While the led-control-thread is running, I start another thread that sends ThreadControl-messages to change the LED's behavior or request the function to stop the loop. (Maybe I don't need another thread to send messages, I'm not sure).

use std::{
    sync::mpsc::{channel, Receiver},
    thread::{self, sleep, spawn},
    time::{Duration, Instant},
};

fn run_led_thread(rx: Receiver<ThreadControl>) {
    println!("hello from led control thread");

    let mut pin_state = false; // later will be the actual GPIO pin
    let mut led_state = LedState::Off;
    let mut now = Instant::now();
    let mut keep_running = true;

    loop {
        rx.iter().for_each(|msg| match msg {
            ThreadControl::LedMessage(new_led_state) => {
                println!("Handling led message");
                led_state = new_led_state;
            }
            ThreadControl::Quit => {
                println!("Quitting thread");
                keep_running = false;
            }
        });

        if keep_running {
            match led_state {
                LedState::Off => {
                    pin_state = false;
                    println!("off")
                }
                LedState::On => {
                    pin_state = true;
                    println!("on")
                }
                LedState::BlinkOnOff { period_ms: value } => {
                    if now.elapsed() > Duration::from_millis((value as u64) / 2) {
                        pin_state = !pin_state;
                        now = Instant::now();
                        match pin_state {
                            true => println!("blink: on"),
                            false => println!("blink: off"),
                        }
                    }
                }
            }
            // avoid thread running at 100%
            // sleep arbitrary duration
            // maybe there's a better solution?
            sleep(Duration::from_millis(5))
        } else {
            break;
        }
    }
}

fn main() {
    let (tx, rx) = channel();

    // a thread that controls the LED and waits for new
    // instructions for LED behavior or Quit-Messate
    let handle_led_driver = spawn(|| run_led_thread(rx));

    // another thread that sends control messages.
    // (maybe this could be the main thread)
    let handle_control = spawn(move || {
        tx.send(ThreadControl::LedMessage(LedState::On)).unwrap();
        thread::sleep(Duration::from_millis(200));
        tx.send(ThreadControl::LedMessage(LedState::Off)).unwrap();
        thread::sleep(Duration::from_millis(200));
        tx.send(ThreadControl::LedMessage(LedState::BlinkOnOff {
            period_ms: 10,
        }))
        .unwrap();
        thread::sleep(Duration::from_millis(500));
        tx.send(ThreadControl::Quit).unwrap();
    });

    handle_led_driver.join().unwrap();
    handle_control.join().unwrap();
}

When I run this example I don't get the expected output. I get:

hello from led control thread
Handling led message
Handling led message
Handling led message
Quitting thread

What I would expect is

hello from led control thread
Handling led message
on
Handling led message
off
Handling led message
blink: on
blink: off
blink: on
... // and so on
Quitting thread

Somehow that second match (which matches LedState) is not working as no messages are printed. What am I doing wrong?

Also: Is this a clever solution in the first place, or is it rather stupid for this kind of problem. I noticed that receiver seems to have a kind of queue. In my case LED-example I don't expect the queue to mostly be one item (or none at all). In Rust there is also shared state with Arc<Mutex< ... >> which I also considered. But somewhere I read "don't communicate via shared memory".


Solution

  • The problem is that rx.iter().for_each() loops over every message that's ever (going to be) sent to that rx not only those that are currently in it.

    You can handle the messages and the LED all in the outer loop by using recv* instead of turning the rx into an iterator:

    fn run_led_thread(rx: Receiver<LedState>) {
        println!("hello from led control thread");
    
        let mut pin_state = false; // later will be the actual GPIO pin
        let mut led_state = LedState::Off;
        let mut next_blink = None::<Instant>;
    
        loop {
            let msg = if let Some(next_blink) = next_blink {
                // we're currently blinking the LED, so we have to block for
                // - the next message
                // - or until we need to switch the LED state again
                // whichever comes first
                rx.recv_timeout(next_blink.saturating_duration_since(Instant::now()))
            } else {
                // we're not currently blinking so we can just block
                // until we receive another message
                rx.recv().map_err(RecvTimeoutError::from)
            };
    
            match msg {
                Ok(new_led_state) => {
                    println!("Handling led message");
                    led_state = new_led_state;
                    next_blink = None; // reset potential previous blinking
                }
                Err(RecvTimeoutError::Disconnected) => {
                    println!("Quitting thread");
                    break;
                }
                Err(RecvTimeoutError::Timeout) => {}
            }
    
            match led_state {
                LedState::Off => pin_state = false,
                LedState::On => pin_state = true,
                LedState::BlinkOnOff { period } => {
                    pin_state = !pin_state;
                    *next_blink.get_or_insert_with(Instant::now) += period / 2;
                    print!("blink: ");
                }
            }
    
            println!("{}", if pin_state { "on" } else { "off" });
        }
    }
    
    enum LedState {
        Off,
        On,
        BlinkOnOff { period: Duration },
    }
    

    on nightly you can enable #![feature(deadline_api)] and replace

    rx.recv_timeout(next_blink.saturating_duration_since(Instant::now()))
    

    with

    rx.recv_deadline(next_blink)
    

    Like Masklinn already hinted you can just use the channel itself to signal quitting, that means in turn you don't need ThreadControl but can send LedState directly. I've also gotten rid of keep_running by just immediately breaking out of the loop in that case.