Tags: python-2.7, subprocess, stanford-nlp, ipython-parallel

subprocess.Popen works outside but not inside ipyparallel?


I'm trying to parallelize some code from here using ipyparallel. In short, I can make functions that work fine outside of apply_sync(), but I can't seem to get them to work inside it (I swear I had this working earlier, but I can't find a version of the code that isn't broken). A simple example:

def test3(fname = '7_1197_.txt'):
    import subprocess
    command = 'touch data/sentiment/' + fname + '.test'
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() is not None:
            return 
test3() #this works, creates a file with the .test extension
results = view.map_sync(test3, speeches) #this doesn't work. No files created.

Here's a short version of the function I'm actually going to use. It works fine on its own. In apply_sync() it spins up java processes according to htop, but it doesn't seem to get anything back from those processes.

def test2(fname = '7_1197_.txt'):
    import subprocess

    settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
    inputFile = ' -file data/sentiment/' + fname
    command = 'java ' + settings + inputFile
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    results = []
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() is not None:
            return ''.join(results)
        if out != '':
            results.extend(out)
test2() #Works fine, produces output
results = view.map_sync(test2, speeches) #Doesn't work: the results are empty strings.

I tried a version where I return the command variable. The commands sent to Popen are fine, and they work when pasted manually in the command line. I thought maybe it was just an issue with piping, but changing the command to redirect the output to files with ' > '+fname+'.out' doesn't work inside the apply_sync() call either (no output files are produced).

How should I be doing this so I get the stdout from the system calls back?


Solution

  • I see two potential gotchas. One for the blocking, one for the missing files. For the missing files, you should make sure that your engines and your local session are in the same working directory, or make sure to use absolute paths. A quick way to synchronize paths locally and remotely:

    client[:].apply_sync(os.chdir, os.getcwd())
    

    That says: get the local cwd, then call os.chdir everywhere, so that we all share the same working directory. A quick shortcut for this if you are in an IPython session is:

    %px cd {os.getcwd()}
    

    As for the blocking, my first thought is: are you perhaps using Python 3 when running in parallel? If so, child.stdout.read returns bytes not text. In Python 2, str is bytes, so out == '' will work, but in Python 3, the condition out == '' will never be true because b'' != u'', and your function will never return.
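    A minimal sketch of that pitfall (the shell command is just for illustration): on Python 3 the pipe yields bytes, so comparing against an empty str never matches, while a plain truthiness check works on both 2 and 3:

    ```python
    import subprocess

    # bytes vs. text: on Python 3, stdout.read() returns bytes,
    # so comparing the result against the empty *str* never succeeds
    child = subprocess.Popen('echo done', shell=True, stdout=subprocess.PIPE)
    out = child.stdout.read()  # b'done\n' on Python 3
    child.wait()

    print(b'' == '')                # False: bytes never equal str on Python 3
    print(not child.stdout.read())  # True: truthiness works on 2 and 3
    ```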

    Some more useful bits of info:

    1. stdout.read(N) will read up to N bytes, returning fewer if the output ends before that. This matters because read(1) loops once per byte even when all of the output is already waiting to be read; reading in larger chunks is much less wasteful.
    2. stdout.read() will only return an empty bytestring once output is finished, so you only need to check that, not child.poll(), before returning. (This holds as long as you haven't put the file descriptor into non-blocking mode, which is advanced usage.)
    3. if you want to see partial output before the function returns, you can redisplay output on sys.stdout, and see the partial outputs in IPython without waiting for the final result.
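    A minimal sketch of point 2, assuming a POSIX shell is available: a blocking read() only returns an empty bytestring once the pipe hits EOF, so that result alone signals completion:

    ```python
    import subprocess

    # read() blocks until EOF on the pipe, so an empty result
    # means the output is finished -- no child.poll() needed
    child = subprocess.Popen('echo hello', shell=True, stdout=subprocess.PIPE)
    out = child.stdout.read()
    child.wait()  # reap the finished process

    print(out)                         # b'hello\n'
    print(child.stdout.read() == b'')  # True: reads after EOF stay empty
    ```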

    So here are a couple of implementations of your function, with different goals.

    The first one accomplishes your current goal using Popen.communicate, which is the simplest choice if you don't want to do anything with partial output and have nothing else to do in the function while you are waiting for output:

    def simple(fname = '7_1197_.txt'):
        import subprocess
        command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
        child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        # if we aren't doing anything with partial outputs,
        # child.communicate() does all of our waiting/capturing for us:
        out, err = child.communicate()
        return out
    

    (it might be useful to capture stderr as well, with stderr=subprocess.PIPE, or to merge stderr into stdout with stderr=subprocess.STDOUT).
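    For example, a sketch of capturing both streams on separate pipes (the shell command here is just for illustration); communicate() reads both to EOF and waits for the process to exit, avoiding the deadlocks that manual reads from two pipes can cause:

    ```python
    import subprocess

    # capture stdout and stderr separately; communicate() drains
    # both pipes and waits for the child to exit
    child = subprocess.Popen(
        'echo out; echo err 1>&2',
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out, err = child.communicate()
    print(out)  # b'out\n'
    print(err)  # b'err\n'
    ```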

    Here's another example, collecting stderr into stdout, and reading in chunks:

    def chunked(fname = '7_1197_.txt'):
        import subprocess
        command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                )
        chunks = []
        while True:
            chunk = child.stdout.read(80) # read roughly one line at a time
            if chunk:
                chunks.append(chunk)
                continue
            else:
                # read will only return an empty bytestring when output is finished
                break
        return b''.join(chunks)
    

    Note that we can use the if not chunk condition to determine when output is finished, rather than if chunk == '', since empty bytestrings are Falsy. If we aren't doing something with the partial output, there's really no reason to use this instead of the simpler .communicate() version above.

    Finally, here's a version you can use with IPython that, instead of capturing and returning the output, redisplays it, which we can use to display partial output in the client:

    def chunked_redisplayed(fname = '7_1197_.txt'):
        import sys, subprocess
        command = 'for i in $(seq 1 20); do echo "{0}"; sleep 0.25; done'.format(fname)
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                )
        while True:
            chunk = child.stdout.read(80) # read roughly one line at a time
            if chunk:
                sys.stdout.write(chunk.decode('utf8', 'replace'))
                continue
            else:
                # read will only return an empty bytestring when output is finished
                break
    

    In the client, if you use map_async instead of map_sync, you can check on result.stdout, which is a list of the stdout-streams so far, so you can check on the progress:

    amr = view.map_async(chunked_redisplayed, speeches)
    amr.stdout # list of stdout text, updated in the background as output is produced
    amr.wait_interactive() # waits and shows progress
    amr.get() # waits for and returns the actual result