IO阻塞模型 非阻塞模型
IO阻塞模型(blocking IO)
在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:
所以,blocking IO的特點就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。
from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)
while True:
conn,addr = server.accept()
print(addr)
while True:
try:
data = conn.recv(1024)
if not data:break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))
while True:
msg = input(‘>>:‘).strip()
if not msg:continue
client.send(msg.encode(‘utf-8‘))
data = client.recv(1024)
print(data.decode(‘utf-8‘))
client.close()
非阻塞IO模型
Linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:
所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好了沒有。
# 1.對cpu的占用率過多,但是是無用的占用
# 2.在鏈接數過多的情況下不能及時響應客戶端的消息
from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)
server.setblocking(False) # 非阻塞型,默認為阻塞型True
conn_l = []
while True:
try:
conn,addr = server.accept()
conn_l.append(conn)
print(addr)
except BlockingIOError:
# print(‘幹其它活去了‘)
# time.sleep(2)
del_l = []
for conn in conn_l:
try:
data = conn.recv(1024)
if not data: # 針對linux系統
conn.close()
del_l.append(conn)
continue
conn.send(data.upper())
except BlockingIOError:
pass
except ConnectionResetError:
conn.close()
del_l.append(conn)
for conn in del_l:
conn_l.remove(conn)
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8081))
while True:
msg = input(‘>>:‘).strip()
if not msg:continue
client.send(msg.encode(‘utf-8‘))
data = client.recv(1024)
print(data.decode(‘utf-8‘))
client.close()
IO多路復用
IO multiplexing這個詞可能有點陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅動IO(event driven IO)。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:
當用戶進程調用了select,那麽整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上還更差一些。因為這裏需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。
強調:
1. 如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。
2. 在多路復用模型中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
結論: select的優勢在於可以處理多個連接,不適用於單個連接
from socket import *
import select
server = socket(AF_INET,SOCK_STREAM)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)
server.setblocking(False) # 非阻塞型,默認為阻塞型True
read_l = [server,]
print(‘strating....‘)
while True:
rl,wl,xl = select.select(read_l,[],[]) # 整體的返回值是一個元組,rl為元組裏的一個列表
# print(‘===>‘,rl) # rl裏的值就是server對象或conn對象
for r in rl:
if r is server:
conn,addr = r.accept()
read_l.append(conn)
else:
try:
data = r.recv(1024)
if not data:
r.close()
read_l.remove(r)
r.send(data.upper())
except ConnectionResetError:
r.close()
read_l.remove(r)
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8081))
while True:
msg = input(‘>>:‘).strip()
if not msg:continue
client.send(msg.encode(‘utf-8‘))
data = client.recv(1024)
print(data.decode(‘utf-8‘))
client.close()
socketserver模塊
TCP
import socketserver
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
print(‘========?>‘,self.request) # self.request is conn
while True:
data = self.request.recv(1024)
self.request.send(data.upper())
if __name__ == ‘__main__‘:
# socketserver.ForkingTCPServer 這個模塊的多進程只能在linux上用
server = socketserver.ThreadingTCPServer((‘127.0.0.1‘,8080),MyTCPHandler)
server.serve_forever()
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8081))
while True:
msg = input(‘>>:‘).strip()
if not msg:continue
client.send(msg.encode(‘utf-8‘))
data = client.recv(1024)
print(data.decode(‘utf-8‘))
client.close()
UDP
import socketserver
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
print(‘========?>‘,self.request) # self.request 是一個元組,第一個值是客戶端發來的消息,第二個值是一個套接字對象
client_data=self.request[0]
self.request[1].sendto(client_data.upper(),self.client_address)
if __name__ == ‘__main__‘:
# socketserver.ForkingTCPServer 這個模塊的多進程只能在linux上用
server = socketserver.ThreadingUDPServer((‘127.0.0.1‘,8080),MyTCPHandler)
server.serve_forever()
from socket import *
client = socket(AF_INET,SOCK_DGRAM)
while True:
msg = input(‘>>:‘).strip()
if not msg:continue
client.sendto(msg.encode(‘utf-8‘),(‘127.0.0.1‘,8080))
data,server_addr = client.recvfrom(1024)
print(data.decode(‘utf-8‘))
client.close()
paramiko模塊
paramiko是一個用於做遠程控制的模塊,使用該模塊可以對遠程服務器進行命令或文件操作,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實
下載安裝
pip3 install paramiko #在python3中
SSHClient
用於連接遠程服務器並執行基本命令
基於用戶名密碼連接:
import paramiko
# 創建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器
ssh.connect(hostname=‘120.92.84.249‘, port=22, username=‘root‘, password=‘xxx‘)
# 執行命令
stdin, stdout, stderr = ssh.exec_command(‘df‘)
# 獲取命令結果
result = stdout.read()
print(result.decode(‘utf-8‘))
# 關閉連接
ssh.close()
基於公鑰密鑰連接:
客戶端文件名:id_rsa
服務端必須有文件名:authorized_keys(在用ssh-keygen時,必須制作一個authorized_keys,可以用ssh-copy-id來制作)
import paramiko
private_key = paramiko.RSAKey.from_private_key_file(‘/tmp/id_rsa‘)
# 創建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器
ssh.connect(hostname=‘120.92.84.249‘, port=22, username=‘root‘, pkey=private_key)
# 執行命令
stdin, stdout, stderr = ssh.exec_command(‘df‘)
# 獲取命令結果
result = stdout.read()
print(result.decode(‘utf-8‘))
# 關閉連接
ssh.close()
SFTPClient
用於連接遠程服務器並執行上傳下載
基於用戶名密碼上傳下載
import paramiko
transport = paramiko.Transport((‘120.92.84.249‘,22))
transport.connect(username=‘root‘,password=‘xxx‘)
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put(‘/tmp/id_rsa‘, ‘/etc/test.rsa‘)
# 將remove_path 下載到本地 local_path
sftp.get(‘remove_path‘, ‘local_path‘)
transport.close()
IO阻塞模型 非阻塞模型