Python 2.7 - subprocess.Popen works outside but not inside ipyparallel?


I'm trying to parallelize some code using ipyparallel. In short, I can make functions work fine outside of apply_sync(), but I can't seem to get them to work inside it (I swear I had this working earlier, but I can't find a version of the code that isn't broken). A simple example:

def test3(fname='7_1197_.txt'):
    import subprocess
    command = 'touch data/sentiment/' + fname + '.test'
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() is not None:
            return

test3()  # this works, creates a file with a .test extension
results = view.map_sync(test3, speeches)  # this doesn't work: no files created

Here's a short version of the function I'm going to use. It works fine on its own. Inside apply_sync() it spins up Java processes according to htop, but doesn't seem to get anything back from those processes.

def test2(fname='7_1197_.txt'):
    import subprocess
    settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
    inputfile = ' -file data/sentiment/' + fname
    command = 'java ' + settings + inputfile
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    results = []
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() is not None:
            return ''.join(results)
        if out != '':
            results.extend(out)

test2()  # works fine, produces output
results = view.map_sync(test2, speeches)  # doesn't work: results are empty strings

I tried a version that returns the command variable. The commands sent to Popen are fine, and they work when pasted manually into the command line. I thought maybe the issue was piping, but changing the command to redirect output to files with ' > ' + fname + '.out' doesn't work inside the apply_sync() call either (no output files are produced).

How should I be doing this to get stdout from the system calls back?

I see two potential gotchas here: one about blocking, one about missing files. For the missing files, you should make sure that the engines and your local session are in the same working directory, or make sure to use absolute paths. A quick way to synchronize paths locally and remotely:

client[:].apply_sync(os.chdir, os.getcwd()) 

That says: take the local cwd and call os.chdir everywhere, so all processes share the same working directory. A quick shortcut, if you are in an IPython session, is:

%px cd {os.getcwd()} 
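To confirm the change took effect, one option (a sanity check of my own, not from the original answer) is to ask every engine for its cwd and compare it with the local one:

import os
print(os.getcwd())                      # local working directory
print(client[:].apply_sync(os.getcwd))  # list of each engine's working directory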

As for blocking, my first thought is: are you perhaps using Python 3 when running in parallel? If so, child.stdout.read returns bytes, not text. In Python 2, str is bytes, so out == '' will work, but in Python 3 the condition out == '' is never true because b'' != u'', and the function never returns.
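For example, here is a minimal sketch (the name test3_portable is mine) of the loop above with a check that terminates on both Python 2 and Python 3, by relying on truthiness instead of comparing against '':

def test3_portable(fname='7_1197_.txt'):
    import subprocess
    command = 'touch data/sentiment/' + fname + '.test'
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    while True:
        out = child.stdout.read(1)
        # empty str and empty bytes are both falsy, so this works
        # whether read() returns text (py2) or bytes (py3)
        if not out and child.poll() is not None:
            return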

Some more useful bits of info:

  1. stdout.read(n) will read up to that number of bytes, and truncate if the output is complete. This is useful because read(1) will loop many times, even when output is already waiting to be read.
  2. stdout.read() will only return an empty bytestring once the output is finished, so you only need to check for that, not child.poll(), before returning. (This is true as long as you haven't set NOWAIT on the fd, which is advanced usage.)
  3. If you want to see partial output before the function returns, you can redisplay the output on sys.stdout and see the partial outputs in IPython without waiting for the final result.

So here are a couple of implementations of the function, with different goals.

The first one appears to accomplish your current goal using Popen.communicate, the simplest choice if you don't want partial output and/or have nothing to do in the function while waiting for the output:

def simple(fname='7_1197_.txt'):
    import subprocess
    command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    # if we aren't doing partial outputs,
    # child.communicate() does all of our waiting/capturing for us:
    out, err = child.communicate()
    return out

(It might be useful to include stderr capturing as well, with stderr=subprocess.PIPE, or to merge stderr into stdout with stderr=subprocess.STDOUT.)
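For instance, a minimal sketch (the name simple_with_stderr is illustrative) of the first variant, capturing stderr separately instead of discarding it:

def simple_with_stderr(fname='7_1197_.txt'):
    import subprocess
    command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
    child = subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    # with stderr=PIPE, communicate() returns captured stderr instead of None
    out, err = child.communicate()
    return out, err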

Here's an example merging stderr into stdout, and reading the output in chunks:

def chunked(fname='7_1197_.txt'):
    import subprocess
    command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
    child = subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                            )
    chunks = []
    while True:
        chunk = child.stdout.read(80)  # read roughly one line at a time
        if chunk:
            chunks.append(chunk)
            continue
        else:
            # read returns an empty bytestring when the output is finished
            break
    return b''.join(chunks)

Note that you can use the if not chunk condition to determine when the output is finished, rather than if chunk == '', since empty bytestrings are falsy. If you aren't doing partial output, there's no reason to use this instead of the simpler .communicate() version above.

Finally, here's a version you can use with IPython that, instead of capturing and returning the output, redisplays it, so you can use it to display partial output in the client:

def chunked_redisplayed(fname='7_1197_.txt'):
    import sys, subprocess
    command = 'for i in {{1..20}}; do echo "{0}"; sleep 0.25; done'.format(fname)
    child = subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                            )
    while True:
        chunk = child.stdout.read(80)  # read roughly one line at a time
        if chunk:
            sys.stdout.write(chunk.decode('utf8', 'replace'))
            continue
        else:
            # read returns an empty bytestring when the output is finished
            break

In the client, if you use map_async instead of map_sync, you can check on result.stdout, which is a list of the stdout streams so far, so you can check on progress:

amr = view.map_async(chunked_redisplayed, speeches)
amr.stdout  # list of stdout text, updated in the background as output is produced
amr.wait_interactive()  # waits and shows progress
amr.get()  # waits for and returns the actual result
