Search code examples
rusttcptcpclientrust-tokio

Why does my socket read lock the write of the data?


I use tokio::net::TcpStream to connect a small TCP server, I write a few bytes and expect to read the response from the server.

When I do that with the nc command, it works perfectly

[denis@docker-1 ~]$ echo "get" | nc 10.0.0.11 9090
[37e64dd7-91db-4c13-9f89-f1c87467ffb3][processed]

and the server logs show

Incoming  peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [4]
Got a few bytes [[103, 101, 116, 10, 0, ...]]
Reading bytes...
Got a few bytes [0]
Got a few bytes [[0, 0, 0, 0, 0, 0,...]]
Writing some data back from peer : [37e64dd7-91db-4c13-9f89-f1c87467ffb3]

But from my Rust client, I can write the bytes but as soon as I want to read the data from the server, everything is locked (even the write action)

use std::collections::HashMap;
use std::ops::DerefMut;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;

#[tokio::main]
async fn main() {
    let data = "set".to_string();
    let mut stream = TcpStream::connect("10.0.0.11:9090").await.unwrap();
    let ( mut read, mut write) = tokio::io::split(stream);

    let u2 = data.as_bytes();
    write.write_all(u2).await.unwrap();

    let mut msg : [u8;1024] = [0;1024];
    let _response_size = read.read(&mut msg).await.unwrap();
    println!("GOT = {:?}", msg);
}

When looking at the server logs (see below), it reads the 3 bytes sent by the client, but then it is not able to read further, waiting to detect there is 0 byte left to read.

Incoming  peer instructions.
Waiting for peer instructions...
Reading bytes...
Got a few bytes [3]
Got a few bytes [[115, 101, 116, 0, 0, ...]]
Reading bytes...

Here is the server code

use std::collections::HashMap;
use std::ops::DerefMut;

use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
use std::sync::RwLock;
use lazy_static::*;

struct DataPool {
    data : [u8;1024],
    size : usize,
}

async fn whirl_socket( socket : &mut TcpStream ) -> Vec<DataPool> {
    let mut pool: Vec<DataPool> = vec![];
    let mut buf = [0; 1024];

    // In a loop, read data from the socket until finished
    loop {
        println!("Reading bytes...");
        buf = [0; 1024];
        let n = match socket.read(&mut buf).await {
            Ok(n) => n,
            Err(e) => {
                eprintln!("failed to read from socket; err = {:?}", e);
                break;
            }
        };
        println!("Got a few bytes [{}]", n);
        println!("Got a few bytes [{:?}]", &buf);
        pool.push(DataPool {
            data: buf,
            size: n,
        });
        if n == 0 {
            break;
        }
    }
    pool
}


async fn launch_server_listener() -> io::Result<()> {

    println!("Listen to 9090...");
    let listener = TcpListener::bind("10.0.0.11:9090").await?;

    loop {
        println!("Waiting for peer instructions...");
        let (mut socket, _) = listener.accept().await?;
        println!("Incoming  peer instructions.");

        tokio::spawn(async move {
            let mut pool= whirl_socket(&mut socket).await;

            let my_uuid = Uuid::new_v4();

            // Write the data back
            println!("Writing some data back from peer : [{}]",  my_uuid);
            let s = format!( "[{}][processed]\n", my_uuid.to_string());
            let u = s.as_bytes();
            if let Err(e) = socket.write_all(u).await {
                eprintln!("failed to write to socket; err = {:?}", e);
                return;
            }

        });
    }

}


async fn start_servers() -> Result<(), Box<dyn std::error::Error>> {
    let _r = tokio::join!(launch_server_listener());
    Ok(())
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    start_servers().await?;
    Ok(())
}

Solution

  • A read of 0 bytes means the read stream has closed. So in your client code you need to close the write stream. You can do this with .shutdown() from the AsyncWriteExt trait:

    write.write_all(u2).await.unwrap();
    write.shutdown().await.unwrap();