非阻塞的socket
非阻塞的socket
同步和非同步的概念描述的是使用者執行緒與核心的互動方式:同步是指使用者執行緒發起IO請求後需要等待或者輪詢核心IO操作完成後才能繼續執行;而非同步是指使用者執行緒發起IO請求後仍繼續執行,當核心IO操作完成後會通知使用者執行緒,或者呼叫使用者執行緒註冊的回撥函式。
阻塞和非阻塞的概念描述的是使用者執行緒呼叫核心IO操作的方式:阻塞是指IO操作需要徹底完成後才返回到使用者空間;而非阻塞是指IO操作被呼叫後立即返回給使用者一個狀態值,無需等到IO操作徹底完成。
有三種io方式
blocking IO: 發起IO操作後阻塞當前執行緒直到IO結束,標準的同步IO,如預設行為的posix read和write。
non-blocking IO: 發起IO操作後不阻塞,使用者可阻塞等待多個IO操作同時結束。non-blocking也是一種同步IO:“批量的同步”。如linux下的poll,select, epoll,BSD下的kqueue。
asynchronous IO: 發起IO操作後不阻塞,使用者得遞一個回撥待IO結束後被呼叫。如windows下的OVERLAPPED + IOCP。linux的native AIO只對檔案有效。
1.非阻塞IO
from socket import *
import time
#用來儲存所有的新連結socket
g_socket_list = list ()
def mian():
server_socket = socket(AF_INET,SOCK_STREAM)
server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,True)
server_socket.bind(('',7890))
server_socket.listen(128)
server_socket.setblocking(False)
while True:
# time.sleep(0.1)
try:
newClientInfo = server_socket.accept()
except Exception as result:
print(result)
else:
print('一個新的客戶端到來',str(newClientInfo))
newClientInfo[0].setblocking(False)#切換為非阻塞
g_socket_list.append(newClientInfo)
for clien_socket,client_addr in g_socket_list:
try:
recvData = clien_socket.recv(1024)
if recvData:
print('客戶端已關閉',client_addr)
clien_socket.close()
g_socket_list.remove((clien_socket,client_addr))
except Exception as result:
print(result)
pass
print(g_socket_list)
if __name__ == '__main__':
mian()
簡單來說,程式一直輪詢io埠,如果沒有響應不會發生阻塞等待有資料來領,而是丟擲異常,跳過,繼續輪詢。
設定非阻塞scoket在server_socket.setblocking(False)
非阻塞acept,非阻塞connect,非阻塞write,非阻塞read等也是一樣
單程序非阻塞的服務端實現
import time
from socket import *
import sys
import re
class WSGIServer(object):
def __init__(self,port):
self.server_socket = socket(AF_INET,SOCK_STREAM)
self.server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,True)
self.server_socket.setblocking(False)
self.server_socket.bind(('',port))
self.server_socket.listen(128)
self.socket_list = []
self.root = '/Users/keith/Downloads/wurenjizhineng/www.aspku.com'
def run(self):
while True:
try:
new_socket,new_addr = self.server_socket.accept()
except Exception as e:
print('-----1------',e)
else:
new_socket.setblocking(False)
self.socket_list.append(new_socket)
#socket輪詢了~
for client_socket in self.socket_list:
try:
recv_data = client_socket.recv(1024).decode('utf-8')
except Exception as e:
print('------2------',e)
else:
if recv_data:
self.deal_with_request(recv_data,client_socket)
else:
client_socket.close()
self.socket_list.remove(client_socket)
print(self.socket_list)
#處理請求
def deal_with_request(self,recv_data,client_socket):
if not recv_data:
return
request_lines =recv_data.splitlines()
for i ,line in enumerate(request_lines):
print(i,line)
#提取請求的檔案(index.html)
#GET /a/b/c/d/e/index.html HTTP/1.1
ret =re.match(r'[^/]+([^ ])+',request_lines[0])
if ret:
print('正則提取資料:',ret.group(1))
file_name = ret.group(1)
if file_name == '/':
file_name = '/index.html'
try:
f = open(self.root+file_name,'rb')
except:
response_body = 'file not found,請輸入正確的url'
response_line = 'HTTP/1.1 404\r\n'
response_header = 'Content-Type: text/html;charset=utf-8\r\n'
response_data = (response_line+response_line+'\r\n'+response_body).encode('utf-8')
#返回404
client_socket.send(response_data)
else:
content = f.read()
f.close()
response_body = content
response_line = 'HTTP/1.1 200 OK\r\n'
response_header = 'Content-Type: text/html;charset=utf-8\r\n\r\n'
response_data = (response_line+response_header).encode('utf-8')+response_body
#傳送資料
client_socket.send(response_data)
def main():
if len(sys.argv) == 2:
port = sys.argv[1]
if port.isdigit():
port = int(port)
else:
print('按照格式輸入')
return
print('伺服器用的埠是',port)
http_server = WSGIServer(port)
http_server.run()
if __name__ == '__main__':
main()
2.epool非阻塞實現伺服器,IO多路複用
就是我們常說的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。
select/epoll的好處就在於單個process就可以同時處理多個網路連線的IO。
它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。
I/O 多路複用的特點:
通過一種機制使一個程序能同時等待多個檔案描述符,而這些檔案描述符(套接字描述符)其中的任意一個進入讀就緒狀態,epoll()函式就可以返回。 所以, IO多路複用,本質上不會有併發的功能,因為任何時候還是隻有一個程序或執行緒進行工作,它之所以能提高效率是因為select\epoll 把進來的socket放到他們的 ‘監視’ 列表裡面,當任何socket有可讀可寫資料立馬處理,那如果select\epoll 手裡同時檢測著很多socket, 一有動靜馬上返回給程序處理,總比一個一個socket過來,阻塞等待,處理高效率。
當然也可以多執行緒/多程序方式,一個連線過來開一個程序/執行緒處理,這樣消耗的記憶體和程序切換頁會耗掉更多的系統資源。 所以我們可以結合IO多路複用和多程序/多執行緒 來高效能併發,IO複用負責提高接受socket的通知效率,收到請求後,交給程序池/執行緒池來處理邏輯。
import socket
import select
# 建立套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 設定可以重複使用繫結的資訊
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
# 繫結本機資訊
s.bind(("",7788))
# 變為被動
s.listen(10)
# 建立一個epoll物件
epoll = select.epoll()
# 測試,用來列印套接字對應的檔案描述符
# print(s.fileno())
# print(select.EPOLLIN|select.EPOLLET)
# 註冊事件到epoll中
# epoll.register(fd[, eventmask])
# 注意,如果fd已經註冊過,則會發生異常
# 將建立的套接字新增到epoll的事件監聽中
epoll.register(s.fileno(), select.EPOLLIN|select.EPOLLET)
connections = {}
addresses = {}
# 迴圈等待客戶端的到來或者對方傳送資料
while True:
# epoll 進行 fd 掃描的地方 -- 未指定超時時間則為阻塞等待
epoll_list = epoll.poll()
# 對事件進行判斷
for fd, events in epoll_list:
# print fd
# print events
# 如果是socket建立的套接字被啟用
if fd == s.fileno():
new_socket, new_addr = s.accept()
print('有新的客戶端到來%s' % str(new_addr))
# 將 conn 和 addr 資訊分別儲存起來
connections[new_socket.fileno()] = new_socket
addresses[new_socket.fileno()] = new_addr
# 向 epoll 中註冊 新socket 的 可讀 事件
epoll.register(new_socket.fileno(), select.EPOLLIN|select.EPOLLET)
# 如果是客戶端傳送資料
elif events == select.EPOLLIN:
# 從啟用 fd 上接收
recvData = connections[fd].recv(1024).decode("utf-8")
if recvData:
print('recv:%s' % recvData)
else:
# 從 epoll 中移除該 連線 fd
epoll.unregister(fd)
# server 側主動關閉該 連線 fd
connections[fd].close()
print("%s---offline---" % str(addresses[fd]))
del connections[fd]
del addresses[fd]