Python非同步非阻塞IO多路複用Select/Poll/Epoll使用
阿新 • • 發佈:2019-01-09
有許多封裝好的非同步非阻塞IO多路複用框架,底層在linux基於最新的epoll實現,為了更好的使用,瞭解其底層原理還是有必要的。
下面記錄下分別基於Select/Poll/Epoll的echo server實現。
Python Select Server,可監控事件數量有限制:
Python Poll Server,Select升級版,無可監控事件數量限制,還是要輪詢所有事件:#!/usr/bin/python # -*- coding: utf-8 -*- import select import socket</span> import Queue server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setblocking(False) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1) server_address= ('192.168.1.5',8080) server.bind(server_address) server.listen(10) #select輪詢等待讀socket集合 inputs = [server] #select輪詢等待寫socket集合 outputs = [] message_queues = {} #select超時時間 timeout = 20 while True: print "等待活動連線......" readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout) if not (readable or writable or exceptional) : print "select超時無活動連線,重新select...... " continue; #迴圈可讀事件 for s in readable : #如果是server監聽的socket if s is server: #同意連線 connection, client_address = s.accept() print "新連線: ", client_address connection.setblocking(0) #將連線加入到select可讀事件佇列 inputs.append(connection) #新建連線為key的字典,寫回讀取到的訊息 message_queues[connection] = Queue.Queue() else: #不是本機監聽就是客戶端發來的訊息 data = s.recv(1024) if data : print "收到資料:" , data , "客戶端:",s.getpeername() message_queues[s].put(data) if s not in outputs: #將讀取到的socket加入到可寫事件佇列 outputs.append(s) else: #空白訊息,關閉連線 print "關閉連線:", client_address if s in outputs : outputs.remove(s) inputs.remove(s) s.close() del message_queues[s] for s in writable: try: msg = message_queues[s].get_nowait() except Queue.Empty: print "連線:" , s.getpeername() , '訊息佇列為空' outputs.remove(s) else: print "傳送資料:" , msg , "到", s.getpeername() s.send(msg) for s in exceptional: print "異常連線:", s.getpeername() inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
Python Epoll Server,基於回撥的事件通知模式,輕鬆管理大量連線:#!/usr/bin/python # -*- coding: utf-8 -*- import socket import select import Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ("192.168.1.5", 8080) server.bind(server_address) server.listen(5) print "伺服器啟動成功,監聽IP:" , server_address message_queues = {} #超時,毫秒 timeout = 5000 #監聽哪些事件 READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) READ_WRITE = (READ_ONLY|select.POLLOUT) #新建輪詢事件物件 poller = select.poll() #註冊本機監聽socket到等待可讀事件事件集合 poller.register(server,READ_ONLY) #檔案描述符到socket對映 fd_to_socket = {server.fileno():server,} while True: print "等待活動連線......" #輪詢註冊的事件集合 events = poller.poll(timeout) if not events: print "poll超時,無活動連線,重新poll......" continue print "有" , len(events), "個新事件,開始處理......" for fd ,flag in events: s = fd_to_socket[fd] #可讀事件 if flag & (select.POLLIN | select.POLLPRI) : if s is server : #如果socket是監聽的server代表有新連線 connection , client_address = s.accept() print "新連線:" , client_address connection.setblocking(False) fd_to_socket[connection.fileno()] = connection #加入到等待讀事件集合 poller.register(connection,READ_ONLY) message_queues[connection] = Queue.Queue() else : #接收客戶端傳送的資料 data = s.recv(1024) if data: print "收到資料:" , data , "客戶端:" , s.getpeername() message_queues[s].put(data) #修改讀取到訊息的連線到等待寫事件集合 poller.modify(s,READ_WRITE) else : # Close the connection print " closing" , s.getpeername() # Stop listening for input on the connection poller.unregister(s) s.close() del message_queues[s] #連線關閉事件 elif flag & select.POLLHUP : print " Closing ", s.getpeername() ,"(HUP)" poller.unregister(s) s.close() #可寫事件 elif flag & select.POLLOUT : try: msg = message_queues[s].get_nowait() except Queue.Empty: print s.getpeername() , " queue empty" poller.modify(s,READ_ONLY) else : print "傳送資料:" , data , "客戶端:" , s.getpeername() s.send(msg) #異常事件 elif flag & select.POLLERR: print " exception on" , s.getpeername() poller.unregister(s) s.close() del message_queues[s]</span>
#!/usr/bin/python # -*- coding: utf-8 -*- import socket, select import Queue serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ("192.168.1.5", 8080) serversocket.bind(server_address) serversocket.listen(1) print "伺服器啟動成功,監聽IP:" , server_address serversocket.setblocking(0) timeout = 10 #新建epoll事件物件,後續要監控的事件新增到其中 epoll = select.epoll() #新增伺服器監聽fd到等待讀事件集合 epoll.register(serversocket.fileno(), select.EPOLLIN) message_queues = {} fd_to_socket = {serversocket.fileno():serversocket,} while True: print "等待活動連線......" #輪詢註冊的事件集合 events = epoll.poll(timeout) if not events: print "epoll超時無活動連線,重新輪詢......" continue print "有" , len(events), "個新事件,開始處理......" for fd, event in events: socket = fd_to_socket[fd] #可讀事件 if event & select.EPOLLIN: #如果活動socket為伺服器所監聽,有新連線 if socket == serversocket: connection, address = serversocket.accept() print "新連線:" , address connection.setblocking(0) #註冊新連線fd到待讀事件集合 epoll.register(connection.fileno(), select.EPOLLIN) fd_to_socket[connection.fileno()] = connection message_queues[connection] = Queue.Queue() #否則為客戶端傳送的資料 else: data = socket.recv(1024) if data: print "收到資料:" , data , "客戶端:" , socket.getpeername() message_queues[socket].put(data) #修改讀取到訊息的連線到等待寫事件集合 epoll.modify(fd, select.EPOLLOUT) #可寫事件 elif event & select.EPOLLOUT: try: msg = message_queues[socket].get_nowait() except Queue.Empty: print socket.getpeername() , " queue empty" epoll.modify(fd, select.EPOLLIN) else : print "傳送資料:" , data , "客戶端:" , socket.getpeername() socket.send(msg) #關閉事件 elif event & select.EPOLLHUP: epoll.unregister(fd) fd_to_socket[fd].close() del fd_to_socket[fd] epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()