python blinker庫學習
Blinker 是一個基於Python的強大的訊號庫,它既支援簡單的物件到物件通訊,也支援針對多個物件進行組播。Flask的訊號機制就是基於它建立的。
Blinker的核心雖然小巧,但是功能卻非常強大,它支援以下特性:
- 支援註冊全域性命名訊號
- 支援匿名訊號
- 支援自定義命名訊號
- 支援與接收者之間的持久連線與短暫連線
- 通過弱引用實現與接收者之間的自動斷開連線
- 支援傳送任意大小的資料
- 支援收集訊號接收者的返回值
- 執行緒安全
建立訊號
訊號通過signal()
方法進行建立:
>>> from blinker import signal
>>> initialized = signal("initialized")
>>> initialized is signal("initialized")
True
每次呼叫signal('name')
都會返回同一個訊號物件。因此這裡signal()
方法使用了單例模式。
訂閱訊號
使用Signal.connect()
方法註冊一個函式,每當觸發訊號的時候,就會呼叫該函式。該函式以觸發訊號的物件作為引數,這個函式其實就是訊號訂閱者。
>>> def subscriber(sender):
... print("Got a signal sent by %r" % sender)
. ..
>>> ready = signal('ready')
>>> ready.connect(subscriber)
觸發訊號
使用Signal.send()
方法通知訊號訂閱者。
下面定義類Processor
,在它的go()
方法中觸發前面宣告的ready
訊號,send()
方法以self
為引數,也就是說Processor
的例項是訊號的傳送者。
>>> class Processor:
... def __init__(self, name):
... self.name = name
...
... def go (self):
... ready = signal('ready')
... ready.send(self)
... print("Processing.")
... complete = signal('complete')
... complete.send(self)
...
... def __repr__(self):
... return '<Processor %s>' % self.name
...
>>> processor_a = Processor('a')
>>> processor_a.go()
Got a signal sent by <Processor a>
Processing.
注意到go()
方法中的complete
訊號沒?並沒有訂閱者訂閱該訊號,但是依然可以觸發該訊號。如果沒有任何訂閱者的訊號,結果是什麼訊號也不會發送,而且Blinker
內部對這種情況進行了優化,以儘可能的減少記憶體開銷。
訂閱特定的釋出者
預設情況下,任意釋出者觸發訊號,都會通知訂閱者。可以給Signal.connect()
傳遞一個可選的引數,以便限制訂閱者只能訂閱特定傳送者。
>>> def b_subscriber(sender):
... print("Caught signal from processor_b.")
... assert sender.name == 'b'
...
>>> processor_b = Processor('b')
>>> ready.connect(b_subscriber, sender=processor_b)
現在訂閱者只訂閱了processor_b
釋出的ready
訊號:
>>> processor_a.go()
Got a signal sent by <Processor a>
Processing.
>>> processor_b.go()
Got a signal sent by <Processor b>
Caught signal from processor_b.
Processing.
通過訊號收發資料
可以給send()
方法傳遞額外的關鍵字引數,這些引數會傳遞給訂閱者。
>>> send_data = signal('send-data')
>>> @send_data.connect
... def receive_data(sender, **kw):
... print("Caught signal from %r, data %r" % (sender, kw))
... return 'received!'
...
>>> result = send_data.send('anonymous', abc=123)
Caught signal from 'anonymous', data {'abc': 123}
send()
方法的返回值收集每個訂閱者的返回值,拼接成一個元組組成的列表。每個元組的組成為(receiver function, return value)
。
匿名訊號
前面我們建立的訊號都是命名訊號,每次呼叫Signal
構造器都會建立一個唯一的訊號,,也就是說每次建立的訊號是不一樣的。下面對前面的Processor
類進行改造,將signal
作為它的類屬性。
>>> from blinker import Signal
>>> class AltProcessor:
... on_ready = Signal()
... on_complete = Signal()
...
... def __init__(self, name):
... self.name = name
...
... def go(self):
... self.on_ready.send(self)
... print("Alternate processing.")
... self.on_complete.send(self)
...
... def __repr__(self):
... return '<AltProcessor %s>' % self.name
上面建立的就是匿名訊號。on_ready與on_complete
是兩個不同的訊號。
使用修飾器訂閱訊號
除了使用connect()
方法訂閱訊號之外,使用@connect
修飾器可以達到同樣的效果。
>>> apc = AltProcessor('c')
>>> @apc.on_complete.connect
... def completed(sender):
... print "AltProcessor %s completed!" % sender.name
...
>>> apc.go()
Alternate processing.
AltProcessor c completed!
儘管這樣用起來很方便,但是這種形式不支援訂閱指定的傳送者。這時,可以使用connect_via()
:
>>> dice_roll = signal('dice_roll')
>>> @dice_roll.connect_via(1)
... @dice_roll.connect_via(3)
... @dice_roll.connect_via(5)
... def odd_subscriber(sender):
... print("Observed dice roll %r." % sender)
...
>>> result = dice_roll.send(3)
Observed dice roll 3.
優化訊號傳送
訊號通常會進行優化,以便快速的傳送。不管有沒有訂閱者,都可以傳送訊號。如果傳送訊號時需要傳送的引數要計算很長時間,可以在傳送之前使用receivers
屬性先檢查一下是否有訂閱者。
>>> bool(signal('ready').receivers)
True
>>> bool(signal('complete').receivers)
False
>>> bool(AltProcessor.on_complete.receivers)
True
還可以檢查訂閱者是否訂閱了某個具體的訊號釋出者。
>>> signal('ready').has_receivers_for(processor_a)
True