python並行程式設計 - 非同步篇
目錄1
介紹
除了線性
、並行
執行模式外,還有非同步
模式,它與事件程式設計一樣,十分重要
在併發的非同步模式中,不同的任務在時間線上是相互交錯的,而且一切都是在單一控制流(單執行緒)下進行的
1.asyncio (過時)
基本使用
1.1 使用asyncio實現事件迴圈管理
什麼是事件迴圈?
在計算系統中,能夠產生事件的實體被稱為事件源(event source),而負責協商管理事件的實體被稱為事件處理器(event handler)
它實現了管理計算程式碼中所有事件的功能:在程式執行期間事件迴圈不斷週期反覆,追蹤某個資料內部發生的事件,將其納入佇列,如果主執行緒空閒則呼叫事件處理器一個一個地處理這些事件
注:事件迴圈不能使用@asyncio.coroutine
標為協程
示例1:
延遲3秒後執行
import asyncio
import time
def A(x):
print(x)
time.sleep(1) # 使用run_forever()不能用ayncio.sleep()延時
loop.call_soon(B)
print('c')
def B():
print('b')
loop.stop()
loop = asyncio.get_event_loop()
# loop.call_soon(A, 'a')
loop. call_later(3.0, A, 'a')
loop.run_forever()
loop.close()
print('end')
輸出:
a
c
b
end
在A()中再利用loop呼叫其它函式B()時,A也並不停下來,實現協程效果
1.2使用asyncio實現協程
什麼是協程?
當程式變得冗長複雜時,將其劃分成子例程的方式會使處理變得更加便利,每個子例程完成一個特定的任務
子例程無法獨立執行,只能在主程式的要求下才能執行,主程式負責協調子例程的使用,協程就是子例程的泛化。在協程中,可以暫停執行點,同時保持干預時的本地狀態,便於後續繼續執行
協程相互交錯的控制組件就是事件迴圈,事件迴圈追蹤全部的協程,並安排其執行時間
協程的其它重要特點:
- 協程支援多個進入點,可以多次生成(yield)
- 協程能夠執行轉移至任何其它協程
生成(yield)這個術語用於描述那些暫停並將控制流傳遞給另一個協程的協程,協程可以同時傳遞控制流和值
示例2:
A()和B()類似並行
import asyncio
@asyncio.coroutine
def A():
print('a - start')
yield from asyncio.sleep(1)
print('a - end')
@asyncio.coroutine
def B(x):
print('b - start')
result = yield from C()
print(x)
yield from asyncio.sleep(1)
print(f'b :{result}')
@asyncio.coroutine
def C():
print('c - start')
yield from asyncio.sleep(1)
print('c - end')
return 'this is C return'
loop = asyncio.get_event_loop()
# loop.run_until_complete(A()) # 只執行一個
# loop.run_until_complete(asyncio.wait([A(), B('d')])) # 併發執行方法1
tasks = [asyncio.Task(A()), asyncio.Task(B('b - end'))]
loop.run_until_complete(asyncio.wait(tasks)) # 併發執行方法2
loop.close()
print('end')
# 類asyncio.Task(coroutine)用於排程協程的執行
# asyncio.wait(tasks)將等待給定協程執行完畢
輸出:
a - start
b - start
c - start
a - end
c - end
b - end
b :this is C return
end
分析:asyncio.sleep()
期間,主執行緒並未等待,而是去執行EventLoop
中可執行的coroutine
注:@asyncio.coroutine
把一個generator標記為coroutine
型別,再把這個coroutine放到EventLoop中執行(實測,可以不@標記)
相關方法
loop = get_event_loop() : 獲得當前上下文的事件迴圈
如果close()關閉了後,重新開啟需要以下操作:
loop = asyncio.new_event_loop() : 建立新的時間迴圈物件
asyncio.set_event_loop(loop) : 將當前上下文的時間迴圈設定為指定的迴圈
loop.call_soon(callback, *args) : 立即呼叫回撥物件,引數
loop.call_later(delay, callback, *args) : 延時delay秒後,呼叫回撥物件
loop.call_at(when, callback, *args) : 在指定的時間呼叫回撥物件,(when是絕對時間,可以參考loop.time()設定)
loop.run_forever() : 一直執行,直到呼叫stop()
loop.run_until_complete(future) : 執行指定的協程函式(Future3)
loop.time() : 獲取事件迴圈的內部時鐘
loop.close() : 關閉事件迴圈
loop.is_running() : 是否執行中
loop.is_close() : 是否關閉
使用示例
示例:
非同步網路並行訪問
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.baidu.com', 'www.aliyun.com', 'www.qq.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
2.async/await
2.1 客戶端使用
為了簡化標識非同步io,python3.5引入新語法async
、await
只需將2步替換:
asyncio.coroutine
->async
yield from
->await
示例:
import asyncio
@asyncio.coroutine
def A():
print('a')
yield from asyncio.sleep(1)
print('c')
loop = asyncio.get_event_loop()
tasks = [asyncio.Task(A())]
loop.run_until_complete(asyncio.wait(tasks)) # 併發執行方法2
loop.close()
print('end')
替換為:
import asyncio
async def A():
print('a')
await asyncio.sleep(1)
print('c')
loop = asyncio.get_event_loop()
tasks = [asyncio.Task(A())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('end')
2.2 伺服器端使用
asyncio
可以實現單執行緒併發io操作,如果僅用於客戶端,效果不大
可以用在伺服器端,由於HTTP連線就是io操作,因此可以使用單執行緒+協程實現多使用者的高併發
asyncio
實現了TCP、UDP、SSL等協議,aiohttp則是基於asyncio實現的HTTP框架
示例:
啟動一個web服務,通過瀏覽器訪問localhost:8000
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body='<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text)
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
參考書籍:《Python並行程式設計手冊》 ↩︎
這篇主要參考:廖雪峰 - asyncio:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/00143208573480558080fa77514407cb23834c78c6c7309000 ↩︎
Future:是Asyncio的一個類,與concurrent.futures.Futures非常相似,Futures類代表一個還不可用的結果,它是對尚未完成的任務的抽象表示;Python 3.2引入concurrent.futures模組,支援管理併發程式設計任務,如程序池和執行緒池、非確定性執行流、多程序、執行緒同步(這個目前沒看出有什麼特別的,池化管理不是多執行緒和多程序庫自帶嗎?
concurrent.futures.ProcessPoolExecutor) ↩︎