1. 程式人生 > >Python--IO模型

Python--IO模型

沒有 class 即使 本質 限制 接收 oca 操作系統相關 readlines

Python----IO模型

原文地址:http://www.jb51.net/article/87466.htm

網絡I/O模型
人多了,就會有問題。web剛出現的時候,光顧的人很少。近年來網絡應用規模逐漸擴大,應用的架構也需要隨之改變。C10k的問題,讓工程師們需要思考服務的性能與應用的並發能力。

網絡應用需要處理的無非就是兩大類問題,網絡I/O,數據計算。相對於後者,網絡I/O的延遲,給應用帶來的性能瓶頸大於後者。網絡I/O的模型大致有如下幾種:

  • 同步模型(synchronous I/O)
  • 阻塞I/O(bloking I/O)
  • 非阻塞I/O(non-blocking I/O)
  • 多路復用I/O(multiplexing I/O)
  • 信號驅動式I/O(signal-driven I/O)
  • 異步I/O(asynchronous I/O)

網絡I/O的本質是socket的讀取,socket在linux系統被抽象為流,I/O可以理解為對流的操作。這個操作又分為兩個階段:

等待流數據準備(wating for the data to be ready)。
從內核向進程復制數據(copying the data from the kernel to the process)。
對於socket流而已,

第一步通常涉及等待網絡上的數據分組到達,然後被復制到內核的某個緩沖區。
第二步把數據從內核緩沖區復制到應用進程緩沖區。
I/O模型:
舉個簡單比喻,來了解這幾種模型。網絡IO好比釣魚,等待魚上鉤就是網絡中等待數據準備好的過程,魚上鉤了,把魚拉上岸就是內核復制數據階段。釣魚的人就是一個應用進程。

阻塞I/O(bloking I/O)
阻塞I/O是最流行的I/O模型。它符合人們最常見的思考邏輯。阻塞就是進程 "被" 休息, CPU處理其它進程去了。在網絡I/O的時候,進程發起recvform系統調用,然後進程就被阻塞了,什麽也不幹,直到數據準備好,並且將數據從內核復制到用戶進程,最後進程再處理數據,在等待數據到處理數據的兩個階段,整個進程都被阻塞。不能處理別的網絡I/O。大致如下圖:

技術分享

這就好比我們去釣魚,拋竿之後就一直在岸邊等,直到等待魚上鉤。然後再一次拋竿,等待下一條魚上鉤,等待的時候,什麽事情也不做,大概會胡思亂想吧。

阻塞IO的特點就是在IO執行的兩個階段都被block了
非阻塞I/O(non-bloking I/O)

在網絡I/O時候,非阻塞I/O也會進行recvform系統調用,檢查數據是否準備好,與阻塞I/O不一樣,"非阻塞將大的整片時間的阻塞分成N多的小的阻塞, 所以進程不斷地有機會 ‘被‘ CPU光顧"。

也就是說非阻塞的recvform系統調用調用之後,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒準備好,此時會返回一個error。進程在返回之後,可以幹點別的事情,然後再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。需要註意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

技術分享

我們再用釣魚的方式來類別,當我們拋竿入水之後,就看下魚漂是否有動靜,如果沒有魚上鉤,就去幹點別的事情,比如再挖幾條蚯蚓。然後不久又來看看魚漂是否有魚上鉤。這樣往返的檢查又離開,直到魚上鉤,再進行處理。

非阻塞 IO的特點是用戶進程需要不斷的主動詢問kernel數據是否準備好。
多路復用I/O(multiplexing I/O)
可以看出,由於非阻塞的調用,輪詢占據了很大一部分過程,輪詢會消耗大量的CPU時間。結合前面兩種模式。如果輪詢不是進程的用戶態,而是有人幫忙就好了。多路復用正好處理這樣的問題。

多路復用有兩個特別的系統調用select或poll。select調用是內核級別的,select輪詢相對非阻塞的輪詢的區別在於---前者可以等待多個socket,當其中任何一個socket的數據準好了,就能返回進行可讀,然後進程再進行recvform系統調用,將數據由內核拷貝到用戶進程,當然這個過程是阻塞的。多路復用有兩種阻塞,select或poll調用之後,會阻塞進程,與第一種阻塞不同在於,此時的select不是等到socket數據全部到達再處理, 而是有了一部分數據就會調用用戶進程來處理。如何知道有一部分數據到達了呢?監視的事情交給了內核,內核負責數據到達的處理。也可以理解為"非阻塞"吧。技術分享

對於多路復用,也就是輪詢多個socket。釣魚的時候,我們雇了一個幫手,他可以同時拋下多個釣魚竿,任何一桿的魚一上鉤,他就會拉桿。他只負責幫我們釣魚,並不會幫我們處理,所以我們還得在一幫等著,等他把收桿。我們再處理魚。多路復用既然可以處理多個I/O,也就帶來了新的問題,多個I/O之間的順序變得不確定了,當然也可以針對不同的編號。

多路復用的特點是通過一種機制一個進程能同時等待IO文件描述符,內核監視這些文件描述符(套接字描述符),其中的任意一個進入讀就緒狀態,select, poll,epoll函數就可以返回。對於監視的方式,又可以分為 select, poll, epoll三種方式。
了解了前面三種模式,在用戶進程進行系統調用的時候,他們在等待數據到來的時候,處理的方式不一樣,直接等待,輪詢,select或poll輪詢,第一個過程有的阻塞,有的不阻塞,有的可以阻塞又可以不阻塞。當時第二個過程都是阻塞的。從整個I/O過程來看,他們都是順序執行的,因此可以歸為同步模型(asynchronous)。都是進程主動向內核檢查。

異步I/O(asynchronous I/O)
相對於同步I/O,異步I/O不是順序執行。用戶進程進行aio_read系統調用之後,無論內核數據是否準備好,都會直接返回給用戶進程,然後用戶態進程可以去做別的事情。等到socket數據準備好了,內核直接復制數據給進程,然後從內核向進程發送通知。I/O兩個階段,進程都是非阻塞的。

技術分享

比之前的釣魚方式不一樣,這一次我們雇了一個釣魚高手。他不僅會釣魚,還會在魚上鉤之後給我們發短信,通知我們魚已經準備好了。我們只要委托他去拋竿,然後就能跑去幹別的事情了,直到他的短信。我們再回來處理已經上岸的魚。

同步和異步的區別
通過對上述幾種模型的討論,需要區分阻塞和非阻塞,同步和異步。他們其實是兩組概念。區別前一組比較容易,後一種往往容易和前面混合。在我看來,所謂同步就是在整個I/O過程。尤其是拷貝數據的過程是阻塞進程的,並且都是應用進程態去檢查內核態。而異步則是整個過程I/O過程用戶進程都是非阻塞的,並且當拷貝數據的時是由內核發送通知給用戶進程。

技術分享

對於同步模型,主要是第一階段處理方法不一樣。而異步模型,兩個階段都不一樣。這裏我們忽略了信號驅動模式。這幾個名詞還是容易讓人迷惑,只有同步模型才考慮阻塞和非阻塞,因為異步肯定是非阻塞,異步非阻塞的說法感覺畫蛇添足。


Select 模型
同步模型中,使用多路復用I/O可以提高服務器的性能。
在多路復用的模型中,比較常用的有select模型和poll模型。這兩個都是系統接口,由操作系統提供。當然,Python的select模塊進行了更高級的封裝。select與poll的底層原理都差不多。千呼萬喚始出來,本文的重點select模型。
1.select 原理
網絡通信被Unix系統抽象為文件的讀寫,通常是一個設備,由設備驅動程序提供,驅動可以知道自身的數據是否可用。支持阻塞操作的設備驅動通常會實現一組自身的等待隊列,如讀/寫等待隊列用於支持上層(用戶層)所需的block或non-block操作。設備的文件的資源如果可用(可讀或者可寫)則會通知進程,反之則會讓進程睡眠,等到數據到來可用的時候,再喚醒進程。

這些設備的文件描述符被放在一個數組中,然後select調用的時候遍歷這個數組,如果對於的文件描述符可讀則會返回改文件描述符。當遍歷結束之後,如果仍然沒有一個可用設備文件描述符,select讓用戶進程則會睡眠,直到等待資源可用的時候在喚醒,遍歷之前那個監視的數組。每次遍歷都是線性的。

2.select 回顯服務器
select涉及系統調用和操作系統相關的知識,因此單從字面上理解其原理還是比較乏味。用代碼來演示最好不過了。使用python的select模塊很容易寫出下面一個回顯服務器:

 1 import select
 2 import socket
 3 import sys
 4  
 5 HOST = localhost
 6 PORT = 5000
 7 BUFFER_SIZE = 1024
 8  
 9 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
10 server.bind((HOST, PORT))
11 server.listen(5)
12  
13 inputs = [server, sys.stdin]
14 running = True
15  
16 while True:
17   try:
18     # 調用 select 函數,阻塞等待
19     readable, writeable, exceptional = select.select(inputs, [], [])
20   except select.error, e:
21     break
22  
23   # 數據抵達,循環
24   for sock in readable:
25     # 建立連接
26     if sock == server:
27       conn, addr = server.accept()
28       # select 監聽的socket
29       inputs.append(conn)
30     elif sock == sys.stdin:
31       junk = sys.stdin.readlines()
32       running = False
33     else:
34       try:
35         # 讀取客戶端連接發送的數據
36         data = sock.recv(BUFFER_SIZE)
37         if data:
38           sock.send(data)
39           if data.endswith(\r\n\r\n):
40             # 移除select監聽的socket
41             inputs.remove(sock)
42             sock.close()
43         else:
44           # 移除select監聽的socket
45           inputs.remove(sock)
46           sock.close()
47       except socket.error, e:
48         inputs.remove(sock)
49  
50 server.close()

運行上述代碼,使用curl訪問http://localhost:5000,即可看命令行返回請求的HTTP request信息。

下面詳細解析上述代碼的原理。

1 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2 server.bind((HOST, PORT))
3 server.listen(5)

上述代碼使用socket初始化一個TCP套接字,並綁定主機地址和端口,然後設置服務器監聽。

1 inputs = [server, sys.stdin]

這裏定義了一個需要select監聽的列表,列表裏面是需要監聽的對象(等於系統監聽的文件描述符)。這裏監聽socket套接字和用戶的輸入。

然後代碼進行一個服務器無線循環。

1 try:
2   # 調用 select 函數,阻塞等待
3   readable, writeable, exceptional = select.select(inputs, [], [])
4 except select.error, e:
5   break

調用了select函數,開始循環遍歷監聽傳入的列表inputs。如果沒有curl服務器,此時沒有建立tcp客戶端連接,因此改列表內的對象都是數據資源不可用。因此select阻塞不返回。

客戶端輸入curl http://localhost:5000之後,一個套接字通信開始,此時input中的第一個對象server由不可用變成可用。因此select函數調用返回,此時的readable有一個套接字對象(文件描述符可讀)。

1 for sock in readable:
2   # 建立連接
3   if sock == server:
4     conn, addr = server.accept()
5     # select 監聽的socket
6     inputs.append(conn)

select返回之後,接下來遍歷可讀的文件對象,此時的可讀中只有一個套接字連接,調用套接字的accept()方法建立TCP三次握手的連接,然後把該連接對象追加到inputs監視列表中,表示我們要監視該連接是否有數據IO操作。

由於此時readable只有一個可用的對象,因此遍歷結束。再回到主循環,再次調用select,此時調用的時候,不僅會遍歷監視是否有新的連接需要建立,還是監視剛才追加的連接。如果curl的數據到了,select再返回到readable,此時在進行for循環。如果沒有新的套接字,將會執行下面的代碼:

 1 try:
 2   # 讀取客戶端連接發送的數據
 3   data = sock.recv(BUFFER_SIZE)
 4   if data:
 5     sock.send(data)
 6     if data.endswith(\r\n\r\n):
 7       # 移除select監聽的socket
 8       inputs.remove(sock)
 9       sock.close()
10   else:
11     # 移除select監聽的socket
12     inputs.remove(sock)
13     sock.close()
14 except socket.error, e:
15   inputs.remove(sock)

通過套接字連接調用recv函數,獲取客戶端發送的數據,當數據傳輸完畢,再把監視的inputs列表中除去該連接。然後關閉連接。

整個網絡交互過程就是如此,當然這裏如果用戶在命令行中輸入中斷,inputs列表中監視的sys.stdin也會讓select返回,最後也會執行下面的代碼:

1 elif sock == sys.stdin:
2   junk = sys.stdin.readlines()
3   running = False

有人可能有疑問,在程序處理sock連接的是時候,假設又輸入了curl對服務器請求,將會怎麽辦?此時毫無疑問,inputs裏面的server套接字會變成可用。等現在的for循環處理完畢,此時select調用就會返回server。如果inputs裏面還有上一個過程的conn連接,那麽也會循環遍歷inputs的時候,再一次針對新的套接字accept到inputs列表進行監視,然後繼續循環處理之前的conn連接。如此有條不紊的進行,直到for循環結束,進入主循環調用select。

任何時候,inputs監聽的對象有數據,下一次調用select的時候,就會繁返回readable,只要返回,就會對readable進行for循環,直到for循環結束在進行下一次select。

主要註意,套接字建立連接是一次IO,連接的數據抵達也是一次IO。

3.select的不足
盡管select用起來挺爽,跨平臺的特性。但是select還是存在一些問題。
select需要遍歷監視的文件描述符,並且這個描述符的數組還有最大的限制。隨著文件描述符數量的增長,用戶態和內核的地址空間的復制所引發的開銷也會線性增長。即使監視的文件描述符長時間不活躍了,select還是會線性掃描。

為了解決這些問題,操作系統又提供了poll方案,但是poll的模型和select大致相當,只是改變了一些限制。目前Linux最先進的方式是epoll模型。

許多高性能的軟件如nginx, nodejs都是基於epoll進行的異步。

selectors模塊

 1 import socket
 2 import select
 3 sk=socket.socket()
 4 sk.bind(("127.0.0.1",8800))
 5 sk.listen(5)
 6 sk.setblocking(False)
 7 inputs=[sk,]
 8 
 9 while True:            # [sk, conn, conn, conn]
10     r,w,e=select.select(inputs,[],[])   # 原理就是通過 select 來監聽多個 socket 對象,只要如果所有對象都沒有接收到信息,那麽就 阻塞 在這裏
11                                         # 如果有 socket 對象接受到了信息,就把他們 放到一個新的列表 r 中
12                                         # 將 r 中的每一個對象拿出來,判斷它是 服務端用來創建與客戶端連接的 socket 對象還是,和client接收、發送信息的隊形
13                                         # 通過判斷他們不同的身份來進行不同的操作(創建與新的 client 連接的通道,或者是與 已經存在的 client 進行通信)
14 
15     # r = [sk, conn, conn]
16     print(len(r))
17 
18     for obj in r:
19         if obj==sk:
20             conn,add=obj.accept()
21             print("conn:",conn)
22             inputs.append(conn)
23         else:
24 
25             data_byte=obj.recv(1024)
26             print(str(data_byte,utf8))
27             if not data_byte:
28                 inputs.remove(obj)
29                 continue
30             inp=input(回答%s: >>>%inputs.index(obj))
31             obj.sendall(bytes(inp,utf8))
32 
33     print(>>,r)

Python--IO模型