Search code examples
rustrust-tokio

How can I run asynchronous tasks on a single thread in order?


I am working on a program using rust-tokio for asynchronous execution. The main function periodically calls a function to append to a CSV file to log operation over time.

I would like to make the CSV creation function asynchronous and run it as a separate task so I can continue the main function if CSV creation is taking some time (like waiting for another application like Excel to release it).

Is there an elegant way to accomplish this?

LocalSet almost seems like it would do the job, but the tasks need to execute in order so the CSV is chronological. To me, the documentation doesn't seem to guarantee this.

Here's some pseudo code to illustrate the idea. Essentially, I'm thinking a queue of tasks that need to be completed.

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let local = task::LocalSetOrdered::new(); //This is a fictitious struct

    let mut data: usize = 10; //For simplicity, just store a single number
    loop {
        // Some operations here
        data = data + 1;

        let data_clone = data.clone();
        //Add a new task to complete after all prior tasks
        local.push(async move {
            match append_to_csv(data_clone).await {
                Ok(_) => Ok(()),
                Err(_) => Err(()),
            }
        });

        sleep(Duration::from_secs(60)).await;
    }

    Ok(())
}

async fn append_to_csv(data_in: usize) -> Result<(), Box<dyn Error>> {
    loop {
        let file = match OpenOptions::new().write(true).append(true).open(filename) {
            Ok(f) => f,
            Err(_) => {
                //Error opening the file, try again
                sleep(Duration::from_secs(60)).await;
                continue;
            }
        };
        let wtr = csv::Writer::from_writer(file);

        let date_time = Utc::now();
        wtr.write_field(format!("{}", date_time.format("%Y/%m/%d %H:%M")))?;
        wtr.write_field(format!("{}", data_in))?;
        wtr.write_record(None::<&[u8]>)?; //Finish the line
    }
}

Solution

  • You could use a worker task to write to the csv file, and a channel to pass data to be written

    use tokio::sync::mpsc::{channel, Receiver};
    
    #[derive(Debug)]
    pub struct CsvData(i32, &'static str);
    
    async fn append_to_csv(mut rx: Receiver<CsvData>) {
        let mut wtr = csv::Writer::from_writer(std::io::stdout());
        while let Some(data) = rx.recv().await {
            wtr.write_record([&data.0.to_string(), data.1]).unwrap();
            wtr.flush().unwrap();
        }
    }
    
    #[tokio::main]
    async fn main() {
        let (tx, rx) = channel(10);
        tokio::spawn(async {
            append_to_csv(rx).await;
        });
    
        for i in 0.. {
            tx.send(CsvData(i, "Hello world")).await.unwrap();
        }
    }
    

    The channel sender can be cloned if you need to write data sourced from multiple tasks.