1. 程式人生 > >12.asyncio併發程式設計

12.asyncio併發程式設計

1. 事件迴圈

  1. asyncio是python3.4以後引進的用於解決非同步io程式設計的一整套解決方案
  2. tornado、gevent、twisted(scrapy、django channels)都使用了asyncio
  • 講解asyncio的簡單使用:

import asyncio
import time


async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)     # 必須加await實現協程   這裡asyncio.sleep(2)是一個子協程,time.sleep不能可await搭配. 
    print('end get url'
) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # 開始事件迴圈 loop.run_until_complete(get_html('http://baidu.com')) # 阻塞式的, 類似多執行緒的join print(time.time() - start_time)

執行結果:

start get url
end get url
2.0011332035064697

如果將上面程式碼中await asyncio.sleep(2)

改為time.sleep(2)程式碼執行不會報錯,而且結果一樣。但在協程中 不要運行同步程式碼,只要有同步程式碼,協程併發效果立馬作廢。
我們以同時多個任務為例:
asyncio.sleep(2)

import asyncio
import time


async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)     # 必須加await實現協程   這裡asyncio.sleep(2)是一個子協程,time.sleep不能可await搭配.
    print('end get url')


if
__name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # 開始時間迴圈 tasks = [get_html('http://baidu.com') for i in range(10)] loop.run_until_complete(asyncio.wait(tasks)) # 多工使用asyncio.await print(time.time() - start_time)

執行結果依然是2s:

start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
2.004601001739502

只將await asyncio.sleep(2)改為time.sleep(2)執行結果如下:

start get url
end get url
start get url
end get url
start get url
end get url
start get url
end get url
start get url
end get url
start get url
end get url
start get url
end get url
start get url
end get url
start get url
end get url
start get url
end get url
20.024510860443115

明顯的使用time.sleep導致併發效果作廢。
所以類似的,在使用asyncio的時候,有很多庫的使用必須找到併發的asyncio版本,不要使用同步版的。

  • 如何獲取返回值

asyncio.ensure_future或者loop.create_task獲取future,再通過future獲取返回值。類似多執行緒
我更改上一段程式碼為例講解獲取返回值以及使用回撥函式:

import asyncio
import time
from functools import partial  
# 偏函式  專門用來解決 當以函式名作為傳入引數,但無法再傳入傳入函式的引數的問題。比如下面的add_done_callback


async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)     # 必須加await實現協程   這裡asyncio.sleep(2)是一個子協程,time.sleep不能可await搭配.
    # time.sleep(2)  # 不會報錯, 但在協程裡不要使用同步的io操作
    print('end get url')
    return 'cannon'


# 回撥函式必須有個future傳入引數,否則會報錯。 這與add_done_callback函式有關
def callback(url, future):  # 回撥函式, 如果有傳入引數, 必須放在future前面。這點比較特殊
    print(url)


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()         # 開始時間迴圈
    get_future = asyncio.ensure_future(get_html('http://baidu.com')) # 類似多執行緒得到的future
    # 或者get_future = loop.create_task(get_html('http://baidu.com'))

    # get_future.add_done_callback(callback)   # 執行完以後執行callback
    get_future.add_done_callback(partial(callback, 'www.baidu.com'))   # callback如果要傳入引數, 使用partial實現
    loop.run_until_complete(get_future)
    print(get_future.result())    # futureresult方法得到返回值, 類似多執行緒ThreadPoolExecutor

執行結果:

start get url
end get url
www.baidu.com     # 回撥callback
cannon            # 獲取的返回值
  • 多工wait和gather方法

在區分time.sleep與await asyncio.sleep 在協程中的區別時, 講解使用了wait方法的多工:

loop.run_until_complete(asyncio.wait(tasks))

而gather的基本用法只需要這麼改:

loop.run_until_complete(asyncio.gather(*tasks))

task = asyncio.gather(*tasks)
loop.run_until_complete(asyncio.gather(task))

那麼gather與wait的區別是什麼呢?如何對它兩進行選擇呢?

gather更加high-level高層, gather除了多工外,還可以對任務進行分組。優先使用gather
程式碼舉例:

import asyncio
import time
from functools import partial


async def get_html(url):
    print('start get url', url)
    # 必須加await實現協程   這裡asyncio.sleep(2)是一個子協程,time.sleep不能可await搭配.
    await asyncio.sleep(2)
    # time.sleep(2)  # 不會報錯, 但在協程裡不要使用同步的io操作
    print('end get url')
    return 'cannon'


def callback(url, future):
    print(url)


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()         # 開始時間迴圈

    tasks1 = [get_html('http://baidu.com') for i in range(3)]
    tasks2 = [get_html('http://google.com') for i in range(3)]
    group1 = asyncio.gather(*tasks1)    # gather可以進行分組
    group2 = asyncio.gather(*tasks2)
    loop.run_until_complete(asyncio.gather(group1, group2))
    print(time.time() - start_time)

執行結果:

start get url http://baidu.com
start get url http://baidu.com
start get url http://baidu.com
start get url http://google.com
start get url http://google.com
start get url http://google.com
end get url
end get url
end get url
end get url
end get url
end get url
2.002768039703369

2. task取消和子協程呼叫原理

  • 取消task

程式碼講解:

import asyncio


async def get_html(sleep_times):
    print('waiting')
    await asyncio.sleep(sleep_times)
    print('done after {}s'.format(sleep_times))


if __name__ == '__main__':
    task1 = get_html(1)
    task2 = get_html(2)
    task3 = get_html(3)
    tasks = [task1, task2, task3]

    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:     # ctl + c  終止時,會進入
        all_tasks = asyncio.Task.all_tasks()   # 得到所有task
        for task in all_tasks:
            print('cancel task')
            print(task.cancel())  # stop成功會返回True
        loop.stop()            # 原始碼中只是_stop置為True
        loop.run_forever()   # stop之後必須呼叫run_forever,否則會報錯
    finally:
        loop.close()        # 會做很多終止工作, 詳情可看原始碼

執行過程中按ctl+c,得到結果:

waiting
waiting
waiting
done after 1s
done after 2s
^Ccancel task  # ctl + c 進入
True
cancel task
False     # task1已經執行完,所以無法取消
cancel task
False       # task2已經執行完,所以無法取消
cancel task
True        # task3取消完成
  • 子協程呼叫原理

來看一段官網的程式碼: 

在理解該程式碼後,我們再看協程的呼叫時序圖:


3. call_soon,call_at,call_later,call_soon_threadsafe的使用

  • call_soon 立即執行函式

import asyncio

def callback(sleep_times):
    print('sleep {} sucess'.format(sleep_times))

def stoploop(loop):  # 停止loop
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2)
    loop.call_soon(stoploop, loop)   # 停止forever
    loop.run_forever()

執行結果

sleep 2 sucess
  • call_later 按等待時間執行函式

import asyncio


def callback(sleep_times):
    print('sleep {} sucess'.format(sleep_times))


def stoploop(loop):   # 停止loop
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_later(1, callback, 2)     # 1秒執行
    loop.call_later(2, callback, 1)     # 2秒執行
    loop.call_later(3, callback, 3)     # 3秒執行
    loop.call_later(4, stoploop, loop)  # 4秒執行, 並停止loop
    loop.call_soon(callback, 4)    # 最早執行, call_soon先於call_later
    loop.run_forever()

執行結果:

sleep 4 sucess
sleep 2 sucess
sleep 1 sucess
sleep 3 sucess
  • call_at 在指定時間執行


            
           

相關推薦

12.asyncio併發程式設計

1. 事件迴圈asyncio是python3.4以後引進的用於解決非同步io程式設計的一整套解決方案tornado、gevent、twisted(scrapy、django channels)都使用了asyncio講解asyncio的簡單使用:import asyncio i

12-Java併發程式設計:阻塞佇列

Java併發程式設計:阻塞佇列   在前面幾篇文章中,我們討論了同步容器(Hashtable、Vector),也討論了併發容器(ConcurrentHashMap、CopyOnWriteArrayList),這些工具都為我們編寫多執行緒程式提供了很大的方便。今天我們來討

12併發程式設計

1.基本併發函式 1)Pid=spawn(Mod,Func,Args) : 建立一個新的併發程序來執行Mode模組中定義的Func()函式,args為引數 2)Pid ! Message : 像識別符號為Pid的程序傳送訊息Message.訊息傳送時非同步的,傳送方並不等待,而是會繼

Python 高階程式設計asyncio併發程式設計

Python 高階程式設計之 asyncio併發程式設計 1. asyncio 簡介 1.1 協程與 asyncio 1.2 例子 1. asyncio 簡介 1.1 協程與 asyncio 協

12.併發程式設計--Queue

併發程式設計--Queue Queue - 非阻塞佇列 - 阻塞佇列 Queue是一種特殊的線性表,它只允許在表的前端(front)進行刪除操作,而在表的後端(rear)進行插入操作。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。佇列中沒有元素時,稱為空佇列。 在佇列這種資料結構中

Java程式設計思想 第四版(手碼原書+菜鳥筆記) 第一章 1.12 併發程式設計

文中筆記均為個人觀點,如有錯誤請大家不吝指出,謝謝! 原書為《java程式設計思想 第四版 中文版》 在計算機程式設計中有一個基本概念,就是在同一時刻處理多個任務的思想。 許多程式設計問題都要求,程式能夠停下正在做的工作,轉而處理某個其

【搞定Java併發程式設計】第12篇:happens-before

 上一篇:final域的記憶體語義:https://blog.csdn.net/pcwl1206/article/details/84925372 目  錄: 1、JMM的設計 2、happens-before的定義 3、happens-before規則

2018.12.23 網路程式設計併發程式設計和proxylab

最後兩章的內容還蠻多的,而且程式碼很密,需要去讀才能做proxylab proxylab做的很惱火,因為需要去調,我最後沒做完,因為報頭資訊懶得弄了,part 3 cach的部分也沒做。。。 下面這段是通不過測試的,因為只能響應特定型別的請求格式,或者說能夠響應的請求型別比lab要求的還少。。。以後有時間

多執行緒高併發程式設計(12) -- 阻塞演算法實現ArrayBlockingQueue原始碼分析(1)

一.前言   前文探究了非阻塞演算法的實現ConcurrentLinkedQueue安全佇列,也說明了阻塞演算法實現的兩種方式,使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現,今天來探究下ArrayBlocking

多執行緒高併發程式設計(12) -- 阻塞演算法實現ArrayBlockingQueue原始碼分析

一.前言   前文探究了非阻塞演算法的實現ConcurrentLinkedQueue安全佇列,也說明了阻塞演算法實現的兩種方式,使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現,今天來探究下ArrayBlocking

併發程式設計面試必備:JUC 中的 Atomic 原子類總結

個人覺得這一節掌握基本的使用即可! 本節思維導圖: 1 Atomic 原子類介紹 Atomic 翻譯成中文是原子的意思。在化學上,我們知道原子是構成一般物質的最小單位,在化學反應中是不可分割的。在我們這裡 Atomic 是指一個操作是不可中斷的。即使是在多個執行緒一起執行的時

【轉】Java併發程式設計:同步容器

  為了方便編寫出執行緒安全的程式,Java裡面提供了一些執行緒安全類和併發工具,比如:同步容器、併發容器、阻塞佇列、Synchronizer(比如CountDownLatch)。今天我們就來討論下同步容器。   一、為什麼會出現同步容器?   在Java的集合容器框架中,主要有四大類別:Li

《JAVA併發程式設計實戰》避免活躍性危險

文章目錄 死鎖 鎖順序死鎖 動態的鎖順序死鎖 在協作物件之間發生的死鎖 開放呼叫 資源死鎖 死鎖的避免和診斷 支援定時的鎖 使用執行緒轉儲資訊來分析死鎖 其他活躍性危

《JAVA併發程式設計實戰》取消和關閉

文章目錄 引言 任務取消 中斷 中斷策略 響應中斷 示例:計時執行 通過Future來實現取消 處理不可中斷的阻塞 採用newTaskFor封裝非標準的取消 停止基於執行緒的服務

《JAVA併發程式設計實戰》任務執行

文章目錄 線上程中執行任務 序列執行任務 顯式的為任務建立執行緒 無限制建立執行緒的不足 Executor框架 示例:基於Executor的Web伺服器 執行策略 執行緒池 Exe

《JAVA併發程式設計實戰》基礎構建模組

文章目錄 同步容器類 同步容器類的問題 迭代器和ConcurrentModificationException 隱藏迭代器 併發容器 ConcurrentHashMap 額外的原子Map操作

《JAVA併發程式設計實戰》物件的組合

文章目錄 設計執行緒安全的類 找出構成物件狀態的所有變數 示例 找出約束狀態變數的不變性條件 例項封閉 java監視器模式 示例:車輛追蹤 執行緒安全性的委託

java併發程式設計之利用CAS保證操作的原子性

import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class Counter { private AtomicInteger at

Java併發程式設計之CyclicBarrier

CyclicBarrier可以控制這樣的場景: 對多個執行緒,他們執行自己程式碼(執行run方法)的時間不一樣; 比如有3個執行緒,其run方法執行時間分別為1s, 2s, 3s。如果我們想在三個執行緒都完成自己的任務時執行相應的操作,CyclicBarrier就派上用場了。 寫了一

Java併發程式設計高階技術-高效能併發框架原始碼解析與實戰(資源同步)

第1章 課程介紹(Java併發程式設計進階課程) 什麼是Disruptor?它一個高效能的非同步處理框架,號稱“單執行緒每秒可處理600W個訂單”的神器,本課程目標:徹底精通一個如此優秀的開源框架,面試秒殺面試官。本章會帶領小夥伴們先了解課程大綱與重點,然後模擬千萬,億級資料進行壓力測試。讓大