python系列之
阿新 • • 發佈:2018-12-31
I/O多路複用是在單執行緒模式下實現多執行緒的效果,實現一個多I/O併發的效果。
客戶端:
有沒有一種比較好的方式呢,當然有,其一是使用非同步socket。 這種socket只有在一些event觸發時才會阻塞。相反,程式在非同步socket上面執行一個動作,會立即被告知這個動作是否成功。程式會根據這個信 息決定怎麼繼續下面的操作由於非同步socket是非阻塞的,就沒有必要再來使用多執行緒。所有的工作都可以在一個執行緒中完成。這種單執行緒模式有它自己的挑 戰,但可以成為很多方案不錯的選擇。它也可以結合多執行緒一起使用:單執行緒使用非同步socket用於處理伺服器的網路部分,多執行緒可以用來訪問其他阻塞資 源,比如資料庫。Linux的2.6核心有一系列機制來管理異 步socket,其中3個有對應的Python的API:select、poll和epoll。epoll和pool比select更好,因為 Python程式不需要檢查每一個socket感興趣的event。相反,它可以依賴作業系統來告訴它哪些socket可能有這些event。epoll 比pool更好,因為它不要求作業系統每次都去檢查python程式需要的所有socket感興趣的event。而是Linux在event發生的時候會 跟蹤到,並在Python需要的時候返回一個列表。因此epoll對於大量(成千上萬)併發socket連線,是更有效率和可擴充套件的機制
看一個簡單socket例子:
import socket SOCKET_FAMILY = socket.AF_INET SOCKET_TYPE = socket.SOCK_STREAM sockServer = socket.socket() sockServer.bind(('0.0.0.0', 8888)) sockServer.listen(5) while True: cliobj, addr = sockServer.accept() while True: recvdata = cliobj.recv(1024) if recvdata: print(recvdata.decode()) else: cliobj.close() break
客戶端:
import socket
socCli = socket.socket()
socCli.connect(('127.0.0.1', 8888))
while True:
data = input("input str:")
socCli.send(data.encode())
<br />
以上為一個簡單的客戶端傳送一個輸入資訊給服務端的socket通訊的例項,在以上的例子中,服務端是一個單執行緒、阻塞模式的。如何實現多客戶端連線呢,我們可以使用多執行緒模式,這個當然沒有問題。 使用多執行緒、阻塞socket來處理的話,程式碼會很直觀,但是也會有不少缺陷。它很難確保執行緒共享資源沒有問題。而且這種程式設計風格的程式在只有一個CPU的電腦上面效率更低。但如果一個使用者開啟的執行緒有限的情況下,比如1024個。當第1025個客戶端連線是仍然會阻塞。有沒有一種比較好的方式呢,當然有,其一是使用非同步socket。 這種socket只有在一些event觸發時才會阻塞。相反,程式在非同步socket上面執行一個動作,會立即被告知這個動作是否成功。程式會根據這個信 息決定怎麼繼續下面的操作由於非同步socket是非阻塞的,就沒有必要再來使用多執行緒。所有的工作都可以在一個執行緒中完成。這種單執行緒模式有它自己的挑 戰,但可以成為很多方案不錯的選擇。它也可以結合多執行緒一起使用:單執行緒使用非同步socket用於處理伺服器的網路部分,多執行緒可以用來訪問其他阻塞資 源,比如資料庫。Linux的2.6核心有一系列機制來管理異 步socket,其中3個有對應的Python的API:select、poll和epoll。epoll和pool比select更好,因為 Python程式不需要檢查每一個socket感興趣的event。相反,它可以依賴作業系統來告訴它哪些socket可能有這些event。epoll 比pool更好,因為它不要求作業系統每次都去檢查python程式需要的所有socket感興趣的event。而是Linux在event發生的時候會 跟蹤到,並在Python需要的時候返回一個列表。因此epoll對於大量(成千上萬)併發socket連線,是更有效率和可擴充套件的機制
非同步I/O處理模型
select最早於1983年出現在4.2BSD中,它通過一個select()系統呼叫來監視多個檔案描述符的陣列,當select()返回後,該陣列中就緒的檔案描述符便會被核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作。
select目前幾乎在所有的平臺上支援,其良好跨平臺支援也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改巨集定義甚至重新編譯核心的方式提升這一限制。
另外,select()所維護的儲存大量檔案描述符的資料結構,隨著檔案描述符數量的增大,其複製的開銷也線性增長。同時,由於網路響應時間的延遲使得大量TCP連線處於非活躍狀態,但呼叫select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷
select poll epoll比較
1 特點 | |
select | select本質上是通過設定或者檢查存放fd標誌位的資料結構來進行下一步處理。這樣所帶來的缺點是: 1 單個程序可監視的fd數量被限制 2 需要維護一個用來存放大量fd的資料結構,這樣會使得使用者空間和核心空間在傳遞該結構時複製開銷大 3 對socket進行掃描時是線性掃描 |
poll | poll本質上和select沒有區別,它將使用者傳入的陣列拷貝到核心空間,然後查詢每個fd對應的裝置狀態,如果裝置就緒則在裝置等待佇列中加入一項並繼續遍歷,如果遍歷完所有fd後沒有發現就緒裝置,則掛起當前程序,直到裝置就緒或者主動超時,被喚醒後它又要再次遍歷fd。這個過程經歷了多次無謂的遍歷。 它沒有最大連線數的限制,原因是它是基於連結串列來儲存的,但是同樣有一個缺點: 大量的fd的陣列被整體複製於使用者態和核心地址空間之間,而不管這樣的複製是不是有意義。 poll還有一個特點是“水平觸發”,如果報告了fd後,沒有被處理,那麼下次poll時會再次報告該fd。 |
epoll | epoll支援水平觸發和邊緣觸發,最大的特點在於邊緣觸發,它只告訴程序哪些fd剛剛變為就需態,並且只會通知一次。 在前面說到的複製問題上,epoll使用mmap減少複製開銷。 還有一個特點是,epoll使用“事件”的就緒通知方式,通過epoll_ctl註冊fd,一旦該fd就緒,核心就會採用類似callback的回撥機制來啟用該 fd,epoll_wait便可以收到通知 |
2 支援一個程序所能開啟的最大連線數 | |
select | 單個程序所能開啟的最大連線數有FD_SETSIZE巨集定義,其大小是32個整數的大小(在32位的機器上,大小就是32*32,同理64位機器上FD_SETSIZE為32*64),當然我們可以對進行修改,然後重新編譯核心,但是效能可能會受到影響,這需要進一步的測試。 |
poll | poll本質上和select沒有區別,但是它沒有最大連線數的限制,原因是它是基於連結串列來儲存的 |
epoll | 雖然連線數有上限,但是很大,1G記憶體的機器上可以開啟10萬左右的連線,2G記憶體的機器可以開啟20萬左右的連線 |
3 FD劇增後帶來的IO效率問題 | |
select | 因為每次呼叫時都會對連線進行線性遍歷,所以隨著FD的增加會造成遍歷速度慢的“線性下降效能問題”。 |
poll | 同上 |
epoll | 因為epoll核心中實現是根據每個fd上的callback函式來實現的,只有活躍的socket才會主動呼叫callback,所以在活躍socket較少的情況下,使用epoll沒有前面兩者的線性下降的效能問題,但是所有socket都很活躍的情況下,可能會有效能問題。 |
4 訊息傳遞方式 | |
select | 核心需要將訊息傳遞到使用者空間,都需要核心拷貝動作。 |
poll | 同上 |
epoll | epoll通過核心和使用者空間共享一塊記憶體來實現的。 |
下面我們對上面的socket例子進行改造,看一下select的例子:
elect 詳細解釋,用執行緒的IO多路複用實現一個讀寫分離的、支援多客戶端的連線請求
"""
import socket
import queue
from select import select
SERVER_IP = ('127.0.0.1', 9999)
# 儲存客戶端傳送過來的訊息,將訊息放入佇列中
message_queue = {}
input_list = []
output_list = []
if __name__ == "__main__":
server = socket.socket()
server.bind(SERVER_IP)
server.listen(10)
# 設定為非阻塞
server.setblocking(False)
# 初始化將服務端加入監聽列表
input_list.append(server)
while True:
# 開始 select 監聽,對input_list中的服務端server進行監聽
stdinput, stdoutput, stderr = select(input_list, output_list, input_list)
# 迴圈判斷是否有客戶端連線進來,當有客戶端連線進來時select將觸發
for obj in stdinput:
# 判斷當前觸發的是不是服務端物件, 當觸發的物件是服務端物件時,說明有新客戶端連線進來了
if obj == server:
# 接收客戶端的連線, 獲取客戶端物件和客戶端地址資訊
conn, addr = server.accept()
print("Client {0} connected! ".format(addr))
# 將客戶端物件也加入到監聽的列表中, 當客戶端傳送訊息時 select 將觸發
input_list.append(conn)
# 為連線的客戶端單獨建立一個訊息佇列,用來儲存客戶端傳送的訊息
message_queue[conn] = queue.Queue()
else:
# 由於客戶端連線進來時服務端接收客戶端連線請求,將客戶端加入到了監聽列表中(input_list),客戶端傳送訊息將觸發
# 所以判斷是否是客戶端物件觸發
try:
recv_data = obj.recv(1024)
# 客戶端未斷開
if recv_data:
print("received {0} from client {1}".format(recv_data.decode(), addr))
# 將收到的訊息放入到各客戶端的訊息佇列中
message_queue[obj].put(recv_data)
# 將回復操作放到output列表中,讓select監聽
if obj not in output_list:
output_list.append(obj)
except ConnectionResetError:
# 客戶端斷開連線了,將客戶端的監聽從input列表中移除
input_list.remove(obj)
# 移除客戶端物件的訊息佇列
del message_queue[obj]
print("\n[input] Client {0} disconnected".format(addr))
# 如果現在沒有客戶端請求,也沒有客戶端傳送訊息時,開始對傳送訊息列表進行處理,是否需要傳送訊息
for sendobj in output_list:
try:
# 如果訊息佇列中有訊息,從訊息佇列中獲取要傳送的訊息
if not message_queue[sendobj].empty():
# 從該客戶端物件的訊息佇列中獲取要傳送的訊息
send_data = message_queue[sendobj].get()
sendobj.sendall(send_data)
else:
# 將監聽移除等待下一次客戶端傳送訊息
output_list.remove(sendobj)
except ConnectionResetError:
# 客戶端連線斷開了
del message_queue[sendobj]
output_list.remove(sendobj)
print("\n[output] Client {0} disconnected".format(addr))
epoll實現例項
#!/usr/bin/env python
import select
import socket
response = b''
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
# 因為socket預設是阻塞的,所以需要使用非阻塞(非同步)模式。
serversocket.setblocking(0)
# 建立一個epoll物件
epoll = select.epoll()
# 在服務端socket上面註冊對讀event的關注。一個讀event隨時會觸發服務端socket去接收一個socket連線
epoll.register(serversocket.fileno(), select.EPOLLIN)
try:
# 字典connections對映檔案描述符(整數)到其相應的網路連線物件
connections = {}
requests = {}
responses = {}
while True:
# 查詢epoll物件,看是否有任何關注的event被觸發。引數“1”表示,我們會等待1秒來看是否有event發生。
# 如果有任何我們感興趣的event發生在這次查詢之前,這個查詢就會帶著這些event的列表立即返回
events = epoll.poll(1)
# event作為一個序列(fileno,event code)的元組返回。fileno是檔案描述符的代名詞,始終是一個整數。
for fileno, event in events:
# 如果是服務端產生event,表示有一個新的連線進來
if fileno == serversocket.fileno():
connection, address = serversocket.accept()
print('client connected:', address)
# 設定新的socket為非阻塞模式
connection.setblocking(0)
# 為新的socket註冊對讀(EPOLLIN)event的關注
epoll.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
# 初始化接收的資料
requests[connection.fileno()] = b''
# 如果發生一個讀event,就讀取從客戶端傳送過來的新資料
elif event & select.EPOLLIN:
print("------recvdata---------")
# 接收客戶端傳送過來的資料
requests[fileno] += connections[fileno].recv(1024)
# 如果客戶端退出,關閉客戶端連線,取消所有的讀和寫監聽
if not requests[fileno]:
connections[fileno].close()
# 刪除connections字典中的監聽物件
del connections[fileno]
# 刪除接收資料字典對應的控制代碼物件
del requests[connections[fileno]]
print(connections, requests)
epoll.modify(fileno, 0)
else:
# 一旦完成請求已收到,就登出對讀event的關注,註冊對寫(EPOLLOUT)event的關注。寫event發生的時候,會回覆資料給客戶端
epoll.modify(fileno, select.EPOLLOUT)
# 列印完整的請求,證明雖然與客戶端的通訊是交錯進行的,但資料可以作為一個整體來組裝和處理
print('-' * 40 + '\n' + requests[fileno].decode())
# 如果一個寫event在一個客戶端socket上面發生,它會接受新的資料以便傳送到客戶端
elif event & select.EPOLLOUT:
print("-------send data---------")
# 每次傳送一部分響應資料,直到完整的響應資料都已經發送給作業系統等待傳輸給客戶端
byteswritten = connections[fileno].send(requests[fileno])
requests[fileno] = requests[fileno][byteswritten:]
if len(requests[fileno]) == 0:
# 一旦完整的響應資料傳送完成,就不再關注寫event
epoll.modify(fileno, select.EPOLLIN)
# HUP(掛起)event表明客戶端socket已經斷開(即關閉),所以服務端也需要關閉。
# 沒有必要註冊對HUP event的關注。在socket上面,它們總是會被epoll物件註冊
elif event & select.EPOLLHUP:
print("end hup------")
# 登出對此socket連線的關注
epoll.unregister(fileno)
# 關閉socket連線
connections[fileno].close()
del connections[fileno]
finally:
# 開啟的socket連線不需要關閉,因為Python會在程式結束的時候關閉。這裡顯式關閉是一個好的程式碼習慣
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()