Python - 合并多个进程的stdout和stderr


在开发中我们经常需要同时运行多个服务程序。以web开发为例,我们首先要启一个web server(django/flask/webpy/bottle等),一个或多个db server(mysql/mongodb/redis等),还可能用到一些watcher当源文件变化时自动编译(coffeescript/sass等)。如果这些程序运行在不同的terminal中,我们可能需要经常在这些terminal之间来回切换来查看它们的输出。因此我用python写了一个小的工具,用来在一个terminal中同时运行多个程序,合并它们的stdout和stderr。这个工具的工作原理很简单,启动时它读取一个配置文件并启动其中指定的若干个程序,之后它会捕获所有这些程序的输出,在每一行输出前加上对应程序的名字和进程ID,再输出到terminal上。它的代码加上帮助文档一共69行,麻雀虽小,却也用到了几个比较有意思的python模块,对初学python的朋友可以有一些帮助。以下是这个工具的完整代码

#!/usr/bin/env python
from argh import ArghParser, arg
import argparse
import yaml
import subprocess
import select
import sys
import os
import fcntl

def set_non_blocking(fd):
    fileno = fd.fileno()
    fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
    fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)

class MultiRunner(object):
    def __init__(self, cmd_objs):
        self.processes = {}
        self.cmd_objs = cmd_objs

    def start_process(self, name, command):
        proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout_prefix, stderr_prefix = '%s(%s)|OUT' % (name, proc.pid) ,  '%s(%s)|ERR' % (name, proc.pid)
        self.processes[proc.stdout] = (proc, stdout_prefix)
        self.processes[proc.stderr] = (proc, stderr_prefix)
        map(set_non_blocking, [proc.stdout, proc.stderr])

    def run(self):
        for obj in self.cmd_objs:
            assert obj.has_key('command'), "'command' parameter is required for every process."
            self.start_process(self.get_name(obj), obj['command'])

        while self.processes:
            rlist, _, _ = select.select(self.processes.keys(), [], [])
            for fd in rlist:
                proc, prefix = self.processes[fd]
                content = fd.read()
                if content:
                    lines = ['[%s] %s\n' % (prefix, line) for line in content.split('\n')]
                    sys.stdout.writelines(lines)
                if proc.poll() is not None:
                    self.processes.pop(fd, None)

    def get_name(self, obj):
        return obj.get('name', os.path.basename(obj['command'].split(' ')[0]))
        
@arg('--config', '-f', default='multirun.yml', help='Configuration file that specifies the commands to be executed.(default: multirun.conf)')
def main(args):
    """
    Launch multiple processes and manage all of their outputs together.
    Example of YAML-formatted config file
    ---
    name: proc1
    command: shell-command-to-start-proc1
    ---
    name: proc2
    command: shell-command-to-start-proc2
    """
    cmd_objs = yaml.load_all(open(args.config))
    runner = MultiRunner(cmd_objs)
    try:
        runner.run()
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    parser = ArghParser(formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.set_default_command(main)
    parser.dispatch()

首先,这个工具用了argh来处理命令行参数,可以在入口函数前用多个@arg来指定多个命令行参数,并且使用入口函数的docstring来自动生成帮助,用起来比optparseargparse更方便。

配置文件使用了yaml格式,与xml,json等格式相比,yaml更简洁,有更好的可读性,比如一个dict(first_name='Ryan', last_name='Ye', description="I'm a software developer.\nLife is short, you should use Python."),转成yaml格式

first_name: Ryan
last_name: Ye
description: >
  I"m a software developer.
  Life is short, you should use Python.

接下去就是程序的主体部分,这里用到了subprocessselect两个模块,subprocess固名思义就是用来启动一个子进程,它的stdin, stdout和stderr是文件对象,stdin可以用write方法写入,stdout, stderr可以用read方法来读取。要同时监控若干个进程的输出有几种方法,最直观的方法是使用多线程,在每个线程中反复查询一个进程的输出,并对每一行输出加上进程名与进程ID后再用print输出。这个方法有一个问题,就是不同线程同时print时会造成最后的输出结果混乱。解决的方法是,把所有的输出写到一个Queue中,再由一个单独的线程从Queue中取出并print

这里并没有使用多线程模型,而是用了selectselect方法的输入一堆文件对像,当其中的一个或是多个可读或可写时select函数就会返回,返回值就是所有可读写的文件对象。通过循环反复调用select就可以实现对多个文件的监控。特别要注意的是,这里必须把所有的文件读写模式设为非阻塞式(non-blocking I/O),不然调用read方法时会一直等到相应的子进程退出才返回。各个不同的操作系统都实现了一套高效的select机制,详细的使用方法可以参见epoll, kqueue, FSEvents等模块。基于select机制实现并发的模型,我们也常称为消息循环模型(event loop)。

多线程模型和消息循环模型是在网络并发中常用的两个模型,消息循环模型由于没有线程切换的开销,所以一般来说性能上更占优。但由于它使用非阻塞式的I/O,所有的I/O操作都必须使用回调函数,使代码更复杂,影响可读性。这些年红红火火的gevent就是取两者之长,在最底层使用select机制来监控所有的I/O,根据I/O事件来调度coroutine,实现了接近于消息循环模型的性能,而应用程序的代码又可以用与多线程模型相同的方式来书写。