深度有趣 | 10 股票價格預測
簡介
股票價格預測是一件非常唬人的事情,但如果只基於歷史資料進行預測,顯然完全不靠譜
股票價格是典型的時間序列資料(簡稱時序資料),會受到經濟環境、政府政策、人為操作多種複雜因素的影響
不像氣象資料那樣具備明顯的時間和季節性模式,例如一天之內和一年之內的氣溫變化等
儘管如此,以股票價格為例,介紹如何對時序資料進行預測,仍然值得一做
以下使用TensorFlow和Keras,對S&P 500
股價資料進行分析和預測
資料
S&P 500
股價資料爬取自Google Finance API,已經進行過缺失值處理
載入庫,pandas主要用於資料清洗和整理
# -*- coding: utf-8 -*- import pandas as pd import numpy as np import tensorflow as tf import matplotlib.pyplot as plt %matplotlib inline from sklearn.preprocessing import MinMaxScaler import time
用pandas讀取csv檔案為DataFrame,並用describe()
檢視特徵的數值分佈
data = pd.read_csv('data_stocks.csv')
data.describe()
還可以用info()
檢視特徵的概要
data.info()
資料共502列,41266行,502列分別為:
DATE
:該行資料的時間戳SP500
:可以理解為大盤指數- 其他:可以理解為500支個股的股價
檢視資料的前五行
data.head()
檢視時間跨度
print(time.strftime('%Y-%m-%d', time.localtime(data['DATE'].max())), time.strftime('%Y-%m-%d', time.localtime(data['DATE'].min())))
繪製大盤趨勢折線圖
plt.plot(data['SP500'])
去掉DATE
一列,訓練集測試集分割
data.drop('DATE', axis=1, inplace=True)
data_train = data.iloc[:int(data.shape[0] * 0.8), :]
data_test = data.iloc[int(data.shape[0] * 0.8):, :]
print(data_train.shape, data_test.shape)
資料歸一化,只能使用data_train
進行fit()
scaler = MinMaxScaler(feature_range=(-1, 1)) scaler.fit(data_train) data_train = scaler.transform(data_train) data_test = scaler.transform(data_test)
同步預測
同步預測是指,使用當前時刻的500支個股股價,預測當前時刻的大盤指數,即一個迴歸問題,輸入共500維特徵,輸出一維,即[None, 500] => [None, 1]
使用TensorFlow實現同步預測,主要用到多層感知機(Multi-Layer Perceptron,MLP),損失函式用均方誤差(Mean Square Error,MSE)
X_train = data_train[:, 1:]
y_train = data_train[:, 0]
X_test = data_test[:, 1:]
y_test = data_test[:, 0]
input_dim = X_train.shape[1]
hidden_1 = 1024
hidden_2 = 512
hidden_3 = 256
hidden_4 = 128
output_dim = 1
batch_size = 256
epochs = 10
tf.reset_default_graph()
X = tf.placeholder(shape=[None, input_dim], dtype=tf.float32)
Y = tf.placeholder(shape=[None], dtype=tf.float32)
W1 = tf.get_variable('W1', [input_dim, hidden_1], initializer=tf.contrib.layers.xavier_initializer(seed=1))
b1 = tf.get_variable('b1', [hidden_1], initializer=tf.zeros_initializer())
W2 = tf.get_variable('W2', [hidden_1, hidden_2], initializer=tf.contrib.layers.xavier_initializer(seed=1))
b2 = tf.get_variable('b2', [hidden_2], initializer=tf.zeros_initializer())
W3 = tf.get_variable('W3', [hidden_2, hidden_3], initializer=tf.contrib.layers.xavier_initializer(seed=1))
b3 = tf.get_variable('b3', [hidden_3], initializer=tf.zeros_initializer())
W4 = tf.get_variable('W4', [hidden_3, hidden_4], initializer=tf.contrib.layers.xavier_initializer(seed=1))
b4 = tf.get_variable('b4', [hidden_4], initializer=tf.zeros_initializer())
W5 = tf.get_variable('W5', [hidden_4, output_dim], initializer=tf.contrib.layers.xavier_initializer(seed=1))
b5 = tf.get_variable('b5', [output_dim], initializer=tf.zeros_initializer())
h1 = tf.nn.relu(tf.add(tf.matmul(X, W1), b1))
h2 = tf.nn.relu(tf.add(tf.matmul(h1, W2), b2))
h3 = tf.nn.relu(tf.add(tf.matmul(h2, W3), b3))
h4 = tf.nn.relu(tf.add(tf.matmul(h3, W4), b4))
out = tf.transpose(tf.add(tf.matmul(h4, W5), b5))
cost = tf.reduce_mean(tf.squared_difference(out, Y))
optimizer = tf.train.AdamOptimizer().minimize(cost)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for e in range(epochs):
shuffle_indices = np.random.permutation(np.arange(y_train.shape[0]))
X_train = X_train[shuffle_indices]
y_train = y_train[shuffle_indices]
for i in range(y_train.shape[0] // batch_size):
start = i * batch_size
batch_x = X_train[start : start + batch_size]
batch_y = y_train[start : start + batch_size]
sess.run(optimizer, feed_dict={X: batch_x, Y: batch_y})
if i % 50 == 0:
print('MSE Train:', sess.run(cost, feed_dict={X: X_train, Y: y_train}))
print('MSE Test:', sess.run(cost, feed_dict={X: X_test, Y: y_test}))
y_pred = sess.run(out, feed_dict={X: X_test})
y_pred = np.squeeze(y_pred)
plt.plot(y_test, label='test')
plt.plot(y_pred, label='pred')
plt.title('Epoch ' + str(e) + ', Batch ' + str(i))
plt.legend()
plt.show()
最後測試集的loss在0.005左右,預測結果如下
使用Keras實現同步預測,程式碼量會少很多,但具體實現細節不及TensorFlow靈活
from keras.layers import Input, Dense
from keras.models import Model
X_train = data_train[:, 1:]
y_train = data_train[:, 0]
X_test = data_test[:, 1:]
y_test = data_test[:, 0]
input_dim = X_train.shape[1]
hidden_1 = 1024
hidden_2 = 512
hidden_3 = 256
hidden_4 = 128
output_dim = 1
batch_size = 256
epochs = 10
X = Input(shape=[input_dim,])
h = Dense(hidden_1, activation='relu')(X)
h = Dense(hidden_2, activation='relu')(h)
h = Dense(hidden_3, activation='relu')(h)
h = Dense(hidden_4, activation='relu')(h)
Y = Dense(output_dim, activation='sigmoid')(h)
model = Model(X, Y)
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False)
y_pred = model.predict(X_test)
print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size))
print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size))
plt.plot(y_test, label='test')
plt.plot(y_pred, label='pred')
plt.legend()
plt.show()
最後測試集的loss在0.007左右,預測結果如下
非同步預測
非同步預測是指,使用歷史若干個時刻的大盤指數,預測當前時刻的大盤指數,這樣才更加符合預測的定義
例如,使用前五個大盤指數,預測當前的大盤指數,每組輸入包括5個step,每個step對應一個歷史時刻的大盤指數,輸出一維,即[None, 5, 1] => [None, 1]
使用Keras實現非同步預測,主要用到迴圈神經網路即RNN(Recurrent Neural Network)中的LSTM(Long Short-Term Memory)
from keras.layers import Input, Dense, LSTM
from keras.models import Model
output_dim = 1
batch_size = 256
epochs = 10
seq_len = 5
hidden_size = 128
X_train = np.array([data_train[i : i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)])[:, :, np.newaxis]
y_train = np.array([data_train[i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)])
X_test = np.array([data_test[i : i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)])[:, :, np.newaxis]
y_test = np.array([data_test[i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)])
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
X = Input(shape=[X_train.shape[1], X_train.shape[2],])
h = LSTM(hidden_size, activation='relu')(X)
Y = Dense(output_dim, activation='sigmoid')(h)
model = Model(X, Y)
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False)
y_pred = model.predict(X_test)
print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size))
print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size))
plt.plot(y_test, label='test')
plt.plot(y_pred, label='pred')
plt.legend()
plt.show()
最後測試集的loss在0.0015左右,預測結果如下,一層LSTM的效果已經好非常多了
當然,還有一種可能的嘗試,使用歷史若干個時刻的500支個股股價以及大盤指數,預測當前時刻的大盤指數,即[None, 5, 501] => [None, 1]
from keras.layers import Input, Dense, LSTM
from keras.models import Model
output_dim = 1
batch_size = 256
epochs = 10
seq_len = 5
hidden_size = 128
X_train = np.array([data_train[i : i + seq_len, :] for i in range(data_train.shape[0] - seq_len)])
y_train = np.array([data_train[i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)])
X_test = np.array([data_test[i : i + seq_len, :] for i in range(data_test.shape[0] - seq_len)])
y_test = np.array([data_test[i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)])
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
X = Input(shape=[X_train.shape[1], X_train.shape[2],])
h = LSTM(hidden_size, activation='relu')(X)
Y = Dense(output_dim, activation='sigmoid')(h)
model = Model(X, Y)
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False)
y_pred = model.predict(X_test)
print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size))
print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size))
plt.plot(y_test, label='test')
plt.plot(y_pred, label='pred')
plt.legend()
plt.show()
最後的loss在0.004左右,結果反而變差了
500支個股加上大盤指數的預測效果,還不如僅使用大盤指數
說明特徵並不是越多越好,有時候反而會引入不必要的噪音
由於並未涉及到複雜的CNN或RNN,所以在CPU上執行的速度還可以