1. 程式人生 > >強化學習-策略叠代代碼實現

強化學習-策略叠代代碼實現

eval 擁有 出發點 內容 並且 oat ict step ace

1. 前言

今天要重代碼的角度給大家詳細介紹下策略叠代的原理和實現方式。本節完整代碼GitHub。

我們開始介紹策略叠代前,先介紹一個蛇棋的遊戲

技術分享圖片

它是我們後面學習的環境,介紹下它的規則:

  1. 玩家每人擁有一個棋子,出發點在圖中標為“1”的格子處。
  2. 依次擲骰子,根據骰子的點數將自己的棋子向前行進相應的步數。假設筆者的棋子在“1”處,並且投擲出“4”,則筆者的棋子就可以到達“5”的位置。
  3. 棋盤上有一些梯子,它的兩邊與棋盤上的兩個格子相連。如果棋子落在其中一個格子上,就會自動走到梯子對應的另一個格子中。以圖5-5所示的棋盤為例,如果筆者的棋子在“1”處,並且投擲出“2”,那麽棋子將到達“3”處,由於此處有梯子,棋子將直接前進到梯子的另一段——“20”的位置。
  4. 最終的目標是到達“100”處,如果在到達時投擲的數字加上當前的位置超過了100,那麽棋子將首先到達100,剩余的步數將反向前進。

2. 蛇棋實現

我們實現蛇棋的邏輯,應該集成gym的env,然後分別重寫env下面的幾個重要的接口,這樣使用起來就可以和gym裏面封裝的小遊戲一樣了。

class SnakeEnv(gym.Env):
    SIZE = 100

    def __init__(self, ladder_num, actions):
        """
        :param int ladder_num: 梯子的個數
        :param list actions: 可選擇的行為
        """
        self.ladder_num = ladder_num
        self.actions = actions
        # 在整個範圍內,隨機生成梯子
        self.ladders = dict(np.random.randint(1, self.SIZE, size=(self.ladder_num, 2)))
        self.observation_space = Discrete(self.SIZE + 1)
        self.action_space = Discrete(len(actions))

        # 因為梯子是兩個方向的,所以添加反方向的梯子
        new_ladders = {}
        for k, v in self.ladders.items():
            new_ladders[k] = v
            new_ladders[v] = k
        self.ladders = new_ladders
        self.pos = 1

    # 重置初始狀態
    def reset(self):
        self.pos = 1
        return self.pos

    def step(self, action):
        """
        :param int action: 選擇的行動
        :return: 下一個狀態,獎勵值,是否結束,其它內容
        """
        step = np.random.randint(1, self.actions[action] + 1)
        self.pos += step
        if self.pos == 100:
            return 100, 100, 1, {}
        elif self.pos > 100:
            self.pos = 200 - self.pos

        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
        return self.pos, -1, 0, {}

    # 返回狀態s的獎勵值
    def reward(self, s):
        if s == 100:
            return 100
        else:
            return -1

然後再實現一個我們自己的智能體agent,裏面包含的東西有狀態的獎勵、策略、行動狀態轉移矩陣、狀態值函數、狀態行動值函數等。

為了簡單,我們用表格,或者矩陣的形式來表示各種變量。

class TableAgent(object):
    def __init__(self, env):
        # 狀態個數
        self.s_len = env.observation_space.n
        # 行動個數
        self.a_len = env.action_space.n
        # 每個狀態的獎勵,shape=[1,self.s_len]
        self.r = [env.reward(s) for s in range(0, self.s_len)]
        # 每個狀態的行動策略,默認為0,shape=[1,self.s_len]
        self.pi = np.array([0 for s in range(0, self.s_len)])
        # 行動狀態轉移矩陣,shape=[self.a_len, self.s_len, self.s_len]
        self.p = np.zeros([self.a_len, self.s_len, self.s_len], dtype=np.float)
        # 梯子
        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)

        # 計算狀態s和行動a確定,下一個狀態s'的概率
        for i, action in enumerate(env.actions):
            prob = 1.0 / action
            for src in range(1, 100):
                step = np.arange(action)
                step += src
                step = np.piecewise(step, [step > 100, step <= 100],
                                    [lambda x: 200 - x, lambda x: x])
                step = ladder_move(step)
                for dst in step:
                    self.p[i, src, dst] += prob

        self.p[:, 100, 100] = 1
        # 狀態值函數
        self.value_pi = np.zeros((self.s_len))
        # 狀態行動值函數
        self.value_q = np.zeros((self.s_len, self.a_len))
        # 衰減因子
        self.gamma = 0.8

3. 策略叠代實現

前面我們已經介紹過了,策略叠代的過程可以分為2個步驟

  • 策略評估:策略評估時計算當前策略下,收斂的數據狀態值函數。
v^T_{\pi}(s_t)=\sum_{a_t}\pi^{T-1}(a_t|s_t)\sum_{s_{t+1}}p(s_{t+1}|s_t,a_t)[r_{a_t}^{s_{t+1}} + v^{T-1}_{\pi}(s_{t+1})]\;\;\;\;\;\;(1)

實現如下:

# 策略評估
def policy_evaluation(self, agent, max_iter=-1):
    """
    :param obj agent: 智能體
    :param int max_iter: 最大叠代數
    """
    iteration = 0

    while True:
        iteration += 1
        new_value_pi = agent.value_pi.copy()
        # 對每個state計算v(s)
        for i in range(1, agent.s_len):
            ac = agent.pi[i]
            transition = agent.p[ac, i, :]
            value_sa = np.dot(transition, agent.r + agent.gamma * agent.value_pi)
            new_value_pi[i] = value_sa

        # 前後2次值函數的變化小於一個閾值,結束
        diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
        if diff < 1e-6:
            break
        else:
            agent.value_pi = new_value_pi
        if iteration == max_iter:
            break
  • 策略提升:在計算出了收斂的狀態值函數,再計算狀態-行動值函數,再找出最好的策略。
v_{\pi}(s_t)=\sum_{a_t}\pi(a_t|s_t)q_{\pi}(s_t,a_t)
q_{\pi}(s_t,a_t)=\sum_{s_{t+1}}p(s_{t+1}|s_t,a_t)[r_{a_t}^{s_{t+1}} + v_{\pi}(s_{t+1})]

實現如下:

# 策略提升
def policy_improvement(self, agent):
    """
    :param obj agent: 智能體
    """

    # 初始化新策略
    new_policy = np.zeros_like(agent.pi)
    for i in range(1, agent.s_len):
        for j in range(0, agent.a_len):
            # 計算每一個狀態行動值函數
            agent.value_q[i, j] = np.dot(agent.p[j, i, :], agent.r + agent.gamma * agent.value_pi)

        # 選出每個狀態下的最優行動
        max_act = np.argmax(agent.value_q[i, :])
        new_policy[i] = max_act
    if np.all(np.equal(new_policy, agent.pi)):
        return False
    else:
        agent.pi = new_policy
        return True

# 策略叠代
def policy_iteration(self, agent):
    """
    :param obj agent: 智能體
    """
    iteration = 0
    while True:
        iteration += 1
        self.policy_evaluation(agent)
        ret = self.policy_improvement(agent)
        if not ret:
            break
    print('Iter {} rounds converge'.format(iteration))

4. 總結

我們通過學習了策略叠代的實現,能夠比較清楚的看出強化學習的過程,策略叠代也是後面算法優化的一個基礎。

強化學習-策略叠代代碼實現