Search code examples
pythonpython-3.xsocketsblockingnonblocking

How to use Non-Blocking sockets


I am trying to write non-blocking server/client scripts.

Firstly, Here is my code:

Server.py ->

import socket
import select
import threading

class ChatServer(threading.Thread):
    """
    SERVER THREAD
    """

    MAX_WAITING_CONNECTION = 10
    RECV_HEADER_LENGTH = 10

    def __init__(self, host, port):
        """
        Initialize new ChatServer

        :param host: Binding Host IP
        :param port: Binding Port number
        """


        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.connections = [] ## Will keep active client connections.
        self.clients = {}
        self.running = True

    def _bind_socket(self):
        """
        Creates the server socket and binds it to the given host and port
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(self.MAX_WAITING_CONNECTION)
        self.connections.append(self.server_socket)

    def _send(self, sock, client_message):
        """
        Prefixes each message with a 4-byte length before sending.

        :param sock: the incoming sock
        :param msg: the massage to send
        """
        user = self.clients[sock]
        client_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
        sock.send(client_message)

    def _receive(self, sock):
        try:
            ## Bytes type header
            message_header = sock.recv(self.RECV_HEADER_LENGTH)

            if not len(message_header):
                return False

            message_length = int(message_header.decode('utf-8').strip())
            ## Bytes type data
            return {"header": message_header, "data": sock.recv(message_length)}
        except Exception as e:
            print('exception occur')
            print(e)
            return False

    def _broadcast(self, sending_client_socket, client_message):
        """
        Breadcasts a message to all the clients different from both the server itself and
        the client sending the message.
        :param client_socket: the socket of the client sending the message
        :param client_message: the message to broadcast ({'header': <bytes header>, 'data': <bytes data message>})
        """

        for sock in self.clients:
            is_not_the_server = sock != self.server_socket
            is_not_the_client_sending = sock != sending_client_socket ## sending client socket

            if is_not_the_server and is_not_the_client_sending:
                try:
                    user = self.clients[sending_client_socket]
                    print(f"Type client_message: {type(client_message)}")
                    print(f"Type user: {type(user)}")
                    sending_message = user['header'] + user['data'] + client_message['header'] + client_message['data']
                    sock.send(sending_message)
                except socket.error:
                    ## handles a possible disconnection of client "sock" by ..
                    sock.close()
                    self.connections.remove(sock) ## removing sock form active connections.
                    del self.clients[sock]

    def _log(self, sock, message):
        user = self.clients[sock]
        print(f"Received message from {user['data'].decode('utf-8')}: {message['data'].decode('utf-8')}")

    def _run(self):
        """
        Actually runs the server.
        """
        while self.running:
            ## Get the list of sockets which are ready to be read through select non-blocking calls
            ## The select has a timeout of 60 seconds

            try:
                ready_to_read, ready_to_write, in_error = select.select(self.connections, [], self.connections)
            except socket.error as e:
                print(f"General Error: {e}")
                continue
            else:
                for sock in ready_to_read:
                    ## if socket is server socket.
                    if sock == self.server_socket:
                        try:
                            client_socket, client_address = self.server_socket.accept()
                        except socket.error as e:
                            print(f"General Error: {e}")
                            break
                        else:
                            user = self._receive(client_socket)
                            if user is False:
                                continue
                            self.connections.append(client_socket)
                            self.clients[client_socket] = user
                            print(f"Accepted new connection from {client_address[0]}:{client_address[1]}..")
                    else:
                        message = self._receive(sock) ## Get client message
                        if message is False:
                            print(f"Closed connection from {self.clients[sock]['data'].decode('utf-8')}")
                            self.connections.remove(sock)
                            del self.clients[sock]
                            continue

                        self._log(sock, message)
                        print(message)
                        self._broadcast(sock, message)

                for sock in in_error:
                    self.connections.remove(sock)
                    del self.clients[sock]
        self.stop()

    def run(self):
        """
        Given a host and a port, binds the socket and runs the server.
        """
        self._bind_socket()
        self._run()

    def stop():
        """
        Stops the server by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        self.server_socket.close()


if __name__ == '__main__':

    _HOST = '127.0.0.1'
    _PORT = 6667

    chat_server = ChatServer(_HOST, _PORT)
    chat_server.start()
    chat_server.join()

And my client.py ->

import socket
import select
import errno
import threading
import sys

RECV_HEADER_LENGTH = 10

class ChatClient(threading.Thread):


    def __init__(self, host, port):
        """
        Initialize new ChatClient

        :param host: Connect Host IP
        :param port: Connect Port number
        """

        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.username = input("Username: ")
        self.running = True

    def _send(self, sock, message):
        sock.send(message.encode('utf-8'))  

    def _connect(self):
        """
        Connecting to the ChatServer
        """
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.connect((self.host, self.port))
        self.client_socket.setblocking(0)
        self.username_header = f"{len(self.username):<{RECV_HEADER_LENGTH}}"
        self._send(self.client_socket, self.username_header+self.username)

    def prompt(self) :
        sys.stdout.write(f"#{self.username}$ ")
        sys.stdout.flush()

    def _run(self):
        """
        Actually run client.
        """
        while self.running:
            reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket], [self.client_socket], [])
            for sock in reading_sockets:
                if sock == self.client_socket:
                    username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    if not len(username_header):
                        print("Connection closed by the server.")
                        sys.exit()
                    username_length = int(username_header.decode("utf-8").strip())
                    username = self.client_socket.recv(username_length).decode("utf-8")
                    message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    message_length = int(message_header.decode('utf-8').strip())
                    message = self.client_socket.recv(message_length).decode('utf-8')
                    print(f"#{username}$ {message}")
            for sock in writing_sockets:
                self.prompt()
                message = input()
                print(len(message))
                if not message:
                    continue
                message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
                self._send(sock, message_header+message)
        self.stop()

    def run(self):
        """
        Given a host and a port, binds the socket and runs the server.
        """
        self._connect()
        self._run()

    def stop():
        """
        Stops the server by setting the "running" flag before closing
        the socket connection.
        """
        self.running = False
        self.client_socket.close()


if __name__ == '__main__':

    _HOST = '127.0.0.1'
    _PORT = 6667

    chat_server = ChatClient(_HOST, _PORT)
    chat_server.start()
    chat_server.join()

Now my problem is on client.py I think. In _run function, I use select reading_socket and writing_socket for same socket.

When I run this code, Blocking for loop for the reading_socket. Because in for loop for writing_sockets keep my shell and never release even comes another massage comes. So I want to make wait user input but at the same time read other messages and print on shell. I am using python3.7. How can I achieve this?


Solution

  • So I want to make wait user input but at the same time read other messages and print on shell. I am using python3.7. How can I achieve this?

    By making sure only to read from sys.stdin when sys.stdin actually has user-input ready to give you; that way your input() call won't block. You can do that by passing sys.stdin as one of the sockets in your first argument to select(). (Note: this won't work under Windows, because Microsoft in its wisdom decided that their implementation of select() would not support selecting-on-stdin. Under Windows you'll have to use a separate thread to do blocking reads from stdin instead, along with inter-thread messaging of some sort to get the data read from stdin back to the network-thread, and it's a huge pain to get working)

    Here's how I modified your _run(self) method to get the behavior you want (tested under MacOS/X):

    def _run(self):
        """
        Actually run client.
        """
        while self.running:
            reading_sockets, writing_sockets, exceptional_sockets = select.select([self.client_socket, sys.stdin], [], [])
            for sock in reading_sockets:
                if sock == self.client_socket:
                    username_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    if not len(username_header):
                        print("Connection closed by the server.")
                        sys.exit()
                    username_length = int(username_header.decode("utf-8").strip())
                    username = self.client_socket.recv(username_length).decode("utf-8")
                    message_header = self.client_socket.recv(RECV_HEADER_LENGTH)
                    message_length = int(message_header.decode('utf-8').strip())
                    message = self.client_socket.recv(message_length).decode('utf-8')
                    print(f"#{username}$ {message}")
                elif sock == sys.stdin:
                   self.prompt()
                   message = input()
                   print(len(message))
                   if not message:
                       continue
                   message_header = f"{len(message):<{RECV_HEADER_LENGTH}}"
                   self._send(self.client_socket, message_header+message)
        self.stop()
    

    Note that I added sys.stdin to the read-sockets argument of the select() call (so that select() will return when their is data ready-to-read from stdin), and that I also removed self.client_socket from the write-sockets argument (because placing it there will cause select() to return as soon as self.client_socket has buffer-space to accept more outgoing data, which is to say, it would return immediately almost all the time, which would spin your event loop and cause your client program to use close to 100% of a core, which is not what you want).

    I also modified your read-from-stdin code to be called only when the readable-socket is sys.stdin, since there's no point trying to read from stdin unless it has data to give you; and finally I have your self._send() call send on the TCP socket rather than trying to send bytes back to stdin (because stdin is read/input-only, so sending bytes to it doesn't make any sense).