1. 程式人生 > >python實戰之IO多路複用(別名:事件驅動,三種模式:(sellect,poll,epoll),Python的selectors模組)

python實戰之IO多路複用(別名:事件驅動,三種模式:(sellect,poll,epoll),Python的selectors模組)

IO多路複用前需瞭解

通常,我們寫伺服器處理模型的程式時,有以下幾種模型:

(1)每收到一個請求,建立一個新的程序,來處理該請求;

(2)每收到一個請求,建立一個新的執行緒,來處理該請求;

(3)每收到一個請求,放入一個事件列表,讓主程序通過非阻塞I/O方式來處理請求

上面的幾種方式,各有千秋,

第(1)中方法,由於建立新的程序的開銷比較大,所以,會導致伺服器效能比較差,但實現比較簡單。

第(2)種方式,由於要涉及到執行緒的同步,有可能會面臨死鎖等問題。

第(3)種方式,在寫應用程式程式碼時,邏輯比前面兩種都複雜。

綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數

網路伺服器採用的方式

 

UI程式設計中,常常要對滑鼠點選進行相應,首先如何獲得滑鼠點選呢?
 

方式一:建立一個執行緒,該執行緒一直迴圈檢測是否有滑鼠點選,那麼這個方式有以下幾個缺點:
1. CPU資源浪費,可能滑鼠點選的頻率非常小,但是掃描執行緒還是會一直迴圈檢測,這會造成很多的CPU資源浪費;如果掃描滑鼠點選的介面是阻塞的呢?
2. 如果是堵塞的,又會出現下面這樣的問題,如果我們不但要掃描滑鼠點選,還要掃描鍵盤是否按下,由於掃描滑鼠時被堵塞了,那麼可能永遠不會去掃描鍵盤;
3. 如果一個迴圈需要掃描的裝置非常多,這又會引來響應時間的問題;
所以,該方式是非常不好的。

目前大部分的UI程式設計都是事件驅動模型,如很多UI平臺都會提供onClick()事件,這個事件就代表滑鼠按下事件。事件驅動模型大體思路如下:
1. 有一個事件(訊息)佇列;
2. 滑鼠按下時,往這個佇列中增加一個點選事件(訊息);
3. 有個迴圈,不斷從佇列取出事件,根據不同的事件,呼叫不同的函式,如onClick()onKeyDown()等;
4. 事件(訊息)一般都各自儲存各自的處理函式指標,這樣,每個訊息都有獨立的處理函式

事件驅動程式設計是一種程式設計正規化,這裡程式的執行流由外部事件來決定。它的特點是包含一個事件迴圈,當外部事件發生時使用回撥機制來觸發相應的處理。另外兩種常見的程式設計正規化是(單執行緒)同步以及多執行緒程式設計。

 

協程處理:簡單理解

一遇到IO就切換,IO操作是有作業系統完成的,Python通過排程作業系統的檔案介面進行IO操作.

得到結論:IO操作是作業系統負責完成的

問題是什麼時候確認作業系統已經完成了IO操作 我可以切換回去?

只要加一個回撥函式,告訴作業系統,如果你處理完了以後就調下回調函式,回撥函式就會通知我。接到通知我自然就知道你已經操作完了,然後切換回去獲取資料

 

同步IO和非同步IO,阻塞IO和非阻塞IO分別是什麼,到底有什麼區別?不同的人在不同的上下文下給出的答案是不同的。所以先限定一下本文的上下文

一 概念說明

在進行解釋之前,首先要說明幾個概念:
- 使用者空間和核心空間

32位的Linux來說,預設把記憶體劃分1G給核心用(核心空間),而3G給各個程序用(使用者空間)
 

- 程序切換

核心可以掛起CPU正在執行的程序,也可以進行恢復掛起的程序(稱之程序切換)

- 程序的阻塞

期待的某些事未傳送,如請求系統資源失敗、等待某種操作完成、新資料未到達等等就會轉換成阻塞狀態,不暫用CPU資源

- 檔案描述符

索引值,核心為每個程序所維護的該程序開啟檔案的記錄表,簡單理解程式開啟一個現有的檔案和建立一個新的檔案,核心會向該程式返回一個檔案描述符

- 快取 I/O

快取 I/O 又被稱作標準 I/O,大多數檔案系統的預設 I/O 操作都是快取 I/O。在 Linux 的快取 I/O 機制中,作業系統會將 I/O 的資料快取在檔案系統的頁快取( page cache )中,也就是說,資料會先被拷貝到作業系統核心的緩衝區中,然後才會從作業系統核心的緩衝區拷貝到應用程式的地址空間

 

快取 I/O 的缺點:
資料在傳輸過程中需要在應用程式地址空間和核心進行多次資料拷貝操作,這些資料拷貝操作所帶來的 CPU 以及記憶體開銷是非常大的。

 

IO模式

剛才說了,對於一次IO訪問(以read舉例),資料會先被拷貝到作業系統核心的緩衝區中,然後才會從作業系統核心的緩衝區拷貝到應用程式的地址空間。所以說,當一個read操作發生時,它會經歷兩個階段:
1. 等待資料準備 (Waiting for the data to be ready)
2. 將資料從核心拷貝到程序中 (Copying the data from the kernel to the process)

正式因為這兩個階段,linux系統產生了下面五種網路模式的方案:

- 阻塞 I/O(blocking IO)

請求發去以後,kernel開始準備資料(把硬碟的資料拷貝到核心快取區),此刻程序

入阻塞block狀態,,等到kernel把資料拷貝到了使用者記憶體(使用者空間),然後返回一個結果給程序,程序才解除block狀態

- 非阻塞 I/O(nonblocking IO)

請求以後,kernel直接返回一個錯誤,程序自然正常,我們可以進行迴圈檢測該請求直到kernal準備好為止

- I/O 多路複用( IO multiplexing)

electpollepoll都是IO多路複用的機制。I/O多路複用就是通過一種機制,一個程序可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作

它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序

所以,如果處理的連線數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server效能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連線能處理得更快,而是在於能處理更多的連線。)

非阻塞socket +selectepoll==對於使用者來說就是所謂的非同步I/O 開發者角度還是同步


- 訊號驅動 I/O( signal driven IO)

    由於signal driven IO在實際中並不常用,所以我這隻提及剩下的四種IO Model。

- 非同步 I/O(asynchronous IO)

  用的很少,很多漏洞

 

首先列一下,sellectpollepoll三者的區別 


select 
select
最早於1983年出現在4.2BSD中,它通過一個select()系統呼叫來監視多個檔案描述符的陣列,當select()返回後,該陣列中就緒的檔案描述符便會被核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作。

select目前幾乎在所有的平臺上支援,其良好跨平臺支援也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。

select的一個缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改巨集定義甚至重新編譯核心的方式提升這一限制。

另外,select()所維護的儲存大量檔案描述符的資料結構,隨著檔案描述符數量的增大,其複製的開銷也線性增長。同時,由於網路響應時間的延遲使得大量TCP連線處於非活躍狀態,但呼叫select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。

poll 
poll
1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大檔案描述符數量的限制。

pollselect同樣存在一個缺點就是,包含大量檔案描述符的陣列被整體複製於使用者態和核心的地址空間之間,而不論這些檔案描述符是否就緒,它的開銷隨著檔案描述符數量的增加而線性增大。

另外,select()poll()將就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次呼叫select()poll()的時候將再次報告這些檔案描述符,所以它們一般不會丟失就緒的訊息,這種方式稱為水平觸發(Level Triggered)。

epoll 
直到Linux2.6才出現了由核心直接支援的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下效能最好的多路I/O就緒通知方法。

epoll可以同時支援水平觸發和邊緣觸發(Edge Triggered,只告訴程序哪些檔案描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麼它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的效能要更高一些,但是程式碼實現相當複雜。

epoll同樣只告知那些就緒的檔案描述符,而且當我們呼叫epoll_wait()獲得就緒檔案描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體對映(mmap)技術,這樣便徹底省掉了這些檔案描述符在系統呼叫時複製的開銷。

另一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,程序只有在呼叫一定的方法後,核心才對所有監視的檔案描述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個檔案描述符,一旦基於某個檔案描述符就緒時,核心會採用類似callback的回撥機制,迅速啟用這個檔案描述符,當程序呼叫epoll_wait()時便得到通知。

 epoll(市場用的最多,如果單單連線不做任何事 1G核心記憶體可以支援10W連線)

市面上所謂的非同步IO :Nginx tornado twisted 只是稱之,其實真正是採用IO多路複用

 

Python select 

Pythonselect()方法直接呼叫作業系統的IO介面,它監控sockets,open files, and pipes(所有帶fileno()方法的檔案控制代碼)何時變成readable writeable, 或者通訊錯誤,select()使得同時監控多個連線變的簡單,並且這比寫一個長迴圈來等待和監控多客戶端連線要高效,因為select直接通過作業系統提供的C的網路介面進行操作,而不是通過Python的直譯器。

注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.

接下來通過echo server例子要以瞭解select 是如何通過單程序實現同時處理多個非阻塞的socket連線的

使用select需要非阻塞IO情況下才可以使用

select實戰服務端:

__author__ = "Burgess Zheng"
import select
import socket
import queue


server = socket.socket()
server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)
server.bind(server_addr) #設定非阻塞模式

server.listen(5)

inputs = [server, ] #自己也要監測呀,因為server本身也是個fd
#server就是本身的連線,在沒有連線過來的時候select是檢測該server本身連線
#如果發現server有活動代表有客戶端連線進來

outputs = []#輸出空列表
message_queues = {}#定義個一個空字典,接收客戶端的資料

while True:
    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果沒有任何fd就緒,那程式就會一直阻塞在這裡

    for s in readable: #每個s就是一個socket(#客戶端連線過來的例項)

        if s is server: #別忘記,上面我們server自己也當做一個fd放在了inputs列表裡,傳給了select,如果這個s是server,代表server這個fd就緒了,
            #就是有活動了, 什麼情況下它才有活動? 當然 是有新連線進來的時候 呀
            #新連線進來了,接受這個連線
            conn, client_addr = s.accept()#接收客戶端的資料,conn:連結標記 addr:對面地址
            print("new connection from",client_addr)
            conn.setblocking(0)
            inputs.append(conn) #為了不阻塞整個程式,我們不會立刻在這裡開始接收客戶端發來的資料, 把它放到inputs裡, 下一次loop時,這個新連線
            #就會被交給select去監聽,如果這個連線的客戶端發來了資料 ,那這個連線的fd在server端就會變成就續的,select就會把這個連線返回,返回到
            #readable 列表裡,然後你就可以loop readable列表,取出這個連線,開始接收資料了, 下面就是這麼幹 的

            message_queues[conn] = queue.Queue() #接收到客戶端的資料後,不立刻返回 ,暫存在佇列裡,以後傳送

        else: #s不是server的話,那就只能是一個 與客戶端建立的連線的fd了
            #客戶端的資料過來了,在這接收
            data = s.recv(1024)
            if data:
                print("收到來自[%s]的資料:" % s.getpeername()[0], data)
                message_queues[s].put(data) #收到的資料先放到queue裡,一會返回給客戶端
                if s not in outputs:
                    outputs.append(s) #為了不影響處理與其它客戶端的連線 , 這裡不立刻返回資料給客戶端


            else:#如果收不到data代表什麼呢? 代表客戶端斷開了呀
                print("客戶端斷開了",s)

                if s in outputs:
                    outputs.remove(s) #清理已斷開的連線

                inputs.remove(s) #清理已斷開的連線

                del message_queues[s] ##清理已斷開的連線


    for s in writeable:
        try :
            next_msg = message_queues[s].get_nowait()

        except queue.Empty:
            print("client [%s]" %s.getpeername()[0], "queue is empty..")
            outputs.remove(s)

        else:
            print("sending msg to [%s]"%s.getpeername()[0], next_msg)
            s.send(next_msg.upper())


    for s in exeptional:#出現連線異常
        print("handling exception for ",s.getpeername())
        inputs.remove(s)#檢測列表移除掉該異常
        if s in outputs:#如果返回列表有該連線移除掉
            outputs.remove(s)#移除掉字典該連線資料
        s.close()

        del message_queues[s]

客戶端1

#_*_coding:utf-8_*_
__author__ = "Burgess Zheng"


import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )

客戶端2

#_*_coding:utf-8_*_
__author__ = "Burgess Zheng"

import socket

HOST = 'localhost'  # The remote host
PORT = 10000  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)


    print('Received', data)
s.close()

執行結果

 

Python的selectors模組

selectors模組

This module allows high-level and efficient I/O multiplexing, built upon the select module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.

這個模組允許高層和高效的I / O多路複用,建立在原語的選擇模組。鼓勵使用者使用這個模組相反,除非他們想要精確控制所使用的作業系統原語。

selectors模組就是封裝了select 和epole多路複用IO

window不能支援epole模式,所以預設是select模式

linux預設是epole模式

試驗selectors(然後開啟1000個socket客戶端訪問server端)

server

import selectors
import socket
#Linux預設封裝是epole  window不支援epole所以預設是select

sel = selectors.DefaultSelector()#生成一個selector物件
def accept(sock,mask):#mask連線數量(當前多少個客戶端連線)
    conn, addr = sock.accept()  # conn:連結標記位,addr:對方的地址
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)#把該連結設定為非阻塞IO模式
    sel.register(conn, selectors.EVENT_READ, read)
    #把該客戶端的conn連結標記位註冊到sel#讓sel監聽
    #如果該連線再次活動就呼叫read該函式(往while迴圈看)



def read(conn, mask):
    data = conn.recv(1000)  #接收資料
    if data:#如果有資料
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # 返回資料
    else:
        print('closing', conn)#該連線斷開了
        sel.unregister(conn)#取消註冊
        conn.close()#關閉該連線


sock = socket.socket()#生成socket
sock.bind(('localhost', 10000))#繫結埠
sock.listen(100)#最多併發100
sock.setblocking(False)#設定非阻塞IO模式
sel.register(sock, selectors.EVENT_READ, accept)
#server物件sock註冊到sel#讓sel監聽
#第一次客戶端連線的時候經過下面的while True
#如果是新連線經過了while True判斷以後會呼叫accept函式


while True:
    events = sel.select() #預設阻塞,有活動連線就返回活動的連線列表
    for key, mask in events:
            # events相當於sel監聽的連線列表(裡面目前監聽sock本伺服器的連線)
            # 由於server連線活動代表新的客戶連線進來了
            # 新連線的客戶端資訊返回值放入key
            # key =客戶端發起訪問連線的資訊

        callback = key.data  # callback = key.data = 調取accept函式
        callback(key.fileobj, mask)  
            #key.fileobj =檔案控制代碼==還沒有建立連線的客戶端資訊
            # accept(key.fileobj,mask) key.xx:實參 mask:實參

client(開啟100個socket連線服務端)

__author__ = "Burgess Zheng"


import socket
import sys

messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]
#開啟100個socket
print(socks)

# Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:

    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print( 'closing socket', s.getsockname() )

執行結果: