I'm trying to create a websocket client using tokio_tungstenite and mio, but I couldn't initialize a stream because of handshake issues. Here is the code I have:
let addr: Vec<_> = ws_url
    .to_socket_addrs()
    .map_err(|err| ClientError {
        message: err.to_string(),
    })
    .unwrap()
    .collect();
println!("{:?}", addr);
let connector = TlsConnector::new().unwrap();
let stream = TcpStream::connect(addr[0]).unwrap();
let mut stream = match connector.connect(ws_url.as_str(), stream) {
    Ok(stream) => Ok(stream),
    Err(err) => match err {
        native_tls::HandshakeError::Failure(err) => Err(ClientError::new(format!(
            "Handshake failed: {}",
            err.to_string()
        ))),
        native_tls::HandshakeError::WouldBlock(mh) => match mh.handshake() {
            Ok(stream) => Ok(stream),
            Err(err) => Err(ClientError::new(format!( // <-- the handshake process was interrupted
                "Handshake failed: {}",
                err.to_string()
            ))),
        },
    },
}?;
This code fails on mh.handshake() with the error: the handshake process was interrupted.
Does anyone know why that happens and how to fix it?
After a lot of research, I decided not to use mio at all and to write the event loop manually. Using mio is doable, but too time-consuming for the simple task I have.
If you want a single-threaded event loop, you can just use tungstenite and call the set_nonblocking method of the underlying socket:
let url = Url::parse(ws_url.as_str()).unwrap();
match tungstenite::connect(url) {
    Ok((mut sock, _)) => {
        let s = sock.get_mut();
        match s {
            tungstenite::stream::MaybeTlsStream::Plain(s) => s.set_nonblocking(true),
            tungstenite::stream::MaybeTlsStream::NativeTls(s) => {
                s.get_mut().set_nonblocking(true)
            }
            x => panic!("Received unknown stream type: {:?}", x),
        }
        .map_err(|err| ClientError::new(format!("Failed to unblock the socket: {}", err)))
        .unwrap();
        Ok(Box::new(KrakenWsClient { sock }))
    }
    Err(err) => Err(ClientError {
        message: format!(
            "Failed to establish websocket connection: {}",
            err.to_string()
        ),
    }),
}
Then reading will look like this:
fn next_message(&mut self) -> Result<Option<Message>> {
    match self.sock.read_message() {
        Ok(msg) => self.parse_msg(msg),
        Err(err) => match err {
            Error::Io(err) => {
                if err.kind() == ErrorKind::WouldBlock {
                    Ok(None)
                } else {
                    Err(ClientError::new(format!(
                        "Error reading from websocket: {}",
                        err
                    )))
                }
            }
            _ => Err(ClientError::new(format!(
                "Error reading from websocket: {}",
                err
            ))),
        },
    }
}
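The WouldBlock branch above mirrors how a plain nonblocking socket from the standard library behaves: a read on a socket with no pending data fails with ErrorKind::WouldBlock instead of waiting. Here is a minimal sketch of that behaviour using only std, with no tungstenite involved. The names try_read and idle_nonblocking_pair are made up for this illustration, and the loopback listener is just a stand-in for a real server:

```rust
use std::io::{ErrorKind, Read};
use std::net::{TcpListener, TcpStream};

/// Attempts a single read on a nonblocking socket.
/// Returns Ok(None) when no data is available yet, which is the same
/// shape as next_message returning Ok(None) on WouldBlock.
fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(err) => Err(err),
    }
}

/// Hypothetical helper for the demo: opens a loopback connection and
/// switches the client side to nonblocking mode. The server side is
/// returned too, so the connection stays open while the client polls.
fn idle_nonblocking_pair() -> std::io::Result<(TcpStream, TcpStream)> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client = TcpStream::connect(listener.local_addr()?)?;
    let (server, _peer_addr) = listener.accept()?;
    client.set_nonblocking(true)?;
    Ok((client, server))
}
```

Calling try_read on the client before the server has written anything should give Ok(None) rather than blocking, and Ok(Some(n)) once bytes have arrived.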
Remember to control the timing of your event loop to prevent it from using 100% of the CPU. Hope this helps!
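One way to apply the timing advice above is to sleep for a short interval whenever a poll finds nothing to read. The following is only a sketch under assumptions: run_event_loop, its parameters, and the String message type are invented for illustration, and the poll closure stands in for something like next_message that returns None when the socket would block.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls for messages until `run_for` has elapsed.
/// Each message is passed to `handle`; when `poll` returns None the
/// thread sleeps for `idle_sleep` instead of spinning.
/// Returns how many times the loop found nothing to do.
fn run_event_loop<P, H>(
    mut poll: P,
    mut handle: H,
    idle_sleep: Duration,
    run_for: Duration,
) -> usize
where
    P: FnMut() -> Option<String>,
    H: FnMut(String),
{
    let deadline = Instant::now() + run_for;
    let mut idle_iterations = 0;
    while Instant::now() < deadline {
        match poll() {
            // Work is available: handle it and poll again right away.
            Some(msg) => handle(msg),
            // Nothing to read: yield the CPU for a while.
            None => {
                idle_iterations += 1;
                thread::sleep(idle_sleep);
            }
        }
    }
    idle_iterations
}
```

The sleep length is a trade-off: a longer sleep lowers CPU usage but adds up to that much latency before a newly arrived message is noticed. A real client would probably loop until the connection closes rather than for a fixed duration.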