1. 程式人生 > 實用技巧 >Python也能高併發

Python也能高併發

作業系統到底在幹啥?

如果由筆者來概括,作業系統大概做了兩件事情,計算與IO,任何具體數學計算或者邏輯判斷,或者業務邏輯都是計算,而網路互動,磁碟互動,人機之間的互動都是IO。

高併發的瓶頸在哪?

大多數時候在IO上面。注意,這裡說得是大多數,不是說絕對。

因為大多數時候業務本質上都是從資料庫或者其他儲存上讀取內容,然後根據一定的邏輯,將資料返回給使用者,比如大多數web內容。而大多數邏輯的互動都算不上計算量多大的邏輯,CPU的速度要遠遠高於記憶體IO,磁碟IO,網路IO, 而這些IO中網路IO最慢。

在根據上面的筆者對作業系統的概述,當併發高到一定的程度,根據業務的不同,比如計算密集,IO密集

,或兩者皆有,因此瓶頸可能出在計算上面或者IO上面,又或兩者兼有。

而本文解決的高併發,是指IO密集的高併發瓶頸,因此,計算密集的高併發並不在本文的討論範圍內。

為了使本文歧義更少,這裡的IO主要指網路IO

Python怎麼處理高併發?

使用協程, 事件迴圈, 高效IO模型(比如多路複用,比如epoll),三者缺一不可。

很多時候,筆者看過的文章都是說協程如何如何,最後告訴我一些協程庫或者asyncio用來說明協程的威力,最終我看懂了協程,卻還是不知道它為啥能高併發,這也是筆者寫本文的目的。

但是一切還是得從生成器說起,因為asyncio或者大多數協程庫內部也是通過生成器實現的。

注意上面的三者缺一不可。
如果只懂其中一個,那麼你懂了三分之一,以此類推,只有都會了,你才知道為啥協程能高併發。

生成器

生成器的定義很抽象,現在不懂沒關係,但是當你懂了之後回過頭再看,會覺得定義的沒錯,並且準確。下面是定義

摘自百度百科: 生成器是一次生成一個值的特殊型別函式。可以將其視為可恢復函式。

關於生成器的內容,本文著重於生成器實現了哪些功能,而不是生成器的原理及內部實現。

yield

簡單例子如下

def gen_func():
    yield 1
    yield 2
    yield 3

if __name__ == '__main__':
    gen = gen_func()
    for i in gen:
        print(i)

output:
1
2
3

上面的例子沒有什麼稀奇的不是嗎?yield像一個特殊的關鍵字,將函式變成了一個類似於迭代器的物件,可以使用for迴圈取值。

send, next

協程自然不會這麼簡單,python協程的目標是星辰大海,從上面的例之所以get不到它的野心,是因為你沒有試過send, next兩個函式。

首先說next

def gen_func():
    yield 1
    yield 2
    yield 3

if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    print(next(gen))
    print(next(gen))

output:
1
2
3

next的操作有點像for迴圈,每呼叫一次next,就會從中取出一個yield出來的值,其實還是沒啥特別的,感覺還沒有for迴圈好用。

不過,不知道你有沒有想過,如果你只需要一個值,你next一次就可以了,然後你可以去做其他事情,等到需要的時候才回來再次next取值。

就這一部分而言,你也許知道為啥說生成器是可以暫停的了,不過,這似乎也沒什麼用,那是因為你不知到時,生成器除了可以丟擲值,還能將值傳遞進去

接下來我們看send的例子。

def gen_func():
    a = yield 1
    print("a: ", a)
    b = yield 2
    print("b: ", b)
    c = yield 3
    print("c: ", c)
    return "finish"

if __name__ == '__main__':
    gen = gen_func()
    for i in range(4):
        if i == 0:
            print(gen.send(None))
        else:
            # 因為gen生成器裡面只有三個yield,那麼只能迴圈三次。
            # 第四次迴圈的時候,生成器會丟擲StopIteration異常,並且return語句裡面內容放在StopIteration異常裡面
            try:
                print(gen.send(i))
            except StopIteration as e:
                print("e: ", e)

output:
1
a:  1
2
b:  2
3
c:  3
e:  finish

send有著next差不多的功能,不過send在傳遞一個值給生成器的同時,還能獲取到生成器yield丟擲的值,在上面的程式碼中,send分別將None,1,2,3四個值傳遞給了生成器,之所以第一需要傳遞None給生成器,是因為規定,之所以規定,因為第一次傳遞過去的值沒有特定的變數或者說物件能接收,所以規定只能傳遞None, 如果你傳遞一個非None的值進去,會丟擲一下錯誤

TypeError: can't send non-None value to a just-started generator

從上面的例子我們也發現,生成器裡面的變數a,b,c獲得了,send函式傳送將來的1, 2, 3.

如果你有事件迴圈或者說多路複用的經驗,你也許能夠隱隱察覺到微妙的感覺。
這個微妙的感覺是,是否可以將IO操作yield出來?由事件迴圈排程, 如果你能get到這個微妙的感覺,那麼你已經知道協程高併發的祕密了.

但是還差一點點.嗯, 還差一點點了.

yield from

下面是yield from的例子

def gen_func():
    a = yield 1
    print("a: ", a)
    b = yield 2
    print("b: ", b)
    c = yield 3
    print("c: ", c)
    return 4

def middle():
    gen = gen_func()
    ret = yield from gen
    print("ret: ", ret)
    return "middle Exception"

def main():
    mid = middle()
    for i in range(4):
        if i == 0:
            print(mid.send(None))
        else:
            try:
                print(mid.send(i))
            except StopIteration as e:
                print("e: ", e)

if __name__ == '__main__':
    main()

output: 
1
a:  1
2
b:  2
3
c:  3
ret:  4
e:  middle Exception

從上面的程式碼我們發現,main函式呼叫的middle函式的send,但是gen_func函式卻能接收到main函式傳遞的值.有一種透傳的感覺,這就是yield from的作用, 這很關鍵

而yield from最終傳遞出來的值是StopIteration異常,異常裡面的內容是最終接收生成器(本示例是gen_func)return出來的值,所以ret獲得了gen_func函式return的4.但是ret將異常裡面的值取出之後會繼續將接收到的異常往上拋,所以main函式裡面需要使用try語句捕獲異常。而gen_func丟擲的異常裡面的值已經被middle函式接收,所以middle函式會將丟擲的異常裡面的值設為自身return的值,

至此生成器的全部內容講解完畢,如果,你get到了這些功能,那麼你已經會使用生成器了。

io模型

Linux平臺一共有五大IO模型,每個模型有自己的優點與確定。根據應用場景的不同可以使用不同的IO模型。

不過本文主要的考慮場景是高併發,所以會針對高併發的場景做出評價。

同步IO

同步模型自然是效率最低的模型了,每次只能處理完一個連線才能處理下一個,如果只有一個執行緒的話, 如果有一個連線一直佔用,那麼後來者只能傻傻的等了。所以不適合高併發,不過最簡單,符合慣性思維。

非阻塞式IO

不會阻塞後面的程式碼,但是需要不停的顯式詢問核心資料是否準備好,一般通過while迴圈,而while迴圈會耗費大量的CPU。所以也不適合高併發。

多路複用

當前最流行,使用最廣泛的高併發方案。而多路複用又有三種實現方式, 分別是select, poll, epoll。

select, poll, epoll

select,poll由於設計的問題,當處理連線過多會造成效能線性下降,而epoll是在前人的經驗上做過改進的解決方案。不會有此問題。

不過select, poll並不是一無是處,假設場景是連線數不多,並且每個連線非常活躍,select,poll是要效能高於epoll的。

至於為啥,檢視小結參考連結, 或者自行查詢資料。但是本文講解的高併發可是指的連線數非常多的。

小結

使用最廣泛多路複用epoll, 可以使得IO操作更有效率。但是使用上有一定的難度。

至此,如果你理解了多路複用的IO模型,那麼你瞭解python為什麼能夠通過協程實現高併發的三分之二了。

IO模型參考:https://www.jianshu.com/p/486b0965c296

select,poll,epoll區別參考:https://www.cnblogs.com/Anker/p/3265058.html

事件迴圈

上面的IO模型能夠解決IO的效率問題,但是實際使用起來需要一個事件迴圈驅動協程去處理IO。

簡單實現

下面引用官方的一個簡單例子。

import selectors
import socket

# 建立一個selctor物件
# 在不同的平臺會使用不同的IO模型,比如Linux使用epoll, windows使用select(不確定)
# 使用select排程IO
sel = selectors.DefaultSelector()

# 回撥函式,用於接收新連線
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

# 回撥函式,使用者讀取client使用者資料
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

# 建立一個非堵塞的socket
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

# 一個事件迴圈,用於IO排程
# 當IO可讀或者可寫的時候, 執行事件所對應的回撥函式
def loop():
    while True:
        events = sel.select()
        for key, mask in events:
            callback = key.data
            callback(key.fileobj, mask)

if __name__ == '__main__':
    loop()

上面程式碼中loop函式對應事件迴圈,它要做的就是一遍一遍的等待IO,然後呼叫事件的回撥函式.

但是作為事件迴圈遠遠不夠,比如怎麼停止,怎麼在事件迴圈中加入其他邏輯.

小結

如果就功能而言,上面的程式碼似乎已經完成了高併發的影子,但是如你所見,直接使用select的編碼難度比較大, 再者回調函式素來有"回撥地獄"的惡名.

實際生活中的問題要複雜的多,作為一個調庫狂魔,怎麼可能會自己去實現這些,所以python官方實現了一個跨平臺的事件迴圈,至於IO模型具體選擇,官方會做適配處理。

不過官方實現是在Python3.5及以後了,3.5之前的版本只能使用第三方實現的高併發非同步IO解決方案, 比如tornado,gevent,twisted

至此你需要get到python高併發的必要條件了.

asyncio

在本文開頭,筆者就說過,python要完成高併發需要協程,事件迴圈,高效IO模型.而Python自帶的asyncio模組已經全部完成了.盡情使用吧.

下面是有引用官方的一個例子

import asyncio

# 通過async宣告一個協程
async def handle_echo(reader, writer):
    # 將需要io的函式使用await等待, 那麼此函式就會停止
    # 當IO操作完成會喚醒這個協程
    # 可以將await理解為yield from
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()

# 建立事件迴圈
loop = asyncio.get_event_loop()
# 通過asyncio.start_server方法建立一個協程
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

總的來說python3.5明確了什麼是協程,什麼是生成器,雖然原理差不多,但是這樣會使得不會讓生成器即可以作為生成器使用(比如迭代資料)又可以作為協程。

所以引入了async,await使得協程的語義更加明確。