python triggers nidaqmx daq-mx

Retriggering without native support in NI-DAQmx Python


I have a motorized stage that outputs triggers at specific positions. For example, the stage moves from 0.0 mm to 5.0 mm and outputs a trigger every 0.1 mm (at 0.0 mm, 0.1 mm, 0.2 mm, and so on). The stage starts from rest, so it goes through an acceleration phase before it reaches cruising speed. This means the triggers are not equally spaced in time. Without going into the details, during the acceleration phase the triggers are sparse and the time between them decreases. Once at cruising speed, the triggers arrive at almost uniform intervals, and the shortest time between triggers is about 1 ms.

My goal is to acquire 2-5 samples from the analog input channel AI0 (at maximum clock rate) at every trigger (rising edge) even if the triggers come only 1ms apart. I have wired the stage trigger to PFI0.

I have working code for triggers with a minimum inter-trigger time of 17 ms. If I request more positions, so that the inter-trigger time drops below 17 ms, my code skips some triggers and never completes.

import nidaqmx

with nidaqmx.Task() as task:
    clk_rate = 50e3

    task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    task.timing.cfg_samp_clk_timing(rate=float(clk_rate), samps_per_chan=5)

    data = []

    for ii in range(10):
        task.triggers.start_trigger.cfg_dig_edge_start_trig("PFI0")
        data.append(task.read(number_of_samples_per_channel=5))

print(data)

I have read about retriggerable tasks in NI-DAQmx, and I am aware that the USB-6002 does not support them. However, the article also explains "how to achieve similar functionality with previous generation DAQ devices using onboard counters". The idea is that a counter counts the triggers and that the same counter can be used as the clock source for the analog input task. I wrote another script to test the counter.

import nidaqmx
import time

with nidaqmx.Task() as task:
    task.ci_channels.add_ci_count_edges_chan("Dev1/ctr0")
    task.start()
    time.sleep(5)
    print(task.read())

If I run my stage during the 5-second sleep and the stage completes its translation, I count the correct number of triggers, even when the spacing between triggers drops below 1 ms, which is good enough for my application. My problem is that I do not know how to use this counter as the clock source for my AI task in Python. Can someone help me with that or point me to the relevant resources?

Thank you in advance for your help.


Solution

  • I asked the NI LabVIEW community about this here, and it does not seem to be possible in LabVIEW with the USB-6002, so I assume it is not possible in Python either. Instead, I implemented the solution suggested by ZYOng of the NI Community: record the trigger signal as an additional analog input, then select the samples of interest from my analog input offline, based on where the triggers appear on the recorded trigger channel. I split the trigger signal so that I can still use the very first trigger as a single digital trigger for the start of both tasks. Finally, I use the threading module to arm the USB-6002 in a separate thread while my main code sends commands to the stage. The code is as follows. This is the function that runs in the thread and arms the DAQ trigger:

    import nidaqmx
    
    def arm_daq(data):
        with nidaqmx.Task() as task:
            task.ai_channels.add_ai_voltage_chan("Dev1/ai0:1")
            task.timing.cfg_samp_clk_timing(rate=25000, samps_per_chan=9000)
            task.triggers.start_trigger.cfg_dig_edge_start_trig("PFI0")
            data[:] = task.read(number_of_samples_per_channel=9000)
    

    Note that the rate should be 25000 samples per second, because the USB-6002 supports up to 50000 samples per second aggregated across all channels (with two channels, the maximum is 25000 samples per second per channel). In my main code, I start a thread running the arm_daq function and pass it an array so that it can be modified in place:

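    import threading
    
    import numpy as np
    
    # Buffer filled in place by arm_daq: row 0 is ai0, row 1 is ai1
    data = np.zeros((2, 9000))
    t1 = threading.Thread(target=arm_daq, args=(data,))
    t1.start()  # arm the DAQ; it waits for the first trigger on PFI0
    
    # Send motor commands, which generates a trigger signal
    
    t1.join()  # wait until all 9000 samples per channel have been read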
    

    Lastly, I used this post to perform the offline analysis and retrieve the analog input data based on the location of the triggers (on the analog input channel).
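
For illustration, the offline selection step could look something like the sketch below. It is not the code from the linked post: the function name `samples_after_triggers`, the 2.5 V threshold (which assumes a TTL-like trigger level), and the synthetic test data are all assumptions made for this example.

```python
import numpy as np

def samples_after_triggers(signal, trigger, threshold=2.5, n_samples=5):
    """Return rising-edge indices of `trigger` and the n_samples of
    `signal` that start at each edge (hypothetical helper)."""
    signal = np.asarray(signal, dtype=float)
    # threshold=2.5 assumes a TTL-like trigger; adjust to the real signal
    above = np.asarray(trigger, dtype=float) > threshold
    # Rising edge: previous sample at or below threshold, current one above
    edges = np.flatnonzero(~above[:-1] & above[1:]) + 1
    # Drop edges too close to the end of the record to yield n_samples
    edges = edges[edges + n_samples <= signal.size]
    # Build an (n_edges, n_samples) index array and gather the samples
    idx = edges[:, None] + np.arange(n_samples)
    return edges, signal[idx]

# Synthetic stand-in for the two recorded channels
trigger = np.zeros(200)
for start in (20, 80, 150):
    trigger[start:start + 10] = 5.0   # three trigger pulses
signal = np.arange(200, dtype=float)  # ramp, so values equal indices

edges, chunks = samples_after_triggers(signal, trigger, n_samples=3)
print(edges)
print(chunks)
```

With the acquisition above, `signal` and `trigger` would be the two rows of `data`, in whichever order the channels were wired. One caveat: because the first trigger also starts the acquisition, the trigger channel may already be high at sample 0, in which case this sketch would not report that first pulse as a rising edge and it would need to be handled separately.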

    I'm still not sure if all of this is really optimal or pythonic, but it works for me. Happy to hear some comments on this workflow.

    Take care,

    David