I am trying to write non-blocking server/client scripts.
First, here is my code:
Server.py ->
import socket
import select
import threading
class ChatServer(threading.Thread):
    """
    Chat server thread.

    Accepts TCP clients and relays every message a client sends to all the
    other connected clients. Each frame on the wire is a space-padded ASCII
    length header of RECV_HEADER_LENGTH bytes followed by that many bytes of
    payload. The first frame received from a new client is stored as its
    username.
    """

    MAX_WAITING_CONNECTION = 10
    RECV_HEADER_LENGTH = 10

    def __init__(self, host, port):
        """
        Initialize new ChatServer

        :param host: Binding Host IP
        :param port: Binding Port number
        """
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.connections = []  # Listening socket plus every active client socket.
        self.clients = {}  # client socket -> {'header': bytes, 'data': bytes} username frame
        self.running = True
        self.server_socket = None  # Created by _bind_socket().

    def _bind_socket(self):
        """
        Creates the server socket and binds it to the given host and port
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(self.MAX_WAITING_CONNECTION)
        self.connections.append(self.server_socket)

    def _send(self, sock, client_message):
        """
        Sends a message to ``sock``, preceded by the username frame that is
        registered for ``sock`` itself.

        :param sock: the registered client socket to write to
        :param client_message: the message to send
            ({'header': <bytes header>, 'data': <bytes data message>})
        """
        user = self.clients[sock]
        payload = user['header'] + user['data'] + client_message['header'] + client_message['data']
        # sendall() keeps writing until the whole frame is out; send() may stop early.
        sock.sendall(payload)

    @staticmethod
    def _recv_exact(sock, count):
        """
        Reads exactly ``count`` bytes from ``sock``.

        :param sock: the socket to read from
        :param count: number of bytes to read
        :return: the bytes read, or None if the peer closed the connection
            before ``count`` bytes arrived
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _receive(self, sock):
        """
        Reads one length-prefixed frame from ``sock``.

        :param sock: the socket to read from
        :return: {'header': <bytes header>, 'data': <bytes data message>}, or
            False when the peer disconnected or the frame could not be read
        """
        try:
            ## Bytes type header
            message_header = self._recv_exact(sock, self.RECV_HEADER_LENGTH)
            if message_header is None:
                return False
            message_length = int(message_header.decode('utf-8').strip())
            if message_length < 0:
                return False
            ## Bytes type data
            message_data = self._recv_exact(sock, message_length)
            if message_data is None:
                return False
            return {"header": message_header, "data": message_data}
        except (OSError, ValueError) as e:
            # OSError: socket failure. ValueError: header is not a decimal
            # number (UnicodeDecodeError is a ValueError subclass).
            print('exception occur')
            print(e)
            return False

    def _drop_client(self, sock):
        """
        Closes ``sock`` and forgets it; safe to call for an unknown socket.

        :param sock: the client socket to discard
        """
        sock.close()
        if sock in self.connections:
            self.connections.remove(sock)
        self.clients.pop(sock, None)

    def _broadcast(self, sending_client_socket, client_message):
        """
        Broadcasts a message to all the clients different from both the server itself and
        the client sending the message.

        :param sending_client_socket: the socket of the client sending the message
        :param client_message: the message to broadcast
            ({'header': <bytes header>, 'data': <bytes data message>})
        """
        user = self.clients[sending_client_socket]
        sending_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
        # Iterate over a snapshot: failed recipients are removed from
        # self.clients inside the loop.
        for sock in list(self.clients):
            if sock is self.server_socket or sock is sending_client_socket:
                continue
            try:
                sock.sendall(sending_message)
            except socket.error:
                ## handles a possible disconnection of client "sock"
                self._drop_client(sock)

    def _log(self, sock, message):
        """
        Prints a received message together with the sender's username.

        :param sock: the registered socket the message came from
        :param message: the received message
            ({'header': <bytes header>, 'data': <bytes data message>})
        """
        user = self.clients[sock]
        print(f"Received message from {user['data'].decode('utf-8')}: {message['data'].decode('utf-8')}")

    def _run(self):
        """
        Actually runs the server.
        """
        while self.running:
            ## Wait (without a timeout) until at least one socket is readable or in error.
            try:
                ready_to_read, ready_to_write, in_error = select.select(self.connections, [], self.connections)
            except socket.error as e:
                print(f"General Error: {e}")
                continue

            for sock in ready_to_read:
                ## if socket is server socket.
                if sock is self.server_socket:
                    try:
                        client_socket, client_address = self.server_socket.accept()
                    except socket.error as e:
                        print(f"General Error: {e}")
                        break
                    ## The first frame from a new client is its username.
                    user = self._receive(client_socket)
                    if user is False:
                        client_socket.close()  # Do not leak the rejected connection.
                        continue
                    self.connections.append(client_socket)
                    self.clients[client_socket] = user
                    print(f"Accepted new connection from {client_address[0]}:{client_address[1]}..")
                else:
                    if sock not in self.clients:
                        ## Already dropped by an earlier broadcast in this pass.
                        continue
                    message = self._receive(sock)  ## Get client message
                    if message is False:
                        print(f"Closed connection from {self.clients[sock]['data'].decode('utf-8')}")
                        self._drop_client(sock)
                        continue
                    self._log(sock, message)
                    print(message)
                    self._broadcast(sock, message)

            for sock in in_error:
                if sock is self.server_socket:
                    ## The listening socket has no entry in self.clients.
                    continue
                self._drop_client(sock)
        self.stop()

    def run(self):
        """
        Given a host and a port, binds the socket and runs the server.
        """
        self._bind_socket()
        self._run()

    def stop(self):
        """
        Stops the server by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        if self.server_socket is not None:
            self.server_socket.close()
if __name__ == '__main__':
    # Address the local chat server listens on.
    _HOST, _PORT = '127.0.0.1', 6667
    chat_server = ChatServer(_HOST, _PORT)
    chat_server.start()
    # Keep the main thread alive until the server thread ends.
    chat_server.join()
And here is my client.py ->
import socket
import select
import errno
import threading
import sys
RECV_HEADER_LENGTH = 10  # Width of the space-padded ASCII length header.


class ChatClient(threading.Thread):
    """
    Chat client thread.

    Connects to the chat server, announces the username and then exchanges
    length-prefixed UTF-8 messages with it.
    """

    def __init__(self, host, port):
        """
        Initialize new ChatClient

        :param host: Connect Host IP
        :param port: Connect Port number
        """
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.username = input("Username: ")
        self.running = True
        self.client_socket = None  # Created by _connect().

    @staticmethod
    def _header(text):
        """
        Builds the length header for ``text``.

        The length is the size of the UTF-8 encoding, not the number of
        characters, because _send() transmits the encoded bytes.

        :param text: the payload the header describes
        :return: the header as a left-aligned, space-padded string
        """
        payload_size = len(text.encode('utf-8'))
        return f"{payload_size:<{RECV_HEADER_LENGTH}}"

    def _send(self, sock, message):
        """
        Encodes ``message`` as UTF-8 and writes it to ``sock``.

        :param sock: the socket to write to
        :param message: the already framed text to send
        """
        sock.send(message.encode('utf-8'))

    def _connect(self):
        """
        Connecting to the ChatServer
        """
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.connect((self.host, self.port))
        self.client_socket.setblocking(0)
        # The first frame sent to the server is the username.
        self.username_header = self._header(self.username)
        self._send(self.client_socket, self.username_header + self.username)

    def prompt(self):
        """
        Writes the input prompt to stdout without a trailing newline.
        """
        sys.stdout.write(f"#{self.username}$ ")
        sys.stdout.flush()

    def _run(self):
        """
        Actually run client.
        """
        while self.running:
            # NOTE: the socket is also in the write list, so select() returns
            # as soon as it is writable and the input() call below then blocks
            # this loop until the user presses Enter.
            reading_sockets, writing_sockets, exceptional_sockets = select.select(
                [self.client_socket], [self.client_socket], [])
            for sock in reading_sockets:
                if sock == self.client_socket:
                    username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    if not len(username_header):
                        print("Connection closed by the server.")
                        # Close the socket before leaving the thread.
                        self.stop()
                        return
                    username_length = int(username_header.decode("utf-8").strip())
                    username = self.client_socket.recv(username_length).decode("utf-8")
                    message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    message_length = int(message_header.decode('utf-8').strip())
                    message = self.client_socket.recv(message_length).decode('utf-8')
                    print(f"#{username}$ {message}")
            for sock in writing_sockets:
                self.prompt()
                message = input()
                print(len(message))
                if not message:
                    continue
                message_header = self._header(message)
                self._send(sock, message_header + message)
        self.stop()

    def run(self):
        """
        Connects to the server and runs the client loop.
        """
        self._connect()
        self._run()

    def stop(self):
        """
        Stops the client by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        if self.client_socket is not None:
            self.client_socket.close()
if __name__ == '__main__':
    # Address of the chat server to connect to.
    _HOST, _PORT = '127.0.0.1', 6667
    chat_server = ChatClient(_HOST, _PORT)
    chat_server.start()
    # Keep the main thread alive until the client thread ends.
    chat_server.join()
Now, I think my problem is in client.py. In the _run function, I pass the same socket to select() in both the reading list and the writing list.
When I run this code, the for loop over reading_sockets is effectively blocked, because the for loop over writing_sockets holds my shell (waiting for input) and never releases it, even when another message arrives. So I want to wait for user input but, at the same time, read other messages and print them on the shell. I am using Python 3.7. How can I achieve this?
So I want to make wait user input but at the same time read other messages and print on shell. I am using python3.7. How can I achieve this?
By making sure you only read from sys.stdin when sys.stdin actually has user input ready to give you; that way your input() call won't block. You can do that by passing sys.stdin as one of the sockets in your first argument to select(). (Note: this won't work under Windows, because Microsoft in its wisdom decided that their implementation of select() would not support selecting on stdin. Under Windows you'll have to use a separate thread to do blocking reads from stdin instead, along with inter-thread messaging of some sort to get the data read from stdin back to the network thread, and it's a huge pain to get working.)
Here's how I modified your _run(self) method to get the behavior you want (tested under MacOS/X):
def _run(self):
    """
    Actually run client.

    Waits until either the server socket or stdin is readable, then prints
    one incoming chat message or sends one line typed by the user.
    """
    while self.running:
        # Only the read list is used: stdin is included so input() is called
        # only when a line is ready, and the socket is not selected for
        # writing because that would make select() return almost immediately.
        reading_sockets, writing_sockets, exceptional_sockets = select.select(
            [self.client_socket, sys.stdin], [], [])
        for sock in reading_sockets:
            if sock == self.client_socket:
                username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                if not len(username_header):
                    print("Connection closed by the server.")
                    sys.exit()
                # NOTE(review): the recv() calls below assume the rest of the
                # frame has already arrived — confirm for large messages.
                username_length = int(username_header.decode("utf-8").strip())
                username = self.client_socket.recv(username_length).decode("utf-8")
                message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                message_length = int(message_header.decode('utf-8').strip())
                message = self.client_socket.recv(message_length).decode('utf-8')
                print(f"#{username}$ {message}")
            elif sock == sys.stdin:
                self.prompt()
                try:
                    message = input()
                except EOFError:
                    # stdin was closed: leave the loop instead of crashing.
                    self.running = False
                    break
                print(len(message))
                if not message:
                    continue
                # The header must count encoded bytes, not characters,
                # because _send() transmits the UTF-8 encoding.
                payload_size = len(message.encode('utf-8'))
                message_header = f"{payload_size:<{RECV_HEADER_LENGTH}}"
                self._send(self.client_socket, message_header + message)
    self.stop()
Note that I added sys.stdin to the read-sockets argument of the select() call (so that select() will return when there is data ready to read from stdin), and that I also removed self.client_socket from the write-sockets argument (because placing it there will cause select() to return as soon as self.client_socket has buffer space to accept more outgoing data, which is to say, it would return immediately almost all the time, which would spin your event loop and cause your client program to use close to 100% of a core, which is not what you want).
I also modified your read-from-stdin code to be called only when the readable socket is sys.stdin, since there's no point trying to read from stdin unless it has data to give you; and finally I have your self._send() call send on the TCP socket rather than trying to send bytes back to stdin (because stdin is read/input-only, so sending bytes to it doesn't make any sense).