Search code examples
json rust rust-tokio serde-json

Rust Deserializing JSON


I am having trouble deserializing json data sent from my client.

server.rs

use std::collections::HashMap;
use std::sync::{Arc,Mutex};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use serde_json::{ Value};

/*
The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same allocation on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given allocation is destroyed, the value stored in that allocation (often referred to as “inner value”) is also dropped.
*/
// Type alias for the shared map from a user (String key) to that user's socket.
// The Arc lets every task hold a handle to the same Mutex-protected HashMap.
type UserToSocket = Arc<Mutex<HashMap<String,TcpStream>>>;


/// Server entry point: binds to 127.0.0.1:9090 and spawns one task per accepted connection.
#[tokio::main]
async fn main() {
    // A failed bind is fatal: there is nothing to serve without a listener.
    let listener = match TcpListener::bind("127.0.0.1:9090").await {
        Ok(bound) => bound,
        Err(_) => panic!("ERROR OCCURED"),
    };

    // Thread-safe map shared by all connection tasks.
    let local_db: UserToSocket = Arc::new(Mutex::new(HashMap::new()));

    println!("[+] Listener has been started");

    loop {
        println!("[+] Listening for connection");
        let (socket, addr) = listener.accept().await.unwrap();
        println!("[+] A connection accepted from {:?}, spawwning a new task for it", addr);

        // Arc::clone copies the handle, not the map: it only bumps the reference count.
        let db_handle = Arc::clone(&local_db);

        // Each connection is served on its own task so the accept loop never waits on a client.
        tokio::spawn(handler(socket, db_handle));
    }
}

/// Handler for one accepted connection.
///
/// Sends a greeting, then loops: reads up to 1024 bytes from the socket,
/// converts the buffer to a `String`, parses it as JSON, prints the parsed
/// value and sends an acknowledgement. Returns when the client closes the
/// connection (a read of 0 bytes) or when a read fails.
///
/// `_db` is accepted but not used anywhere in this function.
///
/// NOTE(review): the conversion below uses the WHOLE 1024-byte buffer, not
/// just the bytes that were actually read, so the zero bytes following the
/// payload are handed to `serde_json` as well. This appears to be the source
/// of the "trailing characters" error reported for this code.
async fn handler(mut socket: TcpStream, _db: UserToSocket) {

    socket.write_all(b"[+] Hello Friend, Welcome to my program\r\n").await.unwrap();

    // Fixed-size, zero-initialised receive buffer. It is never cleared
    // between iterations, so bytes from an earlier read stay in place
    // beyond the end of a later, shorter read.
    let mut buf = vec![0; 1024];
    
    loop {
        // `read` yields the number of bytes written into `buf`; 0 means the peer closed.
        match  socket.read(&mut buf).await {
            Ok(0) => {
                println!("Client Closed connection");
                return;
            }

            // Some data arrived; the byte count `_n` is deliberately ignored here.
            Ok(_n) => {

                // `String::from_utf8` takes the Vec by value, so a copy is made
                // to keep `buf` usable for the next read. The copy is the full
                // 1024 bytes (see the NOTE in the function docs).
                let bufc = buf.clone();

                // unmarshalling json
                //let parsed:Value = serde_json::from_slice(&bufc).unwrap();

                // Convert the copied bytes into a String.
                match String::from_utf8(bufc) {
                    Ok(val) => {
                        println!("[+] So the parsed value is {}",val);
                        //let temp = val.as_str();
                        // Panics (via `unwrap`) if `val` is not exactly one JSON
                        // document followed only by whitespace.
                        let parsed:Value = serde_json::from_str(&val).unwrap();
                       
                        println!("{:?}",parsed);
                        socket.write_all(b"So yeah thanks for sending this\r\n").await.unwrap();
                        continue;
                    }
                    Err(err) => {
                        // Not valid UTF-8: report it and wait for the next read.
                        println!("ERROR Could not convert to string {:?}",err);
                        continue;
                    }
                };
                //socket.write_all(b"Vnekai bujena\r\n").await.unwrap();
            }
            // Any read error ends this connection's handler.
            Err(_) => {
                println!("Unhandeled error occured");
                return;
            }
        }
    }
    
}

client.rs

use tokio::net::{TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::{thread,time};

#[tokio::main]
async fn main() {
let sleep_time = time::Duration::from_secs(2);

    let socket = TcpStream::connect("127.0.0.1:9090").await;

    let mut socket = match socket {
        Ok(v) => {
            println!("[+] Successfully connected");
            v
        }
        Err(_) => {
            println!("ERROR could not connect to the server");
            std::process::exit(-1);
        }
    };

    let mut buf = vec![0;1024];

    //let mut user_input = String::new();
    loop {
        thread::sleep(sleep_time);

        match socket.read(&mut buf).await {
            Ok(0) => {
                println!("[+] Connection with server has been closed");
                std::process::exit(1);
            }
            Ok(_n) => {
                let bc = buf.clone();
                let res = String::from_utf8(bc).unwrap();
                println!("[+] Server responded with {}",res);
            }
            Err(_) => {
                panic!("[-] Some fatal error occured");
            }
        }

        println
        !("You want to say: ");
        /*let _v =  match io::stdin().read_line(&mut user_input){
            Ok(val) => {val}
            Err(_) => panic!("ERROR"),
        };*/

        let val = "{\"name\": \"John Doe\",\"age\": 43,\"phones\": [\"+44 1234567\",\"+44 2345678\"]}\r\n";
        socket.write(val.as_bytes()).await.unwrap();

    }
    

    
}

When I send JSON data to the server, I receive an error: thread 'tokio-runtime-worker' panicked at 'called Result::unwrap() on an Err value: Error("trailing characters", line: 2, column: 1)', src\bin\simple_server.rs:79:71

This error does not occur when I try to deserialize the JSON string directly. It only occurs when I send the data over the network.


Solution

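  • Your original code fails because it parses the entire 1024-byte buffer rather than only the bytes that were actually read, so the zero bytes after the JSON are reported as "trailing characters". Since your JSON is newline-terminated, you should read it with something like read_line() (or read_until(b'\n'), as in the example below). You should also never send pretty-printed JSON over such a line-based protocol, because it contains embedded newlines - but serde_json produces compact, single-line JSON by default.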

    For example, this compiles and should work as intended

    use serde_json::Value;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufStream};
    use tokio::net::{TcpListener, TcpStream};
    
    type UserToSocket = Arc<Mutex<HashMap<String, TcpStream>>>;
    
    // ... main unchanged from your implementation ...
    
    async fn handler(socket: TcpStream, _db: UserToSocket) {
        let mut socket = BufStream::new(socket);
        socket
            .write_all(b"[+] Hello Friend, Welcome to my program\r\n")
            .await
            .unwrap();
        socket.flush().await.unwrap();
    
        let mut line = vec![];
        loop {
            line.clear();
            if let Err(e) = socket.read_until(b'\n', &mut line).await {
                println!("Unhandled error occured: {}", e);
                return;
            }
            if line.is_empty() {
                println!("Client Closed connection");
                return;
            }
            println!(
                "[+] So the received value is {}",
                String::from_utf8_lossy(&line)
            );
            let parsed: Value = serde_json::from_slice(&line).unwrap();
    
            println!("{:?}", parsed);
            socket
                .write_all(b"So yeah thanks for sending this\r\n")
                .await
                .unwrap();
            socket.flush().await.unwrap();
            continue;
        }
    }