I'm trying to parallelize some code using ipyparallel. In short, I can make my functions work fine outside of apply_sync(), but I can't seem to get them to work inside it (I swear I had this working earlier, but I can't find a version of the code that isn't broken). A simple example:
```python
def test3(fname='7_1197_.txt'):
    import subprocess
    command = 'touch data/sentiment/' + fname + '.test'
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() is not None:
            return

test3()  # this works, creates a file with a .test extension
results = view.map_sync(test3, speeches)  # this doesn't work: no files created
```
Here's a short version of the function I'm actually going to use. It works fine on its own. Inside apply_sync() it spins up Java processes (according to htop), but they don't seem to produce anything.
```python
def test2(fname='7_1197_.txt'):
    import subprocess
    settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
    inputfile = ' -file data/sentiment/' + fname
    command = 'java ' + settings + inputfile
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    results = []
    while True:
        out = child.stdout.read(1)
        if out == '' and child.poll() is not None:
            return ''.join(results)
        if out != '':
            results.extend(out)

test2()  # works fine, produces output
results = view.map_sync(test2, speeches)  # doesn't work: results are empty strings
```
I tried a version that returns the command variable. The commands being sent to Popen are fine, and they work when pasted manually into the command line. I thought maybe the issue was with piping, but changing the command to redirect output to files with ' > '+fname+'.out' doesn't work inside the apply_sync() call either (no output files are produced).

How should I be getting stdout from these system calls back?
I see two potential gotchas here: one with blocking, one with missing files. For the missing files, you should make sure your engines and your local session are in the same working directory, or make sure you use absolute paths. A quick way to synchronize paths locally and remotely:
```python
client[:].apply_sync(os.chdir, os.getcwd())
```
That says: take the local cwd and call os.chdir everywhere, so all the engines share the same working directory. A quick shortcut, if you're in an IPython session, is:
```
%px cd {os.getcwd()}
```
As for blocking, my first thought is: perhaps you are using Python 3 when running in parallel? If so, child.stdout.read returns bytes, not text. In Python 2, str is bytes, so out == '' works, but in Python 3 the condition out == '' is never true because b'' != u'', and the function never returns.
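To make the bytes-vs-text mismatch concrete, here's a minimal sketch (using echo as a stand-in command, not the question's actual Java invocation):

```python
import subprocess

# On Python 3, a subprocess pipe yields bytes, not str.
child = subprocess.Popen('echo hi', shell=True, stdout=subprocess.PIPE)
child.wait()
data = child.stdout.read()   # b'hi\n'
tail = child.stdout.read()   # b'' once the output is exhausted
print(tail == '')            # False: b'' never equals u'', so the loop can't exit
print(tail == b'')           # True: compare against a bytestring instead
```

Changing the sentinel comparison to b'' (or just checking truthiness, as below) fixes the hang.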
Some more useful bits of info:

- stdout.read(n) reads *up to* that number of bytes, and truncates if the output is complete. This is useful because read(1) will loop many times, even if output is waiting to be read.
- stdout.read() returns an empty bytestring when the output is finished, so you only need to check for that, not child.poll(), before returning. (This is true as long as you haven't set NOWAIT on the fd, which is advanced usage.)
- If you want to see partial output before the function returns, you can redisplay the output on sys.stdout, and see the partial outputs in IPython without waiting for the final result.
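A quick sketch of those read() semantics (using printf as a stand-in command): a sized read returns up to that many bytes, and a read on the drained pipe returns an empty bytestring, which is all you need to detect the end of output.

```python
import subprocess

child = subprocess.Popen('printf abcdef', shell=True, stdout=subprocess.PIPE)
first = child.stdout.read(4)   # up to 4 bytes: b'abcd'
rest = child.stdout.read()     # the remainder: b'ef'
eof = child.stdout.read()      # b'' signals the output is finished
child.wait()
print(first, rest, eof == b'')
```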
So here are a couple of implementations of the function, with different goals. The first one appears to accomplish your current goal using Popen.communicate, which is the simplest choice if you don't want partial output and/or have nothing to do in the function while waiting for the output:
```python
def simple(fname='7_1197_.txt'):
    import subprocess
    command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
    child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    # if you aren't doing partial outputs,
    # child.communicate() does all of the waiting/capturing for us:
    out, err = child.communicate()
    return out
```
(It might be useful to include stderr capturing as well, with stderr=subprocess.PIPE, or to merge stderr into stdout with stderr=subprocess.STDOUT.)
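For example, a small sketch of the merged-stderr variant with communicate() (echo commands here are placeholders, not the question's Java pipeline):

```python
import subprocess

child = subprocess.Popen(
    'echo out && echo err 1>&2',
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,  # route stderr into the stdout pipe
)
out, err = child.communicate()
print(out)   # both streams together: b'out\nerr\n'
print(err)   # None, because stderr was redirected into stdout
```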
Here's an example collecting stderr into stdout and reading the output in chunks:
```python
def chunked(fname='7_1197_.txt'):
    import subprocess
    command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
    child = subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
    )
    chunks = []
    while True:
        chunk = child.stdout.read(80)  # read roughly one line at a time
        if chunk:
            chunks.append(chunk)
            continue
        else:
            # read returns an empty bytestring when the output is finished
            break
    return b''.join(chunks)
```
Note that you can use if not chunk as the condition to determine when the output is finished, rather than if chunk == '', since empty bytestrings are falsy. If you aren't doing partial output, there's no reason to use this instead of the simpler .communicate() version above.
Finally, here's a version you can use with IPython that, instead of capturing and returning the output, redisplays it, so you can use it to display partial output in the client:
```python
def chunked_redisplayed(fname='7_1197_.txt'):
    import sys, subprocess
    command = 'for i in {{1..20}}; do echo "{0}"; sleep 0.25; done'.format(fname)
    child = subprocess.Popen(command, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
    )
    while True:
        chunk = child.stdout.read(80)  # read roughly one line at a time
        if chunk:
            sys.stdout.write(chunk.decode('utf8', 'replace'))
            continue
        else:
            # read returns an empty bytestring when the output is finished
            break
```
In the client, if you use map_async instead of map_sync, you can check on result.stdout, a list of the stdout streams so far, to check on progress:
```python
amr = view.map_async(chunked_redisplayed, speeches)
amr.stdout  # list of stdout text, updated in the background as output is produced
amr.wait_interactive()  # waits and shows progress
amr.get()  # waits for and returns the actual result
```