
Sharing data between Threads in Python


I am currently programming a small application that receives a dataArray from a UDP socket. Three different threads should then check this dataArray for items that belong to them (every data chunk has an ID; if the ID matches, the chunk belongs to that thread). The three side threads each run a while True loop so that they always process the data as soon as something new arrives.


def mlFunc(inp):
    while True:
        temp = 
        if temp:
            canDo.publish(temp)

This is the function the side threads use. temp is supposed to be the data that canDo needs.

def consumer(in_q):
    while True:
        canAr = canParser.b2can(in_q.get())

And this is the code that gets the data from the UDP script.

I tried using a global variable, but when I use that variable in my side thread, the main thread just freezes. When I use a queue, the data is gone as soon as one thread takes it.

If there is a better way to tell those threads that an update happened, it would be more than welcome. The data needs to be processed in real time.


Solution

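  • If there are three consumers, create a queue for each of them, pass all of the queues to the publisher, and have the publisher push every message to each queue. Each consumer then blocks on its own queue, so no message is lost when another consumer reads it.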

    import threading
    import time
    from queue import Queue
    
    
    def publisher(consumers):
        for x in range(10):
            value = 2 ** x
            for consumer in consumers:
                consumer.put(value)
            time.sleep(0.1)
        for consumer in consumers:
            consumer.put(None)  # sentinel value to indicate end of stream
    
    
    def consumer(name, queue):
        while True:
            value = queue.get()
            if value is None:
                print(f"{name} will quit now")
                break
            print(f"{name}: Got {value}")
    
    
    def main():
        consumer_threads = []
        consumer_queues = []
        for x in range(3):
            queue = Queue()
            consumer_queues.append(queue)
            thread = threading.Thread(target=consumer, args=(f"Consumer {x}", queue))
            thread.start()
            consumer_threads.append(thread)
        publisher_thread = threading.Thread(target=publisher, args=(consumer_queues,))
        publisher_thread.start()
        publisher_thread.join()
        for thread in consumer_threads:
            thread.join()
    
    
    if __name__ == "__main__":
        main()
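
Since the question says every chunk carries an ID and belongs to exactly one thread, a possible variation is to route each chunk only to the queue registered for its ID instead of broadcasting it to all consumers. The following is a minimal sketch under assumptions: chunks are taken to be (chunk_id, payload) tuples, and the names dispatcher, worker, run_demo, and STOP are hypothetical. How the real IDs come out of canParser.b2can is not shown in the question, so that part is left out.

```python
import threading
from queue import Queue

# Unique sentinel object, so that a payload of None cannot be mistaken for "stop".
STOP = object()


def dispatcher(chunks, queues_by_id):
    """Route each (chunk_id, payload) pair to the queue registered for that ID."""
    for chunk_id, payload in chunks:
        target = queues_by_id.get(chunk_id)
        if target is not None:  # chunks with an unknown ID are dropped (assumption)
            target.put(payload)
    for q in queues_by_id.values():
        q.put(STOP)  # tell every worker that the stream has ended


def worker(inbox, results):
    """Collect payloads until the sentinel arrives."""
    while True:
        payload = inbox.get()  # blocks until data arrives, so there is no busy-waiting
        if payload is STOP:
            break
        results.append(payload)


def run_demo():
    # Hypothetical sample data standing in for parsed UDP chunks.
    chunks = [(1, "a"), (2, "b"), (1, "c"), (3, "d"), (9, "ignored")]
    queues_by_id = {1: Queue(), 2: Queue(), 3: Queue()}
    results = {chunk_id: [] for chunk_id in queues_by_id}
    threads = [
        threading.Thread(target=worker, args=(queues_by_id[i], results[i]))
        for i in queues_by_id
    ]
    for thread in threads:
        thread.start()
    dispatcher(chunks, queues_by_id)
    for thread in threads:
        thread.join()
    return results
```

Calling run_demo() returns a dict that maps each ID to the payloads its worker received, in arrival order. Because queue.get() blocks, the workers stay idle until data arrives, which may also avoid the freeze described for the global-variable approach, if that freeze was caused by a busy loop.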