生成器和協程乾貨
理解生成器
定義生成器
yield關鍵字,可以讓我們定義一個生成器函式。
def generator_func(): print('a') yield 1 g = generator_func() print(g)
>>> <generator object generator_func at 0x10e178b88>
推動生成器
使用next函式從生成器中取值
def generator_func(): print('a') yield 1 g = generator_func() ret1 = next(g) print(ret1)
>>>
a
1
使用next可以推動生成器的執行,下面的程式碼,我們可以看到每一次執行next可以讓generator_func中的程式碼從上一個位置開始繼續執行到yield,並且將yield後面的值返回到函式外部,最終我們可以執行到yield 3。
def generator_func(): print('a') yield 1 print('b') yield 2 print('c') yield 3 print('d') g = generator_func() ret1 = next(g) print(ret1) ret2 = next(g) print(ret2) ret3 = next(g) print(ret3)
>>>
a
1
b
2
c
3
當函式中已經沒有更多的yield時繼續執行next(g),遇到StopIteration
def generator_func(): print('a') yield 1 print('b') yield 2 print('c') yield 3 print('d') g = generator_func() ret1 = next(g) print(ret1) ret2 = next(g) print(ret2) ret3 = next(g) print(ret3) next(g)
send向生成器中傳送資料。send的作用相當於next,只是在驅動生成器繼續執行的同時還可以向生成器中傳遞資料。
import numbers def cal_sum(): sum_num = 0 while True: num = yield if isinstance(num,numbers.Integral): sum_num += num print('sum :',sum_num) elif num is None: break g = cal_sum() g.send(None) # 相當於next(g),預啟用生成器 g.send(31) g.send(25) g.send(17) g.send(8) >>> sum : 31 sum : 56 sum : 73 sum : 81
生成器中的return和StopIteration
import numbers def cal_sum(): sum_num = 0 while True: num = yield if isinstance(num,numbers.Integral): sum_num += num print('sum :',sum_num) elif num is None: break return sum_num g = cal_sum() g.send(None) # 相當於next(g),預啟用生成器 g.send(31) g.send(25) g.send(17) g.send(8) g.send(None) # 停止生成器 >>> sum : 31 sum : 56 sum : 73 sum : 81 Traceback (most recent call last): File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 19, in <module> g.send(None) StopIteration: 81
import numbers def cal_sum(): sum_num = 0 while True: num = yield if isinstance(num,numbers.Integral): sum_num += num print('sum :',sum_num) elif num is None: break return sum_num g = cal_sum() g.send(None) # 相當於next(g),預啟用生成器 g.send(31) g.send(25) g.send(17) g.send(8) try: g.send(None) # 停止生成器 except StopIteration as e: print(e.value)異常處理以及獲取return的值
生成器的close和throw
使用throw向生成器中拋一個異常
def throw_test(): print('a') yield 1 print('b') yield 2 g = throw_test() next(g) g.throw(Exception,'value error')
>>>
a
Traceback (most recent call last):
File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 32, in <module>
g.throw(ValueError,'value error') # throw和send、next相同,都是驅動生成器繼續執行,只不過throw用來向生成器中拋一個異常
File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 26, in throw_test
yield 1
ValueError: value error
def throw_test(): print('a') try: yield 1 except ValueError: pass print('b') yield 2 g = throw_test() next(g) ret = g.throw(ValueError,'value error') # throw和send、next相同,都是驅動生成器繼續執行,只不過throw用來向生成器中拋一個異常 print(ret) >>> a b 2throw+異常處理
使用close關閉一個生成器
def throw_test(): print('a') yield 1 print('b') yield 2 g = throw_test() ret1 = next(g) print(ret1) g.close() next(g) >>> a 1 Traceback (most recent call last): File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 45, in <module> next(g) StopIteration
yield from關鍵字
yield from關鍵字可以直接返回一個生成器
l = ['h','e','l'] dic = {'l':'v1','o':'v2'} s = 'eva' def yield_from_gen(): for i in l: yield i for j in dic: yield j for k in s: yield k for item in yield_from_gen(): print(item,end='') >>>helloeva l = ['h','e','l'] dic = {'l':'v1','o':'v2'} s = 'eva' def yield_from_gen(): yield from l yield from dic yield from s for item in yield_from_gen(): print(item,end='') >>>helloeva
from itertools import chain l = ['h','e','l'] dic = {'l':'v1','o':'v2'} s = 'eva' def yield_from_gen(): yield from chain(l,dic,s) for item in yield_from_gen(): print(item,end='')chain和yield from
利用yield from完成股票的計算,yield from能夠完成一個委派生成器的作用,在子生成器和呼叫者之間建立起一個雙向通道。
def son_gen(): avg_num = 0 sum_num = 0 count = 1 while True: num = yield avg_num if num: sum_num += num avg_num = sum_num/count count += 1 else:break return avg_num def depute_gen(key): while True: ret = yield from son_gen() print(key,ret) def main(): shares_list= { 'sogou':[6.4,6.5,6.6,6.2,6.1,6.6,6.7], 'alibaba':[181.72,184.58,183.54,180,88,169.88,178.21,189.32], '美團':[59.7,52.6,47.2,55.4,60.7,66.1,74.0] } for key in shares_list: g = depute_gen(key) next(g) for v in shares_list[key]: g.send(v) g.send(None) main()
協程
概念
根據維基百科給出的定義,“協程 是為非搶佔式多工產生子程式的計算機程式元件,協程允許不同入口點在不同位置暫停或開始執行程式”。從技術的角度來說,“協程就是你可以暫停執行的函式”。如果你把它理解成“就像生成器一樣”,那麼你就想對了。
使用yield實現協程
#基於yield實現非同步 import time def consumer(): '''任務1:接收資料,處理資料''' while True: x=yield def producer(): '''任務2:生產資料''' g=consumer() next(g) for i in range(10000000): g.send(i) producer()
使用yield from實現的協程
import datetime import heapq # 堆模組 import types import time class Task: def __init__(self, wait_until, coro): self.coro = coro self.waiting_until = wait_until def __eq__(self, other): return self.waiting_until == other.waiting_until def __lt__(self, other): return self.waiting_until < other.waiting_until class SleepingLoop: def __init__(self, *coros): self._new = coros self._waiting = [] def run_until_complete(self): for coro in self._new: wait_for = coro.send(None) heapq.heappush(self._waiting, Task(wait_for, coro)) while self._waiting: now = datetime.datetime.now() task = heapq.heappop(self._waiting) if now < task.waiting_until: delta = task.waiting_until - now time.sleep(delta.total_seconds()) now = datetime.datetime.now() try: print('*'*50) wait_until = task.coro.send(now) print('-'*50) heapq.heappush(self._waiting, Task(wait_until, task.coro)) except StopIteration: pass
def sleep(seconds): now = datetime.datetime.now() wait_until = now + datetime.timedelta(seconds=seconds) print('before yield wait_until') actual = yield wait_until # 返回一個datetime資料型別的時間 print('after yield wait_until') return actual - now def countdown(label, length, *, delay=0): print(label, 'waiting', delay, 'seconds before starting countdown') delta = yield from sleep(delay) print(label, 'starting after waiting', delta) while length: print(label, 'T-minus', length) waited = yield from sleep(1) length -= 1 print(label, 'lift-off!') def main(): loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2), countdown('C', 4, delay=1)) start = datetime.datetime.now() loop.run_until_complete() print('Total elapsed time is', datetime.datetime.now() - start) if __name__ == '__main__': main()
await和async關鍵字
使用 async function
可以定義一個 非同步函式,在async關鍵字定義的函式中不能出現yield和yield from
# 例1 async def download(url): # 加入新的關鍵字 async ,可以將任何一個普通函式變成協程 return 'eva' ret = download('http://www.baidu.com/') print(ret) # <coroutine object download at 0x108b3a5c8> ret.send(None) # StopIteration: eva # 例2 async def download(url): return 'eva' def run(coroutine): try: coroutine.send(None) except StopIteration as e: return e.value coro = download('http://www.baidu.com/') ret = run(coro) print(ret)
async關鍵字不能和yield一起使用,引入coroutine裝飾器來裝飾downloader生成器。
await 操作符後面必須跟一個awaitable物件(通常用於等待一個會有io操作的任務), 它只能在非同步函式 async function
內部使用。
# 例3 import types @types.coroutine # 將一個生成器變成一個awaitable的物件 def downloader(url): yield 'aaa' async def download_url(url): # 協程 waitable = downloader(url) print(waitable) # <generator object downloader at 0x1091e2c78> 生成器 html = await waitable return html coro = download_url('http://www.baidu.com') print(coro) # <coroutine object download_url at 0x1091c9d48> ret = coro.send(None) print(ret)
asyncio模組
asyncio
是Python 3.4版本引入的標準庫,直接內建了對非同步IO的支援。
asyncio
的程式設計模型就是一個訊息迴圈。我們從asyncio
模組中直接獲取一個EventLoop
的引用,然後把需要執行的協程扔到EventLoop
中執行,就實現了非同步IO。
coroutine+yield from
import asyncio @asyncio.coroutine def hello(): print("Hello world!") # 非同步呼叫asyncio.sleep(1): r = yield from asyncio.sleep(1) print("Hello again!") # 獲取EventLoop: loop = asyncio.get_event_loop() # 執行coroutine loop.run_until_complete(hello()) loop.close()
async+await
import asyncio async def hello(): print("Hello world!") # 非同步呼叫asyncio.sleep(1): r = await asyncio.sleep(1) print("Hello again!") # 獲取EventLoop: loop = asyncio.get_event_loop() # 執行coroutine loop.run_until_complete(hello()) loop.close()
執行多個任務
import asyncio async def hello(): print("Hello world!") await asyncio.sleep(1) print("Hello again!") return 'done' loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([hello(),hello()])) loop.close()
獲取返回值
import asyncio async def hello(): print("Hello world!") await asyncio.sleep(1) print("Hello again!") return 'done' loop = asyncio.get_event_loop() task = loop.create_task(hello()) loop.run_until_complete(task) ret = task.result() print(ret)
執行多個任務獲取返回值
import asyncio async def hello(i): print("Hello world!") await asyncio.sleep(i) print("Hello again!") return 'done',i loop = asyncio.get_event_loop() task1 = loop.create_task(hello(2)) task2 = loop.create_task(hello(1)) task_l = [task1,task2] tasks = asyncio.wait(task_l) loop.run_until_complete(tasks) for t in task_l: print(t.result())
執行多個任務按照返回的順序獲取返回值
import asyncio async def hello(i): print("Hello world!") await asyncio.sleep(i) print("Hello again!") return 'done',i async def main(): tasks = [] for i in range(20): tasks.append(asyncio.ensure_future(hello((20-i)/10))) for res in asyncio.as_completed(tasks): result = await res print(result) loop = asyncio.get_event_loop() loop.run_until_complete(main())
asyncio使用協程完成http訪問
import asyncio async def get_url(): reader,writer = await asyncio.open_connection('www.baidu.com',80) writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n') all_lines = [] async for line in reader: data = line.decode() all_lines.append(data) html = '\n'.join(all_lines) return html async def main(): tasks = [] for url in range(20): tasks.append(asyncio.ensure_future(get_url())) for res in asyncio.as_completed(tasks): result = await res print(result) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) # 處理一個任務 loop.run_until_complete(asyncio.wait([main()])) # 處理多個任務 task = loop.create_task(main()) # 使用create_task獲取返回值 loop.run_until_complete(task) loop.run_until_complete(asyncio.wait([task]))
gevent模組實現協程
http://www.cnblogs.com/Eva-J/articles/8324673.html
&nbs