Tags: python, multithreading, sockets, python-multithreading, thread-synchronization

Python: Store and delete Threads to/from List


I want to implement a streaming server which sends an endless stream of data to all connected clients. Multiple clients should be able to connect to and disconnect from the server in order to process the data in different ways.

Each client is served by a dedicated ClientThread, which sub-classes Thread and contains a queue of the data to be sent to the client (necessary, since clients might process data at different speeds and because bursts of data can occur which the clients might be unable to handle).

The program listens for incoming client connections via a separate ClientHandlerThread. Whenever a client connects, the ClientHandlerThread spawns a ClientThread and adds it to a list.

As a dummy example, the main Thread increments an integer each second and pushes it to all ClientThread queues through ClientHandlerThread.push_item().

Every 10 increments, the number of items in the client queues is printed.


Now to my questions:

When a client disconnects, its ClientThread terminates and no more data is sent. However, the ClientThread object remains in the ClientHandlerThread's list of clients, and items are still pushed to its queue.

I'm therefore looking for either (1) a way to delete the ClientThread object from the list whenever a client disconnects, (2) a better way to monitor the ClientThreads than a list, or (3) a different (better) architecture to achieve my goal.

Many thanks!


Server:

import socket
import time

from threading import Thread
from queue import Queue


class ClientThread(Thread):

    def __init__(self, conn, client_addr):
        Thread.__init__(self)

        self.queue = Queue()

        self.conn = conn
        self.client_addr = client_addr

    def run(self):
        print('Client connected')

        while True:
            try:
                self.conn.sendall(self.queue.get().encode('utf-8'))
                time.sleep(1)
            except BrokenPipeError:
                print('Client disconnected')
                break


class ClientHandlerThread(Thread):

    def __init__(self):
        Thread.__init__(self, daemon = True)

        self.clients = list()

    def push_item(self, item):
        # Queue the item for every registered client.
        for client in self.clients:
            client.queue.put(str(item))

    def run(self):

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:

            s.bind('./socket')
            s.listen()

            while True:
                conn, client_addr = s.accept()

                # Spawn a dedicated sender thread for the new client and track it.
                client = ClientThread(conn, client_addr)
                client.start()
                self.clients.append(client)


if __name__ == '__main__':

    client_handler = ClientHandlerThread()
    client_handler.start()

    i = 1

    while True:
        client_handler.push_item(str(i))

        if i % 10 == 0:
            print('[' + ', '.join([str(client.queue.qsize()) for client in client_handler.clients]) + ']')


        i += 1
        time.sleep(1)

Client:

import socket

if __name__ == '__main__':
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect('./socket')

        print('Connected to server')

        while True:
            data = s.recv(1024)

            if not data:
                print('Disconnected from server')
                break

            print(data.decode('utf-8'))

Solution

  • Note: You should probably read up on things like aiohttp for a much more scalable approach to your problem.


    For your question, you can make a few changes to achieve this:

    First, change ClientThread's constructor:

    class ClientThread(Thread):
    
        def __init__(self, client_handler, conn, client_addr):
            self.client_handler = client_handler
            self.running = True
            ...
    

    When the handler creates the object, it should pass self to it as client_handler.

    In the run method, use

        def run(self):
            while True:
                ...

            self.running = False
            self.client_handler.purge()
    

    That is, the thread marks itself as not running and calls the handler's purge method, which can be written as:

    class ClientHandlerThread(Thread):
        ...
        def purge(self):
            self.clients = [c for c in self.clients if c.running]
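
One caveat with rebuilding the list in purge: it runs on the client thread while the handler thread may be appending to the same list, so a client appended between the list comprehension and the assignment can be lost. Below is a minimal sketch of one way around this, guarding the list with a threading.Lock. The names ClientRegistry, add, remove and count are made up for this illustration and are not part of the original code; the registry stands in for the client list kept by ClientHandlerThread, and catching OSError (the parent class of BrokenPipeError and ConnectionResetError) is an assumption about which errors a disconnect can raise.

```python
import threading
from queue import Queue


class ClientThread(threading.Thread):
    """Sends queued items to one client and unregisters itself on disconnect."""

    def __init__(self, registry, conn):
        super().__init__(daemon=True)
        self.registry = registry
        self.conn = conn
        self.queue = Queue()

    def run(self):
        try:
            while True:
                # Blocks until the next item is pushed for this client.
                self.conn.sendall(self.queue.get().encode('utf-8'))
        except OSError:
            # Covers BrokenPipeError and ConnectionResetError.
            pass
        finally:
            self.conn.close()
            self.registry.remove(self)


class ClientRegistry:
    """Hypothetical lock-protected replacement for the plain client list."""

    def __init__(self):
        self._lock = threading.Lock()
        self._clients = []

    def add(self, conn):
        client = ClientThread(self, conn)
        with self._lock:
            self._clients.append(client)
        client.start()
        return client

    def remove(self, client):
        with self._lock:
            if client in self._clients:
                self._clients.remove(client)

    def push_item(self, item):
        # Copy under the lock so the loop never sees a half-updated list.
        with self._lock:
            targets = list(self._clients)
        for client in targets:
            client.queue.put(str(item))

    def count(self):
        with self._lock:
            return len(self._clients)
```

In the accept loop, the handler would call registry.add(conn) instead of appending to self.clients, and the main loop would call registry.push_item(i). Note that a thread blocked in queue.get() only notices the disconnect on its next send attempt, so the client is removed when the next item is pushed, not at the moment the peer closes the connection.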