socketserver模組使用與原始碼分析
socketserver模組使用與原始碼分析
前言
在前面的學習中我們其實已經可以通過socket
模組來建立我們的服務端,並且還介紹了關於TCP協議的粘包問題。但是還有一個非常大的問題就是我們所編寫的Server端是不支援併發性服務的,在我們之前的程式碼中只能加入一個通訊迴圈來進行排隊式的單視窗一對一服務。那麼這一篇文章將主要介紹如何使用socketserver
模組來建立具有併發性的Server端。
基於TCP協議的socketserver服務端
我們先看它的一段程式碼,對照程式碼來看功能。
#!/usr/bin/env python3 # _*_ coding:utf-8 _*_ # ==== 使用socketserver建立支援多併發性的伺服器 TCP協議 ==== import socketserver class MyServer(socketserver.BaseRequestHandler): """自定義類""" def handle(self): """handle處理請求""" print("雙向連結通道建立完成:", self.request) # 對於TCP協議來說,self.request相當於雙向連結通道conn,即accept()的第一部分 print("客戶端的資訊是:", self.client_address) # 對於TCP協議來說,相當於accept()的第二部分,即客戶端的ip+port while 1: # 開始內層通訊迴圈 try: # # bug修復:針對windows環境 data = self.request.recv(1024) if not data: break # bug修復:針對類UNIX環境 print("收到客戶機[{0}]的訊息:[{1}]".format(self.client_address, data)) self.request.sendall(data.upper()) # #sendall是重複呼叫send. except Exception as e: break self.request.close() # 當出現異常情況下一定要關閉連結 if __name__ == '__main__': s1 = socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer) # 公網伺服器繫結 0.0.0.0 私網測試為 127.0.0.1 s1.serve_forever() # 啟動服務
1.匯入
socketserver
模組2.建立一個新的類,並繼承
socketserver.BaseRequestHandler
類3.覆寫
handle
方法,對於TCP協議來說,self.request
相當於雙向連結通道conn
,self.client_address
相當於被服務方的ip和port資訊,也就是addr
,而整個handle
方法相當於連結迴圈。4.寫入收發邏輯規則
5.防止客戶端傳送空的訊息已致雙方卡死
6.防止客戶端突然斷開已致服務端崩潰
7.粘包優化(可選)
8.例項化
socketserver.ThreadingTCPServer
類,並傳入IP+port,以及剛寫好的類名9.使用
socketserver.ThreadingTCPServer
例項化物件中的server_forever( )
方法啟動服務
它其實是這樣的:
我們不用管連結迴圈,因為在執行
handle
方法之前內部已經幫我們做好了。當我們使用serve_forever()
方法的時候便開始監聽連結描述符物件,一旦有連結請求就建立一個子執行緒來處理該連結。
基於UDP協議的socketserver服務端
基於UDP協議的socketserver
服務端與基於TCP協議的socketserver
服務端大相徑庭,但是還是有幾點不太一樣的地方。
對TCP來說:
self.request = 雙向連結通道(conn)
對UDP來說:
self.request = (client_data_byte,udp的套接字物件)
#!/usr/bin/env python3 # _*_ coding:utf-8 _*_ # ==== 使用socketserver建立支援多併發性的伺服器 UDP協議 ==== import socketserver class MyServer(socketserver.BaseRequestHandler): """自定義類""" def handle(self): """handle處理請求""" # 由於UDP是基於訊息的協議,故根本不用通訊迴圈 data = self.request[0] # 對於UDP協議來說,self.request其實是個元組。第一個元素是訊息內容主題(Bytes型別),相當於recvfrom()的第一部分 server = self.request[1] # 第二個元素是服務端本身,即自己 print("客戶端的資訊是:", self.client_address) # 對於UDP協議來說,相當於recvfrom()的第二部分,即客戶端的ip+port print("收到客戶機[{0}]的訊息:[{1}]".format(self.client_address, data)) server.sendto(data.upper(),self.client_address) if __name__ == '__main__': s1 = socketserver.ThreadingUDPServer(("0.0.0.0", 6666), MyServer) # 公網伺服器繫結 0.0.0.0 私網測試為 127.0.0.1 s1.serve_forever() # 啟動服務
擴充套件:socketserver原始碼分析
探索socketserver中的繼承關係
好了,接下來我們開始剖析socketserver
模組中的原始碼部分。在Pycharm下使用CTRL+滑鼠左鍵
,可以進入原始碼進行檢視。
我們在檢視原始碼前一定要首先要明白兩點:
socketserver
類分為兩部分,其一是server
類主要是負責處理連結方面,另一類是request
類主要負責處理通訊方面。
好了,請在腦子裡記住這個概念。我們來看一些socketserver
模組的實現用了哪些其他的基礎模組。
注意,接下來的原始碼註釋部分我並沒有在原始碼中修改,也請讀者不要修改原始碼的任何內容。
import socket # 這模組挺熟悉吧 import selectors # 這個是一個多執行緒模組,主要支援I/O多路複用。 import os # 老朋友了 import sys # 老朋友 import threading # 多執行緒模組 from io import BufferedIOBase # 讀寫相關的模組 from time import monotonic as time # 老朋友time模組socketserver中用到的基礎模組
好了,讓我們接著往下走。可以看到一個變數__all__
,是不是覺得很熟悉?就是我們使用 from xxx import xxx
能匯入進的東西全是被__all__
控制的,我們看一下它包含了哪些內容。
__all__ = ["BaseServer", "TCPServer", "UDPServer", "ThreadingUDPServer", "ThreadingTCPServer", "BaseRequestHandler", "StreamRequestHandler", "DatagramRequestHandler", "ThreadingMixIn"] # 這個是我們原本的 __all__ 中的值。 if hasattr(os, "fork"): __all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"]) if hasattr(socket, "AF_UNIX"): __all__.extend(["UnixStreamServer","UnixDatagramServer", "ThreadingUnixStreamServer", "ThreadingUnixDatagramServer"]) # 上面兩個if判斷是給__all__新增內容的,os.fork()這個方法是建立一個新的程序,並且只在類UNIX平臺下才有效,Windows平臺下是無效的,所以這裡對於Windows平臺來說就from socketserver import xxx 肯定少了三個類,這三個類的作用我們接下來會聊到。而關於socket中的AF_UNIX來說我們其實已經學習過了,是基於檔案的socket家族。這在Windows上也是不支援的,只有在類UNIX平臺下才有效。所以Windows平臺下的匯入又少了4個類。 # poll/select have the advantage of not requiring any extra file descriptor, # contrarily to epoll/kqueue (also, they require a single syscall). if hasattr(selectors, 'PollSelector'): _ServerSelector = selectors.PollSelector else: _ServerSelector = selectors.SelectSelector # 這兩個if還是做I/O多路複用使用的,Windows平臺下的結果是False,而類Unix平臺下的該if結果為True,這關乎I/O多路複用的效能選擇。到底是select還是poll或者epoll。socketserver模組對於from xxx import * 匯入的處理
我們接著向下看原始碼,會看到許許多多的類。先關掉它來假設自己是直譯器一行一行往下走會去執行那個部分。首先是一條if
判斷
if hasattr(os, "fork"): class ForkingMixIn: pass # 這裡我自己省略了 # 我們可以看見這條程式碼是接下來執行的,它意思還是如果在類Unix環境下,則會去建立該類。如果在Windows平臺下則不會建立該類處理點一
繼續走,其實這種if
判斷再建立類的地方還有兩處。我這裡全部列出來:
if hasattr(os, "fork"): class ForkingUDPServer(ForkingMixIn, UDPServer): pass class ForkingTCPServer(ForkingMixIn, TCPServer): pass if hasattr(socket, 'AF_UNIX'): class UnixStreamServer(TCPServer): address_family = socket.AF_UNIX class UnixDatagramServer(UDPServer): address_family = socket.AF_UNIX class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass處理點二 and 三
好了,說完了大體粗略的一個流程,我們該來研究這裡面的類都有什麼作用,這裡可以檢視每個類的文件資訊。大致如下:
前面已經說過,socketserver
模組中主要分為兩大類,我們就依照這個來進行劃分。
socketserver模組原始碼內部class功能一覽 | |
---|---|
處理連結相關 | |
BaseServer | 基礎連結類 |
TCPServer | TCP協議類 |
UDPServer | UDP協議類 |
UnixStreamServer | 檔案形式位元組流類 |
UnixDatagramServer | 檔案形式資料報類 |
處理通訊相關 | |
BaseRequestHandler | 基礎請求處理類 |
StreamRequestHandler | 位元組流請求處理類 |
DatagramRequestHandler | 資料報請求處理類 |
多執行緒相關 | |
ThreadingMixIn | 執行緒方式 |
ThreadingUDPServer | 多執行緒UDP協議服務類 |
ThreadingTCPServer | 多執行緒TCP協議服務類 |
多程序相關 | |
ForkingMixIn | 程序方式 |
ForkingUDPServer | 多程序UDP協議服務類 |
ForkingTCPServer | 多程序TCP協議服務類 |
他們的繼承關係如下:
ForkingUDPServer(ForkingMixIn, UDPServer) ForkingTCPServer(ForkingMixIn, TCPServer) ThreadingUDPServer(ThreadingMixIn, UDPServer) ThreadingTCPServer(ThreadingMixIn, TCPServer) StreamRequestHandler(BaseRequestHandler) DatagramRequestHandler(BaseRequestHandler)
處理連結相關
處理通訊相關
多執行緒相關
總繼承關係(處理通訊相關的不在其中,並且不包含多程序)
最後補上一個多程序的繼承關係,就不放在總繼承關係中了,容易圖形造成混亂。
多程序相關
例項化過程分析
有了繼承關係我們可以來模擬例項化的過程,我們以TCP協議為準:
socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer)
我們點進(選中上面程式碼的ThradingTCPServe
r部分,CTRL+滑鼠左鍵
)原始碼部分,查詢其 __init__
方法:
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
看來沒有,那麼就找第一父類有沒有,我們點進去可以看到第一父類ThreadingMixIn
也沒有__init__
方法,看上面的繼承關係圖可以看出是普通多繼承,那麼就是廣度優先的查詢順序。我們來看第二父類TCPServer
中有沒有,看來第二父類中是有__init__
方法的,我們詳細來看。
class TCPServer(BaseServer): """註釋全被我刪了,影響視線""" address_family = socket.AF_INET # 基於網路的套接字家族 socket_type = socket.SOCK_STREAM # TCP(位元組流)協議 request_queue_size = 5 # 訊息佇列最大為5,可以理解為backlog,即半連結池的大小 allow_reuse_address = False # 埠重用預設關閉 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) # 可以看見,上面先是呼叫了父類的__init__方法,然後又例項化出了一個socket物件!所以我們先不著急往下看,先看其父類中的__init__方法。 if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raiseTCPServer中的__init__()
來看一下,BaseServer
類中的__init__
方法。
class BaseServer: """註釋依舊全被我刪了""" timeout = None # 這個變數可以理解為超時時間,先不著急說他。先看 __init__ 方法 def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address # 即我們傳入的 ip+port ("0.0.0.0", 6666) self.RequestHandlerClass = RequestHandlerClass # 即我們傳入的自定義類 MyServer self.__is_shut_down = threading.Event() # 這裡可以看到執行了該方法,這裡先不詳解,因為它是一個事件鎖,所以不用管 self.__shutdown_request = FalseBaseServer中的__init__()
在BaseServer
中執行了thrading
模組下的Event()
方法。我這裡還是提一嘴這個方法是幹嘛用的,它會去控制執行緒的啟動順序,這裡例項化出的self.__is_shut_down
其實就是一把鎖,沒什麼深究的,接下來的文章中我也會寫到。我們繼續往下看,現在是該回到TCPServer
的__init__
方法中來了。
class TCPServer(BaseServer): """註釋全被我刪了,影響視線""" address_family = socket.AF_INET # 基於網路的套接字家族 socket_type = socket.SOCK_STREAM # TCP(位元組流)協議 request_queue_size = 5 # 訊息佇列最大為5,可以理解為backlog,即半連結池的大小 allow_reuse_address = False # 埠重用預設關閉 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # 看這裡!!!! """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: # 在建立完socket物件後就會進行該判斷。預設引數bind_and_activate就是為True try: self.server_bind() # 現在進入該方法檢視細節 self.server_activate() except: self.server_close() raiseTCPServer中的__init__()
好了,需要找這個self.bind()
方法,還是從頭開始找。例項本身沒有,第一父類ThreadingMixIn
也沒有,所以現在我們看的是TCPServer
的server_bind()
方法:
def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: # 這裡的變數對應 TCPServer.__init__ 上面定義的類方法,埠重用這個。由於是False,所以我們直接往下執行。 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) # 繫結 ip+port 即 ("0.0.0.0", 6666) self.server_address = self.socket.getsockname() # 獲取socket的名字 其實還是 ("0.0.0.0", 6666)TCPServer中的server_bind()
現在我們該看TCPServer
下的server_activate()
方法了。
def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) # 其實就是監聽半連結池,backlog為5TCPServer中的server_activate()
這個時候沒有任何異常會丟擲的,所以我們已經跑完了整個例項化的流程。並將其賦值給s1
現在我們看一下s1
的__dict__
字典,再接著進行原始碼分析。
{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}s1的__dict__
server_forever()啟動服務分析
我們接著來看下一條程式碼。
s1.serve_forever()
還是老規矩,由於s1
是ThreadingTCPServer
類的例項物件,所以我們去一層層的找serve_forever()
,最後在BaseServer
類中找到了。
def serve_forever(self, poll_interval=0.5): """註釋被我刪了""" self.__is_shut_down.clear() # 上面說過了那個Event鎖,控制子執行緒的啟動順序。這裡的clear()代表清除,這個不是重點,往下看。 try: # XXX: Consider using another file descriptor or connecting to the # socket to wake this up instead of polling. Polling reduces our # responsiveness to a shutdown request and wastes cpu at all other # times. with _ServerSelector() as selector: selector.register(self, selectors.EVENT_READ)# 這裡是設定了一個監聽型別為讀取事件。也就是說當有請求來的時候當前socket物件就會發生反應。 while not self.__shutdown_request: # 為False,會執行,注意!下面都是死迴圈了!!! ready = selector.select(poll_interval) # 設定最大監聽時間為0.5s # bpo-35017: shutdown() called during select(), exit immediately. if self.__shutdown_request: # BaseServer類中的類方法,為False,所以不執行這個。 break if ready: # 代表有連結請求會執行下面的方法 self._handle_request_noblock() # 這兒是比較重要的一個點。我們先來看。 self.service_actions() finally: self.__shutdown_request = False self.__is_shut_down.set() # 這裡是一個釋放鎖的行為BaseServer中的serve_forever()
如果有連結請求,則會執行self._handle_request_noblock()
方法,它在哪裡呢?剛好這個方法就在BaseServer
中serve_forever()
方法的正下方第4個方法的位置。
def _handle_request_noblock(self): """註釋被我刪了""" try: request, client_address = self.get_request() # 這裡的這個方法在TCPServer中,它的return值是 self.socket.accept(),就是就是返回了元組然後被解壓賦值了。其實到這一步三次握手監聽已經開啟了。 except OSError: return if self.verify_request(request, client_address): # 這個是驗證ip和port,返回的始終是True try: self.process_request(request, client_address) # request 雙向連結通道,client_address客戶端ip+port。現在我們來找這個方法。 except Exception: self.handle_error(request, client_address) self.shutdown_request(request) except: self.shutdown_request(request) raise else: self.shutdown_request(request)BaseServer中的_handle_request_noblock()
現在開始查詢self.process_request(request, client_address)
該方法,還是先從例項物件本身找,找不到去第一父類找。他位於第一父類ThreadingMixIn
中。
def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) # 建立子執行緒!!看這裡! t.daemon = self.daemon_threads # ThreadingMixIn的類屬性,為False if not t.daemon and self.block_on_close: # 第一個值為False,第二個值為True。他們都是ThreadingMixIn的類屬性 if self._threads is None: # 會執行 self._threads = [] # 建立了空列表 self._threads.append(t) # 將當前的子執行緒新增至空列表中 t.start() # 開始當前子執行緒的執行,即執行self.process_request_thread方法ThreadingMixIn中的process_request()
我們可以看到,這裡的target
引數中指定了一個方法self.process_request_thread
,其實意思就是說當這個執行緒t
在start
的時候會去執行該方法。我們看一下它都做了什麼,這個方法還是在ThreadingMixIn
類中。
def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) # 可以看到又執行該方法了,這裡我再標註一下,別弄頭暈了。request 雙向連結通道,client_address客戶端ip+port。 except Exception: self.handle_error(request, client_address) finally: self.shutdown_request(request) # 它不會關閉這個執行緒,而是將其設定為wait()狀態。ThreadingMixIn中的 process_request_thread()
看self.finish_request()
方法,它在BaseServer
類中
def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) # 這裡是幹嘛?其實就是在進行例項化!BaseServer中的finish_request
self.RequestHandlerClass(request, client_address, self)
,我們找到self
的__dict__
字典,看看這個到底是什麼東西
{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}s1的__dict__
可以看到,它就是我們傳入的那個類,即自定義的MyServer
類。我們把request,client_address,以及整個是例項self傳給了MyServer的__init__
方法。但是我們的MyServer類沒有__init__
,怎麼辦呢?去它父類BaseRequestHandler
裡面找唄。
class BaseRequestHandler: """註釋被我刪了""" def __init__(self, request, client_address, server): self.request = request # request 雙向連結通道 self.client_address = client_address # 客戶端ip+port self.server = server # 即 例項物件本身。上面的__dict__就是它的__dict__ self.setup() # 鉤子函式,我們可以自己寫一個類然後繼承`BaseRequestHandler`並覆寫其setup方法即可。 try: self.handle() # 看,自動執行handle finally: self.finish() # 鉤子函式 def setup(self): pass def handle(self): pass def finish(self): passBaseRequestHandler中的__init__
現在我們知道了,為什麼一定要覆寫handle
方法了吧。
socketserver內部呼叫順序流程圖(基於TCP協議)
例項化過程圖解
server_forever()啟動服務圖解
擴充套件:驗證連結合法性
在很多時候,我們的TCP服務端為了防止網路泛洪可以設定一個三次握手驗證機制。那麼這個驗證機制的實現其實也是非常簡單的,我們的思路在於進入通訊迴圈之前,客戶端和服務端先走一次連結認證,只有通過認證的客戶端才能夠繼續和服務端進行連結。
下面就來看一下具體的實現步驟。
#_*_coding:utf-8_*_ __author__ = 'Linhaifeng' from socket import * import hmac,os secret_key=b'linhaifeng bang bang bang' def conn_auth(conn): ''' 認證客戶端連結 :param conn: :return: ''' print('開始驗證新連結的合法性') msg=os.urandom(32) # 新方法,生成32位隨機Bytes型別的值 conn.sendall(msg) h=hmac.new(secret_key,msg) digest=h.digest() respone=conn.recv(len(digest)) return hmac.compare_digest(respone,digest) # 對比結果為True或者為False def data_handler(conn,bufsize=1024): if not conn_auth(conn): print('該連結不合法,關閉') conn.close() return print('連結合法,開始通訊') while True: data=conn.recv(bufsize) if not data:break conn.sendall(data.upper()) def server_handler(ip_port,bufsize,backlog=5): ''' 只處理連結 :param ip_port: :return: ''' tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(backlog) while True: conn,addr=tcp_socket_server.accept() print('新連線[%s:%s]' %(addr[0],addr[1])) data_handler(conn,bufsize) if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 server_handler(ip_port,bufsize)Server端
#_*_coding:utf-8_*_ __author__ = 'Linhaifeng' from socket import * import hmac,os secret_key=b'linhaifeng bang bang bang' def conn_auth(conn): ''' 驗證客戶端到伺服器的連結 :param conn: :return: ''' msg=conn.recv(32) # 拿到隨機位數 h=hmac.new(secret_key,msg) # 摻鹽 digest=h.digest() conn.sendall(digest) def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True: data=input('>>: ').strip() if not data:continue if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)Client端
到這裡已經很簡單了,伺服器將隨機數給客戶機發過去,客戶機收到後也用自家的鹽與隨機數加料,再使用digest()
將它轉化為位元組,直接傳送了回來然後客戶端通過hmac.compare_digest()
方法驗證兩個的值是否相等,如果不等就說明鹽不對。客戶機不合法服務端將會關閉與該客戶機的雙向連結通道。