
A PPO Implementation for Super Mario Bros [TF 2.4]

Tags: RL, reinforcement learning, tensorflow


Introduction

PPO is one of the mainstream algorithms in reinforcement learning today. Referring to an existing PyTorch implementation on GitHub, this post implements a TensorFlow version of the PPO algorithm.

RL Environment Setup

Environment Overview

Before implementing the algorithm, we first need a Super Mario environment. Someone on GitHub has already provided gym-super-mario-bros, which wraps the game with the gym API; it can be installed with pip install gym-super-mario-bros. Details of the environment (actions, rewards, and so on) can be found at the link above.
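As a quick sanity check, here is a minimal sketch using only the documented gym-super-mario-bros API, creating and stepping the raw environment:

import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from nes_py.wrappers import JoypadSpace

# Create level 1-1 and restrict the action space to a small discrete set.
env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
env = JoypadSpace(env, SIMPLE_MOVEMENT)

state = env.reset()
print(state.shape)            # raw RGB frame: (240, 256, 3)
state, reward, done, info = env.step(env.action_space.sample())
print(reward, done, info['x_pos'])
env.close()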

Custom Environment

The raw environment returns a single image per frame. On its own, one frame tells the algorithm nothing about how the scene is changing at the current moment, so we stack several consecutive frames together as the current state. Running the policy once per frame is also largely unnecessary, so we adopt frame skipping: on the skipped frames we repeat the action taken on the previous frame. In addition, to reduce computation, each frame is resized to (84, 84) and converted to grayscale. To make the environment more convenient to use, we wrap it so that episode termination is handled automatically and the environment resets itself, with no manual bookkeeping required. We also customize the reward to make training more efficient. The wrapper implementation based on these ideas is as follows:

import cv2
# from IPython.display import clear_output
from gym import Wrapper
from gym.spaces import Box
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
from nes_py.wrappers import JoypadSpace
import numpy as np

def process_frame(frame, height, width):
    # Grayscale, resize, and normalize a raw RGB frame to (height, width, 1) in [0, 1].
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    frame = cv2.resize(frame, (width, height))[:, :, None] / 255.
    return frame

class CustomEnvironment(Wrapper):
    """Preprocesses frames and shapes the reward (+50 on reaching the flag, -50 on dying)."""
    def __init__(self, env, height, width):
        super().__init__(env)
        self.observation_space = Box(low=0, high=1, shape=(height, width, 1))
        self.height = height
        self.width = width

    def step(self, action):
        state, reward, done, info = self.env.step(action)
        state = process_frame(state, self.height, self.width)
        if done:
            if info["flag_get"]:
                reward += 50
            else:
                reward -= 50
        return state, reward / 10., done, info

    def reset(self):
        return process_frame(self.env.reset(), self.height, self.width)

class SkipFrame(Wrapper):
    """Repeats each action for `skip` frames and stacks them along the channel axis."""
    def __init__(self, env, skip=4):
        super().__init__(env)
        self.observation_space = Box(
            low=0, high=1, shape=(*self.env.observation_space.shape[:-1], skip)
        )
        self.skip = skip

    def step(self, action):
        # Take the first step, then repeat the action; if the episode ends early,
        # pad the stack with the last frame so the state shape stays fixed.
        state, reward, done, info = self.env.step(action)
        total_reward = reward
        states = [state]
        for _ in range(self.skip - 1):
            if not done:
                state, reward, done, info = self.env.step(action)
                total_reward += reward
            states.append(state)
        states = np.concatenate(states, axis=-1)
        return states.astype(np.float32), total_reward, done, info

    def reset(self):
        state = self.env.reset()
        states = np.concatenate([state for _ in range(self.skip)], axis=-1)
        return states.astype(np.float32)

class AutoReset(Wrapper):
    """Automatically resets the environment when an episode ends."""
    def __init__(self, env):
        super().__init__(env)

    def step(self, action):
        state, reward, done, info = self.env.step(action)
        if done:
            state = self.env.reset()
        return state, reward, done, info

    def reset(self):
        return self.env.reset()

def create_env(world, stage, action_type, height, width):
    env = gym_super_mario_bros.make(f'SuperMarioBros-{world}-{stage}-v0')
    env = JoypadSpace(env, action_type)
    env = CustomEnvironment(env, height, width)
    env = SkipFrame(env)
    env = AutoReset(env)
    return env
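A quick check that the wrapped environment produces the expected stacked observations (the shapes assume the defaults used later in this post):

env = create_env(1, 1, SIMPLE_MOVEMENT, 84, 84)
state = env.reset()
print(state.shape)   # (84, 84, 4): four stacked grayscale frames
state, reward, done, info = env.step(env.action_space.sample())
print(state.shape, reward, done)
env.close()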

Multi-Process Data Sampling

To improve training stability and sampling efficiency, we can sample training data in multiple processes simultaneously, using the Pipe from the multiprocessing library for inter-process communication; a minimal demo of the pattern is sketched below, followed by the actual wrapper.
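This toy example (my addition, not from the original post) shows the request/response loop that the wrapper relies on: the parent sends a [request, payload] pair over one end of the pipe, and the worker process answers on the other end.

import multiprocessing as mp

def worker(conn):
    # Block until a request arrives, answer it, and exit on 'close'.
    while True:
        request, payload = conn.recv()
        if request == 'echo':
            conn.send(payload * 2)
        elif request == 'close':
            break

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send(['echo', 21])
    print(parent_conn.recv())  # 42
    parent_conn.send(['close', None])
    p.join()

The code below wraps the multi-process environments behind the usual gym API: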

import multiprocessing as mp

class MultipleEnvironments():
    def __init__(self, num_envs, create_env, *args):
        assert num_envs > 0
        self.agent_conns, self.env_conns = zip(*[mp.Pipe() for _ in range(num_envs)])
        for conn in self.env_conns:
            process = mp.Process(target=self.run, args=(conn, create_env, *args))
            process.start()

    @staticmethod
    def run(conn, create_env, *args):
        env = create_env(*args)
        while True:
            request, action = conn.recv()
            if request == 'step':
                conn.send(env.step(action))
            elif request == 'reset':
                conn.send(env.reset())
            elif request == 'render':
                env.render()
            elif request == 'close':
                env.close()
                break
            elif hasattr(env, request):
                conn.send(getattr(env, request))
            else:
                raise NotImplementedError
    
    def __getattr__(self, name):
        # Forward unknown attribute lookups (e.g. observation_space) to the first worker.
        if name in self.__dict__:
            return self.__dict__[name]
        assert not self.agent_conns[0].closed, 'Environment closed.'
        self.agent_conns[0].send([name, None])
        return self.agent_conns[0].recv()
                
    def step(self, actions):
        assert not self.agent_conns[0].closed, 'Environment closed.'
        for conn, action in zip(self.agent_conns, actions):
            conn.send(['step', action.item()])
        return tuple(zip(*[conn.recv() for conn in self.agent_conns]))
    
    def reset(self):
        assert not self.agent_conns[0].closed, 'Environment closed.'
        for conn in self.agent_conns:
            conn.send(['reset', None])
        return tuple(conn.recv() for conn in self.agent_conns)
    
    def render(self):
        assert not self.agent_conns[0].closed, 'Environment closed.'
        for conn in self.agent_conns:
            conn.send(['render', None])
                
    def close(self):
        assert not self.agent_conns[0].closed, 'Environment closed.'
        for conn in self.agent_conns:
            conn.send(['close', None])
            conn.close()
        for conn in self.env_conns:
            conn.close()
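A short usage sketch of this wrapper (assuming the create_env defaults from above; attribute accesses like action_space are forwarded to a worker through __getattr__):

envs = MultipleEnvironments(2, create_env, 1, 1, SIMPLE_MOVEMENT, 84, 84)
states = envs.reset()                      # tuple of 2 arrays, each (84, 84, 4)
actions = np.array([envs.action_space.sample() for _ in range(2)])
states, rewards, dones, infos = envs.step(actions)
envs.close()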

Training Model Implementation

Storage Container for Sampled Data

Before implementing the model, we first implement a class for storing the sampled data. Since the model will later compute GAE from back to front, I store the samples in reverse chronological order (newest first) to make the data easier to consume.
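For reference, GAE follows the standard backward recursion (not spelled out in the original post):

\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t), \qquad \hat{A}_t = \delta_t + \gamma\lambda\,\hat{A}_{t+1},

so iterating over a newest-first buffer while accumulating adv = gamma * lmbda * adv + delta recovers \hat{A}_t for every step in a single pass.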

class Memory():
    def __init__(self):
        self.reset()
    
    def store(self, state, action, prob, reward, next_state, done):
        self.states.insert(0, state)
        self.actions.insert(0, action)
        self.probs.insert(0, prob)
        self.rewards.insert(0, reward)
        self.next_states.insert(0, next_state)
        self.dones.insert(0, done)
        
    def reset(self):
        self.states = []
        self.actions = []
        self.probs = []
        self.rewards = []
        self.next_states = []
        self.dones = []
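Because store() inserts at index 0, position 0 always holds the most recent transition; a tiny check with placeholder values:

memory = Memory()
memory.store('s0', 0, 0.5, 1.0, 's1', False)
memory.store('s1', 1, 0.4, 0.0, 's2', True)
print(memory.states)   # ['s1', 's0'] -- newest first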

Feature Extraction Model

The environment state consists of several consecutive frames, so the feature extractor here is a CNN. To speed up computation, eager execution is disabled.

import tensorflow as tf
from tensorflow.keras import Input, Model, Sequential
import tensorflow.keras.backend as K
from tensorflow.keras.layers import *
tf.compat.v1.disable_eager_execution() # disable eager execution

class Feature(Layer):
    def __init__(self):
        super().__init__()
        self.model = Sequential([
            Conv2D(32, 3, strides=2, activation='relu', padding='same'),
            Conv2D(32, 3, strides=2, activation='relu', padding='same'),
            Conv2D(32, 3, strides=2, activation='relu', padding='same'),
            Conv2D(32, 3, strides=2, activation='relu', padding='same'),
            Flatten(),
            Dense(512, activation='relu'),
        ])
        
    def call(self, x):
        return self.model(x)
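With four stride-2 convolutions and 'same' padding, an 84x84 input shrinks to 6x6 (84 -> 42 -> 21 -> 11 -> 6) before flattening into the dense layer; a quick symbolic shape check, assuming 4 stacked frames:

feature = Feature()
out = feature(Input([84, 84, 4]))
print(out.shape)   # (None, 512)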

PPO Implementation
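As a refresher (standard PPO; the original post does not restate it), the trainer below minimizes the negative of the clipped surrogate objective

L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\right)\right], \qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_\text{old}}(a_t \mid s_t)},

plus a value-function MSE term and an entropy bonus weighted by entropy_coef; eps_clip below plays the role of \epsilon.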

class PPOTrainer():
    def __init__(
        self, 
        obs_shape, 
        act_n, 
        lmbda=0.97, 
        gamma=0.99, 
        lr=2e-4, 
        eps_clip=0.2,
        train_step=10,
        entropy_coef=0.05,
        checkpoint_path='mario',
    ):
        self.memory = Memory()
        self.lmbda = lmbda
        self.gamma = gamma 
        self.lr = lr
        self.obs_shape = obs_shape
        self.act_n = act_n
        self.eps_clip = eps_clip
        self.train_step = train_step
        self.entropy_coef = entropy_coef
        self.policy, self.value, self.train_model = self.build_model()
        ckpt = tf.train.Checkpoint(
            train_model=self.train_model,
            optimizer=self.train_model.optimizer,
        )
        self.ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=1) 
        
    def build_model(self):
        s_input = Input(self.obs_shape)
        prob_old_input = Input([])
        action_old_input = Input([], dtype='int32')
        gae_input = Input([])
        v_target_input = Input([])
        
        feature = Feature()
        x = feature(s_input)
        policy_dense = Dense(self.act_n, activation='softmax')
        value_dense = Dense(1)
        prob = policy_dense(x)
        v = value_dense(x)
        policy = Model(inputs=s_input, outputs=prob)
        value = Model(inputs=s_input, outputs=v)

        prob_cur = tf.gather(prob, action_old_input, batch_dims=1)
        ratio = prob_cur / (prob_old_input + 1e-3)
        surr1 = ratio * gae_input
        surr2 = K.clip(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * gae_input
        
        # The second term is the entropy bonus. Since actions are sampled according to
        # their probabilities, there is no need to weight by the probability again:
        # the mean log-probability of the taken actions is a sample estimate of the
        # negative entropy, so adding it to the loss encourages exploration.
        policy_loss = -K.mean(K.minimum(surr1, surr2)) + K.mean(K.log(prob_cur + 1e-3)) * self.entropy_coef
        
        value_loss = K.mean((v[:, 0] - v_target_input) ** 2)
        loss = policy_loss + value_loss
        train_model = Model(inputs=[s_input, prob_old_input, action_old_input, gae_input, v_target_input], outputs=loss)
        train_model.add_loss(loss)
        train_model.compile(tf.keras.optimizers.Adam(self.lr))
        return policy, value, train_model
    
    def choose_action(self, states):
        # states.shape: (env_num, height, width, skip_frames) 
        probs = self.policy.predict(states) # shape: (env_num, act_n)
        actions = [np.random.choice(range(self.act_n), p=prob) for prob in probs] # shape: (env_num)
        return actions, probs[np.arange(len(probs)), actions]

    def store(self, states, actions, probs, rewards, next_states, dones):
        self.memory.store(states, actions, probs, rewards, next_states, dones)
           
    def update_model(self, batch_size=128):
        states = np.array(self.memory.states) # shape: (-1, env_num, height, width, skip_frames)
        actions = np.array(self.memory.actions) # shape: (-1, env_num)
        probs = np.array(self.memory.probs) # shape: (-1, env_num)
        rewards = np.array(self.memory.rewards) # shape: (-1, env_num)
        next_states = np.array(self.memory.next_states) # shape: (-1, env_num, height, width, skip_frames)
        dones = np.array(self.memory.dones) # shape: (-1, env_num)
        
        env_num = states.shape[1]
        states = states.reshape([-1, *states.shape[2:]])
        next_states = next_states.reshape([-1, *next_states.shape[2:]])
        actions = actions.flatten()
        probs = probs.flatten()
        
        for step in range(self.train_step):
            v = self.value.predict(states, batch_size=batch_size)
            v_next = self.value.predict(next_states, batch_size=batch_size)
            v = v.reshape([v.shape[0] // env_num, env_num])
            v_next = v_next.reshape([v_next.shape[0] // env_num, env_num])
            
            v_target = rewards + self.gamma * v_next * ~dones
            td_errors = v_target - v
            gae_lst = []
            adv = 0
            # The buffer is stored newest-first, so this loop runs backward in time;
            # masking with (1 - done) stops the advantage from leaking across episode boundaries.
            for delta, done in zip(td_errors, dones):
                adv = self.gamma * self.lmbda * adv * (1. - done) + delta
                gae_lst.append(adv)
            
            gaes = np.array(gae_lst)
            gaes = gaes.flatten()
            v_target = v_target.flatten()
            self.train_model.fit([states, probs, actions, gaes, v_target], batch_size=batch_size)

        self.memory.reset()
        
    def save(self):
        self.ckpt_manager.save()
        
    def load(self):     
        if self.ckpt_manager.latest_checkpoint:
            status = self.ckpt_manager.checkpoint.restore(self.ckpt_manager.latest_checkpoint)
            status.run_restore_ops() # with eager execution disabled, this call is needed to actually run the restore ops

Training the Model

Now we can start training the model. The training code is as follows:

max_step = 512
num_envs = 8
height = 84
width = 84
world = 1
stage = 1
action_type = SIMPLE_MOVEMENT
env = MultipleEnvironments(num_envs, create_env, world, stage, action_type, height, width)
try:
    agent = PPOTrainer(
        env.observation_space.shape, 
        env.action_space.n, 
        train_step=10, 
        lr=1e-4, 
        entropy_coef=0.05, 
        checkpoint_path=f'mario_{world}_{stage}'
    )
    agent.load()
    states = env.reset()
    for epoch in range(1, 201):
        max_pos = 0
        min_pos = np.inf
        for step in range(max_step):
            actions, probs = agent.choose_action(np.stack(states, axis=0))
            next_states, rewards, dones, infos = env.step(actions)
            agent.store(states, actions, probs, rewards, next_states, dones)
            states = next_states
            max_pos = max(max_pos, max([info['x_pos'] for info in infos]))
            min_pos = min(min_pos, min([info['x_pos'] if done else np.inf for info, done in zip(infos, dones)]))
        # clear_output() # clear the output in a Jupyter notebook
        print(f'epoch: {epoch} | max position: {max_pos} | min position: {min_pos}')
        agent.update_model(batch_size=256)
        if epoch % 10 == 0:
            agent.save()
finally:
    env.close()

Test Results

So far I have only tried the first eight levels. Level 1-3 is fairly hard for the model and never trained successfully, but the other seven levels can all reach the flag, although the agent still occasionally dies in a few spots. Demos of some of the levels are shown below:
(Demo GIFs for levels 1-1, 1-2, 1-4, 2-1, 2-2, 2-3, and 2-4.)

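For completeness, a minimal evaluation sketch (my addition, assuming the classes defined above): it loads a checkpoint and runs a single rendered environment with the trained policy.

eval_env = create_env(1, 1, SIMPLE_MOVEMENT, 84, 84)
agent = PPOTrainer(
    eval_env.observation_space.shape,
    eval_env.action_space.n,
    checkpoint_path='mario_1_1',
)
agent.load()

state = eval_env.reset()
done = False
while not done:
    eval_env.render()
    actions, _ = agent.choose_action(state[None])  # add a batch dimension
    state, reward, done, info = eval_env.step(actions[0])
eval_env.close()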
Full code