【python】詳解事件驅動event實現
阿新 • • 發佈:2018-11-19
所有的計算機程式都可以大致分為兩類:指令碼型(單次執行)和連續執行型(直到使用者主動退出)。
- 指令碼型:指令碼型的程式包括最早的批處理檔案以及使用Python做交易策略回測等等,這類程式的特點是在使用者啟動後會按照程式設計時設計好的步驟一步步執行,所有步驟執行完後自動退出。
- 連續執行型:連續執行型的程式包含了作業系統和絕大部分我們日常使用的軟體等等,這類程式啟動後會處於一個無限迴圈中連續執行,直到使用者主動退出時才會結束。
一、連續執行型程式
我們要開發的交易系統就是屬於連續執行型程式,而這種程式根據其計算邏輯的執行機制不同,又可以粗略的分為時間驅動和事件驅動兩種。
1.1 時間驅動
時間驅動的程式邏輯相對容易設計,簡單來說就是讓電腦每隔一段時間自動做一些事情。這個事情本身可以很複雜、包括很多步驟,但這些步驟都是線性的,按照順序一步步執行下來。
from time import sleep
def demo():
print('BB')
while True:
demo()
sleep(1.0)
時間驅動的程式本質上就是每隔一段時間固定執行一次指令碼。儘管指令碼自身可以很長、包含非常多的步驟,但是我們可以看出這種程式的執行機制相對比較簡單、容易理解。
時間驅動的程式在量化交易方面還存在一些其他的缺點:如浪費CPU的計算資源、實現非同步邏輯複雜度高等等。
1.2 事件驅動
與時間驅動對應的就是事件驅動的程式:當某個新的事件被推送到程式中時,程式立即呼叫和這個事件相對應的處理函式進行相關的操作。
舉個例子:
有些人喜歡的某個公眾號,然後去關注這個公眾號,哪天這個公眾號釋出了篇新的文章,沒多久訂閱者就會在微信裡收到這個公眾號推送的新訊息,如果感興趣就開啟來閱讀。
上面公眾號例子可以翻譯為,監聽器(訂閱者)監聽了(關注了)事件源(公眾號),當事件源的傳送事件時(公眾號釋出文章),所有監聽該事件的監聽器(訂閱者)都會接收到訊息並作出響應(閱讀文章)。
- 公眾號為事件源
- 訂閱者為事件監聽器
- 訂閱者關注公眾號,相當於監聽器監聽了事件源
- 公眾號釋出文章這個動作為傳送事件
- 訂閱者收到事件後,做出閱讀文章的響應動作
事件驅動主要包含以下元素和操作函式:
1.2.1 元素
- 事件源
- 事件監聽器
- 事件物件
1.2.2 操作函式
- 監聽動作
- 傳送事件
- 呼叫監聽器響應函式
現在用python實現來實現上述的業務邏輯,先看流程圖:
1.2.3 EventManager事件管理類程式碼如下:
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 13 13:51:31 2018
@author: 18665
"""
# 系統模組
from queue import Queue, Empty
from threading import *
########################################################################
class EventManager:
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件管理器"""
# 事件物件列表
self.__eventQueue = Queue()
# 事件管理器開關
self.__active = False
# 事件處理執行緒
self.__thread = Thread(target = self.__Run)
self.count = 0
# 這裡的__handlers是一個字典,用來儲存對應的事件的響應函式
# 其中每個鍵對應的值是一個列表,列表中儲存了對該事件監聽的響應函式,一對多
self.__handlers = {}
#----------------------------------------------------------------------
def __Run(self):
"""引擎執行"""
print('{}_run'.format(self.count))
while self.__active == True:
try:
# 獲取事件的阻塞時間設為1秒
event = self.__eventQueue.get(block = True, timeout = 1)
self.__EventProcess(event)
except Empty:
pass
self.count += 1
#----------------------------------------------------------------------
def __EventProcess(self, event):
"""處理事件"""
print('{}_EventProcess'.format(self.count))
# 檢查是否存在對該事件進行監聽的處理函式
if event.type_ in self.__handlers:
# 若存在,則按順序將事件傳遞給處理函式執行
for handler in self.__handlers[event.type_]:
handler(event)
self.count += 1
#----------------------------------------------------------------------
def Start(self):
"""啟動"""
print('{}_Start'.format(self.count))
# 將事件管理器設為啟動
self.__active = True
# 啟動事件處理執行緒
self.__thread.start()
self.count += 1
#----------------------------------------------------------------------
def Stop(self):
"""停止"""
print('{}_Stop'.format(self.count))
# 將事件管理器設為停止
self.__active = False
# 等待事件處理執行緒退出
self.__thread.join()
self.count += 1
#----------------------------------------------------------------------
def AddEventListener(self, type_, handler):
"""繫結事件和監聽器處理函式"""
print('{}_AddEventListener'.format(self.count))
# 嘗試獲取該事件型別對應的處理函式列表,若無則建立
try:
handlerList = self.__handlers[type_]
except KeyError:
handlerList = []
self.__handlers[type_] = handlerList
# 若要註冊的處理器不在該事件的處理器列表中,則註冊該事件
if handler not in handlerList:
handlerList.append(handler)
print(self.__handlers)
self.count += 1
#----------------------------------------------------------------------
def RemoveEventListener(self, type_, handler):
"""移除監聽器的處理函式"""
print('{}_RemoveEventListener'.format(self.count))
try:
handlerList = self.handlers[type_]
# 如果該函式存在於列表中,則移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函式列表為空,則從引擎中移除該事件型別
if not handlerList:
del self.handlers[type_]
except KeyError:
pass
self.count += 1
#----------------------------------------------------------------------
def SendEvent(self, event):
"""傳送事件,向事件佇列中存入事件"""
print('{}_SendEvent'.format(self.count))
self.__eventQueue.put(event)
self.count += 1
########################################################################
"""事件物件"""
class Event:
def __init__(self, type_=None):
self.type_ = type_ # 事件型別
self.dict = {} # 字典用於儲存具體的事件資料
1.2.4 測試程式碼
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 13 13:50:45 2018
@author: 18665
"""
# encoding: UTF-8
import sys
from datetime import datetime
from threading import *
#sys.path.append('D:\\works\\TestFile')
#print(sys.path)
from eventManager import *
#事件名稱 新文章
EVENT_ARTICAL = "Event_Artical"
#事件源 公眾號
class PublicAccounts:
def __init__(self,eventManager):
self.__eventManager = eventManager
def WriteNewArtical(self):
#事件物件,寫了新文章
event = Event(type_=EVENT_ARTICAL)
event.dict["artical"] = u'如何寫出更優雅的程式碼\n'
#傳送事件
self.__eventManager.SendEvent(event)
print(u'公眾號傳送新文章\n')
#監聽器 訂閱者
class Listener:
def __init__(self,username):
self.__username = username
#監聽器的處理函式 讀文章
def ReadArtical(self,event):
print(u'%s 收到新文章' % self.__username)
print(u'正在閱讀新文章內容:%s' % event.dict["artical"])
"""測試函式"""
#--------------------------------------------------------------------
def test():
# 例項化監聽器
listner1 = Listener("thinkroom") #訂閱者1
listner2 = Listener("steve") #訂閱者2
# 例項化事件操作函式
eventManager = EventManager()
#繫結事件和監聽器響應函式(新文章)
eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
# 啟動事件管理器,# 啟動事件處理執行緒
eventManager.Start()
publicAcc = PublicAccounts(eventManager)
timer = Timer(2, publicAcc.WriteNewArtical)
timer.start()
if __name__ == '__main__':
test()
通過eventManager可以實現事件觸發的邏輯,當事件觸發時,推送事件到執行緒裡執行。