老男孩14期自動化運維day10隨筆和作業
1.IO(磁碟,網路等)操作不佔用CPU
計算佔用CPU,例如1+1
多執行緒使用場景:python多執行緒不適合CPU密集操作型的任務,適合IO密集型的任務(例如socket server )
2.程序
每一個程序都是由預設父程序啟動的(每一個子程序都是由主程序啟動的)
比如在pycharm啟動程式 ,在windows上是pycharm為父程序:主程序的父程序為pycharm
比如在linux終端啟動程式,在linux上是terminal為父程序,主程序的父程序為terminal
兩個方法:os.getpid() 獲取程序號
os.getppid() 獲取父程序號
建立程序與執行緒類似
from multiprocessing import Process
def func(a):
pass
p=Process(target=func,args=(a,))
p.start()
3.多程序(multiprocess)
多程序間的通訊:不同程序之間是不允許訪問對方記憶體的,多程序要實現通訊,只能通過以下方式
----程序Queue
----pipe 管道
以下為後兩種方式詳細解釋:
(1)程序通過程序Queue進行通訊
與執行緒的queue不一樣 只能用於程序通訊的特殊的queue叫程序Queue
from multiprocessing import Process,Queue
def f(q):
q.put([42,None,'1'])
if __name__=='__main__':
q=Queue()
p=Process(target=f,args=(q,))
p.start()
print(q.get())
上述過原理為: 在主程序通過程序QUEUE讀到了子程序往q裡存的資料
解析過程:
其實是兩個queue,主程序開啟queue,把queue作為引數傳入子程序中,是複製了另一個queue給子程序(pickle序列化)
然後 子程序往queue傳資料,再pickle反序列化給主程序的queue
只是實現了這個程序的資料傳給另一個程序
(2)PIPE管道
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello from child'])
conn.send([42, None, 'hello from child3'])
print("",conn.recv())
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe() # 生成管道有兩頭,返回兩個值
p = Process(target=f, args=(child_conn,))
p.start()
print("parent",parent_conn.recv()) # prints "[42, None, 'hello']"
print("parent",parent_conn.recv()) # prints "[42, None, 'hello']"
parent_conn.send(" from hshs") # prints "[42, None, 'hello']"
p.join()
就是生成一個Pipe管道物件,管道有兩頭,返回兩個值,在這裡兩頭沒有準確定義必須是傳資料的還是收資料的
(2)程序間的資料共享:
本來程序之間記憶體不能互相訪問,但是可以通過manager實現程序間的資料共享
----通過manager
from multiprocessing import Process, Manager
import os
# 程序間的共享資料
# 其實也是copy了十個資料 最後彙總,而且內部加了鎖 ,不用人為加鎖
def f(d, l):
d[os.getpid()]=os.getpid()
l.append(1)
print(l,d)
if __name__ == '__main__':
with Manager() as manager: # 和 manger=Manager() 一樣
d = manager.dict() # 生成一個可在多個程序之間共享、傳遞的字典
l = manager.list(range(5)) # 生成一個可在多個程序之間共享、傳遞的list
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list: # 等待結果
res.join()
l.append("from parent")
print(d)
print(l)
原理:建立manager物件,通過manager.dict()生成一個可以再多程序間共享、傳遞的字典物件,通過manager.list() 生成一個可以在多程序間共享、傳遞的列表物件
其實就是copy了十個資料,最後彙總,而且內部加了鎖,所以這裡不用加鎖
4.程序鎖
本身程序之間記憶體不共享,為什麼還需要程序鎖?
原因:主要是保證在螢幕上列印的時候不亂
from multiprocessing import Process, Lock
# 程序鎖
# 本身程序之間記憶體不共享,為什麼還需要程序鎖?
# 原因:主要是保證在螢幕上列印的時候不亂
def f(l, i):
#l.acquire()
print('hello world', i)
#l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(100):
Process(target=f, args=(lock, num)).start()
5.程序池
程序啟動開銷比執行緒啟動大很多,所以python有程序池 概念防止電腦崩潰。執行緒池可以通過訊號量自己定義
在windows上啟動多程序跟linux不一樣 ,要import freeze_support或者加if name==‘main’
from multiprocessing import Process,Pool,freeze_support
# 在windows上啟動多程序跟linux不一樣 ,要import freeze_support或者加if __name__=='__main__'
import time
import os
def Foo(i):
time.sleep(2)
print("in process ",os.getpid())
return i+100
def Bar(arg):
print("--->exec done:",arg)
if __name__=='__main__':
#freeze_support()
pool=Pool(5) # 和 pool=Pool(process=5)一樣 意思為 允許程序池同時放入5個程序,同時執行的程序只有五個
for i in range(10):# 啟動了 但是被放進程序池的程序才會執行
pool.apply_async(func=Foo,args=(1,),callback=Bar) # callback=回撥 執行完Foo 再執行Bar ,注意:回撥是主程序呼叫的而不是子程序,例如備份資料庫時候只用父程序建立連線,子程序去回撥父程序的連線 而不用多次建立連線
# pool.apply(func=Foo,args=(i,)) # 序列
# pool.apply_async(func=Foo,args=(1,)) # 並行
print('end')
pool.close()
pool.join() # 等所有程序結束 不寫join 主程序不會等子程序結束
# 注意python的要求(官方文件都沒寫),先close再join(死記硬背),如果把join註釋了,不等子程序執行完畢程式就關閉了
過程:通過Pool()建立一個pool物件。
pool=Pool(5) 與 pool=Pool(process=5) 表示允許同時放入5個程序,同時執行的程序只有五個,跟多執行緒的訊號量一樣。
在for迴圈裡啟動程序,但這是還沒有啟動,只有放入程序池的程序才能執行。
pool.apply(func=Foo,args=(1,)) 這是序列執行程序
pool.apply_async(func=Foo,args=(1,)) 這是並行執行程序(非同步)
pool.app;y_async(func=Foo,args=(1,),callback=Bar)
這裡callback是回撥函式,執行完Foo,再執行Bar,注意:回撥是主程序的呼叫而不是子程序的呼叫,例如備份資料庫時候只用父程序建立連線,子程序去回撥父程序的連線
而不用多次建立連線
最後很重要的一點:python的官方要求文件都沒寫,必須先close再join(死記硬背),如果把join註釋了,不等子程序執行完畢程式就關閉了。(而不像多執行緒時,先join再close)
pool.close()
pool.join()
另外:這裡再將一下_ _name _ _的的意思:
如果再程式里加入 if name== 'main’則是手動執行的時候會執行
如果把該程式當做模組被另外程式import後,另外的程式不執行 if name== 'main’裡的內容
____ main__代表主程序的__name_ 子程序為__mp_main__的__name__
所以 if name=='main’是判斷是否__name__為主程序
(就是當前該程式手動執行自己,被其他模組匯入則不執行裡面的內容)
6.協程
(微執行緒)是一種使用者態的輕量級執行緒
協程為什麼很快:協程是遇到IO操作就切換,所以剩下都是CPU操作就很快
執行緒的切換是儲存在cpu的暫存器裡,而協程擁有自己的暫存器上下文和棧。
可以在單執行緒下實現併發效果(實際上還是序列 因為切換時間很快,所以在使用者視角下是並行)
優點:
1.無需執行緒上下文切換的開銷
2.無需原子操作的鎖定及同步的開銷(改變數可以叫原子操作)
3.方便切換控制流,簡化程式設計模型
4.高併發、高擴充套件、低成本:一個cpu支援上萬的協程都不是問題缺點:
1.因為是單執行緒,無法利用多核資源,它不能同時將單個CPU的多個核用上(不能多核),協程
需要和程序配合才能執行在多CPU上,除非是CPU密集型應用
2.進行阻塞(Blocking)操作(如IO)會阻塞掉整個程式
注意:當一個函式含有yield關鍵字時 ,第一次呼叫它是變成一個生成器,必須加__next__()才執行
兩種切換方式
(1)手動切換 greenlet
from greenlet import greenlet
# 手動切換 gevent 封裝了greenlet
def test1():
print(12)
gr2.switch() # 切換gr2
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch() # 切換gr1
print(78)
gr1 = greenlet(test1) #啟動一個協程
gr2 = greenlet(test2)
gr1.switch()
(2)自動切換 gevent
gevent自動IO切換 封裝了greenlet (手動IO切換),可以通過greenlet實現併發同步或非同步程式設計
import gevent
# 自動IO切換
def foo():
print('Running in foo')
gevent.sleep(2)
print('Explicit context switch to foo again')
def bar():
print('Explicit精確的 context內容 to bar')
gevent.sleep(1)
print('Implicit context switch back to bar')
def func3():
print("running func3 ")
gevent.sleep(0)
print("running func3 again ")
gevent.joinall([
gevent.spawn(foo), # 啟動協程
gevent.spawn(bar),
gevent.spawn(func3),
])
程式碼中 sleep只是gevent模擬io消耗的時間代指類似於io的消耗的時間
gevent.spawn(協程) 啟動協程,遇到IO操作自動切換
執行過程為:
foo 先列印第一句 然後遇到io 切換給bar列印第一句 然後遇到io 切換給func3 列印第一句 然後遇到io
又給foo 發現foo還在等io,就給bar bar 也在等io ,又給func3 io結束後 列印第二句,返回給foo 還在等io給bar,bar 等io結束列印第二句 發給foo,foo等待io結束 列印foo第二句
這個單執行緒的非同步執行,只要2s ,但是不使用協程要3s,要2s是指單執行緒中io等待時間最多的點
通過gevent自動切換協程能實現什麼?
(1)很牛逼的實現:gevent實現大併發單執行緒socket server(通過協程)
from gevent import socket, monkey
monkey.patch_all()
# 通過gevent自動切換協程實現單執行緒下的socket併發(很牛逼!!!)
# 大併發socket server(單執行緒)
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(9999)
(2)gevent實現簡單大併發單執行緒爬蟲
import gevent,time
from urllib import request
from gevent import monkey
#簡單協程大併發爬網頁
monkey.patch_all() # 把當前程式的所有io操作給我單獨做上標記
def f(url):
print('GET:%s'%url)
resp=request.urlopen(url)
data=resp.read()
print('%d bytes received from %s .'%(len(data),url))
urls=['https://www.python.org/',
'https://www.yahoo.com/',
'https://github.com/'
]
time_start=time.time()
for url in urls:
f(url)
print("同步cost:",time.time()-time_start)
async_time_start=time.time()
gevent.joinall([
gevent.spawn(f,'https://www.python.org/'),
gevent.spawn(f,'https://www.yahoo.com/'),
gevent.spawn(f,'https://github.com/'),
])
print("非同步cost:",time.time()-async_time_start)
(沒加monkey.path.all()前提下)時間一樣是因為gevent 跟urllib沒關係 ,gevent不知道urllib在做io ,所以就沒有切換,可以通過加入 monkey補丁
monkey.patch_all() # 把當前程式的所有io操作給我單獨做上標記,標記他是IO操作,遇到他就切換
7.論事件驅動與非同步IO
通常,我們寫伺服器處理模型的程式時,有以下幾種模型:
(1)每收到一個請求,建立一個新程序,來處理該請求
(2)每收到一個請求,建立一個新執行緒,來處理該請求
(3)每收到一個請求,放入一個事件列表,讓主程序通過非阻塞IO方式來處理請求(協程)
io是作業系統執行的(就是利用事件驅動模型把io操作扔到作業系統中一個佇列,io執行完後呼叫回撥函式告知你執行完的標記)
上面的幾種方式,各有千秋:
第(1)種方法,由於建立新的程序的開銷比較大,所以,會導致伺服器效能比較差,但實現比較簡單
第(2)種方法,由於要涉及到執行緒的同步,有可能面臨死鎖等問題
第(3)種方法,在寫應用程式程式碼時,邏輯比前面兩種都複雜
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網路伺服器採用的方式
事件驅動模型:
1.有一個事件(訊息)佇列
2.例如滑鼠按下時 ,往這個佇列中增加一個點選事件(訊息)
3.有個迴圈,不斷從佇列中取出事件,根據不同的時間,呼叫不同的函式,如onClick()、onKeyDown()等
4.事件(訊息)一般都各自儲存各自的處理函式指標,這樣,每個訊息都有獨立的處理函式
事件驅動程式設計是一種程式設計正規化,這裡程式的執行流由外部事件決定
8.IO多路複用
程序的阻塞:
只有處於執行態的程序,才有可能轉為阻塞狀態。當程序進入阻塞狀態時,不會佔用CPU資源檔案描述符 fd:
就是一組非負整數,是作業系統內部檔案記錄表(有序的,存放的是控制代碼物件)的索引值,作業系統拿到檔案描述 符,從檔案記錄表中找到檔案控制代碼物件,從物件中操作資料。在unix,linux上才有檔案描述符的概念快取IO:
又被稱作標準I/O,大多數檔案系統預設IO都是快取IO。在linux快取IO機制中,資料會先拷貝到作業系統核心的快取 區中,然後從作業系統核心再拷貝到應用程式的地址空間(比如socket中,兩次send會發送在一起(黏包),是因為系統為了減少作業系統核心拷貝
到應用程式的開銷。)(核心態—》使用者態的資料轉換) 快取IO缺點:這些資料拷貝對cpu以及記憶體的開銷是非常大的
9.IO五種網路模式(有一種驅動IO很少用)
情景:使用者有個read操作
1-3都是同步IO(synchronous IO 必須等核心態到使用者態的轉變)
(1)阻塞IO(blocking iO)
在linux ,預設情況下所有的socket都是阻塞IO (blocking IO) 使用者傳送read操作到核心
核心中沒有資料,在等待資料被髮送過來,此時使用者程序在等待,當核心中有資料後,再返回給使用者。 使用者在等待的時候就是阻塞I/O
(2)非阻塞IO(nonblocjing io)
linux下可以通過設定socket為nonblocking
使用者傳送read操作到核心,核心中沒有資料,使用者不用等核心是或否有資料,核心沒有資料會發送一個error到使用者,使用者收到
核心的資訊做判斷,當為error的時候可以去做其他事,收到資料之後再處理資料 所以 nonblocking
IO的特點是使用者程序徐不斷的主動詢問kernel資料好了沒有,可以實現使用者視角下的單執行緒多併發
但是在核心態到使用者態 如果資料過大 還是會阻塞
(3)I/O多路複用(IO multiplexing或者 event driven IO 事件驅動IO)
常用的select poll epoll 是建立在非租塞IO的情況下,因為非阻塞IO情況下,在等待
接受資料的時候沒有阻塞,但是在拷貝資料的時候,如果從核心拷貝到使用者的資料太大,則會阻塞,這是IO多路複用要解決的問題。
三種方式 select poll epoll:
select (windows,linux) 例如多個連線 迴圈這些連線(例如有一百個連結,就迴圈這個一百個,有一個返回資料就返回給使用者),任意一個返回就返回訊號(缺點,檔案描述符上限1024,當然可以自行修改,如果要迴圈連線(陣列輪詢)過多,容易浪費資源)
poll 沒有最大檔案描述符限制(基於select優化 但還是有select的缺點)
epoll (最流行的,windows不支援,linux2.6核心.Django就是用的這個,例如nginx)
(1)epoll_create 建立一個epoll物件,一般epollfd = epoll_create()
(2)epoll_ctl (epoll_add/epoll_del的合體),往epoll物件中增加/刪除某一個流的某一個事件
比如
epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//註冊緩衝區非空事件,即有資料流入
epoll_ctl(epollfd, EPOLL_CTL_DEL, socket,
EPOLLOUT);//註冊緩衝區非滿事件,即流可以被寫入(3)epoll_wait(epollfd,…)等待直到註冊的事件發生
(注:當對一個非阻塞流的讀寫發生緩衝區滿或緩衝區空,write/read會返回-1,並設定errno=EAGAIN。而epoll只關心緩衝區非滿和緩衝區非空事件)。
4.非同步IO(asynchronous IO,不用等核心態到使用者態)—用得少(其實很多叫非同步IO都用的是IO多路複用 epoll)
發起一個read操作,立刻返回,所以不會對使用者程序產生任何block。然後kernel會等待資料準備完成,然後拷貝到使用者記憶體,
當這一切完成之後,kenel會給使用者程序傳送一個signal,告訴他read操作已完成。
注意 :這裡拷貝完成才會給使用者程序傳送一個signal,所以使用者程序是不會阻塞的,使用者程序只是把任務丟給核心,可以去做其他事,當他收到核心傳送的signal時就知道資料已經從核心態到使用者態了。
以上這四種網路模式圖解:
一個單執行緒下通過select方式實現IO多路複用的socket server 例子:
import select
import socket
import queue
# 單執行緒下的io 多路複用的selcet實現socket_server
server=socket.socket()
server.bind(('localhost',9000))
server.listen(1000)
# 設定為非阻塞模式
server.setblocking(False) # 不阻塞
inputs=[server]
outputs=[]
while True:
readable,writeable,exceptional=select.select(inputs,outputs,inputs)
print(readable,writeable,exceptional)
for r in readable:
if r is server: # 如果是server 代表來了一個新連線
conn,addr=server.accept()
print("來了個新連線",addr)
inputs.append(conn) # 是應為這個新建立的連線還沒發資料過來,現在就接受的話程式就要報錯
# 所以要想實現這個客戶端發資料來時,server端能知道,就需要讓select再監測這個conn
else: # 如果是之前的conn 表示發資料了
data=r.recv(1024)
print("收到資料",data)
r.send(data)
print("send done..")
server.setblocking(False) 設為非阻塞模式
兩個列表 inputs 和 outputs
select 去迴圈去inputs列表裡的物件
在inputs列表裡的物件首先必須要是server本身,其次是conn連線例項。
如果是server 代表來了一個新連線
然後 inputs.append(conn) 是應為這個新建立的連線還沒發資料過來,現在就接受的話程式就要報錯 ,所以要想實現這個客戶端發資料來時,server端能知道,就需要讓select再監測這個conn
如果是conn 表示這個連線開始發資料了