1. 程式人生 > 實用技巧 >Python協程學習基本!!!

Python協程學習基本!!!

Python協程應該是我最後重點攻克難點,最近寫一個twitter的爬蟲,希望也能用上相關知道:

具體參考的連結:

非常適合小白的 Asyncio 教程:

https://mp.weixin.qq.com/s/BN4l_ek87_bKNe0SYSRFBg

Python中協程非同步IO(asyncio)詳解:

https://zhuanlan.zhihu.com/p/59621713

Python黑魔法 --- 非同步IO( asyncio) 協程:

https://www.jianshu.com/p/b5e347b3a17c

本筆記僅供個人參考,有需要可以去看原文。

協程的定義,需要使用async def語句。

In [1]: import asyncio                                                                                                                                         

In [2]: async def some_work(x):...                                                                                                                             

In [3]: print(asyncio.iscoroutinefunction(some_work))                                                                                                          
True

  通過asyncio.iscoroutinefunction的方法可以來檢查一個函式是否為協程函式, 協程函式前面要加上async

寫一個標準的協程函式

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

  asyncio.sleep也是一個協程,所以await asyncio.sleep(x)就是等待另一個協程。可參見asyncio.sleep的文件:

sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds).

  

2 執行協程

呼叫協程函式,協程並不會開始執行,只是返回一個協程物件,可以通過asyncio.iscoroutine來驗證:

In [2]: async def some_work(x):...                                                                                                                             

In [3]: print(asyncio.iscoroutinefunction(some_work))                                                                                                          
True

In [4]: s = some_work(3)                                                                                                                                       

In [5]: asyncio.iscoroutine(s)                                                                                                                                 
Out[5]: True

  要執行協程物件,需要拿到當前loop,然後執行run_until_complete方法

In [7]: loop.run_until_complete?                                                                                                                               
Signature: loop.run_until_complete(future)
Docstring:
Run until the Future is done.

If the argument is a coroutine, it is wrapped in a Task.

WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.

Return the Future's result, or raise its exception.
File:      ~/opt/anaconda3/lib/python3.8/asyncio/base_events.py
Type:      method

  這個方法只接受引數是一個 future,但是我們這裡傳給它的卻是協程物件,是因為它在內部做了檢查,通過ensure_future函式把協程物件包裝(wrap)成了 future。

In [10]: async def do_some_work(x): 
    ...:     print('Waiting', x) 
    ...:     await asyncio.sleep(x) 
    ...:                                                                                                                                                       

In [12]: loop = asyncio.get_event_loop()                                                                                                                       

In [13]: loop.run_until_complete(do_some_work(3))                                                                                                              
Waiting 3

In [14]: loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))                                                                                       
Waiting 3

  

插入一個疑問,為什麼明明有了協程(coroutine),也能跑起來,但要有future/task這個玩意。

這裡有具體的回答:

https://stackoverflow.com/questions/34753401/difference-between-coroutine-and-future-task-in-python-3-5

Direct Answer:You don't needensure_futureif you don't need the results. They are good if you need the results or retrieve exceptions occurred.

import asyncio


async def slow_operation():
    await asyncio.sleep(1)
    return 'Future is done!'


def got_result(future):
    print(future.result())

    # We have result, so let's stop
    loop.stop()


loop = asyncio.get_event_loop()
task = loop.create_task(slow_operation())
task.add_done_callback(got_result)

# We run forever
loop.run_forever()

  書中的程式碼,當你包裝成一個task以後,就可以添加回調函式,回撥函式的第一個引數預設就是task本身,函式內部能夠呼叫到loop物件,真實神奇。

stackoverflow真是非常不錯的一個網站,第二個回答裡面又介紹了更有意思的。

import asyncio
import time

# coroutine function
async def p(word):
    print(f'{time.time()} - {word}')


async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')  # coroutine
    task2 = loop.create_task(p('create_task'))  # <- runs in next iteration
    await coro  # <-- run directly
    await task2

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

  執行結果

1539486251.7055213 - await
1539486251.7055705 - create_task
async def main():
    loop = asyncio.get_event_loop()
    coro = p('await')
    task2 = loop.create_task(p('create_task'))  # scheduled to next iteration
    await asyncio.sleep(1)  # loop got control, and runs task2
    await coro  # run coro
    await task2

 

如果在中間新增一個await asyncio.sleep,那task2會先執行

1539486378.5244057 - create_task
1539486379.5252144 - await  # note the delay

因為中間的await asyncip.sleep()會讓出控制權,task2已經在loop上面註冊了,所以task2先運行了。

Under the hood

loop.create_taskactually callsasyncio.tasks.Task(), which will callloop.call_soon. Andloop.call_soonwill put the task inloop._ready. During each iteration of the loop, it checks for every callbacks in loop._ready and runs it.

asyncio.wait,asyncio.ensure_futureandasyncio.gatheractually callloop.create_taskdirectly or indirectly.

Also note in thedocs:

Callbacks are called in the order in which they are registered. Each callback will be called exactly once.

上面的說明也介紹了,很多方法都是建立了task並註冊到loop上面。

繼續往下寫

回撥函式,前面的stackoverflow以前有了回撥函式的程式碼,這裡面我就不寫了。

回撥函式可以取到loop以及task本身,loop函式裡面就可以讀到,task需要預設的第一個引數。

多個協程

asyncio.gather可以在裡面新增多個future或者coroutines

Signature: asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
Docstring:
Return a future aggregating results from the given coroutines/futures.

Coroutines will be wrapped in a future and scheduled in the event
loop. They will not necessarily be scheduled in the same order as
passed in.

All futures must share the same event loop.  If all the tasks are
done successfully, the returned future's result is the list of
results (in the order of the original sequence, not necessarily
the order of results arrival).  If *return_exceptions* is True,
exceptions in the tasks are treated the same as successful
results, and gathered in the result list; otherwise, the first
raised exception will be immediately propagated to the returned
future.

Cancellation: if the outer Future is cancelled, all children (that
have not completed yet) are also cancelled.  If any child is
cancelled, this is treated as if it raised CancelledError --
the outer Future is *not* cancelled in this case.  (This is to
prevent the cancellation of one child to cause other children to
be cancelled.)

  接受的是不定長的引數。

gather起聚合的作用,把多個 futures 包裝成單個 future,因為loop.run_until_complete只接受單個 future。

5. run_until_complete和run_forever

import asyncio

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
loop.run_until_complete(coro)

  run_until_complete當loop裡面的task跑完了以後,就停止了。

import asyncio

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
asyncio.ensure_future(coro)

loop.run_forever()   # 一直阻塞在這裡

  

這個會一直阻塞。

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()

  可以在協程函式裡面傳入loop,停止loop的執行,其實我發現不傳參也可以使用。

import asyncio

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

# 回撥函式必須傳入future引數。就好比例項方法的第一個引數是self一樣。
def done_callback(future):
    loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(1), do_some_work(3))
futus.add_done_callback(done_callback)

loop.run_forever()

  這裡我差點又卡住了,在回撥函式裡面,future引數是必須要填寫的,也就是第一個引數。loop可以通過functools.partial或者lambda的方式傳參進去,但回撥函式也可以直接讀取到執行的loop。

自定義了一個定時任務

import asyncio

async def timer(x, cb):
    futu = asyncio.ensure_future(asyncio.sleep(x))
    futu.add_done_callback(cb)
    await futu

t = timer(3, lambda futu: print('Done'))

loop = asyncio.get_event_loop()
loop.run_until_complete(t)

  通過將asyncio.sleep()協程函式包裝成future然後添加回調函式的方式,定時啟動指令碼。

以上是我https://mp.weixin.qq.com/s/BN4l_ek87_bKNe0SYSRFBg這個連結的筆記,確實講的比較少,也比較基礎。

還是stackoverflow讓我學到了不少。

這裡有asyncio.await與asynvio.gather的區別介紹

https://stackoverflow.com/questions/42231161/asyncio-gather-vs-asyncio-wait

相對來說,await入參為tasks的列表,但可以在方法內部設定具體的引數,返回兩對不同的列表,分別包含完成任務的tasks與沒有完成任務的tasks

以下是來至知乎連結的筆記

https://zhuanlan.zhihu.com/p/59621713,一些重複,或者我覺的已經理解或者不重要的將跳過。

前面講了一些Python協程相關基本知識,跳過了。這篇知道,我感覺有些地方可能不全,我就補一些我自己的理解。

獲取協程返回值

這裡,原作者介紹了,可以通過task.resule()或者回調函式或者返回值。

還有一種就是通過

res = loop.run_until_complete(task)

  loop執行後的返回值也可以得到結果。

後面作者介紹了asyncio.wait的一些簡單使用,而且寫法也特別不舒服。

import asyncio

async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = [loop.create_task(coroutine_example('Zarten_' + str(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

for task in tasks:
    print(task.result())

loop.close()

 這是要所有協程返回值的寫法,用了asyncio.wait的寫法,但中間建立了一堆task,最後又從task取返回值實在太累了。

import asyncio

async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)]
# 匯聚
wait_coro = asyncio.gather(*tasks)
# loop註冊跑起來
res = loop.run_until_complete(wait_coro)

for r in res:
    print(r)

loop.close()

  如果就簡單的收集結果,前面我剛學的asyncio.gather更加合適。

作者的回撥函式更加麻煩了。

後面作者一下跳躍到了動態新增協程,開闢了一個新的執行緒新增協程,跳躍的速度感覺向去哪裡複製了一篇文章過來。

主要介紹了,通過新開一條執行緒加入一個loop呼叫一些回撥函式,然後在loop裡面動態的新增一些future,感覺沒啥用,程式碼我也不抄寫了。

邏輯中還是有很多問題的。

import asyncio
from threading import Thread

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    return '返回結果:' + name



new_loop = asyncio.new_event_loop()
t = Thread(target= start_thread_loop, args=(new_loop,))
t.start()

future = asyncio.run_coroutine_threadsafe(thread_example('Zarten1'), new_loop)
print(future.result())

asyncio.run_coroutine_threadsafe(thread_example('Zarten2'), new_loop)

print('主執行緒不會阻塞')

asyncio.run_coroutine_threadsafe(thread_example('Zarten3'), new_loop)

print('繼續執行中...')

  這裡作者說

從上面2個例子中,當主執行緒執行完成後,由於子執行緒還沒有退出,故主執行緒還沒退出,等待子執行緒退出中。若要主執行緒退出時子執行緒也退出,可以設定子執行緒為守護執行緒 t.setDaemon(True)

我不知道作者是筆誤還是不瞭解守護執行緒,當主執行緒退出,子執行緒全部關閉,裡面的loop都關了,你所有子執行緒的loop還執行個啥呢?

import asyncio
from threading import Thread
from collections import deque
import random
import time

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def consumer():
    while True:
        if dq:
            msg = dq.pop()
            # 函式內部動態,向指定事件迴圈提交一個協程。執行緒安全。
            if msg:
                asyncio.run_coroutine_threadsafe(thread_example('Zarten'+ msg), new_loop)


async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(2)
    return '返回結果:' + name



dq = deque()
# 註冊一個新的loop
new_loop = asyncio.new_event_loop()
# 開啟一個執行緒,讓loop跑起來一直跑
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()

# 再開啟一個執行緒,去啟動consumer函式
consumer_thread = Thread(target= consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()

while True:
    i = random.randint(1, 10)
    dq.appendleft(str(i))
    time.sleep(2)

這是作者的程式碼,我不知道守護執行緒開著還有何用,主執行緒寫了死迴圈。但確實通過

asyncio.run_coroutine_threadsafe

的方法向loop動態新增協程還是不錯的一個方案。