Search code examples
pythonpython-3.xsubprocessstdin

write to stdin and read from stdout on long-running child process in python


I have a long-running computational model that i wish to control, feed data to, and read data from using STDIN and STDOUT. Inside this external code, there's a control feedback loop that needs new data from STDIN every 100ms or so.

for this reason, subprocess.communicate() is not appropriate, since it waits for the process to finish. The process' estimated runtime is on the order of several weeks.

The code below doesn't work because control hangs on stdout.read() and never comes back.

What is the correct way to talk over STDIN and STDOUT ?

import subprocess as sb


class Foo:
    def process_output(self,values: str) -> ():
        """ gets 7 comma separated floats back from ADC.elf and returns them as a tuple of two vectors """
        floats = [float(f) for f in values.split(',') if values and f]
        # if len(floats) == 7:
        mag = (floats[0], floats[1], floats[2])
        quat = (floats[3], floats[4], floats[5], floats[6])
        return (mag, quat)

    def format_input(self,invals: {}) -> bytes:
        """ takes a dict of arrays and stuffs them into a comma-separated bytestring to send to ADC.elf with a trailing newline"""
        concat = lambda s, f: ''.join([f % x for x in s])
        retval = ''
        retval += concat(invals['mag_meas'], '%3.2f,')
        retval += concat(invals['euler_angle'], '%3.2f,')
        retval += concat(invals['sun_meas'], '%3.2f,')
        retval += concat(invals['epoch'], '%02.0f,')
        retval += concat(invals['lla'], '%3.2f,')
        retval += concat([invals['s_flag']], '%1.0f,')
        retval = retval[:-1]
        retval += '\n'
        return retval.encode('utf-8')

    def page(self,input: {}) -> ():
        """ send a bytestring with 19 floats to ADC.elf.  Process the returned 7 floats into a data struture"""
        formatted = self.format_input(input)
        self.pid.stdin.write(formatted)
        response = self.pid.stdout.read()

        return self.process_output(response.decode())

    def __init__(self):
        """ start the long-running process ADC.elf that waits for input and performs some scientific computation"""
        self.pid = sb.Popen(args=['./ADC.elf'], stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE)

    def exit(self):
        """ send SIGTERM to ADC.elf"""
        self.pid.terminate()



if __name__ == "__main__":
    # some dummy data
    testData = {'mag_meas': [1, 2, 3],
                'euler_angle': [4, 5, 6],
                'sun_meas': [7, 8, 9],
                'epoch': [0, 1, 2, 3, 4, 5],
                'lla': [6, 7, 8],
                's_flag': 9
                }
    #initialize
    runner = Foo()
    # send and receive once.
    result = runner.page(testData)
    print(result)
    #clean up
    runner.exit()

Solution

  • No idea how to do this with subprocess directly, but pexpect did exactly the right thing:

    import pexpect, os
    from time import sleep
    
    class Foo:
        def process_output(self,values: str) -> ():
            floats = [float(f) for f in values.split(',') if values and f]
            # if len(floats) == 7:
            mag = (floats[0], floats[1], floats[2])
            quat = (floats[3], floats[4], floats[5], floats[6])
            return (mag, quat)
    
        def format_input(self,invals: {}) -> bytes:
            concat = lambda s, f: ''.join([f % x for x in s])
            retval = ''
            retval += concat(invals['mag_meas'], '%3.2f,')
            retval += concat(invals['euler_angle'], '%3.2f,')
            retval += concat(invals['sun_meas'], '%3.2f,')
            retval += concat(invals['epoch'], '%02.0f,')
            retval += concat(invals['lla'], '%3.2f,')
            retval += concat([invals['s_flag']], '%1.0f,')
            retval = retval[:-1]
            retval += '\n'
            return retval.encode('utf-8')
    
        def page(self,input: {}) -> ():
            formatted = self.format_input(input)
            self.pid.write(formatted)
            response = self.pid.readline()
    
            return self.process_output(response.decode())
    
        def __init__(self):
    
            self.pid = pexpect.spawn('./ADC.elf')
            self.pid.setecho(False)
    
        def exit(self):
            self.pid.terminate()
    
    
    
    if __name__ == "__main__":
        testData = {'mag_meas': [1, 2, 3],
                    'euler_angle': [4, 5, 6],
                    'sun_meas': [7, 8, 9],
                    'epoch': [0, 1, 2, 3, 4, 5],
                    'lla': [6, 7, 8],
                    's_flag': 9
                    }
        runner = Foo()
        i = 0
        while i < 100:
            result = runner.page(testData)
            print(result)
            i += 1
            sleep(.1)
    
    
    
        runner.exit()