在开发中我们经常需要同时运行多个服务程序。以web开发为例,我们首先要启一个web server(django/flask/webpy/bottle等),一个或多个db server(mysql/mongodb/redis等),还可能用到一些watcher当源文件变化时自动编译(coffeescript/sass等)。如果这些程序运行在不同的terminal中,我们可能需要经常在这些terminal之间来回切换来查看它们的输出。因此我用python写了一个小的工具,用来在一个terminal中同时运行多个程序,合并它们的stdout和stderr。这个工具的工作原理很简单,启动时它读取一个配置文件并启动其中指定的若干个程序,之后它会捕获所有这些程序的输出,在每一行输出前加上对应程序的名字和进程ID,再输出到terminal上。它的代码加上帮助文档一共69行,麻雀虽小,却也用到了几个比较有意思的python模块,对初学python的朋友可以有一些帮助。以下是这个工具的完整代码
#!/usr/bin/env python
from argh import ArghParser, arg
import argparse
import yaml
import subprocess
import select
import sys
import os
import fcntl
def set_non_blocking(fd):
fileno = fd.fileno()
fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)
class MultiRunner(object):
def __init__(self, cmd_objs):
self.processes = {}
self.cmd_objs = cmd_objs
def start_process(self, name, command):
proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_prefix, stderr_prefix = '%s(%s)|OUT' % (name, proc.pid) , '%s(%s)|ERR' % (name, proc.pid)
self.processes[proc.stdout] = (proc, stdout_prefix)
self.processes[proc.stderr] = (proc, stderr_prefix)
map(set_non_blocking, [proc.stdout, proc.stderr])
def run(self):
for obj in self.cmd_objs:
assert obj.has_key('command'), "'command' parameter is required for every process."
self.start_process(self.get_name(obj), obj['command'])
while self.processes:
rlist, _, _ = select.select(self.processes.keys(), [], [])
for fd in rlist:
proc, prefix = self.processes[fd]
content = fd.read()
if content:
lines = ['[%s] %s\n' % (prefix, line) for line in content.split('\n')]
sys.stdout.writelines(lines)
if proc.poll() is not None:
self.processes.pop(fd, None)
def get_name(self, obj):
return obj.get('name', os.path.basename(obj['command'].split(' ')[0]))
@arg('--config', '-f', default='multirun.yml', help='Configuration file that specifies the commands to be executed.(default: multirun.conf)')
def main(args):
"""
Launch multiple processes and manage all of their outputs together.
Example of YAML-formatted config file
---
name: proc1
command: shell-command-to-start-proc1
---
name: proc2
command: shell-command-to-start-proc2
"""
cmd_objs = yaml.load_all(open(args.config))
runner = MultiRunner(cmd_objs)
try:
runner.run()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
parser = ArghParser(formatter_class=argparse.RawDescriptionHelpFormatter)
parser.set_default_command(main)
parser.dispatch()
首先,这个工具用了argh
来处理命令行参数,可以在入口函数前用多个@arg
来指定多个命令行参数,并且使用入口函数的docstring来自动生成帮助,用起来比optparse
和argparse
更方便。
配置文件使用了yaml格式,与xml,json等格式相比,yaml更简洁,有更好的可读性,比如一个dict(first_name='Ryan', last_name='Ye', description="I'm a software developer.\nLife is short, you should use Python.")
,转成yaml格式
first_name: Ryan
last_name: Ye
description: >
I"m a software developer.
Life is short, you should use Python.
接下去就是程序的主体部分,这里用到了subprocess
和select
两个模块,subprocess
固名思义就是用来启动一个子进程,它的stdin, stdout和stderr是文件对象,stdin可以用write
方法写入,stdout, stderr可以用read
方法来读取。要同时监控若干个进程的输出有几种方法,最直观的方法是使用多线程,在每个线程中反复查询一个进程的输出,并对每一行输出加上进程名与进程ID后再用print
输出。这个方法有一个问题,就是不同线程同时print
时会造成最后的输出结果混乱。解决的方法是,把所有的输出写到一个Queue
中,再由一个单独的线程从Queue
中取出并print
。
这里并没有使用多线程模型,而是用了select
,select
方法的输入一堆文件对像,当其中的一个或是多个可读或可写时select
函数就会返回,返回值就是所有可读写的文件对象。通过循环反复调用select
就可以实现对多个文件的监控。特别要注意的是,这里必须把所有的文件读写模式设为非阻塞式(non-blocking I/O),不然调用read
方法时会一直等到相应的子进程退出才返回。各个不同的操作系统都实现了一套高效的select机制,详细的使用方法可以参见epoll
, kqueue
, FSEvents
等模块。基于select机制实现并发的模型,我们也常称为消息循环模型(event loop)。
多线程模型和消息循环模型是在网络并发中常用的两个模型,消息循环模型由于没有线程切换的开销,所以一般来说性能上更占优。但由于它使用非阻塞式的I/O,所有的I/O操作都必须使用回调函数,使代码更复杂,影响可读性。这些年红红火火的gevent就是取两者之长,在最底层使用select机制来监控所有的I/O,根据I/O事件来调度coroutine,实现了接近于消息循环模型的性能,而应用程序的代码又可以用与多线程模型相同的方式来书写。