
How to connect a bevy game to an external TCP server using tokio's async TcpStream?


I want to send events between the game client and the server. I already have that part working, but I do not know how to integrate it with bevy.

I have to use tokio's async TcpStream, because I need to be able to split the stream into an OwnedWriteHalf and an OwnedReadHalf using stream.into_split().

My first idea was to spawn a task that handles the connection and sends the received events to a queue using mpsc::channel.

I then put this queue into a bevy resource using app.insert_resource(Queue) and pull events from it in the game loop.

The queue:

use tokio::sync::mpsc;

pub enum Instruction {
    Push(GameEvent),
    Pull(mpsc::Sender<Option<GameEvent>>),
}

#[derive(Clone, Debug)]
pub struct Queue {
    sender: mpsc::Sender<Instruction>,
}
impl Queue {
    pub fn init() -> Self {
        let (tx, rx) = mpsc::channel(1024);
        init(rx);
        Self{sender: tx}
    }
    pub async fn send(&self, event: GameEvent) {
        self.sender.send(Instruction::Push(event)).await.unwrap();
    }
    pub async fn pull(&self) -> Option<GameEvent> {
        println!("new pull");
        let (tx, mut rx) = mpsc::channel(1);
        self.sender.send(Instruction::Pull(tx)).await.unwrap();
        rx.recv().await.unwrap()
    }
}

fn init(mut rx: mpsc::Receiver<Instruction>) {
    tokio::spawn(async move {
        let mut queue: Vec<GameEvent> = Vec::new();

        loop {
            match rx.recv().await.unwrap() {
                Instruction::Push(ev) => {
                    queue.push(ev);
                }
                Instruction::Pull(sender) => {
                    sender.send(queue.pop()).await.unwrap();
                }
            }
        }
    });
}
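
One detail in the queue above that may or may not be intended: Vec::pop removes the most recently pushed element, so events are handed out newest-first. If arrival order matters, a VecDeque with pop_front is one possible alternative. The sketch below is only an illustration with the standard library; FifoQueue and the tuple-struct GameEvent are hypothetical stand-ins, not types from the original project.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the project's GameEvent type.
#[derive(Debug, PartialEq)]
pub struct GameEvent(pub u32);

/// Illustrative first-in, first-out buffer (not part of the original code).
pub struct FifoQueue {
    items: VecDeque<GameEvent>,
}

impl FifoQueue {
    pub fn new() -> Self {
        Self { items: VecDeque::new() }
    }

    /// Appends an event at the back of the queue.
    pub fn push(&mut self, ev: GameEvent) {
        self.items.push_back(ev);
    }

    /// Removes the oldest event, or returns None when the queue is empty.
    pub fn pull(&mut self) -> Option<GameEvent> {
        self.items.pop_front()
    }
}
```

With this shape, the Push and Pull arms of the task loop would map to push and pull respectively.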

But because all of this is async, I have to block on the pull() function in the synchronous game loop. I do this using the futures-lite crate:

use futures_lite::future;

fn event_pull(
    communication: Res<Communication>
) {
    let ev = future::block_on(communication.event_queue.pull());
    println!("got event: {:?}", ev);
}

This works at first, but after around 5 seconds the whole program halts and no more events are received.

It seems that future::block_on() blocks indefinitely.

Making the main function, in which bevy::prelude::App is built and run, the async tokio::main function might also be part of the problem.

It would probably be best to wrap the async TcpStream initialisation and the tokio::sync::mpsc::Sender (and thus also Queue::pull) in synchronous functions, but I do not know how to do this.

Can anyone help?

How to reproduce

The repo can be found here

Compile both the server and the client, then run them in that order.


Solution

  • I got it to work by replacing every tokio::sync::mpsc with crossbeam::channel (which might be a problem, since it blocks) and by initializing the tokio runtime manually.

    The init code now looks like this:

    use tokio::net::TcpStream;
    use tokio::runtime::Runtime;

    pub struct Communicator {
        pub event_bridge: bridge::Bridge,
        pub event_queue: event_queue::Queue,
        _runtime: Runtime,
    }
    impl Communicator {
        pub fn init(ip: &str) -> Self {
            let rt = tokio::runtime::Builder::new_multi_thread()
                .enable_io()
                .build()
                .unwrap();
        
            let (bridge, queue, game_rx) = rt.block_on(async move {
                let socket = TcpStream::connect(ip).await.unwrap();
                let (read, write) = socket.into_split();
                let reader = TcpReader::new(read);
                let writer = TcpWriter::new(write);
            
                let (bridge, tcp_rx, game_rx) = bridge::Bridge::init(); 
                reader::init(bridge.clone(), reader);
                writer::init(tcp_rx, writer);
            
                let event_queue = event_queue::Queue::init();
            
                (bridge, event_queue, game_rx)
            });
        
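            // Forward game_rx events to the queue that the game loop reads from.
            // Note: crossbeam's recv() blocks, so this task ties up one runtime
            // worker thread for as long as it runs.
            let eq_clone = queue.clone();
            rt.spawn(async move {
                loop {
                    let event = game_rx.recv().unwrap();
                    eq_clone.send(event);
                }
            });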
        
            Self {
                event_bridge: bridge,
                event_queue: queue,
                _runtime: rt,
            }
        }
    }
    

    And main.rs looks like this:

    fn main() {
        let communicator = communication::Communicator::init("0.0.0.0:8000");
    
        communicator.event_bridge.push_tcp(TcpEvent::Register{name: String::from("luca")});
    
        App::new()
            .insert_resource(communicator)
            .add_system(event_pull)
            .add_plugins(DefaultPlugins)
            .run();
    }
    
    fn event_pull(
        communication: Res<communication::Communicator>
    ) {
        // pull() is now an ordinary synchronous call, so no block_on is needed.
        if let Some(ev) = communication.event_queue.pull() {
            println!("got event: {:?}", ev);
        }
    }
    

    Perhaps there might be a better solution.
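
One possible refinement, given the concern about blocking: the game loop can poll the channel without ever waiting, so a frame is never stalled when no event has arrived. The sketch below uses std::sync::mpsc as a stand-in for crossbeam::channel (both offer a non-blocking try_recv). EventInbox, spawn_fake_network and the GameEvent variants are hypothetical names for illustration, and the background thread only simulates the network side. The Mutex is there because std's Receiver is not Sync, which a shared resource would typically need.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the project's GameEvent type.
#[derive(Debug, Clone, PartialEq)]
pub enum GameEvent {
    Joined(String),
    Left(String),
}

/// Receiving end kept by the game (illustrative, not from the original code).
pub struct EventInbox {
    rx: Mutex<Receiver<GameEvent>>,
}

impl EventInbox {
    /// Returns the inbox plus the Sender that the network side would keep.
    pub fn new() -> (Self, Sender<GameEvent>) {
        let (tx, rx) = mpsc::channel();
        (Self { rx: Mutex::new(rx) }, tx)
    }

    /// Collects every event that is ready right now, in arrival order.
    /// Never waits: returns an empty Vec if nothing is pending.
    pub fn drain(&self) -> Vec<GameEvent> {
        let rx = self.rx.lock().unwrap();
        let events: Vec<GameEvent> = rx.try_iter().collect();
        events
    }
}

/// Simulates the network side: a background thread that pushes two events.
pub fn spawn_fake_network(tx: Sender<GameEvent>) -> JoinHandle<()> {
    thread::spawn(move || {
        tx.send(GameEvent::Joined("luca".into())).unwrap();
        tx.send(GameEvent::Left("luca".into())).unwrap();
    })
}
```

In a system like event_pull, calling drain() once per frame and iterating over the result would handle all pending events while keeping the frame time independent of network activity.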