LSTM預測時間序列
阿新 • • 發佈:2019-01-10
LSTM網路(長短期記憶網路)可以理解為是RNN的改進版,它的出現解決了RNN的記憶問題。也就是說RNN網路的某一時刻對距離此刻最近的時刻的狀態最為敏感,而對距離此刻稍遠時刻的狀態不太敏感,這就導致了RNN某一時刻的輸出受上一時刻影響最大(雖然不少情況確實如此),但實際情況並不是永遠這樣,例如一篇首尾呼應的文章,最後一段的內容顯然極度依賴第一段的內容,而和文章中間部分的關係不大。
LSTM的出現,就是為了解決上述問題。通過在網路中增加精心設計的“門”來決定記住或者忘記之前時刻的內容,準確的說是決定要記住或者忘記之前時刻內容的程度(“門”的輸出值為0-1,0表示“一點也不”,1表示“完全”,之間表示“部分”)。這樣LSTM對於前面的問題,在預測最後一段內容時,就可以選擇讓第一段完全通過,而讓中間部分完全不通過。
既然lstm這麼厲害,接下來就介紹一下lstm的應用例項(keras實現),接下來也會貼出lstm的tensorflow實現:
lstm實現
模型介紹
例項的主要目的就是學習序列的變化規律,來預測下一時刻的序列值。
程式碼講解
- 資料生成
import pandas as pd
from random import random
flow = (list(range(1,10,1)) + list(range(10,1,-1)))*100
pdata = pd.DataFrame({"a":flow, "b":flow})
pdata.b = pdata.b.shift(9)
data = pdata.iloc[10:] * random()
上述的程式碼是用來生成本次的資料集。最後乘以一個隨機值得到data作為總的資料集。
- 資料預處理
import numpy as np
def _load_data(data, n_prev = 100):
"""
data should be pd.DataFrame()
"""
docX, docY = [], []
for i in range(len(data)-n_prev):
docX.append(data.iloc[i:i+n_prev].as_matrix())
docY.append(data.iloc[i+n_prev].as_matrix())
alsX = np.array(docX)
alsY = np.array(docY)
return alsX, alsY
def train_test_split(df, test_size=0.1):
ntrn = round(len(df) * (1 - test_size))
X_train, y_train = _load_data(df.iloc[0:ntrn])
X_test, y_test = _load_data(df.iloc[ntrn:])
return (X_train, y_train), (X_test, y_test)
(X_train, y_train),(X_test, y_test)=train_test_split(data)
X_train:訓練資料
y_train:訓練標籤
X_test:測試資料
y_test:測試標籤
通過呼叫自定義的兩個函式就可以得到上述的(X_train, y_train),(X_test, y_test)。
- 模型搭建
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
in_out_neurons = 2
hidden_neurons = 50
model = Sequential()
model.add(LSTM(in_out_neurons, hidden_neurons, return_sequences=False))
model.add(Dense(hidden_neurons, in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
- 訓練
model.fit(X_train, y_train, batch_size=700, nb_epoch=100, validation_split=0.05)
- 預測
predicted = model.predict(X_test)
經過訓練,通過觀察predicted和y_test發現,得到的效果很好!
下面是完整程式碼:
import pandas as pd
from random import random
flow = (list(range(1,10,1)) + list(range(10,1,-1)))*100
pdata = pd.DataFrame({"a":flow, "b":flow})
pdata.b = pdata.b.shift(9)
data = pdata.iloc[10:] * random() # some noise
import numpy as np
def _load_data(data, n_prev = 100):
"""
data should be pd.DataFrame()
"""
docX, docY = [], []
for i in range(len(data)-n_prev):
docX.append(data.iloc[i:i+n_prev].as_matrix())
docY.append(data.iloc[i+n_prev].as_matrix())
alsX = np.array(docX)
alsY = np.array(docY)
return alsX, alsY
def train_test_split(df, test_size=0.1):
ntrn = round(len(df) * (1 - test_size))
X_train, y_train = _load_data(df.iloc[0:ntrn])
X_test, y_test = _load_data(df.iloc[ntrn:])
return (X_train, y_train), (X_test, y_test)
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
in_out_neurons = 2
hidden_neurons = 50
model = Sequential()
model.add(LSTM(in_out_neurons, hidden_neurons, return_sequences=False))
model.add(Dense(hidden_neurons, in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
(X_train, y_train), (X_test, y_test) = train_test_split(data) # retrieve data
model.fit(X_train, y_train, batch_size=700, nb_epoch=100, validation_split=0.05)
predicted = model.predict(X_test)