Search code examples
rustserver-sent-events

why send channel message shows channel closed in rust


I am working with a cloud xelatex online editor, when user invoke compile action, I want to output the cloud xelatex compile log to the browser using sse(server send events), I have made a minimal demo and tried this. Now I am facing a issue that when I send the xelatex compile log by channel, shows error:

channel closed

this is my minimal rust code demo:

use std::fmt::Display;
use std::io::Read;
use std::process::{Command, Stdio};
use std::time::Duration;

use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::{interval, Interval};

async fn run_xelatex() -> String {
    let mut cmd = Command::new("xelatex")
        .arg("-interaction=batchmode")
        .arg("/Users/xiaoqiangjiang/Nutstore/document/dolphin-book-2023/src/dolphin-book-2023.tex")
        .stdout(Stdio::piped())
        .spawn()
        .expect("Failed to execute xelatex command");

    let mut output = String::new();
    cmd.stdout
        .take()
        .unwrap()
        .read_to_string(&mut output)
        .unwrap();

    output
}

async fn sse_endpoint(req: HttpRequest) -> HttpResponse {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    task::spawn(async move {
        let output = run_xelatex().await;
        let message = format!("data: {}\n\n", output);
        let send_result = tx.send(message);
        match send_result {
            Ok(_) => {}
            Err(e) => {
                println!("send xelatex compile log error: {}", e);
            }
        }
    });

    let response = HttpResponse::Ok()
        .content_type("text/event-stream")
        .streaming(SseStream {
            counter: 4,
            interval: interval(Duration::from_secs(5000)),
            receiver: Some(rx),
        });

    response
}

/// SSE stream implementation
struct SseStream<T>
where
    T: Display,
{
    counter: usize,
    interval: Interval,
    receiver: Option<mpsc::UnboundedReceiver<T>>,
}

impl<T: std::fmt::Display> SseStream<T> {
    fn new() -> Self {
        Self {
            counter: 0,
            interval: interval(Duration::from_secs(1000)),
            receiver: None,
        }
    }
}

impl<T: std::fmt::Display> futures::Stream for SseStream<T> {
    type Item = Result<actix_web::web::Bytes, actix_web::Error>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.counter += 1;
        // get reciver data and send
        if let Some(receiver) = &mut self.receiver {
            match receiver.try_recv() {
                Ok(data) => {
                    // Create the SSE event
                    let message = format!("data: {}\n\n", data);

                    // Return the event as a stream item
                    return std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))));
                }
                Err(_err) => {
                    // handle error
                    return std::task::Poll::Ready(None);
                }
            }
        } else {
            return std::task::Poll::Ready(None);
        }

        // Return the event as a stream item
        //std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))))
    }
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/sse", web::get().to(sse_endpoint)))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}

and this is the Cargo.toml:

[package]
name = "rust-learn"
version = "0.1.0"
edition = "2018"

[dependencies]
tokio = { version = "1.17.0", features = ["full"] }
serde = { version = "1.0.64", features = ["derive"] }
serde_json = "1.0.64"
actix-web = "4"
futures = "0.3"
eventsource = "0.5"
bytes = "1"

Am I missing something? what should I do make it work? this is the command in terminal to reqeust the api:

> curl -N http://localhost:8080/sse

Solution

  • You get "channel closed" because your receiver is being destroyed before you can send a message.

    Your poll_next() implementation always returns Ready which means it is not waiting on a future value. And when you return Ready(None) you are signaling that your stream has reached the end; so your SseStream and thus the receiver will be dropped.

    You should instead return Poll::Pending when you don't have a value but also aren't done yet. Better yet, you should use .poll_recv() instead of .try_recv() since it will handle waking. Something like this (untested):

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.counter += 1;
        // get reciver data and send
        if let Some(receiver) = &mut self.receiver {
            match receiver.poll_recv(ctx) {
                std::task::Poll::Ready(Some(data)) => {
                    // Create the SSE event
                    let message = format!("data: {}\n\n", data);
    
                    // Return the event as a stream item
                    return std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))));
                }
                std::task::Poll::Ready(None) => {
                    return std::task::Poll::Ready(None);
                }
                std::task::Poll::Pending => {
                    return std::task::Poll::Pending;
                }
            }
        } else {
            return std::task::Poll::Ready(None);
        }
    }