1. 程式人生 > 實用技巧 >《深度剖析CPython直譯器》30. 原始碼解密 map、filter、zip 底層實現,對比列表解析式

《深度剖析CPython直譯器》30. 原始碼解密 map、filter、zip 底層實現,對比列表解析式

楔子

Python 現在如此流行,擁有眾多開源、高質量的第三方庫是一個重要原因,不過 Python 的簡單、靈巧、容易上手也是功不可沒的,而其背後的內建函式(類)則起到了很大的作用。舉個栗子:

numbers = [1, 2, 3, 4, 5]
# 將裡面每一個元素都加1
print(list(map(lambda x: x + 1, numbers)))  # [2, 3, 4, 5, 6]

strings = ["abc", "d", "def", "kf", "ghtc"]
# 篩選出長度大於等於3的
print(list(filter(lambda x: len(x) >= 3, strings)))  # ['abc', 'def', 'ghtc']

keys = ["name", "age", "gender"]
values = ["夏色祭", 17, "female"]
# 將keys 和 values 裡面的元素按照順序組成字典
print(dict(zip(keys, values)))  # {'name': '夏色祭', 'age': 17, 'gender': 'female'}

我們看到一行程式碼就搞定了,那麼問題來了,這些內建函式(類)在底層是怎麼實現的呢?下面我們就來通過原始碼來分析一下,這裡我們介紹 map、filter、zip。

首先這些類(map、filter、zip都是類)都位於 builtin 名字空間中,而我們之前在介紹原始碼的時候提到過一個檔案:Python/bltinmodule.c,我們說該檔案是和內建函式(類)相關的,那麼顯然 map、filter、zip 也藏身於此。

Python已經進化到 3.9.0 了,所以這裡就使用 Python3.9.0 的原始碼進行分析吧。

map底層實現

我們知道map是將一個序列中的每個元素都作用於同一個函式(當然類、方法也可以):

當然,我們知道呼叫map的時候並沒有馬上執行上面的操作,而是返回一個map物件。既然是物件,那麼底層必有相關的定義。

typedef struct {
    PyObject_HEAD	  
    PyObject *iters;  
    PyObject *func;   
} mapobject;

PyObject_HEAD:見過很多次了,它是Python中任何物件都會有的頭部資訊。包含一個引用計數ob_refcnt、和一個型別物件的指標ob_type;

iters:一個指標,這裡實際上是一個PyTupleObject *,以 map(lambda x: x + 1, [1, 2, 3])

為例,那麼這裡的 iters 就相當於是 ([1, 2, 3, 4, 5].__iter__(),)。至於為什麼,分析原始碼的時候就知道了;

func:顯然就是函式指標了,PyFunctionObject *;

通過底層結構體定義,我們也可以得知在呼叫map時並沒有真正的執行;對於函式和可迭代物件,只是維護了兩個指標去指向它。

而一個PyObject佔用16位元組,再加上兩個8位元組的指標總共32位元組。因此在64位機器上,任何一個map物件所佔大小都是32位元組。

numbers = list(range(100000))
strings = ["abc", "def"]

# 都佔32位元組
print(map(lambda x: x * 3, numbers).__sizeof__())  # 32
print(map(lambda x: x * 3, strings).__sizeof__())  # 32

再來看看map的用法,Python中的 map 不僅可以作用於一個序列,還可以作用於任意多個序列。

m1 = map(lambda x: x[0] + x[1] + x[2], [(1, 2, 3), (2, 3, 4), (3, 4, 5)])
print(list(m1))  # [6, 9, 12]

# map 還可以接收任意個可迭代物件
m2 = map(lambda x, y, z: x + y + z, [1, 2, 3], [2, 3, 4], [3, 4, 5])
print(list(m2))  # [6, 9, 12]
# 所以底層結構體中的iters在這裡就相當於 ([1, 2, 3].__iter__(), [2, 3, 4].__iter__(), [3, 4, 5].__iter__())

# 我們說map的第一個引數是一個函式, 後面可以接收任意多個可迭代物件
# 但是注意: 可迭代物件的數量 和 函式的引數個數 一定要匹配
m3 = map(lambda x, y, z: str(x) + y + z, [1, 2, 3], ["a", "b", "c"], "abc")
print(list(m3))  # ['1aa', '2bb', '3cc']

# 但是可迭代物件之間的元素個數不要求相等, 會以最短的為準
m4 = map(lambda x, y, z: x + y + z, [1, 2, 3], [2, 3], [3, 4, 5])
print(list(m4))  # [6, 9]

# 當然也支援更加複雜的形式
m5 = map(lambda x, y: x[0] + x[1] + y, [(1, 2), (2, 3)], [3, 4])
print(list(m5))  # [6, 9]

所以我們看到 map 會將後面所有可迭代物件中的每一個元素按照順序依次取出,然後傳遞到函式中,因此函式的引數個數 和 可迭代物件的個數 一定要相等。

那麼map物件在底層是如何建立的呢?很簡單,因為map是一個類,那麼呼叫的時候一定會執行裡面的 __new__ 方法。

static PyObject *
map_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    PyObject *it, *iters, *func;
    mapobject *lz;
    Py_ssize_t numargs, i;
	
    // map物件在底層對應的是 mapobject、map類本身在底層對應的則是 PyMap_Type
    // _PyArg_NoKeywords表示檢驗是否沒有傳遞關鍵字引數, 如果沒傳遞, 那麼結果為真; 傳遞了, 結果為假;
    if (type == &PyMap_Type && !_PyArg_NoKeywords("map", kwds))
        // 可以看到 map 不接受關鍵字引數
        // 如果傳遞了, 那麼會報如下錯誤: TypeError: map() takes no keyword arguments, 可以自己嘗試一下
        return NULL;
	
    // 位置引數都在 args 裡面, 上面的 kwds 是關鍵字引數
    // 這裡獲取位置引數的個數, 1個函式、numargs - 1個可迭代物件, 這裡的args 是一個 PyTupleObject *
    numargs = PyTuple_Size(args);
    // 如果引數個數小於2
    if (numargs < 2) {
        // 丟擲 TypeError, 表示 map 至少接受兩個位置引數: 一個函式 和 至少一個可迭代物件
        PyErr_SetString(PyExc_TypeError,
           "map() must have at least two arguments.");
        return NULL;
    }
	
    // 申請一個元組, 容量為 numargs - 1, 用於存放傳遞的所有可迭代物件(對應的迭代器)
    iters = PyTuple_New(numargs-1);
    // 為NULL表示申請失敗
    if (iters == NULL)
        return NULL;
	
    // 依次迴圈
    for (i=1 ; i<numargs ; i++) {
        // PyTuple_GET_ITEM(args, i) 表示獲取索引為 i 的可迭代物件
        // PyObject_GetIter 表示獲取對應的迭代器, 相當於內建函式 iter
        it = PyObject_GetIter(PyTuple_GET_ITEM(args, i));
        // 為NULL表示獲取失敗, 但是iters這個元組已經申請了, 所以減少其引用計數, 將其銷燬
        if (it == NULL) {
            Py_DECREF(iters);
            return NULL;
        }
        // 將對應的迭代器設定在元組iters中
        PyTuple_SET_ITEM(iters, i-1, it);
    }

    // 呼叫 PyMap_Type 的 tp_alloc, 為其例項物件申請空間
    lz = (mapobject *)type->tp_alloc(type, 0);
    // 為NULL表示申請失敗, 減少iters的引用計數
    if (lz == NULL) {
        Py_DECREF(iters);
        return NULL;
    }
    // 讓lz的iters成員 等於 iters
    lz->iters = iters;
    // 獲取第一個引數, 也就是函式
    func = PyTuple_GET_ITEM(args, 0);
    // 增加引用計數, 因為該函式被作為引數傳遞給map了
    Py_INCREF(func);
    // 讓lz的func成員 等於 func
    lz->func = func;
	
    // 轉成 PyObject *泛型指標, 然後返回
    return (PyObject *)lz;
}

所以我們看到map_new做的工作很簡單,就是例項化一個map物件,然後對內部的成員進行賦值。我們用Python來模擬一下上述過程:

class MyMap:

    def __new__(cls, *args, **kwargs):
        if kwargs:
            raise TypeError("MyMap不接受關鍵字引數")
        numargs = len(args)
        if numargs < 2:
            raise TypeError("MyMap至少接收兩個引數")
        # 元組內部的元素不可以改變(除非本地修改), 所以這裡使用列表來模擬
        iters = [None] * (numargs - 1)  # 建立一個長度為 numargs - 1 的列表, 元素都是None, 模擬C中的NULL
        i = 1
        while i < numargs:  # 逐步迴圈
            it = iter(args[i])  # 獲取可迭代物件, 得到其迭代器
            iters[i - 1] = it  # 設定在 iters 中
            i += 1
        # 為例項物件申請空間
        instance = object.__new__(cls)
        # 設定成員
        instance.iters = iters  
        instance.func = args[0]
        return instance  # 返回例項物件


m = MyMap(lambda x, y: x + y, [1, 2, 3], [11, 22, 33])
print(m)  # <__main__.MyMap object at 0x00000167F4552E80>
print(m.func)  # <function <lambda> at 0x0000023ABC4C51F0>
print(m.func(2, 3))  # 5

print(m.iters)  # [<list_iterator object at 0x0000025F13AF2940>, <list_iterator object at 0x0000025F13AF2CD0>]
print([list(it) for it in m.iters])  # [[1, 2, 3], [11, 22, 33]]

我們看到非常簡單,這裡我們沒有設定建構函式__init__,這是因為 map 內部沒有 __init__,它的成員都是在__new__裡面設定的。

# map的__init__ 實際上就是 object的__init__
print(map.__init__ is object.__init__)  # True
print(map.__init__)  # <slot wrapper '__init__' of 'object' objects>

事實上,你會發現map物件非常類似迭代器,而事實上它們也正是迭代器。

from typing import Iterable, Iterator

m = map(str, [1, 2, 3])
print(isinstance(m, Iterable))  # True
print(isinstance(m, Iterator))  # True

為了能更方便地理解後續內容,這裡我們來提一下Python中的迭代器,可能有人覺得Python的迭代器很神奇,但如果你看了底層實現的話,你肯定會覺得:"就這?"

// Objects/iterobject.c
typedef struct {
    PyObject_HEAD
    Py_ssize_t it_index;  // 每迭代一次, index自增1
    PyObject *it_seq; // 走到頭之後, 將it_seq設定為NULL
} seqiterobject;

這就是Python的迭代器,非常簡單,我們直接用Python來模擬一下:

class MyIterator:

    def __new__(cls, it_seq):
        instance = object.__new__(cls)  # 建立例項物件
        instance.it_index = 0
        instance.it_seq = it_seq
        return instance

    def __iter__(self):
        return self

    def __next__(self):
        # 如果 self.it_seq 為None, 證明此迭代器已經迭代完畢
        if self.it_seq is None:
            raise StopIteration
        try:
            # 逐步迭代, 說白了就是使用索引獲取, 每迭代一次、索引自增1
            val = self.it_seq[self.it_index]
            self.it_index += 1
            return val
        except IndexError:
            # 出現索引越界, 證明已經遍歷完畢
            # 直接將 self.it_seq 設定為None即可
            raise StopIteration


for _ in MyIterator([1, 2, 3]):
    print(_, end=" ")  # 1 2 3
print()

my_it = MyIterator([2, 3, 4])
# 只能迭代一次
print(list(my_it))  # [2, 3, 4]
print(list(my_it))  # []

Python的迭代器底層就是這麼做的,可能有人覺得這不就是把 可迭代物件 和 索引 進行了一層封裝嘛。每迭代一次,索引自增1,當出現索引越界時,證明迭代完了,直接將 it_seq 設定為 NULL 即可(這也側面說明了為什麼迭代器從開始到結束只能迭代一次)。

是的,迭代器就是這麼簡單,沒有一點神祕。當然不僅是迭代器,再比如關鍵字 in,在C的層面其實就是一層 for 迴圈罷了。下面看看原始碼是如何體現的:

// Objects/iterobject.c

// 建立
PyObject *
PySeqIter_New(PyObject *seq)
{	
    // 迭代器
    seqiterobject *it;
	
    // 如果不是一個序列的話, 那麼呼叫失敗
    if (!PySequence_Check(seq)) {
        PyErr_BadInternalCall();
        return NULL;
    }
    // 申請空間
    it = PyObject_GC_New(seqiterobject, &PySeqIter_Type);
    // 為NULL表示申請失敗
    if (it == NULL)
        return NULL;
    // it_index 初始化為0
    it->it_index = 0;
    // 因為seq被傳遞了, 所以指向的物件的引用計數要加1
    Py_INCREF(seq);
    // 將成員it_seq初始化為seq
    it->it_seq = seq;
    // 將該迭代器物件連結到 第0代連結串列 中, 並由GC負責跟蹤(此處和垃圾回收機制相關, 這裡不做過多介紹)
    _PyObject_GC_TRACK(it);
    // 返回迭代器物件
    return (PyObject *)it;
}


// 迭代
static PyObject *
iter_iternext(PyObject *iterator)
{
    seqiterobject *it;  // 迭代器物件
    PyObject *seq;     // 迭代器物件內部的可迭代物件
    PyObject *result;  // 迭代結果

    assert(PySeqIter_Check(iterator));  // 一定是迭代器
    it = (seqiterobject *)iterator;  // 將泛型指標PyObject * 轉成 seqiterobject *
    seq = it->it_seq;  // 獲取內部可迭代物件
    // 如果是NULL, 那麼證明此迭代器已經迭代完畢, 直接返回NULL
    if (seq == NULL)
        return NULL;
    // 索引達到了最大值, 因為容器內部的元素個數是有限制的; 但如果不是吃撐了寫惡意程式碼, 這個限制幾乎不可能會觸發
    if (it->it_index == PY_SSIZE_T_MAX) {
        PyErr_SetString(PyExc_OverflowError,
                        "iter index too large");
        return NULL;
    }
	
    // 根據索引獲取 seq 內部的元素
    result = PySequence_GetItem(seq, it->it_index);
    // 如果不為NULL, 證明確實迭代出了元素
    if (result != NULL) {
        // 索引自增1
        it->it_index++;
        // 然後返回結果
        return result;
    }
    // 當result為NULL的時候, 證明出異常了, 也說明遍歷到頭了
    // 進行異常匹配, 如果出現的異常能匹配 IndexError 或者 StopIteration
    if (PyErr_ExceptionMatches(PyExc_IndexError) ||
        PyErr_ExceptionMatches(PyExc_StopIteration))
    {
        // 那麼不會讓異常丟擲, 而是通過 PyErr_Clear() 將異常回溯棧清空
        // 所以使用 for i in 迭代器, 或者 list(迭代器) 等等不會報錯, 原因就在於此; 儘管它們也是不斷迭代, 但是在最後會捕獲異常
        PyErr_Clear();
        // 將it_seq設定為NULL, 表示此迭代器大限已至、油盡燈枯
        it->it_seq = NULL;
        // 因為將it_seq賦值NULL, 那麼原來的可迭代物件就少了一個引用, 因此要將引用計數減1
        Py_DECREF(seq);
    }
    return NULL;
}

所以這就是迭代器,真的一點都不神祕。

在迭代器上面多扯皮了一會兒,但這肯定是值得的,那麼回到主題。我們說呼叫map只是得到一個map物件,從上面的分析我們也可以得出,在整個過程並沒有進行任何的計算。如果要計算的話,我們可以呼叫__next__、或者使用for迴圈等等。

m = map(lambda x: x + 1, [1, 2, 3, 4, 5])
print([i for i in m])  # [2, 3, 4, 5, 6]

# 當然我們知道 for 迴圈的背後本質上會呼叫迭代器的 __next__
m = map(lambda x: int(x) + 1, "12345")
while True:
    try:
        print(m.__next__())
    except StopIteration:
        break
"""
2
3
4
5
6
"""

# 當然上面都不是最好的方式
# 如果只是單純的將元素迭代出來, 而不做任何處理的話, 那麼交給tuple、list、set等型別物件才是最佳的方式
# 像tuple(m)、list(m)、set(m)等等
# 所以如果你是[x for x in it]這種做法的話, 那麼更建議你使用list(m), 效率會更高, 因為它用的是C中的for迴圈
# 當然不管是哪種做法, 底層都是一個不斷呼叫__next__、逐步迭代的過程

所以下面我們來看看map底層是怎麼做的?

static PyObject *
map_next(mapobject *lz)
{	
    // small_stack顯然是一個數組, 裡面存放 PyObject *, 顯然它用來存放 map 中所有可迭代物件的索引為i(i=0,1,2,3...)的元素
    // 但這個_PY_FASTCALL_SMALL_STACK是什麼呢? 我們需要詳細說一下
    PyObject *small_stack[_PY_FASTCALL_SMALL_STACK];
    /*
    _PY_FASTCALL_SMALL_STACK 是一個巨集, 定義在 Include/cpython/abstract.h 中, 結果就等於5
    small_stack這個陣列會首先嚐試在棧區分配,如果通過位置引數來呼叫一個函式的話, 可以不用申請在堆區
    但是數量不能過大, 官方將這個值設定成5, 如果引數個數小於等於5的話, 便可申請在棧中
    然後通過傳遞位置引數的方式對函式進行呼叫, 這裡的函式是 PyObject_Vectorcall 系列函式(向量呼叫, 會更快), 關於如何在C中呼叫一個函式, 我們後面會介紹
    
    之所以將其設定成5, 是為了不濫用C的棧, 從而減少棧溢位的風險
    */
    
    
    // 二級指標, 指向 small_stack 陣列的首元素, 所以是 PyObject **
    PyObject **stack;
    // 函式呼叫的返回值
    PyObject *result = NULL;
    // 獲取當前的執行緒狀態物件
    PyThreadState *tstate = _PyThreadState_GET();
	
    // 獲取iters內建迭代器的數量, 同時也是呼叫函式時的引數數量
    const Py_ssize_t niters = PyTuple_GET_SIZE(lz->iters);
    // 如果這個引數小於等於5, 那麼在獲取這些迭代器中的元素時, 可以直接使用在C棧中申請的陣列進行儲存
    if (niters <= (Py_ssize_t)Py_ARRAY_LENGTH(small_stack)) {
        stack = small_stack;
    }
    else {
        // 如果超過了5, 那麼不好意思, 只能在堆區重新申請了
        stack = PyMem_Malloc(niters * sizeof(stack[0]));
        // 返回NULL, 表示申請失敗, 說明沒有記憶體了
        if (stack == NULL) {
            // 這裡傳入執行緒狀態物件, 會在內部設定異常
            _PyErr_NoMemory(tstate);
            return NULL;
        }
    }
	
    // 走到這裡說明一切順利, 那麼下面就開始迭代了
    // 如果是 map(func, [1, 2, 3], ["xx", "yy", "zz"], [2, 3, 4]), 那麼第一次迭代出來的元素就是 (1, "xx", 2)
    Py_ssize_t nargs = 0;
    for (Py_ssize_t i=0; i < niters; i++) {
        // 獲取索引為i對應的迭代器, 
        PyObject *it = PyTuple_GET_ITEM(lz->iters, i);
        // Py_TYPE表示獲取物件的 ob_type(型別物件), 然後呼叫tp_iternext成員進行迭代
        // 類似於 type(it).__next__(it)
        PyObject *val = Py_TYPE(it)->tp_iternext(it);
        // 如果val為NULL, 直接跳轉到 exit 這個label中
        if (val == NULL) {
            goto exit;
        }
        // 將 val 設定在陣列索引為i的位置中, 然後進行下一輪迴圈, 也就是獲取下一個迭代器中的元素設定在陣列stack中
        stack[i] = val;
        // nargs++, 和引數個數、迭代器個數 保持一致
        // 如果可迭代物件個數是3, 那麼小於5, 所以stack會申請在棧區; 但是在棧區申請的話, 長度預設為5, 因此後兩個是元素是無效的
        // 所以在呼叫的時候需要指定有效的引數個數
        nargs++;
    }
	
    // 進行呼叫, 得到結果, 這個函式是Python3.9新增的; 如果是Python3.8的話, 呼叫的是_PyObject_FastCall
    result = _PyObject_VectorcallTstate(tstate, lz->func, stack, nargs, NULL);

exit:
    // 呼叫完畢之後, 將stack中指標指向的物件的引用計數減1
    for (Py_ssize_t i=0; i < nargs; i++) {
        Py_DECREF(stack[i]);
    }
    // 不相等的話, 說明該stack是在堆區申請的, 要釋放
    if (stack != small_stack) {
        PyMem_Free(stack);
    }
    // 返回result
    return result;
}

然後突然發現map物件還有一個鮮為人知的一個方法,也是一個沒有什麼卵用的方法。說來慚愧,要不是看原始碼,我還真沒注意過。

static PyObject *
map_reduce(mapobject *lz, PyObject *Py_UNUSED(ignored))
{	
    // 獲取迭代器的元素個數
    Py_ssize_t numargs = PyTuple_GET_SIZE(lz->iters);
    // 申請一個元素, 空間是numargs + 1 個
    PyObject *args = PyTuple_New(numargs+1);
    Py_ssize_t i;
    if (args == NULL)
        return NULL;
    Py_INCREF(lz->func);
    // 將函式設定為args的第一個元素
    PyTuple_SET_ITEM(args, 0, lz->func);
    // 然後再將剩下的迭代器也設定在args中
    for (i = 0; i<numargs; i++){
        PyObject *it = PyTuple_GET_ITEM(lz->iters, i);
        Py_INCREF(it);
        PyTuple_SET_ITEM(args, i+1, it);
    }
	
    // 將 Py_TYPE(lz) 和 args 打包成一個元組返回
    // 所以從結果上看, 返回的內容應該是: ( <class 'map'>, (函式, 迭代器1, 迭代器2, 迭代器3, ......) )
    return Py_BuildValue("ON", Py_TYPE(lz), args);
}

static PyMethodDef map_methods[] = {
    // 然後這個函式叫 __reduce__
    {"__reduce__",   (PyCFunction)map_reduce,   METH_NOARGS, reduce_doc},
    {NULL,           NULL}           /* sentinel */
};

然後我們來演示一下:

from pprint import pprint

m = map(lambda x, y, z: x + y + z, [1, 2, 3], [2, 3, 4], [3, 4, 5])
pprint(m.__reduce__())
"""
(<class 'map'>,
 (<function <lambda> at 0x000001D2791451F0>,
  <list_iterator object at 0x000001D279348640>,
  <list_iterator object at 0x000001D279238700>,
  <list_iterator object at 0x000001D27950AF40>))
"""

filter底層實現

然後我們filter的實現原理,看完了map之後,再看filter就簡單許多了。

lst = [1, 2, 3, 4, 5]
print(list(filter(lambda x: x % 2 !=0, lst)))  # [1, 3, 5]

首先filter接收兩個元素,第一個引數是一個函式(類、方法),第二個引數是一個可迭代物件。然後當我們迭代的時候會將可迭代物件中每一個元素都傳入到函式中,如果返回的結果為真,則保留;為假,則丟棄。

但是,其實第一個引數除了是一個可呼叫的物件之外,它還可以是None。

lst = ["夏色祭", "", [], 123, 0, {}, [1]]
# 會自動選擇結果為真的元素
print(list(filter(None, lst)))  # ['夏色祭', 123, [1]]

至於為什麼,一會看原始碼filter的實現就清楚了。

下面看看底層結構:

typedef struct {
    PyObject_HEAD
    PyObject *func;
    PyObject *it;
} filterobject;

我們看到和map物件是一致的,沒有什麼區別。因為map、filter都不會立刻呼叫,而是返回一個相應的物件。

static PyObject *
filter_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    PyObject *func, *seq;  // 函式、可迭代物件
    PyObject *it;  // 可迭代物件的迭代器
    filterobject *lz;  // 返回值, filter物件
	
    // filter也不接受關鍵字引數
    if (type == &PyFilter_Type && !_PyArg_NoKeywords("filter", kwds))
        return NULL;
	
    // 只接受兩個引數
    if (!PyArg_UnpackTuple(args, "filter", 2, 2, &func, &seq))
        return NULL;

    // 獲取seq對應的迭代器
    it = PyObject_GetIter(seq);
    if (it == NULL)
        return NULL;

    // 為filter物件申請空間
    lz = (filterobject *)type->tp_alloc(type, 0);
    if (lz == NULL) {
        Py_DECREF(it);
        return NULL;
    }
    // 增加函式的引用計數
    Py_INCREF(func);
    // 初始化成員
    lz->func = func;
    lz->it = it;
	
    // 返回
    return (PyObject *)lz;
}

和map是類似的,因為本質上它們做的事情都是差不多的,下面看看迭代過程。

static PyObject *
filter_next(filterobject *lz)
{
    PyObject *item; // 迭代器中迭代出來的每一個元素
    PyObject *it = lz->it;  // 迭代器
    long ok;  // 是否為真, 1表示真、0表示假
    PyObject *(*iternext)(PyObject *);  // 函式指標, 接收一個PyObject *, 返回一個PyObject *
    // 如果 func == None 或者 func == bool, 那麼checktrue為真; 會走單獨的方法, 所以給func傳遞一個None是完全合法的
    int checktrue = lz->func == Py_None || lz->func == (PyObject *)&PyBool_Type;
    // 迭代器的 __next__ 方法
    iternext = *Py_TYPE(it)->tp_iternext;
    // 無限迴圈
    for (;;) {
        // 迭代出迭代器的每一個元素
        item = iternext(it);
        if (item == NULL)
            return NULL;
		
        // 如果checkture, 或者說如果func == None || func == bool
        if (checktrue) {
            // PyObject_IsTrue(item)實際上就是在判斷item是否為真, 像0、長度為0的序列、False、None為假
            // 另外我們在if語句的時候經常會寫 if item: 這種形式, 但是很少會寫 if bool(item):
            // 因為bool(item)底層也是呼叫 PyObject_IsTrue
            // 而if item: 如果你檢視它的位元組碼的話, 會發現有這麼一條指令: POP_JUMP_IF_FALSE, 它在底層也是呼叫了 PyObject_IsTrue, 因此完全沒有必要寫成 if bool(item): 這種形式
            ok = PyObject_IsTrue(item);
            // 而如果func為None或者bool的話, 那麼直接走PyObject_IsTrue
        } else {
            // 否則的話, 會呼叫我們傳遞的func
            // 這裡的 good 就是函式呼叫的返回值
            PyObject *good;
            // 呼叫函式, 將返回值賦值給good
            good = PyObject_CallFunctionObjArgs(lz->func, item, NULL);
            // 如果 good 等於 NULL, 說明函式呼叫失敗; 說句實話, 原始碼中做的很多異常捕獲都是針對直譯器內部的
            // 尤其像底層這種和NULL進行比較的, 我們在使用Python的時候, 很少會出現
            if (good == NULL) {
                Py_DECREF(item);
                return NULL;
            }
            // 判斷 good 是否為真
            ok = PyObject_IsTrue(good);
            Py_DECREF(good); // 減少其引用計數, 因為它不被外界所使用
        }
        // 如果ok大於0, 說明將item傳給函式呼叫之後返回的結果為真, 那麼將item返回
        if (ok > 0)
            return item;
        // 同時減少其引用計數
        // 如果等於0, 說明為假, 那麼進行下一輪迴圈
        Py_DECREF(item);
        // 小於0的話, 表示PyObject_IsTrue呼叫失敗了, 呼叫失敗會返回-1
        // 但還是那句話, 這種情況, 在Python的使用層面上幾乎不可能發生
        if (ok < 0)
            return NULL;
    }
}

所以看到這裡你還覺得Python神祕嗎,從原始碼層面我們看的非常流暢,只要你有一定的C語言基礎即可。還是那句話,儘管我們不可能寫一個直譯器,因為背後涉及的東西太多了,但至少我們在看的過程中,很清楚底層到底在做什麼。而且這背後的實現,如果讓你設計一個方案的話,那麼相信你也一樣可以做到。

還是拿關鍵字 in 舉例子,像"b"in["a", "b", "c"]我們知道結果為真。那如果讓你設計關鍵字 in 的實現,你會怎麼做呢?反正基本上都會想到,遍歷 in 後面的可迭代物件唄,將裡面的元素 依次和 in前面的元素進行比較,如果出現相等的,返回真;遍歷完了也沒發現相等的,那麼返回假。如果你是這麼想的,那麼恭喜你,Python直譯器內部也是這麼做的,我們以列表為例:

// item in 列表: 本質上就是呼叫 list.__contains__(列表, item) 或者 列表.__contains__(item)
static int
list_contains(PyListObject *a, PyObject *el)
{
    PyObject *item; // 列表中的每一個元素
    Py_ssize_t i;  // 迴圈變數
    int cmp;  // 比較的結果
	
    // cmp初始為0
    for (i = 0, cmp = 0 ; cmp == 0 && i < Py_SIZE(a); ++i) {
        // 獲取PyListObject中的每一個元素
        item = PyList_GET_ITEM(a, i);
        Py_INCREF(item);
        // 呼叫PyObject_RichCompareBool進行比較, 大於、小於、不等於之類的都是使用這個函式, 具體是哪一種則通過第三個引數控制
        // 而前兩個元素則是比較的物件
        cmp = PyObject_RichCompareBool(el, item, Py_EQ);
        Py_DECREF(item);
    }
    // 如果出現相等的元素, 那麼cmp為1, 因此cmp == 0 && i < Py_SIZE(a)會不成立, 直接結束迴圈
    // 如果沒有出現相等的元素, 那麼會一直遍歷整個列表, 始終沒有出現相等的元素, 那麼cmp還是0
    // 為1代表真, 為0代表假
    return cmp;
}

以上便是關鍵字 in,是不是很簡單呢?所以個人推薦沒事的話可以多讀一讀Python直譯器,如果你不使用Python / C API進行程式設計的話,那麼不需要你有太高的C語言水平(況且現在還有Cython)。如果你想寫出高質量、並且精簡利落的Python程式碼,那麼就勢必要懂得背後的實現原理。比如:我們看幾道思考題,自己亂編的。

1. 為什麼 方法一 要比 方法二 更高效?

lst = [1, 2, 3, 4, 5]


# 方法一
def f1():
    return [f"item: {item}" for item in lst]


# 方法二
def f2():
    res = []
    for item in lst:
        res.append(f"item: {item}")
    return res

所以這道題考察的實際上是列表解析為什麼更快?首先Python中的變數在底層本質上都是一個泛型指標PyObject *,呼叫res.append的時候實際上會進行一次屬性查詢。會呼叫 PyObject_GetAttr(res, "append"),去尋找該物件是否有 append 函式,如果有的話,那麼進行獲取然後呼叫;而列表解析,Python在編譯的時候看到左右的兩個中括號就知道這是一個列表解析式,所以它會立刻知道自己該幹什麼,會直接呼叫C一級函式 PyList_Append,因為Python對這些內建物件非常熟悉。所以列表解析少了一層屬性查詢的過程,因此它的效率會更高。

2. 假設有三個變數a、b、c,三個常量"xxx"、123、3.14,我們要判斷這三個變數對應的值 和 這三個常量是否相等,該怎麼做呢?注意:順序沒有要求,可以是 a =="xxx"、也可以是 b =="xxx",只要這個三個變數對應的值正好也是"xxx"、123、3.14 就行。

顯然最方便的是使用集合:

a, b, c = 3.14, "xxx", 123
print(not {a, b, c} - {"xxx", 3.14, 123})  # True

3. 令人困惑的生成器解析式,請問下面哪段程式碼會報錯?

# 程式碼一
x = ("xxx" for _ in dasdasdad)

# 程式碼二
x = (dasdasdad for _ in "xxx")

首先生成器解析式,只有在執行的時候才會真正的產出值。但是關鍵字 in 後面的變數是會提前確定的,所以程式碼一會報錯,丟擲 NameError;但程式碼二不會,因為只有在產出值的時候才會去尋找變數dasdasdad 指向的值。

再留個兩個思考題,為什麼會出現這種結果呢?

# 思考題一: 
class A:
    x = 123
    print(x)
    lst = [x for _ in range(3)]
"""
123
NameError: name 'x' is not defined
"""


########################################################################


# 思考題二: 
def f():
    a = 123
    print(eval("a"))
    print([eval("a") for _ in range(3)])

f()
"""
123
NameError: name 'a' is not defined
"""

像這樣類似的問題還有很多很多,當然最關鍵的還是理解底層的資料結構 以及 直譯器背後的執行原理,只有這樣才能寫出更加高效的程式碼。

回到正題,filter 也有 __reduce__方法,和 map 類似。

f = filter(None, [1, 2, 3, 0, "", [], "xx"])
print(f.__reduce__())  # (<class 'filter'>, (None, <list_iterator object at 0x00000239AF2AB0D0>))
print(list(f.__reduce__()[1][1]))  # [1, 2, 3, 0, '', [], 'xx']

zip底層實現

最後看看 zip,其實 zip 和 map 也是有著高度相似之處的,首先它們都可以接受任意個可迭代物件。而且 zip,我們完全可以使用 map 來進行模擬。

print(
    list(zip([1, 2, 3], [11, 22, 33], [111, 222, 333]))
)  # [(1, 11, 111), (2, 22, 222), (3, 33, 333)]

print(
    list(map(lambda x, y, z: (x, y, z), [1, 2, 3], [11, 22, 33], [111, 222, 333]))
)  # [(1, 11, 111), (2, 22, 222), (3, 33, 333)]

print(
    list(map(lambda *args: args, [1, 2, 3], [11, 22, 33], [111, 222, 333]))
)  # [(1, 11, 111), (2, 22, 222), (3, 33, 333)]

# 所以我們看到實現zip, 完全可以使用 map, 只需要多指定一個函式即可

所以 zip 的底層實現同樣很簡單,我們來看一下:

typedef struct {
    PyObject_HEAD
    Py_ssize_t          tuplesize;
    PyObject *ittuple;                  
    PyObject *result;
} zipobject;
// 以上便是zip物件的底層定義, 這些欄位的含義, 我們暫時先不討論, 它們會體現在zip_new方法中, 我們到時候再說

目前我們根據結構體裡面的成員,可以得到一個 zipobject 顯然是佔 40 位元組的,16 + 8 + 8 + 8,那麼結果是不是這樣呢?我們來試一下就知道了。

z1 = zip([1, 2, 3], [11, 22, 33])
z2 = zip([1, 2, 3, 4], [11, 22, 33, 44])
z3 = zip([1, 2, 3], [11, 22, 33], [111, 222, 333])

print(z1.__sizeof__())  # 40  
print(z2.__sizeof__())  # 40
print(z3.__sizeof__())  # 40

所以我們分析的沒有錯,任何一個 zip 物件所佔的大小都是 40 位元組。所以在計算記憶體大小的時候,有人會好奇這到底是怎麼計算的,其實就是根據底層的結構體進行計算的。

注意:如果你使用 sys.getsizeof 函式計算的話,可能會多出 16 個位元組,這是因為對於可變物件,它們是會被 GC 跟蹤的。在建立的時候,它們會被掛到零代連結串列中,所以它們額外還會有一個 前繼指標 和 一個 後繼指標,而 sys.getsizeof 將這兩個指標的大小也算在內了。

下面看看 zip 物件是如何被例項化的。

static PyObject *
zip_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    zipobject *lz;  // zip 物件
    Py_ssize_t i;  // 迴圈變數
    PyObject *ittuple;  // 所有可迭代物件的迭代器組成的元組
    PyObject *result;   // "程式碼中有體現"
    Py_ssize_t tuplesize;  // 可迭代物件的數量
	
    // zip同樣不需要關鍵字引數, 但是在3.10的時候將會提供一個關鍵字引數strict, 如果為True, 表示可迭代物件之間的長度必須相等, 否則報錯
    // strict如果為False, 則和目前是等價的, 會自動以短的為準
    if (type == &PyZip_Type && !_PyArg_NoKeywords("zip", kwds))
        return NULL;

    // args必須使用一個PyTupleObject *
    assert(PyTuple_Check(args));
    // 獲取可迭代物件的數量
    tuplesize = PyTuple_GET_SIZE(args);

    // 申請一個元組, 長度為tuplesize, 用於存放可迭代物件對應的迭代器
    ittuple = PyTuple_New(tuplesize);
    if (ittuple == NULL) // 為NULL表示申請失敗
        return NULL;
    // 然後依次遍歷
    for (i=0; i < tuplesize; ++i) {
        // 獲取傳遞的可迭代物件
        PyObject *item = PyTuple_GET_ITEM(args, i);
        // 通過PyObject_GetIter獲取對應的迭代器
        PyObject *it = PyObject_GetIter(item);
        if (it == NULL) {
            // 為NULL表示獲取失敗, 減少ittuple的引用計數, 返回NULL
            Py_DECREF(ittuple);
            return NULL;
        }
        // 設定在ittuple中
        PyTuple_SET_ITEM(ittuple, i, it);
    }

    // 這裡又申請一個元組result, 長度也為tuplesize
    result = PyTuple_New(tuplesize);
    if (result == NULL) {
        Py_DECREF(ittuple);
        return NULL;
    }
    // 然後將內部的所有元素都設定為None, Py_None就是Python中的None
    for (i=0 ; i < tuplesize ; i++) {
        Py_INCREF(Py_None);
        PyTuple_SET_ITEM(result, i, Py_None);
    }

    // 申請一個zip物件
    lz = (zipobject *)type->tp_alloc(type, 0);
    // 申請失敗減少引用計數, 返回NULL
    if (lz == NULL) {
        Py_DECREF(ittuple);
        Py_DECREF(result);
        return NULL;
    } 
    // 初始化成員
    lz->ittuple = ittuple;
    lz->tuplesize = tuplesize;
    lz->result = result;
	
    // 轉成泛型指標PyObject *之後返回
    return (PyObject *)lz;
}

再來看看,zip物件的定義:

typedef struct {
    PyObject_HEAD
    Py_ssize_t          tuplesize;
    PyObject *ittuple;                  
    PyObject *result;
} zipobject;

如果以:zip([1, 2, 3], [11, 22, 33], [111, 222, 333])為例的話,那麼:

  • tuplesize: 3
  • ittuple: ([1, 2, 3].__iter__(), [11, 22, 33].__iter__(), [111, 222, 333].__iter__())
  • result: (None, None, None)

所以目前來說,其它的很好理解,唯獨這個result讓人有點懵,搞不懂它是幹什麼的。不過既然有這個成員,那就說明它肯定有用武之地,而派上用場的地方不用想,肯定是在迭代的時候使用。

static PyObject *
zip_next(zipobject *lz)
{
    Py_ssize_t i; // 迴圈遍變數
    Py_ssize_t tuplesize = lz->tuplesize;  // 可迭代物件數量
    PyObject *result = lz->result;   // (None, None, ....)
    PyObject *it;  // 每一個迭代器 
    
    // 程式碼中體現
    PyObject *item;
    PyObject *olditem;
	
    // tuplesize == 0, 直接返回
    if (tuplesize == 0)
        return NULL;
    // 如果 result 的引用計數為1, 證明該元組的空間的被申請了
    if (Py_REFCNT(result) == 1) {
        // 因為它要作為返回值返回, 引用計數加1
        Py_INCREF(result);
        // 遍歷
        for (i=0 ; i < tuplesize ; i++) {
            // 依次獲取每一個迭代器
            it = PyTuple_GET_ITEM(lz->ittuple, i);
            // 迭代出相應的元素
            item = (*Py_TYPE(it)->tp_iternext)(it);
            // 如果出現了NULL, 證明迭代結束了, 會直接停止
            // 所以會以元素最少的可迭代物件(迭代器)為準
            if (item == NULL) {
                Py_DECREF(result);
                return NULL;
            }
            // 設定在 result 中, 但是要先獲取result中原來的元素, 並將其引用計數減1, 因為元組不再持有對它的引用
            olditem = PyTuple_GET_ITEM(result, i);
            PyTuple_SET_ITEM(result, i, item);
            Py_DECREF(olditem);
        }
    } else {
        // 否則的話同樣的邏輯, 只不過需要自己重新手動申請一個tuple
        result = PyTuple_New(tuplesize);
        if (result == NULL)
            return NULL;
        // 然後下面的邏輯是類似的
        for (i=0 ; i < tuplesize ; i++) {
            it = PyTuple_GET_ITEM(lz->ittuple, i);
            item = (*Py_TYPE(it)->tp_iternext)(it);
            if (item == NULL) {
                Py_DECREF(result);
                return NULL;
            }
            PyTuple_SET_ITEM(result, i, item);
        }
    }
    // 返回元組 result
    return result;
}

所以當我們進行迭代的時候,迭代出來的是一個元組。

z = zip([1, 2, 3], [11, 22, 33])
print(z.__next__())  # (1, 11)

# 即使只有一個可迭代物件, 依舊是一個元組, 因為底層返回的result就是一個元組
z = zip([1, 2, 3])
print(z.__next__())  # (1,)

# 可迭代物件的巢狀也是一樣的規律, 直接把裡面的列表看成一個標量即可
z = zip([[1, 2, 3], [11, 22, 33]])
print(z.__next__())  # ([1, 2, 3],)

最後,zip 也有一個__reduce__方法:

z = zip([1, 2, 3], [11, 22, 33])
print(z.__reduce__())
# (<class 'zip'>, (<list_iterator object at 0x0000018D1723B0D0>, <list_iterator object at 0x0000018D1723B040>))
 
print([tuple(_) for _ in z.__reduce__()[1]])  # [(1, 2, 3), (11, 22, 33)]

map、filter 和 列表解析之間的區別

其實在使用 map、filter 的時候,我們完全可以使用列表解析來實現。比如:

lst = [1, 2, 3, 4]

print([str(_) for _ in lst])  # ['1', '2', '3', '4']
print(list(map(str, lst)))  # ['1', '2', '3', '4']

這兩者之間實際上是沒有什麼太大區別的,都是將 lst 中的元素一個一個迭代出來、然後呼叫 str 、返回結果。如果非要找出區別話,就是列表解析使用的是 Python 的for迴圈,而呼叫list的時候使用的是C中的for迴圈。從這個角度來說,使用 map 的效率會更高一些。

所以後者的效率稍微更高一些,因為列表解析用的是 Python 的for迴圈,list(map(func, iter))用的是C的for迴圈。但是注意:如果是下面這種做法的話,會得到相反的結果。

我們看到 map 貌似變慢了,其實原因很簡單,後者多了一層匿名函式的呼叫,所以速度變慢了。

如果列表解析也是函式呼叫的話:

會發現速度更慢了,當然這種做法完全是吃飽了撐的。之所以說這些,是想說明在同等條件下,list(map) 這種形式是要比列表解析快的。當然在工作中,這兩者都是可以使用的,這點效率上的差別其實不用太在意,如果真的到了需要在乎這點差別的時候,那麼你應該考慮的是換一門更有效率的靜態語言。

filter 和 列表解析之間的差別,也是如此。

對於過濾含有 1000個 False 和 1個True 的元組,它們的結果都是一樣的,但是誰的效率更高呢?首先第一種方式 肯定比 第二種方式快,因為第二種方式涉及到函式的呼叫;但是第三種方式,我們知道它在底層會走單獨的分支,所以再加上之前的結論,我們認為第三種方式是最快的。

結果也確實是我們分析的這樣,當然我們說在底層 None 和 bool 都會走相同的分支,所以這裡將 None 換成 bool 也是可以的。雖然 bool 是一個類,但是通過 filter_new 函式我們知道,底層不會進行呼叫,也是直接使用 PyObject_IsTrue,可以將 None 換成 bool 看看結果如何,應該是差不多的。

總結

所以 map、filter 完全可以使用列表解析替代,如果執行的邏輯比較複雜的話,那麼對於 map、filter 而言就要寫匿名函數了。但邏輯簡單的話,比如:獲取為真的元素,完全可以通過list(filter(None, lst))實現,效率更高,因為它走的是相當於是C的迴圈;但如果獲取大於3的元素,那麼就需要使用list(filter(lambda x: x > 3, lst))這種形式了,而我們說它的效率是不如列表解析[x for x in lst if x > 3]的,因為前者多了一層函式呼叫。

但是在工作中,這兩種方式都是可以的,使用哪一種就看個人喜好。到此我們發現,如果排除那一點點效率上的差異,那麼確實有列表解析式就完全足夠了,因為列表解析式可以同時實現 map、filter 的功能,而且表達上也更加地直觀。只不過是 map、filter 先出現,然後才有的列表解析式,但是前者依舊被保留了下來。

當然 map、filter 返回的是一個可迭代物件,它不會立即計算,可以節省資源;當然這個功能,我們也可以通過生成器表示式來實現。

map、filter、zip 的底層實現我們就看完了,是不是很簡單呢?

另外,如果你得到的結論和我這裡的不一致,那麼不妨把可迭代物件的元素個數設定的稍微大一些,最終結論和我這裡一定是一樣的。