
Policy Iteration and Value Iteration

Improve a Policy through Policy Iteration

  • Iterate through the two steps (the update formulas are written out below):
    • Evaluate the policy \(\pi\) (computing \(v\) given the current \(\pi\)). Step 1: compute the value function \(v\) from the environment, the policy, and the discount factor; this measures how good the current policy is.
    • Improve the policy by acting greedily with respect to \(v^\pi\). Step 2: improve the policy by taking the greedy action with respect to \(v^\pi\) (the \(v\) obtained from \(\pi\) in step 1).
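Written out (a standard textbook statement of the two updates, added here for reference; the code below uses deterministic policies, so \(\pi(s)\) denotes the single action the policy picks in state \(s\)):

\[ v_{k+1}(s) = \sum_{s',\, r} p(s', r \mid s, \pi(s)) \, \bigl[ r + \gamma\, v_k(s') \bigr] \quad \text{(policy evaluation, repeated until convergence)} \]

\[ \pi'(s) = \arg\max_{a} \sum_{s',\, r} p(s', r \mid s, a) \, \bigl[ r + \gamma\, v^{\pi}(s') \bigr] \quad \text{(greedy policy improvement)} \]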

Gym environment: FrozenLake-v0: http://gym.openai.com/envs/FrozenLake-v0/

The code is from Bolei Zhou's GitHub: https://github.com/cuhkrlcourse/RLexample/tree/master/MDP

The environment looks like this:

SFFF       (S: starting point, safe)
FHFH       (F: frozen surface, safe)
FFFH       (H: hole, fall to your doom)
HFFG       (G: goal, where the frisbee is located)
  • Environment description: this is the Frozen Lake problem, in which the agent controls the movement of a character in a grid world. Some tiles of the grid are walkable ice, while others drop the character into the water. In addition, the agent's movement is stochastic and only partially depends on the chosen action. (For example, if you choose the action "up", the character does not necessarily move up; it may slip to the left or to the right instead, and the three resulting directions are equally likely, each with probability 1/3 ≈ 0.333.) The agent is rewarded for finding a walkable path to the goal tile.
# env.nA: the number of actions available in each state, which is 4
# env.nS: the number of states in the grid, which is 16
# env.P[state][a]: the transition model for taking action a in the given state; it returns a list of (prob, next_state, reward, done) tuples, where prob is the transition probability, next_state is the resulting state, reward is the immediate reward, and done indicates whether the episode has terminated (goal reached or fallen into a hole)
# env.shape: the shape of the grid, 4 * 4
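These attributes can be inspected directly. The snippet below is a minimal sketch, assuming an older gym version in which FrozenLake-v0 is still registered and the wrapped environment is reached through env.env, exactly as in the code that follows:

import gym

env = gym.make('FrozenLake-v0')
print(env.env.nS)   # 16 states (the 4 * 4 grid)
print(env.env.nA)   # 4 actions per state
# env.env.P[s][a] is a list of (prob, next_state, reward, done) tuples;
# with the default slippery dynamics each chosen action leads to three
# possible moves, each with probability 1/3
print(env.env.P[0][0])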

Policy Iteration:

"""
Solving FrozenLake environment using Policy-Iteration.
Adapted by Bolei Zhou for IERG6130. Originally from Moustafa Alzantot ([email protected])
"""
import numpy as np
import gym

RENDER=False
GAMMA=1.0

# Run one episode with the given policy: takes the environment, the policy, and the discount factor, plays a single episode, and returns the discounted total reward together with the number of steps taken
def run_episode(env, policy, gamma = GAMMA, render = False):
    """ Runs an episode and return the total reward """
    obs = env.reset()  # reset the environment to its starting state
    total_reward = 0
    step_idx = 0
    while True:
        if render:
            env.render()
        # to watch the environment rendered at each step, pass render=True (it defaults to False)
        obs, reward, done, _ = env.step(int(policy[obs]))
        total_reward += (gamma ** step_idx * reward)
        step_idx += 1
        if done:
            break
    return total_reward, step_idx

# Evaluate a policy by averaging its total reward over n episodes
def evaluate_policy(env, policy, gamma = GAMMA, n = 100):
    # run_episode returns (total_reward, step_count); keep only the reward
    scores = [run_episode(env, policy, gamma, render=RENDER)[0] for _ in range(n)]
    return np.mean(scores)

# Step 1 (policy evaluation): compute the value function v of the given policy from the environment, the policy, and the discount factor
def compute_policy_v(env, policy, gamma=GAMMA):
    """ Iteratively evaluate the value-function under policy.
    Alternatively, we could formulate a set of linear equations in terms of v[s]
    and solve them to find the value function.
    """
    v = np.zeros(env.env.nS)
    eps = 1e-10
    # stop updating once the total change in v falls below eps
    while True:
        prev_v = np.copy(v)
        for s in range(env.env.nS):
            policy_a = policy[s]
            v[s] = sum([p * (r + gamma * prev_v[s_]) for p, s_, r, _ in env.env.P[s][policy_a]])
        if (np.sum((np.fabs(prev_v - v))) <= eps):
            # value converged
            break
    return v

# Step 2 (policy improvement): improve the policy by acting greedily with respect to old_policy_v (the value function computed in step 1)
def extract_policy(v, gamma = GAMMA):
    """ Extract the policy given a value-function """
    policy = np.zeros(env.env.nS)  # note: env here refers to the module-level environment created in __main__
    for s in range(env.env.nS):
        q_sa = np.zeros(env.env.nA)
        for a in range(env.env.nA):
            q_sa[a] = sum([p * (r + gamma * v[s_]) for p, s_, r, _ in  env.env.P[s][a]])
        policy[s] = np.argmax(q_sa)

    return policy

# The main Policy-Iteration loop
def policy_iteration(env, gamma = GAMMA):
    """ Policy-Iteration algorithm """
    policy = np.random.choice(env.env.nA, size=(env.env.nS))  # initialize a random policy

    max_iterations = 200000
    for i in range(max_iterations):
        old_policy_v = compute_policy_v(env, policy, gamma)
        # Step 1: policy evaluation, i.e. compute the value function of the current policy
        new_policy = extract_policy(old_policy_v, gamma)
        # Step 2: policy improvement, i.e. act greedily with respect to the value function from step 1
        if (np.all(policy == new_policy)):
            # the policy no longer changes, i.e. it has converged and cannot be improved further
            print ('Policy-Iteration converged at step %d.' %(i+1))
            break
        policy = new_policy
    return policy

if __name__ == '__main__':
    env_name  = 'FrozenLake-v0'  # or 'FrozenLake8x8-v0' for the larger map
    env = gym.make(env_name)

    optimal_policy = policy_iteration(env, gamma = GAMMA)
    avg_score = evaluate_policy(env, optimal_policy, gamma = GAMMA)
    print('Average score = ', avg_score)

    print(optimal_policy)
    total_reward, steps = run_episode(env, optimal_policy, GAMMA, True)
    print("Total steps taken:", steps)