使用select.select編寫聊天室伺服器 《Python網路程式設計攻略》
#
現實中,大型網路伺服器可能要處理幾百或幾千個客戶端同時連線的請求,此時為每個客戶端建立單獨的執行緒或程序可能不實際。因為主機的記憶體可用量和CPU的能力皆有限制。
要處理大量客戶端的連線需要更好的技術,那就是Python提供的select模組。
select
select最早於1983年出現在4.2BSD中,它通過一個select()系統呼叫來監視多個檔案描述符的陣列,當select()返回後,該陣列中就緒的檔案描述符便會被核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作。
Python的select()方法直接呼叫作業系統的IO介面,它監控sockets,open files, and pipes(所有帶fileno()方法的檔案控制代碼)何時變成readable 和writeable, 或者通訊錯誤。
select()使得同時監控多個連線變的簡單,並且這比寫一個長迴圈來等待和監控多客戶端連線要高效,因為select直接通過作業系統提供的網路介面進行操作,而不是通過Python的直譯器。
select()方法接收並監控3個通訊列表, 第一個是所有的輸入的data,就是指外部發過來的資料,第2個是監控和接收所有要發出去的data(outgoing data),第3個監控錯誤資訊,接下來我們需要建立2個列表來包含輸入和輸出資訊來傳給select().
# Sockets from which we expect to read
inputs = [ server ]
# Sockets to which we expect to write
outputs = [ ]
# Outgoing message queues (socket:Queue)
message_queues = {}
while inputs:
# Wait for at least one of the sockets to be ready for processing
print >>sys.stderr, '\nwaiting for the next event'
readable, writable, exceptional = select.select(inputs, outputs, inputs)
當你把inputs,outputs,exceptional(這裡跟inputs共用)傳給select()後,它返回3個新的list,我們上面將他們分別賦值為readable,writable,exceptional, 所有在readable list中的socket連線代表有資料可接收(recv),所有在writable list中的存放著你可以對其進行傳送(send)操作的socket連線,當連線通訊出現error時會把error寫到exceptional列表中。
Readable list
Readable list中的socket 可以有3種可能狀態:
- 第一種是如果這個socket是main “server” socket,它負責監聽客戶端的連線,如果這個main server socket出現在readable裡,那代表這是server端已經ready來接收一個新的連線進來了,為了讓這個main server能同時處理多個連線,在下面的程式碼裡,我們把這個main server的socket設定為非阻塞模式。
# Handle inputs
for s in readable:
if s is server:
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print >>sys.stderr, 'new connection from', client_address
connection.setblocking(0)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection] = Queue.Queue()
- 第二種情況是這個socket是已經建立了的連線,它把資料發了過來,這個時候你就可以通過recv()來接收它發過來的資料,然後把接收到的資料放到queue裡,這樣你就可以把接收到的資料再傳回給客戶端了。
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
- 第三種情況就是這個客戶端已經斷開了,所以你再通過recv()接收到的資料就為空了,所以這個時候你就可以把這個跟客戶端的連線關閉了。
else:
# Interpret empty result as closed connection
print >>sys.stderr, 'closing', client_address, 'after reading no data'
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回資料了,所以這時候如果這個客戶端的連線物件還在outputs列表中,就把它刪掉
inputs.remove(s) #inputs中也刪除掉
s.close() #把這個連線關閉掉
# Remove message queue
del message_queues[s]
writable list
對於writable list中的socket,也有幾種狀態,如果這個客戶端連線在跟它對應的queue裡有資料,就把這個資料取出來再發回給這個客戶端,否則就把這個連線從output list中移除,這樣下一次迴圈select()呼叫時檢測到outputs list中沒有這個連線,那就會認為這個連線還處於非活動狀態。
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stop checking for writability.
print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
outputs.remove(s)
else:
print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
s.send(next_msg)
select 例項
伺服器端示例程式碼
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import select
import socket
import sys
import queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
# Listen for incoming connections
server.listen(5)
# Sockets from which we expect to read
inputs = [ server ]
# Sockets to which we expect to write
outputs = [ ]
message_queues = {}
while inputs:
# Wait for at least one of the sockets to be ready for processing
print( '\nwaiting for the next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs)
# Handle inputs
for s in readable:
if s is server:
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print('new connection from', client_address)
connection.setblocking(False)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection] = queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print('closing', client_address, 'after reading no data')
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回資料了,所以這時候如果這個客戶端的連線物件還在outputs列表中,就把它刪掉
inputs.remove(s) #inputs中也刪除掉
s.close() #把這個連線關閉掉
# Remove message queue
del message_queues[s]
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking for writability.
print('output queue for', s.getpeername(), 'is empty')
outputs.remove(s)
else:
print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print('handling exceptional condition for', s.getpeername() )
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
客戶端示例程式碼
__author__ = 'jieli'
import socket
import sys
messages = [ 'This is the message. ',
'It will be sent ',
'in parts.',
]
server_address = ('localhost', 10000)
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
if not data:
print >>sys.stderr, 'closing socket', s.getsockname()
s.close()
執行結果參考:
實現方法
- 本例將客戶端和伺服器端程式碼寫在一個腳本里,執行時指定不同的–name引數區別執行的是伺服器或客戶端:當傳入–name=server時,指令碼啟動聊天室伺服器;當傳入的是其他引數,如client1、client2時,則執行的是客戶端。
- 聊天室伺服器埠通過–port指定。
- 對大型的應用程式而言,最好在不同模組中編寫伺服器和客戶端。
程式程式碼
'''
Created on 2017-2-28
@author: lenovo
'''
import select
import socket
import sys
import signal
import cPickle
import struct
import argparse
SERVER_HOST = 'localhost'
CHAT_SERVER_NAME = 'server'
# Some utilities
def send(channel, *args):
buffer = cPickle.dumps(args)
value = socket.htonl(len(buffer))
size = struct.pack("L",value)
channel.send(size)
channel.send(buffer)
def receive(channel):
size = struct.calcsize("L")
size = channel.recv(size)
#socket.recv(bufsize[, flags])
#Receive data from the socket. The return value is a string representing the data received.
#The maximum amount of data to be received at once is specified by bufsize. See the Unix manual page recv(2) for the meaning of the optional argument flags; it defaults to zero.
#Note:For best match with hardware and network realities, the value of bufsize should be a relatively small power of 2, for example, 4096.
try:
size = socket.ntohl(struct.unpack("L",size)[0])
#socket.ntohl(x)
#Convert 32-bit positive integers from network to host byte order.
#On machines where the host byte order is the same as network byte order,
#this is a no-op; otherwise, it performs a 4-byte swap operation.
except struct.error, e:
return ''
buf = ""
while len(buf) < size:
buf += channel.recv(size - len(buf))
return cPickle.loads(buf)[0]
class ChatServer(object):
"""An example chat server using select"""
def __init__(self,port,backlog=5):
self.clients = 0
self.clientmap = {}
self.outputs = [] #list out sockets
self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((SERVER_HOST,port))
print 'Server listening to port: %s...' %port
self.server.listen(backlog)
# Catch keyboard interrupts
signal.signal(signal.SIGINT, self.sighandler)
def sighandler(self,signum,frame):
"""Clean up client output"""
# Close the server
print 'Shutting down server...'
# close existing client sockets
for output in self.outputs:
output.close()
self.server.close()
def get_client_name(self,client):
"""Return the name of the client"""
info = self.clientmap[client]
host,name = info[0][0],info[1]
return '@'.join((name,host))
def run(self):
# define input source.
inputs = [self.server,sys.stdin]
self.outputs = []
running = True
while running:
try:
readable,writeable,exceptional = select.select(inputs, self.outputs, [])
except select.error, e:
break
for sock in readable:
if sock == self.server:
# handle the server socket
client,address = self.server.accept()
print "Chat server: got connection %d from %s" %(client.fileno(),address)
# read the login name
cname = receive(client).split('NAME: ')[1]
# Compute client name and send back
self.clients += 1
send(client, 'CLIENT: ' + str(address[0]))
inputs.append(client)
self.clientmap[client] = (address,cname)
# Send joining information to other clients
msg = "\n(Connected: New client (%d) from %s)" % (self.clients,self.get_client_name(client))
for output in self.outputs:
send(output,msg)
self.outputs.append(client)
elif sock == sys.stdin:
#handle standard inut
junk = sys.stdin.readline()
running = False
else:
# Handle all other sockets
try:
data = receive(sock)
if data:
#send as new client's message...
msg = '\n#[' + self.get_client_name(sock) +']>>' + data
# send data to all except ourself
for output in self.outputs:
if output != sock:
send(output,msg)
else:
print "Chat server: %d hung up" % sock.fileno()
self.clients -= 1
sock.close()
inputs.remove(sock)
self.outputs.remove(sock)
#sending client leaving information to others
msg = "\n(Now hung up: Client from %s)" % self.get_client_name(sock)
for output in self.outputs:
send(output,msg)
except socket.error, e:
#remove
inputs.remove(sock)
self.outputs.remove(sock)
self.server.close()
class ChatClient(object):
""" A command ine chat client using select """
def __init__(self,name,port,host=SERVER_HOST):
self.name = name
self.connected = False
self.host = host
self.port = port
#Initial prompt
self.prompt = '[' + '@'.join((name,socket.gethostname().split('.')[0])) + ']>'
# Connect to server at port
try:
self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.sock.connect((host,self.port))
print "New connected to chat server @ port %d" %self.port
self.connected = True
#Send my name...
send(self.sock,'NAME: ' + self.name)
data = receive(self.sock)
#Contains client address, set it
addr = data.split('CLIENT: ')[1]
self.prompt = '[' +'@'.join((self.name,addr)) +']>'
except socket.error, e:
print "Failed to connect to chat server @ port %d" %self.port
sys.exit(1)
def run(self):
""" Chat client main loop"""
while self.connected:
try:
sys.stdout.write(self.prompt)
sys.stdout.flush()
#wait for input from stdin and socket
readable,writeable,exceptional = select.select([0,self.sock], [], [])
for sock in readable:
if sock == 0:
data = sys.stdin.readline().strip()
if data: send(self.sock,data)
elif sock == self.sock:
data = receive(self.sock)
if not data:
print 'Client shutting down.'
self.connected = False
break
else:
sys.stdout.write(data + '\n')
sys.stdout.flush()
except KeyboardInterrupt:
print " CLient interrupted. """
self.sock.close()
break
if __name__ == "__main__":
parser = argparse.ArgumentParser(description = 'Socket Server Example with Select')
parser.add_argument('--name', action="store",dest="name",required=True)
parser.add_argument('--port',action="store",dest="port",type=int,required=True)
given_args=parser.parse_args()
port=given_args.port
name=given_args.name
if name == CHAT_SERVER_NAME:
server = ChatServer(port)
server.run()
else:
client = ChatClient(name=name,port=port)
client.run()