使用LSTM預測股票市場基於Tensorflow
前言
在本文開始前,作者並沒有提倡LSTM是一種高度可靠的模型,它可以很好地利用股票資料中的內在模式,或者可以在沒有任何人蔘與的情況下使用。寫這篇文章,純粹是出於對機器學習的熱愛。在我看來,該模型已經觀察到了資料中的某些模式,因此它可以在大多數時候正確預測股票的走勢。但是,這個模型是否可以用於實際,有待用更多回測和實踐去驗證。
為什麼需要時間序列模型?
你想要正確地模擬股票價格,因此作為股票買家,你可以合理地決定什麼時候買股票,什麼時候賣股票。這就是時間序列建模的切入點。你需要良好的機器學習模型,可以檢視資料序列的歷史記錄,並正確地預測序列的未來元素是什麼。
提示:股市價格高度不可預測且不穩定。這意味著在資料中沒有一致的模式可以讓你近乎完美地模擬股票價格。就像普林斯頓大學經濟學家Burton Malkiel在他1973年的書中寫到的:“隨機漫步華爾街”,如果市場確實是有效的,那麼當股票價格反應的所有因素一旦被公開時,那我們閉著眼睛都可以做的和專業投資者一樣好。
但是,我們不要一直相信這只是一個隨機過程,覺得機器學習是沒有希望的。你不需要預測未來的股票確切的價格,而是股票價格的變動。做到這點就很不錯了!
資料準備
使用以下資料來源:
當然你也可基於Wind資料庫去研究。因為Wind資料相對於其他平臺和資料商而言,總體上在國內算是比較全面和準確的。
從Kaggle獲得資料
在Kaggle上找到的資料是csv檔案,所以,你不需要再進行任何的預處理,因此你可以直接將資料載入到DataFrame中。同時你還應該確保資料是按日期排序的,因為資料的順序在時間序列建模中至關重要。
df = df.sort_values('Date') df.head()
資料視覺化
plt.figure(figsize = (18,9))
plt.plot(range(df.shape[0]),(df['Low']+df['High'])/2.0)
plt.xticks(range(0,df.shape[0],500),df['Date'].loc[::500],rotation=45)
plt.xlabel('Date',fontsize=18)
plt.ylabel('Mid Price',fontsize=18)
plt.show()
上圖已經說明了很多東西。我選擇這家公司而不是其他公司的具體原因是,隨著時間的推移,這張圖中展現了不同的股價行為。這將使學習更加穩健,並且可以更改以便測試各種情況下預測的好壞程度。
資料拆分訓練集和測試集
計算一天中最高和最低價的平均值來計算的中間價格。
high_prices = df.loc[:,'High'].as_matrix()
low_prices = df.loc[:,'Low'].as_matrix()
mid_prices = (high_prices+low_prices)/2.0
現在你可以分離訓練資料和測試資料。訓練資料是時間序列的前11000個數據,其餘的是測試資料。
train_data = mid_prices[:11000]
test_data = mid_prices[11000:]
現在需要定義一個標準對資料進行歸一化。MinMaxScalar方法將所有資料歸到0和1之間。你還可以將訓練和測試資料重新組為[data_size, num_features]。
scaler = MinMaxScaler()
train_data = train_data.reshape(-1,1)
test_data = test_data.reshape(-1,1)
根據之前得資料,可以看出不同時間段有不同的取值範圍,你可以將整個序列拆分為視窗來進行歸一化。如果不這樣做,早期的資料接近於0,並且不會給學習過程增加太多價值。這裡你選擇的視窗大小是2500。
當選擇視窗大小時,確保它不是太小,因為當執行視窗規範化時,它會在每個視窗的末尾引入一箇中斷,因為每個視窗都是**規範化的。
在本例中,4個數據點將受此影響。但假設你有11000個數據點,4個點不會引起任何問題。
smoothing_window_size = 2500
for di in range(0,10000,smoothing_window_size):
scaler.fit(train_data[di:di+smoothing_window_size,:])
train_data[di:di+smoothing_window_size,:] = scaler.transform(train_data[di:di+smoothing_window_size,:])
# You normalize the last bit of remaining data
scaler.fit(train_data[di+smoothing_window_size:,:])
train_data[di+smoothing_window_size:,:] = scaler.transform(train_data[di+smoothing_window_size:,:])
將資料重新塑造為[data_size]的Shape:
train_data = train_data.reshape(-1)
test_data = scaler.transform(test_data).reshape(-1)
現在可以使用指數移動平均平滑資料。可以幫助你避免股票價格資料的雜亂,併產生更平滑的曲線。我們只使用訓練資料來訓練MinMaxScaler,通過將MinMaxScaler與測試資料進行匹配來規範化測試資料是錯誤的。
注意:你應該只平滑訓練資料。
EMA = 0.0
gamma = 0.1
for ti in range(11000):
EMA = gamma*train_data[ti] + (1-gamma)*EMA
train_data[ti] = EMA
all_mid_data = np.concatenate([train_data,test_data],axis=0)
下面是平均結果。它非常接近股票的實際行為。接下來您將看到一個更精確的一步預測方法:
上面的圖(和MSE)說明了什麼呢?對於非常短的predictiosn(一天之後)來說,這個模型似乎不算太壞。考慮到股票價格在一夜之間不會從0變化到100,這種行為是明智的。接下來我們來看一種更有趣的平均技術,稱為指數移動平均。
指數移動平均線
你可能在網際網路上看到過一些文章使用非常複雜的模型來預測股票市場的行為。但是要小心!我所看到的這些都只是視覺錯覺,不是因為學習了有用的東西。下面你將看到如何使用簡單的平均方法複製這種行為。
window_size = 100
N = train_data.size
run_avg_predictions = []
run_avg_x = []
mse_errors = []
running_mean = 0.0
run_avg_predictions.append(running_mean)
decay = 0.5
for pred_idx in range(1,N):
running_mean = running_mean*decay + (1.0-decay)*train_data[pred_idx-1]
run_avg_predictions.append(running_mean)
mse_errors.append((run_avg_predictions[-1]-train_data[pred_idx])**2)
run_avg_x.append(date)
print('MSE error for EMA averaging: %.5f'%(0.5*np.mean(mse_errors)))
MSE error for EMA averaging: 0.00003
如果指數移動平均線很好,為什麼需要更好的模型呢?
可以看到,它符合遵循真實分佈的完美直線(通過非常低的MSE證明了這一點)。實際上,僅憑第二天的股票市值,你就做不了什麼。就我個人而言,我想要的不是第二天股市的確切價格,而是未來30天股市的價格會上漲還是下跌
讓我們試著在視窗中進行預測(假設你預測接下來兩天的視窗,而不是第二天)。然後你就會意識到EMA會有多麼的失敗。讓我們通過一個例子來理解這一點。
不管你預測未來的步驟是多少,你都會得到相同的答案。
輸出有用資訊的一種解決方案是檢視基於動量演算法。他們的預測是基於過去的近期值是上升還是下降(而不是精確的數值)。例如,如果過去幾天的價格一直在下降,第二天的價格可能會更低。這聽起來很合理。然而,我們將使用更復雜的模型:LSTM。
評價結果
我們將使用均值平方誤差來計算我們的模型有多好。均值平方誤差(MSE)的計算方法是先計算真實值與預測值之間的平方誤差,然後對所有的預測進行平均。但是:
平均預測是一種很好的預測方法(這對股票市場的預測不是很有用),但對未來的預測並不是很有用。
LSTM簡介
長短時記憶模型是非常強大的時間序列模型。它們可以預測未來任意數量的步驟。LSTM模組(或單元)有5個基本元件,可以對長期和短期資料進行建模。
LSTM單元格如下所示:
計算方程如下:
Tensorflow為實現時間序列模型提供了一個很好的子API。後面我們會使用到它。
LSTM資料生成器
首先要實現一個數據生成器來訓練LSTM。這個資料生成器將有一個名為unroll_batch(…)的方法,該方法將輸出一組按順序批量獲取num_unrollings的輸入資料,其中批資料的大小為[batch_size, 1]。然後每批輸入資料都有相應的輸出資料。
例如,如果num_unrollings=3和batch_size=4則看起來像一組展開的批次。
輸入資料: [x0,x10,x20,x30],[x1,x11,x21,x31],[x2,x12,x22,x32]
輸出資料: [x1,x11,x21,x31],[x2,x12,x22,x32],[x3,x13,x23,x33]
資料生成器
下面將演示如何視覺化建立一批資料。基本思想是將資料序列劃分為N / b段,使每個段的大小為b,然後定義遊標每段為1。然後對單個數據進行抽樣,我們得到一個輸入(當前段遊標索引)和一個真實預測(在[當前段遊標+1,當前段遊標+5]之間隨機抽樣)。請注意,我們並不總是得到輸入旁邊的值,就像它的預測一樣。這是一個減少過擬合的步驟。在每次抽樣結束時,我們將游標增加1。
定義超引數
在本節中,將定義幾個超引數。D是輸入的維數。很簡單,你以之前的股票價格作為輸入並預測下一個應該為1。然後是num_unrollings,它表示單個優化步驟需要考慮多少連續時間步驟。越大越好。然後是batch_size。批量處理大小是在單個時間步驟中考慮的資料樣本的數量。越大越好,因為在給定的時間內資料的可見性越好。接下來定義num_nodes,它表示每個單元格中隱藏的神經元數量。在這個示例中,你可以看到有三層LSTM。
D = 1
num_unrollings = 50
batch_size = 500
num_nodes = [200,200,150]
n_layers = len(num_nodes)
dropout = 0.2
tf.reset_default_graph()
定義輸入和輸出
接下來為訓練輸入和標籤定義佔位符。這非常簡單,因為你有一個輸入佔位符列表,其中每個佔位符包含一批資料。 該列表包含num_unrollings佔位符,它將用於單個優化步驟。
train_inputs, train_outputs = [],[]
for ui in range(num_unrollings):
train_inputs.append(tf.placeholder(tf.float32, shape=[batch_size,D],name='train_inputs_%d'%ui))
train_outputs.append(tf.placeholder(tf.float32, shape=[batch_size,1], name = 'train_outputs_%d'%ui))
定義LSTM和迴歸層的引數
用三個LSTM層和一個線性迴歸層,用w和b表示,該層提取最後一個長短期記憶體單元的輸出並輸出對下一個時間步驟的預測。你可以使用TensorFlow中的MultiRNNCell來封裝建立的三個LSTMCell物件。此外,還可以使用dropout實現LSTM單元格,因為它們可以提高效能並減少過擬合。
lstm_cells = [
tf.contrib.rnn.LSTMCell(num_units=num_nodes[li],
state_is_tuple=True,
initializer= tf.contrib.layers.xavier_initializer()
)
for li in range(n_layers)]
drop_lstm_cells = [tf.contrib.rnn.DropoutWrapper(
lstm, input_keep_prob=1.0,output_keep_prob=1.0-dropout, state_keep_prob=1.0-dropout
) for lstm in lstm_cells]
drop_multi_cell = tf.contrib.rnn.MultiRNNCell(drop_lstm_cells)
multi_cell = tf.contrib.rnn.MultiRNNCell(lstm_cells)
w = tf.get_variable('w',shape=[num_nodes[-1], 1], initializer=tf.contrib.layers.xavier_initializer())
b = tf.get_variable('b',initializer=tf.random_uniform([1],-0.1,0.1))
計算LSTM輸出並將其輸入迴歸層,得到最終預測結果
首先建立TensorFlow變數(c和h),它將保持單元狀態和長短期記憶單元的隱藏狀態。 然後將train_input列表轉換為[num_unrollings, batch_size, D],使用tf.nn.dynamic_rnn計算所需輸出。然後使用tf.nn.dynamic_rnn計算LSTM輸出。並將輸出分解為一列num_unrolling的張量。預測和真實股價之間的損失。
c, h = [],[]
initial_state = []
for li in range(n_layers):
c.append(tf.Variable(tf.zeros([batch_size, num_nodes[li]]), trainable=False))
h.append(tf.Variable(tf.zeros([batch_size, num_nodes[li]]), trainable=False))
initial_state.append(tf.contrib.rnn.LSTMStateTuple(c[li], h[li]))
all_inputs = tf.concat([tf.expand_dims(t,0) for t in train_inputs],axis=0)
all_lstm_outputs, state = tf.nn.dynamic_rnn(
drop_multi_cell, all_inputs, initial_state=tuple(initial_state),
time_major = True, dtype=tf.float32)
all_lstm_outputs = tf.reshape(all_lstm_outputs, [batch_size*num_unrollings,num_nodes[-1]])
all_outputs = tf.nn.xw_plus_b(all_lstm_outputs,w,b)
split_outputs = tf.split(all_outputs,num_unrollings,axis=0)
損失計算和優化器
現在,要計算損失。然而,在計算損失時,你應該注意到有一個獨特的特徵。對於每一批預測和真實輸出,計算均方誤差。然後把所有這些均方損失加起來(不是平均值)。最後,定義要用來優化神經網路的優化器在這種情況下,您可以使用Adam,這是一個非常新且效能良好的優化器。
print('Defining training Loss')
loss = 0.0
with tf.control_dependencies([tf.assign(c[li], state[li][0]) for li in range(n_layers)]+
[tf.assign(h[li], state[li][1]) for li in range(n_layers)]):
for ui in range(num_unrollings):
loss += tf.reduce_mean(0.5*(split_outputs[ui]-train_outputs[ui])**2)
print('Learning rate decay operations')
global_step = tf.Variable(0, trainable=False)
inc_gstep = tf.assign(global_step,global_step + 1)
tf_learning_rate = tf.placeholder(shape=None,dtype=tf.float32)
tf_min_learning_rate = tf.placeholder(shape=None,dtype=tf.float32)
learning_rate = tf.maximum(
tf.train.exponential_decay(tf_learning_rate, global_step, decay_steps=1, decay_rate=0.5, staircase=True),
tf_min_learning_rate)
# Optimizer.
print('TF Optimization operations')
optimizer = tf.train.AdamOptimizer(learning_rate)
gradients, v = zip(*optimizer.compute_gradients(loss))
gradients, _ = tf.clip_by_global_norm(gradients, 5.0)
optimizer = optimizer.apply_gradients(
zip(gradients, v))
print('\tAll done')
這裡定義了與預測相關的TensorFlow操作。首先,為輸入(sample_inputs)定義一個佔位符,然後與訓練階段類似,定義預測的狀態變數(sample_c和sample_h)。最後用tf.nn.dynamic_rnn計算預測。後通過迴歸層(w和b)傳送輸出。 還應該定義reset_sample_state操作,該操作將重置單元狀態和隱藏狀態。 每次進行一系列預測時,都應該在開始時執行此操作。
print('Defining prediction related TF functions')
sample_inputs = tf.placeholder(tf.float32, shape=[1,D])
sample_c, sample_h, initial_sample_state = [],[],[]
for li in range(n_layers):
sample_c.append(tf.Variable(tf.zeros([1, num_nodes[li]]), trainable=False))
sample_h.append(tf.Variable(tf.zeros([1, num_nodes[li]]), trainable=False))
initial_sample_state.append(tf.contrib.rnn.LSTMStateTuple(sample_c[li],sample_h[li]))
reset_sample_states = tf.group(*[tf.assign(sample_c[li],tf.zeros([1, num_nodes[li]])) for li in range(n_layers)],
*[tf.assign(sample_h[li],tf.zeros([1, num_nodes[li]])) for li in range(n_layers)])
sample_outputs, sample_state = tf.nn.dynamic_rnn(multi_cell, tf.expand_dims(sample_inputs,0),
initial_state=tuple(initial_sample_state),
time_major = True,
dtype=tf.float32)
with tf.control_dependencies([tf.assign(sample_c[li],sample_state[li][0]) for li in range(n_layers)]+
[tf.assign(sample_h[li],sample_state[li][1]) for li in range(n_layers)]):
sample_prediction = tf.nn.xw_plus_b(tf.reshape(sample_outputs,[1,-1]), w, b)
print('\tAll done')
執行LSTM
在這裡,你將訓練和預測幾個時期的股票價格走勢,看看這些預測是否會隨著時間的推移而變得更好或更糟。按照以下步驟操作:
- 在時間序列上定義一組測試起點(test_points_seq)來計算LSTM
- 對於每一個epoch
- 用於訓練資料的完整序列長度
- 展開一組num_unrollings批次
- 使用展開的批次LSTM進行訓練
- 計算平均訓練損失
- 對於測試集中的每個起點
- 通過迭代在測試點之前找到的以前的num_unrollings資料點來更新LSTM狀態
- 使用先前的預測作為當前輸入,連續預測n_predict_once步驟
- 計算預測到的n_predict_once點與當時股票價格之間的MSE損失
- 用於訓練資料的完整序列長度
部分程式碼
epochs = 30
valid_summary = 1
n_predict_once = 50
train_seq_length = train_data.size
train_mse_ot = []
test_mse_ot = []
predictions_over_time = []
session = tf.InteractiveSession()
tf.global_variables_initializer().run()
loss_nondecrease_count = 0
loss_nondecrease_threshold = 2
print('Initialized')
average_loss = 0
data_gen = DataGeneratorSeq(train_data,batch_size,num_unrollings)
x_axis_seq = []
test_points_seq = np.arange(11000,12000,50).tolist()
for ep in range(epochs):
# ========================= Training =====================================
for step in range(train_seq_length//batch_size):
u_data, u_labels = data_gen.unroll_batches()
feed_dict = {}
for ui,(dat,lbl) in enumerate(zip(u_data,u_labels)):
feed_dict[train_inputs[ui]] = dat.reshape(-1,1)
feed_dict[train_outputs[ui]] = lbl.reshape(-1,1)
feed_dict.update({tf_learning_rate: 0.0001, tf_min_learning_rate:0.000001})
_, l = session.run([optimizer, loss], feed_dict=feed_dict)
視覺化預測
可以看到MSE損失是如何隨著訓練量的減少而減少的。這是一個好跡象,表明模型正在學習一些有用的東西。你可以看到LSTM比標準平均值做得更好。標準平均(雖然不完美)合理地跟隨真實的股票價格運動。
best_prediction_epoch = 28
plt.figure(figsize = (18,18))
plt.subplot(2,1,1)
plt.plot(range(df.shape[0]),all_mid_data,color='b')
predictions with high alpha
start_alpha = 0.25
alpha = np.arange(start_alpha,1.1,(1.0-start_alpha)/len(predictions_over_time[::3]))
for p_i,p in enumerate(predictions_over_time[::3]):
for xval,yval in zip(x_axis_seq,p):
plt.plot(xval,yval,color='r',alpha=alpha[p_i])
plt.title('Evolution of Test Predictions Over Time',fontsize=18)
plt.xlabel('Date',fontsize=18)
plt.ylabel('Mid Price',fontsize=18)
plt.xlim(11000,12500)
plt.subplot(2,1,2)
plt.plot(range(df.shape[0]),all_mid_data,color='b')
for xval,yval in zip(x_axis_seq,predictions_over_time[best_prediction_epoch]):
plt.plot(xval,yval,color='r')
plt.title('Best Test Predictions Over Time',fontsize=18)
plt.xlabel('Date',fontsize=18)
plt.ylabel('Mid Price',fontsize=18)
plt.xlim(11000,12500)
plt.show()
儘管LSTM並不完美,但它似乎在大多數情況下都能正確預測股價走勢。請注意,你的預測大致在0和1之間(也就是說,不是真實的股票價格)。這是可以的,因為你預測的是股價的走勢,而不是股價本身。
結論
股票價格/移動預測是一項極其困難的任務。就我個人而言,我認為任何股票預測模型都不應該被視為理所當然,並且盲目地依賴它們。然而,模型在大多數情況下可能能夠正確預測股票價格的變動,但並不總是如此。
不要被那些預測曲線完全與真實股價重疊的文章所迷惑。這可以用一個簡單的平均技術來複制,但實際上它是無用的。更明智的做法是預測股價走勢。
模型的超引數對你得到的結果非常敏感。因此,一個非常好的事情是在超引數上執行一些超引數優化技術(例如,網格搜尋/隨機搜尋)。這裡列出了一些最關鍵的超引數:優化器的學習率、層數、每層的隱藏單元數,優化器Adam表現最佳,模型的型別(GRU / LSTM / LSTM with peepholes)。
由於本文由於資料量小,我們用測試損耗來衰減學習速率。這間接地將測試集的資訊洩露到訓練過程中。處理這個問題更好的方法是有一個單獨的驗證集(除了測試集)與驗證集效能相關的衰減學習率。