Policy Iteration and Value Iteration
阿新 • Published: 2021-01-21
Improve a Policy through Policy Iteration
- Iterate between two steps:
  - Evaluate the policy \(\pi\) (compute \(v\) given the current \(\pi\)). Step 1: given the environment, the policy, and the discount factor, compute the value function of that policy.
  - Improve the policy by acting greedily with respect to \(v^\pi\). Step 2: improve the policy by acting greedily on \(v^\pi\) (the \(v\) obtained from \(\pi\) in step 1).
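The two steps above can be written compactly: the evaluation step repeatedly applies the Bellman expectation backup until it converges, and the improvement step acts greedily on the result.

\[
v^\pi(s) \leftarrow \sum_{s'} P\big(s' \mid s, \pi(s)\big)\left[r\big(s, \pi(s), s'\big) + \gamma\, v^\pi(s')\right]
\]
\[
\pi'(s) = \arg\max_a \sum_{s'} P(s' \mid s, a)\left[r(s, a, s') + \gamma\, v^\pi(s')\right]
\]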
Gym environment: FrozenLake-v0: http://gym.openai.com/envs/FrozenLake-v0/
Code from Bolei Zhou's (周博磊) course GitHub: https://github.com/cuhkrlcourse/RLexample/tree/master/MDP
The environment looks like this:
SFFF   (S: starting point, safe)
FHFH   (F: frozen surface, safe)
FFFH   (H: hole, fall to your doom)
HFFG   (G: goal, where the frisbee is located)
- Environment: the frozen-lake problem. The agent controls a character moving in a grid world. Some tiles of the grid are walkable ice, while others drop the agent into the water. The agent's movement is also stochastic and only partly determined by the chosen direction: if you choose to move up, the actual move may be up, left, or right, with the three outcomes equally likely (probability 1/3 each). The agent is rewarded for finding a walkable path to the goal tile.
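The slippery dynamics described above can be sketched with a minimal simulation. This is an illustrative model, not Gym's internal implementation: the actual direction is the intended one or one of its two perpendicular neighbours, each with probability 1/3.

```python
import random

# In Gym's cyclic action order, the two directions perpendicular to an action
# are its neighbours in this list (with wrap-around).
ACTIONS = ['LEFT', 'DOWN', 'RIGHT', 'UP']

def slip(intended):
    """Return the direction actually moved, given the intended direction."""
    i = ACTIONS.index(intended)
    # intended direction plus the two perpendicular ones, each with probability 1/3
    candidates = [ACTIONS[(i - 1) % 4], intended, ACTIONS[(i + 1) % 4]]
    return random.choice(candidates)

# Empirically check the probabilities for an intended move of UP
counts = {a: 0 for a in ACTIONS}
for _ in range(30000):
    counts[slip('UP')] += 1
print(counts)  # LEFT, UP, RIGHT each near 10000; DOWN never occurs
```

Note that the opposite direction (DOWN when the intent is UP) never occurs; only the two perpendicular slips are possible.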
# env.nA: number of available actions in each state, 4
# env.nS: number of states in the grid, 16
# env.P[state][a]: transitions for taking action a in state, a list of (prob, next_state, reward, done) tuples, where done indicates whether the episode has terminated
# env.shape: shape of the grid, 4 * 4
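To make the `env.P[state][a]` format concrete, here is a hand-built toy entry in the same `(prob, next_state, reward, done)` shape, with illustrative values rather than the actual FrozenLake tables, plus the one-step expected backup used by the algorithm below.

```python
import numpy as np

# Toy transition table in Gym's env.P format: P[state][action] is a list of
# (prob, next_state, reward, done) tuples. Values are illustrative only.
P = {
    0: {  # state 0
        0: [  # action 0: three equally likely outcomes due to slipping
            (1.0 / 3.0, 0, 0.0, False),
            (1.0 / 3.0, 4, 0.0, False),
            (1.0 / 3.0, 1, 0.0, False),
        ],
    },
}

# One-step expected-value backup, the same expression used in compute_policy_v
gamma = 1.0
prev_v = np.zeros(16)
v_s = sum([p * (r + gamma * prev_v[s_]) for p, s_, r, _ in P[0][0]])
print(v_s)  # 0.0 here, since all rewards and initial values are zero
```

The probabilities in each transition list sum to 1, so the backup is an expectation over next states.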
Policy Iteration:
"""
Solving FrozenLake environment using Policy-Iteration.
Adapted by Bolei Zhou for IERG6130. Originally from Moustafa Alzantot ([email protected])
"""
import numpy as np
import gym
RENDER=False
GAMMA=1.0
# Run the policy for one episode: given the environment, the policy, and the discount
# factor, play one episode and return the total discounted reward and the step count
def run_episode(env, policy, gamma=GAMMA, render=False):
    """ Runs an episode and returns the total reward and number of steps """
    obs = env.reset()  # reset the environment
    total_reward = 0
    step_idx = 0
    while True:
        if render:
            # pass render=True to watch the environment; render defaults to False
            env.render()
        obs, reward, done, _ = env.step(int(policy[obs]))
        total_reward += (gamma ** step_idx * reward)
        step_idx += 1
        if done:
            break
    return total_reward, step_idx
# Compute the average reward of a policy over n episodes
def evaluate_policy(env, policy, gamma=GAMMA, n=100):
    # run_episode returns (total_reward, step_idx); average only the rewards
    scores = [run_episode(env, policy, gamma, render=RENDER)[0] for _ in range(n)]
    return np.mean(scores)
# Step 1: policy evaluation -- given the environment, the policy, and the discount
# factor, compute the value function of that policy
def compute_policy_v(env, policy, gamma=GAMMA):
    """ Iteratively evaluate the value function under the policy.
    Alternatively, we could formulate a set of linear equations in terms of v[s]
    and solve them to find the value function.
    """
    v = np.zeros(env.env.nS)
    eps = 1e-10  # stop updating once successive value functions differ by at most eps
    while True:
        prev_v = np.copy(v)
        for s in range(env.env.nS):
            policy_a = policy[s]
            v[s] = sum([p * (r + gamma * prev_v[s_]) for p, s_, r, _ in env.env.P[s][policy_a]])
        if np.sum(np.fabs(prev_v - v)) <= eps:
            # value function converged
            break
    return v
# Step 2: policy improvement -- act greedily with respect to the value function from step 1
def extract_policy(v, gamma=GAMMA):
    """ Extract the policy given a value function (uses the global env) """
    policy = np.zeros(env.env.nS)
    for s in range(env.env.nS):
        q_sa = np.zeros(env.env.nA)
        for a in range(env.env.nA):
            q_sa[a] = sum([p * (r + gamma * v[s_]) for p, s_, r, _ in env.env.P[s][a]])
        policy[s] = np.argmax(q_sa)
    return policy
# Main policy-iteration algorithm
def policy_iteration(env, gamma=GAMMA):
    """ Policy-Iteration algorithm """
    policy = np.random.choice(env.env.nA, size=(env.env.nS))  # initialize a random policy
    max_iterations = 200000
    for i in range(max_iterations):
        # Step 1: evaluate the current policy
        old_policy_v = compute_policy_v(env, policy, gamma)
        # Step 2: improve the policy by acting greedily with respect to old_policy_v
        new_policy = extract_policy(old_policy_v, gamma)
        if np.all(policy == new_policy):
            # the policy no longer changes, i.e. it has converged and cannot improve further
            print('Policy-Iteration converged at step %d.' % (i + 1))
            break
        policy = new_policy
    return policy
if __name__ == '__main__':
    env_name = 'FrozenLake-v0'  # or 'FrozenLake8x8-v0'
    env = gym.make(env_name)
    optimal_policy = policy_iteration(env, gamma=GAMMA)
    scores = evaluate_policy(env, optimal_policy, gamma=GAMMA)
    print('Average score = ', scores)
    print(optimal_policy)
    total, step = run_episode(env, optimal_policy, GAMMA, True)
    print('Total number of steps:', step)