1. 程式人生 > >Python非同步非阻塞IO多路複用Select/Poll/Epoll使用

Python非同步非阻塞IO多路複用Select/Poll/Epoll使用

有許多封裝好的非同步非阻塞IO多路複用框架,底層在linux基於最新的epoll實現,為了更好的使用,瞭解其底層原理還是有必要的。
下面記錄下分別基於Select/Poll/Epoll的echo server實現。
Python Select Server,可監控事件數量有限制:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import select
import socket</span>
import Queue
 
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)
server_address= ('192.168.1.5',8080)
server.bind(server_address)
server.listen(10)
 
#select輪詢等待讀socket集合
inputs = [server]
#select輪詢等待寫socket集合
outputs = []
message_queues = {}
#select超時時間
timeout = 20
 
while True:
    print "等待活動連線......"
    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
 
    if not (readable or writable or exceptional) :
        print "select超時無活動連線,重新select...... "
        continue;   
    #迴圈可讀事件
    for s in readable :
        #如果是server監聽的socket
        if s is server:
            #同意連線
            connection, client_address = s.accept()
            print "新連線: ", client_address
            connection.setblocking(0)
            #將連線加入到select可讀事件佇列
            inputs.append(connection)
            #新建連線為key的字典,寫回讀取到的訊息
            message_queues[connection] = Queue.Queue()
        else:
            #不是本機監聽就是客戶端發來的訊息
            data = s.recv(1024)
            if data :
                print "收到資料:" , data , "客戶端:",s.getpeername()
                message_queues[s].put(data)
                if s not in outputs:
                    #將讀取到的socket加入到可寫事件佇列
                    outputs.append(s)
            else:
                #空白訊息,關閉連線
                print "關閉連線:", client_address
                if s in outputs :
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    for s in writable:
        try:
            msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print "連線:" , s.getpeername() , '訊息佇列為空'
            outputs.remove(s)
        else:
            print "傳送資料:" , msg , "到", s.getpeername()
            s.send(msg)
 
    for s in exceptional:
        print "異常連線:", s.getpeername()
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]
Python Poll Server,Select升級版,無可監控事件數量限制,還是要輪詢所有事件:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import select
import Queue
 
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
server.bind(server_address)
server.listen(5)
print  "伺服器啟動成功,監聽IP:" , server_address
message_queues = {}
#超時,毫秒
timeout = 5000
#監聽哪些事件
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
#新建輪詢事件物件
poller = select.poll()
#註冊本機監聽socket到等待可讀事件事件集合
poller.register(server,READ_ONLY)
#檔案描述符到socket對映
fd_to_socket = {server.fileno():server,}
while True:
    print "等待活動連線......"
    #輪詢註冊的事件集合
    events = poller.poll(timeout)
    if not events:
      print "poll超時,無活動連線,重新poll......"
      continue
    print "有" , len(events), "個新事件,開始處理......"
    for fd ,flag in events:
        s = fd_to_socket[fd]
        #可讀事件
        if flag & (select.POLLIN | select.POLLPRI) :
            if s is server :
                #如果socket是監聽的server代表有新連線
                connection , client_address = s.accept()
                print "新連線:" , client_address
                connection.setblocking(False)
 
                fd_to_socket[connection.fileno()] = connection
                #加入到等待讀事件集合
                poller.register(connection,READ_ONLY)
                message_queues[connection]  = Queue.Queue()
            else :
                #接收客戶端傳送的資料
                data = s.recv(1024)
                if data:
                    print "收到資料:" , data , "客戶端:" , s.getpeername()
                    message_queues[s].put(data)
                    #修改讀取到訊息的連線到等待寫事件集合
                    poller.modify(s,READ_WRITE)
                else :
                    # Close the connection
                    print "  closing" , s.getpeername()
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()
                    del message_queues[s]
        #連線關閉事件
        elif flag & select.POLLHUP :
            print " Closing ", s.getpeername() ,"(HUP)"
            poller.unregister(s)
            s.close()
        #可寫事件
        elif flag & select.POLLOUT :
            try:
                msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print s.getpeername() , " queue empty"
                poller.modify(s,READ_ONLY)
            else :
                print "傳送資料:" , data , "客戶端:" , s.getpeername()
                s.send(msg)
        #異常事件
        elif flag & select.POLLERR:
            print "  exception on" , s.getpeername()
            poller.unregister(s)
            s.close()
            del message_queues[s]</span>
Python Epoll Server,基於回撥的事件通知模式,輕鬆管理大量連線:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket, select
import Queue
 
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
serversocket.bind(server_address)
serversocket.listen(1)
print  "伺服器啟動成功,監聽IP:" , server_address
serversocket.setblocking(0)
timeout = 10
#新建epoll事件物件,後續要監控的事件新增到其中
epoll = select.epoll()
#新增伺服器監聽fd到等待讀事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues = {}
 
fd_to_socket = {serversocket.fileno():serversocket,}
while True:
  print "等待活動連線......"
  #輪詢註冊的事件集合
  events = epoll.poll(timeout)
  if not events:
     print "epoll超時無活動連線,重新輪詢......"
     continue
  print "有" , len(events), "個新事件,開始處理......"
  for fd, event in events:
     socket = fd_to_socket[fd]
     #可讀事件
     if event & select.EPOLLIN:
         #如果活動socket為伺服器所監聽,有新連線
         if socket == serversocket:
            connection, address = serversocket.accept()
            print "新連線:" , address
            connection.setblocking(0)
            #註冊新連線fd到待讀事件集合
            epoll.register(connection.fileno(), select.EPOLLIN)
            fd_to_socket[connection.fileno()] = connection
            message_queues[connection]  = Queue.Queue()
         #否則為客戶端傳送的資料
         else:
            data = socket.recv(1024)
            if data:
               print "收到資料:" , data , "客戶端:" , socket.getpeername()
               message_queues[socket].put(data)
               #修改讀取到訊息的連線到等待寫事件集合
               epoll.modify(fd, select.EPOLLOUT)
     #可寫事件
     elif event & select.EPOLLOUT:
        try:
           msg = message_queues[socket].get_nowait()
        except Queue.Empty:
           print socket.getpeername() , " queue empty"
           epoll.modify(fd, select.EPOLLIN)
        else :
           print "傳送資料:" , data , "客戶端:" , socket.getpeername()
           socket.send(msg)
     #關閉事件
     elif event & select.EPOLLHUP:
        epoll.unregister(fd)
        fd_to_socket[fd].close()
        del fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()