Python envoy 模組原始碼剖析
Kenneth Reitz 是公認的這個世界上 Python 程式碼寫得最好的人之一。抱著學習的心態,我閱讀了 Reitz 寫的 envoy
模組的原始碼,將筆記記錄如下。
介紹
和 requests
模組一樣,envoy
也是
Reitz 的作品,連官方描述都類似——Python Subprocesses for Humans。
實際上,envoy
的核心程式碼非常少,總共只有不到 300 行程式碼,只是簡單的對標準庫 subprocess
的封裝。但是,所謂短小精幹,envoy
實現的介面簡單易用,比裸用 subprocess
方便不少。
背景知識
在講 envoy
的程式碼之前,我們先回顧一些背景知識。
程式和程序
在電腦科學及相關領域,經常能看到程式和程序的概念。有些人不清楚它們的差別,混為一談。這是不好的。
- 程式:一般是一組CPU指令的集合構成的檔案,靜態儲存在諸如硬碟之類的儲存裝置上。
- 程序:當一個程式要被計算機執行時,就是在記憶體中產生該程式的一個執行時例項,我們就把這個例項叫做程序。
簡單來說,程式就是編譯出來的二進位制可執行檔案,比如 Windows 裡的 .exe
檔案,nix 裡的 ELF 檔案。作業系統將它們裝載到記憶體空間並執行時的例項,就是程序。*程式和程序之間隔著一個「裝載」的步驟。
Linux 裡的程序
以下實驗均在 CentOS 5.4 環境下操作。
首先,我們在終端裡執行
1
|
ps -eo pid,ppid,comm,cmd | less
|
這裡 ps
命令用來查詢正在執行的程序,-e
表示我們想要檢視所有的程序,-o
則選擇我們想檢視的列名稱。這裡我們檢視 pid
, ppid
, comm
, cmd
。
在這個輸出結果中,每一行代表一個程序(表頭除外),共分為 4 列。
- PID: Process IDentity,程序在當前系統中的唯一識別碼,相當於我們的身份證號。
- PPID: Parent PID,父程序的 PID。
- COMMAND: 程序的簡稱。
- CMD: 程序對應的程式及其執行時所帶的引數。
從計算機啟動到程序的建立
計算機啟動時,首先會從主機板上的 BIOS
init
程序,並將控制權交給這個程序。在
Linux 裡,init
的 PID 是 1
。init
程序負責設定計算機名稱、時區,檢測檔案系統,掛載硬碟,清空臨時檔案,設定網路等操作。通常意義上說,當 init
完成這些工作,計算機就算啟動完成了。
我們小結一下,計算機啟動的流程是:
BIOS -> MBR -> Boot Loader -> Kernel -> 預留記憶體空間 -> drivers ->
init
-> settings
我們知道,運行於作業系統上的程序(包括 init
)與作業系統互動,都是通過系統呼叫來完成的。然而 Linux 並沒有提供建立新程序的系統呼叫。實際上,Linux
裡建立新的程序這一動作,是通過 fork
和 exec
兩個函式來實現的。
我們先來看看 fork
函式的用法。
1 2 3 4 5 6 |
pid_t pid; if (pid = fork()) { // ... } else { // ... } |
呼叫 fork
函式後,新的程序(任務)和當前程序一起從程式碼的同一位置開始執行:從 fork
函式獲得返回值。在這裡,新的程序稱為子程序
(Child Process),當前程序相對應稱之為父程序 (Parent Process)。不過,在子程序中,fork
函式返回 0
;在父程序中,fork
函式則返回子程序的
PID。因此,在子程序中,表示式 pid = fork()
為 false
,跳轉到後續的 else
語句塊繼續執行;在父程序中,表示式 pid
= fork()
為 true
,繼續執行語句塊。
fork
函式的產生子程序的速度非常快。這是因為,通過 fork
產生的子程序,只是簡單地分配了記憶體空間,並與父程序共享寫時複製
(Copy on Write, COW)記憶體空間。這意味著,通過 fork
產生子程序的過程中,並沒有記憶體中內容的複製,因此速度非常快。
fork
產生的子程序,只是父程序的映象。通過 fork
的返回值,我們可以在程式碼裡判斷是否是子程序。如果是子程序,就可以呼叫 exec
函式,使用新的程式(可執行映像檔案)覆蓋當前的映像,從而執行新的任務。
不難發現,Linux 中所有的程序,不斷追溯其父程序,都會最終追溯到 init
程序。
程序的終止
當一個程序執行 exit
函式之後,核心會釋放它所開啟的檔案、佔用的記憶體等資源,然後在作業系統核心中保留一些退出資訊
- PID
- Exit Code
- CPU time taken by the process
簡而言之,程序退出後,會釋放資源,然後在核心裡留下一些診斷資訊,成為殭屍程序 (Zombie Process)。程序退出後,將 PID 留在了作業系統核心中尚未釋放。因此,該 PID 是不可以被後續的新程序使用的。因此,在 Linux 的設計中,父程序需要呼叫 wait
或者 waitpid
函式從核心中獲取並處理子程序的診斷資訊,並釋放
PID(清掃殭屍程序)。
如果子程序退出時,父程序尚在,但父程序始終不處理子程序退出後留下的殭屍程序,而不斷因為業務邏輯產生新的子程序,那麼殭屍程序就會不斷積累,最終佔滿所有可用的 PID(沒有程序槽了)。這樣一來,在作業系統中就無法產生新的子程序了。(參見 fork
炸彈)因此,通過 fork
函式建立子程序之後,一定要注意 wait
子程序。
如果父程序退出時,子程序尚在。這時候,沒爹沒孃的孤兒程序(Orphand Process)就會被 init
程序收養,直到它退出後被 init
處理。
envoy
原始碼剖析
envoy
的核心程式碼儲存在 ./envoy/core.py
當中。我們先就這份程式碼的語法點做分析,然後討論它的結構。
庫
1 2 3 4 5 6 7 |
import os import sys import shlex import signal import subprocess import threading import traceback |
最頭上的兩個 os
和 sys
是常用的標準庫,不必多說。
shlex
的名字可以分為兩部分:sh
代表
shell;lex
是一個著名的詞法分析器的生成器(lexical analyzer)。運用這個標準庫,我們可以很容易地解析出使用者需要在子程序中執行的命令。
signal
是 Python 裡處理 Linux 核心訊號的標準庫。我們這裡主要用它內部定義的訊號的值,不涉及它的具體用法。
subprocess
是 Python 中實現子程序的標準庫,是 envoy
封裝的實際內容。
threading
是 Python 中實現多執行緒的一個標準庫。在 envoy
裡,我們實際用它來執行 subprocess.Popen()
建立子程序並執行任務。
traceback
是 Python 中用來追溯異常的標準庫。
Command
類
我們來看 Command
類。這是一個模組內部使用的類,Command
類的每個例項都能執行 run()
方法,在一個子程序裡執行
Shell 命令。
初始化函式 __init__()
直截了當,只是簡單地對各個資料成員賦值。
整個類的主要部分是 run()
函式,我們仔細深入進去觀察一下。
第一個值得注意的地方,是對環境變數的處理。
1 2 |
environ = dict(os.environ) environ.update(env or {}) |
首先,作者將 os.environ
轉換成一個 Python 內建字典,儲存在 environ
中。而後,用字典的 update()
方法,將使用者傳入的環境變數補充到 environ
中。這裡,update()
方法有兩個特點
- 輸入必須是一個非空的字典,因此作者利用短路求值
env or {}
的方式確保「非空」; - 輸入的
env
如果有與os.environ
同名的環境變數,則會以env
中的值為準,否則直接在environ
中新增鍵值對。
利用這兩個特點,作者巧妙地實現了程式邏輯。
第二個值得注意的地方,是在 run()
函式的內部,巢狀定義了 target()
函式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
def target(): try: self.process = subprocess.Popen(self.cmd, universal_newlines=True, shell=False, env=environ, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, cwd=cwd, ) if sys.version_info[0] >= 3: self.out, self.err = self.process.communicate( input = bytes(self.data, "UTF-8") if self.data else None ) else: self.out, self.err = self.process.communicate(self.data) except Exception as exc: self.exc = exc |
在 Python 中,函式定義是允許巢狀的,不過
- 各個函式有自己的作用域;
- 內層函式優先訪問內層作用域的變數,如果內層沒有所需變數,則逐層向外尋找所需變數;
- 外層函式不能訪問內層函式的變數(對外層函式來說,這是區域性變數);除非內層函式宣告變數時加上了
global
關鍵字修飾,並且在訪問它時已經呼叫過內層函式。
這裡的 target()
函式定義了我們接到一個執行 Shell 命令的需求時,我們要做哪些事情。依其定義,我們要首先使用 subprocess.Popen()
建立一個子程序,並在相應的條件下執行 self.cmd
。然後呼叫 self.process.communicate()
方法,將 self.data
通過管道傳給正在
Shell 中執行的程式,並獲取程式的標準輸出和標準錯誤。在整個過程中,但凡出現任何問題,都儲存在 self.exc
當中。這裡作者使用了所有異常的基類 Exception
,這是因為對於作者來說 self.cmd
是不可控的,在執行 self.cmd
的過程中可能出現任何形式的異常。為了能夠處理所有異常,作者必須使用 Exception
來處理。
第三個值得注意的地方,是作者在工作執行緒中去實際執行 target()
完成的任務。
1 2 3 |
thread = threading.Thread(target=target) thread.start() thread.join(timeout) |
首先,作者建立了一個執行緒,將 target()
函式作為引數傳入構造。也就是說,thread.start()
實際會執行 target()
函式的程式碼。而後,作者用 thread.join(timeout)
的方式,來處理上層傳下來的超時限制。這樣,主程序將會阻塞住,直到
- 執行緒中的任務完成(也就是
target()
中建立的子程序的任務完成);或者 - 達到超時時間限制。
第四個值得注意的地方,是作者回收和處理線上程中執行的子程序任務的執行狀態資訊。
1 2 3 4 5 6 7 8 9 10 |
if self.exc: raise self.exc if _is_alive(thread) : _terminate_process(self.process) thread.join(kill_timeout) if _is_alive(thread): _kill_process(self.process) thread.join() self.returncode = self.process.returncode return self.out, self.err |
首先,子程序可能丟擲異常,因此需要捕獲和繼續向上丟擲異常。
其次,執行緒 thread
可能因為超時而執行到當前程式碼,因此通過預定義的 _is_alive()
函式來判斷執行緒是正常退出還是扔在超時執行。如果確實超時,那麼首先應該終止子程序,然後嘗試等待執行緒超時終止。如果執行緒仍然還活著,說明執行緒內的子程序沒有被正確終止,那麼首先殺死子程序,然後阻塞執行緒直到它完成。這樣的設計,是確保子程序和執行緒都完全停止,防止殭屍程序的出現。
最後,函式返回標準輸出和標準錯誤的內容。
Response
類
我們來看 Response
類。這是一個模組內部使用的類,Response
類的每個例項都是 Command
類的例項呼叫 run()
方法後的執行結果資訊。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
class Response(object): """A command's response""" def __init__(self, process=None): super(Response, self).__init__() self._process = process self.command = None self.std_err = None self.std_out = None self.status_code = None self.history = [] def __repr__(self): if len(self.command): return '<Response [{0}]>'.format(self.command[0]) else: return '<Response>' |
從只有一個 __repr__()
方法可以看出,Response
類幾乎只是一個簡單的資料結構,提供了可供列印的功能,僅此而已。那麼作者為什麼要設計這樣一個類呢?這裡我們留給讀者思考。
expand_args
函式
expand_args(command)
函式接收一個字串作為引數,並將之解析為一個個的命令。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
def expand_args(command): """Parses command strings and returns a Popen-ready list.""" # Prepare arguments. if isinstance(command, (str, unicode)): splitter = shlex.shlex(command.encode('utf-8')) splitter.whitespace = '|' splitter.whitespace_split = True command = [] while True: token = splitter.get_token() if token: command.append(token) else: break command = list(map(shlex.split, command)) return command |
我們以 'cat inFile | sort | uniq'
為引數,傳入 expand_args
函式,分析一下會發生什麼。
首先,作者用 shlex.shlex()
構造了一個詞法分析器,並設定以管道符號 |
為標誌,分割傳入的字串(或者 unicode
型別的例項,後不再重複)。加上之後的 while
迴圈,這基本相當於執行了 command
= command.split('|')
的效果。
而後,執行 command = list(map(shlex.split, command))
,呼叫 shlex.split
函式,作用在 command
的每一個元素上,並返回一個列表,儲存在 command
當中。最後以 return
將 command
返回給呼叫函式。
這裡的 map()
函式接收兩個引數
- 一個函式
- 一個可迭代的列表
然後將函式作用在列表的每一個元素上,並返回一個列表。類似的函式還有 reduce()
函式(參考 Google 的 MapReduce 架構)。這裡給出兩個示例,供體會它們的作用
1 2 3 4 |
#!/usr/bin/env python inIter = ['adam', 'LISA', 'barT'] regNames = lambda iter: map ((lambda inStr: inStr.capitalize()), iter) print regNames (inIter) |
1 2 3 4 |
#!/usr/bin/env python inIter = [1, 2, 3, 4, 5] prod = lambda iter: reduce ((lambda x, y: x * y), iter) print prod (inIter) |
最後,輸入 'cat inFile | sort | uniq'
有輸出 [['cat',
'inFile'], ['sort'], ['uniq']]
。
run
函式
run(command, data=None, timeout=None, kill_timeout=None, env=None, cwd=None)
函式是 envoy
模組的主要介面,用來在子程序裡執行
Shell 命令。
首先解釋一下 run()
函式的各個引數的含義
command
需要執行的 Shell 命令(可以包含管道,但是不允許包含&&
或者;
之類的符號);data
通過管道傳入 Shell 命令的內容;timeout
子程序執行超時時間;kill_timeout
終止子程序失敗的超時時間,超過這個時間將直接殺死子程序;env