python rust tcp python-asyncio rust-tokio

TCP sockets. Missing bytes when transmitting data over internet


I have a simple client-server setup that communicates JPG bytes. When running locally it works perfectly. However, when transmitting over the internet, the JPGs get corrupted and show severe visual artifacts when decoded.

The client is a Rust Tokio application. It consumes a JPEG stream from a camera and pushes the JPEG bytes to a TCP socket.

// Async application to run on the edge (Raspberry Pi) 
// Reads MJPEG HTTP stream from provided URL and sends it to the CWS server over TCP

#![warn(rust_2018_idioms)] 

use std::convert::TryFrom;

use futures::StreamExt;

use tokio::signal;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

use std::error::Error;

use std::path::PathBuf;
use structopt::StructOpt;

//DEBUG
use std::fs;

#[derive(StructOpt, Debug)]
#[structopt(name = "basic")]

struct Opt {
    #[structopt(long = "stream", required(true))]
    stream: String,

    #[structopt(long = "server", required(true))]
    server: String,
}

async fn acquire_tcp_connection(server: &str) -> Result<TcpStream, Box<dyn Error>> {
    // "127.0.0.1:6142"
    loop {
        match TcpStream::connect(server).await {
            Ok(stream) => {
                println!("Connected to server");
                return Ok(stream);
            }
            Err(e) => {
                println!("Failed to connect to server: {}", e);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        }
    }

}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let opt = Opt::from_args();
    
    let url = http::Uri::try_from(opt.stream).unwrap();

    loop {
        let mut tcp_conn = acquire_tcp_connection(&opt.server).await?;

        // hyper client
        let client = hyper::Client::new();
        // Do the request
        let res = client.get(url.clone()).await.unwrap();
        // Check the status
        if !res.status().is_success() {
            eprintln!("HTTP request failed with status {}", res.status());
            std::process::exit(1);
        }
        // https://docs.rs/mime/latest/mime/#what-is-mime
        // Basically HTTP response content 
        let content_type: mime::Mime = res
            .headers()
            .get(http::header::CONTENT_TYPE)
            .unwrap()
            .to_str()
            .unwrap()
            .parse()
            .unwrap();

        assert_eq!(content_type.type_(), "multipart");

        let boundary = content_type.get_param(mime::BOUNDARY).unwrap();
        let stream = res.into_body();
        // https://github.com/scottlamb/multipart-stream-rs
        let mut stream = multipart_stream::parse(stream, boundary.as_str());

    
        'outer: while let Some(p) = stream.next().await {
            let p = p.unwrap();
            // Split the jpeg bytes into chunks of 2048 bytes
            for slice in p.body.chunks(2048) {
                // DEBUG capture bytes to a file just for debugging
                fs::write("tcp_debug.txt", &slice).expect("Unable to write file"); 

                let tcp_result = tcp_conn.write(&slice).await;

                match tcp_result {
                    Ok(_) => {
                        println!("Sent {} bytes", slice.len());
                    }
                    Err(e) => {
                        println!("Failed to send data: {}", e);
                        break 'outer;
                    }
                }
            }
        }
    }

    Ok(())
}

The server is a Python app that accepts a TCP connection and decodes the received bytes:

import asyncio
from threading import Thread

import cv2
import numpy as np


class SingleFrameReader:
    """
    Simple API for reading a single frame from a video source
    """
    def __init__(self, video_source, ..., tcp=False):
        self._video_source = video_source
        ...
        elif tcp:
            self._stop_tcp_server = False
            self._tcp_image = None
            self._tcp_addr = video_source
            self._tcp_thread = Thread(target=asyncio.run, args=(self._start_tcp_server(),)).start()
            self.read = lambda: self._tcp_image

    async def _start_tcp_server(self):
        uri, port = self._tcp_addr.split(':')
        port = int(port)
        print(f"Starting TCP server on {uri}:{port}")
        server = await asyncio.start_server(
            self.handle_client, uri, port
        )

        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')

        async with server:
            await server.serve_forever()

    async def handle_client(self, reader, writer):
        """
        Handle a client connection. Receive JPEGs from the client and have them ready to be read
        """
        client_addr = writer.get_extra_info('peername')
        print(f"New connection from {client_addr}")
        jpg_bytes = b''
        while True:
            if self._stop_tcp_server:
                break
            data = await reader.read(2**16)

            #DEBUG
            print(f"Received {len(data)} bytes from {client_addr}")
            byes_file = open('tcp_bytes_server.txt', 'wb')

            if not data:
                print("Client disconnected")
                break

            #DEBUG the corruption issue
            byes_file.write(data)

            jpg_bytes += data
            
            start_idx = jpg_bytes.find(b'\xff\xd8')
            end_idx = jpg_bytes.find(b'\xff\xd9')

            if start_idx != -1 and end_idx != -1:
                nparr  = np.frombuffer(jpg_bytes[start_idx:end_idx+2], np.uint8)
                img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                if img_np is None:
                    continue
                self._tcp_image = img_np
                jpg_bytes = jpg_bytes[end_idx+2:]
                
    ...

I have tried sending different chunk sizes. When sending the entire image at once, the image artifacts are more severe.

I captured to a file the bytes sent by the client (Rust) and the bytes received by the server (Python). The server's capture was smaller: ~1.6 KB versus ~2 KB on the Rust side. The first bytes of the two captures also do not match.

Again, when running locally this works smoothly and you can see the camera stream in real time. When the client and server are separated by the internet, the bytes seem to get corrupted.


Solution

  • To write the bytes to the TCP socket you are calling AsyncWriteExt::write:

    let tcp_result = tcp_conn.write(&slice).await;
    

    And quoting the docs:

    This function will attempt to write the entire contents of buf, but the entire write may not succeed...

    If the return value is Ok(n) then it must be guaranteed that n <= buf.len().

    You check for an error, but you discard the returned byte count and log slice.len() instead, so nothing happens when fewer bytes than the buffer length were written.

    But when is the buffer not written entirely? Typically when your program produces data faster than the network can carry it (the socket's send buffer fills up) and/or your slices are too big. Connecting to localhost apparently never hits this, but connecting to a remote system does. YMMV.

    If you were programming in C, you would have to write a loop, advance a pointer, and so on. In Rust you can do that too, but it is easier to call AsyncWriteExt::write_all, which does the loop for you:

    let tcp_result = tcp_conn.write_all(&slice).await;
    

    Note that tcp_result is now an io::Result<()> instead of an io::Result<usize>, because failing to write all the bytes in the slice is now an error. That is exactly what you want.

    Also note that the peer reading from the TCP connection has the same potential issue, even in Python. This line can return a partial read, whether or not the sender wrote the whole buffer:

        data = await reader.read(2**16)
    

    But this line is already in a loop that concatenates the data and waits for the whole image, so it causes no issues.
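
To make the short-write behaviour concrete, here is a minimal sketch of the manual loop mentioned above. It is an illustration under assumptions, not the asker's program: it uses the blocking std::io::Write trait (whose write and write_all have the same short-write contract as the Tokio versions) so that it needs no runtime or network, and ShortWriter, write_fully and delivered are hypothetical names invented for this example.

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most `limit` bytes per call,
/// imitating a socket whose send buffer is nearly full.
struct ShortWriter {
    received: Vec<u8>,
    limit: usize,
}

impl Write for ShortWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Accept only a prefix of `buf` and report how much was taken.
        let n = buf.len().min(self.limit);
        self.received.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Manual equivalent of `write_all`: keep writing the unsent tail
/// until nothing is left or the writer stops making progress.
fn write_fully<W: Write>(w: &mut W, mut buf: &[u8]) -> io::Result<()> {
    while !buf.is_empty() {
        match w.write(buf) {
            Ok(0) => {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "failed to write whole buffer",
                ))
            }
            Ok(n) => buf = &buf[n..], // advance past the bytes already sent
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}

/// Sends `chunk` three ways and returns how many bytes arrived with
/// (a single `write`, the manual loop, the standard `write_all`).
fn delivered(chunk: &[u8], limit: usize) -> io::Result<(usize, usize, usize)> {
    let mut single = ShortWriter { received: Vec::new(), limit };
    let _ignored = single.write(chunk)?; // count discarded, as in the question

    let mut looped = ShortWriter { received: Vec::new(), limit };
    write_fully(&mut looped, chunk)?;

    let mut all = ShortWriter { received: Vec::new(), limit };
    all.write_all(chunk)?;

    Ok((single.received.len(), looped.received.len(), all.received.len()))
}
```

Calling delivered with a 2048-byte chunk and a limit of 1600 shows the single write dropping the tail of the chunk while both looping variants deliver all of it. The limit of 1600 is an arbitrary value chosen to resemble the sizes reported in the question; a real socket may accept any number of bytes per call.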