強化學習-策略叠代代碼實現
阿新 • • 發佈:2019-02-16
eval 擁有 出發點 內容 並且 oat ict step ace
1. 前言
今天要重代碼的角度給大家詳細介紹下策略叠代的原理和實現方式。本節完整代碼GitHub。
我們開始介紹策略叠代前,先介紹一個蛇棋的遊戲
它是我們後面學習的環境,介紹下它的規則:
- 玩家每人擁有一個棋子,出發點在圖中標為“1”的格子處。
- 依次擲骰子,根據骰子的點數將自己的棋子向前行進相應的步數。假設筆者的棋子在“1”處,並且投擲出“4”,則筆者的棋子就可以到達“5”的位置。
- 棋盤上有一些梯子,它的兩邊與棋盤上的兩個格子相連。如果棋子落在其中一個格子上,就會自動走到梯子對應的另一個格子中。以圖5-5所示的棋盤為例,如果筆者的棋子在“1”處,並且投擲出“2”,那麽棋子將到達“3”處,由於此處有梯子,棋子將直接前進到梯子的另一段——“20”的位置。
- 最終的目標是到達“100”處,如果在到達時投擲的數字加上當前的位置超過了100,那麽棋子將首先到達100,剩余的步數將反向前進。
2. 蛇棋實現
我們實現蛇棋的邏輯,應該集成gym的env,然後分別重寫env下面的幾個重要的接口,這樣使用起來就可以和gym裏面封裝的小遊戲一樣了。
class SnakeEnv(gym.Env): SIZE = 100 def __init__(self, ladder_num, actions): """ :param int ladder_num: 梯子的個數 :param list actions: 可選擇的行為 """ self.ladder_num = ladder_num self.actions = actions # 在整個範圍內,隨機生成梯子 self.ladders = dict(np.random.randint(1, self.SIZE, size=(self.ladder_num, 2))) self.observation_space = Discrete(self.SIZE + 1) self.action_space = Discrete(len(actions)) # 因為梯子是兩個方向的,所以添加反方向的梯子 new_ladders = {} for k, v in self.ladders.items(): new_ladders[k] = v new_ladders[v] = k self.ladders = new_ladders self.pos = 1 # 重置初始狀態 def reset(self): self.pos = 1 return self.pos def step(self, action): """ :param int action: 選擇的行動 :return: 下一個狀態,獎勵值,是否結束,其它內容 """ step = np.random.randint(1, self.actions[action] + 1) self.pos += step if self.pos == 100: return 100, 100, 1, {} elif self.pos > 100: self.pos = 200 - self.pos if self.pos in self.ladders: self.pos = self.ladders[self.pos] return self.pos, -1, 0, {} # 返回狀態s的獎勵值 def reward(self, s): if s == 100: return 100 else: return -1
然後再實現一個我們自己的智能體agent,裏面包含的東西有狀態的獎勵、策略、行動狀態轉移矩陣、狀態值函數、狀態行動值函數等。
為了簡單,我們用表格,或者矩陣的形式來表示各種變量。
class TableAgent(object): def __init__(self, env): # 狀態個數 self.s_len = env.observation_space.n # 行動個數 self.a_len = env.action_space.n # 每個狀態的獎勵,shape=[1,self.s_len] self.r = [env.reward(s) for s in range(0, self.s_len)] # 每個狀態的行動策略,默認為0,shape=[1,self.s_len] self.pi = np.array([0 for s in range(0, self.s_len)]) # 行動狀態轉移矩陣,shape=[self.a_len, self.s_len, self.s_len] self.p = np.zeros([self.a_len, self.s_len, self.s_len], dtype=np.float) # 梯子 ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x) # 計算狀態s和行動a確定,下一個狀態s'的概率 for i, action in enumerate(env.actions): prob = 1.0 / action for src in range(1, 100): step = np.arange(action) step += src step = np.piecewise(step, [step > 100, step <= 100], [lambda x: 200 - x, lambda x: x]) step = ladder_move(step) for dst in step: self.p[i, src, dst] += prob self.p[:, 100, 100] = 1 # 狀態值函數 self.value_pi = np.zeros((self.s_len)) # 狀態行動值函數 self.value_q = np.zeros((self.s_len, self.a_len)) # 衰減因子 self.gamma = 0.8
3. 策略叠代實現
前面我們已經介紹過了,策略叠代的過程可以分為2個步驟
- 策略評估:策略評估時計算當前策略下,收斂的數據狀態值函數。
v^T_{\pi}(s_t)=\sum_{a_t}\pi^{T-1}(a_t|s_t)\sum_{s_{t+1}}p(s_{t+1}|s_t,a_t)[r_{a_t}^{s_{t+1}} + v^{T-1}_{\pi}(s_{t+1})]\;\;\;\;\;\;(1)
實現如下:
# 策略評估
def policy_evaluation(self, agent, max_iter=-1):
"""
:param obj agent: 智能體
:param int max_iter: 最大叠代數
"""
iteration = 0
while True:
iteration += 1
new_value_pi = agent.value_pi.copy()
# 對每個state計算v(s)
for i in range(1, agent.s_len):
ac = agent.pi[i]
transition = agent.p[ac, i, :]
value_sa = np.dot(transition, agent.r + agent.gamma * agent.value_pi)
new_value_pi[i] = value_sa
# 前後2次值函數的變化小於一個閾值,結束
diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
if diff < 1e-6:
break
else:
agent.value_pi = new_value_pi
if iteration == max_iter:
break
- 策略提升:在計算出了收斂的狀態值函數,再計算狀態-行動值函數,再找出最好的策略。
v_{\pi}(s_t)=\sum_{a_t}\pi(a_t|s_t)q_{\pi}(s_t,a_t)
q_{\pi}(s_t,a_t)=\sum_{s_{t+1}}p(s_{t+1}|s_t,a_t)[r_{a_t}^{s_{t+1}} + v_{\pi}(s_{t+1})]
實現如下:
# 策略提升
def policy_improvement(self, agent):
"""
:param obj agent: 智能體
"""
# 初始化新策略
new_policy = np.zeros_like(agent.pi)
for i in range(1, agent.s_len):
for j in range(0, agent.a_len):
# 計算每一個狀態行動值函數
agent.value_q[i, j] = np.dot(agent.p[j, i, :], agent.r + agent.gamma * agent.value_pi)
# 選出每個狀態下的最優行動
max_act = np.argmax(agent.value_q[i, :])
new_policy[i] = max_act
if np.all(np.equal(new_policy, agent.pi)):
return False
else:
agent.pi = new_policy
return True
# 策略叠代
def policy_iteration(self, agent):
"""
:param obj agent: 智能體
"""
iteration = 0
while True:
iteration += 1
self.policy_evaluation(agent)
ret = self.policy_improvement(agent)
if not ret:
break
print('Iter {} rounds converge'.format(iteration))
4. 總結
我們通過學習了策略叠代的實現,能夠比較清楚的看出強化學習的過程,策略叠代也是後面算法優化的一個基礎。
強化學習-策略叠代代碼實現