1. 程式人生 > >Tensorflow LSTM時間序列預測的嘗試

Tensorflow LSTM時間序列預測的嘗試

一、網上的資源

網上有不少用LSTM來預測時間序列的資源,如下面:

深度學習(08)_RNN-LSTM迴圈神經網路-03-Tensorflow進階實現

http://blog.csdn.net/u013082989/article/details/73693392

Applying Deep Learning to Time Series Forecasting with TensorFlow

https://mapr.com/blog/deep-learning-tensorflow/

Tensorflow 筆記 RNN 預測時間序列

https://www.v2ex.com/t/339544

tf19: 預測鐵路客運量

http://blog.csdn.net/u014365862/article/details/53869802


但是除錯起來,都很困難!借鑑比較多的是tf19:預測鐵路客運量這篇博文。這篇博文首先是基本上可以執行的。但是訓練模型和測試模型分開,需要通過檔案來傳遞模型引數。而且訓練和測試不能同時執行。因此除錯起來也費了不少功夫!

二、LSTM時間序列預測

1. 用namedtuple來配置模型的超引數。

HParams = namedtuple('HParams', 'seq_size, hidden_size, learning_rate')


這種方式比定義一個Config類好。

2. 構建時間序列預測模型類TS_LSTM

class TS_LSTM(object):
    def __init__(self, hps):
        self._X = X = tf.placeholder(tf.float32, [None, hps.seq_size, 1])  
        self._Y = Y = tf.placeholder(tf.float32, [None, hps.seq_size])      
        W = tf.Variable(tf.random_normal([hps.hidden_size, 1]), name='W')  
        b = tf.Variable(tf.random_normal([1]), name='b')  
        lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(hps.hidden_size)  #測試cost 1.3809
 
        outputs, states = tf.nn.dynamic_rnn(lstm_cell, X, dtype=tf.float32)  
        W_repeated = tf.tile(tf.expand_dims(W, 0), [tf.shape(X)[0], 1, 1])  
        output = tf.nn.xw_plus_b(outputs, W_repeated, b)  
        self._output = output = tf.squeeze(output)  
        self._cost = cost = tf.reduce_mean(tf.square(output - Y))  
        self._train_op = tf.train.AdamOptimizer(hps.learning_rate).minimize(cost)  
 
    @property
    def X(self):
        return self._X
 
    @property
    def Y(self):
        return self._Y   
 
    @property
    def cost(self):
        return self._cost
 
    @property
    def output(self):
        return self._output
 
    @property
    def train_op(self):
        return self._train_op


這種方式比用函式定義模型更加方便。@property的設計使得模型用起來更加方便! 
模型的關鍵就是: 
1). 設定BasicLSTMCell的隱藏節點個數 
2). 呼叫dynamic_rnn(lstm_cell,X)來計算輸出outputs 
3). 呼叫xw_plus_b將outputs計算為單個的output 
模型中各變數的維度如下:(batch_size=100, seq_size=3, hidden_size=6) 
- X定義為[None, hps.seq_size, 1]是因為dynamic_rnn的輸入針對的是二維影象樣本的輸入,因此,必須多定義一個1的維度,傳入的實際應該為100*3*1。 
- Y的維度維持與影象標籤輸入資料維度相同,傳入的實際應該為100*3。 
- W為6*1 
- b為1*1 
- outputs為100*3*6 
- W_repeated為100*6*1,其變化過程6*11*6*1100*6*1。 
- output在squeeze之前為100*3*1,squeeze後為100*3 
- cost為1*1

3. 訓練和測試函式train_test

def train_test(hps, data):
     #訓練資料準備
    train_data_len = len(data)*2//3
    train_x, train_y = [], []  
    for i in range(train_data_len - hps.seq_size - 1):  
        train_x.append(np.expand_dims(data[i : i + hps.seq_size], axis=1).tolist())  
        train_y.append(data[i + 1 : i + hps.seq_size + 1].tolist())  
    #測試資料準備    
    test_data_len = len(data)//3
    test_x, test_y = [], []  
    for i in range(train_data_len,
                   train_data_len+test_data_len - hps.seq_size - 1):  
        test_x.append(np.expand_dims(data[i : i + hps.seq_size], axis=1).tolist())  
        test_y.append(data[i + 1 : i + hps.seq_size + 1].tolist())  
 
    with tf.Graph().as_default(), tf.Session() as sess:  
        with tf.variable_scope('model',reuse=None):
            m_train = TS_LSTM(hps)         
 
        #訓練
        tf.global_variables_initializer().run()
        for step in range(20000):  
            _, train_cost = sess.run([m_train.train_op, m_train.cost], 
                              feed_dict={m_train.X: train_x, m_train.Y: train_y})  
 
        #預測 
        test_cost, output = sess.run([m_train.cost, m_train.output],
                  feed_dict={m_train.X: test_x, m_train.Y: test_y})  
        #print(hps, train_cost, test_cost)
        return train_cost, test_cost


這裡的關鍵是測試用是訓練模型,我也不知道為什麼好多網路資源都將訓練模型和測試模型分離開來。測試不就是用測試資料來測試訓練模型的效果嗎?因此這裡把2/3的資料劃給訓練,1/3的資料用於測試。自己動手編程式碼時一定要對session.run函式用法和原理熟悉。

4. 主函式(對超引數組合的測試誤差進行比較)

def main():
    #讀取原始資料
    f=open('鐵路客運量.csv')  
    df=pd.read_csv(f)   
    data = np.array(df['鐵路客運量_當期值(萬人)'])  
    normalized_data = (data - np.mean(data)) / np.std(data)     
 
    #測試不同組合的超引數對測試誤差的影響
    costs =[]
    for seq_size in [4,6,12,16,24]:
        for hidden_size in [6,10,20,30]:
            print(seq_size, hidden_size)
            hps = HParams(seq_size, hidden_size, 0.003)
            train_cost, test_cost = train_test(hps, normalized_data) 
            costs.append([train_cost,test_cost])


進行了初步比較,感覺有兩個: 
1)同一個超引數,測試誤差相差挺大。 
2)不同超引數,訓練時誤差基本都很小,但是測試誤差相差很大,如何限制學習過程中的過擬合是一個很大的問題。 
可以看看我執行的訓練誤差和測試誤差的比較。程式碼已放到csdn下載資源,csdn下載程式碼來!

     訓練誤差            測試誤差
[[  4.04044241e-02   4.97651482e+00]
 [  3.57200466e-02   6.96304381e-01]
 [  2.97380015e-02   1.77482967e+01]
 [  3.09452992e-02   2.62166214e+00]
 [  3.62494551e-02   2.53422332e+00]
 [  2.57663596e-02   1.44900203e+00]
 [  2.24006996e-02   2.28607416e+00]
 [  2.28729844e-02   1.12727535e+00]
 [  2.58173030e-02   1.43265343e+00]
 [  1.48035632e-02   1.05281734e+00]
 [  1.24982912e-02   6.59598827e+00]
 [  1.27354050e-02   1.69984627e+00]
 [  1.60749555e-02   4.03962803e+00]
 [  1.18473349e-02   7.92685986e-01]
 [  7.39684049e-03   6.16959620e+00]
 [  7.60479691e-03   3.01771784e+00]
 [  1.40351299e-02   4.48093843e+00]
 [  7.94599950e-03   3.78614712e+00]
 [  5.50406286e-03   5.83478451e-01]
 [  4.54067113e-03   8.15259743e+00]]