OpenAI gym: 'Copy-v0'
最近對增強學習領域很感興趣,剛好OpenAI推出了測試RL演算法的gym(https://gym.openai.com/),上面有很多預設的測試環境,並且提供視覺化的支援,於是決定用它小試牛刀。
眾所周知,在增強學習演算法中有觀察(observation)、回報(reward)、價值(value)、策略(policy)四個核心元素。使用python可以安裝並載入OpenAI Gym提供的預設環境。這些環境在每一輪動作執行後以新的觀察和即時回報作為輸出,接受下一輪執行的動作作為輸入(輸入輸出的格式見官方文件)。
以下為gym的基本使用方法。
import gym
# load Copy-v0 environment
env = gym.make('Copy-v0')
# the action space of the environment
print(env.action_space)
# the observation space of the environment
print(env.observation_space)
for i_episode in range(100):
# reset environment
observation = env.reset()
for t in range(1000):
# display training process
env .render()
# choose an action randomly
action = env.action_space.sample()
observation, reward, done, info = env.step(action)
if done:
print('episode done in %d episodes'%i_episode)
break
之前我在金漵林老師的部落格(http://www.cnblogs.com/jinxulin/)上學習了MDP、Monte Carlo等方法。手動實現演算法之後,發現缺少一個好的問題來進行測試。為此我在gym中選擇了一個入門級別的環境:Copy-v0。
Copy-v0的題目要求實現一個拷貝機器人,螢幕上方給出一行為待拷貝的標準串,標準串上有一個可以移動的游標;下面一行是輸入行,僅能向後輸入,不能改變游標位置。
每一輪,環境輸出的觀察值是標準串上游標指著的字元序號(標準串僅由英文字母A,B,C,D,E構成,對應輸出0,1,2,3,4),如果指向的是標準串結尾的空格則返回5。而要求的輸入為一個三元組(x, y, z),其中x取0或1;y取0或1;z取0~5——x代表下一輪向左(0)或向右(1)移動游標;y代表下一輪是否進行一次輸入;z代表下一輪要輸入的字元(5代表F)。
舉例:
第n輪模式串
ABC**D**DA(粗體為游標位置)
輸入串:ABC
觀察值:3
下一輪動作:(0,1,4)第n+1輪模式串:AB**C**DDA
輸入串:ABCE(因為輸入有誤,實際上環境會返回done=True)
觀察值:2
下一輪動作:……
雖然知道了輸入與輸出的意義之後問題的最優解(1, 1, observation)看起來非常明確,但難點在於我們的目標是設計一個agent來自學這個系統的規則——也就是說,一個好的演算法其實不需要知道輸入與輸出代表的物理意義。
我的解決方案使用了金漵林老師第四篇部落格講授的蒙特卡羅控制演算法。
收斂速度不算很快(solved in 4282 episodes),當然這個收斂速度有一定運氣成分,但是和175步就訓練完成的演算法相比一定還是有差距的。
在這個任務中我得到的最重要的經驗是,在必要的時候,要有意識地調整“回報”來“激勵”演算法的學習——這樣說比較模糊。以這個問題為例,最開始我實現的時候直接使用了環境給出的reward值,始終沒有辦法訓練出可用的agent。確認演算法本身無誤後,我又手動輸入action運行了幾輪,發現了一個很重要的問題:如果僅僅移動游標而不進行輸入,這一輪的回報值是零。回報值是零就意味著,雖然這個動作沒有什麼好處,但agent也沒有必要學著去規避它。但這個問題是非常嚴重的,因為
1.觀察值不包含游標位置資訊,一旦非法移動游標,之後的方案將是完全隨機的.
2.在所有可能的動作中,存在非法移動游標問題的佔絕大多數,而在第一輪就採用正確答案(1, 1, observation)的概率微乎其微,幾乎不可能用1-epsilon方法來補救(epsilon等於1也不行).
所以最後我將所有的reward都再扣除0.5分,讓agent感受到這種動作的危害,它們就可以自學成優秀的agent了。
程式碼實現
import numpy as np
import gym
import random
env = gym.make('Copy-v0')
# observation_space: Discrete(6)
# action_space: (Discrete(2), Discrete(2), Discrete(5))
# states: 0, 1, 2, 3, 4, 5
# MARK: problem specific functions
def action_to_index(action):
return action[0]*10+action[1]*5+action[2]
def index_to_action(index):
action = [index/10, (index/5) % 2, index % 5]
return tuple(action)
# MARK: Monte Carlo ES method
STATE_COUNT = 6
ACTION_COUNT = 20
def init_mces(state_count, action_count):
q = np.random.rand(state_count, action_count)
rets = np.zeros((state_count, action_count), dtype=np.double)
policy = [random.randint(0, action_count-1) for _ in range(state_count)]
return q, rets, policy
def learning(env):
q, rets, policy = init_mces(STATE_COUNT, ACTION_COUNT)
gamma = 0.7
epsilon = 1
total_score = 0.0
i_episode = 0
for i_episode in range(5000):
total_reward = 0
observation = env.reset()
g = np.zeros((STATE_COUNT, ACTION_COUNT), dtype=np.double)
passed = np.zeros((STATE_COUNT, ACTION_COUNT), dtype=np.double)
for t in range(100):
raw_action = policy[observation]
# 1 - epsilon greedy
if random.random() < epsilon:
raw_action = action_to_index((random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 4)))
if passed[observation, raw_action] == 0.0:
passed[observation, raw_action] = 1.0
action = index_to_action(raw_action)
env.render()
observation, reward, done, info = env.step(action)
reward -= 0.5 # IMPORTANT: punish useless (even dangerous) actions whose environment reward is 0.0
total_reward += reward
for i in range(STATE_COUNT):
for j in range(ACTION_COUNT):
passed[i][j] *= gamma
g[i][j] += reward * passed[i][j]
if done:
break
# reduce exploration chance
if i_episode % 100 == 0:
epsilon *= 0.9
rets += g
q = rets / (i_episode+1)
policy = np.argmax(q, axis=1).tolist()
total_score += total_reward
return policy, i_episode
def test_policy(env, policy):
total_reward = 0.0
obs = env.reset()
for t in range(1000):
action = index_to_action(policy[obs])
obs, reward, done, info = env.step(action)
total_reward += reward
if done:
break
print('total reward: %f'%total_reward)
env.monitor.start('Copyv0-experiment-0')
policy, n_episode = learning(env)
env.monitor.close()
print('final policy: '+str(policy))
print('episodes trained: '+str(n_episode))
test_policy(env, policy)