1. 程式人生 > >Python envoy 模組原始碼剖析

Python envoy 模組原始碼剖析

Kenneth Reitz 是公認的這個世界上 Python 程式碼寫得最好的人之一。抱著學習的心態,我閱讀了 Reitz 寫的 envoy 模組的原始碼,將筆記記錄如下。

介紹

和 requests 模組一樣,envoy 也是 Reitz 的作品,連官方描述都類似——Python Subprocesses for Humans。

實際上,envoy 的核心程式碼非常少,總共只有不到 300 行程式碼,只是簡單的對標準庫 subprocess 的封裝。但是,所謂短小精幹,envoy 實現的介面簡單易用,比裸用 subprocess 方便不少。

背景知識

在講 envoy 的程式碼之前,我們先回顧一些背景知識。

程式和程序

在電腦科學及相關領域,經常能看到程式和程序的概念。有些人不清楚它們的差別,混為一談。這是不好的。

  • 程式:一般是一組CPU指令的集合構成的檔案,靜態儲存在諸如硬碟之類的儲存裝置上。
  • 程序:當一個程式要被計算機執行時,就是在記憶體中產生該程式的一個執行時例項,我們就把這個例項叫做程序。

簡單來說,程式就是編譯出來的二進位制可執行檔案,比如 Windows 裡的 .exe 檔案,nix 裡的 ELF 檔案。作業系統將它們裝載到記憶體空間並執行時的例項,就是程序。*程式和程序之間隔著一個「裝載」的步驟

Linux 裡的程序

以下實驗均在 CentOS 5.4 環境下操作。

首先,我們在終端裡執行

1
ps -eo pid,ppid,comm,cmd | less

這裡 ps 命令用來查詢正在執行的程序,-e 表示我們想要檢視所有的程序,-o 則選擇我們想檢視的列名稱。這裡我們檢視 pidppidcommcmd

在這個輸出結果中,每一行代表一個程序(表頭除外),共分為 4 列。

  • PID: Process IDentity,程序在當前系統中的唯一識別碼,相當於我們的身份證號。
  • PPID: Parent PID,父程序的 PID。
  • COMMAND: 程序的簡稱。
  • CMD: 程序對應的程式及其執行時所帶的引數。

從計算機啟動到程序的建立

計算機啟動時,首先會從主機板上的 BIOS

 (Basic Input/Output System) 中執行程式,從某個裝置(比如軟盤、硬碟、光碟、網路等)上啟動計算機。而後,計算機會定位到所選的裝置上,讀取開頭的 512 位元組裡的 MBR (Master Boot Record)。MBR 裡記錄著從儲存裝置啟動 Boot Loader 的具體分割槽和位置。Boot Loder 裡記錄著作業系統名稱、核心所在位置等資訊,啟動 Boot Loader 之後,它會幫我們載入 Kernel。核心負責兩件事:對下負責管理硬體,對上負責提供系統呼叫。於是,核心首先會預留自己執行所需的記憶體空間,然後呼叫驅動程式 (drivers)檢測計算機硬體,最後啟動 init 程序,並將控制權交給這個程序。在 Linux 裡,init 的 PID 是 1init 程序負責設定計算機名稱、時區,檢測檔案系統,掛載硬碟,清空臨時檔案,設定網路等操作。通常意義上說,當 init 完成這些工作,計算機就算啟動完成了。

我們小結一下,計算機啟動的流程是:

BIOS -> MBR -> Boot Loader -> Kernel -> 預留記憶體空間 -> drivers -> init -> settings

我們知道,運行於作業系統上的程序(包括 init)與作業系統互動,都是通過系統呼叫來完成的。然而 Linux 並沒有提供建立新程序的系統呼叫。實際上,Linux 裡建立新的程序這一動作,是通過 fork 和 exec 兩個函式來實現的。

我們先來看看 fork 函式的用法。

1
2
3
4
5
6
pid_t pid;
if (pid = fork()) {
    // ...
} else {
    // ...
}

呼叫 fork 函式後,新的程序(任務)和當前程序一起從程式碼的同一位置開始執行:從 fork 函式獲得返回值。在這裡,新的程序稱為子程序 (Child Process),當前程序相對應稱之為父程序 (Parent Process)。不過,在子程序中,fork 函式返回 0;在父程序中,fork 函式則返回子程序的 PID。因此,在子程序中,表示式 pid = fork() 為 false,跳轉到後續的 else 語句塊繼續執行;在父程序中,表示式 pid = fork() 為 true,繼續執行語句塊。

fork 函式的產生子程序的速度非常快。這是因為,通過 fork 產生的子程序,只是簡單地分配了記憶體空間,並與父程序共享寫時複製 (Copy on Write, COW)記憶體空間。這意味著,通過 fork 產生子程序的過程中,並沒有記憶體中內容的複製,因此速度非常快。

fork 產生的子程序,只是父程序的映象。通過 fork 的返回值,我們可以在程式碼裡判斷是否是子程序。如果是子程序,就可以呼叫 exec 函式,使用新的程式(可執行映像檔案)覆蓋當前的映像,從而執行新的任務。

不難發現,Linux 中所有的程序,不斷追溯其父程序,都會最終追溯到 init 程序。

程序的終止

當一個程序執行 exit 函式之後,核心會釋放它所開啟的檔案、佔用的記憶體等資源,然後在作業系統核心中保留一些退出資訊

  • PID
  • Exit Code
  • CPU time taken by the process

簡而言之,程序退出後,會釋放資源,然後在核心裡留下一些診斷資訊,成為殭屍程序 (Zombie Process)。程序退出後,將 PID 留在了作業系統核心中尚未釋放。因此,該 PID 是不可以被後續的新程序使用的。因此,在 Linux 的設計中,父程序需要呼叫 wait 或者 waitpid 函式從核心中獲取並處理子程序的診斷資訊,並釋放 PID(清掃殭屍程序)。

如果子程序退出時,父程序尚在,但父程序始終不處理子程序退出後留下的殭屍程序,而不斷因為業務邏輯產生新的子程序,那麼殭屍程序就會不斷積累,最終佔滿所有可用的 PID(沒有程序槽了)。這樣一來,在作業系統中就無法產生新的子程序了。(參見 fork 炸彈)因此,通過 fork 函式建立子程序之後,一定要注意 wait 子程序。

如果父程序退出時,子程序尚在。這時候,沒爹沒孃的孤兒程序(Orphand Process)就會被 init 程序收養,直到它退出後被 init 處理。

envoy 原始碼剖析

envoy 的核心程式碼儲存在 ./envoy/core.py 當中。我們先就這份程式碼的語法點做分析,然後討論它的結構。

1
2
3
4
5
6
7
import os
import sys
import shlex
import signal
import subprocess
import threading
import traceback

最頭上的兩個 os 和 sys 是常用的標準庫,不必多說。

shlex 的名字可以分為兩部分:sh 代表 shell;lex 是一個著名的詞法分析器的生成器(lexical analyzer)。運用這個標準庫,我們可以很容易地解析出使用者需要在子程序中執行的命令。

signal 是 Python 裡處理 Linux 核心訊號的標準庫。我們這裡主要用它內部定義的訊號的值,不涉及它的具體用法。

subprocess 是 Python 中實現子程序的標準庫,是 envoy 封裝的實際內容。

threading 是 Python 中實現多執行緒的一個標準庫。在 envoy 裡,我們實際用它來執行 subprocess.Popen() 建立子程序並執行任務。

traceback 是 Python 中用來追溯異常的標準庫。

Command 類

我們來看 Command 類。這是一個模組內部使用的類,Command 類的每個例項都能執行 run() 方法,在一個子程序裡執行 Shell 命令。

初始化函式 __init__() 直截了當,只是簡單地對各個資料成員賦值。

整個類的主要部分是 run() 函式,我們仔細深入進去觀察一下。

第一個值得注意的地方,是對環境變數的處理。

1
2
environ = dict(os.environ)
environ.update(env or {})

首先,作者將 os.environ 轉換成一個 Python 內建字典,儲存在 environ 中。而後,用字典的 update() 方法,將使用者傳入的環境變數補充到 environ 中。這裡,update() 方法有兩個特點

  • 輸入必須是一個非空的字典,因此作者利用短路求值 env or {} 的方式確保「非空」
  • 輸入的 env 如果有與 os.environ 同名的環境變數,則會以 env 中的值為準,否則直接在 environ 中新增鍵值對。

利用這兩個特點,作者巧妙地實現了程式邏輯。

第二個值得注意的地方,是在 run() 函式的內部,巢狀定義了 target() 函式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def target():

    try:
        self.process = subprocess.Popen(self.cmd,
            universal_newlines=True,
            shell=False,
            env=environ,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
            cwd=cwd,
        )

        if sys.version_info[0] >= 3:
            self.out, self.err = self.process.communicate(
                input = bytes(self.data, "UTF-8") if self.data else None
            )
        else:
            self.out, self.err = self.process.communicate(self.data)
    except Exception as exc:
        self.exc = exc

在 Python 中,函式定義是允許巢狀的,不過

  • 各個函式有自己的作用域;
  • 內層函式優先訪問內層作用域的變數,如果內層沒有所需變數,則逐層向外尋找所需變數;
  • 外層函式不能訪問內層函式的變數(對外層函式來說,這是區域性變數);除非內層函式宣告變數時加上了 global 關鍵字修飾,並且在訪問它時已經呼叫過內層函式。

這裡的 target() 函式定義了我們接到一個執行 Shell 命令的需求時,我們要做哪些事情。依其定義,我們要首先使用 subprocess.Popen() 建立一個子程序,並在相應的條件下執行 self.cmd。然後呼叫 self.process.communicate() 方法,將 self.data 通過管道傳給正在 Shell 中執行的程式,並獲取程式的標準輸出和標準錯誤。在整個過程中,但凡出現任何問題,都儲存在 self.exc 當中。這裡作者使用了所有異常的基類 Exception,這是因為對於作者來說 self.cmd 是不可控的,在執行 self.cmd 的過程中可能出現任何形式的異常。為了能夠處理所有異常,作者必須使用 Exception 來處理。

第三個值得注意的地方,是作者在工作執行緒中去實際執行 target() 完成的任務。

1
2
3
thread = threading.Thread(target=target)
thread.start()
thread.join(timeout)

首先,作者建立了一個執行緒,將 target() 函式作為引數傳入構造。也就是說,thread.start() 實際會執行 target() 函式的程式碼。而後,作者用 thread.join(timeout) 的方式,來處理上層傳下來的超時限制。這樣,主程序將會阻塞住,直到

  • 執行緒中的任務完成(也就是 target() 中建立的子程序的任務完成);或者
  • 達到超時時間限制。

第四個值得注意的地方,是作者回收和處理線上程中執行的子程序任務的執行狀態資訊。

1
2
3
4
5
6
7
8
9
10
if self.exc:
    raise self.exc
if _is_alive(thread) :
    _terminate_process(self.process)
    thread.join(kill_timeout)
    if _is_alive(thread):
        _kill_process(self.process)
        thread.join()
self.returncode = self.process.returncode
return self.out, self.err

首先,子程序可能丟擲異常,因此需要捕獲和繼續向上丟擲異常。

其次,執行緒 thread 可能因為超時而執行到當前程式碼,因此通過預定義的 _is_alive() 函式來判斷執行緒是正常退出還是扔在超時執行。如果確實超時,那麼首先應該終止子程序,然後嘗試等待執行緒超時終止。如果執行緒仍然還活著,說明執行緒內的子程序沒有被正確終止,那麼首先殺死子程序,然後阻塞執行緒直到它完成。這樣的設計,是確保子程序和執行緒都完全停止,防止殭屍程序的出現

最後,函式返回標準輸出和標準錯誤的內容。

Response 類

我們來看 Response 類。這是一個模組內部使用的類,Response 類的每個例項都是 Command 類的例項呼叫 run() 方法後的執行結果資訊。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Response(object):
    """A command's response"""

    def __init__(self, process=None):
        super(Response, self).__init__()

        self._process = process
        self.command = None
        self.std_err = None
        self.std_out = None
        self.status_code = None
        self.history = []


    def __repr__(self):
        if len(self.command):
            return '<Response [{0}]>'.format(self.command[0])
        else:
            return '<Response>'

從只有一個 __repr__() 方法可以看出,Response 類幾乎只是一個簡單的資料結構,提供了可供列印的功能,僅此而已。那麼作者為什麼要設計這樣一個類呢?這裡我們留給讀者思考。

expand_args 函式

expand_args(command) 函式接收一個字串作為引數,並將之解析為一個個的命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def expand_args(command):
    """Parses command strings and returns a Popen-ready list."""

    # Prepare arguments.
    if isinstance(command, (str, unicode)):
        splitter = shlex.shlex(command.encode('utf-8'))
        splitter.whitespace = '|'
        splitter.whitespace_split = True
        command = []

        while True:
            token = splitter.get_token()
            if token:
                command.append(token)
            else:
                break

        command = list(map(shlex.split, command))

    return command

我們以 'cat inFile | sort | uniq' 為引數,傳入 expand_args 函式,分析一下會發生什麼。

首先,作者用 shlex.shlex() 構造了一個詞法分析器,並設定以管道符號 | 為標誌,分割傳入的字串(或者 unicode 型別的例項,後不再重複)。加上之後的 while 迴圈,這基本相當於執行了 command = command.split('|') 的效果。

而後,執行 command = list(map(shlex.split, command)),呼叫 shlex.split 函式,作用在 command 的每一個元素上,並返回一個列表,儲存在 command 當中。最後以 return 將 command 返回給呼叫函式。

這裡的 map() 函式接收兩個引數

  • 一個函式
  • 一個可迭代的列表

然後將函式作用在列表的每一個元素上,並返回一個列表。類似的函式還有 reduce() 函式(參考 Google 的 MapReduce 架構)。這裡給出兩個示例,供體會它們的作用

map.py
1
2
3
4
#!/usr/bin/env python
inIter = ['adam', 'LISA', 'barT']
regNames = lambda iter: map ((lambda inStr: inStr.capitalize()), iter)
print regNames (inIter)
reduce.py
1
2
3
4
#!/usr/bin/env python
inIter = [1, 2, 3, 4, 5]
prod = lambda iter: reduce ((lambda x, y: x * y), iter)
print prod (inIter)

最後,輸入 'cat inFile | sort | uniq' 有輸出 [['cat', 'inFile'], ['sort'], ['uniq']]

run 函式

run(command, data=None, timeout=None, kill_timeout=None, env=None, cwd=None) 函式是 envoy 模組的主要介面,用來在子程序裡執行 Shell 命令。

首先解釋一下 run() 函式的各個引數的含義

  • command 需要執行的 Shell 命令(可以包含管道,但是不允許包含 && 或者 ; 之類的符號);
  • data 通過管道傳入 Shell 命令的內容;
  • timeout 子程序執行超時時間;
  • kill_timeout 終止子程序失敗的超時時間,超過這個時間將直接殺死子程序;
  • env