PyalgoTrade源碼閱讀完結篇
本文著重於回測相關得模塊。
由於上一篇文章實在是寫得太爛了, 這一篇文章重新開始寫。
Pyalgotrade業務邏輯及實現原理
以官方教程示例為例
下載數據
python -c "from pyalgotrade.tools import yahoofinance; yahoofinance.download_daily_bars(‘orcl‘, 2000, ‘orcl-2000.csv‘)"
構建策略並運行
from pyalgotrade import strategy from pyalgotrade.barfeed import yahoofeed from pyalgotrade.technical import ma class MyStrategy(strategy.BacktestingStrategy): def __init__(self, feed, instrument, smaPeriod): super(MyStrategy, self).__init__(feed, 1000) self.__position = None self.__instrument = instrument # We‘ll use adjusted close values instead of regular close values. self.setUseAdjustedValues(True) self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod) def onEnterOk(self, position): execInfo = position.getEntryOrder().getExecutionInfo() self.info("BUY at $%.2f" % (execInfo.getPrice())) def onEnterCanceled(self, position): self.__position = None def onExitOk(self, position): execInfo = position.getExitOrder().getExecutionInfo() self.info("SELL at $%.2f" % (execInfo.getPrice())) self.__position = None def onExitCanceled(self, position): # If the exit was canceled, re-submit it. self.__position.exitMarket() def onBars(self, bars): # Wait for enough bars to be available to calculate a SMA. if self.__sma[-1] is None: return bar = bars[self.__instrument] # If a position was not opened, check if we should enter a long position. if self.__position is None: if bar.getPrice() > self.__sma[-1]: # Enter a buy market order for 10 shares. The order is good till canceled. self.__position = self.enterLong(self.__instrument, 10, True) # Check if we have to exit the position. elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive(): self.__position.exitMarket() def run_strategy(smaPeriod): # Load the yahoo feed from the CSV file feed = yahoofeed.Feed() feed.addBarsFromCSV("orcl", "orcl-2000.csv") # Evaluate the strategy with the feed. myStrategy = MyStrategy(feed, "orcl", smaPeriod) myStrategy.run() print "Final portfolio value: $%.2f" % myStrategy.getBroker().getEquity() run_strategy(15)
業務邏輯概括
- 創建Feed對象加載回測歷史數據
- 創建策略
- 將Feed對象傳入策略
- 內部創建Broker對象
- 在策略中初始化技術指標
- 運行策略(內部會創建事件循環,依次讀取每一個bars數據調用策略邏輯,即onBars函)
回測數據 Feed對象
用於承載回測的數據,提供接口訪問,驅動整個事件循環。
創建Feed對象
# 導入yahoofeed模塊 from pyalgotrade.barfeed import yahoofeed # 創建yahoofeed.Feed類創建其實例 feed = yahoofeed.Feed() # 通過addBarsFromCSV加載本地csv文件 # 傳入股票代碼名, 文件路徑 feed.addBarsFromCSV("orcl", "orcl-2000.csv")
Feed對象繼承鏈
註: 由IntelliJ Idea生成
由上圖可知, 分別繼承不同的BarFeed,最終業務邏輯基類pyalgotrade.observer.subject.
Feed數據結構構建過程
主要方法調用順序如下:
yahooFeed.addBarsFromCSV
-> csvFeed.BarFeed.addBarsFromCSV
-> membf.BarFeed.addBarsFromSequence
-> barfeed.registerInstrument
-> feed.registerDataSeries
-> barfeed.createDataSeries
Feed數據結構
在Feed中有兩個比較重要的數據對象
- self.__bars = {}
- self.__ds = BarDataSeries()
其中BarDataSeries對象有以下定義
pyalgotrade/pyalgotrade/dataseries/bards.py
class BarDataSeries(dataseries.SequenceDataSeries):
def __init__(self, maxLen=None):
super(BarDataSeries, self).__init__(maxLen)
self.__openDS = dataseries.SequenceDataSeries(maxLen)
self.__closeDS = dataseries.SequenceDataSeries(maxLen)
self.__highDS = dataseries.SequenceDataSeries(maxLen)
self.__lowDS = dataseries.SequenceDataSeries(maxLen)
self.__volumeDS = dataseries.SequenceDataSeries(maxLen)
self.__adjCloseDS = dataseries.SequenceDataSeries(maxLen)
self.__extraDS = {}
self.__useAdjustedValues = False
BarDataSeries提供一系列方法返回相應的數據序列,以getOpenDataSeries為例
pyalgotrade/pyalgotrade/dataseries/bards.py:87
def getOpenDataSeries(self):
"""Returns a :class:`pyalgotrade.dataseries.DataSeries` with the open prices."""
return self.__openDS
而dataseries.SequenceDataSeries對象是一個數據存儲在collections.ListDeque對象上,並集成事件監聽的類對象.
self._bars在membf.BarFeed.addBarsFromSequence方法中讀取csv文件生成.
self._ds在barfeed.createDataSeries方法中創建一個默認長度為1024的BarDataSeries空數據對象.
小結
bar是含有時間, 開盤價, 收盤價, 當日最高價, 當日最低價, 成交量,復權收盤價的數據對象.
self.__bars是key為股票代碼, value是元素為bars數據對象的列表的字典.
self.__ds是BarDataSeries對象
事件循環
事件循環是PyalgoTrade的數據引擎,驅動著整個策略運轉.
下面是Pyalgotrade內部事件循環的一個簡單的實現。
# coding: utf8
import abc
class Event(object):
"""事件類.
用於訂閱指定的操作,如函數
當事件執行emit方法的時候,遍歷訂閱了的操作,並執行該操作"""
def __init__(self):
# 內部handlers列表
self.__handlers = []
def subscribe(self, handler):
if handler not in self.__handlers:
self.__handlers.append(handler)
def emit(self, *args, **kwargs):
"""執行所有訂閱了的操作"""
for handler in self.__handlers:
handler(*args, **kwargs)
class Subject(object):
"""將元類指向abc.ABCMeta元類
1. 當抽象方法未被實現的時候,不能新建該類的實例
2. abstractmethod相當於子類要實現的接口,如果不實現,則不能新建該類的實例"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def start(self):
pass
@abc.abstractmethod
def stop(self):
pass
@abc.abstractmethod
def dispatch(self):
raise NotImplementedError()
@abc.abstractmethod
def eof(self):
raise NotImplementedError()
class Dispatcher(object):
"""調度類
1. 維護事件循環
2. 不斷的調度subject的disptch操作並判斷是否結束"""
def __init__(self):
self.__subjects = []
self.__stop = False
def run(self):
"""運行整個事件循環並在調度之前,之後分別調用subject的start, stop方法"""
try:
for subject in self.__subjects:
subject.start()
while not self.__stop:
eof, dispatched = self.dispatch()
if eof:
self.__stop = True
finally:
for subject in self.__subjects:
subject.stop()
def dispatch(self):
ret = False
eof = False
for subject in self.__subjects:
ret = subject.dispatch() is True
eof = subject.eof()
return eof, ret
def addSubject(self, subject):
self.__subjects.append(subject)
class Broker(Subject):
"""Broker 類"""
def dispatch(self):
return None
def eof(self):
return None
def start(self):
pass
def stop(self):
pass
class Feed(Subject):
"""Feed類
1. 承載數據源
2. 通過數據驅動事件循環"""
def __init__(self, size):
self.__data = range(size)
self.__nextPos = 0
self.__event = Event()
def start(self):
pass
def stop(self):
pass
def dispatch(self):
value = self.__data[self.__nextPos]
self.__event.emit(value)
self.__nextPos += 1
return True
def getNewValueEvent(self):
return self.__event
def eof(self):
return self.__nextPos >= len(self.__data)
class Strategy(object):
def __init__(self, broker, feed):
self.__dispatcher = Dispatcher()
self.__feed = feed
self.__broker = broker
# 將策略的self.__onBars方法傳入Feed的self.__event裏面
# 當Feed調用dispatch方法的時候, 會指定self.__onBars函數
self.__feed.getNewValueEvent().subscribe(self.__onBars)
# 註意順序,Feed對象必須在最後
self.__dispatcher.addSubject(self.__broker)
self.__dispatcher.addSubject(self.__feed)
def __onBars(self, value):
print("dispatch before.")
self.onBars(value)
print("dispatch after")
def onBars(self, value):
print("on Bar: {}".format(value))
def run(self):
self.__dispatcher.run()
if __name__ == ‘__main__‘:
feed = Feed(3)
broker = Broker()
myStrategy = Strategy(broker, feed)
myStrategy.run()
output:
dispatch before.
on Bar: 0
dispatch after
dispatch before.
on Bar: 1
dispatch after
dispatch before.
on Bar: 2
dispatch after
上面的代碼主要說明策略的onBars方法是怎麽被調用的。
關於Broker怎麽被驅動,在後面講解
- 策略中維護一個調度器dispatcher,當策略啟動的時候, 調度器dipatcher啟動, 並嘗試調用feed,broker start方法.
- 不斷調用feed, broker的dispatch方法, 判斷是否結束, 如果結束, 則做結束動作, 調用feed, broker的stop方法
- feed對象在調用dispatch方法的時候, feed對象會觸發自身維護的self._event. 而self._event在MyStrategy._init_方法中,通過self._feed.getNewValueEvent().subscribe(self._onBars)訂閱了MyStrategy._onBars方法, 所以Feed對象每次dispatch的時候,MyStrategy._onBars都會被調用.
至此, Feed對象怎麽驅動策略的邏輯已經清晰。
接下來,講解BaseStrategy, BacktestingStrategy初始化過程
策略初始化
策略的繼承鏈並不復雜, 所有策略的基類是BaseStartegy, BacktestingStrategy是提供給用戶使用的策略,至少實現onBars函數則可以回測。
BaseStrategy, BacktestingStrategy的初始化源代碼如下
pyalgotrade/pyalgotrade/strategy/__init__.py
class BaseStartegy(object):
def __init__(self, barFeed, broker):
# 綁定barFeed對象
self.__barFeed = barFeed
# 綁定broker對象
self.__broker = broker
# 交易相關的倉位
self.__activePositions = set()
# 訂單處理順序
self.__orderToPosition = {}
# bar被處理後的事件
self.__barsProcessedEvent = observer.Event()
# analyzer列表
self.__analyzers = []
# 命名的analyzer列表
self.__namedAnalyzers = {}
# 重新取樣的feed對象列表
self.__resampledBarFeeds = []
# 調度器對象
self.__dispatcher = dispatcher.Dispatcher()
# broker的訂單被更新時的事件, 訂閱self.__onOrderEvent方法
self.__broker.getOrderUpdatedEvent().subscribe(self.__onOrderEvent)
# barfeed值被更新的時候的事件(當barfeed被調度的時候),訂閱self.__onBars方法
self.__barFeed.getNewValuesEvent().subscribe(self.__onBars)
# 調度器的開始事件,訂閱self.onStart方法
self.__dispatcher.getStartEvent().subscribe(self.onStart)
# 調度器的空閑事件, 訂閱self.__onIdle方法
self.__dispatcher.getIdleEvent().subscribe(self.__onIdle)
# 分別將繼承了Subject類的broker,barFeed對象加入到調度器的subject列表
self.__dispatcher.addSubject(self.__broker)
self.__dispatcher.addSubject(self.__barFeed)
# 日誌級別的初始化
self.__logger = logger.getLogger(BaseStrategy.LOGGER_NAME)
class BacktestingStrategy(BaseStrategy):
# 默認初始化一個持有100w現金的虛擬賬戶
def __init__(self, barFeed, cash_or_brk=1000000):
# 如果沒有傳入cash_or_brk參數, 或者傳入數值類型的值
# 則傳入cash_or_brk,barFeed對象新建一個backtesting.Broker實例,並調用父類的__init__方法
# 如果傳入的cash_or_brk參數值是backtesting.Broker的實例, 則直接使用
if isinstance(cash_or_brk, pyalgotrade.broker.Broker):
broker = cash_or_brk
else:
broker = backtesting.Broker(cash_or_brk, barFeed)
BaseStrategy.__init__(self, barFeed, broker)
# 默認self.__useAdjustedValue=False
self.__useAdjustedValues = False
# 配置日誌參數
self.setUseEventDateTimeInLogs(True)
self.setDebugMode(True)
總的來說真正Strategy對象,barFeed對象,broker對象訂閱了更多的事件, 以及更多的判斷。但,內核都是調度器驅動著barFeed, broker對象不斷的被調度(調用dispatch方法), 而barFeed對象會不斷的從self._bars中取數據追加到self._ds對象中,並將取出來的數據提交的self._event中,而self._event訂閱了Strategy.__onBars方法, 所以不斷的驅動著Strategy的自定義策略(onBars裏面定義的交易邏輯).
交易賬戶 Broker對象
在Strategy對象初始化時候, 會初始化一個虛擬的回測賬戶.
回測賬戶broker需要傳入barfeed對象, 並在barfeed的event對象裏面訂閱自己的onBars函數,源碼如下:
pyalgotrade/pyalgotrade/broker/__init__.py
class Broker(broker.Broker):
LOGGER_NAME = "broker.backtesting"
def __init__(self, cash, barFeed, commission=None):
super(Broker, self).__init__()
assert(cash >= 0)
self.__cash = cash
if commission is None:
self.__commission = NoCommission()
else:
self.__commission = commission
self.__shares = {}
self.__activeOrders = {}
self.__useAdjustedValues = False
# 持倉策略, 使用DefaultStrategy
# 使用DefaultStrategy.volumeLimit = 0.25
# 當交易訂單的成交量大於當前bar的成交量的25%則不能成交
# 沒有滑點
# 沒有手續費
self.__fillStrategy = fillstrategy.DefaultStrategy()
self.__logger = logger.getLogger(Broker.LOGGER_NAME)
# 讓barfeed對象訂閱self.onBars方法
barFeed.getNewValuesEvent().subscribe(self.onBars)
self.__barFeed = barFeed
self.__allowNegativeCash = False
self.__nextOrderId = 1
由上可知,當barFeed對象數據更新的時候,還會調用BackTestBroker.onBars方法.
交易倉位 Position對象
當使用enterLong之類交易方法,則會返回一個Postion的對象,這個對象承載著當前各股的持倉比例,以及持有現金.
以enterLong方法說明持倉流程.
- 實例化一個LongPosition對象
- 調用broker的createMarketOrder方法創建一個MarketOrder.
- 註冊order, 以便barFeed對象數據驅動的時候,使用該order
以exitMarket方法說明平倉流程.
- 使用Position對象的exitMarket方法提交平倉訂單.
- 註冊order, 以便barFeed對象數據驅動的時候,使用該order
源代碼調用鏈太長....所以文字概括.
交易訂單 Order對象
當我們買入或者賣出的時候,其實是提交一個訂單給交易賬戶(broker), 交易賬戶會根據交易訂單的類型,動作等相關信息執行相關的操作.
交易訂單的類型參考: https://www.thebalance.com/understanding-stock-orders-3141318
一般有買入(做多), 賣出(做空)兩種交易類型, 但是這兩種類型成交的方式分別由市價成交, 限價成交.
所以一共由以下四種類型,對應Strategy的四個方法:
- enterLong 以市價(下一個Bar的開盤價)買入
- enterLongLimit 當市價(下一個Bar的開盤價)低於或等於指定的價格時買入
- enterShort 與enterLong相反
- enterShortLimit 與enterLongShort相反.
以enter開頭是更加上層的方法, 建議使用.
goodTillCanceled為了適配實盤接口, 實盤接口可能有前一天的訂單不會再執行的限制,所以設置goodTillCanceled=True保證第二天或者更後的時間,訂單依然有效,直至手動取消.
除了提交交易訂單還可以提交止損訂單, 分別對應Strategy的兩個方法.
- StopOrder 提交一個止損訂單, 傳入止損價格, 當價格突破止損價位, 以市價成交進行止損.
- StopLimitOrder 提交一個止損訂單, 傳入止損價格, 當價格突破止損價位, 並且價格在限定的價格區間才會止損.
每個提交的訂單會到下一個事件循環才會判斷條件是否符合,才會執行.
技術指標 EventBasedFilter對象
通過借助自定義指標或者自帶的指標,如SMA,EMA,MACD等可以更全面的看待股票的走勢以及信號.
下面是技術指標基類的初始化過程.
pyalgotrade/pyalgotrade/technical/__init__.py
class EventWindow(object):
"""數據實際承載類
數據保存在self__values裏面
"""
def __init__(self, windowSize, dtype=float, skipNone=True):
assert(windowSize > 0)
assert(isinstance(windowSize, int))
self.__values = collections.NumPyDeque(windowSize, dtype)
self.__windowSize = windowSize
self.__skipNone = skipNone
def onNewValue(self, dateTime, value):
"""提供onNewValue方法將新的值傳入"""
if value is not None or not self.__skipNone:
self.__values.append(value)
def getValues(self):
"""獲取EventWindows的所有值"""
return self.__values.data()
def getWindowSize(self):
"""獲取EventWindow Size"""
return self.__windowSize
def windowFull(self):
"""eventWindow是否已經填滿"""
return len(self.__values) == self.__windowSize
def getValue(self):
"""子類須實現的類"""
raise NotImplementedError()
class EventBasedFilter(dataseries.SequenceDataSeries):
def __init__(self, dataSeries, eventWindow, maxLen=None):
super(EventBasedFilter, self).__init__(maxLen)
self.__dataSeries = dataSeries
# 當dataseries數據有新值的時候,調用self.__onNewValues方法
self.__dataSeries.getNewValueEvent().subscribe(self.__onNewValue)
self.__eventWindow = eventWindow
def __onNewValue(self, dataSeries, dateTime, value):
# 讓EventWindow對象計算新值
self.__eventWindow.onNewValue(dateTime, value)
# 獲取計算後的結果
newValue = self.__eventWindow.getValue()
# 將值保存到自身實例裏面, 即self.__values
# 因為繼承了dataseries.SequenceDataSeries類
# 而dataseries.SequenceDataSeries父類實現了__getitem__方法, 所以可以使用索引取值.
self.appendWithDateTime(dateTime, newValue)
def getDataSeries(self):
return self.__dataSeries
def getEventWindow(self):
return self.__eventWindow
在Feed對象初始過程中,會初始化兩個比較重要的數據結構, 一個是self._bars, 一個是self._ds,在整個事件驅動中, 策略不停的從self_bars中取數據,然後使用appendWithDateTime方法將數據追加的self._ds裏面。
源碼如下:
pyalgotrade/pyalgotrade/dataseries/bards.py
# 首先調用BarDataSeries的appendWithDateTime方法
class BarDataSeries(dataseries.SequenceDataSeries):
def appendWithDateTime(self, dateTime, bar):
assert(dateTime is not None)
assert(bar is not None)
bar.setUseAdjustedValue(self.__useAdjustedValues)
super(BarDataSeries, self).appendWithDateTime(dateTime, bar)
self.__openDS.appendWithDateTime(dateTime, bar.getOpen())
self.__closeDS.appendWithDateTime(dateTime, bar.getClose())
self.__highDS.appendWithDateTime(dateTime, bar.getHigh())
self.__lowDS.appendWithDateTime(dateTime, bar.getLow())
self.__volumeDS.appendWithDateTime(dateTime, bar.getVolume())
self.__adjCloseDS.appendWithDateTime(dateTime, bar.getAdjClose())
# Process extra columns.
for name, value in bar.getExtraColumns().iteritems():
extraDS = self.__getOrCreateExtraDS(name)
extraDS.appendWithDateTime(dateTime, value)
pyalgotrade/dataseries/__init__.py
# 然後調用SequenceDataSeries對象的appendWithDateTime
# 在這個方法中提交數據更新的事件
class SequenceDataSeries(DataSeries):
def appendWithDateTime(self, dateTime, value):
"""
Appends a value with an associated datetime.
.. note::
If dateTime is not None, it must be greater than the last one.
"""
if dateTime is not None and len(self.__dateTimes) != 0 and self.__dateTimes[-1] >= dateTime:
raise Exception("Invalid datetime. It must be bigger than that last one")
assert(len(self.__values) == len(self.__dateTimes))
self.__dateTimes.append(dateTime)
self.__values.append(value)
self.getNewValueEvent().emit(self, dateTime, value)
小結
使用技術指標需要傳入dataSeries對象, 可以通過getPriceDataSeries, getOpenDataSeries等獲得.
創建策略
由於上面已經有完整版本的代碼,這裏做一定的刪減, 並做註解.
# 集成strategy.BacktestingStrategy類
class MyStrategy(strategy.BacktestingStrategy):
def __init__(self, feed, instrument, smaPeriod):
# 調用父類__init__方法
super(MyStrategy, self).__init__(feed, 1000)
# 初始情況下,postion設置為零, postion一般只持倉比例
self.__position = None
# 股票代碼
self.__instrument = instrument
# We‘ll use adjusted close values instead of regular close values.
# 是否使用復權收盤價
self.setUseAdjustedValues(True)
# 初始化策略指標
self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod)
# 省略其他鉤子函數
# 必須實現的onBars函數,用於買賣的主要邏輯
def onBars(self, bars):
# 如果沒有簡單移動平均值則什麽都不做
if self.__sma[-1] is None:
return
# 取出指定股票代碼的bar對象
bar = bars[self.__instrument]
# 如果postion is None,即持倉為0
if self.__position is None:
# 如果收盤價大於簡單移動平均值則買入
if bar.getPrice() > self.__sma[-1]:
# 買入,enterLong=做多
self.__position = self.enterLong(self.__instrument, 10, True)
# 反之賣出
elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive():
self.__position.exitMarket()
總結
BarFeed像是PyalgoTrade中的燃料,不斷的供給給策略的Dispatcher調度器, 使整個策略不斷運行,直至沒有燃料(沒有新的數據.)
BarFeed使數據源的一個抽象,裏面保存著兩個重要的數據結構, self._bars, self._ds.
self.__bars是key為股票代碼, value是元素為bar數據對象的列表的字典.
self.__ds為BarDataSeries對象.
Broker維護著虛擬賬戶裏面的現金以及相關股票的倉位.接收訂單並實時的處理訂單, 計算收益等.
Position為股票倉位持有情況的對象, 提供交易的相關接口.
EventBasedFilter為技術指標, 可以計算相關指標如MACD, SMA等, 也可以自定義自己的技術指標.
Strategy為自定義策略,只需實現onBars函數即可完成買賣邏輯, 將Broker,Position相關接口放在Strategy實例方法裏面, 同一調用接口.
PyalgoTrade源碼閱讀完結篇