1. 程式人生 > >python 網路程式設計學習 非阻塞socket

python 網路程式設計學習 非阻塞socket

主要學習伺服器的非同步使用

SocketServer簡化了網路伺服器的編寫。它有4個類:TCPServer,UDPServer,UnixStreamServer,UnixDatagramServer。這4個類是同步進行處理的,另外通過ForkingMixIn和ThreadingMixIn類來支援非同步。

建立伺服器的步驟

  1. 建立一個請求處理類,它是BaseRequestHandler的子類並重載其handle()方法。
  2. 例項化一個伺服器類,傳入伺服器的地址和請求處理程式類。
  3. 呼叫handle_request()(一般是呼叫其他事件迴圈或者使用select())或serve_forever()。

整合ThreadingMixIn類時需要處理異常關閉。daemon_threads指示伺服器是否要等待執行緒終止,要是執行緒互相獨立,必須要設定為True,預設是False。

使用基於多程序,ForkingMinIn 支援非同步

import os                                                                                                                                                                          
import socket                                                                       
import threading                                                                    
import SocketServer                                                                 
                                                                                    
# 構造客戶端                                                                               
class ForkingClient():                                                              
    def __init__(self, ip, port):                                                   
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)     
        self.sock.connect((ip, port))                                               
                                                                                    
    def run(self):                                                                  
        crtpid = os.getpid()                                                        
        print "PID: %s " % crtpid                                                    
        send_length = self.sock.send("hello, world!")                               
        print "sent %d characters..." % send_length                                 
        res = self.sock.recv(2048)                                                  
        print "PID %s received: %s" % (crtpid, res)                                 
                                                                                    
    def shutdown(self):                                                             
        self.sock.close()                                                           
                                                                                    
# 寫服務端處理函式                                                                                
class ForkingServerRequestHandler(SocketServer.BaseRequestHandler):                 
    def handle(self):                                                               
        data = self.request.recv(2048)                                              
        crtpid = os.getpid()                                                        
        res = "%s: %s" % (crtpid, data)                                             
        print res                                                                   
        self.request.send(res)                                                      
        return                                                                      
                                                                                    
# 繼承                                                                                  
class ForkingServer(SocketServer.ForkingMixIn, SocketServer.TCPServer):             
    pass                                                                            
                                                                                    
                                                                                    
def main():                                                                         
    server = ForkingServer(('localhost', 8989), ForkingServerRequestHandler)        
    ip, port = server.server_address                                                
    server_thread = threading.Thread(target=server.serve_forever)                   
    server_thread.setDaemon(True)                                                   
    server_thread.start()
    print 'Server loop running PID: %s' % os.getpid()

    client_1 = ForkingClient(ip, port)
    client_2 = ForkingClient(ip, port)

    client_1.run()
    client_2.run()
                                                                                    
    client_1.shutdown()
    client_2.shutdown()
    server.socket.close()

                                                                          
if __name__ == '__main__':
    main()

執行可以看到

duck@duck:~/sockdir/chapter_2$ python forkser.py 
Server loop running PID: 22649
PID: %s  22649
sent 13 characters...
22651: hello, world!
PID 22649 received: 22651: hello, world!
PID: %s  22649
sent 13 characters...
22652: hello, world!
PID 22649 received: 22652: hello, world!

現在用ThreadingMixIn

優勢:執行緒之間共享應用狀態容易,避免程序間通訊的複雜操作。

import os                                                                                                                                                                          
import socket                                                                       
import threading                                                                    
import SocketServer                                                                 
                                                                                    
                                                                                    
class ForkingClient():                                                              
    def __init__(self, ip, port):                                                   
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)               
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)             
        self.sock.connect((ip, port))                                               
                                                                                    
    def run(self):                                                                  
        crtpid = os.getpid()                                                        
        print "PID: %s " % crtpid                                                   
        send_length = self.sock.send("hello, world!")                               
        print "sent %d characters..." % send_length                                 
        res = self.sock.recv(2048)                                                  
        print "PID %s received: %s" % (crtpid, res)                                 
                                                                                    
    def shutdown(self):                                                             
        self.sock.close()                                                           
                                                                                    
                                                                                    
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):                   
    def handle(self):                                                               
        data = self.request.recv(2048)                                              
        crtpid = threading.current_thread()                                         
        res = "%s: %s" % (crtpid, data)                                             
        print res                                                                   
        self.request.sendall("hello, client!")                                      
        return                                                                      
                                                                                    
                                                                                    
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):       
    pass                                                                            
                                                                                    
                                                                                    
def main():                                                                         
    server = ThreadedTCPServer(('localhost', 8989),ThreadedTCPRequestHandler)       
    ip, port = server.server_address                                                
    server_thread = threading.Thread(target=server.serve_forever)
    # 執行緒互相獨立,必須要設定為True                   
    server_thread.setDaemon(True)                                                   
    server_thread.start()                                                           
    print 'Server loop running PID: %s' % server_thread.name                        
                                                                                    
    client_1 = ForkingClient(ip, port)                                              
    client_2 = ForkingClient(ip, port)                                              
                                                                                    
    client_1.run()                                                                  
    client_2.run() 
    server.socket.close()                                                        
                                                                                 
if __name__ == '__main__':                                                       
    main()                                                                                                                                                                        


可以看到基本套路都差不多,就是替換了一些處理類

而在大型網路伺服器應用中,存在幾百上千的併發連線時,為每個客戶端建立單獨的執行緒和程序不太實際。記憶體和主機cpu都有限,需要一種更好的辦法來處理,那就是select模組。

這裡可以瞭解一下,select,poll ,epoll三個模組

link

先用select的select模組編寫一個聊天室伺服器。分了三個模組

先寫通訊模組

import cPickle                                                                                                                                                                     
import struct                                                                       
import socket                                                                       
                                                                                    
def send(channel, *args):                                                           
    buf = cPickle.dumps(args)                                                       
    value = socket.htonl(len(buf))                                                  
    size = struct.pack("L", value)                                                  
    channel.send(size)                                                              
    channel.send(buf)                                                               
                                                                                    
def receive(channel):                                                               
    size = struct.calcsize("L")                                                     
    size = channel.recv(size)                                                       
    try:                                                                            
        size = socket.ntohl(struct.unpack("L", size)[0])                            
    except struct.error, e:                                                         
        return ''                                                                   
    buf = ""                                                                        
    while len(buf) < size:                                                          
        buf += channel.recv(size - len(buf))                                        
    return cPickle.loads(buf)[0]

服務模組
from com_model import send, receive                                                                                                                                                
                                                                                    
import select                                                                       
import socket                                                                       
import sys                                                                          
import signal                                                                       
                                                                                    
                                                                                    
class ChatServer(object):                                                           
    def __init__(self, port, backlog=5):                                            
        self.clients = 0                                                            
        self.clientmap = {}                                                         
        self.outputs = []                                                           
        self.server = socket.socket(socket.AF_INET, socket.SOL_SOCKET)              
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)           
        self.server.bind(('localhost', port))                                       
        self.server.listen(backlog)                                                 
        signal.signal(signal.SIGINT, self.sighandler)                               
                                                                                    
    def sighandler(self, signum, frame):                                            
        print "Shutting down server...."                                            
        for output in self.outputs:                                                 
            output.close()                                                          
        self.server.close()                                                         
                                                                                    
    def get_client_name(self, client):                                              
        info = self.clientmap[client]                                               
        host, name = info[0][0], info[1]                                            
        return '@'.join((name, host))                                               
                                                                                    
    def run(self):                                                                  
        inputs = [self.server, sys.stdin]                                           
        self.outputs = []                                                           
        running = True                                                              
        while running:                                                              
            try:                                                                    
                readable, writeable, exceptional = select.select(inputs, self.outputs, []) 
            except select.error, e:                                                 
                break                                                               
                                                                                    
            for sock in readable:                                                   
                if sock == self.server:                                             
                    client, address = self.server.accept()                          
                    print "Chat server: got connection %d from %s" % (client.fileno(), address)
                    cname = receive(client).split('NAME: ')[1]                      
                                                                                    
                    self.clients += 1                                               
                    send(client, "CLIENT: " + str(address[0]))                      
                    inputs.append(client)                                           
                    self.clientmap[client] = (address, cname)                       
                    msg = "\n(Connected: New client (%d) from %s" % (self.clients, self.get_client_name(client))
                    for output in self.outputs: 
                                                                                
                elif sock == sys.stdin:                                          
                    junk = sys.stdin.readline()                                  
                    running = False                                              
                                                                                 
                else:                                                            
                    try:                                                         
                        data = receive(sock)                                     
                        if data:                                                 
                            msg = '\n#[' + self.get_client_name(sock) + ']>>' + data
                            for output in self.outputs:                          
                                if output != sock:                               
                                    send(output, msg)                            
                                                                                 
                        else:                                                    
                            print "Chat server: %d hung up" % sock.fileno()      
                            self.clients -= 1                                    
                            sock.close()                                         
                            inputs.remove(sock)                                  
                            self.outputs.remove(sock)                            
                                                                                 
                            msg = '\n(Now hung up: Client from %s)' % self.get_client_name(sock)
                            for output in self.outputs:                          
                                send(output, msg)                                
                                                                                 
                    except socket.error, e:                                      
                        inputs.remove(sock)                                      
                        self.outputs.remove(sock)                                
        self.server.close()                                                      
                                                                                 
                                                                                 
def main():                                                                      
    server = ChatServer(8989)                                                    
    server.run()                                                                 
                                                                                 
if __name__ == '__main__':                                                       
    main()                               


客戶端模組
from com_model import send, receive                                                                                                                                                
                                                                                    
import os                                                                           
import select                                                                       
import socket                                                                       
import sys                                                                          
import signal                                                                       
                                                                                    
class ChatClient(object):                                                           
    def __init__(self, name, port, host="localhost"):                               
        self.name = name                                                            
        self.connected = False                                                      
        self.host = host                                                            
        self.port = port                                                            
        self.prompt = '[' + '@'.join((name, socket.gethostname().split('.')[0])) + ']>'
                                                                                    
        try:                                                                        
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)           
            self.sock.connect((host, self.port))                                    
            print "Now connected to chat server@ port %d" % self.port               
            self.connected = True                                                   
            send(self.sock, 'NAME: ' + self.name)                                   
            data = receive(self.sock)                                               
            addr = data.split('CLIENT: ')[1]                                        
            self.prompt = '[' + '@'.join((self.name, addr)) + ']> '                 
        except socket.error, e:                                                     
            print "Failed connect server @port %d" % self.port                      
            sys.exit(1)                                                             
                                                                                    
    def run(self):                                                                  
        while self.connected:                                                       
            try:                                                                    
                sys.stdout.write(self.prompt)                                       
                sys.stdout.flush()                                                  
                readable, writeable, exceptional = select.select([0, self.sock], [],[])
                for sock in readable:                                               
                    if sock == 0:                                                   
                        data = sys.stdin.readline().strip()                         
                        if data:                                                    
                            send(self.sock, data)                                   
                    elif sock == self.sock:                                         
                        data = receive(self.sock)                                   
                        if not data:                                                
                            print "Client shutting down."                           
                            self.connected = False                                  
                            break                                                   
                        else:                                                       
                            sys.stdout.write(data + '\n')                           
                            sys.stdout.flush()                                      
                                                                                    
            except KeyboardInterrupt:                                               
                print "interupt"                                                     
                self.sock.close()                                                
                break                                                            
def main():                                                                      
    cli_name = str(os.getpid()) + 'client'                                       
    cli_port = raw_input("port: ")                                               
    client = ChatClient(cli_name, int(cli_port))                                 
    client.run()                                                                 
                                                                                 
if __name__ == '__main__':                                                       
    main()  

 先啟動服務端,然後啟動客戶端,輸入服務端的port,這裡寫死成8989了,所以寫入8989就可以連入通訊了
duck@duck:~/sockdir/chapter_2/select_ex$ python cli_chat.py 
port: 8989
Now connected to chat server@ port 8989
[[email protected]]> haha
[[email protected]]> 
(Connected: New client (2) from [email protected]
[[email protected]]> 
#[[email protected]]>>hee
[[email protected]]> nihao

接下來將用epoll寫一個簡單的web伺服器,熟悉一下什麼叫I/O多路複用,這名字翻譯的也是一臉蒙逼

看看大神的們的解釋,秒懂

link