生成器、協程
生成器、協程
目錄1 基礎知識準備
2 生成器Generator
在 Python 中,使用了 yield
的函式被稱為生成器(generator)。
-
只有一個
.__next__()
方法:
跟普通函式不同的是,生成器是一個返回迭代器的函式,只能用於迭代操作,更簡單點理解生成器就是一個迭代器。 -
只記錄當前位置:
在呼叫生成器執行的過程中,每次遇到yield
時函式會暫停並儲存當前所有的執行資訊,返回yield
next()
方法時從當前位置繼續執行。 -
呼叫一個生成器函式,返回的是一個迭代器物件。
2.1 列表生成式
列表生成式在呼叫列表之前建立完整的列表,如果列表引數龐大,會佔用大量的記憶體空間。
L = [x*2 for x in range(10)]
print(L)
print(L[3])
print(len(L))
# 輸出:
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
6
10
2.2 生成器
generator儲存的是演算法,每次呼叫next(G)
,就計算出G的下一個元素的值,直到計算到最後一個元素。沒有更多的元素時,丟擲StopIteration
# 建立L和G的區別僅在於最外層的[]和(),L是一個list,而G是一個generator。
G = (i * 2 for i in range(10))
print(G)
print(G.__next__())
print(G.__next__())
print(G.__next__())
print(G.__next__())
for value in G:
print('value', value)
print('G[5]', G[5])
# 輸出
<generator object <genexpr> at 0x000001CF71EF0430>
0
2
4
6
value 8
value 10
value 12
value 14
value 16
value 18
Traceback (most recent call last):
File "generator01.py", line 9, in <module>
print('G[5]', G[5])
TypeError: 'generator' object is not subscriptable
2.3 斐波拉契數列(Fibonacci)
2.3.1 斐波拉契數列函式寫法
# Description:列印斐波拉契數列(Fibonacci):
# 除第一個和第二個數外,任意一個數都可由前兩個數相加得到:
# 1, 1, 2, 3, 5, 8, 13, 21, 34, ...
# 斐波拉契數列用列表生成式寫不出來,但是,用函式把它打印出來卻很容易:
def fib(number):
x, y = 0, 1
for i in range(number):
print(y)
x, y = y, x + y
fib(10)
注意,賦值語句:
x, y = y, x + y
相當於:
z = (y, x + y) # z是一個tuple
x = z[0]
y = z[1]
# 結論:不必顯式寫出臨時變數z就可以賦值。
2.3.2 yield 方式生成斐波拉契數列函式
- 如果一個函式定義中包含yield關鍵字,那麼這個函式就不再是一個普通函式,而是一個generator
def fib(number):
x, y = 0, 1
for i in range(number):
yield y
x, y = y, x + y
f = fib(10)
print(f)
print(f.__next__())
print(f.__next__())
print('-----out-----')
print(f.__next__())
print(f.__next__())
print('-----start loop-----')
for i in f:
print(i)
# 輸出:
<generator object fib at 0x000001BF8A080430>
1
1
-----out-----
2
3
-----start loop-----
5
8
13
21
34
55
2.4 yield 生成器返回值特點
- 生成器返回值為
yield
的返回值,沒有更多的元素時,丟擲StopIteration
的錯誤。
def fib(max):
x, y = 0, 1
for i in range(max):
yield y
x, y = y, x + y
return 'done'
g = fib(5)
while True:
try:
z = next(g)
print('z:', z)
except StopIteration as error_info:
print('Generator return value:', error_info.value) # 錯誤時丟擲返回值。
break
# 輸出
z: 1
z: 1
z: 2
z: 3
z: 5
Generator return value: done
2.5 yield from
-
yield from
內部會自動捕獲StopIteration
異常,並把異常物件的value
屬性變成yield from
表示式的值。
def for_test():
for i in range(3):
yield i
def yield_from_test():
yield from range(3)
print(list(for_test()))
print(list(yield_from_test()))
# 輸出:
[0, 1, 2]
[0, 1, 2]
-
yield from x
表示式內部首先是呼叫iter(x)
,然後再呼叫next()
,因此x是任何的可迭代物件。 -
yield from
的主要功能就是開啟雙向通道,作用是把最外層的呼叫方和最內層的子生成器連線起來,同時子生成器也可使用yield from
呼叫另一個子生成器,一直巢狀下去直到遇到yield
表示式結束鏈式。
def test(name):
print('in test(): ', name)
x = yield name # 呼叫next()時,產出yield右邊的值後暫停;呼叫send()時,產出值賦給x,並往下執行
print('send值:', x)
return 'fcarey'
def grouper2():
result2 = yield from test('fcarey') # 在此處暫停,等待子生成器的返回後繼續往下執行
print('result2的值:', result2)
return result2
def grouper():
result = yield from grouper2() # 在此處暫停,等待子生成器的返回後繼續往下執行
print('result的值:', result)
return result
def main():
g = grouper()
next(g)
try:
g.send(10)
except StopIteration as e:
print('返回值:', e.value)
if __name__ == '__main__':
main()
3 協程
3.1 協程介紹
-
非同步IO:就是發起一個IO操作(如:網路請求,檔案讀寫等),這些操作一般是比較耗時的,不用等待它結束,可以繼續做其他事情,結束時會發來通知。
-
協程:又稱為微執行緒,協程英文名Coroutine。在一個執行緒中執行,執行函式時可以隨時中斷,由程式(使用者)自身控制,執行效率極高,與多執行緒比較,沒有切換執行緒的開銷和多執行緒鎖機制。
-
協程和生成器:
- 從語句上看,協程和生成器類似,都是包含了yield關鍵字。
- 不同之處在於協程中yield關鍵詞通常出現在
=
右邊,可以產出值a(y = yield a)
或不產出值時為None(y = yield)
。呼叫方可以用send函式傳送值給協程。 - 啟用協程時在
yield
處暫停,等待呼叫方傳送資料,下次繼續在yield
暫停。從根本上看,yield
是流程控制的工具,可以實現協作式多工。
-
python中非同步IO操作是通過
asyncio
模組來實現的。 -
符合什麼條件就能稱之為協程:
- 必須在只有一個單執行緒裡實現併發
- 修改共享資料不需加鎖
- 使用者程式裡自己儲存多個控制流的上下文棧
- 一個協程遇到IO操作自動切換到其它協程
3.1.1 存在yield函式執行過程
以示例來說明:
def Foo():
print('starting eating baozi...')
while True:
baozi = yield 'caibao'
print('\033[34;1m fcarey\033[0m is eating baozi %s' % baozi)
if __name__ == '__main__':
f = Foo()
print('第一次', next(f))
print('第二次', next(f))
# 輸出
starting eating baozi...
第一次 caibao
fcarey is eating baozi None
第二次 caibao
# 單步輸出
def Foo():
print('starting eating baozi...')
while True:
baozi = yield 'caibao'
print('\033[34;1m fcarey\033[0m is eating baozi %s' % baozi)
if __name__ == '__main__':
f = Foo()
print(f)
print('step 1 :', f.__next__())
print('*' * 25)
print('step 2 :', f.__next__())
print('*' * 25)
# `send()`方法的作用是恢復generator併發送一個值給當前yield表示式。
print('step 3 :', f.send('roubao'))
# 輸出
starting eating baozi...
step 1 : caibao
*************************
fcarey is eating baozi None
step 2 : caibao
*************************
fcarey is eating baozi roubao
step 3 : caibao
# 呼叫next()時,返回yield右邊的值後暫停;呼叫send()時,返回值賦給baozi,並往下執行
- 程式執行後,會得到一個生成器f,
<generator object Foo at 0x000001D120FB2180>
- 呼叫
f.__next__()
指標下移一位,執行Foo函式,列印starting eating baozi...
再進入while迴圈,執行yield
語句,會暫停Foo函式執行、返回caibao
並跳出此函式。 - 此時由於程式已經跳出此函式,變數'baozi'並沒有被賦值。
- 執行
print('*'*25)
- 呼叫
f.__next__()
指標下移一位,Foo函式將從上個暫停的指標位置執行函式,baozi
沒有被賦值,返回None
;由於while迴圈,執行yield
語句,會暫停Foo函式執行、返回caibao
並跳出此函式 - 執行
print('*'*25)
- 呼叫
f.send(3)
,由於send()
方法的作用是恢復generator併發送一個值給當前yield表示式。Foo函式將從上個暫停的指標位置執行函式,send()
將roubao
賦值給baozi
變數,返回roubao
,由於while迴圈,再執行yield
語句,會暫停Foo函式執行、返回caibao
並跳出此函式
3.1.2 通過yield實現協程併發運算的效果
def consumer(name):
print("%s 準備吃包子啦!" %name)
while True:
baozi = yield
print("包子[%s]來了,被[%s]吃了!" %(baozi,name))
b1 = 'big baozi'
c = consumer('abc')
c.__next__()
c.__next__()
c.send(b1)
c.__next__()
# 輸出
abc 準備吃包子啦!
包子[None]來了,被[abc]吃了!
包子[big baozi]來了,被[abc]吃了!
包子[None]來了,被[abc]吃了!
結論:
-
yield
: 儲存當前狀態,並返回當前值 -
next()
呼叫yield但不會給yield傳值 -
send()
呼叫yield會給yield傳值
3.2 Greenlet 協程模組
greenlet是一個用C實現的協程模組。相比與python自帶的yield,它可以使你在任意函式之間隨意切換,而不需把這個函式先宣告為 generator
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1.switch()
# 生成一個協程
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
# 輸出結果:
12
56
34
78
3.3 Gevent 協程模組
Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。
3.3.1 協程間自動切換
import gevent
def Foo():
print('runnig Foo')
# gevent.sleep()觸發切換
gevent.sleep(2)
print('explicit running in Foo again')
def Bar():
print('running Bar')
gevent.sleep(1)
print('explicit running in Bar again')
def Fun():
print('running Fun')
gevent.sleep(0)
print('explicit running in Fun again')
# 列表形式,產生3個協程,實現協程之間自動切換
gevent.joinall([
gevent.spawn(Foo),
gevent.spawn(Bar),
gevent.spawn(Fun),
])
# 輸出:
runnig Foo
running Bar
running Fun
explicit running in Fun again
explicit running in Bar again
explicit running in Foo again
3.3.2 同步與非同步的效能區別
非同步遇到IO阻塞時會自動切換任務
from urllib import request
import gevent
import time
from gevent import monkey
# 預設urllib無法與gevent協同,需要打個補丁,將當前程式所有I/O操作打標記
monkey.patch_all()
def Fun(url):
print('GET: %s' % url)
req = request.urlopen(url)
data = req.read()
print('%d bytes receied from %s.' % (len(data), url))
urls = ['https://www.python.org',
'https://www.yahoo.com',
'https://www.baidu.com',
]
time_start = time.time()
for url in urls:
Fun(url)
print('同步cost', time.time() - time_start)
async_time_start = time.time()
gevent.joinall([
gevent.spawn(Fun, 'https://www.python.org'),
gevent.spawn(Fun, 'https://www.yahoo.com'),
gevent.spawn(Fun, 'https://www.baidu.com'),
])
print("非同步cost", time.time() - async_time_start)
# 輸出
GET: https://www.python.org
50435 bytes receied from https://www.python.org.
GET: https://www.yahoo.com
3369 bytes receied from https://www.yahoo.com.
GET: https://www.baidu.com
227 bytes receied from https://www.baidu.com.
同步cost 2.240593194961548
GET: https://www.python.org
GET: https://www.yahoo.com
GET: https://www.baidu.com
227 bytes receied from https://www.baidu.com.
3369 bytes receied from https://www.yahoo.com.
50435 bytes receied from https://www.python.org.
非同步cost 1.5009031295776367
3.3.3 實現單執行緒下的多socket併發
server端
import socket
import gevent
from gevent import monkey
monkey.patch_all()
def server(port):
ser = socket.socket()
ser.bind(('localhost', port))
ser.listen(1024)
while True:
conn, addr = ser.accept()
gevent.spawn(handle_request, conn)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print('rece: ', data)
conn.send(data.upper())
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(6666)
client端
#單次連線
import socket
HOST = 'localhost'
PORT = 6666
cli = socket.socket()
cli.connect((HOST, PORT))
while True:
msg = input('>>: ').strip().encode('utf-8')
if len(msg) == 0: continue
cli.send(msg)
data = cli.recv(1024)
print(data.decode('utf-8'))
# 併發
import socket
import threading
HOST = 'localhost'
PORT = 6666
def sock_conn():
client = socket.socket()
client.connect((HOST, PORT))
client.send( ("hello %s" %count).encode("utf-8"))
data = client.recv(1024)
print("[%s]recv from server:" % threading.get_ident(),data.decode()) #結果
client.close()
for i in range(100):
t = threading.Thread(target=sock_conn)
t.start()
3.4 協程非同步IO(asyncio)模組
一定要看:Python中協程非同步IO(asyncio)詳解 - 知乎 (zhihu.com)
3.4.1 asyncio 中的概念
-
事件迴圈:管理所有的事件,在整個程式執行過程中不斷迴圈執行並追蹤事件發生的順序將它們放在佇列中,空閒時呼叫相應的事件處理者來處理這些事件。
-
Future物件:表示尚未完成的計算,還未完成的結果
-
Task:是Future的子類,作用是在執行某個任務的同時可以併發的執行多個任務。
asyncio.Task
是用於實現協作式多工的庫,且Task物件不能使用者手動例項化,通過下面2個函式建立:loop.create_task() asyncio.ensure_future()
-
run_until_complete():阻塞呼叫,直到協程執行結束才返回。引數是future,傳入協程物件時內部會自動變為future
-
asyncio.sleep():模擬IO操作,這樣的休眠不會阻塞事件迴圈,前面加上await後會把控制權交給主事件迴圈,在休眠(IO操作)結束後恢復這個協程。
若在協程中需要有延時操作,應該使用
await asyncio.sleep()
,而不是使用time.sleep()
,因為使用time.sleep()
後會釋放GIL(全域性直譯器鎖),阻塞整個主執行緒,從而阻塞整個事件迴圈。 -
建立Task:
loop.create_task()
:接收一個協程,返回一個asyncio.Task的例項,也是asyncio.Future
的例項。返回值可直接傳入run_until_complete()
-
獲取協程返回值:
-
task.result()
,只有執行完畢後才能獲取,若沒有執行完畢,result()
方法不會阻塞去等待結果,而是丟擲asyncio.InvalidStateError
錯誤。 - 通過
add_done_callback()
回撥:
-
-
控制任務:通過
asyncio.wait()
可以控制多工,asyncio.wait()
是一個協程,不會阻塞,立即返回,返回的是協程物件。傳入的引數是future或協程構成的可迭代物件。最後將返回值傳給run_until_complete()
加入事件迴圈 -
動態新增協程:方案是建立一個執行緒,使事件迴圈線上程內永久執行,相關函式:
loop.call_soon_threadsafe() :與 call_soon()類似,等待此函式返回後馬上呼叫回撥函式,返回值是一個 asyncio.Handle 物件,此物件內只有一個方法為 cancel()方法,用來取消回撥函式。 loop.call_soon() : 與call_soon_threadsafe()類似,call_soon_threadsafe() 是執行緒安全的 loop.call_later():延遲多少秒後執行回撥函式 loop.call_at():在指定時間執行回撥函式,這裡的時間統一使用 loop.time() 來替代 time.sleep() asyncio.run_coroutine_threadsafe(): 動態的加入協程,引數為一個回撥函式和一個loop物件,返回值為future物件,通過future.result()獲取回撥函式返回值
3.4.2 非同步協程呼叫過程
- 非同步協程
- 在函式(特殊的函式)定義的時候,如果使用了async修飾的話,則改函式呼叫後會返回一個協程物件,並且函式內部的實現語句不會被立即執行
- 在特殊函式內部的實現中不可以出現不支援非同步的模組程式碼
- 使用者可主動控制程式,在認為耗時IO處新增
await(yield from)
。在asyncio
庫中,協程使用@asyncio.coroutine
裝飾,使用yield from
來驅動,在python3.5後作瞭如下更改:@asyncio.coroutine -> async
yield from -> await
import asyncio
# 返回async修飾的函式return值,需要使用回撥函式
def call_back(task):
print(task.result())
async def test():
print("it's test()")
return "test()"
# async修飾的函式,呼叫之後返回一個協程物件
c = test()
# 建立一個迴圈物件
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop=loop)
# 將協程物件註冊到迴圈物件中,啟動loop
loop.run_until_complete(c)
# 輸出
it's test()
# task 的使用
task = loop.create_task(c)
# 將 callback 方法傳遞給了封裝好的 task 物件,這樣當 task 執行完畢之後就呼叫 callback 方法;
# 同時 task 物件還會作為引數傳遞給 callback 方法,呼叫 task 物件的 result 方法就可以獲取返回結果。
task.add_done_callback(call_back)
# 將task註冊到迴圈物件中,啟動loop
loop.run_until_complete(task)
print(task)
# 輸出
it's test()
test()
<Task finished name='Task-1' coro=<test() done, defined at test.py:10> result='test()'>
# future 的使用
future = asyncio.ensure_future(c, loop=loop)
# 將callback 方法傳遞給了封裝好的 future 物件,這樣當 future 執行完畢之後就呼叫 callback 方法;
# 同時 future 物件還會作為引數傳遞給 callback 方法,呼叫 future 物件的 result 方法就可以獲取返回結果。
future.add_done_callback(call_back)
loop.run_until_complete(future)
3.4.3 多工處理
import asyncio
import time
start = time.time()
# 在特殊函式內部的實現中不可以出現不支援非同步的模組程式碼
async def get_url(url):
await asyncio.sleep(2)
print('得到url:', url)
urls = [
'www.1.com',
'www.2.com',
'www.3.com',
]
# 建立一個迴圈物件
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop=loop)
# 將協程物件註冊到future迴圈物件中,啟動loop
tasks = []
for url in urls:
c = get_url(url)
task = asyncio.ensure_future(c, loop=loop)
tasks.append(task)
# 啟動loop
loop.run_until_complete(asyncio.wait(tasks))
print(time.time() - start)
# 輸出
得到url: www.1.com
得到url: www.2.com
得到url: www.3.com
2.0074543952941895
3.4.4 協程處理request請求
- 服務端
from flask import Flask
import time
app = Flask(__name__)
@app.route('/site1')
def index_site1():
time.sleep(2)
return 'site1'
@app.route('/site2')
def index_site2():
time.sleep(2)
return 'site2'
@app.route('/site3')
def index_site3():
time.sleep(2)
return 'site3'
if __name__ == '__main__':
app.run(threaded=True)
- 客戶端
import requests
# aiohttp:支援非同步網路請求的模組
import aiohttp
import time
import asyncio
s = time.time()
urls = [
'http://127.0.0.1:5000/site1',
'http://127.0.0.1:5000/site2',
'http://127.0.0.1:5000/site3',
]
"""
# 在特殊函式內部的實現中不可以出現不支援非同步的模組程式碼,如此處request是不支援的
# 最終導致執行後沒有非同步處理效果
async def get_request(url):
page_text = requests.get(url).text
print(page_text)
return page_text
# 輸出
site1
site2
site3
6.022470474243164
"""
async def get_request(url):
async with aiohttp.ClientSession() as s:
async with s.get(url=url) as response:
page_text = await response.text()
print(page_text)
return page_text
"""
# 輸出
site2
site3
site1
2.007105588912964
"""
# 建立迴圈物件
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop=loop)
# 將協程物件註冊到future迴圈物件中,啟動loop
tasks = []
for url in urls:
c = get_request(url)
task = asyncio.ensure_future(c, loop=loop)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print(time.time() - s)