1. 程式人生 > >python -- 非同步I/O

python -- 非同步I/O

非同步操作目的是使用一個主執行緒實現多個協程併發執行

定義一個函式,該函式是一個消費者的生成器,使用裝飾器@asyncio.coroutine 把生成器變成協程,協程的程式是可以中斷再執行的,非同步操作需要在協程中中通過yield from完成;yield from語法可以讓我們方便地呼叫另一個generator,生成器既可以是生產者也可以是消費者,當一個任務的程式執行到這裡時,會中斷程式,呼叫其他的生成器,yield返回值是需要時間的,但是主執行緒是不會等待這個任務結束的,而是會去event_loop中執行其他可執行的任務,等到yield from拿到值時,這時會繼續執行中斷的程式

非同步操作是通過yield from 來實現的 當程式執行到該語句時,會由於避免等待而去執行其他任務,yield from 後面可以跟攜程或者生成器
當哪個攜程執行不了了, 執行緒會先執行其他的協程

import asyncio
import threading

# 把生成器變成協程
@asyncio.coroutine
def countdown(name, num):
    # 列印當前的執行緒
    print(threading.current_thread())
    while num > 0:
        # 列印任務的名稱 和 名字 這一句是為了驗證任務的並
        發執行,任務1 執行到yield from會中斷程式,執行緒會
        執行任務2 任務2 依然打印出名字 之後 執行到 yield
        from 中斷程式, 執行任務3
,這時就會把任務123 的名字同時打印出來,等到任務yield 拿到值時中斷的程式 會接著往下執行 print((f'Countdown[{name}]: {num}')) # 非同步執行 - 非阻塞式 asyncio.sleep()也是一個 協程 非同步執行靠的就是yield from 執行其他的協程, 執行其他協程時需要耗費時間,此時程式會終止到這裡, 但是非同步執行時,是不會阻塞在這裡的, 主執行緒並未等待, 因此可以實現併發執行任務 這裡yield
得到的值是None, 如果換成是真正的I/O操作這裡就可以得到返回值了 yield from asyncio.sleep(1) # 同步執行 - 阻塞式 如果把休息時間寫成這樣的話, 程式執 行到這裡會卡在這裡,形成阻塞,也不會去執行其他可執行的任 務了 # time.sleep(1) num -= 1 def main(): # 通過下面的event_loop實現多工的併發執行 loop = asyncio.get_event_loop() # 非同步I/O - 雖然只有一個執行緒但是兩個任務相互之間不阻塞 tasks = [ countdown('A', 10), countdown('B', 5) ] loop.run_until_complete(asyncio.wait(tasks)) # 任務執行完了,關閉迴圈 loop.close() if __name__ == '__main__': main()

asyncio.sleep(1), 可以看做是一個耗時一秒的I/O操作,在此期間,主執行緒並未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現併發執行。
如果把asyncio.sleep()換成真正的IO操作,則多個coroutine就可以由一個執行緒併發執行,具體操作看廖雪峰官網

async/await

用asyncio提供的@asyncio.coroutine可以把一個generator標記為coroutine型別,然後在coroutine內部用yield from呼叫另一個coroutine實現非同步操作。

為了簡化並更好地標識非同步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的程式碼更簡潔易讀
請注意,async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:

把@asyncio.coroutine替換為async;
把yield from替換為await。
@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")
新用法:
async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

例項示範:

import asyncio
import requests
import time

async def download(url):
    print('Fetch:', url)
    # 聯網下載可能耗時, 非同步取, 加上yield變成了協程
    # time.sleep(10) 阻塞式的
    # 程式執行到這裡 中斷3秒 上面的print併發執行
    await asyncio.sleep(3)
    resp = requests.get(url)
    print(resp.status_code)
    # 程式執行到這裡中斷5秒 上面的print併發執行
    await asyncio.sleep(5)
    print(url)
    # print(resp.headers)


def main():
    loop = asyncio.get_event_loop()
    urls = [
        'https://www.baidu.com',
        'http://www.sohu.com',
        'http://www.sina.com',
        'https://www.taobao.com',
        'http://www.qq.com'
    ]
    # 啟動下載任務
    tasks = [download(url) for url in urls]
    # 非同步執行任務
    loop.run_until_complete(asyncio.wait(tasks))


if __name__ == '__main__':
    main()

程式執行結果如下圖:
這裡寫圖片描述

aiohttp

例項示範:

import asyncio
import aiohttp


async def download(url):
    print('Fetch:', url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(url, '--->', resp.status)
            print(url, '--->', resp.cookies)
            # \n\n意思是隔兩個空行列印一次
            # await 列印可能耗時, 中斷非同步執行, 
            # print('\n\n', await resp.text())


def main():
    loop = asyncio.get_event_loop()
    urls = [
        'https://www.baidu.com',
        'http://www.sohu.com',
        'http://www.sina.com',
        'https://www.taobao.com',
        'http://www.qq.com'
    ]
    tasks = [download(url) for url in urls]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()


if __name__ == '__main__':
    main()