1. 程式人生 > 程式設計 >Python select及selectors模組概念用法詳解

Python select及selectors模組概念用法詳解

1. select模組

針對select,要先理解其他幾個概念:

檔案描述符:

檔案描述符在形式上是一個非負整數。實際上,它是一個索引值,指向核心為每一個程序所維護的該程序開啟檔案的記錄表。當程式開啟一個現有檔案或者建立一個新檔案時,核心向程序返回一個檔案描述符。

核心空間:

Linux簡化了分段機制,使得虛擬地址與線性地址總是一致,因此,Linux的虛擬地址空間也為0~4G。Linux核心將這4G位元組的空間分為兩部分。將最高的1G位元組(從虛擬地址0xC0000000到0xFFFFFFFF),供核心使用,稱為“核心空間”。而將較低的3G位元組(從虛擬地址 0x00000000到0xBFFFFFFF),供各個程序使用,稱為“使用者空間)。因為每個程序可以通過系統呼叫進入核心,因此,Linux核心由系統內的所有程序共享。於是,從具體程序的角度來看,每個程序可以擁有4G位元組的虛擬空間。

核心空間中存放的是核心程式碼和資料,而程序的使用者空間中存放的是使用者程式的程式碼和資料。不管是核心空間還是使用者空間,它們都處於虛擬空間中。

核心空間和使用者空間一般通過系統呼叫進行通訊。

select就是針對許多檔案描述符(簡稱fd)進行監控,它有三個引數:

  • rlist -- wait until ready for reading
  • wlist -- wait until ready for writing
  • xlist -- wait for an "exceptional condition"

第一個引數監控 進來的 資料的fd列表,select監控這個列表,等待這些fd傳送過來資料,一旦資料傳送過來了(可以讀取了),就返回一個可讀的fd列表

第二個引數監控 出去的 資料的fd列表,select監控這個列表,等待這些fd傳送出去資料,一旦fd準備好傳送了(可以寫入了),就返回一個可寫的fd列表

第三個引數監控fd列表,返回出異常的fd列表

服務端:

import select
import socket
import sys
import queue

# 生成socket物件
server = socket.socket()
# 設定非阻塞模式
server.setblocking(False)

# 繫結地址,設定監聽
server.bind(('localhost',9999))
server.listen(5)

# 將自己也放進待監測列表裡
inputs = [server,]
outputs = []
message_queues = {}

while True:
  '''
  關於socket可讀可寫的判斷,可以參考部落格:https://blog.csdn.net/majianfei1023/article/details/45788591
  '''
  rlist,wlist,elist = select.select(inputs,outputs,inputs) #如果沒有任何fd就緒,那程式就會一直阻塞在這裡

  for r in rlist: # 遍歷已經可以準備讀取資料的 fd
    if r is server: # 如果這個 fd 是server,即 server 有資料待接收讀取,說明有新的客戶端連線過來了
      conn,client_addr = r.accept()
      print("new connection from",client_addr)
      conn.setblocking(False)
      inputs.append(conn) # 將這個新的客戶端連線新增到檢測的列表中
      message_queues[conn] = queue.Queue() # 用佇列儲存客戶端傳送來的資料,等待伺服器統一返回資料

    else:     # 這個可讀的 r 不是伺服器,那就是某個客戶端。就是說客戶端傳送資料過來了,這些資料處於待讀取狀態
      try:    # 異常處理,這是為了防止客戶端異常斷開報錯(比如手動關掉客戶端黑視窗,伺服器也會跟著報錯退出)
        data = r.recv(1024)
        if data:  # 根據判斷data是否為空,判斷客戶端是否斷開
          print("收到來自[%s]的資料:" % r.getpeername()[0],data)
          message_queues[r].put(data)  # 收到的資料先放到queue裡,一會返回給客戶端
          if r not in outputs:
            outputs.append(r)   # 放進可寫的fd列表中,表明這些 fd 已經準備好去傳送資料了。
        else:  # 如果資料為空,表明客戶端斷開了
          print('客戶端斷開了')
          if r in outputs:
            outputs.remove(r)  # 清理已斷開的連線
          inputs.remove(r)     # 清理已斷開的連線
          del message_queues[r]  # 清理已斷開的連線
      except ConnectionResetError:   # 如果報錯,說明客戶端斷開了
        print("客戶端異常斷開了",r)
        if r in outputs:
          outputs.remove(r)  # 清理已斷開的連線
        inputs.remove(r)    # 清理已斷開的連線
        del message_queues[r] # 清理已斷開的連線

  for w in wlist:    # 遍歷可寫的 fd 列表,即準備好傳送資料的那些fd
    # 判斷佇列是否為空
    try :
      next_msg = message_queues[w].get_nowait()
    except queue.Empty:
      # print("client [%s]" % w.getpeername()[0],"queue is empty..")
      outputs.remove(w)
    # 佇列不為空,就把佇列中的資料改成大寫,原樣發回去
    else:
      # print("sending msg to [%s]"% w.getpeername()[0],next_msg)
      w.send(next_msg.upper())

  for e in elist:  # 處理報錯的 fd
    e.close()
    print("Error occured in ",e.getpeername())
    inputs.remove(e)
    if e in outputs:
      outputs.remove(e)
    del message_queues[e]

客戶端:

import socket
import sys

sock = socket.socket()
sock.connect(('localhost',9999))
while True:
  c = input('>>>:').strip()
  sock.send(c.encode())
  data = sock.recv(1024)
  print(data.decode())

sock.close()

2. selectors模組

官方文件:https://docs.python.org/3/library/selectors.html

服務端:

import selectors
import socket

# 根據平臺自動選擇最佳的IO多路機制,比如linux就會選擇epoll,windows會選擇select
sel = selectors.DefaultSelector()

def accept(sock,mask):
  # 建立客戶端連線
  conn,addr = sock.accept()
  print('accepted',conn,'from',addr)
  # 設定非阻塞模式
  conn.setblocking(False)
  # 再次註冊一個連線,將其加入監測列表中,
  sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
  try:  # 丟擲客戶端強制關閉的異常(如手動關閉客戶端黑視窗)
    data = conn.recv(1000) # Should be ready
    if data:
      print('echoing',repr(data),'to',conn)
      conn.send(data) # Hope it won't block
    else:
      print('Client closed.',conn)
      # 將conn從監測列表刪除
      sel.unregister(conn)
      conn.close()
  except ConnectionResetError:
    print('Client forcibly closed.',conn)
    # 將conn從監測列表刪除
    sel.unregister(conn)
    conn.close()

# 建立socket物件
sock = socket.socket()

# 繫結埠,設定監聽
sock.bind(('localhost',1234))
sock.listen(100)

# 設定為非阻塞模式
sock.setblocking(False)

# 註冊一個檔案物件,監測它的IO事件,data是和檔案物件相關的資料(此處放置了一個 accept 函式的記憶體地址)
# register(fileobj,events,data=None)
sel.register(sock,accept)

while True:
  '''
  sel.select()
  看似是select方法,實際上會根據平臺自動選擇使用select還是epoll
  它返回一個(key,events)元組,key是一個namedtuple型別的元組,可以使用 key.name 獲取元組的資料
  key 的內容(fileobj,fd,data):
    fileobj  已經註冊的檔案物件
    fd     也就是第一個引數的那個檔案物件的更底層的檔案描述符
    events   等待的IO事件
    data    可選項。可以存一些和fileobj有關的資料,如 sessioin 的 id
  '''
  events = sel.select()   # 監測有無活動物件,沒有就阻塞在這裡等待
  for key,mask in events: # 有活動物件了
    callback = key.data   # key.data 是註冊時傳遞的 accept 函式
    callback(key.fileobj,mask)  # key.fileobj 就是傳遞的 socket 物件

客戶端:

import socket
tin=socket.socket()
tin.connect(('localhost',1234))
while True:
  inp=input('>>>>')
  tin.send(inp.encode('utf8'))
  data=tin.recv(1024)
  print(data.decode('utf8'))

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。