強化學習之策略梯度(Policy Gradient)
1、什麼是 Policy Gradients
策略梯度的基本思想,就是直接根據狀態輸出動作或者動作的概率。那麼怎麼輸出呢,最簡單的就是使用神經網路。
我們使用神經網路輸入當前的狀態,網路就可以輸出我們在這個狀態下采取每個動作的概率,那麼網路應該如何訓練來實現最終的收斂呢?我們之前在訓練神經網路時,使用最多的方法就是反向傳播演算法,我們需要一個誤差函式,通過梯度下降來使我們的損失最小。但對於強化學習來說,我們不知道動作的正確與否,只能通過獎勵值來判斷這個動作的相對好壞。基於上面的想法,我們有個非常簡單的想法:
如果一個動作得到的reward多,那麼我們就使其出現的概率增加,如果一個動作得到的reward少,我們就使其出現的概率減小。
根據這個思想,我們構造如下的損失函式:loss= -log(prob)*vt
上式中log(prob)表示在狀態 s 對所選動作 a 的吃驚度, 如果概率越小, 反向的log(prob) 反而越大. 而vt代表的是當前狀態s下采取動作a所能得到的獎勵,這是當前的獎勵和未來獎勵的貼現值的求和。也就是說,我們的策略梯度演算法必須要完成一個完整的eposide才可以進行引數更新,而不是像值方法那樣,每一個(s,a,r,s')都可以進行引數更新。如果在prob很小的情況下, 得到了一個大的Reward, 也就是大的vt, 那麼-log(prob)*vt就更大, 表示更吃驚, (我選了一個不常選的動作, 卻發現原來它能得到了一個好的 reward, 那我就得對我這次的引數進行一個大幅修改)。
這就是 -log(prob)*vt的物理意義,Policy Gradient的核心思想是更新引數時有兩個考慮:如果這個回合選擇某一動作,下一回合選擇該動作的概率大一些,然後再看獎懲值,如果獎懲是正的,那麼會放大這個動作的概率,如果獎懲是負的,就會減小該動作的概率。
策略梯度演算法輸出的是動作的概率,而不是Q值。
本文的程式碼思路完全按照policy gradient的過程展開。
定義引數
首先,我們定義了一些模型的引數:
self.ep_obs,self.ep_as,self.ep_rs分別儲存了當前episode的狀態,動作和獎勵。 self.n_actions = n_actions self.n_features = n_features self.lr = learning_rate self.gamma = reward_decay self.ep_obs,self.ep_as,self.ep_rs = [],[],[]
定義模型輸入
模型的輸入包括三部分,分別是觀察值,動作和獎勵值。
with tf.name_scope('inputs'):
self.tf_obs = tf.placeholder(tf.float32,[None,self.n_features],name='observation')
self.tf_acts = tf.placeholder(tf.int32,[None,],name='actions_num')
self.tf_vt = tf.placeholder(tf.float32,[None,],name='actions_value')
構建模型
我們的模型定義了兩層的神經網路,網路的輸入是每次的狀態值,而輸出是該狀態下采取每個動作的概率,這些概率在最後會經過一個softmax得到歸一化之後的各個動作的概率值向量。
layer = tf.layers.dense(
inputs = self.tf_obs,
units = 10,
activation= tf.nn.tanh,
kernel_initializer=tf.random_normal_initializer(mean=0,stddev=0.3),
bias_initializer= tf.constant_initializer(0.1),
name='fc1'
)
all_act = tf.layers.dense(
inputs = layer,
units = self.n_actions,
activation = None,
kernel_initializer=tf.random_normal_initializer(mean=0,stddev=0.3),
bias_initializer = tf.constant_initializer(0.1),
name='fc2'
)
self.all_act_prob = tf.nn.softmax(all_act,name='act_prob')
模型的損失
我們之前介紹過了,模型的損失函式計算公式為:loss= -log(prob)*vt,我們可以直接使用tf.nn.sparse_softmax_cross_entropy_with_logits 來計算前面一部分,即-log(prob),不過為了更清楚的顯示我們的計算過程,我們使用瞭如下的方式:
with tf.name_scope('loss'):
neg_log_prob = tf.reduce_sum(-tf.log(self.all_act_prob) * tf.one_hot(indices=self.tf_acts,depth=self.n_actions),axis=1)
loss = tf.reduce_mean(neg_log_prob * self.tf_vt)
而我們選擇AdamOptimizer優化器進行引數的更新:
with tf.name_scope('train'):
self.train_op = tf.train.AdamOptimizer(self.lr).minimize(loss)
動作選擇
我們這裡動作的選擇不再根據貪心的策略來選擇了,而是根據輸出動作概率的大小來選擇不同的可能性選擇對應的動作:
def choose_action(self,observation):
prob_weights = self.sess.run(self.all_act_prob,feed_dict={self.tf_obs:observation[np.newaxis,:]})
action = np.random.choice(range(prob_weights.shape[1]),p=prob_weights.ravel())
return action
儲存經驗
之前說過,policy gradient是在一個完整的episode結束後才開始訓練的,因此,在一個episode結束前,我們要儲存這個episode所有的經驗,即狀態,動作和獎勵。
def store_transition(self,s,a,r):
self.ep_obs.append(s)
self.ep_as.append(a)
self.ep_rs.append(r)
計算獎勵的貼現值
我們之前儲存的獎勵是當前狀態s採取動作a獲得的即時獎勵,而當前狀態s採取動作a所獲得的真實獎勵應該是即時獎勵加上未來直到episode結束的獎勵貼現和。
def _discount_and_norm_rewards(self):
discounted_ep_rs = np.zeros_like(self.ep_rs)
running_add = 0
# reserved 返回的是列表的反序,這樣就得到了貼現求和值。
for t in reversed(range(0,len(self.ep_rs))):
running_add = running_add * self.gamma + self.ep_rs[t]
discounted_ep_rs[t] = running_add
discounted_ep_rs -= np.mean(discounted_ep_rs)
discounted_ep_rs /= np.std(discounted_ep_rs)
return discounted_ep_rs
模型訓練
在定義好上面所有的部件之後,我們就可以編寫模型訓練函數了,這裡需要注意的是,我們餵給模型的並不是我們儲存的獎勵值,而是在經過上一步計算的獎勵貼現和。另外,我們需要在每一次訓練之後清空我們的經驗池。
def learn(self):
discounted_ep_rs_norm = self._discount_and_norm_rewards()
self.sess.run(self.train_op,feed_dict={
self.tf_obs:np.vstack(self.ep_obs),
self.tf_acts:np.array(self.ep_as),
self.tf_vt:discounted_ep_rs_norm,
})
self.ep_obs,self.ep_as,self.ep_rs = [],[],[]
return discounted_ep_rs_norm
好了,模型相關的程式碼我們就介紹完了,如何呼叫這個模型的程式碼相信大家一看便明白,我們就不再介紹啦。<