Search code examples
pythonmultithreadingdesign-patterns

All threads receive the same results from one function


Imagine you want to broadcast the data generated by a generator. There are many consumers that receive the data and consume it. The generator produces data regardless of whether any consumer is present, and every consumer that joins receives data from the moment it joined. The natural solution is the observer pattern, but what if the number of consumers is very high and the rate at which information is generated is also very high? In that case, while you are still broadcasting data item 1 to all observers, data item 2 is already ready before you have finished broadcasting item 1. I am wondering whether there is a solution to this problem. One idea I had: is it possible for several threads to execute the same function in a shared space? In other words, is it possible to run the function once and have all the threads receive the result together?

Look at this code:

class data_provider():
    """Broadcast a counter to many reader threads via one shared Event.

    NOTE(review): this is the approach the question is asking about — a
    single Event shared by every consumer.  Because each consumer clears
    the same Event, a clear() by one thread can make other threads miss an
    update, and updates that happen between two wait() calls are lost.
    """

    def __init__(self) -> None:
        # cnt is the broadcast payload; stop signals end-of-stream.
        self.cnt = 0
        self.stop = False
        self.data_received = Event()

        # Start in the "no data yet" state.
        self.data_received.clear()

        # Produce data in the background regardless of consumers.
        data_generator_thread = threading.Thread(target= self.data_generator)
        data_generator_thread.start()

    def data_sender(self):
        """Generator run by each reader thread: yields the latest data.

        NOTE(review): the clear()/wait() pair below races with the other
        consumers sharing the same Event — verify against the question's
        description of lost updates.
        """

        self.data_received.wait()

        while not self.stop:
            yield { 'Name': 'Data', 'Count': self.cnt}
            self.data_received.clear()

            self.data_received.wait()



    def data_generator(self):
        """Produce 20 counter updates, one per second, then signal stop."""
        while(self.cnt < 20):
            self.cnt += 1
            self.data_received.set()
            sleep(1)

        # Wake all waiting consumers so they can observe stop and exit.
        self.stop = True
        self.data_received.set()

and main.py

# main.py — demo: 100 reader threads all consuming from one data_provider.
obj = data_provider()

def printing(id):
    # Each thread pulls from its own data_sender() generator and tags the
    # printed lines with its thread id.
    for data in obj.data_sender():
        print(str(id) + ' : ' + str(data))

our_threads = []

# Spawn 100 consumer threads, all sharing the single provider above.
for i in range(100):
    our_threads.append(Thread(target= printing, args=(i,)))
    our_threads[i].start()

# Wait for every consumer to finish before announcing the end.
for i in range(100):
    our_threads[i].join()

print('End')

The goal is for all the threads to effectively run data_sender() once and all get the same result as soon as it becomes available.

Any idea?


Solution

  • If you are looking to have a single producer of some data and multiple "consumers" of that data, then the following code uses Condition instances with notification. For demo purposes the producer only produces 10 pieces of data and we have 3 consumers. This will keep the output to a reasonable length:

    Update Using Linked List

    import threading
    
    class Node:
        """One element of the shared singly linked list of produced data.

        A freshly created node appends itself after *previous_node* (when
        one is given) and records, via ``consumed_count``, how many
        consumers have processed it so far.
        """

        def __init__(self, data, previous_node):
            # A new node carries the payload, is the current tail, and has
            # not been consumed by anyone yet.
            self.data = data
            self.next = None
            self.consumed_count = 0
            # Hook onto the tail of the list unless this is the first node.
            if previous_node is not None:
                previous_node.next = self

        def __repr__(self):
            # Recursive: printing a node prints the rest of the chain too.
            return f'[data: {repr(self.data)}, consumed_count: {self.consumed_count}, next: {self.next}]'
    
    class data_provider:
        """Single producer broadcasting to ``num_consumers`` readers.

        Produced items are appended to a shared singly linked list; each
        reader keeps its own position in the list, so no reader misses an
        item.  A Condition announces new items; a separate Lock guards the
        per-node consumed counters and list pruning.
        """

        def __init__(self, num_consumers) -> None:
            self.num_consumers = num_consumers
            self.lock = threading.Lock()
            self.condition = threading.Condition()
            self.running = True
            # To simplify the code, the first node in the list is a dummy:
            self.linked_list = Node(None, None)

            # Produce in the background; readers attach at the dummy head.
            data_generator_thread = threading.Thread(target=self.data_generator)
            data_generator_thread.start()

        def data_generator(self):
            """Append 10 data nodes (one every 0.5 s), then signal shutdown."""
            import time

            last_node = self.linked_list

            for cnt in range(1, 11) :  # Reduced count for demo purposes
                # For demo purposes let's introduce a pause:
                time.sleep(.5)
                # The new node is linked before notify_all(); waiters re-check
                # their predicate under the condition lock when they wake.
                last_node = Node({'Name': 'Data', 'Count': cnt}, last_node)
                with self.condition:
                    self.condition.notify_all()

            print('Done producing')
            # Let consumers know that no more data will be coming:
            with self.condition:
                self.running = False
                self.condition.notify_all()

        def remove_consumed_nodes(self):
            """Unlink fully-consumed nodes (all but the last) to bound memory.

            NOTE(review): assumes at least one real node follows the dummy
            head — callers only invoke this after consuming a node.
            """
            with self.lock:
                # Remove completely consumed links except for the last one:
                prev_node = self.linked_list.next
                node = prev_node.next
                while node and node.consumed_count == self.num_consumers:
                    prev_node = node
                    node = node.next
                self.linked_list.next = prev_node
    
    
    N_PRINTERS = 3  # The number of printer threads:

    obj = data_provider(N_PRINTERS)

    def printing(id):
        """Consumer thread body: print every produced node exactly once, in order."""
        # Start at the dummy head; only .next is ever inspected.
        last_node = obj.linked_list

        while True:
            # Sleep until new data is linked or production has ended.
            with obj.condition:
                obj.condition.wait_for(
                    lambda: not obj.running or last_node.next
                )

            # No successor and not running -> the stream is finished.
            if not last_node.next:
                return

            # Walk every node appended since we last looked.
            last_node = last_node.next
            while True:
                print(id, ':', last_node.data)
                with obj.lock:
                    last_node.consumed_count += 1
                if not last_node.next:
                    break
                last_node = last_node.next

            # Free the nodes that every consumer has already seen.
            obj.remove_consumed_nodes()


    printer_threads = []

    # Launch the consumer threads.
    for i in range(N_PRINTERS):
        thread = threading.Thread(target=printing, args=(i,))
        thread.start()
        printer_threads.append(thread)

    # Wait for all consumers to drain the list.
    for thread in printer_threads:
        thread.join()

    print('End')
    print(obj.linked_list)
    

    Prints:

    1 : {'Name': 'Data', 'Count': 1}
    0 : {'Name': 'Data', 'Count': 1}
    2 : {'Name': 'Data', 'Count': 1}
    0 : {'Name': 'Data', 'Count': 2}
    1 : {'Name': 'Data', 'Count': 2}
    2 : {'Name': 'Data', 'Count': 2}
    0 : {'Name': 'Data', 'Count': 3}
    2 : {'Name': 'Data', 'Count': 3}
    1 : {'Name': 'Data', 'Count': 3}
    0 : {'Name': 'Data', 'Count': 4}
    1 : {'Name': 'Data', 'Count': 4}
    2 : {'Name': 'Data', 'Count': 4}
    0 : {'Name': 'Data', 'Count': 5}
    1 : {'Name': 'Data', 'Count': 5}
    2 : {'Name': 'Data', 'Count': 5}
    0 : {'Name': 'Data', 'Count': 6}
    2 : {'Name': 'Data', 'Count': 6}
    1 : {'Name': 'Data', 'Count': 6}
    0 : {'Name': 'Data', 'Count': 7}
    1 : {'Name': 'Data', 'Count': 7}
    2 : {'Name': 'Data', 'Count': 7}
    2 : {'Name': 'Data', 'Count': 8}
    0 : {'Name': 'Data', 'Count': 8}
    1 : {'Name': 'Data', 'Count': 8}
    2 : {'Name': 'Data', 'Count': 9}
    0 : {'Name': 'Data', 'Count': 9}
    1 : {'Name': 'Data', 'Count': 9}
    Done producing
    2 : {'Name': 'Data', 'Count': 10}
    0 : {'Name': 'Data', 'Count': 10}
    1 : {'Name': 'Data', 'Count': 10}
    End
    [data: None, consumed_count: 0, next: [data: {'Name': 'Data', 'Count': 10}, consumed_count: 3, next: None]]
    

    Reusable MultiConsumerProducer Class

    The above code can be re-engineered for improved reusability.

    import threading
    from typing import Iterable, List, Any
    
    class MultiConsumerProducer:
        """Single producer broadcasting to multiple consumers via a linked list.

        Each consumer tracks its own position in the list, so every consumer
        sees every item; nodes that all consumers have processed are unlinked
        to bound memory use.
        """

        class Node:
            """One produced item in the shared singly linked list."""

            def __init__(self, data: Any, previous_node: 'Node'):
                self._consumed_count = 0
                self._next = None
                self._data = data
                if previous_node:
                    previous_node._next = self

            @property
            def data(self) -> Any:
                # Read-only view of the payload.
                return self._data

            def __repr__(self):
                return f'[_data: {repr(self._data)}, _consumed_count: {self._consumed_count}, _next: {self._next}]'

        def __init__(self, num_consumers: int, data_collection: Iterable) -> None:
            """Start producing items from *data_collection* in a daemon thread."""
            self._num_consumers = num_consumers
            self._lock = threading.Lock()
            self._condition = threading.Condition()
            self._running = True
            # To simplify the code, the first node in the list is a dummy:
            self._linked_list = MultiConsumerProducer.Node(None, None)

            threading.Thread(target=self._data_generator, args=(data_collection,), daemon=True).start()

        def print_nodes(self) -> None:
            """Print linked list of nodes."""

            print(self._linked_list)

        def _data_generator(self, data_collection):
            """Generate nodes, notifying all waiters after each append."""

            last_node = self._linked_list
            for data in data_collection:
                last_node = MultiConsumerProducer.Node(data, last_node)
                with self._condition:
                    self._condition.notify_all()

            # Signal end-of-stream; waiters re-check their predicate on wakeup.
            self._running = False
            with self._condition:
                self._condition.notify_all()

        def get_next_nodes(self, last_node_processed: Node=None) -> List[Node]:
            """Get next list of ready nodes.

            Blocks until at least one node follows *last_node_processed* (or
            the dummy head when None) or production has stopped; returns []
            only when no more data will ever come.
            """

            last_node = last_node_processed or self._linked_list

            with self._condition:
                self._condition.wait_for(
                    lambda: not self._running or last_node._next
                )

            if not last_node._next:
                return []

            # Collect every node currently linked after our position.
            nodes = []
            last_node = last_node._next
            while True:
                nodes.append(last_node)
                if not last_node._next:
                    return nodes
                last_node = last_node._next

        def consumed_node(self, node: Node) -> None:
            """Show node has been consumed; prune fully-consumed nodes."""

            with self._lock:
                node._consumed_count += 1

                if node._consumed_count == self._num_consumers:
                    # Remove completely consumed links except for the last one:
                    prev_node = self._linked_list._next
                    node = prev_node._next
                    while node and node._consumed_count == self._num_consumers:
                        prev_node = node
                        node = node._next
                    self._linked_list._next = prev_node
    
    ##############################################################
    
    def producer():
        """Yield ten demo payload dicts, pausing half a second before each."""
        import time

        cnt = 1
        while cnt <= 10:  # Reduced count for demo purposes
            # For demo purposes let's introduce a pause:
            time.sleep(.5)
            yield {'Name': 'Data', 'Count': cnt}
            cnt += 1
        print('Done producing')
    
    
    N_PRINTERS = 3  # The number of printer threads:

    obj = MultiConsumerProducer(N_PRINTERS, producer())

    def printing(id):
        """Consumer thread body: print every node produced, in order."""
        # Remember the last node handled so get_next_nodes() resumes there.
        last_node_processed = None

        # An empty list means production is done and everything was seen.
        while (nodes := obj.get_next_nodes(last_node_processed)):
            for last_node_processed in nodes:
                print(id, ':', last_node_processed.data)
                # Report consumption so fully-seen nodes can be pruned.
                obj.consumed_node(last_node_processed)

    printer_threads = []

    # Start one printer thread per consumer slot.
    for i in range(N_PRINTERS):
        thread = threading.Thread(target=printing, args=(i,))
        thread.start()
        printer_threads.append(thread)

    # Wait until every consumer has drained the list.
    for thread in printer_threads:
        thread.join()

    print('End')

    print('\nNodes:')
    obj.print_nodes()
    

    One More Time

    The following function generates an abstract base class that uses queues for delivering work and supports producers/consumers running in either threads or processes.

    def generate_multi_consumer_producer(n_consumers, use_multiprocessing: bool=False, queue_size=0):
        """Generate an abstract base class for a single producer with multiple consumers.

            n_consumers: The number of consumers.
            use_multiprocessing: True to run the producer/consumers in child
                                 processes; otherwise child threads are used.
            queue_size: If producing is faster than consumption, you can specify a
                        positive value for queue_size to prevent the queues from
                        continuously growing."""

        from abc import ABC, abstractmethod
        from typing import List, Iterable

        # Pick thread- or process-based execution and the matching joinable queue.
        if use_multiprocessing:
            from multiprocessing import Process as ExecutionClass, JoinableQueue as QueueClass
        else:
            from threading import Thread as ExecutionClass
            from queue import Queue as QueueClass

        class MultiConsumerProducer(ABC):
            def __init__(self, n_consumers: int=n_consumers, queue_size=queue_size) -> None:
                """Start the producer; it in turn starts the consumers."""
                # One queue per consumer so every consumer sees every item.
                self._n_consumers = n_consumers
                self._queues = [QueueClass(queue_size) for _ in range(n_consumers)]
                self._producer = ExecutionClass(target=self._run)
                self._producer.start()

            def _run(self):
                """Producer body: start consumers, fan out data, await completion."""
                # Start the consumers:
                for consumer_id in range(self._n_consumers):
                    ExecutionClass(
                        target=self._consumer,
                        args=(consumer_id, self._queues[consumer_id]),
                        daemon=True
                    ).start()

                # Produce the data
                for data in self.produce():
                    for queue in self._queues:
                        queue.put(data)

                # Wait for all work to be completed
                for queue in self._queues:
                    queue.join()

            def join(self) -> None:
                """Wait for all work to complete."""

                self._producer.join()

            def _consumer(self, consumer_id: int, queue: QueueClass):
                """Consumer loop: process items forever; task_done even on error."""
                while True:
                    data = queue.get()
                    try:
                        self.consume(consumer_id, data)
                    except Exception as e:
                        print(f'Exception in consumer {consumer_id}: {e}')
                    finally:
                        queue.task_done()

            @abstractmethod
            def produce(self):
                """This should be a generator function."""
                pass

            @abstractmethod
            def consume(self, consumer_id: int, data: object) -> None:
                """Process one piece of *data* as consumer *consumer_id*."""
                pass

        return MultiConsumerProducer
    

    The following is an example usage where I have 3 existing consumer functions and an existing producer function showing how you could use them without modification:

    def consumer_0(consumer_id, n):
        """Report *n* raised to the first power for this consumer."""
        line = f'id {consumer_id}: {n} ** 1 = {n}'
        print(line)
    
    def consumer_1(consumer_id, n):
        """Report the square of *n* for this consumer."""
        line = f'id {consumer_id}: {n} ** 2 = {n ** 2}'
        print(line)
    
    def consumer_2(consumer_id, n):
        """Report the cube of *n* for this consumer."""
        line = f'id {consumer_id}: {n} ** 3 = {n ** 3}'
        print(line)
    
    def producer():
        """Yield the integers 1..10, sleeping half a second before each one."""
        from time import sleep

        for value in range(1, 11):  # Reduced count for demo purposes
            # For demo purposes let's introduce a pause:
            sleep(.5)
            yield value
        print('Done producing', flush=True)
    
    # Build a 3-consumer base class whose producer/consumers run in child processes.
    MultiConsumerProducer = generate_multi_consumer_producer(3, use_multiprocessing=True)
    
    class MyMultiConsumerProducer(MultiConsumerProducer):
        """An example that uses 3 different consumers."""

        # One handler per consumer id: index i serves consumer i.
        consumers = [consumer_0, consumer_1, consumer_2]

        def produce(self):
            # Delegate to the stand-alone generator defined above.
            yield from producer()

        def consume(self, consumer_id, data):
            # Dispatch to the handler registered for this consumer id.
            handler = self.consumers[consumer_id]
            handler(consumer_id, data)
    
    if __name__ == '__main__':
        # The __main__ guard matters here: use_multiprocessing=True spawns
        # child processes that re-import this module.
        p = MyMultiConsumerProducer(3)
        # Wait for all work to complete:
        p.join()
    

    Prints:

    id 0: 1 ** 1 = 1
    id 1: 1 ** 2 = 1
    id 2: 1 ** 3 = 1
    id 0: 2 ** 1 = 2
    id 2: 2 ** 3 = 8
    id 1: 2 ** 2 = 4
    id 0: 3 ** 1 = 3
    id 2: 3 ** 3 = 27
    id 1: 3 ** 2 = 9
    id 2: 4 ** 3 = 64
    id 1: 4 ** 2 = 16
    id 0: 4 ** 1 = 4
    id 0: 5 ** 1 = 5
    id 1: 5 ** 2 = 25
    id 2: 5 ** 3 = 125
    id 0: 6 ** 1 = 6
    id 1: 6 ** 2 = 36
    id 2: 6 ** 3 = 216
    id 0: 7 ** 1 = 7
    id 1: 7 ** 2 = 49
    id 2: 7 ** 3 = 343
    id 0: 8 ** 1 = 8
    id 1: 8 ** 2 = 64
    id 2: 8 ** 3 = 512
    id 0: 9 ** 1 = 9
    id 2: 9 ** 3 = 729
    id 1: 9 ** 2 = 81
    Done producing
    id 2: 10 ** 3 = 1000
    id 1: 10 ** 2 = 100
    id 0: 10 ** 1 = 10