1. 程式人生 > >Python自動化之select解析

Python自動化之select解析

原文地址:http://www.cnblogs.com/alex3714/p/4372426.html#top


Python自動化之select解析

本文目錄

 

select原理

網路通訊被Unix系統抽象為檔案的讀寫,通常是一個裝置,由裝置驅動程式提供,驅動可以知道自身的資料是否可用。支援阻塞操作的裝置驅動通常會實現一組自身的等待佇列,如讀/寫等待佇列用於支援上層(使用者層)所需的block或non-block操作。裝置的檔案的資源如果可用(可讀或者可寫)則會通知程序,反之則會讓程序睡眠,等到資料到來可用的時候,再喚醒程序。

這些裝置的檔案描述符被放在一個數組中,然後select呼叫的時候遍歷這個陣列,如果對於的檔案描述符可讀則會返回改檔案描述符。當遍歷結束之後,如果仍然沒有一個可用裝置檔案描述符,select讓使用者程序則會睡眠,直到等待資源可用的時候在喚醒,遍歷之前那個監視的陣列。每次遍歷都是線性的。

select方法

Python的select()方法直接呼叫作業系統的IO介面,它監控sockets,open files, and pipes(所有帶fileno()方法的檔案控制代碼)何時變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個連線變的簡單,並且這比寫一個長迴圈來等待和監控多客戶端連線要高效,因為select直接通過作業系統提供的C的網路介面進行操作,而不是通過Python的直譯器。
只支援Unix,不支援Windows
示例


import select
import socket
import sys
import Queue

# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print >>sys.stderr, 'starting up on %s port %s' % server_address
server.bind
(server_address) # Listen for incoming connections server.listen(5)

select()方法接收並監控3個通訊列表, 第一個是所有的輸入的data,就是指外部發過來的資料,第2個是監控和接收所有要發出去的data(outgoing data),第3個監控錯誤資訊,接下來我們需要建立2個列表來包含輸入和輸出資訊來傳給select().

readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)

所有客戶端的進來的連線和資料將會被server的主迴圈程式放在上面的list中處理,我們現在的server端需要等待連線可寫(writable)之後才能過來,然後接收資料並返回(因此不是在接收到資料之後就立刻返回),因為每個連線要把輸入或輸出的資料先快取到queue裡,然後再由select取出來再發出去

當你把inputs,outputs,exceptional(這裡跟inputs共用)傳給select()後,它返回3個新的list,我們上面將他們分別賦值為readable,writable,exceptional, 所有在readable list中的socket連線代表有資料可接收(recv),所有在writable list中的存放著你可以對其進行傳送(send)操作的socket連線,當連線通訊出現error時會把error寫到exceptional列表中。

Readable list 中的socket 可以有3種可能狀態,第一種是如果這個socket是main "server" socket,它負責監聽客戶端的連線,如果這個main server socket出現在readable裡,那代表這是server端已經ready來接收一個新的連線進來了,為了讓這個main server能同時處理多個連線,在下面的程式碼裡,我們把這個main server的socket設定為非阻塞模式。

# Handle inputs
for s in readable:
 
    if s is server:
        # A "readable" server socket is ready to accept a connection
        connection, client_address = s.accept()
        print >>sys.stderr, 'new connection from', client_address
        connection.setblocking(0)
        inputs.append(connection)
 
        # Give the connection a queue for data we want to send
        message_queues[connection] = Queue.Queue()

第二種情況是這個socket是已經建立了的連線,它把資料發了過來,這個時候你就可以通過recv()來接收它發過來的資料,然後把接收到的資料放到queue裡,這樣你就可以把接收到的資料再傳回給客戶端了。

else:
     data = s.recv(1024)
     if data:
         # A readable client socket has data
         print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
         message_queues[s].put(data)
         # Add output channel for response
         if s not in outputs:
             outputs.append(s)

第三種情況就是這個客戶端已經斷開了,所以你再通過recv()接收到的資料就為空了,所以這個時候你就可以把這個跟客戶端的連線關閉了。

else:
    # Interpret empty result as closed connection
    print >>sys.stderr, 'closing', client_address, 'after reading no data'
    # Stop listening for input on the connection
    if s in outputs:
        outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回資料了,所以這時候如果這個客戶端的連線物件還在outputs列表中,就把它刪掉
    inputs.remove(s)    #inputs中也刪除掉
    s.close()           #把這個連線關閉掉
 
    # Remove message queue
    del message_queues[s] 


else:
    # Interpret empty result as closed connection
    print >>sys.stderr, 'closing', client_address, 'after reading no data'
    # Stop listening for input on the connection
    if s in outputs:
        outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回資料了,所以這時候如果這個客戶端的連線物件還在outputs列表中,就把它刪掉
    inputs.remove(s)    #inputs中也刪除掉
    s.close()           #把這個連線關閉掉
 
    # Remove message queue
    del message_queues[s]  

對於writable list中的socket,也有幾種狀態,如果這個客戶端連線在跟它對應的queue裡有資料,就把這個資料取出來再發回給這個客戶端,否則就把這個連線從output list中移除,這樣下一次迴圈select()呼叫時檢測到outputs list中沒有這個連線,那就會認為這個連線還處於非活動狀態

# Handle outputs
for s in writable:
    try:
        next_msg = message_queues[s].get_nowait()
    except Queue.Empty:
        # No messages waiting so stop checking for writability.
        print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
        outputs.remove(s)
    else:
        print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
        s.send(next_msg)

最後,如果在跟某個socket連線通訊過程中出了錯誤,就把這個連線物件在inputs\outputs\message_queue中都刪除,再把連線關閉掉

# Handle "exceptional conditions"
for s in exceptional:
    print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
    # Stop listening for input on the connection
    inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    s.close()
 
    # Remove message queue
    del message_queues[s]

客戶端

下面的這個是客戶端程式展示瞭如何通過select()對socket進行管理並與多個連線同時進行互動

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10000)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]

# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)

接下來通過迴圈通過每個socket連線給server傳送和接收資料

for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()

服務端完整程式碼

#_*_coding:utf-8_*_
 
import select
import socket
import sys
import queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)
 
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]
 
message_queues = {}
while inputs:
 
    # Wait for at least one of the sockets to be ready for processing
    print( '\nwaiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:
 
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)
 
            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客戶端都斷開了,我就不用再給它返回資料了,所以這時候如果這個客戶端的連線物件還在outputs列表中,就把它刪掉
                inputs.remove(s)    #inputs中也刪除掉
                s.close()           #把這個連線關閉掉
 
                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
 
        # Remove message queue
        del message_queues[s]

客戶端程式碼

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10000)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)
 
for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()
            s.close()