同时读取子进程标准输出和标准错误
我试图在Python中运行一个冗长的命令,输出到stdout和stderr。 我想轮询子进程并将输出写入单独的文件。
我尝试了以下,基于这个答案非阻塞读取python中的subprocess.PIPE
import subprocess
from Queue import Queue, Empty
from threading import Thread
def send_cmd(cmd, shell=False):
"""
Send cmd to the shell
"""
if not isinstance(cmd, list): cmd = shlex.split(cmd)
params = {'args' : cmd,
'stdout' : subprocess.PIPE,
'stderr' : subprocess.PIPE,
'shell' : shell}
proc = subprocess.Popen(**params)
return proc
def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull):
"""
Monitor the process that is running, and log it if desired
"""
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
def setup_process(log_name, proc):
FID = open(log_name, 'w')
queue = Queue()
thread = Thread(target=enqueue_output, args=(proc, queue))
thread.daemon = True # Thread dies with program
thread.start()
return (queue, FID)
def check_queues(queue_list, errors):
for queue, FID in queue_list:
try:
line = queue.get_nowait()
if 'error' in line.lower() or 'failed' in line.lower():
errors.append(line)
except Empty:
pass
else:
FID.write(line)
errors = []
queue_list = []
for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]:
queue_list.append(setup_process(log, proc)
while process.poll() is None:
check_queues(queue_list, errors)
while not queue_list[0][0].empty() or queue_list[1][0].empty():
check_queues(queue_list, errors)
for queue, FID in queue_list:
FID.close()
return errors
process = send_cmd('long_program.exe')
errors = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log')
但是stdout的输出文件是空的,stderr的输出文件只有几行,而两者都应该很大。
我错过了什么?
我曾经这样做过......这是我写的一些旧代码
class Process_Communicator():
def join(self):
self.te.join()
self.to.join()
self.running = False
self.aggregator.join()
self.ti.join()
def enqueue_in(self):
while self.running and self.p.stdin is not None:
while not self.stdin_queue.empty():
s = self.stdin_queue.get()
self.p.stdin.write(str(s) + 'nr')
pass
def enqueue_output(self):
if not self.p.stdout or self.p.stdout.closed:
return
out = self.p.stdout
for line in iter(out.readline, b''):
self.qo.put(line)
# out.flush()
def enqueue_err(self):
if not self.p.stderr or self.p.stderr.closed:
return
err = self.p.stderr
for line in iter(err.readline, b''):
self.qe.put(line)
def aggregate(self):
while (self.running):
self.update()
self.update()
def update(self):
line = ""
try:
while self.qe.not_empty:
line = self.qe.get_nowait() # or q.get(timeout=.1)
self.unbblocked_err += line
except Queue.Empty:
pass
line = ""
try:
while self.qo.not_empty:
line = self.qo.get_nowait() # or q.get(timeout=.1)
self.unbblocked_out += line
except Queue.Empty:
pass
while not self.stdin_queue.empty():
s = self.stdin_queue.get()
self.p.stdin.write(str(s))
def get_stdout(self, clear=True):
ret = self.unbblocked_out
if clear:
self.unbblocked_out = ""
return ret
def has_stdout(self):
ret = self.get_stdout(False)
if ret == '':
return None
else:
return ret
def get_stderr(self, clear=True):
ret = self.unbblocked_out
if clear:
self.unbblocked_out = ""
return ret
def has_stderr(self):
ret = self.get_stdout(False)
if ret == '':
return None
else:
return ret
def __init__(self, subp):
'''This is a simple class that collects and aggregates the
output from a subprocess so that you can more reliably use
the class without having to block for subprocess.communicate.'''
self.p = subp
self.unbblocked_out = ""
self.unbblocked_err = ""
self.running = True
self.qo = Queue.Queue()
self.to = threading.Thread(name="out_read",
target=self.enqueue_output,
args=())
self.to.daemon = True # thread dies with the program
self.to.start()
self.qe = Queue.Queue()
self.te = threading.Thread(name="err_read",
target=self.enqueue_err,
args=())
self.te.daemon = True # thread dies with the program
self.te.start()
self.stdin_queue = Queue.Queue()
self.aggregator = threading.Thread(name="aggregate",
target=self.aggregate,
args=())
self.aggregator.daemon = True # thread dies with the program
self.aggregator.start()
pass
您可能不需要整个示例,但可以随意剪切并粘贴所需的内容。 显示我是如何进行线程处理也很重要。
代码看起来比任务要求更复杂。 我不明白为什么你需要在这里调用process.poll()
或queue.get_nowait()
。 将子进程的stdout / stderr传递给多个接收器; 你可以用接受任意类文件对象的teed_call()
开始:你可以传递日志文件和特殊的文件类对象,这些对象在他们的.write()
方法中累积errors
。
用最少的更改修复你的代码; 你应该在读线程中调用.join()
(即使process.poll()
不是None
也就是说,子process.poll()
退出;可能有一些待处理的输出,连接读取器的线程确保读取所有输出。