
Execution order becomes incorrect with Python threading; how can I keep it correct?


I would like to get data from a sensor and put each record on a cloud service in the correct order.

Putting a record takes time, so I use threading to keep reading data from the sensor at the same time (otherwise data would be lost while records are being put).

Here is my dummy code.

import threading
import time

def putrecord(data_put):
    time.sleep(0.8) #it takes time to put record
    print(data_put[-1]) #sometimes unordered (ex. 9,29,19,...)

def main():
    data_put=[]
    for i in range(100):
        time.sleep(0.08) #getting data from a sensor
        data_put.append(i)
        if len(data_put)>=10: #every 10 records
            sub = threading.Thread(target=putrecord,args=(data_put,))
            sub.start()
            data_put=[]
    

if __name__ == '__main__':
    main()

Occasionally, though, the order comes out wrong (it does not happen with the dummy code). I am not sure why it happens, but I guess it happens when getting or putting data takes longer than usual. Is there a way to put records in the correct order in that case?
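This guess can be checked with a minimal sketch. It assumes, hypothetically, that the first put takes 0.5 s and the second only 0.05 s; `put_record` and `run_demo` are illustrative names, not part of the original code. With one thread started per chunk and nothing serializing them, the fast second put finishes before the slow first one:

```python
import threading
import time

def put_record(chunk, duration, out, lock):
    # Simulated upload; the duration is a hypothetical value chosen per chunk
    time.sleep(duration)
    with lock:
        out.append(chunk[-1])

def run_demo():
    out = []
    lock = threading.Lock()
    durations = [0.5, 0.05]  # hypothetical: first put is slow, second is fast
    threads = []
    data = []
    for i in range(20):
        time.sleep(0.01)  # simulated sensor read
        data.append(i)
        if len(data) >= 10:  # every 10 records, start a new thread
            t = threading.Thread(
                target=put_record,
                args=(data, durations[len(threads)], out, lock),
            )
            t.start()
            threads.append(t)
            data = []
    for t in threads:
        t.join()
    return out

if __name__ == "__main__":
    print(run_demo())  # typically [19, 9]: the second chunk is put first
```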

I hope this makes sense.


Solution

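  • Your approach can produce out-of-order output whenever a thread takes too long "putting a record". If putting one chunk takes longer than collecting the next one (in your dummy code both take about 0.8 s), new data arrives and a new thread is created and started while the previous one is still running. The two threads then "put" records concurrently, and nothing guarantees which one finishes first, so the data can end up out of order.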

    Moreover, you are not joining the threads you create. This can amount to a very small resource leak (roughly like not closing a file), which probably does not matter unless your program runs continuously for a very long time.

    The most elegant way to implement what you need is a queue, as @jarmod proposed in the comments. Nevertheless, a simpler approach that reuses your code also works.

    If you need the data to be processed in chunks of a fixed number of records (as in your example), you can keep those chunks in a list and start a new thread only after the previous one has finished:

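    import threading
    import time
    
    def putrecord(chunks):
        time.sleep(0.8)  # it takes time to put records
        # Put every chunk collected while the previous thread was running, in order
        for chunk in chunks:
            print(chunk[-1])
    
    def main():
        data = []
        chunks = []  # complete chunks waiting to be put
        sub = None   # the thread currently putting records, if any
        for i in range(100):
            time.sleep(0.08)  # getting data from a sensor
            data.append(i)
            if len(data) >= 10:  # every 10 records
                chunks.append(data)
                data = []
                # Reap the previous thread once it has finished
                if sub is not None and not sub.is_alive():
                    sub.join()
                    sub = None
                # Start a new thread only if no other thread is putting records
                if sub is None:
                    sub = threading.Thread(target=putrecord, args=(chunks,))
                    sub.start()
                    chunks = []
        if sub is not None:
            sub.join()
    
    if __name__ == '__main__':
        main()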
    

    If you don't actually need to process the data in fixed-size chunks, you can hand the next thread whatever data has accumulated from the sensor since the previous thread was started:

    import threading
    import time
    
    def putrecord(data_put):
        time.sleep(0.8)  # it takes time to put records
        print(data_put, data_put[-1])  # batches are now put one at a time, in order
    
    def main():
        data_put = []
        sub = None  # the thread currently putting records, if any
        for i in range(100):
            time.sleep(0.01)  # getting data from a sensor (faster here, so batches grow)
            data_put.append(i)
            if len(data_put) >= 10:  # at least 10 records are waiting
                # Reap the previous thread once it has finished
                if sub is not None and not sub.is_alive():
                    sub.join()
                    sub = None
                # Hand everything collected so far to a new thread, if none is running
                if sub is None:
                    sub = threading.Thread(target=putrecord, args=(data_put,))
                    sub.start()
                    data_put = []
        if sub is not None:
            sub.join()
    
    if __name__ == '__main__':
        main()
    

    Note that in both approaches the last data can be lost: after the loop ends, you have to check whether any records are still waiting to be written and put them as well.
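The queue-based approach proposed by @jarmod can be sketched as follows. This is a minimal sketch, not a drop-in implementation: `put_record`, `writer` and the `_STOP` sentinel are hypothetical names, and the short sleeps stand in for the real sensor read and cloud upload. A single writer thread takes chunks from a `queue.Queue` (a thread-safe FIFO from the standard library), so puts never overlap, and the final partial chunk is flushed before the writer is told to stop:

```python
import queue
import threading
import time

CHUNK_SIZE = 10
_STOP = object()  # hypothetical sentinel telling the writer thread to exit

def put_record(chunk, out):
    # Stand-in for the slow cloud upload (hypothetical; replace with the real call)
    time.sleep(0.05)
    out.append(chunk[-1])

def writer(q, out):
    # The only thread that puts records: chunks are consumed strictly in FIFO order
    while True:
        chunk = q.get()
        if chunk is _STOP:
            break
        put_record(chunk, out)

def main(n_samples=100):
    out = []
    q = queue.Queue()
    t = threading.Thread(target=writer, args=(q, out))
    t.start()
    data = []
    for i in range(n_samples):
        time.sleep(0.001)  # simulated sensor read
        data.append(i)
        if len(data) >= CHUNK_SIZE:
            q.put(data)  # never blocks the sensor loop (the queue is unbounded)
            data = []
    if data:  # flush the final partial chunk so it is not lost
        q.put(data)
    q.put(_STOP)
    t.join()
    return out

if __name__ == "__main__":
    print(main())  # [9, 19, 29, 39, 49, 59, 69, 79, 89, 99]
```

Because only one thread ever puts records, the order is preserved no matter how long an individual put takes; slow puts just let chunks pile up in the queue until the writer catches up.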