Search code examples
rustrust-tokiorust-iced

How can I send a "Message" back to a "Higher" Struct?


I currently have 2 files: Main.rs and Connection.rs.

Connection.rs currently contains the ability to Send, Listen and Connect to a TcpStream.

Connection.rs

use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use std::sync::Arc;
use iced_futures::futures;

#[derive(Debug, Clone)]
pub enum ConnMessage {
    Code(usize, String),
    Chat(usize, String),
    View(usize, String),
    None,
}


#[derive(Debug, Clone)]
pub enum ConnectionError {
    ConnectError(usize),
    SendError(usize),
    ReadError(usize),
}

#[derive(Debug, Clone)]
pub struct Connection {
    pub write_stream: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub read_stream: Arc<Mutex<ReadHalf<TcpStream>>>,
    pub loc: usize,
    pub code: String,
}

impl Connection {
    
    pub async fn connect(loc:usize) -> Result<Connection, ConnectionError> {
        
        let socket = TcpStream::connect("3.92.0.221:80").await.map_err(|_| ConnectionError::ConnectError(loc))?;
        
        let (rd, wr) = tokio::io::split(socket);
        
        let conn = Connection {
                    write_stream: Arc::new(Mutex::new(wr)),
                    read_stream: Arc::new(Mutex::new(rd)),
                    loc: loc,
                    code: String::from(""),
                };
        
        Ok( conn )
        
    }

    pub fn listen(conn: Connection) -> Result<(), ConnectionError> {
        tokio::spawn(async move {
            let mut message = String::from("");
                loop {
                        let mut buf = [0u8; 16];
                let mut rd = conn.read_stream.lock().await;
                        rd.read(&mut buf).await.unwrap();
                
                // ASSUMPTION - Disconnected when Array is all 0s, i.e. a set of bytes that contained nothing is sent
                let mut disconnected = true;
                for i in buf.iter() {
                    if i != &0u8 {
                        disconnected = false;
                    }
                }
                if disconnected {
                    println!("Disconnected");
                }
                else {
                    let string_result = std::str::from_utf8(&buf).map_err(|_| ConnectionError::ReadError(conn.loc));
                    if string_result.is_ok() {
                        let string_contents = string_result.unwrap();
                        println!("conn.loc: {} -- Contents: {}", conn.loc, string_contents);

                        message += string_contents;
                        
                        // End of Message - Parse and Reset
                        if message.contains("\\.") {
                            println!("EOM");
                            message = message.replace("\\.", "");
                            // Send `message` to Message inside Main.rs
                            message = String::from("");
                            println!("Resetting Msg");
                        }
                        else {
                            println!("Not end of message");
                        }
                    }
                    else {
                        println!("String Result Error");
                    }
                }
                }
            });
        
        Ok(())
    }

    pub async fn send(connection: Connection, string: String) -> Result<(), ConnectionError> {
        let mut stream = connection.write_stream.lock().await;
        stream.write_all(string.as_bytes()).await.map_err(|_| ConnectionError::SendError(connection.loc))?;
        //println!("Code: {}", connection.code);
        Ok( () )
    }
}

Main.rs currently contains utilization of Iced to include a GUI that I created to establish connections on button presses.

use iced::{
    pane_grid, PaneGrid, Application, Settings, executor, Command, Container, Length, Element, 
    scrollable, button, Align, HorizontalAlignment, Column, Scrollable, Text, Button, Row,
    text_input, TextInput, 
};

use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
//use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
//use futures::prelude::*;
use std::sync::Arc;

mod Connection;

pub fn main() -> iced::Result {
    ClientAdminGUI::run(Settings::default())
}

struct ClientAdminGUI {
    sessions_pane: pane_grid::State<ClientAdminSessionPaneContent>,
    chat_pane: pane_grid::State<ClientAdminChatPaneContent>, 
    image_viewer_pane: pane_grid::State<ClientAdminViewerPaneContent>,
    connections: Vec<Connection::Connection>,
    cur_connection: Option<Connection::Connection>,
}

#[derive(Debug, Clone)]
enum Message {
    Session(Result<Connection::Connection, Connection::ConnectionError>), //Async Handler
    SendMessage,
    Send(Result<(), Connection::ConnectionError>), //Async Handler
    InputChanged(String),
    Button(usize, ButtonMessage),
    //None,
    UpdateCode(String),
    ReadConnMessage(Result<Connection::ConnMessage, ()>),
}

impl Application for ClientAdminGUI {
    type Message = Message;
    type Executor = executor::Default;
    type Flags = ();

    fn new(_flags: ()) -> (Self, Command<Message>) {
        let sessions_pane_content_value = ClientAdminSessionPaneContent::new();
        let (sessions_pane, _) = pane_grid::State::new(sessions_pane_content_value);
        let chat_pane_content_value = ClientAdminChatPaneContent::new();
        let (chat_pane, _) = pane_grid::State::new(chat_pane_content_value);
        let (image_viewer_pane, _) = pane_grid::State::new(ClientAdminViewerPaneContent::new());

        (
            ClientAdminGUI {
                sessions_pane,
                chat_pane,
                image_viewer_pane,
                connections: Vec::new(),
                cur_connection: None
            },
            Command::none(),
        )
    }

    fn title(&self) -> String {
        String::from("Client Admin GUI")
    }       

    fn update(&mut self, message: Message) -> Command<Message> {
        match message {
            Message::Session(Ok(result)) => {
                // result is a connection

                self.connections.push(result);

                // ...
                Connection::Connection::listen(Some(self.connections[self.connections.len()-1].clone()).unwrap());
            }
            ... //For all the rest of `Message`s 
        }
    }
}

Inside my listen function, I get the reply from a connection inside of there. But, I am not entirely sure how to feed that back to the application to be able to do some action with it.

Question:

How can I send the data I obtain from the listen function within Connection.rs back to my main.rs as a Message - say to my Message::UpdateCode(String) functionality?


Solution

  • I simply needed to pass a closure as an argument to the function. From there, I had to ensure the lifetime was correct and it had the correct paramters.

    You can accomplish this by passing a generic type then using where to set the exact type of the generic.

    Connection.rs

    use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
    use tokio::net::TcpStream;
    use tokio::sync::Mutex;
    use std::sync::Arc;
    use iced_futures::futures;
    
    #[derive(Debug, Clone)]
    pub enum ConnMessage {
        Code(usize, String),
        Chat(usize, String),
        View(usize, String),
        None,
    }
    
    
    #[derive(Debug, Clone)]
    pub enum ConnectionError {
        ConnectError(usize),
        SendError(usize),
        ReadError(usize),
    }
    
    #[derive(Debug, Clone)]
    pub struct Connection {
        pub write_stream: Arc<Mutex<WriteHalf<TcpStream>>>,
        pub read_stream: Arc<Mutex<ReadHalf<TcpStream>>>,
        pub loc: usize,
        pub code: String,
    }
    
    impl Connection {
        
        pub async fn connect(loc:usize) -> Result<Connection, ConnectionError> {
            
            let socket = TcpStream::connect("3.92.0.221:80").await.map_err(|_| ConnectionError::ConnectError(loc))?;
            
            let (rd, wr) = tokio::io::split(socket);
            
            let conn = Connection {
                        write_stream: Arc::new(Mutex::new(wr)),
                        read_stream: Arc::new(Mutex::new(rd)),
                        loc: loc,
                        code: String::from(""),
                    };
            
            Ok( conn )
            
        }
    
        pub fn listen<F>(conn: Connection, read_message: F) where F: Fn(String, usize) + 'static + std::marker::Send {
            tokio::spawn(async move {
                let mut message = String::from("");
                    loop {
                            let mut buf = [0u8; 16];
                    let mut rd = conn.read_stream.lock().await;
                            rd.read(&mut buf).await.unwrap();
                    
                    // ASSUMPTION - Disconnected when Array is all 0s, i.e. a set of bytes that contained nothing is sent
                    let mut disconnected = true;
                    for i in buf.iter() {
                        if i != &0u8 {
                            disconnected = false;
                        }
                    }
                    if disconnected {
                        println!("Disconnected");
                    }
                    else {
                        let string_result = std::str::from_utf8(&buf).map_err(|_| ConnectionError::ReadError(conn.loc));
                        if string_result.is_ok() {
                            let string_contents = string_result.unwrap();
                            println!("conn.loc: {} -- Contents: {}", conn.loc, string_contents);
    
                            message += string_contents;
                            
                            // End of Message - Parse and Reset
                            if message.contains("\\.") {
                                println!("EOM");
                                message = message.replace("\\.", "");
                                read_message(message, conn.loc);
                                message = String::from("");
                                println!("Resetting Msg");
                            }
                            else {
                                println!("Not end of message");
                            }
                        }
                        else {
                            println!("String Result Error");
                        }
                    }
                    }
                });
        }
    
        pub async fn send(connection: Connection, string: String) -> Result<(), ConnectionError> {
            let mut stream = connection.write_stream.lock().await;
            stream.write_all(string.as_bytes()).await.map_err(|_| ConnectionError::SendError(connection.loc))?;
            Ok( () )
        }
    }
    

    Main.rs (Just relevant portion)

    Connection::Connection::listen(Some(self.connections[self.connections.len()-1].clone()).unwrap(), 
                        (|string:String, loc:usize| {
                            println!("String is: {} -- loc: {}", string, loc);
                         }));