1. 程式人生 > >PyalgoTrade源碼閱讀完結篇

PyalgoTrade源碼閱讀完結篇

方法 get dds -s .data 下一個 成交量 技術分享 對象繼承

前言

本文著重於回測相關得模塊。

由於上一篇文章實在是寫得太爛了, 這一篇文章重新開始寫。

Pyalgotrade業務邏輯及實現原理

以官方教程示例為例

下載數據

python -c "from pyalgotrade.tools import yahoofinance; yahoofinance.download_daily_bars(‘orcl‘, 2000, ‘orcl-2000.csv‘)"

構建策略並運行

from pyalgotrade import strategy
from pyalgotrade.barfeed import yahoofeed
from pyalgotrade.technical import ma

class MyStrategy(strategy.BacktestingStrategy):
    def __init__(self, feed, instrument, smaPeriod):
        super(MyStrategy, self).__init__(feed, 1000)
        self.__position = None
        self.__instrument = instrument
        # We‘ll use adjusted close values instead of regular close values.
        self.setUseAdjustedValues(True)
        self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod)

    def onEnterOk(self, position):
        execInfo = position.getEntryOrder().getExecutionInfo()
        self.info("BUY at $%.2f" % (execInfo.getPrice()))

    def onEnterCanceled(self, position):
        self.__position = None

    def onExitOk(self, position):
        execInfo = position.getExitOrder().getExecutionInfo()
        self.info("SELL at $%.2f" % (execInfo.getPrice()))
        self.__position = None

    def onExitCanceled(self, position):
        # If the exit was canceled, re-submit it.
        self.__position.exitMarket()

    def onBars(self, bars):
        # Wait for enough bars to be available to calculate a SMA.
        if self.__sma[-1] is None:
            return

        bar = bars[self.__instrument]
        # If a position was not opened, check if we should enter a long position.
        if self.__position is None:
            if bar.getPrice() > self.__sma[-1]:
                # Enter a buy market order for 10 shares. The order is good till canceled.
                self.__position = self.enterLong(self.__instrument, 10, True)
        # Check if we have to exit the position.
        elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive():
            self.__position.exitMarket()

def run_strategy(smaPeriod):
    # Load the yahoo feed from the CSV file
    feed = yahoofeed.Feed()
    feed.addBarsFromCSV("orcl", "orcl-2000.csv")

    # Evaluate the strategy with the feed.
    myStrategy = MyStrategy(feed, "orcl", smaPeriod)
    myStrategy.run()
    print "Final portfolio value: $%.2f" % myStrategy.getBroker().getEquity()

run_strategy(15)

業務邏輯概括

  1. 創建Feed對象加載回測歷史數據
  2. 創建策略
  3. 將Feed對象傳入策略
  4. 內部創建Broker對象
  5. 在策略中初始化技術指標
  6. 運行策略(內部會創建事件循環,依次讀取每一個bars數據調用策略邏輯,即onBars函)

回測數據 Feed對象

用於承載回測的數據,提供接口訪問,驅動整個事件循環。

創建Feed對象

# 導入yahoofeed模塊
from pyalgotrade.barfeed import yahoofeed

# 創建yahoofeed.Feed類創建其實例
feed = yahoofeed.Feed()

# 通過addBarsFromCSV加載本地csv文件
# 傳入股票代碼名, 文件路徑
feed.addBarsFromCSV("orcl", "orcl-2000.csv")

Feed對象繼承鏈

技術分享圖片

註: 由IntelliJ Idea生成

由上圖可知, 分別繼承不同的BarFeed,最終業務邏輯基類pyalgotrade.observer.subject.

Feed數據結構構建過程

主要方法調用順序如下:

yahooFeed.addBarsFromCSV

-> csvFeed.BarFeed.addBarsFromCSV

-> membf.BarFeed.addBarsFromSequence

-> barfeed.registerInstrument

-> feed.registerDataSeries

-> barfeed.createDataSeries

Feed數據結構

在Feed中有兩個比較重要的數據對象

  1. self.__bars = {}
  2. self.__ds = BarDataSeries()
    其中BarDataSeries對象有以下定義
pyalgotrade/pyalgotrade/dataseries/bards.py

class BarDataSeries(dataseries.SequenceDataSeries):
    def __init__(self, maxLen=None):
        super(BarDataSeries, self).__init__(maxLen)
        self.__openDS = dataseries.SequenceDataSeries(maxLen)
        self.__closeDS = dataseries.SequenceDataSeries(maxLen)
        self.__highDS = dataseries.SequenceDataSeries(maxLen)
        self.__lowDS = dataseries.SequenceDataSeries(maxLen)
        self.__volumeDS = dataseries.SequenceDataSeries(maxLen)
        self.__adjCloseDS = dataseries.SequenceDataSeries(maxLen)
        self.__extraDS = {}
        self.__useAdjustedValues = False

BarDataSeries提供一系列方法返回相應的數據序列,以getOpenDataSeries為例

pyalgotrade/pyalgotrade/dataseries/bards.py:87

    def getOpenDataSeries(self):
        """Returns a :class:`pyalgotrade.dataseries.DataSeries` with the open prices."""
        return self.__openDS

而dataseries.SequenceDataSeries對象是一個數據存儲在collections.ListDeque對象上,並集成事件監聽的類對象.

self._bars在membf.BarFeed.addBarsFromSequence方法中讀取csv文件生成.
self.
_ds在barfeed.createDataSeries方法中創建一個默認長度為1024的BarDataSeries空數據對象.

小結

bar是含有時間, 開盤價, 收盤價, 當日最高價, 當日最低價, 成交量,復權收盤價的數據對象.

self.__bars是key為股票代碼, value是元素為bars數據對象的列表的字典.

self.__ds是BarDataSeries對象

事件循環

事件循環是PyalgoTrade的數據引擎,驅動著整個策略運轉.

下面是Pyalgotrade內部事件循環的一個簡單的實現。

# coding: utf8
import abc

class Event(object):
    """事件類.
    用於訂閱指定的操作,如函數
    當事件執行emit方法的時候,遍歷訂閱了的操作,並執行該操作"""
    def __init__(self):
        # 內部handlers列表
        self.__handlers = []

    def subscribe(self, handler):
        if handler not in self.__handlers:
            self.__handlers.append(handler)

    def emit(self, *args, **kwargs):
        """執行所有訂閱了的操作"""
        for handler in self.__handlers:
            handler(*args, **kwargs)

class Subject(object):
    """將元類指向abc.ABCMeta元類
    1. 當抽象方法未被實現的時候,不能新建該類的實例
    2. abstractmethod相當於子類要實現的接口,如果不實現,則不能新建該類的實例"""
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def start(self):
        pass

    @abc.abstractmethod
    def stop(self):
        pass

    @abc.abstractmethod
    def dispatch(self):
        raise NotImplementedError()

    @abc.abstractmethod
    def eof(self):
        raise NotImplementedError()

class Dispatcher(object):
    """調度類
    1. 維護事件循環
    2. 不斷的調度subject的disptch操作並判斷是否結束"""
    def __init__(self):
        self.__subjects = []
        self.__stop = False

    def run(self):
        """運行整個事件循環並在調度之前,之後分別調用subject的start, stop方法"""
        try:
            for subject in self.__subjects:
                subject.start()

            while not self.__stop:
                eof, dispatched = self.dispatch()
                if eof:
                    self.__stop = True
        finally:
            for subject in self.__subjects:
                subject.stop()

    def dispatch(self):
        ret = False
        eof = False
        for subject in self.__subjects:
            ret = subject.dispatch() is True
            eof = subject.eof()

        return eof, ret

    def addSubject(self, subject):
        self.__subjects.append(subject)

class Broker(Subject):
    """Broker 類"""
    def dispatch(self):
        return None

    def eof(self):
        return None

    def start(self):
        pass

    def stop(self):
        pass

class Feed(Subject):
    """Feed類
    1. 承載數據源
    2. 通過數據驅動事件循環"""
    def __init__(self, size):
        self.__data = range(size)
        self.__nextPos = 0
        self.__event = Event()

    def start(self):
        pass

    def stop(self):
        pass

    def dispatch(self):
        value = self.__data[self.__nextPos]
        self.__event.emit(value)
        self.__nextPos += 1
        return True

    def getNewValueEvent(self):
        return self.__event

    def eof(self):
        return self.__nextPos >= len(self.__data)

class Strategy(object):
    def __init__(self, broker, feed):
        self.__dispatcher = Dispatcher()
        self.__feed = feed
        self.__broker = broker
        # 將策略的self.__onBars方法傳入Feed的self.__event裏面
        # 當Feed調用dispatch方法的時候, 會指定self.__onBars函數
        self.__feed.getNewValueEvent().subscribe(self.__onBars)
        # 註意順序,Feed對象必須在最後
        self.__dispatcher.addSubject(self.__broker)
        self.__dispatcher.addSubject(self.__feed)

    def __onBars(self, value):
        print("dispatch before.")
        self.onBars(value)
        print("dispatch after")

    def onBars(self, value):
        print("on Bar: {}".format(value))

    def run(self):
        self.__dispatcher.run()

if __name__ == ‘__main__‘:
    feed = Feed(3)
    broker = Broker()
    myStrategy = Strategy(broker, feed)
    myStrategy.run()

output: 
dispatch before.
on Bar: 0
dispatch after
dispatch before.
on Bar: 1
dispatch after
dispatch before.
on Bar: 2
dispatch after

上面的代碼主要說明策略的onBars方法是怎麽被調用的。

關於Broker怎麽被驅動,在後面講解

  1. 策略中維護一個調度器dispatcher,當策略啟動的時候, 調度器dipatcher啟動, 並嘗試調用feed,broker start方法.
  2. 不斷調用feed, broker的dispatch方法, 判斷是否結束, 如果結束, 則做結束動作, 調用feed, broker的stop方法
  3. feed對象在調用dispatch方法的時候, feed對象會觸發自身維護的self._event. 而self._event在MyStrategy._init_方法中,通過self._feed.getNewValueEvent().subscribe(self._onBars)訂閱了MyStrategy._onBars方法, 所以Feed對象每次dispatch的時候,MyStrategy._onBars都會被調用.

至此, Feed對象怎麽驅動策略的邏輯已經清晰。
接下來,講解BaseStrategy, BacktestingStrategy初始化過程

策略初始化

策略的繼承鏈並不復雜, 所有策略的基類是BaseStartegy, BacktestingStrategy是提供給用戶使用的策略,至少實現onBars函數則可以回測。

BaseStrategy, BacktestingStrategy的初始化源代碼如下

pyalgotrade/pyalgotrade/strategy/__init__.py

class BaseStartegy(object):
    def __init__(self, barFeed, broker):
        # 綁定barFeed對象
        self.__barFeed = barFeed
        # 綁定broker對象
        self.__broker = broker
        # 交易相關的倉位
        self.__activePositions = set()
        # 訂單處理順序
        self.__orderToPosition = {}
        # bar被處理後的事件
        self.__barsProcessedEvent = observer.Event()
        # analyzer列表
        self.__analyzers = []
        # 命名的analyzer列表
        self.__namedAnalyzers = {}
        # 重新取樣的feed對象列表
        self.__resampledBarFeeds = []
        # 調度器對象
        self.__dispatcher = dispatcher.Dispatcher()
        # broker的訂單被更新時的事件, 訂閱self.__onOrderEvent方法
        self.__broker.getOrderUpdatedEvent().subscribe(self.__onOrderEvent)
        # barfeed值被更新的時候的事件(當barfeed被調度的時候),訂閱self.__onBars方法
        self.__barFeed.getNewValuesEvent().subscribe(self.__onBars)

        # 調度器的開始事件,訂閱self.onStart方法
        self.__dispatcher.getStartEvent().subscribe(self.onStart)
        # 調度器的空閑事件, 訂閱self.__onIdle方法
        self.__dispatcher.getIdleEvent().subscribe(self.__onIdle)

        # 分別將繼承了Subject類的broker,barFeed對象加入到調度器的subject列表
        self.__dispatcher.addSubject(self.__broker)
        self.__dispatcher.addSubject(self.__barFeed)

        # 日誌級別的初始化
        self.__logger = logger.getLogger(BaseStrategy.LOGGER_NAME)

class BacktestingStrategy(BaseStrategy):
    # 默認初始化一個持有100w現金的虛擬賬戶
    def __init__(self, barFeed, cash_or_brk=1000000):

        # 如果沒有傳入cash_or_brk參數, 或者傳入數值類型的值
        # 則傳入cash_or_brk,barFeed對象新建一個backtesting.Broker實例,並調用父類的__init__方法
        # 如果傳入的cash_or_brk參數值是backtesting.Broker的實例, 則直接使用
        if isinstance(cash_or_brk, pyalgotrade.broker.Broker):
            broker = cash_or_brk
        else:
            broker = backtesting.Broker(cash_or_brk, barFeed)

        BaseStrategy.__init__(self, barFeed, broker)
        # 默認self.__useAdjustedValue=False
        self.__useAdjustedValues = False
        # 配置日誌參數
        self.setUseEventDateTimeInLogs(True)
        self.setDebugMode(True)

總的來說真正Strategy對象,barFeed對象,broker對象訂閱了更多的事件, 以及更多的判斷。但,內核都是調度器驅動著barFeed, broker對象不斷的被調度(調用dispatch方法), 而barFeed對象會不斷的從self._bars中取數據追加到self._ds對象中,並將取出來的數據提交的self._event中,而self._event訂閱了Strategy.__onBars方法, 所以不斷的驅動著Strategy的自定義策略(onBars裏面定義的交易邏輯).

交易賬戶 Broker對象

在Strategy對象初始化時候, 會初始化一個虛擬的回測賬戶.

回測賬戶broker需要傳入barfeed對象, 並在barfeed的event對象裏面訂閱自己的onBars函數,源碼如下:

pyalgotrade/pyalgotrade/broker/__init__.py

class Broker(broker.Broker):
    LOGGER_NAME = "broker.backtesting"

    def __init__(self, cash, barFeed, commission=None):
        super(Broker, self).__init__()

        assert(cash >= 0)
        self.__cash = cash
        if commission is None:
            self.__commission = NoCommission()
        else:
            self.__commission = commission
        self.__shares = {}
        self.__activeOrders = {}
        self.__useAdjustedValues = False
        # 持倉策略, 使用DefaultStrategy
        # 使用DefaultStrategy.volumeLimit = 0.25
        # 當交易訂單的成交量大於當前bar的成交量的25%則不能成交
        # 沒有滑點
        # 沒有手續費
        self.__fillStrategy = fillstrategy.DefaultStrategy()
        self.__logger = logger.getLogger(Broker.LOGGER_NAME)

        # 讓barfeed對象訂閱self.onBars方法
        barFeed.getNewValuesEvent().subscribe(self.onBars)
        self.__barFeed = barFeed
        self.__allowNegativeCash = False
        self.__nextOrderId = 1

由上可知,當barFeed對象數據更新的時候,還會調用BackTestBroker.onBars方法.

交易倉位 Position對象

當使用enterLong之類交易方法,則會返回一個Postion的對象,這個對象承載著當前各股的持倉比例,以及持有現金.

以enterLong方法說明持倉流程.

  1. 實例化一個LongPosition對象
  2. 調用broker的createMarketOrder方法創建一個MarketOrder.
  3. 註冊order, 以便barFeed對象數據驅動的時候,使用該order

以exitMarket方法說明平倉流程.

  1. 使用Position對象的exitMarket方法提交平倉訂單.
  2. 註冊order, 以便barFeed對象數據驅動的時候,使用該order

源代碼調用鏈太長....所以文字概括.

交易訂單 Order對象

當我們買入或者賣出的時候,其實是提交一個訂單給交易賬戶(broker), 交易賬戶會根據交易訂單的類型,動作等相關信息執行相關的操作.

交易訂單的類型參考: https://www.thebalance.com/understanding-stock-orders-3141318

一般有買入(做多), 賣出(做空)兩種交易類型, 但是這兩種類型成交的方式分別由市價成交, 限價成交.

所以一共由以下四種類型,對應Strategy的四個方法:

  1. enterLong 以市價(下一個Bar的開盤價)買入
  2. enterLongLimit 當市價(下一個Bar的開盤價)低於或等於指定的價格時買入
  3. enterShort 與enterLong相反
  4. enterShortLimit 與enterLongShort相反.

以enter開頭是更加上層的方法, 建議使用.

goodTillCanceled為了適配實盤接口, 實盤接口可能有前一天的訂單不會再執行的限制,所以設置goodTillCanceled=True保證第二天或者更後的時間,訂單依然有效,直至手動取消.

除了提交交易訂單還可以提交止損訂單, 分別對應Strategy的兩個方法.

  1. StopOrder 提交一個止損訂單, 傳入止損價格, 當價格突破止損價位, 以市價成交進行止損.
  2. StopLimitOrder 提交一個止損訂單, 傳入止損價格, 當價格突破止損價位, 並且價格在限定的價格區間才會止損.

每個提交的訂單會到下一個事件循環才會判斷條件是否符合,才會執行.

技術指標 EventBasedFilter對象

通過借助自定義指標或者自帶的指標,如SMA,EMA,MACD等可以更全面的看待股票的走勢以及信號.

下面是技術指標基類的初始化過程.

pyalgotrade/pyalgotrade/technical/__init__.py

class EventWindow(object):
    """數據實際承載類
    數據保存在self__values裏面
    """
    def __init__(self, windowSize, dtype=float, skipNone=True):
        assert(windowSize > 0)
        assert(isinstance(windowSize, int))
        self.__values = collections.NumPyDeque(windowSize, dtype)
        self.__windowSize = windowSize
        self.__skipNone = skipNone

    def onNewValue(self, dateTime, value):
        """提供onNewValue方法將新的值傳入"""
        if value is not None or not self.__skipNone:
            self.__values.append(value)

    def getValues(self):
        """獲取EventWindows的所有值"""
        return self.__values.data()

    def getWindowSize(self):
        """獲取EventWindow Size"""
        return self.__windowSize

    def windowFull(self):
        """eventWindow是否已經填滿"""
        return len(self.__values) == self.__windowSize

    def getValue(self):
        """子類須實現的類"""
        raise NotImplementedError()

class EventBasedFilter(dataseries.SequenceDataSeries):
    def __init__(self, dataSeries, eventWindow, maxLen=None):
        super(EventBasedFilter, self).__init__(maxLen)
        self.__dataSeries = dataSeries
        # 當dataseries數據有新值的時候,調用self.__onNewValues方法
        self.__dataSeries.getNewValueEvent().subscribe(self.__onNewValue)
        self.__eventWindow = eventWindow

    def __onNewValue(self, dataSeries, dateTime, value):
        # 讓EventWindow對象計算新值
        self.__eventWindow.onNewValue(dateTime, value)
        # 獲取計算後的結果
        newValue = self.__eventWindow.getValue()
        # 將值保存到自身實例裏面, 即self.__values
        # 因為繼承了dataseries.SequenceDataSeries類
        # 而dataseries.SequenceDataSeries父類實現了__getitem__方法, 所以可以使用索引取值.
        self.appendWithDateTime(dateTime, newValue)

    def getDataSeries(self):
        return self.__dataSeries

    def getEventWindow(self):
        return self.__eventWindow

在Feed對象初始過程中,會初始化兩個比較重要的數據結構, 一個是self._bars, 一個是self._ds,在整個事件驅動中, 策略不停的從self_bars中取數據,然後使用appendWithDateTime方法將數據追加的self._ds裏面。
源碼如下:

pyalgotrade/pyalgotrade/dataseries/bards.py

# 首先調用BarDataSeries的appendWithDateTime方法
class BarDataSeries(dataseries.SequenceDataSeries):
    def appendWithDateTime(self, dateTime, bar):
        assert(dateTime is not None)
        assert(bar is not None)
        bar.setUseAdjustedValue(self.__useAdjustedValues)

        super(BarDataSeries, self).appendWithDateTime(dateTime, bar)

        self.__openDS.appendWithDateTime(dateTime, bar.getOpen())
        self.__closeDS.appendWithDateTime(dateTime, bar.getClose())
        self.__highDS.appendWithDateTime(dateTime, bar.getHigh())
        self.__lowDS.appendWithDateTime(dateTime, bar.getLow())
        self.__volumeDS.appendWithDateTime(dateTime, bar.getVolume())
        self.__adjCloseDS.appendWithDateTime(dateTime, bar.getAdjClose())

        # Process extra columns.
        for name, value in bar.getExtraColumns().iteritems():
            extraDS = self.__getOrCreateExtraDS(name)
            extraDS.appendWithDateTime(dateTime, value)

pyalgotrade/dataseries/__init__.py

# 然後調用SequenceDataSeries對象的appendWithDateTime
# 在這個方法中提交數據更新的事件
class SequenceDataSeries(DataSeries):
    def appendWithDateTime(self, dateTime, value):
        """
        Appends a value with an associated datetime.

        .. note::
            If dateTime is not None, it must be greater than the last one.
        """

        if dateTime is not None and len(self.__dateTimes) != 0 and self.__dateTimes[-1] >= dateTime:
            raise Exception("Invalid datetime. It must be bigger than that last one")

        assert(len(self.__values) == len(self.__dateTimes))
        self.__dateTimes.append(dateTime)
        self.__values.append(value)

        self.getNewValueEvent().emit(self, dateTime, value)            

小結

使用技術指標需要傳入dataSeries對象, 可以通過getPriceDataSeries, getOpenDataSeries等獲得.

創建策略

由於上面已經有完整版本的代碼,這裏做一定的刪減, 並做註解.

# 集成strategy.BacktestingStrategy類
class MyStrategy(strategy.BacktestingStrategy):
    def __init__(self, feed, instrument, smaPeriod):
        # 調用父類__init__方法
        super(MyStrategy, self).__init__(feed, 1000)
        # 初始情況下,postion設置為零, postion一般只持倉比例
        self.__position = None
        # 股票代碼
        self.__instrument = instrument
        # We‘ll use adjusted close values instead of regular close values.
        # 是否使用復權收盤價
        self.setUseAdjustedValues(True)
        # 初始化策略指標
        self.__sma = ma.SMA(feed[instrument].getPriceDataSeries(), smaPeriod)

    # 省略其他鉤子函數

    # 必須實現的onBars函數,用於買賣的主要邏輯
    def onBars(self, bars):
        # 如果沒有簡單移動平均值則什麽都不做
        if self.__sma[-1] is None:
            return

        # 取出指定股票代碼的bar對象
        bar = bars[self.__instrument]

        # 如果postion is None,即持倉為0
        if self.__position is None:
            # 如果收盤價大於簡單移動平均值則買入
            if bar.getPrice() > self.__sma[-1]:
                # 買入,enterLong=做多
                self.__position = self.enterLong(self.__instrument, 10, True)
        # 反之賣出
        elif bar.getPrice() < self.__sma[-1] and not self.__position.exitActive():
            self.__position.exitMarket()

總結

BarFeed像是PyalgoTrade中的燃料,不斷的供給給策略的Dispatcher調度器, 使整個策略不斷運行,直至沒有燃料(沒有新的數據.)

BarFeed使數據源的一個抽象,裏面保存著兩個重要的數據結構, self._bars, self._ds.

self.__bars是key為股票代碼, value是元素為bar數據對象的列表的字典.

self.__ds為BarDataSeries對象.

Broker維護著虛擬賬戶裏面的現金以及相關股票的倉位.接收訂單並實時的處理訂單, 計算收益等.

Position為股票倉位持有情況的對象, 提供交易的相關接口.

EventBasedFilter為技術指標, 可以計算相關指標如MACD, SMA等, 也可以自定義自己的技術指標.

Strategy為自定義策略,只需實現onBars函數即可完成買賣邏輯, 將Broker,Position相關接口放在Strategy實例方法裏面, 同一調用接口.

PyalgoTrade源碼閱讀完結篇