
Reimplementing Faster RCNN in TensorFlow

Faster RCNN is a representative two-stage detection network, composed of an RPN plus a Fast RCNN detection network: the RPN generates region proposals, and the Fast RCNN network takes those region proposals as input and outputs the final detections.

The work so far is a reimplementation of the Faster RCNN network with the TensorFlow framework.

Training and testing use the PASCAL VOC dataset, so the first component reads PASCAL VOC and returns the next batch of images together with their ground-truth information. Code:

import os
import xml.etree.ElementTree as ET
import numpy as np
import cv2
import pickle
import copy
import config as cfg


class pascal_voc(object):
    def __init__(self, phase, rebuild=False):
        self.devkit_path = os.path.join(cfg.PASCAL_PATH, 'VOCdevkit')
        self.data_path = os.path.join(self.devkit_path, 'VOC2007')
        self.cache_path = cfg.CACHE_PATH
        self.batch_size = cfg.BATCH_SIZE
        self.target_size = cfg.target_size
        self.max_size = cfg.max_size
        self.classes = cfg.CLASSES
        self.pixel_means = cfg.pixel_means
        self.class_to_ind = dict(zip(self.classes, range(len(self.classes)))) # map class name -> index
        self.flipped = cfg.FLIPPED
        self.phase = phase
        self.rebuild = rebuild
        self.cursor = 0 # cursor into gt_labels
        self.epoch = 1
        self.gt_labels = None
        self.prepare()
    
    
    def image_read(self, imname, flipped=False):
        image = cv2.imread(imname)  # OpenCV loads images in BGR channel order by default
        if flipped:
            image = image[:, ::-1, :]
        return image
    
    def get(self): # get() also advances self.epoch when the dataset wraps around
        count = 0
        images = []
        tf_blob = {}
        assert self.batch_size == 1, "only support single batch" 
        while count < self.batch_size:
            imname = self.gt_labels[self.cursor]['imname']
            flipped = self.gt_labels[self.cursor]['flipped']
            image = self.image_read(imname, flipped)
            image, image_scale = self.prep_im_for_blob(image, self.pixel_means, self.target_size, self.max_size) # resized image
            image = np.reshape(image, (self.batch_size, image.shape[0], image.shape[1], 3)) # reshape to TensorFlow input layout
            gt_box = self.gt_labels[self.cursor]['boxes'] * image_scale # scale gt boxes with the image, boxes.shape=[num_obj,4]
            gt_cls = self.gt_labels[self.cursor]['gt_classes']
            count += 1
            self.cursor += 1
            if self.cursor >= len(self.gt_labels):
                np.random.shuffle(self.gt_labels)
                self.cursor = 0
                self.epoch += 1
        tf_blob = {'image':image, 'scale':image_scale, 'cls':gt_cls, 'box': gt_box, 'imname': imname}
        return tf_blob # image.shape=[batch,h,w,3]; image_scale; gt_box.shape=[num_objs,4]

    

    def prepare(self):
        gt_labels = self.load_labels()
        if self.flipped:
            print('Appending horizontally-flipped training examples ...')
            gt_labels_cp = copy.deepcopy(gt_labels)
            for idx in range(len(gt_labels_cp)):
                gt_labels_cp[idx]['flipped'] = True
                # flip the ground-truth boxes horizontally: x1' = width-1-x2, x2' = width-1-x1
                boxes = gt_labels_cp[idx]['boxes'].copy()
                width = gt_labels_cp[idx]['width']
                oldx1 = boxes[:, 0].copy()
                oldx2 = boxes[:, 2].copy()
                boxes[:, 0] = width - 1 - oldx2
                boxes[:, 2] = width - 1 - oldx1
                gt_labels_cp[idx]['boxes'] = boxes
            gt_labels += gt_labels_cp
        np.random.shuffle(gt_labels)
        self.gt_labels = gt_labels
        return gt_labels

    def load_labels(self):
        cache_file = os.path.join(
            self.cache_path, 'pascal_' + self.phase + '_gt_labels.pkl')

        if os.path.isfile(cache_file) and not self.rebuild:
            print('Loading gt_labels from: ' + cache_file)
            with open(cache_file, 'rb') as f:
                gt_labels = pickle.load(f)  # deserialize the cached labels from the .pkl file
            return gt_labels

        print('Processing gt_labels from: ' + self.data_path)

        if not os.path.exists(self.cache_path):
            os.makedirs(self.cache_path)

        if self.phase == 'train':
            txtname = os.path.join(
                self.data_path, 'ImageSets', 'Main', 'trainval.txt')
        else:
            txtname = os.path.join(
                self.data_path, 'ImageSets', 'Main', 'test.txt')
        with open(txtname, 'r') as f:
            self.image_index = [x.strip() for x in f.readlines()]

        gt_labels = []
        for index in self.image_index:
            gt_label = self.load_pascal_annotation(index) # annotation dict: object box coordinates and class info
            gt_label.update({'flipped': False})
            gt_labels.append(gt_label)
        print('Saving gt_labels to: ' + cache_file)
        with open(cache_file, 'wb') as f:
            pickle.dump(gt_labels, f)
        return gt_labels

    def load_pascal_annotation(self, index):
        """
        Load image and bounding boxes info from XML file in the PASCAL VOC
        format.
        """

        filename = os.path.join(self.data_path, 'Annotations', index + '.xml')
        tree = ET.parse(filename)
        im_width = int(tree.find('size').find('width').text) # image width, needed for horizontal flipping
        objs = tree.findall('object')
        num_objs = len(objs) # number of objects in this image
        boxes = np.zeros((num_objs, 4), dtype=np.uint16) # box corners, shape (num_objs, 4)
        gt_classes = np.zeros((num_objs), dtype=np.int32) # ground-truth class index of each object
        
        for ix, obj in enumerate(objs):
            bbox = obj.find('bndbox')
            # Make pixel indexes 0-based
            x1 = float(bbox.find('xmin').text) - 1
            y1 = float(bbox.find('ymin').text) - 1
            x2 = float(bbox.find('xmax').text) - 1
            y2 = float(bbox.find('ymax').text) - 1
            cls = self.class_to_ind[obj.find('name').text.lower().strip()] # map the class name to its index
            boxes[ix, :] = [x1, y1, x2, y2] # boxes is an np array of shape [num_objs,4]
            gt_classes[ix] = cls # gt_classes is an np array of shape [num_objs]; int index corresponding to the name
        imname = os.path.join(self.data_path, 'JPEGImages', index + '.jpg')
        return {'boxes': boxes, 'gt_classes': gt_classes, 'imname': imname, 'width': im_width}
    
    def prep_im_for_blob(self, im, pixel_means, target_size, max_size): # image, pixel means, 600, 1000
        im = im.astype(np.float32, copy=False)
        #im -= pixel_means # subtract the per-channel pixel means
        im_shape = im.shape
        im_size_min = np.min(im_shape[0:2])
        im_size_max = np.max(im_shape[0:2])
        im_scale = float(target_size) / float(im_size_min) # 600 / shorter side
        # Prevent the biggest axis from being more than MAX_SIZE
        if np.round(im_scale * im_size_max) > max_size:
            im_scale = float(max_size) / float(im_size_max)
        im = cv2.resize(im, None, None, fx=im_scale, fy=im_scale, interpolation=cv2.INTER_LINEAR)
        return im, im_scale

if __name__ == '__main__':
    pascal = pascal_voc('train')
    tf_blob = pascal.get()
    print ('the gt_boxes is ', tf_blob['box'])
    print ('the imname is ', tf_blob['imname'])
    print ('the image size is', tf_blob['image'].shape[1], '*', tf_blob['image'].shape[2])

The code builds a pascal_voc class that loads its data during initialization, either from the cache or from the PASCAL VOC files. The get() method returns the next batch as tf_blob, which contains the image data, the ground-truth boxes, and the image name imname. Images are resized so that the shorter side is 600 pixels and the longer side does not exceed 1000.
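
The loader imports a config module whose contents are not shown in the post. A minimal config.py satisfying the fields referenced above might look like this (the paths are placeholders; the 600/1000 sizes follow the Faster RCNN paper, and pixel_means is the BGR mean commonly used with VGG16):

import os

PASCAL_PATH = 'data/pascal_voc'  # placeholder: parent directory of VOCdevkit
CACHE_PATH = 'data/cache'        # placeholder: where the .pkl label cache is stored
BATCH_SIZE = 1                   # the loader only supports single-image batches
target_size = 600                # shorter image side after resizing
max_size = 1000                  # upper bound on the longer side
FLIPPED = True                   # append horizontally-flipped training examples
pixel_means = [[[102.9801, 115.9465, 122.7717]]]  # assumed BGR channel means
CLASSES = ['aeroplane', 'bicycle', 'bird', 'boat', 'bottle',
           'bus', 'car', 'cat', 'chair', 'cow', 'diningtable',
           'dog', 'horse', 'motorbike', 'person', 'pottedplant',
           'sheep', 'sofa', 'train', 'tvmonitor']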

The feature extraction network is the classic VGG16. Code:

# -*- coding: utf-8 -*-
"""
Created on Thu Oct 18 18:12:44 2018

@author: LongJun
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
slim = tf.contrib.slim

#vgg16 network: takes an image, returns the feature map
def vgg16(inputs):
    with tf.variable_scope('vgg_16') :
        with slim.arg_scope([slim.conv2d, slim.fully_connected],
                             activation_fn=tf.nn.relu,\
                             weights_initializer=tf.truncated_normal_initializer(stddev=0.01),\
                             weights_regularizer=slim.l2_regularizer(0.0005)):
             net = slim.repeat(inputs, 2, slim.conv2d, 64, [3, 3], trainable=False, scope='conv1')
             net = slim.max_pool2d(net, [2, 2], padding='SAME', scope='pool1')
             net = slim.repeat(net, 2, slim.conv2d, 128, [3, 3], trainable=False, scope='conv2')
             net = slim.max_pool2d(net, [2, 2], padding='SAME', scope='pool2')
             net = slim.repeat(net, 3, slim.conv2d, 256, [3, 3], scope='conv3')
             net = slim.max_pool2d(net, [2, 2], padding='SAME', scope='pool3')
             net = slim.repeat(net, 3, slim.conv2d, 512, [3, 3], scope='conv4')
             net = slim.max_pool2d(net, [2, 2], padding='SAME', scope='pool4')
             net = slim.repeat(net, 3, slim.conv2d, 512, [3, 3], scope='conv5')
             #net = slim.max_pool2d(net, [2, 2], padding='SAME', scope='pool5')
             return net

#rpn network: takes the feature map, returns cls scores and bbox regressions for the rois
def rpn_net(input_feature_map,num_anchor):
    with tf.variable_scope('rpn') :
        with slim.arg_scope([slim.conv2d, slim.fully_connected],
                             activation_fn=tf.nn.relu,\
                             weights_initializer=tf.truncated_normal_initializer(stddev=0.01),\
                             weights_regularizer=slim.l2_regularizer(0.0005)):
             rpn_feature = slim.conv2d(input_feature_map, 256, [3,3], scope='conv6')
             # the score and regression outputs are linear layers, so override the ReLU from arg_scope
             rois_cls = slim.conv2d(rpn_feature, 2*num_anchor, [1,1], activation_fn=None, scope='conv7')
             rois_reg = slim.conv2d(rpn_feature, 4*num_anchor, [1,1], activation_fn=None, scope='conv8')
             return rois_cls, rois_reg
       
#get the scope names of the variables that need to be restored
def get_var_list():
    var_list = []
    for i in range(1,5):
        var = 'vgg_16/conv%d'%i
        var_list.append(var)
    return var_list

#quick test of the code
if __name__ == '__main__':
    with tf.Session() as sess:
        model_path = 'vgg_16.ckpt'
        input_images = tf.placeholder(tf.float32, [None, 600, 600, 3])
        feature_map = vgg16(input_images)
        var_list = get_var_list()
        variables_to_restore = slim.get_variables_to_restore(include=var_list)
        #print (variables_to_restore)
        init = tf.global_variables_initializer()
        sess.run(init)
        saver = tf.train.Saver(var_list=variables_to_restore)
        saver.restore(sess, model_path)
        print (sess.run('vgg_16/conv4/conv4_2/biases:0'))
        #print (tf.contrib.framework.list_variables(model_path))

This code builds the VGG16 network architecture and reloads it with a pretrained model (see the VGG pretrained weights download link).
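
Since the anchor code below assumes a feature stride of 16, a quick shape check is useful. This is a minimal sketch assuming the vgg16 function above is in scope; with four 2x2 poolings and SAME padding, a 600x800 input gives a 38x50 feature map (ceil(600/16) x ceil(800/16)):

import tensorflow as tf

input_images = tf.placeholder(tf.float32, [1, 600, 800, 3])
feature_map = vgg16(input_images)
print(feature_map.shape)  # (1, 38, 50, 512): stride 16, 512 output channels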

Faster RCNN places k anchors at every position of the w*h feature map; these anchors must be generated before training the network. Code:

# -*- coding: utf-8 -*-
"""
Created on Fri Oct 19 22:18:37 2018

@author: LongJun
"""

import numpy as np
import tensorflow as tf
#generate_anchors: generates the coordinates of the base anchors
#inputs:
#anchor_scales: the 3 anchor sizes [128,256,512]
#anchor_ratios: the 3 aspect ratios [0.5,1,2]
#anchor_bias_x_ctr, anchor_bias_y_ctr: the center of the top-left feature-map position
#mapped back onto the original image; (8,8) for VGG16 (stride 16)
#output:
#anchor_conner: the k base anchors as top-left/bottom-right corners (x1,y1,x2,y2), shape (k,4);
#all_anchor_conner below tiles them across the w*h feature-map positions into w*h*k anchors
def generate_anchors(anchor_scales=[128,256,512], anchor_ratios=[0.5,1,2], anchor_bias_x_ctr=8, anchor_bias_y_ctr=8):
    anchor_width = np.array(anchor_scales)
    anchor_length = np.array(anchor_scales)
    anchor_ratios = np.array(anchor_ratios)
    bias_x_ctr = anchor_bias_x_ctr
    bias_y_ctr = anchor_bias_y_ctr
    anchor_scales = np.stack((anchor_width, anchor_length), axis=-1)
    anchor_size = ratios_process(anchor_scales, anchor_ratios)
    anchor_conner = generate_anchors_conner(anchor_size, bias_x_ctr, bias_y_ctr)
    return anchor_conner


#ratios_process: generates the sizes of the k base anchors (those of the top-left feature-map position)
#inputs:
#anchor_scales: [128,256,512]  anchor_ratios: [0.5,1,2]
#output:
#anchors: the k base anchor sizes (width, height), shape (k,2)
def ratios_process(anchor_scales, anchor_ratios):
    anchor_area = anchor_scales[:,0] * anchor_scales[:,1]
    anchors = np.vstack([get_anchor_size(anchor_area[i], anchor_ratios) for i in range(anchor_area.shape[0])])
    return anchors
    
def get_anchor_size(anchor_area, anchor_ratios):
    width = np.round(np.sqrt(anchor_area/anchor_ratios))
    length = width * anchor_ratios
    anchors = np.stack((width, length), axis=-1)
    return anchors

def generate_anchors_conner(anchor_size, x_ctr, y_ctr):
    width = anchor_size[:,0]
    length = anchor_size[:,1]
    x1 = np.round(x_ctr - 0.5*width)
    y1 = np.round(y_ctr -0.5*length)
    x2 = np.round(x_ctr + 0.5*width)    
    y2 = np.round(y_ctr +0.5*length)
    conners = np.stack((x1, y1, x2, y2), axis=-1)
    #print (conners)
    return conners

def all_anchor_conner(image_width, image_height, stride=16):
    bias_anchor_conner = generate_anchors() # (k,4) base anchors
    stride = np.float32(stride)
    dmap_width = tf.to_int32(tf.ceil(image_width/stride))   # feature-map width
    dmap_height = tf.to_int32(tf.ceil(image_height/stride)) # feature-map height
    total_pos = dmap_height*dmap_width
    # offsets of every feature-map position mapped back onto the original image
    offset_x = tf.range(dmap_width) * stride
    offset_y = tf.range(dmap_height) * stride
    x, y = tf.meshgrid(offset_x, offset_y)
    x = tf.reshape(x, [-1])
    y = tf.reshape(y, [-1])
    coordinate = tf.stack((x, y, x, y), axis=-1)
    coordinate = tf.reshape(coordinate, [total_pos, 1, 4])
    # broadcast: each position offset plus each base anchor -> w*h*k anchors
    all_anchor_conner = coordinate + bias_anchor_conner
    all_anchor_conner = tf.reshape(all_anchor_conner, [-1, 4])
    return all_anchor_conner

if __name__ == '__main__':
    a = np.array(600)
    b = np.array(800)
    image_width = tf.placeholder(tf.int32)
    image_height = tf.placeholder(tf.int32)
    with tf.Session() as sess:
        conners = all_anchor_conner(image_width, image_height, stride=16)
        feed_dict = {image_width: a, image_height: b}
        conners = sess.run(conners, feed_dict=feed_dict)
        print (conners.shape)

Calling all_anchor_conner() generates the w*h*k anchors and returns their top-left and bottom-right coordinates (x1,y1,x2,y2); all anchors live in a single np.array of shape [num_anchors,4].
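
As a quick sanity check (a minimal sketch assuming the functions above are importable), the base anchors can be inspected on their own. With 3 scales and 3 ratios, k = 9, so a 600x800 input at stride 16 yields 38*50*9 = 17100 anchors in total:

import numpy as np

base = generate_anchors()         # (9, 4): 3 scales x 3 ratios, centered at (8, 8)
w = base[:, 2] - base[:, 0]
h = base[:, 3] - base[:, 1]
print(base.shape)                 # (9, 4)
print(np.stack((w, h), axis=-1))  # per-anchor (width, height); areas near 128^2, 256^2, 512^2
print(38 * 50 * 9)                # 17100 anchors for a 600x800 image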

After all anchors are generated, they are labeled positive or negative according to their IoU with the ground-truth boxes, and then sampled down to 256 anchors (with a 1:1 positive:negative ratio) for training the RPN. The anchor labeling is finished so far. Code:

# -*- coding: utf-8 -*-
"""
Created on Thu Oct 25 09:53:41 2018

@author: LongJun
"""
#import tensorflow as tf
import numpy as np
def calculate_IOU (gt_boxes, target_boxes): # gt_boxes: [num_obj,4]  target_boxes: [num_anchors,4]
    num_gt = gt_boxes.shape[0]
    num_tr = target_boxes.shape[0]
    IOU_s = np.zeros((num_gt,num_tr), dtype=np.float32)
    for ix in range(num_gt):
        gt_area = (gt_boxes[ix,2]-gt_boxes[ix,0]) * (gt_boxes[ix,3]-gt_boxes[ix,1])
        #print (gt_area)
        for iy in range(num_tr):
            iw = min(gt_boxes[ix,2],target_boxes[iy,2]) - max(gt_boxes[ix,0],target_boxes[iy,0])
            #print (iw)
            if iw > 0:
                ih = min(gt_boxes[ix,3],target_boxes[iy,3]) - max(gt_boxes[ix,1],target_boxes[iy,1])
                #print (ih)
                if ih > 0:
                    tar_area = (target_boxes[iy,2]-target_boxes[iy,0]) * (target_boxes[iy,3]-target_boxes[iy,1])
                    #print (tar_area)
                    i_area = iw * ih
                    iou = i_area/float((gt_area+tar_area-i_area))
                    IOU_s[ix,iy] = iou
    return IOU_s
def labels_generate (gt_boxes, target_boxes, overlaps_pos, overlaps_neg):
    labels = np.empty((target_boxes.shape[0],), dtype=np.float32)
    labels.fill(-1)                          # -1: don't care (excluded from training)
    IOUs = calculate_IOU(gt_boxes, target_boxes)  # [num_gt, num_anchors]
    max_overlaps = np.max(IOUs, axis=0)      # best IoU of each anchor over all gt boxes
    labels[max_overlaps < overlaps_neg] = 0  # negative: below the negative threshold
    max_arg = np.argmax(IOUs, axis=1)
    labels[max_arg] = 1                      # positive: the highest-IoU anchor for each gt box
    labels[max_overlaps > overlaps_pos] = 1  # positive: above the positive threshold
    return labels
if __name__ == '__main__':
    IOUs = calculate_IOU(np.array([[221,66,675,589]]), np.array([[168,72,680,584]]))
    print (IOUs)
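
The 256-anchor sampling itself is still to be written; as a preview, a minimal sketch of that step (following the paper's scheme of at most 128 positives padded out with negatives; the function name and defaults here are assumptions, not part of the repo yet) could look like:

import numpy as np

def sample_anchors(labels, num_samples=256, pos_fraction=0.5):
    # disable surplus positives at random, keeping at most num_samples*pos_fraction of them
    pos_inds = np.where(labels == 1)[0]
    num_pos = int(num_samples * pos_fraction)
    if len(pos_inds) > num_pos:
        disable = np.random.choice(pos_inds, size=len(pos_inds) - num_pos, replace=False)
        labels[disable] = -1
    # fill the remainder of the batch with negatives, disabling the rest
    neg_inds = np.where(labels == 0)[0]
    num_neg = num_samples - int(np.sum(labels == 1))
    if len(neg_inds) > num_neg:
        disable = np.random.choice(neg_inds, size=len(neg_inds) - num_neg, replace=False)
        labels[disable] = -1
    return labels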

The next steps are to finish the anchor sampling (sketched above) and to write the training part of the network; the blog will be updated again once that work is done.