
Intro to Reinforcement Learning with TensorFlow (3): Building a Simulated Environment for Reinforcement Learning

In the previous article I showed how to build a policy-based reinforcement learning agent to solve the CartPole task. In this article we revisit the problem from a different angle: building a simulated environment to improve the agent's performance in the real one.

Model Network: a network that models the environment, referred to in this article as the simulated environment.

If you have not read the earlier articles in this series and are new to reinforcement learning, I recommend reading them in order; links to the previous articles are at the end of this post.

What is a simulated environment, and why do we need one? In the figure above, the simulated environment is a neural network that models the dynamics of a real-world problem. For our earlier CartPole task, we need a model that can predict the pole's next state from its previous state and the action taken. Once we have learned an accurate model, we can train the agent directly against the model instead of having to run it in the real environment every time. Of course, if the original environment is itself a simulation (CartPole already simulates real-world physics), this step is not strictly necessary.

Unlike a computer simulation, deploying in a real environment takes time, and real-world physics makes operations such as resetting the environment far less practical. Modeling the environment instead saves time and cost: the agent can "imagine" itself acting in the real environment, and we can train the policy network directly inside this virtual environment. As long as the simulated environment is good enough, an agent trained entirely in it can still perform well in the real one.
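
Before looking at the full implementation, a minimal sketch may help clarify what the learned model has to provide: a drop-in replacement for env.step. The model_step function and the toy dynamics inside it are illustrative placeholders only; the actual model is the neural network built in the code below.

import numpy as np

# Hypothetical stand-in for a learned model: it plays the same role as env.step.
def model_step(observation, action):
    # In the real implementation below, next_observation, reward and done come
    # from a neural network; here they are dummy values that only show the interface.
    next_observation = np.array(observation, dtype=np.float32)
    next_observation[2] += 0.01 if action == 1 else -0.01   # toy pole-angle dynamics
    reward = 1.0                                             # CartPole gives +1 per step
    done = abs(next_observation[2]) > 0.2                    # toy termination condition
    return next_observation, reward, done

obs = np.zeros(4, dtype=np.float32)
obs, r, d = model_step(obs, action=1)  # same call pattern as env.step(action), minus the info dict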

So how do we implement this in TensorFlow? As described above, we need a neural network that maps the previous observation and action to a new observation, a reward, and a done flag. We train this model network against the real environment, and then use the model to train our agent. With this approach, the agent can learn a policy without ever interacting with the real environment directly! The implementation is given below (this is the version rewritten in the comments section, with some fixes by the translator; the author's original code can be viewed here, although that version has a bug that is pointed out in the comments at the bottom).

# Translator's note: this was run in JupyterLab; each separator line below corresponds to one code cell
import numpy as np
import tensorflow as tf
%matplotlib inline
import matplotlib.pyplot as plt
# --------------------------------------------------
import gym
env = gym.make("CartPole-v0")
# --------------------------------------------------
# Hyperparameters
learning_rate = 1e-2
# Discount factor applied to rewards
gamma = 0.99
# Decay factor for RMSProp (defined here but unused below; both networks use the Adam optimizer)
decay_rate = 0.99

model_batch_size = 3
policy_batch_size = 3

dimen = 4 # dimensionality of the environment's observation space
# --------------------------------------------------
# Helper functions
def discount(r, gamma=0.99, standardize=False):
    """
    輸入一維的收益陣列,輸出折算後的收益值,例:f([1, 1, 1], 0.99) -> [1, 0.99, 0.9801],折算後根據要求選擇進行歸一化
    """
    discounted = np.array([val * (gamma ** i) for i, val in enumerate(r)])
    if standardize:
        discounted -= np.mean(discounted)
        discounted /= np.std(discounted)
    return discounted
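
# Quick illustrative sanity check, e.g. in a scratch cell:
#   discount(np.array([1., 1., 1.]))                    -> [1.0, 0.99, 0.9801]
#   discount(np.array([1., 1., 1.]), standardize=True)  -> the same values shifted and scaled to zero mean and unit variance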

def step_model(sess, xs, action):
    """ 使用神經網路模型根據之前的狀態和行動來生成新的狀態 """
    # Most recent state
    x = xs[-1].reshape(1, -1)
    # Append the action taken
    x = np.hstack([x, [[action]]])
    # Run the model to predict the next state, reward and done flag
    output_y = sess.run(predicted_state_m, feed_dict={input_x_m: x})
    # predicted_state_m == [state_0, state_1, state_2, state_3, reward, done]
    output_next_state = output_y[:,:4]
    output_reward = output_y[:,4]
    output_done = output_y[:,5]
    # Clip the predicted observation to the environment's bounds
    output_next_state[:,0] = np.clip(output_next_state[:,0], -2.4, 2.4)
    output_next_state[:,2] = np.clip(output_next_state[:,2], -0.4, 0.4)
    # Threshold on the predicted done probability; episodes are also capped at 500 steps
    output_done = True if output_done > 0.01 or len(xs) > 500 else False
    return output_next_state, output_reward, output_done
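
# Illustrative usage: step_model mirrors env.step but returns no info dict,
#   next_obs, pred_reward, pred_done = step_model(sess, observations, action)
# It is called in the training loop below whenever train_from_model is True.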
# --------------------------------------------------
# Neural network used as the simulated environment (the model network)
# Architecture
# Two hidden layers of 256 neurons with ReLU activations, followed by three separate output layers that predict the next observation, the reward, and the done flag
tf.reset_default_graph()

num_hidden_m = 256
# One extra input dimension for the action chosen by the policy network
dimen_m = dimen + 1
# Input placeholder
input_x_m = tf.placeholder(tf.float32, [None, dimen_m])
# First layer
W1_m = tf.get_variable("W1_m", shape=[dimen_m, num_hidden_m], initializer=tf.contrib.layers.xavier_initializer())
B1_m = tf.Variable(tf.zeros([num_hidden_m]), name="B1M")
layer1_m = tf.nn.relu(tf.matmul(input_x_m, W1_m) + B1_m)
# Second layer
W2_m = tf.get_variable("W2_m", shape=[num_hidden_m, num_hidden_m], initializer=tf.contrib.layers.xavier_initializer())
B2_m = tf.Variable(tf.zeros([num_hidden_m]), name="B2_m")
layer2_m = tf.nn.relu(tf.matmul(layer1_m, W2_m) + B2_m)
# Output layers
# Note that there are three separate output heads
W_obs_m = tf.get_variable("W_obs_m", shape=[num_hidden_m, 4], initializer=tf.contrib.layers.xavier_initializer())
B_obs_m = tf.Variable(tf.zeros([4]), name="B_obs_m")
W_reward_m = tf.get_variable("W_reward_m", shape=[num_hidden_m, 1], initializer=tf.contrib.layers.xavier_initializer())
B_reward_m = tf.Variable(tf.zeros([1]), name="B_reward_m")
W_done_m = tf.get_variable("W_done_m", shape=[num_hidden_m, 1], initializer=tf.contrib.layers.xavier_initializer())
B_done_m = tf.Variable(tf.zeros([1]), name="B_done_m")

output_obs_m = tf.matmul(layer2_m, W_obs_m) + B_obs_m
output_reward_m = tf.matmul(layer2_m, W_reward_m) + B_reward_m
output_done_m = tf.sigmoid(tf.matmul(layer2_m, W_done_m) + B_done_m)

# Placeholders for the training targets
actual_obs_m = tf.placeholder(tf.float32, [None, dimen_m], name="actual_obs")
actual_reward_m = tf.placeholder(tf.float32, [None, 1], name="actual_reward")
actual_done_m = tf.placeholder(tf.float32, [None, 1], name="actual_done")

# Combined model output
predicted_state_m = tf.concat([output_obs_m, output_reward_m, output_done_m], axis=1)

# Loss functions
loss_obs_m = tf.square(actual_obs_m[:, :4] - output_obs_m)  # compare against the actual next observations (first four columns)
loss_reward_m = tf.square(actual_reward_m - output_reward_m)
loss_done_m = -tf.log(actual_done_m * output_done_m + (1 - actual_done_m) * (1 - output_done_m))

# The model loss is the mean of the three output losses
loss_m = tf.reduce_mean(loss_obs_m + loss_reward_m + loss_done_m)

adam_m = tf.train.AdamOptimizer(learning_rate=learning_rate)
update_m = adam_m.minimize(loss_m)
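
# Note on training data: the model is trained on transitions collected from the
# real environment. Each input row is [observation_t, action_t] (dimen_m = 5 values),
# and the targets are the observation, reward and done flag recorded at the next
# time step, assembled from the episode history in the training loop further below.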
# --------------------------------------------------
# Policy network
num_hidden_p = 10 # number of neurons in the policy network's hidden layer

input_x_p = tf.placeholder(tf.float32, [None, dimen], name="input_x")

# First layer
W1_p = tf.get_variable("W1", shape=[dimen,num_hidden_p], 
                     initializer=tf.contrib.layers.xavier_initializer())
layer1_p = tf.nn.relu(tf.matmul(input_x_p, W1_p))

# Second layer (output)
W2_p = tf.get_variable("W2", shape=[num_hidden_p, 1], 
                     initializer=tf.contrib.layers.xavier_initializer())
output_p = tf.nn.sigmoid(tf.matmul(layer1_p, W2_p))

# Placeholders used for training
input_y_p = tf.placeholder(tf.float32, shape=[None, 1], name="input_y")
advantages_p = tf.placeholder(tf.float32, shape=[None,1], name="reward_signal")

# Loss function
# output_p is the probability of choosing action 0, so this expression equals
# output_p when the recorded action (input_y_p) is 0 and 1 - output_p when it is 1,
# i.e. the likelihood of the action that was actually taken
log_lik_p = tf.log(input_y_p * (input_y_p - output_p) + 
                 (1 - input_y_p) * (input_y_p + output_p))

# We want to maximize the advantage-weighted log-likelihood, so we minimize its negative
loss_p = -tf.reduce_mean(log_lik_p * advantages_p)

# Placeholders for externally accumulated gradients
W1_grad_p = tf.placeholder(tf.float32,name="W1_grad")
W2_grad_p = tf.placeholder(tf.float32,name="W2_grad")
batch_grad_p = [W1_grad_p, W2_grad_p]
trainable_vars_p = [W1_p, W2_p]
grads_p = tf.gradients(loss_p, trainable_vars_p)

# Optimizer
adam_p = tf.train.AdamOptimizer(learning_rate=learning_rate)

# Update operation that applies the accumulated gradients
update_grads_p = adam_p.apply_gradients(zip(batch_grad_p, [W1_p, W2_p]))
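
# Note on the update scheme: rather than calling minimize() directly, the policy
# gradients (grads_p) are evaluated once per episode, accumulated in NumPy across
# policy_batch_size episodes, and then applied in a single step through
# apply_gradients via the W1_grad_p / W2_grad_p placeholders (see the training loop below).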
# --------------------------------------------------
# Initialize the graph and sanity-check the model outputs
init = tf.global_variables_initializer()
sess = tf.Session()
sess.run(init)
random_obs = np.random.random(size=[1, env.observation_space.shape[0]])
random_action = env.action_space.sample()

print("obs: {}naction: {}noutput obs: {}nouput reward: {}noutput done: {}noutput policy: {}".format(
        random_obs,
        random_action,
        sess.run(output_obs_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
        sess.run(output_reward_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
        sess.run(output_done_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
        sess.run(output_p,feed_dict={input_x_p: random_obs})))
# --------------------------------------------------
# Training against the real environment
real_rewards = []
num_episodes = 5000

# Trigger used to decide whether we should train from model or from real environment
train_from_model = False
train_first_steps = 500

# Buffers for tracking observations, rewards and actions
observations = np.empty(0).reshape(0,dimen)
rewards = np.empty(0).reshape(0,1)
actions = np.empty(0).reshape(0,1)

# Accumulated policy gradients
grads = np.array([np.zeros(var.get_shape().as_list()) for var in trainable_vars_p])

num_episode = 0

observation = env.reset()

while num_episode < num_episodes:
    observation = observation.reshape(1,-1)
    
    # Get the action probability from the policy network
    policy = sess.run(output_p, feed_dict={input_x_p: observation})
    
    # Sample an action from the policy (stochastic choice)
    action = 0 if policy > np.random.uniform() else 1

    # Track observations and actions
    observations = np.vstack([observations, observation])
    actions = np.vstack([actions, action])
    
    # Get the next observation from either the model or the real environment
    if train_from_model:
        observation, reward, done = step_model(sess, observations, action)
    else:
        observation, reward, done, _ = env.step(action)
        
    # Track rewards
    rewards = np.vstack([rewards, reward])
    dones = np.zeros(shape=(len(observations),1))
    
    # Episode has ended or has run long enough
    if done or len(observations) > 300:
        print("r{} / {} ".format(num_episode, num_episodes),end="")

        # If this episode ran in the real environment, use it to train the model
        if not train_from_model:
            # Build the (state, action) sequence used to train the model
            states = np.hstack([observations, actions])
            prev_states = states[:-1,:]
            next_states = states[1:, :]
            next_rewards = rewards[1:, :]
            next_dones = dones[1:, :]

            feed_dict = {input_x_m: prev_states.astype(np.float32), 
                         actual_obs_m: next_states.astype(np.float32),
                        actual_done_m: next_dones.astype(np.float32),
                        actual_reward_m: next_rewards.astype(np.float32)}

            loss, _ = sess.run([loss_m, update_m], feed_dict=feed_dict)
            
            real_rewards.append(sum(rewards))
            
        
        # Discount the rewards
        disc_rewards = discount(rewards, standardize=True)
        
        # Accumulate the policy gradients
        grads += sess.run(grads_p, feed_dict={input_x_p: observations,
                                            input_y_p: actions,
                                            advantages_p: disc_rewards})
        
        num_episode += 1
        
        observation = env.reset()

        # Reset the episode buffers
        observations = np.empty(0).reshape(0,dimen)
        rewards = np.empty(0).reshape(0,1)
        actions = np.empty(0).reshape(0,1)
        
        # Toggle between training from the model and from the real environment, allowing
        # sufficient time to train the model before it is used for learning the policy
        if num_episode > train_first_steps:
            train_from_model = not train_from_model 

        # If a full batch of episodes has been collected
        if num_episode % policy_batch_size == 0:
            
            # Apply the accumulated gradients
            sess.run(update_grads_p, feed_dict={W1_grad_p: grads[0], W2_grad_p: grads[1]})
            
            # Reset the gradient accumulator
            grads = np.array([np.zeros(var.get_shape().as_list()) for var in trainable_vars_p])
            
            # Periodically print progress
            if (num_episode % (100 * policy_batch_size) == 0):
                print("Episode {} last batch rewards: {}".format(
                        num_episode, sum(real_rewards[-policy_batch_size:])/policy_batch_size))
            
            # Stop once the policy is good enough
            if (sum(real_rewards[-10:]) / 10. >= 190): # this threshold can be raised towards 199 (200 is a perfect score)
                print("Episode {} Training complete with total score of: {}".format(
                        num_episode, sum(real_rewards[-policy_batch_size:])/policy_batch_size))
                break
# --------------------------------------------------
# Test the trained policy

observation = env.reset()
reward_sum = 0

model_losses = []

while True:
    env.render()
    
    observation = np.reshape(observation, [1, -1])
    policy = sess.run(output_p, feed_dict={input_x_p: observation})
    action = 0 if policy > 0.5 else 1
    observation, reward, done, _ = env.step(action)
    reward_sum += reward
    
    if done:
        print("Total score: {}".format(reward_sum))
        break
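
# Sample output from the training loop above: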
299 / 5000 Episode 300 last batch rewards: [34.66666667]
599 / 5000 Episode 600 last batch rewards: [75.66666667]
899 / 5000 Episode 900 last batch rewards: [61.]
1199 / 5000 Episode 1200 last batch rewards: [200.]
1499 / 5000 Episode 1500 last batch rewards: [194.33333333]
1799 / 5000 Episode 1800 last batch rewards: [169.33333333]
1979 / 5000 Episode 1980 Training complete with total score of: [200.]

We have now introduced two neural networks and quite a few hyperparameters; readers are encouraged to tune the hyperparameters themselves to make training better and faster. In the next article we will look at how to use convolutional neural networks to learn in more complex environments, such as Atari games.

Series articles (translation progress):

  1. (0) Q-Learning with Tables and Neural Networks
  2. (1) The Two-Armed Bandit
  3. (1.5) Contextual Bandits
  4. (2) Policy-Based Agents
  5. (3) Building a Simulated Environment for Reinforcement Learning
  6. Part 4 — Deep Q-Networks and Beyond
  7. Part 5 — Visualizing an Agent’s Thoughts and Actions
  8. Part 6 — Partial Observability and Deep Recurrent Q-Networks
  9. Part 7 — Action-Selection Strategies for Exploration
  10. Part 8 — Asynchronous Actor-Critic Agents (A3C)