Search code examples
Tags: python, class, sockets, python-3.7, serversocket

How do I make a server be in a class structure?


I have a file that works as a socket server, and I need to restructure it as a class. How would you do it? The script uses several global variables as well as local ones, so, for example, which of them should become instance attributes (`self.…`) and which should be private? The class should be named `Server`, have an `__init__`, and contain all of the functions as methods. After the class is written, the code needs to create a `Server` object and run the essential steps, i.e.:

server_socket.bind(('0.0.0.0', 2303))

server_socket.listen(5)

handle_new_connection()

handle_receive_data()

handle_sending_msgs()

del public_msgs[:]

del private_msgs[:]

public_msgs.append((None, "bye"))

handle_sending_msgs()

server_socket.close()

Here's the code:

import socket
import select
import datetime


def get_current_time():
    """Return the current local time as a zero-padded ``HH:MM`` string.

    Both fields are always two digits wide (five past nine in the morning is
    ``09:05``, not ``9:5``). The result never contains a space, so it can be
    prefixed to a message with a single space as separator.
    """
    return datetime.datetime.now().strftime("%H:%M")


def get_name(user_socket):
    """Return the display name registered for ``user_socket``.

    Admins are shown with a leading ``@``; everybody else gets the plain name.
    Raises ``KeyError`` when the socket has no registered name.
    """
    display_name = sockets_names[user_socket]
    return "@" + display_name if user_socket in admins else display_name


def get_data_length(data):
    """Return the size of ``data`` as a zero-padded decimal string of width 3.

    The value is used as the length header that is written to the socket in
    front of the UTF-8 encoded message, so text is measured in encoded bytes
    rather than characters (the two differ for any non-ASCII message).
    ``bytes`` input is measured as-is.

    NOTE: anything longer than 999 bytes yields a header wider than three
    digits, which the fixed-width framing cannot represent.
    """
    payload = data.encode('utf-8') if isinstance(data, str) else data
    return str(len(payload)).zfill(3)


def get_socket_by_name(name):
    """Find the first socket registered under ``name``; ``None`` if there is none."""
    candidates = (
        sock for sock, registered in sockets_names.items() if name == registered
    )
    return next(candidates, None)


def get_admins_as_string():
    """Return the admins' names, each quoted, separated by ``", "``.

    Names appear in the order the admins were added; the result is an empty
    string when there are no admins.
    """
    return ", ".join(repr(sockets_names[admin_socket]) for admin_socket in admins)


def remove_socket(removed_socket):
    """Forget a client completely and close its connection.

    The socket is dropped from the open-socket list, the admin list, the muted
    list and the name registry, and is then closed so its file descriptor is
    released. Calling this again for a socket that is already gone is a no-op
    apart from the (harmless) repeated ``close()``.
    """
    # Slice assignment mutates the shared lists in place, so every other
    # function keeps seeing the same list objects. Filtering (rather than a
    # single ``remove``) also clears duplicate entries and tolerates a socket
    # that is not present.
    for registry in (open_sockets, admins, muted_sockets):
        registry[:] = [sock for sock in registry if sock is not removed_socket]
    sockets_names.pop(removed_socket, None)
    removed_socket.close()


def handle_new_connection():
    """Accept a pending client when the listening socket is readable.

    The new client is registered under the name "Anonymous"; if there is no
    admin at that moment, the newcomer becomes one.
    """
    for ready_socket in rlist:
        if ready_socket is not server_socket:
            continue
        client_socket, _address = server_socket.accept()
        open_sockets.append(client_socket)
        sockets_names[client_socket] = "Anonymous"
        if admins:
            print("New Connection")
        else:
            admins.append(client_socket)
            print("New Connection And Admin")


def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``.

    Returns ``None`` if the peer closes the connection (or the socket errors)
    before ``size`` bytes have arrived; a single ``recv`` may legitimately
    return fewer bytes than requested, hence the loop.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        try:
            chunk = sock.recv(remaining)
        except OSError:
            return None
        if not chunk:  # b'' means the peer closed the connection
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def _read_message(sock):
    """Read one ``<3-digit length><payload>`` frame and return the decoded text.

    Returns ``None`` when the connection is closed or the header is not three
    ASCII digits, i.e. when the stream can no longer be trusted.
    """
    header = _recv_exact(sock, 3)
    if header is None or not header.isdigit():
        return None
    payload = _recv_exact(sock, int(header))
    if payload is None:
        return None
    # Client input is untrusted: never let invalid UTF-8 raise.
    return payload.decode('utf-8', errors='replace')


def handle_receive_data():
    """Read one message from every readable client and queue the reactions.

    * A closed or corrupted connection is announced as "left the chat" and the
      client is removed, instead of crashing the server.
    * An empty message is ignored.
    * Text starting with ``/`` is dispatched to ``handle_commands``.
    * Anything else is echoed back to the sender as "You: ..." and, unless the
      sender is muted, queued for everybody else.
    """
    for current_socket in rlist:
        if current_socket is server_socket:
            continue

        data = _read_message(current_socket)
        if data is None:
            # Resolve the name before the socket is forgotten.
            public_msgs.append((current_socket, get_name(current_socket) + " left the chat."))
            print("Connection with " + get_name(current_socket) + " closed.")
            remove_socket(current_socket)
            continue
        if not data:
            continue

        if data.startswith('/'):
            handle_commands(current_socket, data[1:])
            continue

        private_msgs.append((current_socket, "You: " + data))
        if current_socket in muted_sockets:
            private_msgs.append((
                current_socket,
                "You are muted and so can't send msgs to everyone. "
                "You can ask one of the admins to unmute you in a private msg",
            ))
        else:
            public_msgs.append((current_socket, get_name(current_socket) + ": " + data))


def _send_message(receiver_socket, data):
    """Timestamp, frame and send ``data``; return ``False`` if the socket failed.

    The frame is the 3-digit length of the encoded payload followed by the
    payload itself, written with ``sendall`` so a short write cannot truncate
    it.
    """
    payload = (get_current_time() + " " + data).encode('utf-8')
    # Measure the encoded bytes, not the characters.
    header = get_data_length(payload).encode('utf-8')
    try:
        receiver_socket.sendall(header + payload)
    except OSError:
        return False
    return True


def handle_sending_msgs():
    """Deliver the queued public and private messages to writable clients.

    Public messages go to every writable, still-connected client except the
    sender, and stay queued while nobody can receive them. Private messages go
    to their single receiver and stay queued until that receiver is writable;
    messages for clients that are no longer connected are discarded. A private
    message whose text is exactly ``"bye"`` makes the server drop the receiver
    after delivery. A client whose socket fails while sending is dropped too.
    """
    # Iterate over snapshots: removing from a list while looping over it skips
    # entries and delivers messages out of order.
    for message in list(public_msgs):
        sender_socket, data = message
        receivers = [
            sock for sock in wlist
            if sock is not sender_socket and sock in open_sockets
        ]
        if not receivers:
            continue
        for receiver_socket in receivers:
            if not _send_message(receiver_socket, data):
                remove_socket(receiver_socket)
        public_msgs.remove(message)

    for message in list(private_msgs):
        receiver_socket, data = message
        if receiver_socket not in open_sockets:
            # The receiver is gone; the message can never be delivered.
            private_msgs.remove(message)
            continue
        if receiver_socket not in wlist:
            continue
        delivered = _send_message(receiver_socket, data)
        private_msgs.remove(message)
        # Compare the whole text: checking only the first word would let a
        # user named "bye" disconnect others through a private message.
        if data == "bye" or not delivered:
            remove_socket(receiver_socket)


def handle_commands(current_socket, data):
    """Execute one slash command sent by ``current_socket``.

    ``data`` is the command line without the leading ``/``: the first word is
    the (case-insensitive) command, the rest is its argument. Results are
    queued as private replies to the caller and, where other users should
    know, as public announcements.

    Commands: exit; rename/setname; msg/message/prvmsg/privatemessage;
    admin(s)/admin(s)list; user(s)/user(s)list; help/?; and, for admins only,
    setadmin/promote, kick/remove, mute, unmute.
    """
    command, _, data = data.partition(' ')
    command = command.lower()
    is_admin = current_socket in admins
    no_permission = "You are not an admin and so you have no such permissions."

    def reply(text):
        """Queue a private message for the caller."""
        private_msgs.append((current_socket, text))

    def announce(text):
        """Queue a public message on behalf of the caller."""
        public_msgs.append((current_socket, text))

    def find_target(name):
        """Return the socket called ``name``; tell the caller when it is unknown."""
        target = get_socket_by_name(name)
        if target is None:
            reply("This name doesn't exist in this server.")
        return target

    if command == "exit":
        announce(get_name(current_socket) + " left the chat.")
        reply("bye")
        print("Connection with " + get_name(current_socket) + " closed.")

    elif command in ('rename', 'setname'):
        if data in sockets_names.values():
            reply("This name is already taken.")
        elif not data or data.lower() in ("you", "server") or data.startswith("@"):
            # An empty name is rejected here instead of crashing on data[0].
            reply(data + " is not a valid name.")
        else:
            sockets_names[current_socket] = data
            reply("Your name has been successfully changed to " + data + ".")

    elif command in ('setadmin', 'promote'):
        if not is_admin:
            reply("You don't have access to this command.")
        else:
            new_admin_socket = find_target(data)
            if new_admin_socket is None:
                pass
            elif new_admin_socket in admins:
                # A second entry would outlive the user's removal.
                reply(data + " is already an admin.")
            else:
                admins.append(new_admin_socket)
                reply(data + " has been promoted to admin.")
                announce(get_name(current_socket) + " promoted " + data + " to admin.")

    elif command in ('kick', 'remove'):
        if not is_admin:
            reply(no_permission)
        else:
            kicked_socket = find_target(data)
            if kicked_socket is not None:
                reply(data + " has been successfully kicked and removed.")
                announce(get_name(current_socket) + " kicked and removed " + data)
                private_msgs.append((kicked_socket, get_name(current_socket) + " kicked you."))
                private_msgs.append((kicked_socket, "bye"))

    elif command == 'mute':
        if not is_admin:
            reply(no_permission)
        else:
            muted_socket = find_target(data)
            if muted_socket is None:
                pass
            elif muted_socket in muted_sockets:
                # Muting twice would need two unmutes to undo.
                reply(data + " is already muted.")
            else:
                muted_sockets.append(muted_socket)
                reply(data + " has been successfully muted.")
                announce(get_name(current_socket) + " muted " + data)
                private_msgs.append((muted_socket, get_name(current_socket) + " muted you."))

    elif command == 'unmute':
        if not is_admin:
            reply(no_permission)
        else:
            unmute_socket = find_target(data)
            if unmute_socket is None:
                pass
            elif unmute_socket not in muted_sockets:
                reply("This user isn't muted.")
            else:
                muted_sockets.remove(unmute_socket)
                reply(data + " has been successfully unmuted.")
                announce(get_name(current_socket) + " unmuted " + data)
                private_msgs.append((unmute_socket, get_name(current_socket) + " unmuted you."))

    elif command in ('msg', 'message', 'prvmsg', 'privatemessage'):
        send_to_name, _, data = data.partition(' ')
        send_to_socket = find_target(send_to_name)
        if send_to_socket is not None:
            reply("You -> " + send_to_name + ": " + data)
            private_msgs.append(
                (send_to_socket, get_name(current_socket) + " -> " + send_to_name + ": " + data))

    elif command in ('admin', 'admins', 'adminlist', 'adminslist'):
        reply("Admins: " + get_admins_as_string())

    elif command in ('users', 'userslist', 'user', 'userlist'):
        # list(...) first: str() of a dict view is "dict_values([...])", which
        # the [1:-1] slice would mangle.
        reply("Users: " + str(list(sockets_names.values()))[1:-1])

    elif command in ('help', '?'):
        help_lines = [
            "/rename <name> - change your name.",
            "/msg <user> <msg> - will send <msg> as a private message that only <user> can see.",
            "/users - returns the names of all the connected users.",
            "/admins - returns the names of all the connected admins.",
            "/exit - will disconnect you.",
            "",
            "Admins' Commands Only:",
            "/kick <user> - kick the <user> from the server.",
            "/promote <user> - will set <user> to be an admin.",
            "/mute <user> - will not let him send public msgs, only privates.",
            "/unmute <user> - will cancel the mute on this user.",
        ]
        reply("\nCommands:\n" + "\n".join(help_lines) + "\n")

    else:
        reply(command + " is not a valid command.")


def main():
    """Run the chat server on 0.0.0.0:2303 until an exception stops it.

    Sets up the module-level state shared by the handler functions, then
    loops: wait with ``select``, accept new clients, read incoming messages,
    flush outgoing ones. On the way out every connected client is sent a
    public "bye" and all sockets are closed.
    """
    global server_socket, open_sockets, sockets_names, admins, muted_sockets, public_msgs, private_msgs
    global rlist, wlist, xlist

    # Create all shared state before anything can fail, so the cleanup below
    # never hits an undefined name and hides the real error (e.g. a failed bind).
    open_sockets = []
    sockets_names = {}
    admins = []
    muted_sockets = []
    public_msgs = []
    private_msgs = []
    rlist, wlist, xlist = [], [], []

    server_socket = socket.socket()
    try:
        server_socket.bind(('0.0.0.0', 2303))
        server_socket.listen(5)

        # NOTE(review): clients are always watched for writability, so select
        # returns immediately while anyone is connected and this loop spins.
        while True:
            rlist, wlist, xlist = select.select([server_socket] + open_sockets, open_sockets, [])

            handle_new_connection()
            handle_receive_data()
            handle_sending_msgs()

    finally:
        try:
            # Drop whatever is still queued and say goodbye to everyone who
            # was writable on the last pass.
            del public_msgs[:]
            del private_msgs[:]
            public_msgs.append((None, "bye"))
            handle_sending_msgs()
        finally:
            # Release every descriptor even if the farewell itself failed.
            for client_socket in list(open_sockets):
                client_socket.close()
            server_socket.close()


# Start the server only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

Solution

  • Here is the solution:

    import socket
    import select
    import datetime
    
    # Module-level containers, created empty once when the module is imported.
    open_sockets=[]
    sockets_names= {}
    admins = []
    muted_sockets = []
    public_msgs = []
    private_msgs = []
    
    # NOTE(review): a `global` statement at module scope has no effect. It does
    # not create or bind these names; they exist only once something assigns
    # them at module level.
    global server_socket
    global rlist, wlist, xlist
    class Server:
        """Single-threaded, ``select``-based chat server.

        All chat state lives on the instance:

        * ``server_socket`` - the listening socket.
        * ``open_sockets`` - connected client sockets.
        * ``sockets_names`` - client socket -> chosen name.
        * ``admins`` / ``muted_sockets`` - client sockets with that status.
        * ``public_msgs`` - queued ``(sender_socket, text)`` broadcasts.
        * ``private_msgs`` - queued ``(receiver_socket, text)`` messages.
        * ``rlist`` / ``wlist`` / ``xlist`` - result of the latest ``select``.

        Wire format: a 3-digit, zero-padded byte count followed by that many
        bytes of UTF-8 text.

        NOTE: constructing a ``Server`` binds 0.0.0.0:2303 and serves from
        inside ``__init__`` until an exception (e.g. Ctrl-C) stops the loop,
        so ``Server()`` does not return normally.
        """

        def __init__(self):
            """Set up the state, bind to 0.0.0.0:2303 and serve until interrupted."""
            # Create all state before anything can fail so the cleanup below
            # never touches a missing attribute.
            self.open_sockets = []
            self.sockets_names = {}
            self.admins = []
            self.muted_sockets = []
            self.public_msgs = []
            self.private_msgs = []
            self.rlist, self.wlist, self.xlist = [], [], []

            self.server_socket = socket.socket()
            try:
                self.server_socket.bind(('0.0.0.0', 2303))
                self.server_socket.listen(5)

                while True:
                    self.rlist, self.wlist, self.xlist = select.select(
                        [self.server_socket] + self.open_sockets, self.open_sockets, [])

                    self.handle_new_connection()
                    self.handle_receive_data()
                    self.handle_sending_msgs()

            finally:
                try:
                    # Drop whatever is still queued and say goodbye to everyone
                    # who was writable on the last pass.
                    del self.public_msgs[:]
                    del self.private_msgs[:]
                    self.public_msgs.append((None, "bye"))
                    self.handle_sending_msgs()
                finally:
                    # Release every descriptor even if the farewell failed.
                    for client_socket in list(self.open_sockets):
                        client_socket.close()
                    self.server_socket.close()

        def get_current_time(self):
            """Return the current local time as a zero-padded ``HH:MM`` string."""
            return datetime.datetime.now().strftime("%H:%M")

        def get_name(self, user_socket):
            """Return the name of ``user_socket``, prefixed with ``@`` for admins."""
            name = self.sockets_names[user_socket]
            if user_socket in self.admins:
                name = "@" + name
            return name

        def get_data_length(self, data):
            """Return the size of ``data`` in bytes as a zero-padded 3-digit string.

            Text is measured after UTF-8 encoding because the header announces
            the number of bytes that follow; ``bytes`` input is measured as-is.
            """
            payload = data.encode('utf-8') if isinstance(data, str) else data
            return str(len(payload)).zfill(3)

        def get_socket_by_name(self, name):
            """Return the first socket registered under ``name``, or ``None``."""
            for socket_pair, socket_name in self.sockets_names.items():
                if name == socket_name:
                    return socket_pair
            return None

        def get_admins_as_string(self):
            """Return the admins' names, each quoted, separated by ``", "``."""
            admins_names_lst = [self.sockets_names[admin_socket] for admin_socket in self.admins]
            return str(admins_names_lst)[1:-1]

        def remove_socket(self, removed_socket):
            """Forget a client completely and close its connection.

            Safe to call more than once for the same socket.
            """
            # In-place filtering clears duplicate entries and tolerates a
            # socket that is not present.
            for registry in (self.open_sockets, self.admins, self.muted_sockets):
                registry[:] = [sock for sock in registry if sock is not removed_socket]
            self.sockets_names.pop(removed_socket, None)
            removed_socket.close()

        def handle_new_connection(self):
            """Accept a pending client when the listening socket is readable.

            The client starts as "Anonymous"; it becomes admin if there is none.
            """
            for new_connection in self.rlist:
                if new_connection is not self.server_socket:
                    continue
                (new_socket, address) = self.server_socket.accept()
                self.open_sockets.append(new_socket)
                self.sockets_names[new_socket] = "Anonymous"
                if not self.admins:
                    self.admins.append(new_socket)
                    print("New Connection And Admin")
                else:
                    print("New Connection")

        def _recv_exact(self, sock, size):
            """Read exactly ``size`` bytes; ``None`` if the peer closes or errors first."""
            chunks = []
            remaining = size
            while remaining > 0:
                try:
                    chunk = sock.recv(remaining)
                except OSError:
                    return None
                if not chunk:  # b'' means the peer closed the connection
                    return None
                chunks.append(chunk)
                remaining -= len(chunk)
            return b"".join(chunks)

        def _read_message(self, sock):
            """Read one framed message; ``None`` when the stream is closed or corrupt."""
            header = self._recv_exact(sock, 3)
            if header is None or not header.isdigit():
                return None
            payload = self._recv_exact(sock, int(header))
            if payload is None:
                return None
            # Client input is untrusted: never let invalid UTF-8 raise.
            return payload.decode('utf-8', errors='replace')

        def handle_receive_data(self):
            """Read one message from every readable client and queue the reactions.

            A closed or corrupted connection is announced and removed; empty
            messages are ignored; ``/...`` is run as a command; anything else is
            echoed to the sender and, unless the sender is muted, broadcast.
            """
            for current_socket in self.rlist:
                if current_socket is self.server_socket:
                    continue

                data = self._read_message(current_socket)
                if data is None:
                    # Resolve the name before the socket is forgotten.
                    self.public_msgs.append(
                        (current_socket, self.get_name(current_socket) + " left the chat."))
                    print("Connection with " + self.get_name(current_socket) + " closed.")
                    self.remove_socket(current_socket)
                    continue
                if not data:
                    continue

                if data.startswith('/'):
                    self.handle_commands(current_socket, data[1:])
                    continue

                self.private_msgs.append((current_socket, "You: " + data))
                if current_socket in self.muted_sockets:
                    self.private_msgs.append((
                        current_socket,
                        "You are muted and so can't send msgs to everyone. "
                        "You can ask one of the admins to unmute you in a private msg",
                    ))
                else:
                    self.public_msgs.append(
                        (current_socket, self.get_name(current_socket) + ": " + data))

        def _send_message(self, receiver_socket, data):
            """Timestamp, frame and send ``data``; return ``False`` if the socket failed."""
            payload = (self.get_current_time() + " " + data).encode('utf-8')
            header = self.get_data_length(payload).encode('utf-8')
            try:
                # sendall: a short write must not truncate the frame.
                receiver_socket.sendall(header + payload)
            except OSError:
                return False
            return True

        def handle_sending_msgs(self):
            """Deliver the queued public and private messages to writable clients.

            Public messages go to every writable, still-connected client except
            the sender and stay queued while nobody can receive them. Private
            messages wait until their receiver is writable and are discarded if
            the receiver is gone. A private message that is exactly ``"bye"``
            makes the server drop the receiver after delivery, as does a
            failed send.
            """
            # Iterate over snapshots: removing from a list while looping over
            # it skips entries and delivers messages out of order.
            for message in list(self.public_msgs):
                (sender_socket, data) = message
                receivers = [
                    sock for sock in self.wlist
                    if sock is not sender_socket and sock in self.open_sockets
                ]
                if not receivers:
                    continue
                for receiver_socket in receivers:
                    if not self._send_message(receiver_socket, data):
                        self.remove_socket(receiver_socket)
                self.public_msgs.remove(message)

            for message in list(self.private_msgs):
                (receiver_socket, data) = message
                if receiver_socket not in self.open_sockets:
                    # The receiver is gone; the message can never be delivered.
                    self.private_msgs.remove(message)
                    continue
                if receiver_socket not in self.wlist:
                    continue
                delivered = self._send_message(receiver_socket, data)
                self.private_msgs.remove(message)
                # Compare the whole text: checking only the first word would
                # let a user named "bye" disconnect others via a private msg.
                if data == "bye" or not delivered:
                    self.remove_socket(receiver_socket)

        def handle_commands(self, current_socket, data):
            """Execute one slash command sent by ``current_socket``.

            ``data`` is the command line without the leading ``/``: the first
            word is the (case-insensitive) command, the rest is its argument.
            Results are queued as private replies and public announcements.
            """
            command, _, data = data.partition(' ')
            command = command.lower()
            is_admin = current_socket in self.admins
            no_permission = "You are not an admin and so you have no such permissions."

            def reply(text):
                """Queue a private message for the caller."""
                self.private_msgs.append((current_socket, text))

            def announce(text):
                """Queue a public message on behalf of the caller."""
                self.public_msgs.append((current_socket, text))

            def find_target(name):
                """Return the socket called ``name``; tell the caller when it is unknown."""
                target = self.get_socket_by_name(name)
                if target is None:
                    reply("This name doesn't exist in this server.")
                return target

            if command == "exit":
                announce(self.get_name(current_socket) + " left the chat.")
                reply("bye")
                print("Connection with " + self.get_name(current_socket) + " closed.")

            elif command in ('rename', 'setname'):
                if data in self.sockets_names.values():
                    reply("This name is already taken.")
                elif not data or data.lower() in ("you", "server") or data.startswith("@"):
                    # An empty name is rejected here instead of crashing on data[0].
                    reply(data + " is not a valid name.")
                else:
                    self.sockets_names[current_socket] = data
                    reply("Your name has been successfully changed to " + data + ".")

            elif command in ('setadmin', 'promote'):
                if not is_admin:
                    reply("You don't have access to this command.")
                else:
                    new_admin_socket = find_target(data)
                    if new_admin_socket is None:
                        pass
                    elif new_admin_socket in self.admins:
                        # A second entry would outlive the user's removal.
                        reply(data + " is already an admin.")
                    else:
                        self.admins.append(new_admin_socket)
                        reply(data + " has been promoted to admin.")
                        announce(self.get_name(current_socket) + " promoted " + data + " to admin.")

            elif command in ('kick', 'remove'):
                if not is_admin:
                    reply(no_permission)
                else:
                    kicked_socket = find_target(data)
                    if kicked_socket is not None:
                        reply(data + " has been successfully kicked and removed.")
                        announce(self.get_name(current_socket) + " kicked and removed " + data)
                        self.private_msgs.append(
                            (kicked_socket, self.get_name(current_socket) + " kicked you."))
                        self.private_msgs.append((kicked_socket, "bye"))

            elif command == 'mute':
                if not is_admin:
                    reply(no_permission)
                else:
                    muted_socket = find_target(data)
                    if muted_socket is None:
                        pass
                    elif muted_socket in self.muted_sockets:
                        # Muting twice would need two unmutes to undo.
                        reply(data + " is already muted.")
                    else:
                        self.muted_sockets.append(muted_socket)
                        reply(data + " has been successfully muted.")
                        announce(self.get_name(current_socket) + " muted " + data)
                        self.private_msgs.append(
                            (muted_socket, self.get_name(current_socket) + " muted you."))

            elif command == 'unmute':
                if not is_admin:
                    reply(no_permission)
                else:
                    unmute_socket = find_target(data)
                    if unmute_socket is None:
                        pass
                    elif unmute_socket not in self.muted_sockets:
                        reply("This user isn't muted.")
                    else:
                        self.muted_sockets.remove(unmute_socket)
                        reply(data + " has been successfully unmuted.")
                        announce(self.get_name(current_socket) + " unmuted " + data)
                        self.private_msgs.append(
                            (unmute_socket, self.get_name(current_socket) + " unmuted you."))

            elif command in ('msg', 'message', 'prvmsg', 'privatemessage'):
                send_to_name, _, data = data.partition(' ')
                send_to_socket = find_target(send_to_name)
                if send_to_socket is not None:
                    reply("You -> " + send_to_name + ": " + data)
                    self.private_msgs.append(
                        (send_to_socket,
                         self.get_name(current_socket) + " -> " + send_to_name + ": " + data))

            elif command in ('admin', 'admins', 'adminlist', 'adminslist'):
                reply("Admins: " + self.get_admins_as_string())

            elif command in ('users', 'userslist', 'user', 'userlist'):
                # list(...) first: str() of a dict view is "dict_values([...])",
                # which the [1:-1] slice would mangle.
                reply("Users: " + str(list(self.sockets_names.values()))[1:-1])

            elif command in ('help', '?'):
                help_lines = [
                    "/rename <name> - change your name.",
                    "/msg <user> <msg> - will send <msg> as a private message that only <user> can see.",
                    "/users - returns the names of all the connected users.",
                    "/admins - returns the names of all the connected admins.",
                    "/exit - will disconnect you.",
                    "",
                    "Admins' Commands Only:",
                    "/kick <user> - kick the <user> from the server.",
                    "/promote <user> - will set <user> to be an admin.",
                    "/mute <user> - will not let him send public msgs, only privates.",
                    "/unmute <user> - will cancel the mute on this user.",
                ]
                reply("\nCommands:\n" + "\n".join(help_lines) + "\n")

            else:
                reply(command + " is not a valid command.")
    
    
    
    

    Then create an instance of the class to start the server:

    Server()