1. 程式人生 > >python中的協程(一)

python中的協程(一)

協程

協程概念及目的

1、協程:

單執行緒實現併發、在應用程式裡控制多個任務的切換+儲存狀態

優點:

應用程式級別速度要遠遠高於作業系統的切換

缺點:

多個任務一旦有一個阻塞沒有切,整個執行緒都阻塞在原地,該執行緒內的其他的任務都不能執行了

一旦引入協程,就需要檢測單執行緒下所有的IO行為, 實現遇到IO就切換,少一個都不行,以為一旦一個任務阻塞了,整個執行緒就阻塞了, 其他的任務即便是可以計算,但是也無法運行了

2、協程式的目的:

想要在單執行緒下實現併發,主要用於io密集型

併發指的是多個任務看起來是同時執行的

併發=切換+儲存狀態

3、協程相比於執行緒:

最大的區別在於,協程不需要像執行緒那樣來回的中斷切換,也不需要執行緒的鎖機制,因為執行緒中斷或者鎖機制都會對效能問題造成影響,所以協程的效能相比於執行緒,效能有明顯的提高,尤其線上程越多的時候,優勢越明顯。


複習生成器

生成器的一個作用是類似於迭代器,每次迭代的值為yield右值(重點),還有一種是利用yield斷點,然後切換到另一個任務,下面是第一種用法的複習

def f(maxx):
    n, a, b = 0, 1, 1
    while n < maxx:
        # print(b)
        y = yield b
        a, b = b, a + b
        n += 1
        print(y)
    return 'error_name'  # 原函式的return變成了迭代完報出錯誤的值(value)


fi = f(6)  # 將一個函式變成生成器,並賦值給fi,每次迭代的值都是yield右邊的值
print(fi.__next__())  # 執行一次生成器,到yield處中斷,執行下面的程式
print(fi.send('Done'))  # 回到第一次執行的生成器的yield中斷處,並把Done賦予yield,然後執行下面的程式,到yield再次停止
fi.send('Done')  # 如果這句換成print(fi.send('Done')),則會輸出3
print(fi.send('Done'))  # next和send的返回值都是fi迭代器本次的值

########################
1
Done
2
Done
Done
5

這裡有一點要注意,要啟用一個生成器,一定要呼叫next()方法或者send(None)啟動生成器,而不是呼叫生成器的send()方法,如果直接呼叫send()方法會報錯,同時在生成器這裡,最先呼叫 next() 函式這一步通常稱為“預激”(prime)協程(即,讓協程向前執行到第一個 yield 表示式,準備好作為活躍的協程使用)。

從Python2.5 開始,我們可以在生成器上呼叫兩個方法,顯式的把異常發給協程。 這兩個方法是throw和close

generator.throw詳解

generator.throw(exc_type[, exc_value[, traceback]])

外層程式碼呼叫throw使生成器在暫停的yield表示式處丟擲指定的異常。如果生成器處理了丟擲的異常,程式碼會向前(迴圈)執行到下一個yield表示式,而產出的值會成為呼叫throw方法得到的返回值。如果沒有處理,則向上冒泡(傳遞給外層,值傳遞給外一層)。

def myGenerator():
    value = 1
    while True:
        yield value
        value += 1
 
 
gen = myGenerator()
print gen.next()
print gen.next()
print gen.throw(Exception, "Method throw called!")

  輸出結果為

1
2
Traceback (most recent call last):
  File "test.txt", line 11, in <module>
    print gen.throw(Exception, "Method throw called!")
  File "test.txt", line 4, in myGenerator
    yield value
Exception: Method throw called!

  外層程式碼的最後一句向生成器物件丟擲了一個異常。但是,在生成器物件的方法時沒有處理該異常的程式碼,因此異常會被丟擲到主方法,主方法任然未處理,最終報錯。

下面的示例中,添加了處理異常的程式碼

def myGenerator():
    value = 1
    while True:
        try:
            yield value
            value += 1
        except:
            value = 1
 
 
gen = myGenerator()
print gen.next()
print gen.next()
print gen.throw(Exception, "Method throw called!")

  程式碼的輸出如下

1
2
1
Exception RuntimeError: 'generator ignored GeneratorExit' in <generator object myGenerator at 0x00000000028BB900> ignored
  上面輸出中,第2個1是gen.throw方法的返回值。在執行完該方法後,生成器物件方法的while迴圈並沒有結束,也即是說生成器方法的執行還沒有結束。這個時候如果強制結束主程式,會丟擲一個RuntimeError(這裡不懂下面在介紹)。也就是上面輸出的第4行。要優雅地關閉主程式,需要用到生成器物件的close方法。下面繼續介紹。

GeneratorExit異常

當一個生成器物件被銷燬時,或者生成器遇到異常退出時,會丟擲一個GeneratorExit異常。請看下面的程式碼。
def myGenerator():  
    try:
        yield 1
    except GeneratorExit:
        print "myGenerator exited"
 
gen = myGenerator()
print gen.next()

輸出結果為
1
myGenerator exited

  上面程式碼的執行邏輯如下: 當呼叫到gen.next()方法時,會執行生成器物件方法的yield語句。此後,主程式結束,系統會自動產生一個GeneratorExit異常,被生成器物件方法的Except語句塊截獲。

  值得一提的是,GeneratorExit異常只有在生成器物件被啟用後,才有可能產生。更確切的說,需要至少呼叫一次生成器物件的next方法後,系統才會產生GeneratorExit異常。請看下面的程式碼。

def myGenerator():  
    try:
        yield 1
        yield 2
    except GeneratorExit:
        print "myGenerator exited"
 
gen = myGenerator()
del gen
print "Main caller exited"

  其輸出結果如下:

Main caller exited

  在上面的示例中,我們都顯式地捕獲了GeneratorExit異常。如果該異常沒有被顯式捕獲,生成器物件也不會把該異常向主程式丟擲。因為GeneratorExit異常定義的初衷,是方便開發者在生成器物件呼叫結束後定義一些收尾的工作,如釋放資源等。

generator.close()

  生成器物件的close方法會在生成器物件方法的掛起處丟擲一個GeneratorExit異常。GeneratorExit異常產生後,系統會繼續把生成器物件方法後續的程式碼執行完畢。參見下面的程式碼。

def myGenerator():  
    try:
        yield 1
        print "Statement after yield"
    except GeneratorExit:
        print "Generator error caught"
 
    print "End of myGenerator"
 
gen = myGenerator()
print gen.next()
gen.close()
print "End of main caller"

程式碼執行過程如下:

  • 當呼叫gen.next方法時,會啟用生成器,直至遇到生成器方法的yield語句,返回值1。同時,生成器方法的執行被掛起。
  • 當呼叫gen,close方法時,恢復生成器方法的執行過程。系統在yield語句處丟擲GeneratorExit異常,執行過程跳到except語句塊。當except語句塊處理完畢後,系統會繼續往下執行,直至生成器方法執行結束。

程式碼的輸出如下:

1
Generator error caught
End of myGenerator
End of main caller

  需要注意的是,GeneratorExit異常的產生意味著生成器物件的生命週期已經結束。因此,一旦產生了GeneratorExit異常,生成器方法後續執行的語句中,不能再有yield語句,否則會產生RuntimeError。請看下面的例子。

def myGenerator():  
    try:
        yield 1
        print "Statement after yield"
    except GeneratorExit:
        print "Generator error caught"
 
    yield 3
 
gen = myGenerator()
print gen.next()
gen.close()
print "End of main caller"


輸出結果為
1
Generator error caught
Traceback (most recent call last):
  File "test.txt", line 12, in <module>
    gen.close()
RuntimeError: generator ignored GeneratorExit

  注意,由於RuntimError會向主方法丟擲,因此主方法最後的print語句沒有執行。

  有了上面的知識,我們就可以理解為什麼下面的程式碼會丟擲RuntimError錯誤了。

def myGenerator():  
    value = 1  
    while True:  
        try:  
            yield value  
            value += 1  
        except:  
            value = 1  
  
  
gen = myGenerator()  
print gen.next()  
print gen.next()  
print gen.throw(Exception, "Method throw called!")  

  上面程式碼中,當主程式結束前,系統產生GeneratorExit異常,被生成器物件方法的except語句捕獲,但是此時while語句還沒有退出,因此後面還會執行“yield value”這一語句,從而發生RuntimeError。要避免這個錯誤非常簡單,請看下面的程式碼。

def myGenerator():  
    value = 1  
    while True:  
        try:  
            yield value  
            value += 1  
        except Exception:  
            value = 1  
  
  
gen = myGenerator()  
print gen.next()  
print gen.next()  
print gen.throw(Exception, "Method throw called!")  

  程式碼第7行的except語句宣告只捕獲Exception異常物件。這樣,當系統產生GeneratorExit異常後,不再被except語句捕獲,繼續向外丟擲,從而跳出了生成器物件方法的while語句。

  這裡再簡單說一句,GeneratorExit異常繼承自BaseException類。BaseException類與Exception類不同。一般情況下,BaseException類是所有內建異常類的基類,而Exception類是所有使用者定義的異常類的基類。

生成器和協程

生成器和協程都是通過python中的yield的關鍵字實現的,不同的是,生成器的任務指向不可控,協程可以理解為任務指向可控的生成器

寫成是:程式設計師可控制的併發流程,不管是程序還是執行緒,其切換都是作業系統在排程,而對於協程,程式設計師可以控制什麼時候切換出去,什麼時候切換回來

生成器可以作為協程使用。協程是指一個過程,這個過程與呼叫方協作,產出由呼叫方提供的值。

儘管生成器和協程看起來很像,但是它們代表的卻是完全不同的設計理念。生成器是用來生成資料的,而協程從某種意義上來說是消耗資料的。協程和迭代無關,儘管協程也會用next來獲取資料,但是協程和迭代無關,不要嘗試像使用生成器那樣去迭代地使用協程。


協程(生成器)有四種狀態,分別是

GEN_CREATED:等待執行

GEN_RUNNING:直譯器執行

GEN_SUSPENDED:在yield表示式處暫停

GEN_CLOSED:執行結束

協程(生成器)的狀態可以用inspect.getgeneratorstate()函式來確定,這裡是:inspect模組官方文件

中文方面參考如下:https://blog.csdn.net/qq_26398495/article/details/80109689

來看下面的例子:

from inspect import getgeneratorstate
from time import sleep
import threading


def get_state(coro):
    print("其他執行緒生成器狀態:%s", getgeneratorstate(coro))  # <1>


def simple_coroutine():
    for i in range(3):
        sleep(0.5)
        x = yield i + 1  # <1>


my_coro = simple_coroutine()
print("生成器初始狀態:%s" % getgeneratorstate(my_coro))  # <2>
first = next(my_coro)
for i in range(5):
    try:
        my_coro.send(i)
        print("主執行緒生成器初始狀態:%s" % getgeneratorstate(my_coro))  # <3>
        t = threading.Thread(target=get_state, args=(my_coro,))
        t.start()
    except StopIteration:
        print("生成器的值拉取完畢")
print("生成器最後狀態:%s" % getgeneratorstate(my_coro))  # <4>

#################
生成器初始狀態:GEN_CREATED
主執行緒生成器初始狀態:GEN_SUSPENDED
其他執行緒生成器狀態:%s GEN_SUSPENDED
主執行緒生成器初始狀態:GEN_SUSPENDED
其他執行緒生成器狀態:%s GEN_SUSPENDED
生成器的值拉取完畢
生成器的值拉取完畢
生成器的值拉取完畢
生成器最後狀態:GEN_CLOSED

  裝飾器加yield實現協程,生成器可實現自動迴圈,加上異常捕捉,生成器中的return為異常stopitertions的值

from functools import wraps
 
 
def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
 
    return primer
 
 
@coroutine
def averager():
    total = .0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count
 
 
try:
    coro_avg = averager()  # 預激裝飾器
    print(coro_avg.send(10))
    print(coro_avg.send(20))
    print(coro_avg.send(30))
    coro_avg.close()
    print(coro_avg.send(40))
except StopIteration:
    print("協程已結束")
except TypeError:
        print("傳入值異常")

####################
10.0
15.0
20.0
協程已結束

協程(生成器)的返回值

之前我們知道,生成器的返回值是異常StopIteration的值,具體獲取值的方式可以如此

from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  # yield右邊是每次迭代的值,不寫則為None
        if term is None:
            break  # 為了返回值,協程必須正常終止;這裡是退出條件
        total += term
        count += 1
        average = total/count
    # 返回一個namedtuple,包含count和average兩個欄位。在python3.3前,如果生成器返回值,會報錯
    return Result(count, average)

>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(20) # 並沒有返回值
>>> coro_avg.send(30)
>>> coro_avg.send(40)
>>> try:
...     coro_avg.send(None)
... except StopIteration as exc:
...     result = exc.value
...
>>> result
Result(count=3, average=30)

  這裡獲取返回值的方法很繁瑣,下面引出yield from。

yield from 初解

yield from 結果會在內部自動捕獲StopIteration 異常。這種處理方式與 for 迴圈處理StopIteration異常的方式一樣。 對於yield from 結構來說,直譯器不僅會捕獲StopIteration異常,還會把value屬性的值變成yield from 表示式的值。

在函式外部不能使用yield from(yield也不行)。

yield from 是 Python3.3 後新加的語言結構。和其他語言的await關鍵字類似,它表示:*在生成器 gen 中使用 yield from subgen()時,subgen 會獲得控制權,把產出的值傳給gen的呼叫方,即呼叫方可以直接控制subgen。於此同時,gen會阻塞,等待subgen終止,subgen是子生成器。

yield from 可用於簡化for迴圈中的yield表示式。

例如:

def gen():
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i

list(gen())
['A', 'B', '1', '2']

可以改寫為:

def gen():
    yield from 'AB'
    yield from range(1, 3)
    
list(gen())
['A', 'B', '1', '2']

  還有一個稍微複雜點的例子

from collections import Iterable

def flatten(items, ignore_types=(str, bytes)):
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            # yield from x(subgen)表示式對x物件做的第一件事是,呼叫iter(x),獲取迭代器,所以要求x是可迭代物件,每次迭代的值為yield的右值。
            yield from flatten(x)  # 這裡遞迴呼叫,如果flatten(x)中引數x是可迭代物件,繼續分解
            print('委派器從yield from阻塞中還原')
        else:
            yield x  # 這裡是每次迭代的值,直接傳給呼叫者,跳過(不經過)委派者管道


items = [1, 2, [3, 4, [5, 6], 7], 8]

# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):  # 第一步yield的右值
    print(x)  # x為子生成器中每次迭代的
    print('呼叫者對委派器進行了一次迭代,委派器返回一個值')

items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
for x in flatten(items):
    print(x)

  這裡的註釋看不懂先往下看,理解了yield from的委派器、管道作用就好理解了。

  上面的例子如果進行了斷點除錯的話,你看你會發現在子生成器(subgen)中,yield x直接能吧值傳給最外層的for迴圈,這就涉及到下面要說的yield from是連線子生成器和呼叫者的一個通道

 yield from的作用

PEP380 的標題是 ”syntax for delegating to subgenerator“(把指責委託給子生成器的句法)。由此我們可以知道,yield from是可以實現巢狀生成器的使用。注意,使用 yield from 句法呼叫協程時,會自動預激。

yield from 的主要功能是開啟雙向通道,把最外層的呼叫方與最內層的子生成器連線起來,使兩者可以直接傳送和產出值,還可以直接傳入異常,而不用在中間的協程新增異常處理的程式碼。

yield from 包含幾個概念:

  • 委派生成器

包含yield from 表示式的生成器函式

  • 子生成器

從yield from 部分獲取的生成器。

  • 呼叫方

呼叫委派生成器的客戶端(呼叫方)程式碼

這個示意圖(圖一)是對yield from 的呼叫過程

上面的圖難以理解可以看下面這個簡易圖

  這兩張圖說明了,委派生成器可以理解為管道,send和yield直接在呼叫方和子生成器(subgen)進行互動,send可以直接把值由呼叫方“穿過(不經過)“委派生成器直接傳遞給子生成器,子生成器的yield也可以把yield右邊的值穿過(不經過)委派生成器直接傳遞給呼叫者。

  yield from執行的全過程可以概括為:委派生成器在 yield from 表示式處暫停時,呼叫方可以直接把資料發給字生成器,子生成器再把產出的值傳送給呼叫方。子生成器返回之後,直譯器會丟擲StopIteration異常,並把返回值附加到異常物件上,這時委派生成器恢復。

  下面看一個例子來理解yield from的執行方式,這段程式碼從一個字典中讀取男生和女生的身高和體重。然後把資料傳給之前定義的 averager 協程,最後生成一個報告。

from collections import namedtuple

Result = namedtuple('Result', 'count average')

# 子生成器
# 這個例子和上邊示例中的 averager 協程一樣,只不過這裡是作為子生成器使用
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        # main 函式傳送資料到這裡 
        term = yield  # yield右邊如果有資料,則這個資料會直接傳遞給main,main中委派生成器迭代一次,本次迭代的值是yield的右值
        if term is None: # 終止條件
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average) # 返回的Result 會成為grouper函式中yield from表示式的值


# 委派生成器
def grouper(results, key):
     # 這個迴圈每次都會新建一個averager 例項,每個例項都是作為協程使用的生成器物件
    while True:
        # grouper 傳送的每個值都會經由yield from 處理,通過管道傳給averager 例項。grouper會在yield from表示式處暫停,等待averager例項處理客戶端發來的值。averager例項執行完畢後,返回的值繫結到results[key] 上。while 迴圈會不斷建立averager例項,處理更多的值。
        results[key] = yield from averager()


# 呼叫方
def main(data):
    results = {}
    for key, values in data.items():
        # group 是呼叫grouper函式得到的生成器物件,傳給grouper 函式的第一個引數是results,用於收集結果;第二個是某個鍵
        group = grouper(results, key)
        next(group)
        for value in values:
            # 把各個value傳給grouper 傳入的值最終到達averager函式中;
            # grouper並不知道傳入的是什麼,同時grouper例項在yield from處暫停
            group.send(value)
        # 把None傳入groupper,傳入的值最終到達averager函式中,導致當前例項終止。然後繼續建立下一個例項。
        # 如果沒有group.send(None),那麼averager子生成器永遠不會終止,委派生成器也永遠不會在此啟用,也就不會為result[key]賦值
        group.send(None)
    report(results)


# 輸出報告
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))


data = {
    'girls;kg':[40, 41, 42, 43, 44, 54],
    'girls;m': [1.5, 1.6, 1.8, 1.5, 1.45, 1.6],
    'boys;kg':[50, 51, 62, 53, 54, 54],
    'boys;m': [1.6, 1.8, 1.8, 1.7, 1.55, 1.6],
}

if __name__ == '__main__':
    main(data)

  執行結果為

6 boys  averaging 54.00kg
6 boys  averaging 1.68m
6 girls averaging 44.00kg
6 girls averaging 1.58m

  這斷程式碼展示了yield from 結構最簡單的用法。委派生成器相當於管道,所以可以把任意數量的委派生成器連線在一起—一個委派生成器使用yield from 呼叫一個子生成器,而那個子生成器本身也是委派生成器,使用yield from呼叫另一個生成器。最終以一個只是用yield表示式的生成器(或者任意可迭代物件)結束。

 yield from 的意義

PEP380 分6點說明了yield from 的行為。

  • 子生成器產出的值(yield右值)都直接傳給委派生成器的呼叫方(客戶端程式碼)
  • 使用send() 方法發給委派生成器的值都直接傳給子生成器。如果傳送的值是None,那麼會呼叫子生成器的 next()方法。如果傳送的值不是None,那麼會呼叫子生成器的send()方法。如果呼叫的方法丟擲StopIteration異常,那麼委派生成器恢復執行(從yield from阻塞中恢復)。任何其他異常都會向上冒泡(向外層傳遞),傳給委派生成器。
  • 生成器退出時,生成器(或子生成器)中的return expr 表示式會觸發 StopIteration(expr) 異常丟擲。
  • yield from表示式的值是子生成器終止時傳給StopIteration異常的第一個引數。
  • 傳入委派生成器的異常,除了 GeneratorExit 之外都傳給子生成器的throw()方法。如果呼叫throw()方法時丟擲 StopIteration 異常,委派生成器恢復執行。StopIteration之外的異常會向上冒泡(有異常處理就處理,沒處理繼續上冒)。傳給委派生成器。
  • 如果把 GeneratorExit 異常傳入委派生成器,或者在委派生成器上呼叫close() 方法,那麼在子生成器上呼叫close() 方法,如果他有的話。如果呼叫close() 方法導致異常丟擲,那麼異常會向上冒泡,傳給委派生成器;否則,委派生成器丟擲 GeneratorExit 異常。

PEP380 還有個說明:

In a generator, the statement

return value

is semantically equivalent to

raise StopIteration(value)

except that, as currently, the exception cannot be caught by except clauses within the returning generator.

這也就是為什麼 yield from 可以使用return 來返回值而 yield 只能使用 try … except StopIteration … 來捕獲異常的value 值。

yield 實現併發(協程)

會建立幾輛計程車,每輛計程車會拉幾個乘客,然後回家。計程車會首先駛離車庫,四處徘徊,尋找乘客;拉到乘客後,行程開始;乘客下車後,繼續四處徘徊。

import random
import collections
import queue
import argparse

DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERAVAL = 5

# time 是事件發生的模擬時間,proc 是計程車程序例項的編號,action是描述活動的字串
Event = collections.namedtuple('Event', ['time', 'proc', 'action'])


# 開始 計程車程序
# 每輛計程車呼叫一次taxi_process 函式,建立一個生成器物件,表示各輛計程車的運營過程。
def taxi_process(ident, trips, start_time=0):
"""
每次狀態變化時向建立事件,把控制權交給模擬器
:param ident: 計程車編號
:param trips: 計程車回家前的行程數量,接客數
:param start_time: 離開車庫的時間
:return:
"""
time = yield Event(start_time, ident, 'leave garage') # 產出的第一個Event
for i in range(trips): # 每次行程都會執行一遍這個程式碼塊
# 產出一個Event例項,表示拉到了乘客 協程在這裡暫停 等待下一次send() 啟用
time = yield Event(time, ident, 'pick up passenger')
# 產出一個Event例項,表示乘客下車 協程在這裡暫停 等待下一次send() 啟用
time = yield Event(time, ident, 'drop off passenger')
# 指定的行程數量完成後,for 迴圈結束,最後產出 'going home' 事件。協程最後一次暫停
yield Event(time, ident, 'going home')
# 協程執行到最後 丟擲StopIteration 異常


def compute_duration(previous_action):
"""使用指數分佈計算操作的耗時"""
if previous_action in ['leave garage', 'drop off passenger']:
# 新狀態是四處徘徊
interval = SEARCH_DURATION
elif previous_action == 'pick up passenger':
# 新狀態是開始行程
interval = TRIP_DURATION
elif previous_action == 'going home':
interval = 1
else:
raise ValueError('Unkonw previous_action: %s' % previous_action)
return int(random.expovariate(1 / interval)) + 1


# 開始模擬
class Simulator:

def __init__(self, procs_map):
self.events = queue.PriorityQueue() # 帶優先順序的佇列 會按時間正向排序
self.procs = dict(procs_map) # 從獲取的procs_map 引數中建立本地副本,為了不修改使用者傳入的值

def run(self, end_time):
"""
排程並顯示事件,直到時間結束
:param end_time: 結束時間 只需要指定一個引數
:return:
"""
# 排程各輛計程車的第一個事件
for iden, proc in sorted(self.procs.items()):
first_event = next(proc) # 預激協程 併產出一個 Event 物件
self.events.put(first_event) # 把各個事件加到self.events 屬性表示的 PriorityQueue物件中

# 此次模擬的主迴圈
sim_time = 0 # 把 sim_time 歸0
while sim_time < end_time:
if self.events.empty(): # 事件全部完成後退出迴圈
print('*** end of event ***')
break
current_event = self.events.get() # 獲取優先順序最高(time 屬性最小)的事件
sim_time, proc_id, previous_action = current_event # 更新 sim_time
print('taxi:', proc_id, proc_id * ' ', current_event)
active_proc = self.procs[proc_id] # 從self.procs 字典中獲取表示當前活動的計程車協程
next_time = sim_time + compute_duration(previous_action)
try:
next_event = active_proc.send(next_time) # 把計算得到的時間傳送給出租車協程。協程會產出下一個事件,或者丟擲 StopIteration
except StopIteration:
del self.procs[proc_id] # 如果有異常 表示已經退出, 刪除這個協程
else:
self.events.put(next_event) # 如果沒有異常,把next_event 加入到佇列
else: # 如果超時 則走到這裡
msg = '*** end of simulation time: {} event pendding ***'
print(msg.format(self.events.qsize()))


def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS, seed=None):
"""初始化隨機生成器,構建過程,執行模擬程式"""
if seed is not None:
random.seed(seed) # 獲取可復現的結果
# 構建taxis 字典。值是三個引數不同的生成器物件。
taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERAVAL)
for i in range(num_taxis)}
sim = Simulator(taxis)
sim.run(end_time)


if __name__ == '__main__':
# parser = argparse.ArgumentParser(description='Taxi fleet simulator.')
# parser.add_argument('-e', '--end-time', type=int,
# default=DEFAULT_END_TIME,
# help='simulation end time; default=%s' % DEFAULT_END_TIME)
# parser.add_argument('-t', '--taxis', type=int,
# default=DEFAULT_NUMBER_OF_TAXIS,
# help='number of taxis running; default = %s' % DEFAULT_NUMBER_OF_TAXIS)
# parser.add_argument('-s', '--seed', type=int, default=None,
# help='random generator seed (for testing)')
#
# args = parser.parse_args()
main()

執行程式

# -s 3 引數設定隨機生成器的種子,以便除錯的時候隨機數不變,輸出相同的結果
python taxi_sim.py -s 3

輸出結果如下圖

從結果我們可以看出,3輛計程車的行程是交叉進行的。不同顏色的箭頭代表不同計程車從乘客上車到乘客下車的跨度。

從結果可以看出:

  • 計程車每5隔分鐘從車庫出發
  • 0 號計程車2分鐘後拉到乘客(time=2),1號計程車3分鐘後拉到乘客(time=8),2號計程車5分鐘後拉到乘客(time=15)
  • 0 號計程車拉了兩個乘客
  • 1 號計程車拉了4個乘客
  • 2 號計程車拉了6個乘客
  • 在此次示中,所有排定的事件都在預設的模擬時間內完成

我們先在控制檯中呼叫taxi_process 函式,自己駕駛一輛計程車,示例如下:

In [1]: from taxi_sim import taxi_process
# 建立一個生成器,表示一輛計程車 編號是13 從t=0 開始,有兩次行程
In [2]: taxi = taxi_process(ident=13, trips=2, start_time=0) In [3]: next(taxi) # 預激協程 Out[3]: Event(time=0, proc=13, action='leave garage') # 傳送當前時間 在控制檯中,變數_繫結的是前一個結果 # _.time + 7 是 0 + 7 In [4]: taxi.send(_.time+7) Out[4]: Event(time=7, proc=13, action='pick up passenger') # 這個事件有for迴圈在第一個行程的開頭產出 # 傳送_.time+12 表示這個乘客用時12分鐘 In [5]: taxi.send(_.time+12) Out[5]: Event(time=19, proc=13, action='drop off passenger') # 徘徊了29 分鐘 In [6]: taxi.send(_.time+29) Out[6]: Event(time=48, proc=13, action='pick up passenger') # 乘坐了50分鐘 In [7]: taxi.send(_.time+50) Out[7]: Event(time=98, proc=13, action='drop off passenger') # 兩次行程結束 for 迴圈結束產出'going home' In [8]: taxi.send(_.time+5) Out[8]: Event(time=103, proc=13, action='going home') # 再發送值,會執行到末尾 協程返回後 丟擲 StopIteration 異常 In [9]: taxi.send(_.time+10) --------------------------------------------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-9-d775cc8cc079> in <module>() ----> 1 taxi.send(_.time+10) StopIteration: 

在這個示例中,我們用控制檯模擬模擬主迴圈。從taxi協程中產出的Event例項中獲取 .time 屬性,隨意加一個數,然後呼叫send()方法傳送兩數之和,重新啟用協程。

在taxi_sim.py 程式碼中,計程車協程由 Simulator.run 方法中的主迴圈驅動。

Simulator 類的主要資料結構如下:

self.events

PriorityQueue 物件,儲存Event例項。元素可以放進PriorityQueue物件中,然後按 item[0](物件的time 屬性)依序取出(按從小到大)。

self.procs

一個字典,把計程車的編號對映到模擬過程的程序(表示出租車生成器的物件)。這個屬性會繫結前面所示的taxis字典副本。

優先佇列是離散事件模擬系統的基礎構件:建立事件的順序不定,放入這種佇列後,可以按各個事件排定的順序取出。

比如,我們把兩個事件放入佇列:

Event(time=14, proc=0, action='pick up passenger')
Event(time=10, proc=1, action='pick up passenger') 

這個意思是 0號計程車14分拉到一個乘客,1號計程車10分拉到一個乘客。但是主迴圈獲取的第一個事件將是

Event(time=10, proc=1, action=‘pick up passenger’)

下面我們分析一下模擬系統的主演算法–Simulator.run 方法。

  1. 迭代表示各輛計程車的程序
    • 在各輛計程車上呼叫next()函式,預激協程。
    • 把各個事件放入Simulator類的self.events屬性中。
  2. 滿足 sim_time < end_time 條件是,執行模擬系統的主迴圈。
    • 檢查self.events 屬性是否為空;如果為空,跳出迴圈
    • 從self.events 中獲取當前事件
    • 顯示獲取的Event物件
    • 獲取curent_event 的time 屬性,更新模擬時間
    • 把時間傳送給current_event 的pro屬性標識的協程,產出下一個事件
    • 把next_event 新增到self.events 佇列中,排定 next_event

我們程式碼中 while 迴圈有一個else 語句,模擬系統到達結束時間後,程式碼會執行else中的語句。

這個示例主要是想說明如何在一個主迴圈中處理事件,以及如何通過傳送資料驅動協程,同時解釋瞭如何使用生成器代替執行緒和回撥,實現併發。

本文參考http://blog.gusibi.com/post/python-coroutine-yield-from/