RNN中利用LSTM來預測sin函式
阿新 • • 發佈:2018-12-16
前言:這個例子是用LSTM來預測sin函式的問題,期間遇到了一個十分致命的問題:構造資料的時候,沒有把資料構造成序列,所以一直在報維度上的錯誤。以後對時序問題做預測時,要格外注意資料是否是序列形式的資料,否則很難檢查出問題;其餘的問題其實比較好看出來,一除錯就能解決。
這個例項來自於《TensorFlow實戰Google深度學習框架》
先給出錯誤的程式碼示例,以便給自己一個警醒。
# !/usr/bin/env python
# -*- coding:utf-8 -*-
# author:lxy
#
# NOTE: this listing is the author's deliberately preserved WRONG version.
# The known errors are marked with "NOTE (known error)" comments below; the
# code itself is left untouched so it keeps serving as a cautionary example.
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# Size passed to each BasicLSTMCell.
hidden_size = 30
# Number of stacked LSTM cells.
num_layers = 2
# Length of the input window used to predict the next value.
time_step = 10
# Number of training iterations.
train_steps = 10000
batch_size = 32
train_examples =10000
test_examples =1000
# Multiplied by the example counts to derive the sampled interval bounds.
sample_gap = 0.01


def generate_data(seq):
    """Build sliding-window samples from seq.

    For every index i, the window seq[i:i+time_step] is the input and
    seq[i+time_step] is the target. Returns (X, y) as float32 numpy arrays.
    """
    X = []
    y = []
    for i in range(len(seq)-time_step):
        # Each input is a window of time_step points and the target is the
        # value right after it, i.e. the sin value at index i+time_step is
        # predicted from the preceding time_step points.
        # NOTE (known error): the samples are appended without an extra
        # enclosing list, so for a 1-D seq X ends up 2-D
        # (num_windows, time_step) instead of being shaped as sequences.
        # This is what the article calls the fatal mistake that kept
        # producing dimension errors. The corrected listing appends
        # [seq[i:i+time_step]] and [seq[i+time_step]].
        X.append(seq[i:i+time_step])
        y.append(seq[i+time_step])
    return np.array(X,dtype = np.float32),np.array(y,dtype = np.float32)


def lstm_model(X,y,is_training):
    """Build the LSTM graph on X.

    Returns (prediction, None, None) when is_training is false, otherwise
    (prediction, loss, train_op).
    """
    # Use a multi-layer LSTM structure.
    cell = tf.nn.rnn_cell.MultiRNNCell([tf.nn.rnn_cell.BasicLSTMCell(hidden_size) for _ in range(num_layers)])
    outputs,state = tf.nn.dynamic_rnn(cell,X,dtype = tf.float32)  # outputs[batch_size,-1,:]==state[1,batch_size,:]
    output = outputs[:,-1,:]  # state[1]
    # Add one fully connected layer on top of the LSTM output.
    # NOTE (known error): the keyword is misspelled here; the corrected
    # listing uses activation_fn = None.
    prediction = tf.contrib.layers.fully_connected(output,1,activation = None)
    if not is_training:
        return prediction,None,None
    # Compute the loss.
    # NOTE (known error): the keyword is misspelled here; the corrected
    # listing uses predictions = prediction.
    loss = tf.losses.mean_squared_error(labels=y,prediction=prediction)
    # Create the optimizer.
    train_op = tf.contrib.layers.optimize_loss(loss,tf.train.get_global_step(), optimizer ="Adagrad",learning_rate = 0.1)
    return prediction,loss,train_op


def trian(sess,train_x,train_y):
    """Train the model in sess on (train_x, train_y) for train_steps steps."""
    # Feed the training data to the graph as a dataset.
    ds = tf.data.Dataset.from_tensor_slices((train_x,train_y))
    ds = ds.repeat().shuffle(1000).batch(batch_size)
    X,y = ds.make_one_shot_iterator().get_next()
    # Call the model to get the prediction, the loss and the training op.
    with tf.variable_scope("model"):
        prediction,loss,train_op = lstm_model(X,y,True)
    # Initialize the variables.
    sess.run(tf.global_variables_initializer())
    for i in range(train_steps):
        train_,l = sess.run([train_op,loss])
        if i%100==0:
            print("train_step:{0},loss is {1}".format(i,l))


def run_eval(sess,test_x,test_y):
    """Evaluate the trained model on the test data, print the RMSE and plot."""
    ds = tf.data.Dataset.from_tensor_slices((test_x,test_y))
    ds = ds.batch(1)
    X,y = ds.make_one_shot_iterator().get_next()
    # Call the model, reusing the variables created under the "model" scope.
    with tf.variable_scope("model",reuse=True):
        test_prediction,test_loss,test_op = lstm_model(X,[0.0],False)
    # Predicted values.
    prediction = []
    # True values.
    labels = []
    for i in range(test_examples):
        pre,l = sess.run([test_prediction,y])
        prediction.append(pre)
        labels.append(l)
    # Compute the RMSE as the evaluation metric (the printed label says
    # "Mean Square Error", but the value is the root of the mean square).
    pre_squ=np.array(prediction).squeeze()
    lab_squ = np.array(labels).squeeze()
    rmse = np.sqrt(((pre_squ-lab_squ)**2).mean(axis = 0))
    print("Mean Square Error is :%f" % rmse)
    # Plot the predicted sin curve.
    # NOTE (known error): the keywords labels/colors are misspelled here; the
    # corrected listing uses label and color.
    plt.figure()
    plt.plot(pre_squ,labels ='prediction',colors ='red')
    plt.plot(lab_squ,labels = 'real_sin',colors ='green')
    plt.show()


# Generate the data sets: the test interval starts where the training one ends.
test_start = (train_examples+time_step)*sample_gap
test_end = test_start+(test_examples+time_step)*sample_gap
train_x,train_y = generate_data(np.sin(np.linspace(0,test_start,train_examples+time_step,dtype = np.float32)))
test_x,test_y = generate_data(np.sin(np.linspace(test_start,test_end,test_examples+time_step, dtype=np.float32)))
# print(train_x)
# print(train_y)
# Create the session, then train and evaluate the model.
with tf.Session() as sess:
    trian(sess,train_x,train_y)
    run_eval(sess,test_x,test_y)
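最關鍵的錯誤:generate_data 中用 X.append(seq[i:i+time_step]) 和 y.append(seq[i+time_step]) 構造資料,沒有多包一層列表,得到的 X 不是序列形式(少了一個維度),所以一直報維度上的錯誤;正確的寫法是 X.append([seq[i:i+time_step]]) 和 y.append([seq[i+time_step]])。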
期間還寫錯了下面兩處函式的引數名稱(正確的引數名分別是 activation_fn 和 predictions):
-
prediction = tf.contrib.layers.fully_connected(output,1,activation = None)
-
loss = tf.losses.mean_squared_error(labels=y,prediction=prediction)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:lxy
"""Predict the sin function with a multi-layer LSTM (TensorFlow 1.x API)."""
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# Size passed to each BasicLSTMCell.
hidden_size = 30
# Number of stacked LSTM cells.
num_layers = 2
# Length of the input window used to predict the next value.
time_step = 10
# Number of training iterations.
train_steps = 10000
batch_size = 32
train_examples = 10000
test_examples = 1000
# Multiplied by the example counts to derive the sampled interval bounds.
sample_gap = 0.01


def generate_data(seq):
    """Build sliding-window samples from ``seq``.

    For every index ``i`` the window ``seq[i:i + time_step]`` is the input and
    ``seq[i + time_step]`` is the target. This is a very common data layout
    for time-series prediction.

    Args:
        seq: the sampled sequence (a 1-D array in this script).

    Returns:
        A tuple ``(X, y)`` of float32 numpy arrays. For a 1-D ``seq``, ``X``
        has shape ``(num_windows, 1, time_step)`` and ``y`` has shape
        ``(num_windows, 1)``.
    """
    num_windows = len(seq) - time_step
    # The extra enclosing list is the crucial part: it gives every sample a
    # sequence dimension. Without it the data is not shaped as sequences and
    # the RNN fails with dimension errors.
    X = [[seq[i:i + time_step]] for i in range(num_windows)]
    y = [[seq[i + time_step]] for i in range(num_windows)]
    return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)


def lstm_model(X, y, is_training):
    """Build the LSTM graph on ``X``.

    Args:
        X: input tensor fed to ``tf.nn.dynamic_rnn``.
        y: labels used for the loss; ignored when ``is_training`` is false.
        is_training: whether to also build the loss and the training op.

    Returns:
        ``(prediction, None, None)`` when ``is_training`` is false, otherwise
        ``(prediction, loss, train_op)``.
    """
    # Use a multi-layer LSTM structure.
    cell = tf.nn.rnn_cell.MultiRNNCell([
        tf.nn.rnn_cell.BasicLSTMCell(
            hidden_size, forget_bias=1.0, state_is_tuple=True)
        for _ in range(num_layers)
    ])
    # cell_initializer = cell.zero_state(batch_size,tf.float32)
    outputs, state = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
    # Keep only the output of the last time step (author's note:
    # outputs[batch_size,-1,:]==state[1,batch_size,:], i.e. state[1]).
    output = outputs[:, -1, :]
    # Add one fully connected layer on top of the LSTM output.
    prediction = tf.contrib.layers.fully_connected(
        output, 1, activation_fn=None)
    if not is_training:
        return prediction, None, None
    # Compute the loss.
    loss = tf.losses.mean_squared_error(labels=y, predictions=prediction)
    # Create the optimizer.
    train_op = tf.contrib.layers.optimize_loss(
        loss, tf.train.get_global_step(),
        optimizer="Adagrad", learning_rate=0.1)
    return prediction, loss, train_op


def trian(sess, train_x, train_y):
    """Train the model in ``sess`` on ``(train_x, train_y)``.

    Runs ``train_steps`` iterations and prints the loss every 100 steps.
    The misspelled name is kept so that existing callers keep working.
    """
    # Feed the training data to the graph as a dataset.
    ds = tf.data.Dataset.from_tensor_slices((train_x, train_y))
    ds = ds.repeat().shuffle(1000).batch(batch_size)
    X, y = ds.make_one_shot_iterator().get_next()
    # Call the model to get the prediction, the loss and the training op.
    with tf.variable_scope("model"):
        _, loss, train_op = lstm_model(X, y, True)
    # Initialize the variables.
    sess.run(tf.global_variables_initializer())
    for i in range(train_steps):
        _, l = sess.run([train_op, loss])
        if i % 100 == 0:
            print("train_step:{0},loss is {1}".format(i, l))


def run_eval(sess, test_x, test_y):
    """Evaluate the trained model, print the RMSE and plot the curves.

    Args:
        sess: the session in which the model was trained.
        test_x: test inputs as produced by ``generate_data``.
        test_y: test targets as produced by ``generate_data``.
    """
    ds = tf.data.Dataset.from_tensor_slices((test_x, test_y))
    ds = ds.batch(1)
    X, y = ds.make_one_shot_iterator().get_next()
    # Call the model, reusing the variables created under the "model" scope.
    with tf.variable_scope("model", reuse=True):
        test_prediction, _, _ = lstm_model(X, [0.0], False)
    # Predicted values.
    prediction = []
    # True values.
    labels = []
    # Iterate over exactly the examples that were passed in, so the one-shot
    # iterator is never read past its end.
    for _ in range(len(test_x)):
        pre, l = sess.run([test_prediction, y])
        prediction.append(pre)
        labels.append(l)
    # Compute the RMSE as the evaluation metric. reshape(-1) keeps the arrays
    # 1-D even when there is a single test example.
    pre_squ = np.array(prediction).reshape(-1)
    lab_squ = np.array(labels).reshape(-1)
    rmse = np.sqrt(((pre_squ - lab_squ) ** 2).mean(axis=0))
    # The label says "Mean Square Error" although the value is the RMSE; the
    # text is kept so it matches the recorded sample output below.
    print("Mean Square Error is :%f" % (rmse))
    # Plot the predicted sin curve against the real one.
    plt.figure()
    plt.plot(pre_squ, label='prediction', linestyle='-', color='r')
    # scatter() needs both x and y, so the sample index is used as x.
    plt.scatter(np.arange(len(lab_squ)), lab_squ,
                label='real_sin', color='green')
    plt.legend()
    plt.show()


# Generate the data sets: the test interval starts where the training one ends.
test_start = (train_examples + time_step) * sample_gap
test_end = test_start + (test_examples + time_step) * sample_gap
train_x, train_y = generate_data(np.sin(np.linspace(
    0, test_start, train_examples + time_step, dtype=np.float32)))
test_x, test_y = generate_data(np.sin(np.linspace(
    test_start, test_end, test_examples + time_step, dtype=np.float32)))
# print(train_x)
# print(train_y)
# Create the session, then train and evaluate the model.
with tf.Session() as sess:
    trian(sess, train_x, train_y)
    run_eval(sess, test_x, test_y)

# Sample output recorded by the author:
# .....
# .....
# .....
# train_step:9100,loss is 4.266162250132766e-06
# train_step:9200,loss is 5.570855591940926e-06
# train_step:9300,loss is 3.8035254874557722e-06
# train_step:9400,loss is 4.238047949911561e-06
# train_step:9500,loss is 4.5835963646823075e-06
# train_step:9600,loss is 4.353491931397002e-06
# train_step:9700,loss is 3.338790065754438e-06
# train_step:9800,loss is 4.182937573204981e-06
# train_step:9900,loss is 4.109343080926919e-06
# Mean Square Error is :0.002006
這次給我的教訓除了訓練資料要構造成序列外,還有一些函式的引數也要注意下。