Search code examples
Tags: rust, tcp, background, fork, rust-tokio

Having a problem in Rust with Tokio and daemonize. How do I get them to work together?


I have written a simple program that uses Tokio and fern to accept input over TCP and log it to stdout and to a file. Now I want the program to run in the background on my remote server so it can log at any time, whether or not I am connected by terminal. So I googled a bit and found that the daemonize crate appears to be quite popular for this.

Here is the code I have:

use std::fs::File;
use std::str;
use std::time::SystemTime;

use fern::colors::{Color, ColoredLevelConfig};

use log::{debug, info, trace, warn};

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

use daemonize::Daemonize;

/// Installs the global logger with two sinks fed from one shared base
/// dispatch: the file `program.log` and standard output.
///
/// `verbosity` selects the level filter applied to both sinks:
///
/// * `0` – `Info`, with the target `"overly-verbose-target"` limited to `Warn`
/// * `1` – `Debug`, with the target `"overly-verbose-target"` limited to `Info`
/// * `2` – `Debug`
/// * anything larger – `Trace`
///
/// # Errors
///
/// Returns a [`fern::InitError`] when `fern::log_file("program.log")` fails
/// or when applying the dispatch fails.
fn setup_logging(verbosity: u64) -> Result<(), fern::InitError> {
    let base_config = fern::Dispatch::new();

    let base_config = match verbosity {
        0 => {
            // Let's say we depend on something whose "info" level messages are
            // too verbose to include in end-user output. If we don't need
            // them, let's not include them.
            base_config
                .level(log::LevelFilter::Info)
                .level_for("overly-verbose-target", log::LevelFilter::Warn)
        }
        1 => base_config
            .level(log::LevelFilter::Debug)
            .level_for("overly-verbose-target", log::LevelFilter::Info),
        2 => base_config.level(log::LevelFilter::Debug),
        _3_or_more => base_config.level(log::LevelFilter::Trace),
    };

    // Built once and copied into both formatter closures, instead of being
    // rebuilt for every single log record.
    let colors = ColoredLevelConfig::new().debug(Color::Magenta);

    // Separate file config so we can include year, month and day in file logs.
    // The level written to the file goes through `colors` as well.
    let file_config = fern::Dispatch::new()
        .format(move |out, message, record| {
            out.finish(format_args!(
                "[{} {} {}] {}",
                humantime::format_rfc3339_seconds(SystemTime::now()),
                colors.color(record.level()),
                record.target(),
                message
            ))
        })
        .chain(fern::log_file("program.log")?);

    let stdout_config = fern::Dispatch::new()
        .format(move |out, message, record| {
            // Special format for debug messages coming from our own crate.
            if record.level() > log::LevelFilter::Info && record.target() == "cmd_program" {
                out.finish(format_args!(
                    "DEBUG @ {}: {}",
                    humantime::format_rfc3339_seconds(SystemTime::now()),
                    message
                ))
            } else {
                out.finish(format_args!(
                    "[{} {} {}] {}",
                    humantime::format_rfc3339_seconds(SystemTime::now()),
                    colors.color(record.level()),
                    record.target(),
                    message
                ))
            }
        })
        .chain(std::io::stdout());

    base_config
        .chain(file_config)
        .chain(stdout_config)
        .apply()?;

    Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let stdout = File::create("/tmp/daemon.out").unwrap();
    let stderr = File::create("/tmp/daemon.err").unwrap();

    let daemonize = Daemonize::new()
        .pid_file("/tmp/test.pid") // Every method except `new` and `start`
        .chown_pid_file(true) // is optional, see `Daemonize` documentation
        .working_directory("/tmp") // for default behaviour.
        .user("nobody")
        .group("daemon") // Group name
        .group(2) // or group id.
        .umask(0o777) // Set umask, `0o027` by default.
        .stdout(stdout) // Redirect stdout to `/tmp/daemon.out`.
        .stderr(stderr) // Redirect stderr to `/tmp/daemon.err`.
        .privileged_action(|| "Executed before drop privileges");

    match daemonize.start() {
        Ok(_) => println!("Success, daemonized"),
        Err(e) => eprintln!("Error, {}", e),
    }

    setup_logging(3).expect("failed to initialize logging.");
    info!("Server starting...");
    let listener = TcpListener::bind("###IP:port###").await?;
    info!("TcpListener bound to ###IP:port###");
    info!("{:?}", listener.local_addr());
    loop {
        println!("Listening for connection.");
        let (mut socket, _) = listener.accept().await?;
        info!("Connection made from client: {:?}", socket.peer_addr());
        println!("Connection made from client: {:?}", socket.peer_addr());

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => {
                        info!("Client ({:?}) closed the connection", socket.peer_addr());
                        println!("Client ({:?}) closed the connection", socket.peer_addr());
                        return;
                    }
                    Ok(n) => {
                        let s = match str::from_utf8(buf.as_slice()) {
                            Ok(v) => v,
                            Err(e) => {
                                trace!("Invalid UTF-8 sequence: {}", e);
                                return;
                            }
                        };
                        print!("{}", s);
                        info!("{}", s);
                        // Copy the data back to socket
                        // if socket.write_all(&buf[..n]).await.is_err() {
                        //     // Unexpected socket error. There isn't much we can
                        //     // do here so just stop processing.
                        //     return;
                        // }
                    }
                    Err(_) => {
                        debug!("Something went wrong. Socket Error.");
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
    debug!("Program outside of loop and exiting. Did something go wrong?");
}

Okay, so the code works fine up until let (mut socket, _) = listener.accept().await?;. Once the code reaches there, it goes into a black hole. No more print statements come out. Now, this doesn't mean the program fails though. When I go and telnet into the program, it connects fine. I send it messages fine. If I kill the rust program, telnet states the client disconnected, but only when I kill it. So I know Tokio is accepting connections and interacting over TCP. It just no longer works with my code. It's like under the hood it handles the TCP connection, but it forgets to notify the Future it returned.

Any clues what may be going wrong here? I have read in other areas that Tokio doesn't mix with forks, but I also hear that as long as you fork the program before Tokio does anything, it should work fine because it's contained in the fork before it does any of its stuff.

//////////////////////////////////////////////////////////////
// Changes made after answer that got the code working
//////////////////////////////////////////////////////////////

/// Entry point: forks into the background with `daemonize` first, and only
/// afterwards builds the Tokio runtime and runs the server on it.
///
/// # Errors
///
/// Returns an error if a redirect file cannot be created, if the runtime
/// cannot be built, or if `main_functionality` returns one.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Files that receive the daemon's stdout / stderr. `main` returns a
    // `Result`, so failures are propagated rather than unwrapped.
    let stdout = File::create("/tmp/daemon.out")?;
    let stderr = File::create("/tmp/daemon.err")?;

    let daemonize = Daemonize::new()
        .pid_file("/tmp/test.pid") // Every method except `new` and `start`
        .chown_pid_file(true) // is optional, see `Daemonize` documentation
        .working_directory("/tmp") // for default behaviour.
        .user("nobody")
        .group("daemon") // Group name
        .group(2) // or group id.
        .umask(0o777) // Set umask, `0o027` by default.
        .stdout(stdout) // Redirect stdout to `/tmp/daemon.out`.
        .stderr(stderr) // Redirect stderr to `/tmp/daemon.err`.
        .privileged_action(|| "Executed before drop privileges");

    // A failed daemonization is only reported; the server is still started.
    match daemonize.start() {
        Ok(_) => println!("Success, daemonized"),
        Err(e) => eprintln!("Error, {}", e),
    }

    // The runtime is created only after `daemonize.start()` has returned.
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(async {
            setup_logging(3).expect("failed to initialize logging.");
            main_functionality().await
        })
}

/// Binds the TCP listener and serves clients forever, logging every chunk of
/// text a client sends.
///
/// Each accepted connection is handled by its own task created with
/// `tokio::spawn`, so this future needs to run on a Tokio runtime.
///
/// # Errors
///
/// Returns the error from binding the listener or from accepting a
/// connection; apart from that the loop never exits.
async fn main_functionality() -> Result<(), Box<dyn std::error::Error>> {
    info!("Server starting...");
    let listener = TcpListener::bind("###IP:port###").await?;
    info!("TcpListener bound to ###IP:port###");
    info!("{:?}", listener.local_addr());

    loop {
        println!("Listening for connection.");
        let (mut socket, _) = listener.accept().await?;
        info!("Connection made from client: {:?}", socket.peer_addr());
        println!("Connection made from client: {:?}", socket.peer_addr());

        // Each client gets its own task so one connection cannot block others.
        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => {
                        info!("Client ({:?}) closed the connection", socket.peer_addr());
                        println!("Client ({:?}) closed the connection", socket.peer_addr());
                        return;
                    }
                    Ok(n) => {
                        // Only the first `n` bytes belong to this read; the
                        // rest of `buf` is zero padding or leftovers from an
                        // earlier, longer read and must not be logged.
                        let s = match str::from_utf8(&buf[..n]) {
                            Ok(v) => v,
                            Err(e) => {
                                trace!("Invalid UTF-8 sequence: {}:[{}]", e, n);
                                return;
                            }
                        };
                        // Data is only logged; nothing is echoed back.
                        print!("{}", s);
                        info!("{}", s);
                    }
                    Err(_) => {
                        debug!("Something went wrong. Socket Error.");
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
}

Of course, now that this works, I have to wonder whether it's useful to do it this way. In the time between asking the question and getting help to fix it, I was reminded of utilities like systemd that do all of this for you.


Solution

  • #[tokio::main] means that Tokio starts before you enter your main function, and so before daemonize has a chance to fork the program → remove #[tokio::main] from your main function, and have a tokio_main function that is called after daemonize to run the Tokio stuff:

    /// Entry point: daemonizes the process first and only then hands over to
    /// `tokio_main`, which starts the Tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if a redirect file cannot be created, or
    /// whatever `tokio_main` returns.
    fn main() -> io::Result<()> {
        // Files that receive the daemon's stdout / stderr. `main` already
        // returns `io::Result`, so failures are propagated, not unwrapped.
        let stdout = File::create("/tmp/daemon.out")?;
        let stderr = File::create("/tmp/daemon.err")?;

        let daemonize = Daemonize::new()
            .pid_file("/tmp/test.pid") // Every method except `new` and `start`
            .chown_pid_file(true) // is optional, see `Daemonize` documentation
            .working_directory("/tmp") // for default behaviour.
            .user("nobody")
            .group("daemon") // Group name
            .group(2) // or group id.
            .umask(0o777) // Set umask, `0o027` by default.
            .stdout(stdout) // Redirect stdout to `/tmp/daemon.out`.
            .stderr(stderr) // Redirect stderr to `/tmp/daemon.err`.
            .privileged_action(|| "Executed before drop privileges");

        // A failed daemonization is only reported; the server still starts.
        match daemonize.start() {
            Ok(_) => println!("Success, daemonized"),
            Err(e) => eprintln!("Error, {}", e),
        }

        // Tokio is started here, after `daemonize.start()` has returned.
        tokio_main()
    }
    
    #[tokio::main]
    async fn tokio_main() -> io::Result<()> {
        setup_logging(3).expect("failed to initialize logging.");
        info!("Server starting...");
        let listener = TcpListener::bind("45.33.110.226:2999").await?;
        info!("TcpListener bound to 45.33.110.226:2999");
        info!("{:?}", listener.local_addr());
        loop {
            println!("Listening for connection.");
            let (mut socket, _) = listener.accept().await?;
            info!("Connection made from client: {:?}", socket.peer_addr());
            println!("Connection made from client: {:?}", socket.peer_addr());
    
            tokio::spawn(async move {
                let mut buf = vec![0; 1024];
    
                loop {
                    match socket.read(&mut buf).await {
                        // Return value of `Ok(0)` signifies that the remote has
                        // closed
                        Ok(0) => {
                            info!("Client ({:?}) closed the connection", socket.peer_addr());
                            println!("Client ({:?}) closed the connection", socket.peer_addr());
                            return;
                        }
                        Ok(n) => {
                            let s = match str::from_utf8(buf.as_slice()) {
                                Ok(v) => v,
                                Err(e) => {
                                    trace!("Invalid UTF-8 sequence: {}", e);
                                    return;
                                }
                            };
                            print!("{}", s);
                            info!("{}", s);
                            // Copy the data back to socket
                            // if socket.write_all(&buf[..n]).await.is_err() {
                            //     // Unexpected socket error. There isn't much we can
                            //     // do here so just stop processing.
                            //     return;
                            // }
                        }
                        Err(_) => {
                            debug!("Something went wrong. Socket Error.");
                            // Unexpected socket error. There isn't much we can do
                            // here so just stop processing.
                            return;
                        }
                    }
                }
            });
        }
        debug!("Program outside of loop and exiting. Did something go wrong?");
    }