Core Code and Comments for Playing Flappy Bird with a DQN
阿新 · Published 2018-12-05
Reposted from:
http://lanbing510.info/2018/07/17/DQN.html
# File: FlappyBirdDQN.py
import cv2
import sys
sys.path.append("game/")
import wrapped_flappy_bird as game
from BrainDQN_Nature import BrainDQN
import numpy as np

# Helper: resize the frame to 80x80, convert it to grayscale and binarize it
def preprocess(observation):
    observation = cv2.cvtColor(cv2.resize(observation, (80, 80)), cv2.COLOR_BGR2GRAY)
    ret, observation = cv2.threshold(observation, 1, 255, cv2.THRESH_BINARY)
    return np.reshape(observation, (80, 80, 1))

# Main function: initialize the DQN and the game, then start playing and training
def playFlappyBird():
    # Step 1: initialize BrainDQN
    actions = 2
    brain = BrainDQN(actions)
    # Step 2: initialize the Flappy Bird game
    flappyBird = game.GameState()
    # Step 3: play the game
    # Step 3.1: obtain the initial state
    action0 = np.array([1, 0])
    observation0, reward0, terminal = flappyBird.frame_step(action0)
    observation0 = cv2.cvtColor(cv2.resize(observation0, (80, 80)), cv2.COLOR_BGR2GRAY)
    ret, observation0 = cv2.threshold(observation0, 1, 255, cv2.THRESH_BINARY)
    brain.setInitState(observation0)
    # Step 3.2: play the game
    while 1 != 0:
        # Get an action
        action = brain.getAction()
        # Execute the action in the game and get the next frame, the reward and the terminal flag
        nextObservation, reward, terminal = flappyBird.frame_step(action)
        # Grayscale / binarize the frame
        nextObservation = preprocess(nextObservation)
        # Append the new frame to build the new state, then store
        # (currentState, action, reward, newState, terminal) in the replay memory
        brain.setPerception(nextObservation, action, reward, terminal)

def main():
    playFlappyBird()

if __name__ == '__main__':
    main()
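Before moving on to the network itself, it may help to see the preprocessing step in isolation. The sketch below is not part of the original article; it feeds a synthetic frame through the same preprocess function, and the assumption that raw frames are 288x512 RGB arrays is only for illustration:

# Standalone check of the preprocessing step on a fake frame.
# Assumption: real frames from wrapped_flappy_bird are uint8 arrays of shape (288, 512, 3).
import cv2
import numpy as np

def preprocess(observation):
    observation = cv2.cvtColor(cv2.resize(observation, (80, 80)), cv2.COLOR_BGR2GRAY)
    ret, observation = cv2.threshold(observation, 1, 255, cv2.THRESH_BINARY)
    return np.reshape(observation, (80, 80, 1))

fake_frame = np.random.randint(0, 256, size=(288, 512, 3), dtype=np.uint8)
processed = preprocess(fake_frame)
print(processed.shape)       # (80, 80, 1): one binarized 80x80 channel
print(np.unique(processed))  # pixel values can only be 0 or 255 after thresholding

The point of the binary threshold is that the network only needs the silhouette of the bird and pipes, not colors or textures, which keeps the 80x80x4 input small and easy to learn from.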
# File: BrainDQN_NIPS.py
import tensorflow as tf
import numpy as np
import random
from collections import deque

# Hyperparameters
FRAME_PER_ACTION = 1
GAMMA = 0.99            # decay rate of past observations
OBSERVE = 100.          # timesteps to observe before training
EXPLORE = 150000.       # frames over which to anneal epsilon
FINAL_EPSILON = 0.0     # final value of epsilon
INITIAL_EPSILON = 0.9   # starting value of epsilon
REPLAY_MEMORY = 50000   # number of previous transitions to remember
BATCH_SIZE = 32         # size of minibatch

class BrainDQN:

    # Constructor
    def __init__(self, actions):
        # Initialize the replay memory queue
        self.replayMemory = deque()
        # Initialize parameters
        self.timeStep = 0
        self.epsilon = INITIAL_EPSILON
        self.actions = actions
        # Build the Q network
        self.createQNetwork()

    # Build the deep Q network
    def createQNetwork(self):
        # Network weights
        W_conv1 = self.weight_variable([8, 8, 4, 32])
        b_conv1 = self.bias_variable([32])
        W_conv2 = self.weight_variable([4, 4, 32, 64])
        b_conv2 = self.bias_variable([64])
        W_conv3 = self.weight_variable([3, 3, 64, 64])
        b_conv3 = self.bias_variable([64])
        W_fc1 = self.weight_variable([1600, 512])
        b_fc1 = self.bias_variable([512])
        W_fc2 = self.weight_variable([512, self.actions])
        b_fc2 = self.bias_variable([self.actions])

        # Input layer
        self.stateInput = tf.placeholder("float", [None, 80, 80, 4])

        # Hidden layers
        h_conv1 = tf.nn.relu(self.conv2d(self.stateInput, W_conv1, 4) + b_conv1)
        h_pool1 = self.max_pool_2x2(h_conv1)
        h_conv2 = tf.nn.relu(self.conv2d(h_pool1, W_conv2, 2) + b_conv2)
        h_conv3 = tf.nn.relu(self.conv2d(h_conv2, W_conv3, 1) + b_conv3)
        h_conv3_flat = tf.reshape(h_conv3, [-1, 1600])
        h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat, W_fc1) + b_fc1)

        # Q-value layer
        self.QValue = tf.matmul(h_fc1, W_fc2) + b_fc2

        # Training setup
        self.actionInput = tf.placeholder("float", [None, self.actions])
        self.yInput = tf.placeholder("float", [None])
        Q_action = tf.reduce_sum(tf.multiply(self.QValue, self.actionInput), reduction_indices=1)
        self.cost = tf.reduce_mean(tf.square(self.yInput - Q_action))
        self.trainStep = tf.train.AdamOptimizer(1e-6).minimize(self.cost)

        # Save and load the network
        self.saver = tf.train.Saver()
        self.session = tf.InteractiveSession()
        self.session.run(tf.initialize_all_variables())
        checkpoint = tf.train.get_checkpoint_state("saved_networks")
        if checkpoint and checkpoint.model_checkpoint_path:
            self.saver.restore(self.session, checkpoint.model_checkpoint_path)
            print("Successfully loaded:", checkpoint.model_checkpoint_path)
        else:
            print("Could not find old network weights")

    # Train the Q network
    def trainQNetwork(self):
        # Step 1: sample a random minibatch from the replay memory
        minibatch = random.sample(self.replayMemory, BATCH_SIZE)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        nextState_batch = [data[3] for data in minibatch]

        # Step 2: compute the target y
        y_batch = []
        QValue_batch = self.QValue.eval(feed_dict={self.stateInput: nextState_batch})
        for i in range(0, BATCH_SIZE):
            terminal = minibatch[i][4]
            if terminal:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + GAMMA * np.max(QValue_batch[i]))

        # Step 3: train
        self.trainStep.run(feed_dict={
            self.yInput: y_batch,
            self.actionInput: action_batch,
            self.stateInput: state_batch
        })

        # Save the network every 10000 iterations
        if self.timeStep % 10000 == 0:
            self.saver.save(self.session, 'saved_networks/' + 'network' + '-dqn',
                            global_step=self.timeStep)

    # Update the replay memory; once enough transitions have been collected,
    # call trainQNetwork to train
    def setPerception(self, nextObservation, action, reward, terminal):
        newState = np.append(self.currentState[:, :, 1:], nextObservation, axis=2)
        self.replayMemory.append((self.currentState, action, reward, newState, terminal))
        if len(self.replayMemory) > REPLAY_MEMORY:
            self.replayMemory.popleft()
        if self.timeStep > OBSERVE:
            self.trainQNetwork()  # train the network
        self.currentState = newState
        self.timeStep += 1

    # Choose an action (epsilon-greedy)
    def getAction(self):
        QValue = self.QValue.eval(feed_dict={self.stateInput: [self.currentState]})[0]
        action = np.zeros(self.actions)
        action_index = 0
        if self.timeStep % FRAME_PER_ACTION == 0:
            if random.random() <= self.epsilon:
                action_index = random.randrange(self.actions)
                action[action_index] = 1
            else:
                action_index = np.argmax(QValue)
                action[action_index] = 1
        else:
            action[0] = 1  # do nothing

        # Anneal epsilon
        if self.epsilon > FINAL_EPSILON and self.timeStep > OBSERVE:
            self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

        return action

    # Set the initial state by stacking the first frame four times
    def setInitState(self, observation):
        self.currentState = np.stack((observation, observation, observation, observation), axis=2)

    # Helper: create network weights
    def weight_variable(self, shape):
        initial = tf.truncated_normal(shape, stddev=0.01)
        return tf.Variable(initial)

    # Helper: create network biases
    def bias_variable(self, shape):
        initial = tf.constant(0.01, shape=shape)
        return tf.Variable(initial)

    # Helper: 2D convolution
    def conv2d(self, x, W, stride):
        return tf.nn.conv2d(x, W, strides=[1, stride, stride, 1], padding="SAME")

    # Helper: 2x2 max pooling
    def max_pool_2x2(self, x):
        return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
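The trickiest bookkeeping above is how setInitState and setPerception maintain the four-frame state stack. The following standalone sketch, added here for illustration, reproduces just that NumPy logic with dummy frames so the channel ordering is easy to verify:

import numpy as np

def frame(k):
    # Dummy 80x80 "frame" filled with the value k so the stack is easy to inspect.
    return np.full((80, 80), k, dtype=np.uint8)

# setInitState: stack the first frame four times along the channel axis -> shape (80, 80, 4)
currentState = np.stack((frame(0), frame(0), frame(0), frame(0)), axis=2)
print(currentState.shape)      # (80, 80, 4)

# setPerception: drop the oldest channel and append the newest frame of shape (80, 80, 1)
for k in (1, 2, 3, 4):
    nextObservation = frame(k).reshape(80, 80, 1)
    currentState = np.append(currentState[:, :, 1:], nextObservation, axis=2)

print(currentState[0, 0, :])   # [1 2 3 4]: the four most recent frames, oldest first

Stacking the last four binarized frames is what gives the network access to velocity information: a single frame shows where the bird is, but only the stack shows how it is moving.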
Once training is complete and the network has been saved, you can play the game:
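The checkpoint logic is already built into createQNetwork: on start-up it calls tf.train.get_checkpoint_state("saved_networks") and restores the latest weights if they exist, so simply re-running FlappyBirdDQN.py resumes with the trained network. As a rough sketch (assuming you want a purely greedy agent and no further exploration), the exploration constants in BrainDQN_NIPS.py can be set to zero before relaunching:

# Illustrative tweak in BrainDQN_NIPS.py, not part of the original listing:
# with both epsilons at zero, getAction never explores randomly and
# always picks np.argmax(QValue).
INITIAL_EPSILON = 0.0   # was 0.9
FINAL_EPSILON = 0.0

After that, running python FlappyBirdDQN.py again restores the weights from saved_networks/ automatically. Note that setPerception still calls trainQNetwork once timeStep exceeds OBSERVE, so for a pure demonstration run you may also want to skip that call.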