I am working with a cloud xelatex online editor, when user invoke compile action, I want to output the cloud xelatex compile log to the browser using sse(server send events), I have made a minimal demo and tried this. Now I am facing a issue that when I send the xelatex compile log by channel, shows error:
channel closed
this is my minimal rust code demo:
use std::fmt::Display;
use std::io::Read;
use std::process::{Command, Stdio};
use std::time::Duration;
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::{interval, Interval};
async fn run_xelatex() -> String {
let mut cmd = Command::new("xelatex")
.arg("-interaction=batchmode")
.arg("/Users/xiaoqiangjiang/Nutstore/document/dolphin-book-2023/src/dolphin-book-2023.tex")
.stdout(Stdio::piped())
.spawn()
.expect("Failed to execute xelatex command");
let mut output = String::new();
cmd.stdout
.take()
.unwrap()
.read_to_string(&mut output)
.unwrap();
output
}
async fn sse_endpoint(req: HttpRequest) -> HttpResponse {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
task::spawn(async move {
let output = run_xelatex().await;
let message = format!("data: {}\n\n", output);
let send_result = tx.send(message);
match send_result {
Ok(_) => {}
Err(e) => {
println!("send xelatex compile log error: {}", e);
}
}
});
let response = HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(SseStream {
counter: 4,
interval: interval(Duration::from_secs(5000)),
receiver: Some(rx),
});
response
}
/// SSE stream implementation
struct SseStream<T>
where
T: Display,
{
counter: usize,
interval: Interval,
receiver: Option<mpsc::UnboundedReceiver<T>>,
}
impl<T: std::fmt::Display> SseStream<T> {
fn new() -> Self {
Self {
counter: 0,
interval: interval(Duration::from_secs(1000)),
receiver: None,
}
}
}
impl<T: std::fmt::Display> futures::Stream for SseStream<T> {
type Item = Result<actix_web::web::Bytes, actix_web::Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.counter += 1;
// get reciver data and send
if let Some(receiver) = &mut self.receiver {
match receiver.try_recv() {
Ok(data) => {
// Create the SSE event
let message = format!("data: {}\n\n", data);
// Return the event as a stream item
return std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))));
}
Err(_err) => {
// handle error
return std::task::Poll::Ready(None);
}
}
} else {
return std::task::Poll::Ready(None);
}
// Return the event as a stream item
//std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))))
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| App::new().route("/sse", web::get().to(sse_endpoint)))
.bind("127.0.0.1:8080")?
.run()
.await
}
and this is the Cargo.toml
:
[package]
name = "rust-learn"
version = "0.1.0"
edition = "2018"
[dependencies]
tokio = { version = "1.17.0", features = ["full"] }
serde = { version = "1.0.64", features = ["derive"] }
serde_json = "1.0.64"
actix-web = "4"
futures = "0.3"
eventsource = "0.5"
bytes = "1"
Am I missing something? what should I do make it work? this is the command in terminal to reqeust the api:
> curl -N http://localhost:8080/sse
You get "channel closed" because your receiver is being destroyed before you can send a message.
Your poll_next()
implementation always returns Ready
which means it is not waiting on a future value. And when you return Ready(None)
you are signaling that your stream has reached the end; so your SseStream
and thus the receiver will be dropped.
You should instead return Poll::Pending
when you don't have a value but also aren't done yet. Better yet, you should use .poll_recv()
instead of .try_recv()
since it will handle waking. Something like this (untested):
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.counter += 1;
// get reciver data and send
if let Some(receiver) = &mut self.receiver {
match receiver.poll_recv(ctx) {
std::task::Poll::Ready(Some(data)) => {
// Create the SSE event
let message = format!("data: {}\n\n", data);
// Return the event as a stream item
return std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))));
}
std::task::Poll::Ready(None) => {
return std::task::Poll::Ready(None);
}
std::task::Poll::Pending => {
return std::task::Poll::Pending;
}
}
} else {
return std::task::Poll::Ready(None);
}
}