Python守护进程

守护进程不是服务

守护进程不是服务,或者说,守护进程不完全是服务,服务意味着一个主进程,和能够通过某种方式与该主进程进行交流与控制的另一个进程;而守护进程仅仅意味着服务的主进程部分

创建一个systemd服务

在几乎所有Linux发行版都使用systemd的现在,使用Python开发一个服务惊人的简单。

第一步:编写systemd单元

systemd兼并了服务管理工具、会话控制工具与监视器工具,systemd既支持系统服务,也支持用户服务。

首先需要创建一个适配服务的systemd服务单元文件,这个服务单元文件可以保存在:

  • 系统级:/etc/systemd/system目录下。
  • 全局用户级:/etc/systemd/user目录下。
  • 用户级:~/.config/systemd/user/目录下。

这个服务单元文件名应当为:

1
服务名.service

内容骨干为:

1
2
3
4
5
[Unit]
Description=描述

[Service]
ExecStart=/usr/bin/python3 -u /脚本路径

就这么简单,接下来我们讲一些详细内容。

标准输出与标准错误

在过去,用户需要将服务的所有输出保存或者重定向到日志文件中,但是对于systemd来说,这一切都不再需要了,因为systemd的journald组件可以收集并记录systemd服务的输出,换句话说,这变成了systemd的任务

这一行为也可以通过添加这两个选项修改:

1
2
3
4
5
6
7
8
[Unit]
StandardOutput=journal|null|syslog|file:路径
StandardError=journal|null|syslog|file:路径

# journal即为默认选项,收集到journald
# null表示直接抛弃所有输出
# syslog表示收集到syslog缓冲区,注意,所有syslog缓冲区输出也会收集到journald一份
# file表示收集到文件中

值得一提的是,在默认情况下,Python的输出是带缓冲的,这也就意味着,输出会被阻塞到填满缓冲区后才通过管道传输给systemd,这是不合适的,因此,我们需要关闭Python的输出缓冲,要么给Python解释器加上-u选项,要么在服务单元中添加以下内容:

1
2
[Service]
Environment=PYTHONUNBUFFERED=1

标准输入

和标准输出与标准错误的选项类似,标准输入也有一个选项:

1
2
StandardInput=null|data|tty|tty-force|tty-fail|file:路径|socket
# 默认选项为null,即不接受任何标准输入

我们来分别讲讲:

使用固定输入

使用StandardInput=data选项可以使得服务在启动时接收一个固定的标准输入,这个固定的标准输入通过StandardInputText=StandardInputData=(如果是二进制)给出:

1
2
StandardInput=data
StandardInputText='TestInput'

如果直接给出了StandardInputText=StandardInputData=,那么实际上StandardInput=的默认值就会变成data

使用终端

使用StandardInput=tty|tty-force|tty-fail可以将某个终端(Ctrl + Fn)分配给服务使用,不过这三种选项存在细微差别:

  • StandardInput=tty在终端已经被占用时会阻塞服务启动并等待释放。
  • StandardInput=tty-force在终端已经被占用时会直接抢占终端。
  • StandardInput=tty-fail在终端已经被占用时会直接启动失败。

在设置这一选项后,还需要使用TTYPath=指定使用的终端:

1
2
3
StandardInput=tty|tty-force|tty-fail
# 使用TTY5
TTYPath=/dev/tty5

这样就可以使用Ctrl + Alt + F5切换到TTY5进行控制了。

使用文件

使用StandardInput=file:路径可以使用任意文件作为标准输入的数据来源,“任意文件”意味着可以是普通的文本文件,也可以是FIFO,这就带来了很多种可能性。

首先我们可以使用普通的文本文件,这就和StandardInput=data差不多,会直接读取文件内容作为固定的数据源。

其次我们还可以使用FIFO,不过需要注意的是,此时服务的启动会被阻塞,直到从FIFO中接收到第一个标准输入,FIFO在输入源程序退出后会发送一个EOF,这会关闭标准输入。

那么假如我们要实现FIFO在服务启动时自动创建,在服务停止时自动删除该怎么办呢?其实很简单,一种思路是,创建另一个服务:

1
2
3
4
5
6
7
8
9
10
11
[Unit]
# 我们不能让用户独立启动这个服务
RefuseManualStart=yes
# 在主服务停止时,该服务也要停止
PartOf=xxx.service

[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/bin/mkfifo /run/xxx.sock
ExecStop=/bin/rm -f /run/xxx.sock

然后,我们再在原来的服务中添加依赖即可:

1
2
3
4
5
6
7
8
9
10
11
12
[Unit]
# 绑定依赖,前置服务启动失败或者停止了该服务都要连带停止
BindsTo=xxx.service
# 如果找不到FIFO文件,那么该服务会直接失败而不是阻塞,因此需要设置顺序关系
After=xxx.service

[Service]
StandardInput=file:/run/xxx.sock
...

[Install]
Also=xxx.service
使用套接字

这个“套接字”指的不是具体意义上的套接字,而是systemd的套接字单元,而且仅可用于套接字激活的服务单元。

使用StandardInput=socket选项,我们可以很容易的实现通过某些接口激活的任务,例如,对于前面的FIFO需求,更优雅的思路是,使用systemd原生的.socket单元,创建一个监听FIFO的.socket单元:

1
2
[Socket]
ListenFIFO=/run/xxx.sock

然后,在配套的服务单元中使用StandardInput=socket读取其内容即可:

1
2
3
[Service]
StandardInput=socket
StandardOutput=journal

需要注意的是,由于FIFO的读取/发送目标是内核缓冲区,读取时内核缓冲区只会忠诚地返回它内部保存的数据(而网络套接字读取/发送目标是网络对端,对端会进行消费动作),如果StandardOutput=socket(默认值),那么会导致严重的死锁(进程从FIFO中接收到数据后,会再次发送到FIFO,接着立刻从FIFO读取到自己刚刚发送的数据,从而导致死循环,直到打满内核缓冲区),因此StandardOutput=journal是必要的。

但是,对于网络套接字,就没有上述那些问题,我们可以直接大大方方地使用它进行双工通信:

1
2
3
4
5
6
[Unit]
...

[Socket]
ListenStream=XXXX
Accept=yes

然后,千万注意,我们的应用程序必须改写为模板服务XXX@.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[Unit]
...
# 如果你的程序没有为客户端单方面断开链接(EOF)的情况进行适配
# 那么很可能在这种情况下异常退出,导致单元进入failed状态
# systemd默认会在内存中保留failed状态的单元,不进行清理
# 这可能导致failed单元大量积累
# 该配置项可以修改这一逻辑
CollectMode=inactive-or-failed

[Service]
StandardInput=socket
StandardOutput=socket
# 如果不同名,可以使用Sockets=指定套接字单元
Sockets=xxx.socket

然后,每当我们向套接字传入某些数据时,便会自动激活该服务并将通信套接字的文件描述符连接到标准输入和标准输出,应用程序可以直接通过标准输入和标准输出和远端进行通信。

需要注意的是,Accept=yes是必要的,否则systemd传递给服务的套接字将不会是通信套接字,而是监听套接字,应用程序需要通过sd_listen_fds接口、或标准输入、或标准输出的文件描述符获取监听套接字,然后自行调用accept()创建通信套接字。

指定用户运行

出于安全因素考虑,对于系统级服务,我们也会希望使用独立用户而不是root运行,对于systemd来说,这很容易实现:

1
2
3
[Service]
User=用户名
Group=组名

还有一种更方便的方法,也就是使用systemd动态生成的临时用户,这种方法是为了取代过去的nobody公用用户:

1
2
[Service]
DynamicUser=yes

自动拉起

systemd包含着Supervisor的功能,可以进行服务的健康监测(准确地说,是活动状态监测)与自动拉起,要实现也相当简单,只需要添加这个选项:

1
2
3
4
5
6
[Service]
Restart=no|always|on-failure

# no即为默认选项,不启用自动拉起功能
# always表示总是自动拉起
# on-failure表示仅在服务异常退出时自动拉起

简单服务与forking服务

在传统的情况下,我们创建一个服务需要从主进程分叉出守护进程,将守护进程的PID保存在PID文件中,然后杀死主进程(参考:PEP 3143)。

但是如之前所说,对于systemd来说,我们不再需要这么麻烦,systemd本身就有能力进行进程的生命周期管理和自动拉起,因此我们只需要实现一个简单的主程序,然后将服务类型设为:

1
2
[Service]
Type=simple

即可。

但是,systemd仍然是可以兼容传统的守护进程逻辑的,只需要给systemd指出forking服务类型和PID文件路径即可:

1
2
3
[Service]
Type=forking
PIDFile=/run/xxx.pid

此外,如果开发者在服务实现中使用了systemd.daemon自行向systemd发送信号通知状态的话,那么就不需要让systemd对服务的状态进行推断,此时可以将服务类型设为:

1
2
[Service]
Type=notify

我们在之后会再次提到。

停止与垃圾处理

systemd会对服务的生命周期进行管理,这其中当然包括对服务的停止。

我们知道,systemd使用CGroup对服务的所有进程进行追踪,默认情况下,在停止服务时,systemd会对服务CGroup内的所有进程发送SIGTERM信号,如果90秒后仍然存在未停止的进程,那么就再对它们发送SIGKILL信号,不过,这一行为可以进行修改:

1
2
3
4
5
6
7
[Service]
KillMode=control-group|process|mixed|none

# control-group即为默认值
# process表示仅对主进程进行杀死,而无视其他进程
# mixed表示对主进程发送SIGTERM信号,但是对其他进程一律发送SIGKILL信号
# none表示不进行任何杀死操作,这一选项必须搭配ExecStop=使用

这些选项给开发者实现守护进程的垃圾处理提供了多种方式,比如,开发者可以使用KillMode=process,然后让主进程捕获信号后,再自行处理垃圾和杀死子进程。或者,开发者可以使用KillMode=none,完全使用在ExecStop=中指定的方法进行服务停止。

除此之外,我们还可以直接让systemd帮忙进行垃圾处理:

1
2
3
4
5
6
7
8
[Service]
ExecStop=/命令
ExecStopPost=/命令

# ExecStop=指定的命令会在进行杀死操作之前先执行
# ExecStopPost=指定的命令会在进行杀死操作之后才执行
# 简单来说,它们两者的区别仅仅在于,ExecStopPost=无论在服务启动是否启动成功的情况下都会执行,而ExecStop=则反之
# 因此,如果用户需要特别的手动杀死部分进程,那么使用ExecStop=,如果要进行进程完全停止后的垃圾处理,使用ExecStopPost

服务启用

“启用”一个服务对systemd来说意味着执行服务的[Install]段,要实现服务的开机启动,我们是通过将服务设置为某些target的依赖(一般为多用户模式)实现的,因此只需要添加依赖关系即可:

1
2
[Install]
WantedBy=default.target

值得一提的是,如果在主机上不存在用户的任何会话,那么用户的systemd服务也会随之被销毁,要使得用户的服务拥有和系统级服务一样的生命周期,需要给用户启用Linger(参考systemd文档)。

资源限制

基于CGroup,systemd可以轻易地对服务进行资源限制,就像这样:

1
2
[Service]
CPUQuota=

详情请见systemd文档。

沙箱

systemd也可以对服务进行沙箱化,限制服务对某些系统目录或网络的使用,从而有效提高安全性,详情请看systemd文档:

1
2
[Service]
ProtectSystem=no|yes|strict

编写服务实现

是时候使用Python实现守护进程的主要内容了,一个简单的前台服务只需要实现一个永久的循环即可:

1
2
3
if __name__ == '__main__':
while True:
...

既然要编写适配systemd的守护进程,那么自然离不开systemd模块:

1
import systemd.daemon as sd

检测系统类型

使用这一函数检测系统是否使用systemd:

1
systemd.daemon.booted() -> bool

如果是,那么返回True,否则返回False

发送通知

我们之前提到过notify,它的功能就是给systemd发送一些服务状态信号,函数签名如下:

1
systemd.daemon.notify('状态')

而状态有以下几种:

  • READY=1:告诉systemd服务已经启动完成。
  • RELOADING=1:告诉systemd服务正在重载。
  • STOPPING=1:告诉systemd服务正在停止。
  • STATUS=...:告诉systemd服务的状态文本。
  • ERRNO=X:服务出错了,以X状态码退出。
  • MAINPID=X:告诉systemd主进程的PID。
  • BUSERROR=...:告诉systemd D-Bus问题,等号右侧必须是D-Bus风格的错误。
  • WATCHDOG=1:告诉systemd服务看门狗该服务仍在运行。
  • WATCHDOG_USEC=...:重设看门狗的时间。
  • FDSTORE=1:保留文件描述符,这些文件描述符将在下次启动服务时通过sd_listen_fds的方式传递给进程。
  • FDSTOREMOVE=1:移除文件描述符。
    • FDNAME=...:保留的文件描述符。

举个例子,我们可以在启动前进行一些准备工作:

1
2
3
4
5
6
7
8
9
import systemd.daemon

if __name__ == '__main__':
print('Starting...')
systemd.daemon.notify('READY=1')

# 然后开始实现主逻辑
while True:
...

或者,我们可以在退出前进行一些处理:

1
2
3
4
5
6
def exit():
print('Exiting......')
systemd.daemon.notify('STOPPING=1')
sys.exit(0)

signal.signal(signal.SIGTERM, exit())

你也可以自己实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def notify(state: str) -> bool:
notify_socket = os.getenv("NOTIFY_SOCKET")

if not notify_socket:
return False

# 抽象命名空间Socket
if notify_socket.startswith("@"):
notify_socket = "\0" + notify_socket.removeprefix("@")

try:
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
s.sendto(state.encode(), notify_socket)
except OSError:
return False

return True

第三步:测试

可以使用systemd-run工具进行测试,这是一个临时封装systemd单元并运行的命令,它的常见用法为:

1
systemd-run --unit=任务名 [--user] [--service-type=oneshot|forking] [-G] [-p "PIDFile=/run/xxx.pid"] [-p "SuccessExitStatus=0 1 2"] -d|--working-directory=工作目录 [-E "Key=value"] [-r] 执行的命令

选项含义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
--unit=
明确指定单元的名称(而不是自动生成)

--user
与当前调用用户的用户服务管理器(systemd 用户实例)通信,而不是默认的系统级服务管理器(systemd 系统实例)

-G, --collect
即使临时单元执行失败(failed),也在结束后从内存中卸载它

--property=, -p
为临时单元设置一个属性

--same-dir, -d
使用用户当前的工作目录运行服务进程

-E NAME=VALUE, --setenv=NAME=VALUE
给服务进程传递一个环境变量。可以多次使用此选项以传递多个环境变量

-r, --remain-after-exit
在服务进程结束之后,继续保持服务的存在,直到被明确的停止(stop)后才卸载

更多优化

编写单元文件的建议

  1. 尽量不要使用依赖。如果一定要使用依赖,尽量使用Wants=,这可以避免一个服务被用户停止时导致另一个服务被连带停止。
  2. 尽量不要修改StandardOutput=StandardError=,交给journald处理就好。
  3. 尽量带上ProtectSystem=yes|strict,一般的服务都用不到会被这个配置项屏蔽的目录。
  4. 尽量带上PrivateDevices=yes,一般的服务很少需要直接访问硬件。
  5. 尽量带上PrivateNetwork=yes,如果是非网络服务的话。

套接字激活

systemd官方认为,对于任何需要使用套接字通信的服务来说,套接字都应当交给systemd创建、配置和管理,而不是程序内部。这样,我们就把套接字的创建提前到了服务启动之前,有利于服务的并行启动。

systemd接收到数据后,会对数据进行排队,同时启动服务进程,服务启动后,systemd会将套接字的文件描述符传递给进程,因此,使用套接字激活机制的服务,必须有能力从systemd接手套接字。要实现这一功能,我们有很多种办法:

  • systemd传递的第一个文件描述符总是3,如果服务只监听一个端口,我们可以直接使用文件描述符3获取套接字。
  • 如果服务只监听一个端口,那么可以设置StandardInput=socketStandardOutput=socket,通过文件描述符0或文件描述符1获取套接字。
  • 如果服务监听多个端口,我们可以使用systemd.daemon.listen_fds()函数获取文件描述符列表,然后逐一获取。
  • 我们可以使用socket.socket(fileno=文件描述符)这个函数实现获取文件描述符对应的套接字对象。
  • 我们可以使用socket.fromfd(fd, family, type)这个函数克隆文件描述符,并创建一个新的套接字对象(不常用)。
自行创建套接字兼容模式

很多应用程序本身没有实现systemd的套接字激活协议,systemd组为此开发了一个专门用于实现套接字激活,而不必修改现存应用程序的代理工具systemd-socket-proxyd

首先我们需要编写一个套接字单元proxy.socket

1
2
3
4
5
6
7
8
[Socket]
# 实际监听的套接字
ListenStream=80
# Accept不可以是yes
# Accept=no

[Install]
WantedBy=sockets.target

然后我们编写一个用于代理的服务proxy.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[Unit]
# 程序服务
Requires=XXX.service
# 程序服务
After=XXX.service
Requires=proxy.socket
After=proxy.socket

[Service]
# 注意类型
Type=notify
ExecStart=/usr/lib/systemd/systemd-socket-proxyd /run/Socket|127.0.0.1:Port
PrivateTmp=yes
PrivateNetwork=yes

接着我们只需要为我们的程序再编写一个服务XXX.service

1
2
3
4
5
6
7
8
[Unit]
# 如果希望启用自动退出
# StopWhenUnneeded=yes
...

[Service]
# 你的程序应当使用UNIX域套接字,或是监听127.0.0.1上的Inet套接字
ExecStart=... --bind=/run/Socket|127.0.0.1:Port
Inetd兼容模式

参考:End Road

为传统inetd设计的程序无需修改即可在 systemd 的套接字激活协议下运行,这一行为是通过Accept=yes选项配置的。

首先我们需要编写一个套接字单元服务名.socket

1
2
3
4
5
6
7
8
9
10
11
[Socket]
# 实际监听的套接字
# 不可以是数据报套接字或FIFO
ListenStream=80
# 核心:Accept必须是yes
# systemd会为每个连接分别调用accept()派生服务实例
# 并通过标准输入输出传递已连接的套接字
Accept=yes

[Install]
WantedBy=sockets.target

接着,需要修改我们的服务单元为模板服务单元:服务名@.service,并且必须进行如下设置:

1
2
3
4
[Service]
# 确保和传统inetd行为一致
StandardInput=socket
StandardOutput=socket

接着,我们在程序中不需要关心任何套接字内容,只需要从标准输入读取字节流,将响应字节流写入标准输出:

1
2
3
4
import sys
# 你必须使用sys.stdout.write()
# 加上CRLF结尾
sys.stdout.write(sys.stdin.readline().strip() + "\r\n")
1
2
3
4
5
6
# 持续服务实例
import sys

while True:
if sys.stdin.read():
sys.stdout.write(sys.stdin.readline().strip() + "\r\n")
  • 程序应当在处理完成请求后直接以非0退出码退出

一个示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import sys
from wsgiref.handlers import SimpleHandler, read_environ
import json

# 应用逻辑
def app(environ, start_response):
# 获取请求目标
path = environ.get("PATH_INFO", "/")
# 获取方法
method = environ.get("REQUEST_METHOD", "GET")

# 构建响应
if method == "GET":
status = "200 OK"
resp = {"Hello": "World"}
else:
status = "404 Not Found"
resp = {}

resp = json.dumps(resp).encode()

headers = [
("Content-Type", "application/json; charset=utf-8"),
("Content-Length", str(len(resp)))
]

start_response(status, headers)

return [resp]

if __name__ == "__main__":
environ = read_environ()

request = sys.stdin.readline().strip()
method, url, protocol = request.strip().split(" ")
if "?" in url:
path, query = url.split("?", 1)
else:
path, query = url, None

environ |= {
"REQUEST_METHOD": method,
"PATH_INFO": path,
"QUERY_STRING": query or "",
"SERVER_PROTOCOL": protocol
}

while header := sys.stdin.readline().strip():
k, v = header.split(":", 1)
environ["HTTP_" + k.replace("-", "_").upper()] = v.strip()

SimpleHandler(
stdin=sys.stdin.buffer,
stdout=sys.stdout.buffer,
stderr=sys.stderr,
environ=environ,
multithread=False,
multiprocess=True,
).run(app)
直接适配

首先我们需要创建一个套接字单元文件,它的文件名应为服务名.socket,内容如下:

1
2
3
4
5
6
7
8
9
10
11
[Unit]
Description=描述

[Socket]
# 字节流套接字,用于TCP,你也可以在端口号之前指定IP
ListenStream=2000
# 数据报套接字,用于UDP
#ListenDatagram=

[Install]
WantedBy=sockets.target

通常,我们会希望在启用按需启动的服务时仅仅启用对应的套接字,所以要对服务单元进行一些修改:

1
2
3
4
5
6
7
# 以上内容省略

[Install]
# 如果确定服务是按需启动的话,服务就没必要在开机启动
#WantedBy=
# 仅仅需要启动套接字就行了
Also=xxx.socket

接着,我们需要在程序代码中实现套接字:

socket模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import systemd.daemon
import socket

# 这是systemd.daemon中的定义的全局变量
# os.getenv("SYSTEMD_FIRST_SOCKET_FD") == 3

# 或者,我们直接获取文件描述符列表
STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds()
# 这里只获取第一个套接字做例子,通常也不会在一个.socket中配置两个套接字
LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])

# 等待连接,并在连接后创建通信套接字
comm_sock, client_addr = LISTEN_SOCKET.accept()

while True:
... # 进行通信

# 可选地关闭套接字,然后退出
# 禁止使用shutdown()
#comm_sock.close()
  • 程序应当在accept()后持续接收请求,在明确要求停止后才退出。
socketserver模块

我们在socketserver定义的服务器类中是不是也可以实现呢?答案是可以的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import systemd.daemon
import socketserver
import socket

# 让我们获取systemd传递的套接字
STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds()
LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])

... # 实现一个处理器

# 让我们创建一个实例
my_server = socketserver.TCPServer(('', 0), 请求处理器类, bind_and_activate=False)
my_server.socket = LISTEN_SOCKET
my_server.serve_forever()

为了提高复用性,我们可以创建一个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import systemd.daemon
import socketserver
import socket

# 让我们获取systemd传递的套接字
STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds()
LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])

class MyServer(socketserver.TCPServer):
# 我们要稍微重写一下构造方法,当然你也可以考虑一些其他的方法
def __init__(self, listen_socket, RequestHandlerClass):
# 在这里我们把套接字设为空,把自动绑定给关掉,原因很显然:我们要直接使用systemd传递的套接字,没必要再创建和绑定了
super().__init__(('', 0), RequestHandlerClass, bind_and_activate=False)
# 服务器类使用的socket是可以直接设置的,我们直接使用systemd负责监听的套接字就好
self.socket = LISTEN_SOCKET

... # 实现一个处理器

# 让我们创建一个实例
my_server = MyServer(LISTEN_SOCKET, 请求处理器类)
my_server.serve_forever()
http.server模块

再往上一层,我们对于HTTP服务器也可以这么做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import systemd.daemon
import http.server
import socket

# 让我们获取systemd传递的套接字
STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds()
LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])

# 实际上,HTTPServer类对TCPServer类的主要重写就在于server_bind()函数,对我们的代码没什么影响
class MyServer(http.server.HTTPServer):
# HTTPServer类继承自TCPServer,所以差不多
def __init__(self, listen_socket, RequestHandlerClass):
super().__init__(('', 0), RequestHandlerClass, bind_and_activate=False)
self.socket = LISTEN_SOCKET

... # 实现一个处理器

# 让我们创建一个实例
my_server = MyServer(systemd_listen_socket, 请求处理器类)
my_server.serve_forever()

然后,我们只需要启用套接字,那么之后有客户端对套接字发送请求时,对应的服务就会被systemd自动激活了。

wsgiref模块

最有用的场景就是实现一个WSGI服务器了,和之前提到过的Inetd风格的WSGI服务器一样,我们也可以轻松地进行转换,只需要进行少许的额外操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import sys
from wsgiref.handlers import SimpleHandler, read_environ
import systemd.daemon
import socket
import json

# 应用逻辑
def app(environ, start_response):
# 获取请求目标
path = environ.get("PATH_INFO", "/")
# 获取方法
method = environ.get("REQUEST_METHOD", "GET")

# 构建响应
if method == "GET":
status = "200 OK"
resp = {"Hello": "World"}
elif method == "POST":
# 获取请求体
body = environ["wsgi.input"].read(int(environ.get("CONTENT_LENGTH", "0")))
status = "200 OK"
resp = {"data": body.decode("utf-8")}
else:
status = "404 Not Found"
resp = {}

resp = json.dumps(resp).encode()

headers = [
("Content-Type", "application/json; charset=utf-8"),
("Content-Length", str(len(resp)))
]

start_response(status, headers)

return [resp]

class WSGIStub(WSGIServer):
"""WSGIRequestHandler 需要的最小 server 接口"""

def __init__(self, socket, application): # pyright: ignore[reportMissingSuperCall]
self.socket = socket
host, port, *_ = socket.getsockname()
self.server_address = (host, port)
self.server_port = port
self.server_name = socket.getfqdn(host)
self.application = application
self.setup_environ()

if __name__ == "__main__":
SYSTEMD_LISTEN_FDS = systemd.daemon.listen_fds()
if not SYSTEMD_LISTEN_FDS:
raise OSError("Must be activated by systemd.")

listen_sock = socket.socket(fileno=SYSTEMD_LISTEN_FDS[0])

# 单连接处理
WSGIRequestHandler(
*listen_sock.accept(),
WSGIStub(socket.socket(fileno=SYSTEMD_LISTEN_FDS[0]), app),
)

# 你也可以使用多线程
# from concurrent.futures import ThreadPoolExecutor
# with ThreadPoolExecutor(max_workers=10) as executor:
# while True:
# executor.submit(WSGIRequestHandler, *listen_sock.accept(), WSGIStub(socket.socket(fileno=SYSTEMD_LISTEN_FDS[0]), app))

# 不会退出的多线程版本
# from concurrent.futures import ThreadPoolExecutor
# executor = ThreadPoolExecutor(max_workers=10)
# while True:
# executor.submit(WSGIRequestHandler, *listen_sock.accept(), WSGIStub(socket.socket(fileno=SYSTEMD_LISTEN_FDS[0]), app))

D-Bus激活

要使用D-Bus激活机制,我们首先必须确保D-Bus激活请求会被转发给systemd,以确保D-Bus服务不会被重复激活。要实现这一点,我们需要在/usr/share/dbus-1/system-services/目录下创建一个D-Bus服务文件,内容如下:

1
2
3
4
5
[D-Bus Service]
Name=服务名称,使用URL
Exec=非systemd启动时执行的命令,仅在未使用systemd时生效
User=启动的用户
SystemdService=dbus-服务名称.service

然后我们还需要在服务单元中指出BusName:

1
2
3
4
5
6
7
8
9
10
11
12
[Unit]
...

[Service]
Type=dbus
BusName=服务名称
ExecStart=启动时执行的命令
...

[Install]
# 建议添加这一别名以方便管理
Alias=dbus-服务名称.service

在过去要禁用D-Bus服务,除了删除以外别无它法,不过现在在systemd的帮助下,通过给服务单元设置标准化的别名,然后将服务别名设为和D-Bus绑定的服务,这样,我们就可以使用systemctl命令对其进行启用与禁用了。

然后,我们就需要编写服务的主程序及其接口配置了,对于Python来说,我们使用pydbus模块实现与D-Bus的绑定与Bus的申请。

首先我们需要编写服务的Policy,保存在/etc/dbus-1/system.d/目录下:

1
2
3
4
5
6
7
8
9
10
11
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
<policy user="root">
<allow own="org.example.test"/>
</policy>
<policy context="default">
<allow send_destination="org.example.test"/>
</policy>
</busconfig>

首先,我们编写服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/bin/env python3
import getpass
from gi.repository import GLib
from pydbus import SystemBus

# 这是我们要注册的服务名称
BUS_NAME = 'org.example.test'

# 定义我们自己的接口
class Example:
f"""
<node>
<interface name='{BUS_NAME}'> # 服务名称
<method name='log'> # 给接口定义一个方法
# 类型s自然就是字符串
<arg type='s' name='message' direction='in'/>
# 类型i就是整数了
<arg type='i' name='response' direction='out'/>
</method>
</interface>
</node>
"""

def log(self, message) -> int:
print('Client sent {}'.format(message))
return 1

# 可以连接到系统总线SystemBus()或用户总线SessionBus(),在这里我们使用系统总线
bus = SystemBus()
# 注册我们的服务,注意第二个参数带括号
bus.publish(BUS_NAME, Example())
# 进入主循环才能暴露服务
loop = GLib.MainLoop()
loop.run()

接着我们编写D-Bus服务文件和systemd单元文件:

1
2
3
4
5
[D-BUS Service]
Name=org.example.test
Exec=/bin/false
User=root
SystemdService=dbus-org.example.test.service
1
2
3
4
5
6
7
8
9
10
11
[Unit]
Description=No

[Service]
Type=dbus
BusName=org.example.test
ExecStart=/程序路径

[Install]
Alias=dbus-org.example.test.service
WantedBy=multi-user.target

最后启动服务,我们测试连通性:

1
busctl call org.example.test /org/example/test org.example.test log s 'Message'

应该能得到:

1
i 1

标准守护进程模块

python-daemon是符合 PEP 3143 标准的守护进程实现模块。

实现逻辑

根据《UNIX环境高级编程》的记录,详细来说,要实现一个守护进程,那么需要依次执行以下几步:

  • 关闭所有已经打开的文件描述符(Close)(注:这并不代表之后守护进程不能再打开文件)。
  • 切换当前的工作目录(Chdir)。
  • 重设自身的权限掩码(Umask)。
  • 进入后台(第一次Fork)
  • 脱离进程组(Setsid)
  • 无视终端IO发送的信号(Setsid)
  • 脱离控制终端,并且确保不会重新获取终端(Setsid和第二次Fork)
  • 确保正确地处理以下行为:
    • 被System V init启动的情况。
    • 接收到SIGTERM信号的退出。
    • 子进程在退出时要向父进程发送SIGCHLD信号。

一些守护进程工具,例如 Slack-daemon,不同于 PEP 3143 仅仅实现一个单独的守护进程的目标,它们专注于实现完整的UNIX守护进程功能。参考它们的功能,对于一个守护进程来说,这些额外行为是可取的:

  • 设置当前的进程上下文。
  • 检测initdinetd,在被其启动时做出正确的反应。
  • 设置SUID与SGID权限,以提高安全性。
  • 阻止生成内核转储文件,以防止信息泄露。
  • 创建以守护进程命名的PID文件,并保存在合适的位置,以确保守护进程的唯一性(可选)。
  • 重设守护进程所属的用户和组(可选,仅用于root守护进程)。
  • 重设CHROOT目录(可选,仅可用于root守护进程)。
  • 捕获守护进程的标准输出和标准错误,重定向到Syslog缓冲区或者文件(可选)。

具体内容

python-daemon是围绕一个守护进程上下文类daemon.DaemonContext(),与它的参数实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
对象名 = daemon.DaemonContext(
files_preserve=[保留的文件句柄列表],
chroot_directory=CHROOT目录,
working_directory=工作目录, # 默认为/,影响整个上下文内的工作目录,必须写绝对路径
umask=掩码, # 默认为0
pidfile=PID文件锁对象, # 几乎总要设置,PID文件会在守护进程结束时自动清理
detach_process=True, # 是否守护进程化,默认是True,除非你希望通过手动控制
uid=执行用户UID, # 默认是当前用户
gid=执行用户GID, # 默认是当前用户
prevent_core=True, # 禁止生成转储文件,防止信息泄露,默认是True
signal_map={信号: 函数}, # 信号处理函数映射,默认为{signal.SIGTERM: 'terminate'},设为None表示无视信号,设为字符串表示DaemonContext的一个实例属性,设为其他值表示信号处理函数,函数名后请不要带括号
stdin=None, # 标准输入来源,必须是流对象,至少要有r权限
stdout=None, # 标准输出目标,必须是流对象,至少要有w+权限
stderr=None, # 标准错误目标,必须是流对象,至少要有w+权限
)

这些参数都会被转换为实例属性,可以在之后通过实例进行修改,只需要通过对象.属性 = 值即可

而这个对象有以下这些方法:

打开守护进程上下文,它会将当前的程序进程Fork出一个子进程,然后杀死父进程,并在子进程中继续运行当前程序,换句话说,当这个方法返回时,运行中的程序就会变成守护进程,is_open被设为True(换句话说,所有进程分叉过程都交给守护进程上下文解决了,如果你想了解具体是怎么实现的,请看 python-daemon

1
对象.open() -> None

守护进程接收到指定信号后(默认是signal.SIGTERM,见之前提到的默认signal_map)进行退出的方法,它会简单地退出程序,然后抛出SystemExit异常以解释接收到的信号:

1
对象.terminate([信号])

关闭守护进程上下文,**这意味着清除PID文件,以及将is_open设为False**:

1
对象.close() -> None

返回守护进程上下文是否处于开启状态:

1
对象.is_open -> bool

此外,既然是上下文,它就当然实现了with语句:

执行with语句时自动执行的方法,调用.open()方法并返回实例:

1
对象.__enter__() -> DaemonContext

退出with语句时自动执行的方法,调用.close()方法,如果关闭成功则返回True

1
对象.__exit__() -> bool

这些方法几乎都不需要用户关心,它们是在with语句中被隐式调用的。

使用

要使用daemon模块,你至少还需要lockfilesignal模块:

1
2
3
4
5
import daemon
# 用于给PID文件上锁
import lockfile
# 用于提供信号
import signal

简单来说,你只需要把守护进程的主逻辑放到上下文中:

1
2
3
4
5
6
7
with daemon.DaemonContext(pidfile=lockfile.LockFile('/run/xxx.pid')):
# 接下来的内容就是会在守护进程中执行的程序
# 在进入循环之前,你也可以进行一些处理,比如打开一些文件之类的
before_loop()
# 循环不一定要写在外面,写在函数里面也可以
while True:
# 实现一些功能...

更规范的用法是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 定义一个在Fork守护进程之前的准备函数
def before_daemon():
...

# 定义一个在守护进程进入循环之前的准备函数
def before_loop():
...

# 定义主函数
def main_program():
# 循环不一定要写在外面,写在函数里面也可以
while True:
...

# 定义一个退出前的清理函数
def cleanup():
...
# 别忘了退出
sys.exit(0)

# 定义一个重载函数
def reload():
...

# 先实例化一个上下文对象,注意给PID文件加锁
daemon_context = daemon.DaemonContext(pidfile=lockfile.FileLock('/run/xxx.pid'),
signal_map = {signal.SIGTERM: cleanup, signal.SIGHUP: reload} # 在捕获SIGTERM信号时进行清理,在捕获SIGHUP信号时进行重载,函数名后没有括号
)

# 在这里执行进入守护进程前的处理函数
before_daemon()

# 在这里实现守护进程中运行的程序
with daemon_context:
# 在这里执行进入循环之前的处理函数,提示:和before_daemon()的主要区别在于进程环境不同
before_loop()
# 进入主程序循环
main_program()

效果

你应该能在ps xf的输出中看到一个脱离终端,孤立的守护进程,进程显示的命令就是执行脚本时执行的命令:

1
2
PID    TTY    STAT    TIME    COMMAND
2023 ? S 0:00 python daemon.py

缺陷与注意事项

因为在守护进程上下文内的程序已经与终端脱离连接了,所以如果出错,那么不会在终端上显示任何错误,因此需要提前做好Log登记。如果发现执行脚本后什么也没有发生,也没有守护进程产生,那么就说明守护进程上下文内的代码有问题。

一种可行的测试方法是在实际使用前先不要把主程序放到守护进程上下文中,单次执行后检查无误再放进守护进程上下文中测试。

  • 在守护进程上下文中读取文件时,千万不要忘了working_directory=参数的默认值是/而不是os.getcwd()
  • 当已经有一个守护进程在运行时,再次运行一次守护进程,它会在守护进程上下文前阻塞并等待锁,而不会进入主程序,直到锁解除。

示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/bin/python3
# coding: utf-8
import sh
import daemon
import signal
import lockfile
import os
import sys
import time

# 创建一个目录用于保存log
def before_daemon():
if not os.path.exists('daemon_dir'):
os.mkdir('daemon_dir')

# 这里是主循环内容
def main_program(in_handle):
while True:
in_file.write('I am running...\n')
in_file.flush()
time.sleep(1)

# 这里是退出时的处理
def cleanup():
sh.touch('daemon_dir/cleanup.done')
sys.exit(0)

# 让我们先创建目录以防止报错
before_daemon()

if __name__ == '__main__':
# 让我们进入上下文
with daemon.DaemonContext(pidfile=lockfile.LockFile('/tmp/daemon.pid'), working_directory=os.getcwd(), signal_map={signal.SIGTERM: cleanup}):
# 提前打开文件句柄
daemon_log = open('daemon_dir/mydaemon.log', 'w+')
main_program(daemon_log)

简化模块

python-daemon有一个简化的service模块,非常易于使用。

服务类

service模块定义了一个Service类,用户只需要继承这个类即可:

1
2
3
4
5
6
7
8
9
10
class MyService(service.Service):
def __init__(self, *args, **kwargs):
# 构造函数里必须先调用父类的构造函数
super(MyService, self).__init__(*args, **kwargs)

# 守护进程主逻辑,必须实现
def run(self):
# 主循环
while not self.got_sigterm():
...

实例化时,需要提供以下参数:

1
对象名 = MyService('服务名', pid_dir='PID文件保存目录'[, signals=['信号列表']])

控制服务

实例化服务对象后,就可以轻易的控制服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
if __name__ == '__main__':
my_service = MyService('我的服务', pid_dir='/run')

# 启动
my_service.start() -> None
# 发送SIGTERM信号停止
my_service.stop([block=False]) -> None
# 发送SIGKILL信号杀死
my_service.kill([block=False]) -> None
# 检测是否运行
my_service.is_running() -> bool
# 获取PID
my_service.get_pid() -> int

当然,还是建议使用位置参数实现子命令进行操作。

登记日志

service模块封装了syslog日志处理器查找函数find_syslog(),可以直接使用syslog登记日志:

1
2
3
4
5
6
7
8
9
10
class MyService(service.Service):
def __init__(self, *args, **kwargs):
super(MyService, self).__init__(*args, **kwargs)
# 这个日志器对象是预定义好的
self.logger.addHandler(logging.handlers.SysLogHandler(address=find_syslog()), facility=logging.handlers.SysLogHandler.LOG_DAEMON)
...

def run(self):
while not self.got_sigterm():
self.logger.info('日志')

等待信号

.start()方法被调用时,服务会使用一个独立进程运行run()函数;当.stop()方法被调用时,会对run()进程发送SIGTERM信号,此时,run()进程的self.got_sigterm()方法会返回True。此外,也可以使用.kill()方法直接发送SIGKILL信号。

另外,用户可以通过.send_signal()方法给守护进程发送信号,run()函数中,也可以使用self.wait_for_signal()方法等待接收其他信号,不过,这些信号必须在初始化时通过signals参数声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MyService(service.Service):
def __init__(self, *args, **kwargs):
super(MyService, self).__init__(*args, **kwargs)

def run(self):
while not self.got_sigterm():
...
# 阻塞直到收到SIGTERM
self.wait_for_sigterm()

# 检测是否收到指定信号
while not self.got_signal(信号):
...
# 阻塞直到收到指定信号
self.wait_for_signal(信号)

my_service = MyService('服务名', pid_dir='/run', signals=[signal.SIGHUP])
# 发送信号
my_service.send_signal(信号名)
# 清除信号行为
my_service.clear()

Ephemeral Worker Pool(短生 Worker 池)

这是一种借鉴自 systemd-nsresourced.servicesystemd-userdbd.service 的处理范式。

其核心思想是,维护一个固定数量的 Worker 进程池,但每个 Worker 都有硬性的寿命上限——处理固定数量的请求,或存活固定时间后自愿退出,然后立即用全新 Worker 进程替补。

在这种设计中,Worker 的死亡不是故障,而是正常工作流程的一环

如果你的服务需要处理频繁的、可能阻塞的、涉及特权操作的请求,并且你需要故障隔离和无泄漏保证,但又不想为每个连接都付出 Fork+Exec 的代价,考虑使用这种模式——它同时吸取了两个范式的长处:既不会像 Fork-per-Connection 范式一样频繁 Fork 进程导致响应延迟,又不会像单进程并发范式一样将所有请求处理压入同一个进程,使得任何一个阻塞操作卡住所有连接、任何一次崩溃摧毁整个服务、任何一处内存泄漏在进程的整个生命周期中持续累积且无法被回收。

起源

该模式由 systemd 项目在 systemd-userdbd(用户数据库多路复用器)、systemd-mountfsd(DDI 挂载服务)和 systemd-nsresourced(用户命名空间资源管理器)三个服务中独立验证,形成了可复制的模板。三者的 Manager 代码结构几乎完全同构:相同的 workers_fixed / workers_dynamic 双层集合、相同的 SIGUSR2 反压机制、相同的 on_worker_exit() 补位逻辑、相同的 SO_RCVTIMEO 空闲心跳。

对比

该模式与 Fork-per-Connection 范式和单进程并发范式形成三足鼎立:

维度 Fork-per-Connection 范式 单进程并发范式 Ephemeral Worker Pool
套接字管理者做 accept() 否(只传 listen_fd 否(只传 listen_fd
谁 Fork Worker 进程? 套接字管理服务 无独立 Worker 进程 服务自己的 Manager
Worker 数量模型 仅有上限没有下限,套接字管理服务控制上限 N/A 静态水位 + 动态扩缩
Worker 寿命 一个连接 N/A 有限寿命内处理不固定请求数,定期轮换
并发来源 套接字管理服务管理的多进程 进程内异步/多线程 Manager 管理的多进程,竞争 accept()
故障隔离
状态累积防御 天然(一次性进程) 定期杀死即回收
Fork 代价 每个连接一次 零 Fork 若干个请求分摊一次
适合的请求特征 低频、长命 高频、无阻塞 高频、可能阻塞、特权操作

Ephemeral Worker Pool 仅利用套接字管理服务的排队能力(Listen Backlog),而由守护进程自己全权管理 Worker——并且对 Worker 采用静态数量、动态生命周期、定期刷新的维护方式。

一个运行了很久的进程不是资产,而是负债——它的内部状态(内存碎片、资源泄漏、缓存腐化、状态偏移)随时间推移越来越不可信。而一个刚 Fork 出来的全新进程,其状态总是干净的,因为它还没有做过任何事。

这将“正确性”的证明义务从代码层面转移到了架构层面:

  • 传统做法需要证明”代码中每条路径都正确释放了资源”——需要审计所有代码路径包括第三方库,依赖于人类不犯错。
  • Ephemeral Worker Pool 只需要证明”Worker 进程最多活 N 分钟”——需要审计 on_worker_exit() 的十几行补位代码,依赖于内核的 do_exit() 不犯错。

这本质上是让操作系统的进程生命周期模型承担垃圾回收和资源清理的职责——进程退出是最彻底的 free(),而内核的进程清理路径(do_exit()exit_mm()exit_files()exit_fs())是四十年来被数十亿台机器反复验证过的。

角色划分

该模式由两个独立的程序组成,且职责严格分离:

Pool Manager(进程池管理器):一个守护进程,它不处理任何业务请求。职责为:

  1. 持有 listen_fd(从套接字管理服务接收或自行创建)
  2. Fork+Exec Worker 进程,将 listen_fd 传递给它们
  3. 维持固定的 Worker 水位(低水位 WORKERS_MIN)——Worker 退出(无论正常或崩溃)时补位
  4. 响应 Worker 的反压信号(SIGUSR2),Fork 更多 Worker 直到高水位(WORKERS_MAX)以应对突发负载——这些超出低水位的 Worker 称为 Dynamic Worker,它们和固定 Worker 行为完全一致,区别在于数量是临时的:当它们因请求计数或运行时间到期退出后,Manager 只会补充到低水位,不会维持高水位状态
  5. 可选:管理与请求处理无关的服务级状态(如 BPF、Registry)

Worker(短生业务进程):一个有限寿命的无状态进程,属于 Semi-Daemon(从 Manager Fork 而来,非 Init 直接管理)。职责为:

  1. 继承 listen_fd,在其上做阻塞 accept() 竞争连接
  2. 接受并处理业务请求
  3. 在以下任一条件满足时自愿退出(这是设计中的关键一环,不是故障):
    • 处理了固定数量的请求(如 64 个)
    • 存活时间超过固定上限(如 5 分钟)
    • 空闲时间超过阈值(仅 Dynamic Worker,如 90 秒;Fixed Worker 的空闲超时为无限)
  4. accept() 后若发现 listen_fd 上仍有排队连接,向 Manager 发送 SIGUSR2 反压信号

选型判据

当你的服务同时满足以下条件时,Ephemeral Worker Pool 是合适的选择:

  1. 请求频繁——Fork-per-Connection 范式的代价不可接受
  2. 操作可能阻塞——单进程并发范式中一个阻塞调用会卡住所有连接
  3. 操作涉及特权或内核边界——Worker 中的状态累积(内存泄漏、fd 泄漏、缓存腐败)后果严重
  4. Manager 有独立于请求处理的服务级职责——如生命周期管理、清理、监控——这些职责需要长驻进程但不应与请求处理混在一起

条件 1 排除 Fork-per-Connection 范式。条件 2 和 3 排除单进程并发范式。条件 4 使 Manager/Worker 分离成为结构性需求。

Python 实现

以下是一个完整的、可直接运行的 Ephemeral Worker Pool 实现。Manager 和 Worker 分别为独立程序。

进程池管理器(pool.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/python3
# coding: utf-8
import collections
import logging
import os
import signal
import socket
import subprocess
from pathlib import Path
from typing import Final
from collections.abc import Iterable

logging.basicConfig(
level=logging.INFO,
)

WORKERS_MIN = 3 # 低水位:始终维持的 Fixed Worker 数
WORKERS_MAX = 20 # 高水位:突发负载时的 Worker 上限(含 Dynamic Worker)
WORKER_EXEC = Path() # 执行的程序路径
LISTEN_ADDR = ("127.0.0.1", 9800) # 监听地址

def consume(iterable: Iterable[None]):
_ = collections.deque(
maxlen=0,
iterable=iterable,
)

class WorkerBucket:
__slots__: Final = ["fixed", "dynamic", "fd", "min", "max"]

def __init__(self, fd: int, min: int, max: int):
self.fixed: set[int] = set()
self.dynamic: set[int] = set()
self.fd: Final[int] = fd
self.min: Final[int] = min
self.max: Final[int] = max

def add(self, exec: Path, dynamic: bool = False):
proc = subprocess.Popen(
[exec],
env={
**os.environ,
"LISTEN_FD": str(self.fd),
"FIXED": "0" if dynamic else "1",
},
pass_fds=(self.fd,),
)

(self.dynamic if dynamic else self.fixed).add(proc.pid)

logging.info(
f"Spawned {'dynamic' if dynamic else 'fixed'} worker {proc.pid}",
)
logging.info(f"Current fixed: {self.fixed}, dynamic: {self.dynamic}")

def wait(self) -> list[int] | None:
"""阻塞等待至少一个子进程退出,然后非阻塞地捞干净所有尸体。返回所有已退出的 pid"""
# 阻塞:等到至少一个子进程退出
try:
pid, _ = os.wait()

except ChildProcessError:
# 所有子进程在两轮 wait 之间全退出了——清空 set,返回空列表
# fill() 看到 count=0 会自动补满
self.fixed.clear()
self.dynamic.clear()
return None

except InterruptedError:
return None

dead: list[int] = [pid]

# 在 os.wait() 收集到一个进程的**同时**可能有其他进程也退出了
# 我们必须提前发现僵尸进程并更新两个 set
# 回收僵尸进程不是我们的主要目的,Popen 调用时本身也会回收
while True:
try:
pid, _ = os.waitpid(-1, os.WNOHANG)

# 没有子进程
except ChildProcessError:
break

# 没有退出的子进程
if pid == 0:
break

dead.append(pid)

return dead

def reap(self, pid: int):
"""从池中移除一个已退出的 worker"""
self.fixed.discard(pid)
self.dynamic.discard(pid)
logging.info(
f"{'fixed' if pid in self.fixed else 'dynamic'} worker {pid} exited"
)
logging.info(
f"pool={self.fixed if self.fixed else 'empty'}+{self.dynamic if self.dynamic else 'empty'}"
)

def fill(self, exec: Path, burst: bool = False):
consume(
self.add(exec, dynamic=burst)
for _ in range(
len(self.fixed) + len(self.dynamic), self.max if burst else self.min
)
)

def main():
if v := os.getenv("SYSTEMD_FIRST_SOCKET_FD"):
fd = int(v)
logging.info(f"Using systemd socket fd={fd}")
else:
s = socket.create_server(LISTEN_ADDR, reuse_port=True, backlog=128)
fd = s.fileno()
logging.info(f"listening on {LISTEN_ADDR} fd={fd}")

workers = WorkerBucket(fd, WORKERS_MIN, WORKERS_MAX)

# ── SIGUSR2 处理:Worker 告知"队列有积压,请加人" ──
# 在信号处理函数中只设标志位,由主循环统一处理,避免在 os.wait() 期间重入 fork
burst = False

def sigusr2(*_):
nonlocal burst
burst = True

_ = signal.signal(signal.SIGUSR2, sigusr2)

# ── 启动初始 Worker 池 ──
logging.info("Filling pool to low water mark")

workers.fill(exec=WORKER_EXEC)

# ── 主循环:等待子进程退出,然后补位 ──
# 这意味着只补到低水位——Dynamic Worker 退出后不会被补充,池自然缩回
while True:
if dead := workers.wait():
consume(workers.reap(pid) for pid in dead)

# SIGUSR2 可能在 wait() 阻塞期间到达多次,但只处理一次 burst
if burst:
burst = False
workers.fill(exec=WORKER_EXEC, burst=True)
else:
workers.fill(exec=WORKER_EXEC)

if __name__ == "__main__":
main()

PIDFD版Pool:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/python3
# coding: utf-8
"""Ephemeral Worker Pool Manager — pidfd 事件驱动版"""

import collections
import logging
import os
import selectors
import signal
import socket
import subprocess
from pathlib import Path
from typing import Final
from collections.abc import Iterable

logging.basicConfig(level=logging.INFO)

WORKERS_MIN = 3 # 低水位:始终维持的 Fixed Worker 数
WORKERS_MAX = 20 # 高水位:突发负载时的 Worker 上限(含 Dynamic Worker)
WORKER_EXEC = Path() # 执行的程序路径
LISTEN_ADDR = ("127.0.0.1", 9800) # 监听地址

def consume(iterable: Iterable[None]):
_ = collections.deque(maxlen=0, iterable=iterable)

class WorkerBucket:
__slots__: Final = [
"fixed",
"dynamic",
"fd",
"min",
"max",
"selector",
]

def __init__(self, fd: int, min: int, max: int):
self.fixed: set[int] = set()
self.dynamic: set[int] = set()
self.fd: Final[int] = fd
self.min: Final[int] = min
self.max: Final[int] = max
self.selector: Final = selectors.DefaultSelector()

def add(self, exec: Path, dynamic: bool = False):
# Popen 同时也带有回收历史进程的功能
proc = subprocess.Popen(
[exec],
env={
**os.environ,
"LISTEN_FD": str(self.fd),
"FIXED": "0" if dynamic else "1",
},
pass_fds=(self.fd,),
)

# 为这个 worker 注册 pidfd 到 selector
# worker 退出时内核会让 pidfd 变为可读,selector.select() 能感知到
pidfd = os.pidfd_open(proc.pid)

(self.dynamic if dynamic else self.fixed).add(pidfd)

_ = self.selector.register(pidfd, selectors.EVENT_READ)

logging.info(f"Spawned {'dynamic' if dynamic else 'fixed'} worker {proc.pid}")

def reap(self, pidfd: int):
"""回收一个已退出的 worker:关闭 pidfd,从池中移除"""
_ = self.selector.unregister(pidfd)

# 虽然 Popen.__init__() 中会调用 _cleanup()
# 但是在闲置时(长时间未调用 Popen)有资源池存中在僵尸进程的可能性
# 明确处理一下
try:
_ = os.waitid(os.P_PIDFD, pidfd, os.WEXITED | os.WNOHANG)
except ChildProcessError:
pass

logging.info(
f"{'fixed' if pidfd in self.fixed else 'dynamic'} worker {pidfd} exited"
)
logging.info(
f"pool={self.fixed if self.fixed else 'empty'}+{self.dynamic if self.dynamic else 'empty'}"
)

os.close(pidfd)

self.fixed.discard(pidfd)
self.dynamic.discard(pidfd)

def fill(self, exec: Path, burst: bool = False):
consume(
self.add(exec, dynamic=burst)
for _ in range(
len(self.fixed) + len(self.dynamic), self.max if burst else self.min
)
)

def main():
if v := os.getenv("SYSTEMD_FIRST_SOCKET_FD"):
fd = int(v)
else:
s = socket.create_server(LISTEN_ADDR, reuse_port=True, backlog=128)
fd = s.fileno()
logging.info(f"listening on {LISTEN_ADDR} fd={fd}")

workers = WorkerBucket(fd, WORKERS_MIN, WORKERS_MAX)

# ── SIGUSR2 处理:Worker 告知"队列有积压,请加人" ──
# 在信号处理函数中只设标志位,由主循环统一处理,避免在 os.wait() 期间重入 fork
burst = False

def sigusr2(*_):
nonlocal burst
burst = True

_ = signal.signal(signal.SIGUSR2, sigusr2)

# ── 启动初始 Worker 池 ──
workers.fill(exec=WORKER_EXEC)

# ── 主循环:select 等待 pidfd 就绪 ──
# 多个 worker 同时退出时,select() 一次返回所有就绪的 pidfd
# 每个退出都是独立事件,不存在"漏数"问题
while True:
try:
# 只轮询获取终止的子进程
events = workers.selector.select()

except InterruptedError:
continue

for key, _ in events:
workers.reap(key.fd)

# SIGUSR2 可能在阻塞期间到达多次,但只处理一次 burst
if burst:
burst = False
workers.fill(exec=WORKER_EXEC, burst=True)
else:
workers.fill(exec=WORKER_EXEC)

if __name__ == "__main__":
main()

短生业务Worker(worker.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python
# coding: utf-8
import json
import logging
import os
import selectors
import signal
import socket
import time
from socketserver import BaseServer
from typing import Final
from wsgiref.simple_server import WSGIRequestHandler

logging.basicConfig(
level=logging.INFO,
)

MAX_ITERATIONS: Final = 16 # 最大处理次数
MAX_LIFETIME: Final = 120.0 # 最大运行时间
MAX_IDLE: Final = 30.0 # Dynamic Worker 空闲超时(Fixed Worker 为无限)

# ── 业务逻辑:标准 WSGI Application ──
def app(environ, start_response):
path = environ.get("PATH_INFO", "/")
method = environ.get("REQUEST_METHOD", "GET")

if method == "GET":
status = "200 OK"
resp = {
"worker": os.getpid(),
"path": path,
}
else:
status = "405 Method Not Allowed"
resp = {}

resp = json.dumps(resp).encode()

start_response(
status,
[
("Content-Type", "application/json; charset=utf-8"),
("Content-Length", str(len(resp))),
],
)

return [resp]

class Stub(BaseServer):
def __init__(self, sock, application): # pyright: ignore[reportMissingSuperCall]
self.server_address = sock.getsockname()
self.base_environ = {}
self._application = application

def get_app(self):
return self._application

class EphemeralWorker:
"""短生 Worker(Semi-Daemon)

从 Manager 继承 listen_fd,竞争 accept() 处理请求
到达退出条件后自愿退出——这是正常流程,不是故障
"""

__slots__: Final = ["sock", "parent", "_stub"]

def __init__(self, fd: int, application):
self.sock: Final[socket.socket] = socket.socket(fileno=fd)
self.sock.setblocking(False)
self.parent: Final[int] = os.getppid()
# WSGIRequestHandler 需要一个带 get_app() 和 server_address 的对象
# 用 Stub 实现最小接口即可
self._stub: Final[Stub] = Stub(self.sock, application)

def accept(
self, timeout: float = 5.0
) -> tuple[socket.socket, tuple[str, int]] | None:
"""等待连接,超时或被抢走时返回 None"""
with selectors.DefaultSelector() as s:
_ = s.register(self.sock, selectors.EVENT_READ)
if not s.select(timeout=timeout):
return None # 超时,无连接到达

try:
return self.sock.accept()

except BlockingIOError:
return None # 被其他 Worker 抢走了

def pressure(self):
"""反压检测:listen_fd 上还有排队的连接吗?有则通知 Manager 加人"""
with selectors.DefaultSelector() as s:
_ = s.register(self.sock, selectors.EVENT_READ)
if not s.select(timeout=0):
return None

os.kill(self.parent, signal.SIGUSR2)

def serve(self, conn: socket.socket, addr: tuple[str, int]):
"""处理单个连接:WSGIRequestHandler 自动完成 HTTP 解析 + WSGI 调用"""
logging.info(f"Accepted {addr[0]}:{addr[1]}")
try:
_ = WSGIRequestHandler(conn, addr, self._stub)
finally:
conn.close()

def main():
fd = int(os.environ["LISTEN_FD"])
is_fixed = os.environ["FIXED"] == "1"
idle = float("inf") if is_fixed else MAX_IDLE
worker = EphemeralWorker(fd, app)
start = last_busy = time.monotonic()

logging.info(
f"{'fixed' if is_fixed else 'dynamic'} worker started: max_iter={MAX_ITERATIONS}, max_life={MAX_LIFETIME}, max_idle={idle}",
)

iterations = 0
# 三重退出条件:任一满足即自愿退出(这是正常流程,不是故障)
while (
iterations < MAX_ITERATIONS
and (now := time.monotonic()) - start < MAX_LIFETIME
and now - last_busy < idle
):
result = worker.accept()

if not result:
continue

iterations += 1

last_busy = time.monotonic()

worker.pressure()

worker.serve(*result)

logging.info(
f"Worker retired: iterations={iterations}, uptime={time.monotonic() - start}",
)

if __name__ == "__main__":
main()

运行方式:

1
2
$ curl http://127.0.0.1:9800/hello
{"worker": 12001, "iteration": "1/16", "path": "/hello"}

参数决策

  • 最小Worker数量:一般5-6个,最大取CPU核心数,合理数量即可。
  • 最大Worker数量:4-8倍CPU核心数。
  • 单个Worker处理请求数:由单个请求的内存开销决定——Worker处理的每个请求都可能留下无法回收的碎片(Python对象池、第三方库缓存、文件描述符膨胀)。
    • 极轻(纯JSON序列化,< 100KB):碎片累积慢,应当多分摊些 Fork 代价,256 - 1024。
    • 中等(读文件、查DB、处理图片,< 10MB):64 - 256。
    • 重(调第三方C库、大量临时对象,> 10MB):16 - 64。
  • 压力测试原则:请求总量要跑满至少5代Worker轮换,并发数设为最小Worker数量的2~3倍。
  • ab参数:ab -r -n <WORKERS_MIN × MAX_ITERATIONS × 5> -c <WORKERS_MIN × 3> http://...
  • 测试目标:
    • Failed requests小于1%,过大意味着wait()逻辑有问题。
    • Non-2xx responses为0,大于0意味着业务逻辑有问题。
    • Requests per second合理,过低意味着有阻塞。