TensorFlow強化學習入門(2)——基於策略的Agents
在本教程系列的(1)中,我演示瞭如何構建一個agent來在多個選擇中選取最有價值的一個。在本文中,我將講解如何得到一個從現實世界中獲取 觀測值 ,並作出 長期收益 最大的 行動 的agent。正如前文所說,本文解決的問題將是一個完備的強化學習問題。
完備的強化學習問題所處的環境又被稱為馬爾科夫決策過程(MDPs)。這個環境不再僅僅通過當前的行動來提供收益並進行狀態切換,它提供的收益同時取決於環境的狀態以及在當前狀態中執行的行為。這是一個動態的暫態過程並且可以延遲生效。
下面給出更加正式一點的馬爾科夫決策過程定義:一個馬爾科夫決策過程包含可能狀態集S
(包含所有可能經歷的狀態s
)和可選行動集A
(包含所有agent可選的行動a
(s,a)
,下一個狀態的概率分佈通過T(s,a)
確定,收益由R(s,a)
給定。由此,在馬爾科夫決策過程的任意時刻,agent在給定狀態s
和行動a
後獲得新的狀態s'
和收益r
。
雖然這個概念很簡單,但是我們可以將之應用於任何可以視為MDP的任務上。舉例來說,你可以想象開門的場景,狀態資訊為我們眼睛看到的場景以及我們的身體和門在真實世界中的位置,可選行動為我們身體可以作出的任何動作,收益為門是否被成功開啟。像走向門這種動作是解決該問題必需的步驟,但是它們並不能為我們直接帶來收益,只有開門這個動作才能直接帶來收益。在這種場景下,agent需要學會為導向收益的一系列行動分配價值分數,這涉及了開門這一動態過程中的一系列暫態過程。
Cart-Pole 案例
為了實現這樣的功能,我們需要一個比之前的雙臂賭博機更有挑戰性的問題。OpenAI gym包含了一系列強化學習問題所需的環境,本文也正是利用其中的一個經典案例:Cart-Pole(檢視相關文件)。在這個案例中,我們希望agent在學習之後可以使木杆平衡儘可能長的時間不倒下。和雙臂賭博機不同,這個任務需要額外考慮以下兩點:
- 觀測值 —— agent需要直到當前木杆的位置以及它的平衡角。為了得到它,我們的agent在每次輸出行動的概率分佈時都將進行觀測並使用它。
- 延時收益 —— 保持木杆平衡的時間儘可能長意味著當前的行動對目前和將來都是有利的。為了達成這一功能,我們將設計一個函式,使收益值按照設計的權重分配在過去的一系列行動上。
考慮到延遲收益,我們之前教程使用的策略梯度的形式需要調整。首先我們每次要利用多個過程來更新agent。為了實現這一點,我們將過程存在中間變數(作為緩衝)裡,需要的時候用它來更新agent。這些過程組成的序列有時又被稱作rollouts
或experience trace
。儲存起來的序列並不能直接使用,我們還需要引入折算引子進行調整。
直觀地來看,延遲收益使得每次的行動的收益除了來自當前收益的一部分以外,還有後續全部收益折算給之前行動的收益。相應地,我們使用修改後的收益作為損失函式中對行動的評估標準。在完成這一改動之後,我們可以嘗試解決CartPole案例了!
程式碼示例(選用了評論區使用者重寫作者程式碼後的版本):
%matplotlib inline
import numpy as np
from matplotlib import animation
from IPython.display import Image
import tensorflow as tf
import matplotlib.pyplot as plt
import math
import gym
env = gym.make("CartPole-v0")
# 嘗試隨機行動來測試環境
env.reset()
reward_sum = 0
num_games = 10
num_game = 0
while num_game < num_games:
env.render()
observation, reward, done, _ = env.step(env.action_space.sample())
reward_sum += reward
if done:
print("本episode的收益:{}".format(reward_sum))
reward_sum = 0
num_game += 1
env.reset()
# 初始化agent的神經網路
# 我們使用基於策略梯度的神經網路來接受觀測值並傳遞給隱藏層來產生選擇各個行為(左移/右移)的概率分佈
# 神經網路超引數
hidden_layer_neurons = 13
batch_size = 50
learning_rate = 1e-2
gamma = .99
dimen = 4
tf.reset_default_graph()
# 定義輸入佔位符
observations = tf.placeholder(tf.float32, [None, dimen], name="input_x")
# 第一個權重層
W1 = tf.get_variable("W1", shape=[dimen, hidden_layer_neurons], initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observations, W1))
# 第二個權重層
W2 = tf.get_variable("W2", shape=[hidden_layer_neurons, 1], initializer=tf.contrib.layers.xavier_initializer())
output = tf.nn.sigmoid(tf.matmul(layer1, W2))
# 定義網路用於學習的計算圖元件
trainable_vars = [W1, W2]
input_y = tf.placeholder(tf.float32, [None, 1], name="input_y")
advantages = tf.placeholder(tf.float32, name="reward_signal")
# 損失函式
log_lik = tf.log(input_y * (input_y - output) + (1 - input_y) * (input_y + output))
loss = -tf.reduce_mean(log_lik * advantages)
# 梯度
new_grads = tf.gradients(loss, trainable_vars)
W1_grad = tf.placeholder(tf.float32, name="batch_grad1")
W2_grad = tf.placeholder(tf.float32, name="batch_grad2")
# 學習
batch_grad = [W1_grad, W2_grad]
adam = tf.train.AdamOptimizer(learning_rate=learning_rate)
update_grads = adam.apply_gradients(zip(batch_grad, [W1, W2]))
def discount_rewards(r, gamma=0.99):
"""
輸入一維的收益陣列,輸出折算後的收益值,例:f([1, 1, 1], 0.99) -> [1, 0.99, 0.9801]
"""
return np.array([val * (gamma ** i) for i, val in enumerate(r)])
reward_sum = 0
init = tf.global_variables_initializer()
# 定義觀測值,輸出值,收益值的佔位符
xs = np.empty(0).reshape(0, dimen)
ys = np.empty(0).reshape(0, 1)
rewards = np.empty(0).reshape(0, 1)
# 初始化環境
sess = tf.Session()
rendering = False
sess.run(init)
observation = env.reset()
# 梯度的佔位符
gradients = np.array([np.zeros(var.get_shape()) for var in trainable_vars])
num_episodes = 10000
num_episode = 0
while num_episode < num_episodes:
# 將觀測值作為該批次的輸入
x = np.reshape(observation, [1, dimen])
# 執行神經網路來決定輸出
tf_prob = sess.run(output, feed_dict={observations: x})
# 基於我們的網路來決定輸出,允許一定的隨機性
y = 0 if tf_prob > np.random.uniform() else 1
# 將觀測值和輸出值追加至列表中以供學習
xs = np.vstack([xs, x])
ys = np.vstack([ys, y])
# 獲取行動的結果
observation, reward, done, _ = env.step(y)
reward_sum += reward
rewards = np.vstack([rewards, reward])
if done:
# 標準化收益值
discounted_rewards = discount_rewards(rewards, gamma)
discounted_rewards -= discounted_rewards.mean()
discounted_rewards /= discounted_rewards.std()
# 根據實時得到的梯度調整梯度
gradients += np.array(sess.run(new_grads, feed_dict={observations: xs, input_y: ys, advantages: discounted_rewards}))
# 重置遊戲變數
xs = np.empty(0).reshape(0, dimen)
ys = np.empty(0).reshape(0, 1)
rewards = np.empty(0).reshape(0, 1)
# 一個batch執行結束
if num_episode % batch_size == 0:
# 更新梯度
sess.run(update_grads, feed_dict={W1_grad: gradients[0], W2_grad: gradients[1]})
# 重置梯度
gradients *= 0
# 輸出本輪執行狀態
print("episode = {} 時的平均收益:{}".format(num_episode, reward_sum / batch_size))
if reward_sum / batch_size > 150:
print("問題在episode = {} 時解決!".format(num_episode))
break
reward_sum = 0
num_episode += 1
observation = env.reset()
# 去除隨機決策,測試agent的效能
observation = env.reset()
observation
reward_sum = 0
while True:
env.render()
x = np.reshape(observation, [1, dimen])
y = sess.run(output, feed_dict={observations: x})
y = 0 if y > 0.5 else 1
observation, reward, done, _ = env.step(y)
reward_sum += reward
if done:
print("最終分數: {}".format(reward_sum))
break
···
episode = 2850 時的平均收益:98.26
episode = 2900 時的平均收益:96.96
episode = 2950 時的平均收益:95.62
episode = 3000 時的平均收益:98.82
episode = 3050 時的平均收益:122.34
episode = 3100 時的平均收益:119.22
episode = 3150 時的平均收益:115.66
episode = 3200 時的平均收益:125.16
episode = 3250 時的平均收益:133.44
episode = 3300 時的平均收益:140.7
episode = 3350 時的平均收益:153.4
問題在episode = 3350 時解決!
最終分數: 200.0
現在我們已經擁有了一個實用而又有趣的強化學習agent,不過這離目前最先進的技術還很遠。儘管我們使用了基於策略梯度的神經網路,但是網路的深度和複雜度遠遠不及大部分先進的網路。在下一篇文章中我將展示如何使用深度神經網路來建立agent去在更復雜的環境中學習,同時深入講解網路在複雜環境下的表徵手段。
系列文章(翻譯進度):
- (0) Q-Learning的查詢表實現和神經網路實現
- (1) 雙臂賭博機
- (1.5) — 上下文賭博機
- (2)——基於策略的Agents
- Part 3 — Model-Based RL
- Part 4 — Deep Q-Networks and Beyond
- Part 5 — Visualizing an Agent’s Thoughts and Actions
- Part 6 — Partial Observability and Deep Recurrent Q-Networks
- Part 7 — Action-Selection Strategies for Exploration
- Part 8 — Asynchronous Actor-Critic Agents (A3C)