用LSTM進行時間序列預測
阿新 • • 發佈:2021-09-01
LSTM(long short-term memory)長短期記憶網路是一種比較老的處理NLP的模型,但是其在時間序列預測方面的精度還是不錯的,我這裡以用“流量”資料為例進行時間序列預測。作者使用的是pytorch框架,在jupyter-lab環境下執行。
匯入必要的包
import torch import torch.nn as nn import seaborn as sns import numpy as np import pandas as pd import matplotlib.pylab as plt import sklearn.preprocessing as preprocessing %matplotlib inline
載入資料集
dataset = pd.read_excel('./data3.xlsx')
dataset.columns = ['ds', 'y']
# 列印前5行資料
dataset.head()
輸出的結果如下:
ds表示時間戳,y是每個時間段對應的流量值。為了看的更方便,將ds轉換為標準格式
dataset['ds'] = pd.to_datetime(dataset['ds'], format='%Y%m%d%H')
dataset.head()
結果如下:
劃分測試集和訓練集
為了進行模型的評估,我們需要將資料劃分成訓練集和測試集,這裡我以最後24個小時(即一天)作為測試集:
# 最後24個值作為測試集
test_size = 24
train_set = dataset[:-test_size].y.values
test_set = dataset[-test_size:].y.values
print(train_set.shape)
print(test_set.shape)
將訓練集標準化
為了消除值的量綱的差異,同時,由於神經網路對於資料較大的值比較敏感,運算會增加複雜度,因此我們將訓練集進行標準化,這裡我採用z-score標準化,即:
\[X = \frac{X-\mu}{\sigma}, \]注意,這裡只能將標準化運用到訓練集上,如果運用在測試集上則會造成資料洩露,相當於已經看過了測試集,那麼訓練就沒有意義了。
norm_scaler = preprocessing.StandardScaler()
train_set_normed = norm_scaler.fit_transform(train_set.reshape(-1,1)).reshape(-1, ).tolist()
train_set_normed[:5]
將資料轉換為tensor
由於pytorch只能對tensor型別的資料進行訓練,故需要轉化:
train_set_normed = torch.FloatTensor(train_set_normed)
train_set_normed[:5]
製作用於訓練的sequence
我們採用滑動視窗的形式進行sequence的製作,這裡我以168小時(一週)的資料來預測後24小時(一天)的資料,所以我們標籤視窗的大小為168,標籤視窗的大小為24。相當於每168個數據預測完後24個數據後,視窗向右移動一位,用下一個168資料預測後24個數據,直到看完所有的訓練集:
# 這裡用168小時(即一週)的資料來預測24小時(一天)的資料
train_window_len = 168 # 訓練視窗
pred_window_len = 24 #標籤視窗
l = len(train_set_normed)
input_sequence = []
for i in range(l - train_window_len - pred_window_len + 1):
train_seq = train_set_normed[i:i+train_window_len]
label_seq = train_set_normed[i+train_window_len:i+train_window_len+pred_window_len]
input_sequence.append((train_seq, label_seq))
print(len(input_sequence))
input_sequence[0] # 展示第一個訓練樣本
可以看到,每個訓練樣本由168個輸入值和24個標籤值組成
搭建LSTM模型
class LSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(LSTMModel, self).__init__()
self.hidden_size = hidden_size
# input_size表示輸入的特徵維數
self.lstm = nn.LSTM(input_size, hidden_size)
# output_size表示輸出的特徵維數
self.linear = nn.Linear(hidden_size, output_size)
# memory_cell
self.hidden_cell = self.init_hidden_cell(output_size)
def forward(self, input_seq):
lstm_out, self.hidden_cell = self.lstm(input_seq.view(len(input_seq), 1, -1), self.hidden_cell)
# 由於最終的預測值只儲存在最後一個單元格中, 所以只要輸出最後一個
return self.linear(lstm_out.view(len(input_seq), -1))[-1]
def init_hidden_cell(self, output_size):
return (torch.zeros(1,1,self.hidden_size),
torch.zeros(1,1,self.hidden_size))
model = LSTMModel(1, 100, pred_window_len)
# 定義損失函式和優化器
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 迭代次數,這裡為了省時間僅訓練5次
n_epochs = 5
開始訓練
for epoch in range(n_epochs):
train_loss = 0.
for seq, label in input_sequence:
optimizer.zero_grad()
# 重新初始化隱藏層資料,避免受之前執行程式碼的干擾,如果不重新初始化,會有報錯。
model.hidden_cell = model.init_hidden_cell(pred_window_len)
pred_label = model(seq)
loss = criterion(pred_label, label)
train_loss += loss.item()
loss.backward()
optimizer.step()
print(f'epoch{epoch+1:2} loss: {train_loss/len(input_sequence):.4f}')
預測測試集的值進行模型的評估
# 最後168個值,可以用來預測後24個值,即測試集的值
test_inputs = train_set_normed[-train_window_len:].tolist()
model.eval()
pred_value = [] # 用於存放預測值
with torch.no_grad():
test_sequence = torch.FloatTensor(test_inputs)
model.hidden_cell = model.init_hidden_cell(test_size)
pred = model(test_sequence)
pred_value = pred
# 由於前面對訓練集進行了標準化,故預測結果都是小數,這裡需要將預測結果反標準化
pred_value = norm_scaler.inverse_transform(pred_value).tolist()
pred_value[:5]
前5個預測值
畫出預測值和真實值的對比圖
plt.figure(figsize=(15,5),dpi = 500)
plt.grid(linestyle='--')
plt.plot(list(range(24)), test_set, label='actual value')
plt.plot(list(range(24)), pred_value, label='predict value')
plt.legend()
由圖可以看到預測的結果還是不錯的
計算準確率來評估模型
模型的準確率計算公式為:
\[Accuarcy=\left(1-\sqrt{\frac{1}{n}\sum_{i=1}^n E_i^2}\right)\times 100\% \]其中n為預測總數,\(E_i\)為某一點的相對誤差,計算公式為:
\[E_i=\frac{|actual-predict|}{actual}\times 100\% \]relative_error = 0.
for i in range(24):
relative_error += (abs(pred_value[i] - test_set[i]) / test_set[i]) ** 2
acc = 1- np.sqrt(relative_error / 24)
print(f'模型的測試準確率為:{acc*100:.2f}%')
模型的最終準確率為93.38%,可以看到效果還是不錯的,如果繼續調整學習率和迭代次數,搭配交叉驗證和網格搜尋,效果應該會更好一點。