
Training a DQN Model on OpenAI CartPole-v0

Preface:

      I recently studied DQN, a machine learning model that can learn to play games. Our ultimate goal is to apply reinforcement learning to a physical robot so that it can learn on its own. The road is long, but it starts with a single step! Here is a link to our studio's website; everyone is welcome to visit and offer pointers: (link)

     I first came across DQN through Morvan (莫煩)'s tutorial videos; my thanks to everyone who shares their knowledge.

     DQN stands for Deep Q Network. In essence it is Q-Learning with a neural network bolted on: a value-based method that updates its value estimates online as it interacts with the environment. The core idea is very simple.
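     As a reminder of what DQN builds on, here is a minimal tabular Q-learning sketch (a hypothetical 2-state, 2-action table, purely for illustration and not part of the code later in this post). DQN replaces the table lookup with a neural network that maps a state to Q-values:

import numpy as np

# Minimal tabular Q-learning sketch (hypothetical 2-state, 2-action problem).
Q = np.zeros((2, 2))          # Q[state, action]
alpha, gamma = 0.01, 0.9      # learning rate and reward decay, matching the DQN defaults below

def q_learning_update(s, a, r, s_next):
    # target = r + gamma * max_a' Q(s', a'); move Q(s, a) toward the target
    target = r + gamma * np.max(Q[s_next])
    Q[s, a] += alpha * (target - Q[s, a])

q_learning_update(s=0, a=1, r=1.0, s_next=1)
print(Q)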

     My test environment is VS2017 + TensorFlow + OpenAI Gym. Both TensorFlow and OpenAI Gym now support Windows 10, which is pretty exciting. You can refer to this blog post on how to do machine learning development with VS2017: (link)
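     Before building the model it is worth a quick sanity check that the installation works. A minimal sketch (assuming TensorFlow 1.x and gym are already installed via pip):

import tensorflow as tf
import gym

print(tf.__version__)              # the code below assumes a 1.x release
env = gym.make('CartPole-v0')      # if this succeeds, gym's classic-control environments are available
print(env.action_space, env.observation_space)

With that working, here is the DQN model itself: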
import numpy as np
import pandas as pd
import tensorflow as tf

class DQN(object):      # the model class
    def __init__(self,
                 n_actions,
                 n_features,
                 learning_rate=0.01,           # learning rate
                 reward_decay=0.9,             # reward decay (gamma)
                 e_greedy=0.9,                 # greedy action probability
                 replace_target_iter=300,      # how often to update the target network
                 memory_size=500,              # size of the replay memory
                 batch_size=32,                # batch size for training
                 e_greedy_increment=0,         # how much to increase the greedy probability per learn step
                 output_graph=False,           # whether to write a TensorBoard graph
                 q_double=True,                # whether to use Double DQN
                 layer1_elmts=20,              # number of neurons in the hidden layer
                 use_e_greedy_increment=1000   # threshold: after this many learn steps, reduce random exploration
                 ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = replace_target_iter
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.epsilon_increment = e_greedy_increment
        self.epsilon = e_greedy
        self.q_double = q_double
        self.layer1_elmts = layer1_elmts
        self.use_e_greedy_increment = use_e_greedy_increment

        self.learn_step_counter = 0
        # each row stores (state, action, reward, next_state)
        self.memory = np.zeros((self.memory_size, self.n_features * 2 + 2))

        self.build_net()
        # collect the parameters of eval_net and target_net
        e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='eval_net')
        t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='target_net')
        with tf.variable_scope('soft_replacement'):  # op that copies eval_net weights into target_net
            self.target_replace_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]

        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())
        self.cost_his = []   # record the cost so it can be plotted later

    def build_net(self):
        # ********************* build evaluate_net *********************
        self.state = tf.placeholder(tf.float32, [None, self.n_features], name='state')
        self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='QTarget')
        w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)
        with tf.variable_scope('eval_net'):    # the evaluation (online) network
            e_layer1 = tf.layers.dense(self.state, self.layer1_elmts, tf.nn.relu,
                                       kernel_initializer=w_initializer,
                                       bias_initializer=b_initializer, name='e_layer1')
            self.q_eval = tf.layers.dense(e_layer1, self.n_actions,
                                          kernel_initializer=w_initializer,
                                          bias_initializer=b_initializer, name='q_eval')
        with tf.variable_scope('loss'):
            self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))
        with tf.variable_scope('train'):
            self.train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)

        # ********************* build target_net *********************
        self.state_next = tf.placeholder(tf.float32, [None, self.n_features], name='state_next')
        with tf.variable_scope('target_net'):
            t_layer1 = tf.layers.dense(self.state_next, self.layer1_elmts, tf.nn.relu,
                                       kernel_initializer=w_initializer,
                                       bias_initializer=b_initializer, name='t_layer1')
            self.q_next = tf.layers.dense(t_layer1, self.n_actions,
                                          kernel_initializer=w_initializer,
                                          bias_initializer=b_initializer, name='q_next')

    # store a transition in the replay memory; the buffer is circular, so new values overwrite old ones
    def store_transition(self, state, action, reward, state_next):
        if not hasattr(self, 'memory_counter'):
            self.memory_counter = 0
        transition = np.hstack((state, [action, reward], state_next))
        tmp_index = self.memory_counter % self.memory_size
        self.memory[tmp_index, :] = transition
        self.memory_counter += 1

    # epsilon-greedy action selection: act greedily with probability epsilon, otherwise act randomly;
    # as training goes on, random exploration shrinks and the greedy (max-value) action is chosen
    def choose_action(self, observation):
        observation = observation[np.newaxis, :]
        if np.random.uniform() < self.epsilon:
            action_val = self.sess.run(self.q_eval, feed_dict={self.state: observation})
            action = np.argmax(action_val)
        else:
            action = np.random.randint(0, self.n_actions)
        return action

    def learn(self):
        if self.learn_step_counter % self.replace_target_iter == 0:  # periodically refresh the target network
            self.sess.run(self.target_replace_op)
            print('\ntarget_params_replaced\n')

        # sample a batch from the replay memory
        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
        batch_memory = self.memory[sample_index, :]

        # q_next: target net on the next states; q_eval4next: eval net on the next states (needed for Double DQN)
        q_next, q_eval4next = self.sess.run(
            [self.q_next, self.q_eval],
            feed_dict={
                self.state_next: batch_memory[:, -self.n_features:],  # next-state features
                self.state: batch_memory[:, -self.n_features:]        # next-state features through the eval net
            })
        # q_eval: eval net on the current states
        q_eval = self.sess.run(self.q_eval,
                               feed_dict={self.state: batch_memory[:, :self.n_features]})

        q_target = q_eval.copy()   # so that q_target has the same shape as q_eval
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        eval_act_index = batch_memory[:, self.n_features].astype(int)
        reward = batch_memory[:, self.n_features + 1]

        # Double DQN: pick the best next action with the eval net, then read that action's value
        # from the target net instead of simply taking the target net's maximum.
        # This avoids always choosing the largest estimate and so reduces over-estimation;
        # the flavour is a little like Sarsa.
        if self.q_double:
            max_act4next = np.argmax(q_eval4next, axis=1)
            select_q_next = q_next[batch_index, max_act4next]
        else:
            select_q_next = np.max(q_next, axis=1)   # vanilla DQN: just take the maximum

        q_target[batch_index, eval_act_index] = reward + self.gamma * select_q_next  # refresh q_target

        _, self.cost = self.sess.run([self.train_op, self.loss],
                                     feed_dict={self.state: batch_memory[:, :self.n_features],
                                                self.q_target: q_target})
        self.cost_his.append(self.cost)

        # after enough learn steps, favour the greedy action more and more (epsilon capped at 1.0)
        if self.learn_step_counter > self.use_e_greedy_increment:
            self.epsilon = min(self.epsilon + self.epsilon_increment, 1.0)
        self.learn_step_counter += 1

    def plot_cost(self):   # plot the cost curve
        import matplotlib.pyplot as plt
        plt.plot(np.arange(len(self.cost_his)), self.cost_his)
        plt.ylabel('Cost')
        plt.xlabel('training step')
        plt.show()

That is the basic DQN template, with a touch of Double DQN mixed in.
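To make the Double DQN branch in learn() more concrete, here is a tiny standalone illustration with made-up Q values (not taken from the network) showing the difference between trusting the target net's maximum and selecting the action with the eval net but reading its value from the target net:

import numpy as np

# Made-up Q values for a batch of 2 next-states and 2 actions.
q_next      = np.array([[1.0, 5.0],    # target net's estimates
                        [2.0, 0.5]])
q_eval4next = np.array([[3.0, 2.0],    # eval net's estimates for the same next-states
                        [0.1, 4.0]])
batch_index = np.arange(2)

# Vanilla DQN: always take the target net's maximum (tends to over-estimate).
vanilla = np.max(q_next, axis=1)                   # [5.0, 2.0]

# Double DQN: choose the action with the eval net, read its value from the target net.
max_act4next = np.argmax(q_eval4next, axis=1)      # [0, 1]
double = q_next[batch_index, max_act4next]         # [1.0, 0.5]

print(vanilla, double)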
Next, how to train CartPole-v0!

# from mMaze import Maze   # left over from a maze example, not needed for CartPole
from DQN import DQN
import gym

if __name__ == "__main__":
    env = gym.make('CartPole-v0')   # create the environment
    env = env.unwrapped
    # the following prints this environment's action and state spaces
    print(env.action_space)
    print(env.observation_space.shape[0])
    print(env.observation_space.high)
    print(env.observation_space.low)

    # initialize the DQN model
    RL = DQN(n_actions=env.action_space.n,
             n_features=env.observation_space.shape[0],
             learning_rate=0.01,
             e_greedy=0.9,
             replace_target_iter=100,
             memory_size=2000,
             e_greedy_increment=0.001,
             use_e_greedy_increment=1000)

    steps = 0
    # Train for 300 episodes. In this environment an episode ends when the pole tilts too far
    # or the cart moves too far along x; you can see this from the training run, or read gym's
    # source code, which is fairly clear.
    for episode in range(300):
        observation = env.reset()
        ep_r = 0
        while True:               # loop until the episode ends
            env.render()          # refresh the display
            action = RL.choose_action(observation)                     # choose an action from the current state
            observation_next, reward, done, info = env.step(action)    # take the action, get the next state and reward

            # unpack the four state values
            x, x_dot, theta, theta_dot = observation_next
            # build the reward from the x and theta thresholds instead of gym's built-in reward;
            # this is said to train more efficiently
            reward1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
            reward2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
            reward = reward1 + reward2   # combine the two rewards

            RL.store_transition(observation, action, reward, observation_next)   # store in the replay memory
            ep_r += reward   # accumulate the episode reward, just to check the model is behaving sensibly

            if steps > 1000:   # do not learn at first; accumulate transitions in the memory
                RL.learn()
            if done:           # episode finished: print the cumulative reward to judge how good this run of actions was
                print('episode:', episode,
                      'ep_r:', round(ep_r, 2),
                      "RL's epsilon:", round(RL.epsilon, 3))
                break
            observation = observation_next   # update the state
            steps += 1

    RL.plot_cost()   # after training, look at the cost curve

That is all the code. Now let's look at how it runs:
At the beginning it is still pretty clumsy.

After waiting for quite a while...

By around episode 176 it performs quite well. Towards the end the cost drops noticeably, but it still fluctuates. This is not caused by the random action selection; the model itself shows some unstable behaviour at times, so there is still room for improvement. That is all; corrections are welcome if anything is wrong.
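If you want to see the trend through the fluctuations, one simple option is to smooth the cost curve with a moving average. This is just a small sketch built on the cost_his list from the DQN class above; plot_smoothed_cost is a hypothetical helper, not part of the class:

import numpy as np
import matplotlib.pyplot as plt

def plot_smoothed_cost(cost_his, window=50):
    # Plot the raw cost plus a simple moving average so the downward trend is easier to see.
    cost = np.asarray(cost_his, dtype=np.float64)
    plt.plot(np.arange(len(cost)), cost, alpha=0.3, label='raw cost')
    if len(cost) >= window:
        smooth = np.convolve(cost, np.ones(window) / window, mode='valid')
        plt.plot(np.arange(window - 1, len(cost)), smooth, label='moving average')
    plt.ylabel('Cost')
    plt.xlabel('training step')
    plt.legend()
    plt.show()

# usage after training: plot_smoothed_cost(RL.cost_his)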