I have a simple client-server setup that transmits JPEG bytes. When run locally it works perfectly. However, when transmitting over the internet, the JPEGs get corrupted and show severe visual artifacts when decoded.
The client is a Rust Tokio application. It consumes a JPEG stream from a camera and pushes the JPEG bytes to a TCP socket.
// Async application to run on the edge (Raspberry Pi)
// Reads MJPEG HTTP stream from provided URL and sends it to the CWS server over TCP
#![warn(rust_2018_idioms)]
use std::convert::TryFrom;
use futures::StreamExt;
use tokio::signal;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::error::Error;
use std::path::PathBuf;
use structopt::StructOpt;
//DEBUG
use std::fs;
#[derive(StructOpt, Debug)]
#[structopt(name = "basic")]
struct Opt {
    #[structopt(long = "stream", required(true))]
    stream: String,
    #[structopt(long = "server", required(true))]
    server: String,
}
async fn acquire_tcp_connection(server: &str) -> Result<TcpStream, Box<dyn Error>> {
    // "127.0.0.1:6142"
    loop {
        match TcpStream::connect(server).await {
            Ok(stream) => {
                println!("Connected to server");
                return Ok(stream);
            }
            Err(e) => {
                println!("Failed to connect to server: {}", e);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        }
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let opt = Opt::from_args();
    let url = http::Uri::try_from(opt.stream).unwrap();
    loop {
        let mut tcp_conn = acquire_tcp_connection(&opt.server).await?;
        // hyper client
        let client = hyper::Client::new();
        // Do the request
        let res = client.get(url.clone()).await.unwrap();
        // Check the status
        if !res.status().is_success() {
            eprintln!("HTTP request failed with status {}", res.status());
            std::process::exit(1);
        }
        // https://docs.rs/mime/latest/mime/#what-is-mime
        // Basically HTTP response content
        let content_type: mime::Mime = res
            .headers()
            .get(http::header::CONTENT_TYPE)
            .unwrap()
            .to_str()
            .unwrap()
            .parse()
            .unwrap();
        assert_eq!(content_type.type_(), "multipart");
        let boundary = content_type.get_param(mime::BOUNDARY).unwrap();
        let stream = res.into_body();
        // https://github.com/scottlamb/multipart-stream-rs
        let mut stream = multipart_stream::parse(stream, boundary.as_str());
        'outer: while let Some(p) = stream.next().await {
            let p = p.unwrap();
            // Split the jpeg bytes into chunks of 2048 bytes
            for slice in p.body.chunks(2048) {
                // DEBUG capture bytes to a file just for debugging
                fs::write("tcp_debug.txt", &slice).expect("Unable to write file");
                let tcp_result = tcp_conn.write(&slice).await;
                match tcp_result {
                    Ok(_) => {
                        println!("Sent {} bytes", slice.len());
                    }
                    Err(e) => {
                        println!("Failed to send data: {}", e);
                        break 'outer;
                    }
                }
            }
        }
    }
    Ok(())
}
The server is a Python app that accepts a TCP connection and decodes the received bytes:
import asyncio
from threading import Thread
import cv2
import numpy as np
class SingleFrameReader:
    """
    Simple API for reading a single frame from a video source
    """
    def __init__(self, video_source, ..., tcp=False):
        self._video_source = video_source
        ...
        elif tcp:
            self._stop_tcp_server = False
            self._tcp_image = None
            self._tcp_addr = video_source
            self._tcp_thread = Thread(target=asyncio.run, args=(self._start_tcp_server(),)).start()
            self.read = lambda: self._tcp_image
    async def _start_tcp_server(self):
        uri, port = self._tcp_addr.split(':')
        port = int(port)
        print(f"Starting TCP server on {uri}:{port}")
        server = await asyncio.start_server(
            self.handle_client, uri, port
        )
        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')
        async with server:
            await server.serve_forever()
    async def handle_client(self, reader, writer):
        """
        Handle a client connection. Receive JPEGs from the client and have them ready to be read
        """
        client_addr = writer.get_extra_info('peername')
        print(f"New connection from {client_addr}")
        jpg_bytes = b''
        while True:
            if self._stop_tcp_server:
                break
            data = await reader.read(2**16)
            #DEBUG
            print(f"Received {len(data)} bytes from {client_addr}")
            byes_file = open('tcp_bytes_server.txt', 'wb')
            if not data:
                print("Client disconnected")
                break
            #DEBUG the corruption issue
            byes_file.write(data)
            jpg_bytes += data
            start_idx = jpg_bytes.find(b'\xff\xd8')
            end_idx = jpg_bytes.find(b'\xff\xd9')
            if start_idx != -1 and end_idx != -1:
                nparr = np.frombuffer(jpg_bytes[start_idx:end_idx+2], np.uint8)
                img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                if img_np is None:
                    continue
                self._tcp_image = img_np
                jpg_bytes = jpg_bytes[end_idx+2:]
...
I have tried sending different chunk sizes; when sending the entire image at once, the image artifacts are more severe.
I captured to a file the bytes sent from the client (Rust) and the bytes received on the server (Python). The server's capture was smaller: ~1.6KB vs ~2KB from the Rust client. The first received bytes also do not match.
Again, when running locally this works smoothly and you can see the camera stream in real time. When the client and server are separated by the internet, the bytes seem to get corrupted.
To write the bytes to the TCP socket you are calling AsyncWriteExt::write:
let tcp_result = tcp_conn.write(&slice).await;
And quoting the docs:
This function will attempt to write the entire contents of buf, but the entire write may not succeed... If the return value is Ok(n) then it must be guaranteed that n <= buf.len().
You check for an error and log slice.len(), but you discard the returned byte count, so nothing happens when fewer bytes are written than the buffer holds. The remaining bytes of that slice are silently dropped from the stream.
But when will the buffer not be written entirely? Well, usually that might happen when your program is much faster than the network and/or your slices are too big. It looks like when you are connecting to localhost that is not an issue but when you connect to a remote system it is. YMMV.
If you were programming in C, you would have to write a loop, advance a pointer and so on. In Rust you can do that too, but it is easier to call AsyncWriteExt::write_all, which does the loop for you:
let tcp_result = tcp_conn.write_all(&slice).await;
Note that now tcp_result is an io::Result<()> instead of an io::Result<usize>, because now not writing all the bytes in the slice is an error. Exactly what you want.
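To make the difference concrete, here is a minimal synchronous sketch using std::io::Write, whose write and write_all follow the same contract as the Tokio versions. ShortWriter and the three send_* helpers are hypothetical names invented for this illustration; ShortWriter accepts only a few bytes per call, roughly imitating a socket whose send buffer is nearly full.

```rust
use std::io::{self, Write};

/// Hypothetical sink that accepts at most `limit` bytes per `write` call.
struct ShortWriter {
    received: Vec<u8>,
    limit: usize,
}

impl Write for ShortWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Accept only a prefix of the buffer and report how much was taken.
        let n = buf.len().min(self.limit);
        self.received.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Calls `write` once and ignores the returned count, as the question's code does.
fn send_ignoring_count(data: &[u8], limit: usize) -> Vec<u8> {
    let mut w = ShortWriter { received: Vec::new(), limit };
    let _ = w.write(data);
    w.received
}

/// The manual loop: keep writing the unsent tail until nothing is left.
fn send_with_loop(data: &[u8], limit: usize) -> io::Result<Vec<u8>> {
    let mut w = ShortWriter { received: Vec::new(), limit };
    let mut rest = data;
    while !rest.is_empty() {
        let n = w.write(rest)?;
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::WriteZero, "sink accepted no bytes"));
        }
        rest = &rest[n..];
    }
    Ok(w.received)
}

/// `write_all` performs the same loop internally.
fn send_with_write_all(data: &[u8], limit: usize) -> io::Result<Vec<u8>> {
    let mut w = ShortWriter { received: Vec::new(), limit };
    w.write_all(data)?;
    Ok(w.received)
}

fn main() -> io::Result<()> {
    let frame: [u8; 8] = [0xff, 0xd8, 1, 2, 3, 4, 0xff, 0xd9];
    println!("single write: {:?}", send_ignoring_count(&frame, 3));
    println!("manual loop:  {:?}", send_with_loop(&frame, 3)?);
    println!("write_all:    {:?}", send_with_write_all(&frame, 3)?);
    Ok(())
}
```

With a limit of 3, the single write delivers only the first three bytes of the frame, while the loop and write_all both deliver all eight.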
Also note that the peer, when reading from the TCP connection, has the same potential issue, even in Python. This line could do a partial read, regardless of whether the sender wrote the whole buffer in one call:
data = await reader.read(2**16)
But this line is already in a loop concatenating the data and waiting for the whole image, so it causes no issues.
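For comparison, the same accumulate-and-scan idea can be sketched in Rust with std::io::Read. This is an analogue of the Python loop, not a replacement for it: take_frame and read_frames are hypothetical helpers, the small chunk size stands in for partial reads, and the marker scan is as naive as the one in the question (it assumes the byte pairs FF D8 and FF D9 only appear at frame boundaries).

```rust
use std::io::{self, Cursor, Read};

const SOI: [u8; 2] = [0xff, 0xd8]; // JPEG start-of-image marker
const EOI: [u8; 2] = [0xff, 0xd9]; // JPEG end-of-image marker

/// Returns the index of the first occurrence of `needle` in `haystack`.
fn find(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    haystack.windows(needle.len()).position(|w| w == needle)
}

/// Removes and returns the first complete frame in `buf`, if one is present.
/// The end marker is searched only after the start marker.
fn take_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    let start = find(buf.as_slice(), &SOI)?;
    let body = start + SOI.len();
    let end = body + find(&buf[body..], &EOI)? + EOI.len();
    let frame = buf[start..end].to_vec();
    buf.drain(..end);
    Some(frame)
}

/// Reads at most `chunk` bytes at a time and collects every complete frame.
fn read_frames<R: Read>(mut reader: R, chunk: usize) -> io::Result<Vec<Vec<u8>>> {
    let mut pending = Vec::new();
    let mut frames = Vec::new();
    let mut tmp = vec![0u8; chunk];
    loop {
        let n = reader.read(&mut tmp)?;
        if n == 0 {
            break; // end of stream
        }
        // A read may end anywhere, even in the middle of a marker,
        // so always scan the accumulated bytes rather than `tmp`.
        pending.extend_from_slice(&tmp[..n]);
        while let Some(frame) = take_frame(&mut pending) {
            frames.push(frame);
        }
    }
    Ok(frames)
}

fn main() -> io::Result<()> {
    // Two tiny fake frames back to back, read three bytes at a time.
    let wire: Vec<u8> = vec![0xff, 0xd8, 1, 2, 0xff, 0xd9, 0xff, 0xd8, 3, 0xff, 0xd9];
    let frames = read_frames(Cursor::new(wire), 3)?;
    println!("recovered {} frames", frames.len());
    Ok(())
}
```

Because the scan runs over the accumulated buffer, where each read happens to end does not matter, which is why the partial reads on the Python side are harmless.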