python------IO模型
一、IO模型:
1.blocking IO 阻塞IO
2.nonblocking IO 非阻塞IO
3.IO multiplexing IO多路復用
4.signal driven IO 信號驅動IO
5.asynchronous IO 異步IO
二、阻塞IO(blocking IO)
在linux中,默認情況下所有的socket都是blocking。
blocking IO的特點就是IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。
阻塞型接口:指系統調用(一般是IO接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用獲得結果或者超時出錯時才返回。
‘線程池’或‘連接池’或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。
三、非阻塞IO
Linux下,可以通過設置socket使其變為non-blocking。
在非阻塞IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好了沒有。
# 服務端 import socket sk = socket.socket() sk.bind((‘127.0.0.1‘,8080)) sk.listen() sk.setblocking(False) conn_lst非阻塞IO實例 服務端= [] while True: try: conn,addr = sk.accept() #非阻塞 有鏈接來 conn_lst.append(conn) except BlockingIOError: del_lst = [] for c in conn_lst: # 才能執行這一句 try: msg = c.recv(10).decode(‘utf-8‘) # recv不會阻塞 if not msg: c.close() del_lst.append(c)else: print(msg) c.send(msg.upper().encode(‘utf-8‘)) except BlockingIOError: pass if del_lst: for del_item in del_lst: conn_lst.remove(del_item)
import time import socket import threading def func(): sk = socket.socket() sk.connect((‘127.0.0.1‘,8080)) time.sleep(1) sk.send(b‘hi‘) print(sk.recv(10)) sk.close() for i in range(10): threading.Thread(target=func,).start()非阻塞IO實例 客戶端
非阻塞IO優缺點
優點:能夠在等待任務完成的時間裏幹別的活(包括提交其他任務,也就是‘後臺’可以有多個任務在‘同時’執行)。
缺點:
1.循環調用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現卡機情況。
2.任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體數據吞吐量的降低。
此外,非阻塞IO中recv()更多的是起到檢測“操作是否完成”的作用,實際操作系統提供了更為高效的檢測“操作是否完成”作用的接口,例如select()多路復用模式,可以一次檢測多個連接是否活躍。
四、多路復用IO
當用戶進程調用了select,那麽整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個和blocking IO其實並沒有太大的不同,事實上還更差一些。因為這裏需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。
強調:
1. 如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。
2. 在多路復用模型中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
結論: select的優勢在於可以處理多個連接,不適用於單個連接
import socket import select sk = socket.socket() sk.bind((‘127.0.0.1‘,8099)) sk.listen() read_lst = [sk] while True: rl,wl,xl = select.select(read_lst,[],[]) # select阻塞,rl可以讀的 wl可以寫的 xl可以改的 [sk,conn] for item in rl: if item == sk: conn,addr = item.accept() # 有數據等待著它接收 read_lst.append(conn) else: ret = item.recv(1024).decode(‘utf-8‘) if not ret: item.close() read_lst.remove(item) else: print(ret) item.send((‘received %s‘%ret).encode(‘utf-8‘))IO多路復用服務端
import time import socket import threading def client_async(args): sk = socket.socket() sk.connect((‘127.0.0.1‘,8099)) for i in range(10): time.sleep(2) sk.send((‘%s[%s] :hello‘%(args,i)).encode(‘utf-8‘)) print(sk.recv(1024)) sk.close() for i in range(10): threading.Thread(target=client_async,args=(‘*‘*i,)).start()IO多路復用客戶端
多路復用IO的優缺點
優點:相比其他模型,使用select() 的事件驅動模型只用單線程(進程)執行,占用資源少,不消耗太多 CPU,同時能夠為多客戶端提供服務。如果試圖建立一個簡單的事件驅動的服務器程序,這個模型有一定的參考價值。
缺點:
1.select()接口並不是實現“事件驅動”的最好選擇。因為當需要探測的句柄值較大時,select()接口本身需要消耗大量時間去輪詢各個句柄。
2.很多操作系統提供了更為高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要實現更高效的服務器程序,類似epoll這樣的接口更被推薦。遺憾的是不同的操作系統特供的epoll接口有很大差異,所以使用類似於epoll的接口實現具有較好跨平臺能力的服務器會比較困難。
3.該模型將事件探測和事件響應夾雜在一起,一旦事件響應的執行體龐大,則對整個模型是災難性的。
五、異步IO
用戶進程發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對用戶進程產生任何block。然後,kernel會等待數據準備完成,然後將數據拷貝到用戶內存,當這一切都完成之後,kernel會給用戶進程發送一個signal,告訴它read操作完成了。
六、selectors模塊
IO復用:為了解釋這個名詞,首先來理解下復用這個概念,復用也就是共用的意思,這樣理解還是有些抽象,為此,咱們來理解下復用在通信領域的使用,在通信領域中為了充分利用網絡連接的物理介質,往往在同一條網絡鏈路上采用時分復用或頻分復用的技術使其在同一鏈路上傳輸多路信號,到這裏我們就基本上理解了復用的含義,即公用某個“介質”來盡可能多的做同一類(性質)的事,那IO復用的“介質”是什麽呢?為此我們首先來看看服務器編程的模型,客戶端發來的請求服務端會產生一個進程來對其進行服務,每當來一個客戶請求就產生一個進程來服務,然而進程不可能無限制的產生,因此為了解決大量客戶端訪問的問題,引入了IO復用技術,即:一個進程可以同時對多個客戶請求進行服務。也就是說IO復用的“介質”是進程(準確的說復用的是select和poll,因為進程也是靠調用select和poll來實現的),復用一個進程(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是並發的但是IO所需的讀寫數據多數情況下是沒有準備好的,因此就可以利用一個函數(select和poll)來監聽IO所需的這些數據的狀態,一旦IO有數據可以進行讀寫了,進程就來對這樣的IO進行服務。 理解完IO復用後,我們在來看下實現IO復用中的三個API(select、poll和epoll)的區別和聯系 select,poll,epoll都是IO多路復用的機制,I/O多路復用就是通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知應用程序進行相應的讀寫操作。但select,poll,epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需自己負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。三者的原型如下所示: int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); int poll(struct pollfd *fds, nfds_t nfds, int timeout); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 1.select的第一個參數nfds為fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制為__FD_SETSIZE(1024),位數組的每一位代表其對應的描述符是否需要被檢查。第二三四參數表示需要關註讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用於標示哪些描述符上發生了關註的事件,所以每次調用select前都需要重新初始化fdset。timeout參數為超時時間,該結構會被內核修改,其值為超時剩余的時間。 select的調用步驟如下: (1)使用copy_from_user從用戶空間拷貝fdset到內核空間 (2)註冊回調函數__pollwait (3)遍歷所有fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據情況會調用到tcp_poll,udp_poll或者datagram_poll) (4)以tcp_poll為例,其核心實現就是__pollwait,也就是上面註冊的回調函數。 (5)__pollwait的主要工作就是把current(當前進程)掛到設備的等待隊列中,不同的設備有不同的等待隊列,對於tcp_poll 來說,其等待隊列是sk->sk_sleep(註意把進程掛到等待隊列中並不代表進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。 (6)poll方法返回時會返回一個描述讀寫操作是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。 (7)如果遍歷完所有的fd,還沒有返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。如果超過一定的超時時間(schedule_timeout 指定),還是沒人喚醒,則調用select的進程會重新被喚醒獲得CPU,進而重新遍歷fd,判斷有沒有就緒的fd。 (8)把fd_set從內核空間拷貝到用戶空間。 總結下select的幾大缺點: (1)每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大 (2)同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大 (3)select支持的文件描述符數量太小了,默認是1024 2. poll與select不同,通過一個pollfd數組向內核傳遞需要關註的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關註的事件和發生的事件,故pollfd數組只需要被初始化一次。 poll的實現機制與select類似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,然後對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。poll返回後,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。 3.直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麽它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這裏也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法後,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。 epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麽解決的呢?在此之前,我們先看一下epoll 和select和poll的調用接口上的不同,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是創建一個epoll句柄;epoll_ctl是註 冊要監聽的事件類型;epoll_wait則是等待事件的產生。 對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把所有的fd拷貝進內核,而不是在epoll_wait的時候重復拷貝。epoll保證了每個fd在整個過程中只會拷貝 一次。 對於第二個缺點,epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的設備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)並為每個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調 函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工作實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是類似的)。 對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大可以打開文件的數目,這個數字一般遠大於2048,舉個例子, 在1GB內存的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統內存關系很大。 總結: (1)select,poll實現需要自己不斷輪詢所有fd集合,直到設備就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要調用 epoll_wait不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替,但是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在 epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的 時候只要判斷一下就緒鏈表是否為空就行了,這節省了大量的CPU時間,這就是回調機制帶來的性能提升。 (2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,並且要把current往設備等待隊列中掛一次,而epoll只要 一次拷貝,而且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,註意這裏的等待隊列並不是設備等待隊列,只是一個epoll內 部定義的等待隊列),這也能節省不少的開銷。select,poll,epoll
這三種IO多路復用模型在不同的平臺有著不同的支持,而epoll在windows下就不支持,好在我們有selectors模塊,幫我們默認選擇當前平臺下最合適的。
#服務端 from socket import * import selectors sel=selectors.DefaultSelector() # 創建一個默認的多路復用模型 def accept(sk): conn,addr=sk.accept() sel.register(conn,selectors.EVENT_READ,read) def read(conn): try: data=conn.recv(1024) if not data: #win8 win10 print(‘closing‘,conn) sel.unregister(conn) conn.close() return conn.send(data.upper()+b‘_SB‘) except Exception: # linux操作系統 print(‘closing‘, conn) sel.unregister(conn) conn.close() sk=socket(AF_INET,SOCK_STREAM) sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) sk.bind((‘127.0.0.1‘,8088)) sk.listen(5) sk.setblocking(False) #設置socket的接口為非阻塞 sel.register(sk,selectors.EVENT_READ,accept) #相當於網select的讀列表裏append了一個文件句柄server_fileobj,並且綁定了一個回調函數accept while True: events=sel.select() #檢測所有的fileobj,是否有完成wait data的 #[sk,conn] for sel_obj,mask in events: # 有人觸動了你在sel當中註冊的對象 callback=sel_obj.data #callback=accpet # sel_obj.data就能拿到當初註冊的時候寫的accept/read方法 callback(sel_obj.fileobj) #accpet(sk)/read(conn)基於selectors模塊實現聊天-服務端
#客戶端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect((‘127.0.0.1‘,8088)) while True: msg=input(‘>>: ‘) if not msg:continue c.send(msg.encode(‘utf-8‘)) data=c.recv(1024) print(data.decode(‘utf-8‘))基於selectors模塊實現聊天-客戶端
python------IO模型