我的Python成長之路--Day39非同步+回撥機制、執行緒佇列、執行緒Event、協程、Gevent等
1、非同步、同步、阻塞、非阻塞
我們在之前已經簡單介紹過了同步非同步還有阻塞和非阻塞,和本小節我們要介紹的同步呼叫和非同步呼叫有很大關係,所以我們在這裡再囉嗦一下:
1、阻塞與非阻塞指的是程式的兩種執行狀態 阻塞:遇到IO就發生阻塞,程式一旦遇到阻塞操作就會停在原地,並且立刻釋放CPU資源 非阻塞(就緒態或執行態):沒有遇到IO操作,或者通過某種手段讓程式即便是遇到IO操作也不會停在原地,執行其他操作,力求儘可能多的佔有CPU
2、同步與非同步指的是提交任務的兩種方式: 同步呼叫:提交完任務後,就在原地等待,直到任務執行完畢後,拿到任務的返回值,才繼續執行下一行程式碼 非同步呼叫:提交完任務後,不在原地等待,直接執行下一行程式碼,結果?
同步和非同步的詳細解釋 同步,就是在發出一個功能呼叫時,在沒有得到結果之前,該呼叫就不會返回。按照這個定義,其實絕大多數函式都是同步呼叫。但是一般而言,我們在說同步、非同步的時候,特指那些需要其他部件協作或者需要一定時間完成的任務。 舉例:
-
multiprocessing.Pool下的apply 發起同步呼叫後,就在原地等著任務結束,根本不考慮任務是在計算還是在io阻塞,總之就是一股腦地等任務結束,這個是老版本的執行緒池的使用,現在已經基本不用,但是遇到之後要了解
-
concurrent.futures.ProcessPoolExecutor().submit(func,).result() 程序池的使用,同步呼叫,在程序任務提交之後,在原地等待執行結果
-
concurrent.futures.ThreadPoolExecutor().submit(func,).result() 執行緒池的使用,同步呼叫,線上程任務提交之後,程式在原地等待執行結果
非同步的概念和同步相對。當一個非同步功能呼叫發出後,呼叫者不能立刻得到結果。當該非同步功能完成後,通過狀態、通知或回撥來通知呼叫者。如果非同步功能用狀態來通知,那麼呼叫者就需要每隔一定時間檢查一次,效率就很低(有些初學多執行緒程式設計的人,總喜歡用一個迴圈去檢查某個變數的值,這其實是一 種很嚴重的錯誤)。如果是使用通知的方式,效率則很高,因為非同步功能幾乎不需要做額外的操作。至於回撥函式,其實和通知沒太多區別。 舉例: 4. multiprocessing.Pool().apply_async() #發起非同步呼叫後,並不會等待任務結束才返回,相反,會立即獲取一個臨時結果(並不是最終的結果,可能是封裝好的一個物件)。
非同步呼叫:在提交了程序任務後,程式不在此處停留,直接執行後面的程式碼 5. concurrent.futures.ProcessPoolExecutor(3).submit(func,)
非同步呼叫: 在提交了執行緒任務後,程式不在此處停留,直接執行後面的程式碼 7. concurrent.futures.ThreadPoolExecutor(3).submit(func,)
5、阻塞和非阻塞 阻塞呼叫是指呼叫結果返回之前,當前執行緒會被掛起(如遇到io操作)。函式只有在得到結果之後才會將阻塞的執行緒啟用。有人也許會把阻塞呼叫和同步呼叫等同起來,實際上他是不同的。對於同步呼叫來說,很多時候當前執行緒還是啟用的,只是從邏輯上當前函式沒有返回而已。(阻塞呼叫和同步呼叫是不等同的)舉例:1. 同步呼叫:apply一個累計1億次的任務,該呼叫會一直等待,直到任務返回結果為止,但並未阻塞住(即便是被搶走cpu的執行許可權,那也是處於就緒態);2. 阻塞呼叫:當socket工作在阻塞模式的時候,如果沒有資料的情況下呼叫recv函式,則當前執行緒就會被掛起,直到有資料為止。 非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前也會立刻返回,同時該函式不會阻塞當前執行緒
總結一下: 1.只要還擁有cpu的執行權 就不叫阻塞 2. 同步與非同步針對的是函式/任務的呼叫方式:同步就是當一個程序發起一個函式(任務)呼叫的時候,一直等到函式(任務)完成,而程序繼續處於啟用狀態。而非同步情況下是當一個程序發起一個函式(任務)呼叫的時候,不會等函式返回,而是繼續往下執行當,函式返回的時候通過狀態、通知、事件等方式通知程序任務完成。 3. 阻塞與非阻塞針對的是程序或執行緒:阻塞是當請求不能滿足的時候就將程序掛起,而非阻塞則不會阻塞當前程序
2、非同步+回撥機制
我們上邊已經介紹了非同步呼叫,那麼我們現在來思考一個問題,假如我們現在使用非同步呼叫來實現一個簡單的爬蟲小程式,爬蟲的原理就是在在爬到包之後,對字串進行解析,拿到我們想要的內容,首先我們來看一下我們根據之前學的知識寫出來的程式,並分析程式存在什麼問題 在寫出下面的程式之前可能會有的人想到這樣的解決辦法: 我直接在提交任務之後,使用obj.result()來獲取下載到的結果,然後再下邊再進行解析不就可以了麼,額,這樣的話,確實解決了第一個問題,對爬取到的結果及時的進行處理,但是這樣做就把解析的工作加到了主程序中,並且完全是序列的執行方式,只有第一個任務爬取完畢,然後獲得結果進行解析,這樣第一個任務才算結束,所以這個方案不可取,這個方案還不如下面的方案 爬蟲小程式Version1.0 我在這裡使用的執行緒池來完成多個爬取任務的,(因為電腦配置比較低,哈哈哈), 當然也可以使用程序池 實現我們的需求當然是可以實現,下面我們來分析一下這個版本的程式的缺點: 1、任務的返回值不能得到及時的處理,必須等到所有任務都執行完畢才能統一進行處理 2、解析的過程是序列執行的,如果解析一次需要花費2s,解析9次則需要花費18s
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import requests
import time
import random
def get(url):
print('%s GET %s' %(current_thread().name,url))
# 獲取目標網站的域名,併發送連線請求
response=requests.get(url)
time.sleep(random.randint(1,3))
# 如果響應的狀態碼為200,則表示連線成功
if response.status_code == 200:
# 幹解析的活
return response.text
# 對爬取到的內容進行模擬解析
def pasrse(obj):
print('%s 解析結果為:%s' %(current_thread().name,len(res)))
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.python.org',
]
#使用執行緒池完成多個爬取任務
pool=ThreadPoolExecutor(4)
obj_l=[]
for url in urls:
提交爬取任務
obj=pool.submit(get,url)
obj_l.append(obj)
# 等待爬取任務執行完畢
pool.shutdown(wait=True)
for obj in obj_l:
res=obj.result()
pasrse(res)
print('主執行緒',current_thread().name)
針對上個版本的程式的問題我們有沒有更好的解決方案呢,提案一、首先我們要解決對爬取到的內容及時進行解析,並且還要保證解析多個網站不是序列的,對於這兩個問題,可能會有的同學這麼想,我程式最後取結果進行解析的for迴圈改裝成一個多執行緒或者多程序不就實現了併發執行麼,這樣確實可以解決解析是序列的問題,但是要考慮到又要開好幾個執行緒甚至程序來解決問題,本來程式裡邊已經開了4個執行緒了,下面又要開,這樣顯然是不合理的,所以這種方案不可取,提案二、那從第一個問題出發,我們怎麼解決及時處理爬取到的資訊,我們在爬取資訊成功的第一時間直接進行解析是不是就算是及時處理了,當然是的,那麼我們把下邊的迴圈取結果並解析去掉,然後在爬取結果結束的return處,直接呼叫pasrse()函式進行解析可不可以呢,當然可以了, 那我們來分析一下這兩種方案的可行性,首先提案一中,如果所有的任務下載需要花5秒的時間,解析需要花2秒的時間,由於都是併發執行,所以完成所有任務需要的總時間大約是7秒,看一下方案二,首先每個程序需要做的工作是增加了的,原來只需要做爬取工作,而現在還需要做解析的工作,解析的時間我們之前假設的是需要2秒,那麼方案二中每個程序在做完爬取的工作之後需要多做2秒的解析工作,但是這時候每個程序都是併發執行爬取和解析的,所以完成總任務的時間和方案一同樣大約是7秒左右,但是提案二是比提案一可取的,原因是提案一雖然使用併發解決了問題,但是他有多開了其他的執行緒或者程序,耗費了資源增加了系統的執行壓力,顯然方案二就比較聰明一點,在原有的基礎上進行改進,不增加新程序的同時又解決了問題,
我們來接著考慮一下,是不是提案二就是我們想要的解決方案呢,你看方案二把程式中的兩個問題都很好的解決了,真的是這樣麼,方案二雖然解決了程式存在的問題,但是它又出現了新的問題,那就是將兩個任務耦合到了一起,我們知道,在寫程式的時候要儘量避開耦合,因為耦合到一起的任務,只要其中一個出現問題,其他的肯定會受到影響,那有沒有一種更好的解決方案呢?解決方案就是我們今天介紹的回撥機制:
回撥機制的原理: 回撥機制的原理其實就是在上一個函式執行完得到返回結果後,將返回結果拿給目標函式去使用. 在這個案例中,在提交任務的時候,將回調機制捆綁在任務物件上然後丟進執行緒池去執行,執行緒將get任務執行完之後就會將任務丟擲,這個任務丟擲總要有人做呀,這個時候主程序就會接收回調機制返回的物件,並作為引數傳給pasrse,之後再執行,這種過程就成為回撥機制,回撥機制一般情況下都是和非同步呼叫配套使用的,使用回撥機制會解開程式的耦合,並且還能利用主執行緒來分擔一點任務需要注意的是: 1、拿給目標函式使用的不是原函式的返回值本身,而是一個物件,這個物件可以使用obj.result()來獲取原函式的返回值,並別隻有當原函式成功執行結束並拿到返回值之後才會將這個物件傳給目標函式,所以目標函式在使用obj.result的時候並不會阻塞住. 2、在使用回撥機制的時候,目標函式的引數不能隨便設定,它接收的只有一個回撥機制傳過來的物件,所以只需要一個位置引數就可以了 下面我們來看一下使用回撥機制來解決我們上邊程式中的問題: 使用回撥機制所消耗的時間同樣也是大約7秒,為什麼還大約是7秒呢,那我們來分析一下:將任務丟給執行緒池之後,執行緒池同時執行爬取任務,並且在爬取成功後將結果返回給主程序去解析,主程序在解析的時候,執行緒池裡邊的空閒執行緒繼續做沒有處理的爬取任務,這個時候雖然爬取和解析是同時進行的,也就是說在爬取所有任務的這5秒時間裡也做了一部分解析的工作,但是總有一個數據是最後被爬取完的,最後爬取完的資料你也得去解析啊,這樣在所有的爬取任務處理完之後,解析任務應該還有最後剛剛爬取完的資料還沒有解析,所以要在最後加上2秒的解析時間,雖然這樣做沒有提高程式的效率,但是它解開了程式的耦合,充分利用了主程序
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import requests
import time
import random
def get(url):
print('%s GET %s' % (current_thread().name, url))
# 獲取目標網站的域名,併發送連線請求
response = requests.get(url)
time.sleep(random.randint(1, 3))
# 如果響應的狀態碼為200,則表示連線成功
if response.status_code == 200:
# 幹解析的活
return response.text
# 對爬取到的內容進行模擬解析
def pasrse(obj): #注意:這裡pasrse中的引數不可隨便設定,它只接收一個物件,只需要一個位置引數!
res = obj.result()
print('%s 解析結果為:%s' % (current_thread().name, len(res)))
if __name__ == '__main__':
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.python.org',
]
# 使用執行緒池完成多個爬取任務
pool = ThreadPoolExecutor(4)
for url in urls:
# 提交爬取任務
obj = pool.submit(get, url)
#使用回撥機制將get函式的返回值提供給pasrse函式使用(注意:傳過去的是一個物件)
obj.add_done_callback(pasrse)
print('主執行緒', current_thread().name)
3、執行緒佇列
queue佇列 :使用import queue,用法與程序Queue一樣
class queue.
Queue
(maxsize=0) #先進先出
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
import queue
q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''
class queue.
PriorityQueue
(maxsize=0) #儲存資料時可設定優先順序的佇列
import queue
q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''
4、執行緒Event
同進程的一樣
執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞執行緒;
event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
event.clear():恢復event的狀態值為False。
下面我們來模擬一下有多個工作執行緒嘗試連結伺服器,我們想要在連結前確保伺服器服務正常才讓那些工作執行緒去連線伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作
from threading import Event, current_thread, Thread
import time
event = Event()
def check():
print('%s 正在檢測服務是否正常....' % current_thread().name)
time.sleep(5)
event.set()
def connect():
count = 1
while not event.is_set():
if count == 4:
print('嘗試的次數過多,請稍後重試')
return
print('%s 嘗試第%s次連線...' % (current_thread().name, count))
event.wait(1)
count += 1
print('%s 開始連線...' % current_thread().name)
if __name__ == '__main__':
t1 = Thread(target=connect)
t2 = Thread(target=connect)
t3 = Thread(target=connect)
c1 = Thread(target=check)
t1.start()
t2.start()
t3.start()
c1.start()
5、協程
1、協程: 單執行緒實現併發 在應用程式裡控制多個任務的切換+儲存狀態 優點: 應用程式級別速度要遠遠高於作業系統的切換 缺點: 多個任務一旦有一個阻塞沒有切,整個執行緒都阻塞在原地 該執行緒內的其他的任務都不能執行了
一旦引入協程,就需要檢測單執行緒下所有的IO行為, 實現遇到IO就切換,少一個都不行,以為一旦一個任務阻塞了,整個執行緒就阻塞了, 其他的任務即便是可以計算,但是也無法運行了
2、協程式的目的: 想要在單執行緒下實現併發 併發指的是多個任務看起來是同時執行的 併發=切換+儲存狀態
5.1為什麼要有協程 本節的主題是基於單執行緒來實現併發,即只用一個主執行緒(很明顯可利用的cpu只有一個)情況下實現併發,為此我們需要先回顧下併發的本質:切換+儲存狀態
cpu正在執行一個任務,會在兩種情況下切走去執行其他的任務(切換由作業系統強制控制), 一種情況是該任務發生了阻塞, 另外一種情況是該任務計算的時間過長或有一個優先順序更高的程式替代了它
我們在介紹程序的時候介紹了程序的三種狀態:就緒,執行和阻塞,到後面我們又學習了執行緒,並知道執行緒才是執行單位,所以也可以說這三種狀態是執行緒的三種狀態: 我們還是把原來的圖拿過來:
其中第二種情況並不能提升效率,只是為了讓cpu能夠雨露均沾,實現看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。為此我們可以基於yield來驗證。yield本身就是一種在單執行緒下可以儲存任務執行狀態的方法,我們來簡單複習一下: 1 yiled可以儲存狀態,yield的狀態儲存與作業系統的儲存執行緒狀態很像,但是yield是程式碼級別控制的,更輕量級 2 send可以把一個函式的結果傳給另外一個函式,以此實現單執行緒內程式之間的切換
#序列執行
import time
def func1():
for i in range(10000000):
i+1
def func2():
for i in range(10000000):
i+1
start = time.time()
func1()
func2()
stop = time.time()
print(stop - start) # 1.3762071132659912
#基於yield併發執行
import time
def func1():
while True:
yield
def func2():
g=func1()
for i in range(10000000):
i+1
next(g)
start=time.time()
func2()
stop=time.time()
print(stop-start) # 2.443591833114624
單純地切換反而會降低執行效率
二:第一種情況的切換是在任務一遇到io情況下,切到任務二去執行,這樣就可以利用任務一阻塞的時間完成任務二的計算,效率的提升就在於此對於單執行緒下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中(即使用者程式級別,而非作業系統級別)控制單執行緒下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該執行緒能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在使用者程式級別將自己的io操作最大限度地隱藏起來,從而可以迷惑作業系統,讓其看到:該執行緒好像是一直在計算,io比較少,從而更多的將cpu的執行許可權分配給我們的執行緒
協程的本質就是在單執行緒下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案: 1. 可以控制多個任務之間的切換,切換之前將任務的狀態儲存下來,以便重新執行時,可以基於暫停的位置繼續執行。
2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換
我們這節課所要了解的協程,正好可以滿足我們的需求: 協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。、
需要強調的是: 1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行) 2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換 優點如下: 1. 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級 2. 單執行緒內就可以實現併發的效果,最大限度地利用cpu
缺點如下: 1. 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程 2. 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒
到這裡我們的協程基本上就介紹的差不多了,下面我們來總結一下協程都具有那些特點: 總結協程特點:
- 必須在只有一個單執行緒裡實現併發
- 修改共享資料不需加鎖
- 使用者程式裡自己儲存多個控制流的上下文棧
- 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模組(select機制))
Greenlet介紹:
greenlet其實只是提供了一種比使用yield和send更方便的切換方式,但是他並沒有解決遇到I/O自動切換的功能,
如果我們在單個執行緒內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再呼叫send。。。非常麻煩),而使用greenlet模組可以非常簡單地實現這20個任務直接的切換,我們也可以理解為greenlet裡邊是封裝了yield和send的工作原理
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name)
g2.switch('egon')
print('%s eat 2' %name)
g2.switch()
def play(name):
print('%s play 1' %name)
g1.switch()
print('%s play 2' %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('allen')#可以在第一次switch時傳入引數,以後都不需要
單純的切換(在沒有io的情況下或者沒有重複開闢記憶體空間的操作),反而會降低程式的執行速度 計算密集型序列和切換執行對比:
#順序執行
import time
def f1():
res=1
for i in range(100000000):
res+=i
def f2():
res=1
for i in range(100000000):
res*=i
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337
#切換
from greenlet import greenlet
import time
def f1():
res=1
for i in range(100000000):
res+=i
g2.switch()
def f2():
res=1
for i in range(100000000):
res*=i
g1.switch()
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524
greenlet只是提供了一種比generator(生成器)更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到I/O自動切換來提升效率的問題。
單執行緒裡的這20個任務的程式碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模組。
Gevent模組
由於Gevent是一個第三方庫,在使用的時候要自行進行安裝(安裝命令: pip3 install gevent)
Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程.
'''
Gevent用法
'''
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束
#或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
遇到IO阻塞時會自動切換任務(這個僅限於Gevent本身所帶的類似於time.sleep()這樣的I/O)
import gevent
def eat(name):
print('%s eat 1' %name)
gevent.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
gevent.sleep(1)
print('%s play 2' %name)
g1=gevent.spawn(eat,'allen')
g2=gevent.spawn(play,name='eric')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')
上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行程式碼,打補丁,就可以識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模組之前
或者我們乾脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到檔案的開頭在這裡的打補丁我們可以理解為,monkey下的path方法會在每一個I/O操作上設定一個標識,使得Gevent可以識別到,遇到I/O後可以自動切換去執行其他的任務
最終我們可以實現的單執行緒下的併發效果:
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')
1.Gevent與非同步和同步
from gevent import spawn,joinall,monkey;monkey.patch_all()
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(0.5)
print('Task %s done' % pid)
def synchronous():
for i in range(10):
task(i)
def asynchronous():
g_l=[spawn(task,i) for i in range(10)]
joinall(g_l)
if __name__ == '__main__':
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
#上面程式的重要部分是將task函式封裝到Greenlet內部執行緒的gevent.spawn。 初始化的greenlet列表存放在陣列threads中,此陣列被傳給gevent.joinall 函式,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。
Gevent的應用: 協程應用:
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print('GET: %s' %url)
response=requests.get(url)
if response.status_code == 200:
print('%d bytes received from %s' %(len(response.text),url))
start_time=time.time()
gevent.joinall([
gevent.spawn(get_page,'https://www.python.org/'),
gevent.spawn(get_page,'https://www.yahoo.com/'),
gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
協程應用:爬蟲
2.通過gevent實現單執行緒下的socket併發(from gevent import monkey;monkey.patch_all()一定要放到匯入socket模組之前,否則gevent無法識別socket的阻塞)服務端:
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
客戶端:
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
客戶端
多執行緒併發多個客戶端:
from threading import Thread
from socket import *
import threading
def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字物件一定要加到函式內,即區域性名稱空間內,放在函式外則被所有執行緒共享,則大家公用一個套接字物件,那麼客戶端埠永遠一樣了
c.connect((server_ip,port))
count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()
多執行緒併發多個客戶端