1. 程式人生 > 其它 >強化學習-蒙特卡洛方法

強化學習-蒙特卡洛方法

參考:

(1)強化學習(第二版)

(2)強化學習精要-核心演算法與TensorFlow實現

(3)https://www.cnblogs.com/pinard/p/9492980.html

(4)https://deepmind.com/learning-resources/-introduction-reinforcement-learning-david-silver

一、蒙特卡洛(MC)方法

無論是在策略迭代還是在價值迭代中,都利用了狀態轉移概率來求解價值函式,並根據價值函式來尋找最優策略。我們將知道狀態轉移概率的問題稱為“基於模型”的問題(Model-based Problem)。但是,在很多問題當中,我們是無法獲得狀態轉移概率資訊的,將這型別的問題稱為“無模型”問題(Model-free Problem)。蒙特卡洛演算法就是用於解決“無模型”問題,其思想和GPI(廣義策略迭代)相同,包括策略評估和策略改進兩個部分。

2、蒙特卡洛預測

給定一個策略,在不知道狀態轉移概率的情況下,蒙特卡洛演算法是如何來學習其價值函式的呢?那就是通過經驗。我們知道一個狀態的價值是從該狀態開始的期望回報,即未來的折扣收益累積值的期望。一個很自然的想法就是對所有經過這個狀態之後產生的回報進行平均。隨著越來越多的回報被觀察到,根據大數定律,該平均值就會收斂於期望值,我們就用這種方法來估計價值函式。由於只通過狀態價值函式我們是無法進行後期的策略改進的(因為不知道狀態轉移概率),所以我們實際上的估計目標是動作價值函式。

這裡涉及到兩種估計方法:(1)首次訪問型MC預測演算法(first-visit)。(2)每次訪問型MC預測演算法(every-visit)。

下面對這兩種方法進行解釋:

假設給定在策略$\pi$下的多幕資料,在每幕資料中每次狀態-動作對(s,a)的出現都稱為對(s,a)的一次訪問。當然,在同一幕中(s,a)可能會被多次訪問到。在這種情況下,我們稱(s,a)的第一次訪問為首次訪問。首次訪問型MC演算法用(s,a)在所有幕中首次訪問的回報的平均值估計$q_{\pi}\left(s,a\right)$,而每次訪問型MC演算法則用(s,a)在所有幕中所有訪問的回報的平均值估計$q_{\pi}\left(s,a\right)$。

兩種方法的比較:當(s,a)的訪問次數足夠多時,兩種方法都能收斂,首次訪問方法獲得的(s,a)的價值函式方差較小,比每次訪問方法收斂得更快。但當幕數較少時,每次訪問方法可能會獲得更好的效果。

下面是首次訪問型MC預測演算法:

為了節省儲存量,在遍歷每一幕時,我們使用增量式的更新方法來計算均值:$Q(s,a)=Q(s,a)+\frac{1}{N(s,a)}\left( G_{t}-Q(s,a) \right)$

增量式更新的推導過程:$\mu_{k}=\frac{1}{k}\sum_{j=k}^{k}{x_{j}}=\frac{1}{k}\left( x_{k} + \sum_{j=1}^{k-1}{x_{j}} \right)=\frac{1}{k}\left( x_{k}+\left( k-1 \right)\mu_{k-1} \right)=\mu_{k-1}+\frac{1}{k}\left( x_{k}-\mu_{k-1} \right)$

現在,我們考慮一個問題,如果採用確定性策略,那麼一些(s,a)可能永遠不會被訪問到,也就無法根據經驗改善$q_{\pi}\left( s,a \right)$的估計。這會不利於我們找到最優策略。因此,我們提出試探性出發假設,即每個(s,a)都有非零的概率可以作為一幕的起點,這樣就保證了在取樣的幕個數趨向於無窮的時候,每一個(s,a)都會被訪問到無數次。可以看出,在試探性出發假設中還隱含了另一個假設,即無限幕假設。我們之所以提出這兩個假設就是為了保證對於任意的$\pi$,蒙特卡洛演算法都能精確地計算對應的$q_{\pi}$。

3、蒙特卡洛控制

如上圖所示,這是經典策略迭代流程,蒙特卡洛演算法也可以使用這種方法。根據上文我們提出的兩個很強的假設,可以保證蒙特卡羅演算法的最終收斂。但是在實際操作中,我們是無法實現這兩個假設的,因此,為了得到可行的演算法,必須想辦法去掉這兩個假設。

首先,我們來設法去掉無限幕假設:(1)在每次策略評估中儘量確保對$q_{\pi}$的逼近。可以設定一個較小的誤差值來作為判定標準(這一想法在策略迭代的演算法實現中已經有所體現)。(2)不再追求$q_{\pi}$的收斂。我們可以在$q_{\pi}$尚未收斂的情況下進行策略評估。這種思想在GPI中已經提及到了。對於蒙特卡洛演算法來說,自然可以逐幕交替進行評估與改進。

下面是基於試探性出發的蒙特卡洛演算法(蒙特卡洛ES演算法):

 然後,我們來設法去掉試探性出發假設:(1)同軌策略方法(on-policy)。(2)離軌策略方法(off-policy)。

下面來詳細介紹這兩種方法:

(1)同軌策略方法

在同軌策略中,用於生成取樣資料序列的策略和用於實際決策的待改進的策略是相同的。其實,我們上面提到的蒙特卡洛ES演算法就是同軌策略方法的一個例子。現在,我們要提出一個不再依賴試探性出發假設的同軌策略的蒙特卡洛控制方法。即在同軌策略方法中,我們使用“軟性”策略。“軟性”策略是指,對於任意$s\in \mathcal{S}$以及$a\in \mathcal{A}$,都有$\pi\left( a|s \right)>0$,但它們會逐漸地逼近一個確定性策略。在具體演算法實現當中,我們使用$\epsilon-$貪心策略,即選擇非貪心動作的概率為$\frac{\epsilon}{\left| \mathcal{A}\left( s \right) \right|}$,選擇貪心動作的概率為$1-\epsilon+\frac{\epsilon}{\left| \mathcal{A}\left( s \right) \right|}$。這種策略既保證了對(s,a)的探索又儘可能地靠近了貪心策略。

下面是同軌策略的首次訪問型MC控制演算法:

(2)離軌策略方法

在同軌策略中,用於生成取樣資料序列的策略和用於實際決策的待改進的策略是不同的。用來學習的策略稱為目標策略($\pi$),用來生成行動樣本的策略稱為行動策略(b)。這樣分離的好處在於當行動策略能對所有可能的動作進行取樣時,目標策略可以使用確定策略(例如貪心策略)。

為了使用從b得到的多幕樣本序列去預測$\pi$,我們要求在$\pi$下發生的每個動作都至少偶爾能在b下發生。換句話說,對於任意$\pi\left( a|s \right)>0$,要求$b\left( a|s \right)>0$。我們稱其為覆蓋假設。

因為要在一個分佈(b)下去估計另一個分佈($\pi$)的期望回報,所以我們引入重要性取樣。

什麼是重要性取樣?

我們希望能夠得到定義在服從P分佈的X上的函式f(X)的期望,但是我們無法直接從P分佈上進行取樣,因此,我們引入另一個Q分佈,從Q分佈上進行取樣,從而間接地求出f(X)在P分佈上的期望。

當我們在Q分佈上取樣$\{ x_1,x_2,...,x_N \}$後,可以估計f(X)的期望,$E_{X\sim P}[f(X)]=\frac{1}{N}\sum_i^N{\frac{P\left( x_i \right)}{Q\left( x_i \right)}f\left( x_i \right)}$。其中$w^i=\frac{P\left( x_i \right)}{Q\left( x_i \right)}$稱為重要性權重。這種方法稱為普通重要性取樣。還有一種方法稱為加權重要性取樣,即$E_{X\sim P}[f(X)]= \sum_i^N{\frac{w^i}{\sum_j^N{w^j}}f\left( x_i \right)}$。

這兩種方法的區別在於:

普通重要性取樣是真實期望的無偏估計,但是在P分佈和Q分佈相差較大時,會導致$\frac{P\left( x_i \right)}{Q\left( x_i \right)}f\left( x_i \right)$的方差較大,難以收斂。

加權重要性取樣是真實期望的有偏估計,但是降低了$\frac{P\left( x_i \right)}{Q\left( x_i \right)}f\left( x_i \right)$的方差,易於收斂。

重要性取樣在離軌策略中的應用:

在行動策略b下采樣得到的樣本序列:$\{ S_t,A_t,R_{t+1},...,S_{T-1},A_{T-1},R_T,S_T \}$,產生該樣本序列的概率為$\prod_t^{T-1}{b\left( A_t|S_t \right)P\left( S_{t+1},R_{t+1}|S_t,A_t \right)}$。

在目標策略$\pi$下產生同樣樣本序列的概率為$\prod_t^{T-1}{\pi\left( A_t|S_t \right)P\left( S_{t+1},R_{t+1}|S_t,A_t \right)}$。

$\rho_{t:T-1}=\frac{\prod_t^{T-1}{\pi\left( A_t|S_t \right)P\left( S_{t+1},R_{t+1}|S_t,A_t \right)}}{\prod_t^{T-1}{b\left( A_t|S_t \right)P\left( S_{t+1},R_{t+1}|S_t,A_t \right)}}=\frac{\prod_t^{T-1}{\pi\left( A_t|S_t \right)}}{\prod_t^{T-1}{b\left( A_t|S_t \right)}}$,$\rho_{t:T-1}$即為重要性權重。

以每次訪問型方法為例,$\tau\left( s,a \right)$表示(s,a)在每一幕中出現時刻的集合,$T\left( t \right),t\in \tau\left( s,a \right)$表示時刻t所在幕的終止時刻,兩種取樣方式如下所示:

普通重要性取樣:$Q\left( s,a \right) =\frac{\sum_{t\in \tau \left( s,a \right)}{\rho _{t+1:T\left( t \right) -1}G_t}}{\left| \tau \left( s,a \right) \right|}$

加權重要性取樣:$Q\left( s,a \right) =\frac{\sum_{t\in \tau \left( s,a \right)}{\rho _{t+1:T\left( t \right) -1}G_t}}{\sum_{t\in \tau \left( s,a \right)}{\rho_{t+1:T\left( t \right)-1}}}$

下面是離軌策略MC預測演算法:

在遍歷每一幕時,我們仍舊使用增量式的更新方法來計算加權平均:

加權平均增量式更新的推導過程:

$
c_{k}=\sum_{i}^{k}{w_{i}}\\
\mu_{k}=\frac{\sum_{i}^{k}{w_{i}x_{i}}}{c_{k}}=\frac{\sum_{i}^{k-1}{w_{i}x_{i}}+w_{k}x_{k}}{c_{k}}=
\frac{c_{k-1}\mu_{k-1}}{c_{k}}+\frac{w_{k}x_{k}}{c_{k}}=
\frac{\left( c_{k}-w_{k} \right)\mu_{k-1}}{c_{k}}+\frac{w_{k}x_{k}}{c_{k}}=
\mu_{k-1}+\frac{w_{k}}{c_{k}}\left( x_{k}-\mu_{k-1} \right)
$

下面是離軌策略MC控制演算法:

二、蒙特卡洛例項

1、遊戲背景介紹

請參考:https://www.cnblogs.com/lihanlihan/p/15956427.html

2、程式碼實現

(1)同軌策略的首次訪問型MC控制演算法

import numpy as np
import gym
from gym.spaces import Discrete
from contextlib import contextmanager
import time

class SnakeEnv(gym.Env):
    
    #棋格數
    SIZE = 100
    
    def __init__(self, dices):
        
        #動作上限列表
        self.dices = dices 
        #梯子
        self.ladders = {82: 52, 52: 92, 26: 66, 98: 22, 14: 22, 96: 63, 35: 12, 54: 78, 76: 57}
        #狀態空間
        self.observation_space = Discrete(self.SIZE + 1)
        #動作空間
        self.action_space = Discrete(len(dices))
        #初始位置
        self.pos = 1
        
    def reset(self):
        
        self.pos = 1
        return self.pos
    
    def step(self, a):
        
        step = np.random.randint(1, self.dices[a] + 1)
        self.pos += step
        
        #到達終點,結束遊戲
        if self.pos == 100:
            return 100, 100, 1, {}
        #超過終點位置,回退
        elif self.pos > 100:
            self.pos = 200 - self.pos
            
        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
            
        return self.pos, -1, 0, {}
    
    def reward(self, s):
        
        if s == 100:
            return 100
        else:
            return -1
        
    def render(self):
        
        pass
    
class TableAgent():
    
    def __init__(self, env):
        
        #狀態空間數
        self.s_len = env.observation_space.n
        #動作空間數
        self.a_len = env.action_space.n
        
        #訓練時使用ε-貪心策略,測試時使用貪心策略
        self.pi = np.full([self.s_len, self.a_len], 1 / self.a_len)
        
        #狀態-動作價值函式
        self.value_q = np.zeros((self.s_len, self.a_len))
        #q(s,a)取樣數
        self.value_n = np.zeros((self.s_len, self.a_len))
        #打折率
        self.gamma = 0.5
        
    def play(self, state):
        
        return np.random.choice(a = np.arange(self.a_len), p = self.pi[state])
        
class MonteCarlo():
    
    def __init__(self):
        
        pass
        
    def monte_carlo(self, agent, env, epsilon):
        
        for i in range(20000):

            #取樣
            state = env.reset()
            episode = []
            while True:
                ac = agent.play(state)
                next_state, reward, terminate, _ = env.step(ac)
                episode.append((state, ac, reward))
                state = next_state
                if terminate:
                    break
            
            #計算(s,a)的長期回報
            value = []
            return_val = 0
            for item in reversed(episode):
                return_val = return_val * agent.gamma + item[2]
                value.append((item[0], item[1], return_val))
            
            states = {}
            f = 0
            for item in reversed(value):
                #first-visit
                if not states.get((item[0], item[1]), False):
                    states[(item[0], item[1])] = True
                    
                    d = agent.value_q[item[0]][item[1]]
                    
                    agent.value_n[item[0]][item[1]] += 1
                    agent.value_q[item[0]][item[1]] += (item[2] - agent.value_q[item[0]][item[1]]) / agent.value_n[item[0]][item[1]]
                    
                    f = max(f, abs(agent.value_q[item[0]][item[1]] - d))
                    
                    max_act = np.argmax(agent.value_q[item[0], :])
                    agent.pi[item[0]] = epsilon / agent.a_len
                    agent.pi[item[0]][max_act] += 1 - epsilon
            
            print(f)

def eval_game(env, policy):
    
    state = env.reset()
    return_val = 0
    
    for epoch in range(100):
        while True:
            if isinstance(policy, TableAgent):
                act = policy.play(state)
            elif isinstance(policy, list):
                act = policy[state]
            else:
                raise IOError('Illegal policy')
            
            state, reward, terminate, _ = env.step(act)
            return_val += reward
            
            if terminate:
                break
        
    return return_val / 100

@contextmanager
def timer(name):
    
    start = time.time()
    yield
    end = time.time()
    print('{} cost:{}'.format(name, end - start))

def monte_carlo_demo():
    
    env = SnakeEnv([3, 6])
    agent = TableAgent(env)
    mc = MonteCarlo()
    with timer('耗時'):
        mc.monte_carlo(agent, env, 0.2)
    policy = []
    for i in range(agent.s_len):
        policy.append(np.argmax(agent.pi[i]))
    print('貪心策略:', policy)
    
monte_carlo_demo()

執行結果:

利用eval_game函式測試貪心策略結果:

(2)離軌策略MC控制演算法

import numpy as np
import gym
from gym.spaces import Discrete
from contextlib import contextmanager
import time

class SnakeEnv(gym.Env):
    
    #棋格數
    SIZE = 100
    
    def __init__(self, dices):
        
        #動作上限列表
        self.dices = dices 
        #梯子
        self.ladders = {82: 52, 52: 92, 26: 66, 98: 22, 14: 22, 96: 63, 35: 12, 54: 78, 76: 57}
        #狀態空間
        self.observation_space = Discrete(self.SIZE + 1)
        #動作空間
        self.action_space = Discrete(len(dices))
        #初始位置
        self.pos = 1
        
    def reset(self):
        
        self.pos = 1
        return self.pos
    
    def step(self, a):
        
        step = np.random.randint(1, self.dices[a] + 1)
        self.pos += step
        
        #到達終點,結束遊戲
        if self.pos == 100:
            return 100, 100, 1, {}
        #超過終點位置,回退
        elif self.pos > 100:
            self.pos = 200 - self.pos
            
        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
            
        return self.pos, -1, 0, {}
    
    def reward(self, s):
        
        if s == 100:
            return 100
        else:
            return -1
        
    def render(self):
        
        pass
    
class TableAgent():
    
    def __init__(self, env):
        
        #狀態空間數
        self.s_len = env.observation_space.n
        #動作空間數
        self.a_len = env.action_space.n
        
        #目標策略
        self.pi = np.zeros(self.s_len)
        #行動策略
        self.b = np.full([self.s_len, self.a_len], 1 / self.a_len)
        
        #動作價值函式
        self.value_q = np.zeros((self.s_len, self.a_len))
        #(s,a)取樣數
        self.c = np.zeros((self.s_len, self.a_len))
        #打折率
        self.gamma = 0.5
        
    def play(self, state):
        
        #訓練時使用策略b,測試時使用策略π
        return np.random.choice(a = np.arange(self.a_len), p = self.b[state])
        
class MonteCarlo():
    
    def __init__(self):
        
        pass
        
    def monte_carlo(self, agent, env, epsilon):
        
        for i in range(20000):
            
            #取樣
            state = env.reset()
            episode = []
            while True:
                ac = agent.play(state)
                next_state, reward, terminate, _ = env.step(ac)
                episode.append((state, ac, reward))
                state = next_state
                if terminate:
                    break
                    
            #計算(s,a)的長期回報
            value = []
            return_val = 0
            for item in reversed(episode):
                return_val = return_val * agent.gamma + item[2]
                value.append((item[0], item[1], return_val))
                
            f = 0
            w = 1
            #every-visit
            for item in value:
                d = agent.value_q[item[0]][item[1]]
                
                agent.c[item[0]][item[1]] += w
                agent.value_q[item[0]][item[1]] += (w / agent.c[item[0]][item[1]]) * (item[2] - agent.value_q[item[0]][item[1]])
                
                f = max(f, abs(agent.value_q[item[0]][item[1]] - d))
                
                max_act = np.argmax(agent.value_q[item[0]])
                agent.pi[item[0]] = max_act
                if item[1] != agent.pi[item[0]]:
                    break
                w *= 1 / agent.b[item[0]][item[1]]
                
            #b使用ε-貪心策略
            for state in range(agent.s_len):
                max_act = np.argmax(agent.value_q[state])
                agent.b[state] = epsilon / agent.a_len
                agent.b[state][max_act] += 1 - epsilon
                
            print(f)
            
def eval_game(env, policy):
    
    state = env.reset()
    return_val = 0
    
    for epoch in range(100):
        while True:
            if isinstance(policy, TableAgent):
                act = policy.play(state)
            elif isinstance(policy, list):
                act = policy[state]
            else:
                raise IOError('Illegal policy')
            
            state, reward, terminate, _ = env.step(act)
            return_val += reward
    
            if terminate:
                break
    
    return return_val / 100

@contextmanager
def timer(name):
    
    start = time.time()
    yield
    end = time.time()
    print('{} cost:{}'.format(name, end - start))

def monte_carlo_demo():
    
    env = SnakeEnv([3, 6])
    agent = TableAgent(env)
    mc = MonteCarlo()
    with timer('耗時'):
        mc.monte_carlo(agent, env, 0.5)
    print('目標策略:', list(agent.pi.astype(np.int32)))
    
monte_carlo_demo()

執行結果:

利用eval_game函式測試目標策略結果: