Read subprocess stdout and stderr concurrently
I'm trying to run a lengthy command within Python that outputs to both stdout and stderr. I'd like to poll the subprocess and write the output to separate files.
I tried the following, based on this answer Non-blocking read on a subprocess.PIPE in python
import subprocess
from Queue import Queue, Empty
from threading import Thread
def send_cmd(cmd, shell=False):
"""
Send cmd to the shell
"""
if not isinstance(cmd, list): cmd = shlex.split(cmd)
params = {'args' : cmd,
'stdout' : subprocess.PIPE,
'stderr' : subprocess.PIPE,
'shell' : shell}
proc = subprocess.Popen(**params)
return proc
def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull):
"""
Monitor the process that is running, and log it if desired
"""
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
def setup_process(log_name, proc):
FID = open(log_name, 'w')
queue = Queue()
thread = Thread(target=enqueue_output, args=(proc, queue))
thread.daemon = True # Thread dies with program
thread.start()
return (queue, FID)
def check_queues(queue_list, errors):
for queue, FID in queue_list:
try:
line = queue.get_nowait()
if 'error' in line.lower() or 'failed' in line.lower():
errors.append(line)
except Empty:
pass
else:
FID.write(line)
errors = []
queue_list = []
for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]:
queue_list.append(setup_process(log, proc)
while process.poll() is None:
check_queues(queue_list, errors)
while not queue_list[0][0].empty() or queue_list[1][0].empty():
check_queues(queue_list, errors)
for queue, FID in queue_list:
FID.close()
return errors
process = send_cmd('long_program.exe')
errors = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log')
But it the output file for stdout is empty, and the output file for stderr is only a few lines long, whereas both should be quite large.
What am I missing?
I did that once.. here is some old code I wrote
class Process_Communicator():
def join(self):
self.te.join()
self.to.join()
self.running = False
self.aggregator.join()
self.ti.join()
def enqueue_in(self):
while self.running and self.p.stdin is not None:
while not self.stdin_queue.empty():
s = self.stdin_queue.get()
self.p.stdin.write(str(s) + 'nr')
pass
def enqueue_output(self):
if not self.p.stdout or self.p.stdout.closed:
return
out = self.p.stdout
for line in iter(out.readline, b''):
self.qo.put(line)
# out.flush()
def enqueue_err(self):
if not self.p.stderr or self.p.stderr.closed:
return
err = self.p.stderr
for line in iter(err.readline, b''):
self.qe.put(line)
def aggregate(self):
while (self.running):
self.update()
self.update()
def update(self):
line = ""
try:
while self.qe.not_empty:
line = self.qe.get_nowait() # or q.get(timeout=.1)
self.unbblocked_err += line
except Queue.Empty:
pass
line = ""
try:
while self.qo.not_empty:
line = self.qo.get_nowait() # or q.get(timeout=.1)
self.unbblocked_out += line
except Queue.Empty:
pass
while not self.stdin_queue.empty():
s = self.stdin_queue.get()
self.p.stdin.write(str(s))
def get_stdout(self, clear=True):
ret = self.unbblocked_out
if clear:
self.unbblocked_out = ""
return ret
def has_stdout(self):
ret = self.get_stdout(False)
if ret == '':
return None
else:
return ret
def get_stderr(self, clear=True):
ret = self.unbblocked_out
if clear:
self.unbblocked_out = ""
return ret
def has_stderr(self):
ret = self.get_stdout(False)
if ret == '':
return None
else:
return ret
def __init__(self, subp):
'''This is a simple class that collects and aggregates the
output from a subprocess so that you can more reliably use
the class without having to block for subprocess.communicate.'''
self.p = subp
self.unbblocked_out = ""
self.unbblocked_err = ""
self.running = True
self.qo = Queue.Queue()
self.to = threading.Thread(name="out_read",
target=self.enqueue_output,
args=())
self.to.daemon = True # thread dies with the program
self.to.start()
self.qe = Queue.Queue()
self.te = threading.Thread(name="err_read",
target=self.enqueue_err,
args=())
self.te.daemon = True # thread dies with the program
self.te.start()
self.stdin_queue = Queue.Queue()
self.aggregator = threading.Thread(name="aggregate",
target=self.aggregate,
args=())
self.aggregator.daemon = True # thread dies with the program
self.aggregator.start()
pass
You may not need the whole example, but feel free to cut copy and paste what you need. It's also important to show how I did the threading.
The code looks more complicated than the task requires. I don't see why do you need to call process.poll()
or queue.get_nowait()
here. To deliver subprocess' stdout/stderr to several sinks; you could start with teed_call()
that accepts arbitrary file-like objects: you could pass logfiles and special file-like objects that accumulates errors
in theirs .write()
methods.
To fix your code with minimal changes; you should call .join()
on the reader threads (even if process.poll()
is not None
ie, the subprocess exited; there could be some pending output. Joining reader's threads ensures that all output is read).
上一篇: python作为“批处理”脚本(即从python运行命令)
下一篇: 同时读取子进程标准输出和标准错误