Tags: python, multithreading, parallel-processing, saving-data

Writing data to disc in a separate thread (in parallel)


I would like to call a function multiple times in a loop. Each call acquires an image from a camera and writes it to disc, and the loop should not wait for this to finish. Every time the function is called, it should run in parallel with the loop that started it, so that I can keep doing other time-sensitive work in the meantime.

I have made the example below, which makes the first "execution" of the function run in parallel with the loop, but it fails the second time because a Thread object cannot be .start()ed twice. Can this be achieved by other means?

Example (original post - updated below)

import numpy as np
import threading
import time

def imacq():
    print('acquiring image...')
    time.sleep(1.8)
    print('saved image...')
    return

# Create the image acquisition and writing-to-disc thread (it is started inside the loop)
imacq_thread = threading.Thread(target=imacq)

starttime = time.time()
sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 5
running = True
flag = True
for cycles in range(1,20):
    print(cycles)
    if cycles%image_cycles == 0:
        if flag is True:
            imacq_thread.start()  # works the first time; the second call raises RuntimeError (a Thread can only be started once)
            # imacq()  # calling it directly does not work: the loop is paused until imacq() returns
            flag = False
    else:
        flag = True
    time.sleep(0.4)
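A threading.Thread object can only be started once, which is why the second .start() fails. A minimal sketch of the most direct workaround, assuming one short-lived thread per acquisition is acceptable: create a new Thread each time the trigger condition is met. The `acq_time`, `cycle_time` and `results` names are illustrative additions, and time.sleep() still stands in for the camera and the disk write. Because `cycles` changes on every pass of this for loop, the `flag` variable is not needed here.

```python
import threading
import time


def imacq(results, index, acq_time):
    # Stand-in for acquiring an image and saving it to disk
    time.sleep(acq_time)
    results.append(index)  # list.append is atomic in CPython


def run(n_cycles=20, image_cycles=5, cycle_time=0.4, acq_time=1.8):
    results, threads = [], []
    for cycles in range(1, n_cycles):
        print(cycles)
        if cycles % image_cycles == 0:
            # A fresh Thread object each time: start() may be called only once per object
            t = threading.Thread(target=imacq, args=(results, cycles, acq_time))
            t.start()
            threads.append(t)
        time.sleep(cycle_time)  # the loop carries on without waiting for imacq()
    # Wait for saves that are still in progress before returning
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run())
```

If triggers can arrive faster than an image can be saved, this starts ever more threads; a bounded pool such as the ThreadPoolExecutor in the edit below caps that.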

EDIT: After feedback from Sylvaus, I have made two versions that trigger a function which will eventually acquire an image and store it on the drive, in parallel with a main script that decides when to send the trigger (i.e. execute the function). One version is based on Sylvaus' answer (threading) and the other on multiprocessing.

Example based on Sylvaus's answer (Threading):

import matplotlib.pyplot as plt
import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor


def imacq():
    print('taking image')
    n = 10000
    np.ones((n, n))*np.ones((n, n))  # calculations taking time
    print('saving image')
    return


sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 20
max_cycles = 100
freq = 10
cycles = 1
sigSign = 1

running = True
flag = True
timeinc = []
tic = time.time()
tic2 = tic
timeinc = np.zeros(max_cycles)
starttime = time.time()
with ThreadPoolExecutor() as executor:
    while running:
        t = time.time()-starttime
        tim_arr[:-1] = tim_arr[1:]
        tim_arr[-1] = t
        signal = np.sin(freq*t*(2.0*np.pi))
        sig_arr[:-1] = sig_arr[1:]
        sig_arr[-1] = signal

        time.sleep(0.00001)
        # Calculate cycle number
        sigSignOld = sigSign
        sigSign = np.sign(sig_arr[-1]-sig_arr[-2])
        if sigSign == 1 and sigSignOld != sigSign:
            timeinc[cycles] = time.time()-tic
            cycles += 1
            print('cycles: ', cycles, ' time inc.: ', str(timeinc[cycles-1]))
            tic = time.time()

        if cycles%image_cycles == 0:
            if flag is True:
                # The function is submitted and will be processed by
                # a thread as soon as one is available
                executor.submit(imacq)
                flag = False
        else:
            flag = True
        if cycles >= max_cycles:
            running = False

print('total time: ', time.time()-tic2)

fig = plt.figure()
ax = plt.axes()
plt.plot(timeinc)
plt.show()  # needed to display the figure when run as a plain script
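executor.submit() returns a concurrent.futures.Future, which can replace some of the manual bookkeeping in the multiprocessing version. A minimal sketch, assuming a single worker thread (max_workers=1, so saves happen one at a time) and a fixed-rate loop in place of the sine-wave cycle counter: keep the Future from the previous submit and check done() before submitting the next one, which gives the same kind of "did not finish yet" warning. The `run()` parameters are illustrative, and time.sleep() stands in for the camera and the disk write.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def imacq(index, acq_time):
    # Stand-in for acquiring an image and saving it to disk
    time.sleep(acq_time)
    return index


def run(n_cycles=100, image_cycles=20, cycle_time=0.01, acq_time=0.5):
    futures = []
    warnings = 0
    with ThreadPoolExecutor(max_workers=1) as executor:
        for cycles in range(1, n_cycles + 1):
            if cycles % image_cycles == 0:
                if futures and not futures[-1].done():
                    # Previous job still running (or queued): the new one waits behind it
                    print('WARNING: previous acquisition did not finish yet')
                    warnings += 1
                futures.append(executor.submit(imacq, cycles, acq_time))
            time.sleep(cycle_time)  # stand-in for the time-sensitive work
    # Leaving the with-block waited for all jobs; result() re-raises any
    # exception that was raised inside imacq()
    return [f.result() for f in futures], warnings


if __name__ == "__main__":
    print(run())
```

Calling result() on each Future matters: an exception raised inside imacq() is stored in its Future and is not reported anywhere unless something asks for it.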

Example based on multiprocessing:

import matplotlib.pyplot as plt
import numpy as np
import time
from multiprocessing import Process, Value, Lock


def trig_resp(running, trigger, p_count, pt, lock):
    while running.value == 1:  # note ".value" on each shared ctypes Value
        time.sleep(0.0001)  # sleeping in order not to load the CPU excessively
        if trigger.value == 1:
            with lock:  # lock the "global" variable before writing to it
                trigger.value = 0  # reset trigger
            tic = time.time()
            # Do a calculation that takes a significant time
            n = 10000
            np.ones((n, n))*np.ones((n, n))
            with lock:
                pt.value = time.time() - tic  # calculate process time
                p_count.value += 1  # count number of finished processes
    return


if __name__ == "__main__":
    # Initialize shared values (shared ctypes objects, "global" across processes).
    # Type 'i': integer, type 'd': double.
    trigger = Value('i', 0)  # used to trigger the execution in trig_resp()
    running = Value('i', 1)  # a way to break the loop in trig_resp()
    p_count = Value('i', 0)  # process counter, also signals that a process is done
    pt = Value('d', 0.0)  # process time of the latest finished process
    lock = Lock()  # lock object used to avoid race conditions when changing "global" values
    p_count_old = p_count.value
    p1 = Process(target=trig_resp, args=(running, trigger, p_count, pt, lock))
    p1.start()  # Start process

    # A "simulated" sinusoidal signal
    array_len = 50
    sig_arr = np.zeros(array_len)  # Signal array
    tim_arr = np.zeros(array_len)  # Corresponding time array
    freq = 10  # frequency of signal

    # trigger settings
    im_int = 20  # cycle interval for triggering (acquiring images)
    max_cycles = 100  # max number of cycles before stopping main

    # initializing counters etc.
    cycles = 1  # number of cycles counted
    sigSign = 1  # sign of signal gradient
    flag = 1  # used to only set trigger once for the current cycle count
    trigger_count = 0  # counts how many times a trigger has been set

    tic = time.time()
    tic2 = tic
    timeinc = np.zeros(max_cycles) # Array to keep track of time used for each main loop run
    starttime = time.time()
    while running.value == 1:
        time.sleep(0.00001)  # mimics sample time (real world signal)
        t = time.time()-starttime  # local time
        signal = np.sin(freq*t*(2.0*np.pi))  # simulated signal
        # Keeping the latest array_len values (FIFO) of t and signal.
        tim_arr[:-1] = tim_arr[1:]
        tim_arr[-1] = t
        sig_arr[:-1] = sig_arr[1:]
        sig_arr[-1] = signal

        if p_count.value == p_count_old + 1:  # a process has finished
            print('Process counter: ', p_count.value,  'process_time: ', pt.value)
            p_count_old = p_count.value

        # Calculate cycle number by monitoring the sign of the gradient
        sigSignOld = sigSign  # Keeping track of previous signal gradient sign
        sigSign = np.sign(sig_arr[-1]-sig_arr[-2])  # current gradient sign
        if sigSign == 1 and sigSignOld == -1:  # a local minimum just happened
            timeinc[cycles] = time.time()-tic
            cycles += 1
            print('cycles: ', cycles, ' time inc.: ', str(timeinc[cycles-1]))
            tic = time.time()
            flag = 1

        if cycles % im_int == 0 and flag == 1:
            if cycles > 0:
                if trigger_count > p_count.value:
                    print('WARNING: Process: ', p_count.value,
                          'did not finish yet. Reduce freq or increase im_int')
                trigger.value = 1
                trigger_count += 1
                print('Trigger number: ', trigger_count)
                flag = 0

        if cycles >= max_cycles:
            running.value = 0

    print('total cycle time: ', time.time()-tic2)

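    # Wait for the worker to finish its last job (it leaves its loop once running.value == 0)
    p1.join()

    # Print the process time of the last run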
    if p_count.value < max_cycles//im_int:
        if p_count.value == p_count_old + 1:
            print('process counter: ', p_count.value,  'process_time: ', pt.value)
            p_count_old = p_count.value

    print('total process time: ', time.time()-tic2)

    fig = plt.figure()
    ax = plt.axes()
    plt.plot(timeinc)
    plt.show()  # needed to display the figure when run as a plain script
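In trig_resp() the worker polls trigger.value every 0.1 ms, which keeps a core busy even when nothing is happening. A minimal sketch of a swapped-in alternative (not what the code above does), assuming a multiprocessing.Event may replace the shared `trigger` Value: the worker blocks in Event.wait() until the main loop calls set(), and reports each process time on a multiprocessing.Queue instead of through `pt`, `p_count` and the Lock. `job_time` and the fixed-rate trigger schedule are illustrative; time.sleep() stands in for acquiring and saving an image.

```python
import multiprocessing as mp
import queue
import time


def trig_resp(trigger, stop, results, job_time=0.3):
    """Worker: run one job per trigger, blocking (not polling) while idle."""
    while not stop.is_set():
        # wait() returns False on timeout; the timeout only bounds how long
        # it takes the worker to notice that `stop` has been set
        if not trigger.wait(timeout=0.1):
            continue
        trigger.clear()  # reset the trigger before starting the job
        tic = time.perf_counter()
        time.sleep(job_time)  # stand-in for acquiring and saving an image
        results.put(time.perf_counter() - tic)  # report the process time


def main(n_triggers=5, trigger_interval=0.5):
    trigger, stop, results = mp.Event(), mp.Event(), mp.Queue()
    p = mp.Process(target=trig_resp, args=(trigger, stop, results))
    p.start()

    times = []
    sent = 0
    next_trigger = time.perf_counter()
    while sent < n_triggers:
        now = time.perf_counter()
        if now >= next_trigger:
            trigger.set()
            sent += 1
            next_trigger = now + trigger_interval
        try:
            # Non-blocking check, so the main loop never waits for the worker
            times.append(results.get_nowait())
            print('process time:', times[-1])
        except queue.Empty:
            pass
        time.sleep(0.001)  # stand-in for sampling the signal

    # Collect the results still in flight, then shut the worker down
    while len(times) < sent:
        times.append(results.get(timeout=5))
    stop.set()
    p.join()
    return times


if __name__ == "__main__":
    print(main())
```

An Event only remembers that it was set, not how many times, so two triggers arriving while one job is still running collapse into a single run (and the final get(timeout=5) would then raise queue.Empty); if every trigger matters, a Queue of requests keeps them apart.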

I am on a Windows 10 laptop, so the timing (the time increment in each iteration of the main "while running...:" loop) depends on what else is happening on my computer, but the multiprocessing version seems less sensitive to this than the threading version. However, the multiprocessing version is not very elegant, and I suspect a smarter solution exists (simpler and less error-prone) that achieves the same or better results: consistent time increments with a lower CPU load.

I have attached graphs of the time increments I get for the multiprocessing and threading examples, respectively: [Multiprocess example] [Threading example]

Any feedback on improving the two solutions is much appreciated.


Solution

  • The details of your acquisition devices, data rates and volumes are not very clear, but I get the impression that you want to acquire one signal as fast as possible and, whenever that signal is "interesting", capture an image and write it to disk as soon as possible, without delaying the next acquisition of the signal.

    So, it seems there is minimal data exchange necessary between the main signal acquisition process and the image capture process. IMHO, that suggests multiprocessing (each process has its own GIL, so image capture does not compete with the signal loop for it) and a queue (no large volumes of data to pickle) to communicate between the two processes.

    So, I would be looking at this type of setup:

    #!/usr/bin/env python3
    
    from multiprocessing import Process, Queue, freeze_support
    
    def ImageCapture(queue):
        while True:
            # Wait till told to capture an image - the message could contain an event reference number
            item = queue.get()
            if item == -1:
                break
            # Capture image and save to disk
    
    def main():
        # Create queue to send image capture requests on
        queue = Queue(8)
    
        # Start image acquisition process
        p = Process(target=ImageCapture, args=(queue,))
        p.start()
    
        # do forever
        #    acquire from DAQ
        #    if interesting
        #       queue.put(event reference number or filename)
    
        # Stop image acquisition process
        queue.put(-1)
        p.join()
    
    if __name__ == "__main__":
    
        # Only has an effect when the program is frozen into a Windows executable
        freeze_support()
        main()
    

    If the ImageCapture() process can't keep up, start two or more.

    On my Mac, I measured a mean message delivery time on a queue of 32 microseconds, and a maximum latency of 120 microseconds over 1 million messages.
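A runnable version of the skeleton above, as a sketch under these assumptions: the DAQ is simulated by a 10 Hz sine wave and an "interesting" sample is a rise above 0.9, the camera is replaced by writing a blank 640x480 frame to a file, and each request carries a running event number. Following the suggestion for a capture process that can't keep up, it starts `n_workers` processes that all read from the same queue and sends one stop sentinel per worker (None instead of -1). The names `image_capture`, `main(out_dir, ...)` and the `.raw` filenames are hypothetical.

```python
import math
import os
import queue
import tempfile
import time
from multiprocessing import Process, Queue, freeze_support

STOP = None  # sentinel; one is sent per worker


def image_capture(requests, out_dir):
    """Worker: wait for a request, then 'capture' an image and save it."""
    while True:
        event_no = requests.get()  # blocks until a request arrives
        if event_no is STOP:
            break
        # Hypothetical stand-in for the camera: a blank 640x480 8-bit frame
        frame = bytes(640 * 480)
        path = os.path.join(out_dir, f"event_{event_no:05d}.raw")
        with open(path, "wb") as f:
            f.write(frame)


def main(out_dir, n_workers=2, duration=2.0, freq=10.0):
    requests = Queue(8)  # bounded, as in the skeleton above
    workers = [Process(target=image_capture, args=(requests, out_dir))
               for _ in range(n_workers)]
    for w in workers:
        w.start()

    events = dropped = 0
    was_high = False
    start = time.perf_counter()
    while (t := time.perf_counter() - start) < duration:
        # Simulated DAQ sample; "interesting" = signal rising above 0.9
        high = math.sin(2 * math.pi * freq * t) > 0.9
        if high and not was_high:
            events += 1
            try:
                # Never block the acquisition loop: drop the request if the queue is full
                requests.put_nowait(events)
            except queue.Full:
                dropped += 1
        was_high = high
        time.sleep(0.0005)  # stand-in for the DAQ sample period

    for _ in workers:
        requests.put(STOP)  # one sentinel per worker
    for w in workers:
        w.join()
    return events, dropped


if __name__ == "__main__":
    freeze_support()  # only has an effect in a frozen Windows executable
    with tempfile.TemporaryDirectory() as d:
        print(main(d))  # (events detected, requests dropped)
        print(len(os.listdir(d)), "images saved")
```

With put_nowait() on the bounded Queue(8), a burst of events that the workers cannot absorb is counted as dropped instead of stalling the signal loop; a blocking queue.put() would pause the loop once 8 requests are pending. Which behaviour is right depends on whether losing an image or losing signal samples is worse.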