python3 非同步IO\資料庫\佇列\快取
阿新 • • 發佈:2019-01-31
yield 詳解
協程
協程:又稱微執行緒、纖程,是一種使用者態的輕量級執行緒。
協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧。因此:
協程能保留上一次呼叫時的狀態(即所有區域性狀態的一個特定組合),每次過程重入時,就相當於進入上一次呼叫的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
select 多併發socket 例子
# Server side of the select() multi-connection example.
# Note carried over from the article: on the client side, adjust the number of
# concurrent clients in range(); with 1000 clients select() fails because too
# many descriptors are being watched.
import select
import socket
import sys
import queue

server = socket.socket()
server.setblocking(False)
server_addr = ('localhost', 9999)
print('starting up on %s port %s' % server_addr)
server.bind(server_addr)
server.listen(5)

inputs = [server]      # sockets watched for readability (and for errors)
outputs = []           # sockets that have queued data waiting to be sent
message_queues = {}    # client socket -> queue.Queue of pending replies


def _peer_name(sock):
    """Return the peer address of *sock*, or the socket itself if unavailable."""
    try:
        return sock.getpeername()
    except OSError:
        # A socket in an error state may no longer have a peer address.
        return sock


def _drop_client(sock):
    """Stop tracking *sock* and close it; safe to call more than once."""
    if sock in outputs:
        outputs.remove(sock)
    if sock in inputs:
        inputs.remove(sock)
    message_queues.pop(sock, None)
    sock.close()


while True:
    print('waiting for next event...')
    readable, writeable, exeptional = select.select(inputs, outputs, inputs)

    for s in readable:  # each s is a socket
        if s is server:
            conn, client_addr = s.accept()
            print('new connection from', client_addr)
            conn.setblocking(False)
            inputs.append(conn)
            message_queues[conn] = queue.Queue()
            continue

        try:
            data = s.recv(1024)
        except ConnectionError:
            # An abrupt reset is handled like an orderly disconnect.
            data = b''

        if data:
            print("收到來自[%s]的資料:" % s.getpeername()[0], data)
            message_queues[s].put(data)
            if s not in outputs:
                outputs.append(s)
        else:
            print('客戶端斷開了', s)
            _drop_client(s)

    for s in writeable:
        if s not in message_queues:
            # Dropped during the readable pass of this same select() result.
            continue
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            print("client [%s]" % s.getpeername()[0], 'queue is empty..')
            outputs.remove(s)
        else:
            print('sending msg to [%s]' % s.getpeername()[0], next_msg)
            try:
                s.send(next_msg.upper())
            except ConnectionError:
                _drop_client(s)

    for s in exeptional:
        if s not in inputs:
            # Already dropped earlier in this iteration.
            continue
        print('handling exception for ', _peer_name(s))
        _drop_client(s)
# Client side of the select() example: opens many connections to the server,
# sends each message on every connection, then reads the replies.
import socket
import sys

messages = [b'This is the message.', b'It will be sent', b'in parts.']
server_address = ('localhost', 9999)

# The range() argument is the number of concurrent client connections.
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]

print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:
    for s in socks:
        print('%s:sending "%s"' % (s.getsockname(), message))
        s.send(message)

    still_open = []
    for s in socks:
        data = s.recv(1024)
        print('%s:recvived "%s"' % (s.getsockname(), data))
        if data:
            still_open.append(s)
        else:
            # Empty read means the server closed the connection: report on
            # stderr and release the socket so it is not reused below.
            print('closing socket', s.getsockname(), file=sys.stderr)
            s.close()
    socks = still_open

# Release the connections that are still open once all messages are sent.
for s in socks:
    s.close()
Linux 中修改 socket 可開啟的檔案控制代碼數限制
檢視:ulimit -n
修改:ulimit -SHn 65535(此例將限制數改為 65535)
selectors模組服務端
# Echo server built on the selectors module: each registered socket carries
# the callback to run when it becomes readable.
import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
    """Accept a pending connection on *sock* and register it for reading."""
    client, peer = sock.accept()
    print('accepted', client, 'from', peer)
    client.setblocking(False)
    sel.register(client, selectors.EVENT_READ, read)


def read(conn, mask):
    """Echo upper-cased data back on *conn*, or close it on an empty read."""
    data = conn.recv(1024)
    if not data:
        # Empty read: the peer closed the connection.
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
        return
    print('echoing', repr(data), 'to', conn)
    conn.send(data.upper())


sock = socket.socket()
sock.bind(('localhost', 9999))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    # key.data is the callback stored at registration time.
    for key, mask in sel.select():
        key.data(key.fileobj, mask)
selectors模組服務端 封裝類版本
import selectors
import socket
class accept_class(object):
    """Holds the accept callback registered for the listening socket."""

    def __init__(self):
        # Details of the most recently accepted connection; None until
        # accept() has run.
        self.conn = self.addr = self.mask = None

    def accept(self, sock, mask):
        """Accept a connection on *sock* and register a reader for it.

        The new connection is made non-blocking and registered with the
        module-level selector ``sel``, using a fresh ``read_class`` instance's
        ``read`` method as its callback.
        """
        self.conn, self.addr = sock.accept()
        self.mask = mask
        print('accepted', self.conn, 'from', self.addr)
        self.conn.setblocking(False)
        reader = read_class()
        sel.register(self.conn, selectors.EVENT_READ, reader.read)
class read_class(object):
    """Holds the read callback registered for an accepted connection."""

    def __init__(self):
        # Initialised to None; read() does not use these attributes.
        self.conn = None
        self.mask = None

    def read(self, conn, mask):
        """Echo data received on *conn*, or close it on an empty read.

        On an empty read the connection is unregistered from the module-level
        selector ``sel`` and closed.
        """
        data = conn.recv(1000)  # Should be ready
        print(data)
        if not data:
            # Empty read: the peer closed the connection.
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
            return
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
# Listening socket: non-blocking, bound to localhost:9999.
sock = socket.socket()
sock.bind(('localhost', 9999))
sock.listen(100)
sock.setblocking(False)

# Register the listening socket with accept1.accept as its callback.
sel = selectors.DefaultSelector()
accept1 = accept_class()
sel.register(sock, selectors.EVENT_READ, accept1.accept)

# Event loop: run the callback stored with each ready socket.
while True:
    for key, mask in sel.select():
        handler = key.data
        handler(key.fileobj, mask)