守护进程不是服务
守护进程不是服务,或者说,守护进程不完全是服务,服务意味着一个主进程,和能够通过某种方式与该主进程进行交流与控制的另一个进程;而守护进程仅仅意味着服务的主进程部分。
创建一个systemd服务
在几乎所有Linux发行版都使用systemd的现在,使用Python开发一个服务惊人的简单。
第一步:编写systemd单元
systemd兼并了服务管理工具、会话控制工具与监视器工具,systemd既支持系统服务,也支持用户服务。
首先需要创建一个适配服务的systemd服务单元文件,这个服务单元文件可以保存在:
- 系统级:
/etc/systemd/system目录下。
- 全局用户级:
/etc/systemd/user目录下。
- 用户级:
~/.config/systemd/user/目录下。
这个服务单元文件名应当为:
内容骨干为:
1 2 3 4 5
| [Unit] Description=描述
[Service] ExecStart=/usr/bin/python3 -u /脚本路径
|
就这么简单,接下来我们讲一些详细内容。
标准输出与标准错误
在过去,用户需要将服务的所有输出保存或者重定向到日志文件中,但是对于systemd来说,这一切都不再需要了,因为systemd的journald组件可以收集并记录systemd服务的输出,换句话说,这变成了systemd的任务。
这一行为也可以通过添加这两个选项修改:
1 2 3 4 5 6 7 8
| [Unit] StandardOutput=journal|null|syslog|file:路径 StandardError=journal|null|syslog|file:路径
# journal即为默认选项,收集到journald # null表示直接抛弃所有输出 # syslog表示收集到syslog缓冲区,注意,所有syslog缓冲区输出也会收集到journald一份 # file表示收集到文件中
|
值得一提的是,在默认情况下,Python的输出是带缓冲的,这也就意味着,输出会被阻塞到填满缓冲区后才通过管道传输给systemd,这是不合适的,因此,我们需要关闭Python的输出缓冲,要么给Python解释器加上-u选项,要么在服务单元中添加以下内容:
1 2
| [Service] Environment=PYTHONUNBUFFERED=1
|
标准输入
和标准输出与标准错误的选项类似,标准输入也有一个选项:
1 2
| StandardInput=null|data|tty|tty-force|tty-fail|file:路径|socket # 默认选项为null,即不接受任何标准输入
|
我们来分别讲讲:
使用固定输入
使用StandardInput=data选项可以使得服务在启动时接收一个固定的标准输入,这个固定的标准输入通过StandardInputText=或StandardInputData=(如果是二进制)给出:
1 2
| StandardInput=data StandardInputText='TestInput'
|
如果直接给出了StandardInputText=或StandardInputData=,那么实际上StandardInput=的默认值就会变成data。
使用终端
使用StandardInput=tty|tty-force|tty-fail可以将某个终端(Ctrl + Fn)分配给服务使用,不过这三种选项存在细微差别:
StandardInput=tty在终端已经被占用时会阻塞服务启动并等待释放。
StandardInput=tty-force在终端已经被占用时会直接抢占终端。
StandardInput=tty-fail在终端已经被占用时会直接启动失败。
在设置这一选项后,还需要使用TTYPath=指定使用的终端:
1 2 3
| StandardInput=tty|tty-force|tty-fail # 使用TTY5 TTYPath=/dev/tty5
|
这样就可以使用Ctrl + Alt + F5切换到TTY5进行控制了。
使用文件
使用StandardInput=file:路径可以使用任意文件作为标准输入的数据来源,“任意文件”意味着可以是普通的文本文件,也可以是FIFO,这就带来了很多种可能性。
首先我们可以使用普通的文本文件,这就和StandardInput=data差不多,会直接读取文件内容作为固定的数据源。
其次我们还可以使用FIFO,不过需要注意的是,此时服务的启动会被阻塞,直到从FIFO中接收到第一个标准输入,FIFO在输入源程序退出后会发送一个EOF,这会关闭标准输入。
那么假如我们要实现FIFO在服务启动时自动创建,在服务停止时自动删除该怎么办呢?其实很简单,一种思路是,创建另一个服务:
1 2 3 4 5 6 7 8 9 10 11
| [Unit] # 我们不能让用户独立启动这个服务 RefuseManualStart=yes # 在主服务停止时,该服务也要停止 PartOf=xxx.service
[Service] Type=oneshot RemainAfterExit=yes ExecStart=/bin/mkfifo /run/xxx.sock ExecStop=/bin/rm -f /run/xxx.sock
|
然后,我们再在原来的服务中添加依赖即可:
1 2 3 4 5 6 7 8 9 10 11 12
| [Unit] # 绑定依赖,前置服务启动失败或者停止了该服务都要连带停止 BindsTo=xxx.service # 如果找不到FIFO文件,那么该服务会直接失败而不是阻塞,因此需要设置顺序关系 After=xxx.service
[Service] StandardInput=file:/run/xxx.sock ...
[Install] Also=xxx.service
|
使用套接字
这个“套接字”指的不是具体意义上的套接字,而是systemd的套接字单元,而且仅可用于套接字激活的服务单元。
使用StandardInput=socket选项,我们可以很容易的实现通过某些接口激活的任务,例如,对于前面的FIFO需求,更优雅的思路是,使用systemd原生的.socket单元,创建一个监听FIFO的.socket单元:
1 2
| [Socket] ListenFIFO=/run/xxx.sock
|
然后,在配套的服务单元中使用StandardInput=socket读取其内容即可:
1 2 3
| [Service] StandardInput=socket StandardOutput=journal
|
需要注意的是,由于FIFO的读取/发送目标是内核缓冲区,读取时内核缓冲区只会忠诚地返回它内部保存的数据(而网络套接字读取/发送目标是网络对端,对端会进行消费动作),如果StandardOutput=socket(默认值),那么会导致严重的死锁(进程从FIFO中接收到数据后,会再次发送到FIFO,接着立刻从FIFO读取到自己刚刚发送的数据,从而导致死循环,直到打满内核缓冲区),因此StandardOutput=journal是必要的。
但是,对于网络套接字,就没有上述那些问题,我们可以直接大大方方地使用它进行双工通信:
1 2 3 4 5 6
| [Unit] ...
[Socket] ListenStream=XXXX Accept=yes
|
然后,千万注意,我们的应用程序必须改写为模板服务XXX@.service:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| [Unit] ... # 如果你的程序没有为客户端单方面断开链接(EOF)的情况进行适配 # 那么很可能在这种情况下异常退出,导致单元进入failed状态 # systemd默认会在内存中保留failed状态的单元,不进行清理 # 这可能导致failed单元大量积累 # 该配置项可以修改这一逻辑 CollectMode=inactive-or-failed
[Service] StandardInput=socket StandardOutput=socket # 如果不同名,可以使用Sockets=指定套接字单元 Sockets=xxx.socket
|
然后,每当我们向套接字传入某些数据时,便会自动激活该服务并将通信套接字的文件描述符连接到标准输入和标准输出,应用程序可以直接通过标准输入和标准输出和远端进行通信。
需要注意的是,Accept=yes是必要的,否则systemd传递给服务的套接字将不会是通信套接字,而是监听套接字,应用程序需要通过sd_listen_fds接口、或标准输入、或标准输出的文件描述符获取监听套接字,然后自行调用accept()创建通信套接字。
使用用户运行
出于安全因素考虑,对于系统级服务,我们也会希望使用独立用户而不是root运行,对于systemd来说,这很容易实现:
1 2 3
| [Service] User=用户名 Group=组名
|
还有一种更方便的方法,也就是使用systemd动态生成的临时用户,这种方法是为了取代过去的nobody公用用户:
1 2
| [Service] DynamicUser=yes
|
自动拉起
systemd包含着Supervisor的功能,可以进行服务的健康监测(准确地说,是活动状态监测)与自动拉起,要实现也相当简单,只需要添加这个选项:
1 2 3 4 5 6
| [Service] Restart=no|always|on-failure
# no即为默认选项,不启用自动拉起功能 # always表示总是自动拉起 # on-failure表示仅在服务异常退出时自动拉起
|
简单服务与forking服务
在传统的情况下,我们创建一个服务需要从主进程分叉出守护进程,将守护进程的PID保存在PID文件中,然后杀死主进程(参考:PEP 3143)。
但是如之前所说,对于systemd来说,我们不再需要这么麻烦,systemd本身就有能力进行进程的生命周期管理和自动拉起,因此我们只需要实现一个简单的主程序,然后将服务类型设为:
即可。
但是,systemd仍然是可以兼容传统的守护进程逻辑的,只需要给systemd指出forking服务类型和PID文件路径即可:
1 2 3
| [Service] Type=forking PIDFile=/run/xxx.pid
|
此外,如果开发者在服务实现中使用了systemd.daemon自行向systemd发送信号通知状态的话,那么就不需要让systemd对服务的状态进行推断,此时可以将服务类型设为:
我们在之后会再次提到。
停止与垃圾处理
systemd会对服务的生命周期进行管理,这其中当然包括对服务的停止。
我们知道,systemd使用CGroup对服务的所有进程进行追踪,默认情况下,在停止服务时,systemd会对服务CGroup内的所有进程发送SIGTERM信号,如果90秒后仍然存在未停止的进程,那么就再对它们发送SIGKILL信号,不过,这一行为可以进行修改:
1 2 3 4 5 6 7
| [Service] KillMode=control-group|process|mixed|none
# control-group即为默认值 # process表示仅对主进程进行杀死,而无视其他进程 # mixed表示对主进程发送SIGTERM信号,但是对其他进程一律发送SIGKILL信号 # none表示不进行任何杀死操作,这一选项必须搭配ExecStop=使用
|
这些选项给开发者实现守护进程的垃圾处理提供了多种方式,比如,开发者可以使用KillMode=process,然后让主进程捕获信号后,再自行处理垃圾和杀死子进程。或者,开发者可以使用KillMode=none,完全使用在ExecStop=中指定的方法进行服务停止。
除此之外,我们还可以直接让systemd帮忙进行垃圾处理:
1 2 3 4 5 6 7 8
| [Service] ExecStop=/命令 ExecStopPost=/命令
# ExecStop=指定的命令会在进行杀死操作之前先执行 # ExecStopPost=指定的命令会在进行杀死操作之后才执行 # 简单来说,它们两者的区别仅仅在于,ExecStopPost=无论在服务启动是否启动成功的情况下都会执行,而ExecStop=则反之 # 因此,如果用户需要特别的手动杀死部分进程,那么使用ExecStop=,如果要进行进程完全停止后的垃圾处理,使用ExecStopPost
|
服务启用
“启用”一个服务对systemd来说意味着执行服务的[Install]段,要实现服务的开机启动,我们是通过将服务设置为某些target的依赖(一般为多用户模式)实现的,因此只需要添加依赖关系即可:
1 2
| [Install] WantedBy=default.target
|
值得一提的是,如果在主机上不存在用户的任何会话,那么用户的systemd服务也会随之被销毁,要使得用户的服务拥有和系统级服务一样的生命周期,需要给用户启用Linger(参考systemd文档)。
资源限制
基于CGroup,systemd可以轻易地对服务进行资源限制,就像这样:
详情请见systemd文档。
沙箱
systemd也可以对服务进行沙箱化,限制服务对某些系统目录或网络的使用,从而有效提高安全性,详情请看systemd文档:
1 2
| [Service] ProtectSystem=no|yes|strict
|
编写服务实现
是时候使用Python实现守护进程的主要内容了,一个简单的前台服务只需要实现一个永久的循环即可:
1 2 3
| if __name__ == '__main__': while True: ...
|
既然要编写适配systemd的守护进程,那么自然离不开systemd模块:
1
| import systemd.daemon as sd
|
检测系统类型
使用这一函数检测系统是否使用systemd:
1
| systemd.daemon.booted() -> bool
|
如果是,那么返回True,否则返回False。
发送通知
我们之前提到过notify,它的功能就是给systemd发送一些服务状态信号,函数签名如下:
1
| systemd.daemon.notify('状态')
|
而状态有以下几种:
READY=1:告诉systemd服务已经启动完成。
RELOADING=1:告诉systemd服务正在重载。
STOPPING=1:告诉systemd服务正在停止。
STATUS=...:告诉systemd服务的状态文本。
ERRNO=X:服务出错了,以X状态码退出。
MAINPID=X:告诉systemd主进程的PID。
BUSERROR=...:告诉systemd D-Bus问题,等号右侧必须是D-Bus风格的错误。
WATCHDOG=1:告诉systemd服务看门狗该服务仍在运行。
WATCHDOG_USEC=...:重设看门狗的时间。
FDSTORE=1:保留文件描述符,这些文件描述符将在下次启动服务时通过sd_listen_fds的方式传递给进程。
FDSTOREMOVE=1:移除文件描述符。
举个例子,我们可以在启动前进行一些准备工作:
1 2 3 4 5 6 7 8 9
| import systemd.daemon
if __name__ == '__main__': print('Starting...') systemd.daemon.notify('READY=1')
# 然后开始实现主逻辑 while True: ...
|
或者,我们可以在退出前进行一些处理:
1 2 3 4 5 6
| def exit(): print('Exiting......') systemd.daemon.notify('STOPPING=1') sys.exit(0)
signal.signal(signal.SIGTERM, exit())
|
第三步:测试
可以使用systemd-run工具进行测试,这是一个临时封装systemd单元并运行的命令,它的常见用法为:
1
| systemd-run --unit=任务名 [--user] [--service-type=oneshot|forking] [-G] [-p "PIDFile=/run/xxx.pid"] [-p "SuccessExitStatus=0 1 2"] -d|--working-directory=工作目录 [-E "Key=value"] [-r] 执行的命令
|
选项含义为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| --unit= 明确指定单元的名称(而不是自动生成)
--user 与当前调用用户的用户服务管理器(systemd 用户实例)通信,而不是默认的系统级服务管理器(systemd 系统实例)
-G, --collect 即使临时单元执行失败(failed),也在结束后从内存中卸载它
--property=, -p 为临时单元设置一个属性
--same-dir, -d 使用用户当前的工作目录运行服务进程
-E NAME=VALUE, --setenv=NAME=VALUE 给服务进程传递一个环境变量。可以多次使用此选项以传递多个环境变量
-r, --remain-after-exit 在服务进程结束之后,继续保持服务的存在,直到被明确的停止(stop)后才卸载
|
更多优化
编写单元文件的建议
- 尽量不要使用依赖。如果一定要使用依赖,尽量使用
Wants=,这可以避免一个服务被用户停止时导致另一个服务被连带停止。
- 尽量不要修改
StandardOutput=和StandardError=,交给journald处理就好。
- 尽量带上
ProtectSystem=yes|strict,一般的服务都用不到会被这个配置项屏蔽的目录。
- 尽量带上
PrivateDevices=yes,一般的服务很少需要直接访问硬件。
- 尽量带上
PrivateNetwork=yes,如果是非网络服务的话。
套接字激活
systemd官方认为,对于任何需要使用套接字通信的服务来说,套接字都应当交给systemd创建、配置和管理,而不是程序内部。这样,我们就把套接字的创建提前到了服务启动之前,有利于服务的并行启动。
systemd接收到数据后,会对数据进行排队,同时启动服务进程,服务启动后,systemd会将套接字的文件描述符传递给进程,因此,使用套接字激活机制的服务,必须有能力从systemd接手套接字。要实现这一功能,我们有很多种办法:
- systemd传递的第一个文件描述符总是
3,如果服务只监听一个端口,我们可以直接使用文件描述符3获取套接字。
- 如果服务只监听一个端口,那么可以设置
StandardInput=socket和StandardOutput=socket,通过文件描述符0或文件描述符1获取套接字。
- 如果服务监听多个端口,我们可以使用
systemd.daemon.listen_fds()函数获取文件描述符列表,然后逐一获取。
- 我们可以使用
socket.socket(fileno=文件描述符)这个函数实现获取文件描述符对应的套接字对象。
- 我们可以使用
socket.fromfd(fd, family, type)这个函数克隆文件描述符,并创建一个新的套接字对象(不常用)。
自行创建套接字兼容模式
很多应用程序本身没有实现systemd的套接字激活协议,systemd组为此开发了一个专门用于实现套接字激活,而不必修改现存应用程序的代理工具systemd-socket-proxyd。
首先我们需要编写一个套接字单元proxy.socket:
1 2 3 4 5 6 7 8
| [Socket] # 实际监听的套接字 ListenStream=80 # Accept不可以是yes # Accept=no
[Install] WantedBy=sockets.target
|
然后我们编写一个用于代理的服务proxy.service:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| [Unit] # 程序服务 Requires=XXX.service # 程序服务 After=XXX.service Requires=proxy.socket After=proxy.socket
[Service] # 注意类型 Type=notify ExecStart=/usr/lib/systemd/systemd-socket-proxyd /run/Socket|127.0.0.1:Port PrivateTmp=yes PrivateNetwork=yes
|
接着我们只需要为我们的程序再编写一个服务XXX.service:
1 2 3 4 5 6 7 8
| [Unit] # 如果希望启用自动退出 # StopWhenUnneeded=yes ...
[Service] # 你的程序应当使用UNIX域套接字,或是监听127.0.0.1上的Inet套接字 ExecStart=... --bind=/run/Socket|127.0.0.1:Port
|
Inetd兼容模式
参考:End Road。
为传统inetd设计的程序无需修改即可在 systemd 的套接字激活协议下运行,这一行为是通过Accept=yes选项配置的。
首先我们需要编写一个套接字单元服务名.socket:
1 2 3 4 5 6 7 8 9 10 11
| [Socket] # 实际监听的套接字 # 不可以是数据报套接字或FIFO ListenStream=80 # 核心:Accept必须是yes # systemd会为每个连接分别调用accept()派生服务实例 # 并通过标准输入输出传递已连接的套接字 Accept=yes
[Install] WantedBy=sockets.target
|
接着,需要修改我们的服务单元为模板服务单元:服务名@.service,并且必须进行如下设置:
1 2 3 4
| [Service] # 确保和传统inetd行为一致 StandardInput=socket StandardOutput=socket
|
接着,我们在程序中不需要关心任何套接字内容,只需要从标准输入读取字节流,将响应字节流写入标准输出:
1 2 3 4
| import sys # 你必须使用sys.stdout.write() # 加上CRLF结尾 sys.stdout.write(sys.stdin.readline().strip() + "\r\n")
|
1 2 3 4 5 6
| # 持续服务实例 import sys
while True: if sys.stdin.read(): sys.stdout.write(sys.stdin.readline().strip() + "\r\n")
|
一个示例程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import sys from wsgiref.handlers import SimpleHandler, read_environ import json
# 应用逻辑 def app(environ, start_response): # 获取请求目标 path = environ.get("PATH_INFO", "/") # 获取方法 method = environ.get("REQUEST_METHOD", "GET")
# 构建响应 if method == "GET": status = "200 OK" resp = {"Hello": "World"} else: status = "404 Not Found" resp = {}
resp = json.dumps(resp).encode()
headers = [ ("Content-Type", "application/json; charset=utf-8"), ("Content-Length", str(len(resp))) ]
start_response(status, headers)
return [resp]
if __name__ == "__main__": environ = read_environ()
request = sys.stdin.readline().strip() method, url, protocol = request.strip().split(" ") if "?" in url: path, query = url.split("?", 1) else: path, query = url, None
environ |= { "REQUEST_METHOD": method, "PATH_INFO": path, "QUERY_STRING": query or "", "SERVER_PROTOCOL": protocol }
while header := sys.stdin.readline().strip(): k, v = header.split(":", 1) environ["HTTP_" + k.replace("-", "_").upper()] = v.strip()
SimpleHandler( stdin=sys.stdin.buffer, stdout=sys.stdout.buffer, stderr=sys.stderr, environ=environ, multithread=False, multiprocess=True, ).run(app)
|
直接适配
首先我们需要创建一个套接字单元文件,它的文件名应为服务名.socket,内容如下:
1 2 3 4 5 6 7 8 9 10 11
| [Unit] Description=描述
[Socket] # 字节流套接字,用于TCP,你也可以在端口号之前指定IP ListenStream=2000 # 数据报套接字,用于UDP #ListenDatagram=
[Install] WantedBy=sockets.target
|
通常,我们会希望在启用按需启动的服务时仅仅启用对应的套接字,所以要对服务单元进行一些修改:
1 2 3 4 5 6 7
| # 以上内容省略
[Install] # 如果确定服务是按需启动的话,服务就没必要在开机启动 #WantedBy= # 仅仅需要启动套接字就行了 Also=xxx.socket
|
接着,我们需要在程序代码中实现套接字:
socket模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import systemd.daemon import socket
# 这是systemd.daemon中的定义的全局变量 #SYSTEMD_FIRST_SOCKET_FD = 3
# 或者,我们直接获取文件描述符列表 STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds() # 这里只获取第一个套接字做例子,通常也不会在一个.socket中配置两个套接字 LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])
# 等待连接,并在连接后创建通信套接字 comm_sock, client_addr = LISTEN_SOCKET.accept()
while True: ... # 进行通信
# 可选地关闭套接字,然后退出 # 禁止使用shutdown() #comm_sock.close()
|
- 程序应当在
accept()后持续接收请求,在明确要求停止后才退出。
socketserver模块
我们在socketserver定义的服务器类中是不是也可以实现呢?答案是可以的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import systemd.daemon import socketserver import socket
# 让我们获取systemd传递的套接字 STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds() LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])
... # 实现一个处理器
# 让我们创建一个实例 my_server = socketserver.TCPServer(('', 0), 请求处理器类, bind_and_activate=False) my_server.socket = LISTEN_SOCKET my_server.serve_forever()
|
为了提高复用性,我们可以创建一个类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import systemd.daemon import socketserver import socket
# 让我们获取systemd传递的套接字 STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds() LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])
class MyServer(socketserver.TCPServer): # 我们要稍微重写一下构造方法,当然你也可以考虑一些其他的方法 def __init__(self, listen_socket, RequestHandlerClass): # 在这里我们把套接字设为空,把自动绑定给关掉,原因很显然:我们要直接使用systemd传递的套接字,没必要再创建和绑定了 super().__init__(('', 0), RequestHandlerClass, bind_and_activate=False) # 服务器类使用的socket是可以直接设置的,我们直接使用systemd负责监听的套接字就好 self.socket = LISTEN_SOCKET
... # 实现一个处理器
# 让我们创建一个实例 my_server = MyServer(LISTEN_SOCKET, 请求处理器类) my_server.serve_forever()
|
http.server模块
再往上一层,我们对于HTTP服务器也可以这么做:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import systemd.daemon import http.server import socket
# 让我们获取systemd传递的套接字 STSTEMD_LISTEN_FDS = systemd.daemon.listen_fds() LISTEN_SOCKET = socket.socket(fileno=STSTEMD_LISTEN_FDS[0])
# 实际上,HTTPServer类对TCPServer类的主要重写就在于server_bind()函数,对我们的代码没什么影响 class MyServer(http.server.HTTPServer): # HTTPServer类继承自TCPServer,所以差不多 def __init__(self, listen_socket, RequestHandlerClass): super().__init__(('', 0), RequestHandlerClass, bind_and_activate=False) self.socket = LISTEN_SOCKET
... # 实现一个处理器
# 让我们创建一个实例 my_server = MyServer(systemd_listen_socket, 请求处理器类) my_server.serve_forever()
|
然后,我们只需要启用套接字,那么之后有客户端对套接字发送请求时,对应的服务就会被systemd自动激活了。
wsgiref模块
最有用的场景就是实现一个WSGI服务器了,和之前提到过的Inetd风格的WSGI服务器一样,我们也可以轻松地进行转换,只需要进行少许的额外操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| import sys from wsgiref.handlers import SimpleHandler, read_environ import systemd.daemon import socket import json
# 应用逻辑 def app(environ, start_response): # 获取请求目标 path = environ.get("PATH_INFO", "/") # 获取方法 method = environ.get("REQUEST_METHOD", "GET")
# 构建响应 if method == "GET": status = "200 OK" resp = {"Hello": "World"} else: status = "404 Not Found" resp = {}
resp = json.dumps(resp).encode()
headers = [ ("Content-Type", "application/json; charset=utf-8"), ("Content-Length", str(len(resp))) ]
start_response(status, headers)
return [resp]
def handle_connection(commSock, clientAddr): try: environ = read_environ()
request = commSock.recv(8192).decode("latin-1").split("\r\n") method, url, protocol = request[0].split(" ") if "?" in url: path, query = url.split("?", 1) else: path, query = url, None
environ |= { "REQUEST_METHOD": method, "PATH_INFO": path, "QUERY_STRING": query or "", "SERVER_PROTOCOL": protocol, "REMOTE_ADDR": clientAddr[0] }
for l in request[1:]: if ":" in l: k, v = l.split(":", 1) environ["HTTP_" + k.replace("-", "_").upper()] = v.strip()
SimpleHandler( stdin=commSock.makefile('rb'), stdout=commSock.makefile('wb'), stderr=sys.stderr, environ=environ | { "SERVER_PROTOCOL": "HTTP/1.0" }, multithread=False, multiprocess=False, ).run(app)
finally: commSock.close()
if __name__ == "__main__": SYSTEMD_LISTEN_FDS = systemd.daemon.listen_fds() if not SYSTEMD_LISTEN_FDS: raise OSError("Must be activated by systemd.")
listen_sock = socket.socket(fileno=SYSTEMD_LISTEN_FDS[0])
# 单线程处理 handle_connection(*listen_sock.accept())
# 你也可以使用多线程 #from concurrent.futures import ThreadPoolExecutor #with ThreadPoolExecutor(max_workers=10) as executor: # while True: # executor.submit(handle_connection, *listen_sock.accept())
# 不会退出的多线程版本 #from concurrent.futures import ThreadPoolExecutor #executor = ThreadPoolExecutor(max_workers=10) #while True: # executor.submit(handle_connection, *listen_sock.accept())
|
D-Bus激活
要使用D-Bus激活机制,我们首先必须确保D-Bus激活请求会被转发给systemd,以确保D-Bus服务不会被重复激活。要实现这一点,我们需要在/usr/share/dbus-1/system-services/目录下创建一个D-Bus服务文件,内容如下:
1 2 3 4 5
| [D-Bus Service] Name=服务名称,使用URL Exec=非systemd启动时执行的命令,仅在未使用systemd时生效 User=启动的用户 SystemdService=dbus-服务名称.service
|
然后我们还需要在服务单元中指出BusName:
1 2 3 4 5 6 7 8 9 10 11 12
| [Unit] ...
[Service] Type=dbus BusName=服务名称 ExecStart=启动时执行的命令 ...
[Install] # 建议添加这一别名以方便管理 Alias=dbus-服务名称.service
|
在过去要禁用D-Bus服务,除了删除以外别无它法,不过现在在systemd的帮助下,通过给服务单元设置标准化的别名,然后将服务别名设为和D-Bus绑定的服务,这样,我们就可以使用systemctl命令对其进行启用与禁用了。
然后,我们就需要编写服务的主程序及其接口配置了,对于Python来说,我们使用pydbus模块实现与D-Bus的绑定与Bus的申请。
首先我们需要编写服务的Policy,保存在/etc/dbus-1/system.d/目录下:
1 2 3 4 5 6 7 8 9 10 11
| <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN" "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd"> <busconfig> <policy user="root"> <allow own="org.example.test"/> </policy> <policy context="default"> <allow send_destination="org.example.test"/> </policy> </busconfig>
|
首先,我们编写服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| #!/bin/env python3 import getpass from gi.repository import GLib from pydbus import SystemBus
# 这是我们要注册的服务名称 BUS_NAME = 'org.example.test'
# 定义我们自己的接口 class Example: f""" <node> <interface name='{BUS_NAME}'> # 服务名称 <method name='log'> # 给接口定义一个方法 # 类型s自然就是字符串 <arg type='s' name='message' direction='in'/> # 类型i就是整数了 <arg type='i' name='response' direction='out'/> </method> </interface> </node> """
def log(self, message) -> int: print('Client sent {}'.format(message)) return 1
# 可以连接到系统总线SystemBus()或用户总线SessionBus(),在这里我们使用系统总线 bus = SystemBus() # 注册我们的服务,注意第二个参数带括号 bus.publish(BUS_NAME, Example()) # 进入主循环才能暴露服务 loop = GLib.MainLoop() loop.run()
|
接着我们编写D-Bus服务文件和systemd单元文件:
1 2 3 4 5
| [D-BUS Service] Name=org.example.test Exec=/bin/false User=root SystemdService=dbus-org.example.test.service
|
1 2 3 4 5 6 7 8 9 10 11
| [Unit] Description=No
[Service] Type=dbus BusName=org.example.test ExecStart=/程序路径
[Install] Alias=dbus-org.example.test.service WantedBy=multi-user.target
|
最后启动服务,我们测试连通性:
1
| busctl call org.example.test /org/example/test org.example.test log s 'Message'
|
应该能得到:
标准守护进程模块
python-daemon是符合 PEP 3143 标准的守护进程实现模块。
实现逻辑
根据《UNIX环境高级编程》的记录,详细来说,要实现一个守护进程,那么需要依次执行以下几步:
- 关闭所有已经打开的文件描述符(Close)(注:这并不代表之后守护进程不能再打开文件)。
- 切换当前的工作目录(Chdir)。
- 重设自身的权限掩码(Umask)。
- 进入后台(第一次Fork)。
- 脱离进程组(Setsid)。
- 无视终端IO发送的信号(Setsid)。
- 脱离控制终端,并且确保不会重新获取终端(Setsid和第二次Fork)。
- 确保正确地处理以下行为:
- 被System V init启动的情况。
- 接收到SIGTERM信号的退出。
- 子进程在退出时要向父进程发送SIGCHLD信号。
一些守护进程工具,例如 Slack-daemon,不同于 PEP 3143 仅仅实现一个单独的守护进程的目标,它们专注于实现完整的UNIX守护进程功能。参考它们的功能,对于一个守护进程来说,这些额外行为是可取的:
- 设置当前的进程上下文。
- 检测
initd与inetd,在被其启动时做出正确的反应。
- 设置SUID与SGID权限,以提高安全性。
- 阻止生成内核转储文件,以防止信息泄露。
- 创建以守护进程命名的PID文件,并保存在合适的位置,以确保守护进程的唯一性(可选)。
- 重设守护进程所属的用户和组(可选,仅用于root守护进程)。
- 重设CHROOT目录(可选,仅可用于root守护进程)。
- 捕获守护进程的标准输出和标准错误,重定向到Syslog缓冲区或者文件(可选)。
具体内容
python-daemon是围绕一个守护进程上下文类daemon.DaemonContext(),与它的参数实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 对象名 = daemon.DaemonContext( files_preserve=[保留的文件句柄列表], chroot_directory=CHROOT目录, working_directory=工作目录, umask=掩码, pidfile=PID文件锁对象, detach_process=True, uid=执行用户UID, gid=执行用户GID, prevent_core=True, signal_map={信号: 函数}, stdin=None, stdout=None, stderr=None, )
|
这些参数都会被转换为实例属性,可以在之后通过实例进行修改,只需要通过对象.属性 = 值即可。
而这个对象有以下这些方法:
打开守护进程上下文,它会将当前的程序进程Fork出一个子进程,然后杀死父进程,并在子进程中继续运行当前程序,换句话说,当这个方法返回时,运行中的程序就会变成守护进程,is_open被设为True(换句话说,所有进程分叉过程都交给守护进程上下文解决了,如果你想了解具体是怎么实现的,请看 python-daemon):
守护进程接收到指定信号后(默认是signal.SIGTERM,见之前提到的默认signal_map)进行退出的方法,它会简单地退出程序,然后抛出SystemExit异常以解释接收到的信号:
关闭守护进程上下文,**这意味着清除PID文件,以及将is_open设为False**:
返回守护进程上下文是否处于开启状态:
此外,既然是上下文,它就当然实现了with语句:
执行with语句时自动执行的方法,调用.open()方法并返回实例:
1
| 对象.__enter__() -> DaemonContext
|
退出with语句时自动执行的方法,调用.close()方法,如果关闭成功则返回True:
这些方法几乎都不需要用户关心,它们是在with语句中被隐式调用的。
使用
要使用daemon模块,你至少还需要lockfile和signal模块:
1 2 3 4 5
| import daemon
import lockfile
import signal
|
简单来说,你只需要把守护进程的主逻辑放到上下文中:
1 2 3 4 5 6 7
| with daemon.DaemonContext(pidfile=lockfile.LockFile('/run/xxx.pid')): before_loop() while True:
|
更规范的用法是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| def before_daemon(): ...
def before_loop(): ...
def main_program(): while True: ...
def cleanup(): ... sys.exit(0)
def reload(): ...
daemon_context = daemon.DaemonContext(pidfile=lockfile.FileLock('/run/xxx.pid'), signal_map = {signal.SIGTERM: cleanup, signal.SIGHUP: reload} )
before_daemon()
with daemon_context: before_loop() main_program()
|
效果
你应该能在ps xf的输出中看到一个脱离终端,孤立的守护进程,进程显示的命令就是执行脚本时执行的命令:
1 2
| PID TTY STAT TIME COMMAND 2023 ? S 0:00 python daemon.py
|
缺陷与注意事项
因为在守护进程上下文内的程序已经与终端脱离连接了,所以如果出错,那么不会在终端上显示任何错误,因此需要提前做好Log登记。如果发现执行脚本后什么也没有发生,也没有守护进程产生,那么就说明守护进程上下文内的代码有问题。
一种可行的测试方法是在实际使用前先不要把主程序放到守护进程上下文中,单次执行后检查无误再放进守护进程上下文中测试。
- 在守护进程上下文中读取文件时,千万不要忘了
working_directory=参数的默认值是/而不是os.getcwd()。
- 当已经有一个守护进程在运行时,再次运行一次守护进程,它会在守护进程上下文前阻塞并等待锁,而不会进入主程序,直到锁解除。
示例程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
import sh import daemon import signal import lockfile import os import sys import time
def before_daemon(): if not os.path.exists('daemon_dir'): os.mkdir('daemon_dir')
def main_program(in_handle): while True: in_file.write('I am running...\n') in_file.flush() time.sleep(1)
def cleanup(): sh.touch('daemon_dir/cleanup.done') sys.exit(0)
before_daemon()
if __name__ == '__main__': with daemon.DaemonContext(pidfile=lockfile.LockFile('/tmp/daemon.pid'), working_directory=os.getcwd(), signal_map={signal.SIGTERM: cleanup}): daemon_log = open('daemon_dir/mydaemon.log', 'w+') main_program(daemon_log)
|
简化模块
python-daemon有一个简化的service模块,非常易于使用。
服务类
service模块定义了一个Service类,用户只需要继承这个类即可:
1 2 3 4 5 6 7 8 9 10
| class MyService(service.Service): def __init__(self, *args, **kwargs): super(MyService, self).__init__(*args, **kwargs)
def run(self): while not self.got_sigterm(): ...
|
实例化时,需要提供以下参数:
1
| 对象名 = MyService('服务名', pid_dir='PID文件保存目录'[, signals=['信号列表']])
|
控制服务
实例化服务对象后,就可以轻易的控制服务:
1 2 3 4 5 6 7 8 9 10 11 12 13
| if __name__ == '__main__': my_service = MyService('我的服务', pid_dir='/run')
my_service.start() -> None my_service.stop([block=False]) -> None my_service.kill([block=False]) -> None my_service.is_running() -> bool my_service.get_pid() -> int
|
当然,还是建议使用位置参数实现子命令进行操作。
登记日志
service模块封装了syslog日志处理器查找函数find_syslog(),可以直接使用syslog登记日志:
1 2 3 4 5 6 7 8 9 10
| class MyService(service.Service): def __init__(self, *args, **kwargs): super(MyService, self).__init__(*args, **kwargs) self.logger.addHandler(logging.handlers.SysLogHandler(address=find_syslog()), facility=logging.handlers.SysLogHandler.LOG_DAEMON) ...
def run(self): while not self.got_sigterm(): self.logger.info('日志')
|
等待信号
当.start()方法被调用时,服务会使用一个独立进程运行run()函数;当.stop()方法被调用时,会对run()进程发送SIGTERM信号,此时,run()进程的self.got_sigterm()方法会返回True。此外,也可以使用.kill()方法直接发送SIGKILL信号。
另外,用户可以通过.send_signal()方法给守护进程发送信号,run()函数中,也可以使用self.wait_for_signal()方法等待接收其他信号,不过,这些信号必须在初始化时通过signals参数声明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class MyService(service.Service): def __init__(self, *args, **kwargs): super(MyService, self).__init__(*args, **kwargs)
def run(self): while not self.got_sigterm(): ... self.wait_for_sigterm()
while not self.got_signal(信号): ... self.wait_for_signal(信号)
my_service = MyService('服务名', pid_dir='/run', signals=[signal.SIGHUP])
my_service.send_signal(信号名)
my_service.clear()
|