
Image Recognition: CIFAR-10 Image Recognition with ResNet, Analysis and Parameter Comparison

Tags: image processing, NN techniques

1 Background of Image Recognition

As a core part of artificial intelligence and computer vision, image recognition underpins techniques such as object detection and image semantic segmentation, so its accuracy often determines how well many other computer-vision techniques work. Generally speaking, image recognition mainly means classifying images. Strictly speaking, however, a computer has no inherent ability to understand an image: the goal of image recognition is to give computers a human-like understanding of images, including what an image depicts and the relationships between the objects in it.
Image recognition is a fairly long-established field. It evolved from early template matching and the Hough transform, through statistical image models such as Bayesian models and Markov random fields, to the deep-learning methods that dominate today. As various convolutional neural networks have been applied, accuracy on image-recognition tasks has improved dramatically.

In my view, the key to image recognition with neural networks is extracting image features thoroughly. Traditional image-processing pipelines often binarize color images, which discards much of the image's information and leaves feature extraction incomplete; convolution with kernels that span all three channels, by contrast, makes full use of the RGB color information. Based on a survey of papers and related material, the mainstream convolutional network models can be summarized as in the table below.
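As a minimal sketch of the point about color (plain NumPy, with illustrative shapes), a single 3×3 kernel that spans all three input channels mixes R, G, and B into every output value, so no color information is discarded up front:

```python
import numpy as np

def conv2d_single(img, kernel):
    """Valid cross-correlation of one multi-channel kernel over an image.

    img:    (C, H, W) input, e.g. C=3 for RGB
    kernel: (C, kH, kW) kernel matching the input channel count
    Returns an (H-kH+1, W-kW+1) feature map; every output value sums
    contributions from all C channels, so color is fully used.
    """
    C, H, W = img.shape
    _, kH, kW = kernel.shape
    out = np.zeros((H - kH + 1, W - kW + 1))
    for i in range(out.shape[0]):
        for j in range(out.shape[1]):
            out[i, j] = np.sum(img[:, i:i+kH, j:j+kW] * kernel)
    return out

rgb = np.random.rand(3, 32, 32)   # a CIFAR-sized RGB image
k = np.random.rand(3, 3, 3)       # one 3x3 kernel spanning all 3 channels
fmap = conv2d_single(rgb, k)
print(fmap.shape)                 # (30, 30)
```

A real `nn.Conv2d(3, 16, 3)` does exactly this, with 16 such kernels in parallel.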

Improvement aspect                         | Main models
Spatial feature extraction of conv kernels | LeNet, AlexNet, ZefNet, VGG-16, GoogleNet
Increased network depth                    | ResNet, Inception-V3/V4
Multi-path networks                        | DenseNet, ResNet
Wide / multi-branch connections            | WideResNet, Xception, PyramidalNet
Convolution operations and feature maps    | Gabor CNN, Squeeze-and-Excitation
Attention mechanisms                       | Residual Attention NN, Convolutional Block Attention

I have long been interested in the residual-block structure, so I chose ResNet as the subject of study and trained, tested, and analyzed it on the CIFAR-10 dataset.

2 Structure of the ResNet Models Used

ResNet takes VGG-19 as its reference, modifies it, and adds residual units. Compared with approaches that simply add layers or ensemble models, ResNet is a major structural innovation: by adding residual connections it solves the problem that, as layers are stacked deeper, vanishing and exploding gradients actually make a deep model perform worse. Each layer of a neural network extracts features at a different level (low, middle, and high), and the deeper the network, the more levels of information it extracts and the more combinations of those levels it can form. ResNet implements residual connections mainly through residual blocks; Figure 1 shows the two main residual structures it uses.

[Figure 1: the two residual block structures used in ResNet]
ResNet's residual blocks come in two main forms, the basic block and the bottleneck block. The biggest difference is that the basic block skips over two layers while the bottleneck block skips over three, and the bottleneck block uses 1×1 convolutions to reduce computation and change the output dimensionality. Through these shortcut connections, the tensor computed by one layer can "jump" directly to a layer further down, without having to be transformed by the intermediate layers. As I understand it, this means a trained ResNet can at worst guarantee that a deep network performs no worse than a shallow one, and it provides a channel that links low-level features directly to high-level features, enlarging the range of functions the network can fit.
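The "at worst no worse than a shallow network" intuition can be sketched in a few lines (plain NumPy; linear maps stand in for the two conv+BN layers, and all names here are illustrative):

```python
import numpy as np

def relu(x):
    return np.maximum(x, 0.0)

def basic_block(x, W1, W2):
    """Simplified BasicBlock: two linear maps stand in for the two
    3x3 conv+BN layers; the input is added back before the final ReLU."""
    residual = relu(x @ W1) @ W2
    return relu(residual + x)   # skip connection: x bypasses both layers

x = np.array([1.0, 2.0, 3.0])
W_zero = np.zeros((3, 3))
# With the residual branch zeroed out, the block degenerates to ReLU(x),
# so a stack of such blocks can always fall back to (roughly) the identity.
out = basic_block(x, W_zero, W_zero)
print(out)   # [1. 2. 3.]
```

In other words, the optimizer only has to learn a *correction* on top of the identity, rather than the full mapping.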
To see ResNet's benefit for deep networks in practice, I trained and tested ResNets of several depths on CIFAR-10.

Model      | Layers | Total parameters
ResNet20   |     20 | 0.27M
ResNet32   |     32 | 0.46M
ResNet44   |     44 | 0.66M
ResNet56   |     56 | 0.85M
ResNet110  |    110 | 1.7M
ResNet1202 |   1202 | 19.4M
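The depths in the table follow the 6n+2 pattern from the original paper: three stages of n basic blocks, each block containing two conv layers, plus the stem conv and the final fully connected layer. A quick check:

```python
def cifar_resnet_depth(n):
    """Depth of a CIFAR-style ResNet with n BasicBlocks per stage:
    3 stages * n blocks * 2 conv layers + stem conv + final FC = 6n + 2."""
    return 3 * n * 2 + 2

for n in [3, 5, 7, 9, 18, 200]:
    print(f"n={n}: ResNet{cifar_resnet_depth(n)}")
# n=3 gives ResNet20, ..., n=200 gives ResNet1202
```

These n values match the block counts passed to the `ResNet` constructor in the code below (e.g. `resnet20` uses `[3, 3, 3]`).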

3 Experimental Setup and Results

Since CIFAR-10 is not very large and each image is only 32×32, training does not demand much hardware. However, ResNet110 and ResNet1202 have too many layers for my laptop to train adequately, so I trained the remaining four ResNet variants myself. Given limited compute (50 epochs each), the models may not be fully trained; the hyperparameters used are listed in the table below.

Hyperparameter      | Value
Epochs              | 50
Batch size          | 128
Learning rate (α)   | decayed from 0.1
Optimizer           | SGD
Weight decay        | 0.0001
Momentum (β)        | 0.95
Architectures       | ResNet20, ResNet32, ResNet44, ResNet56
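The "decayed from 0.1" schedule can be sketched as a step decay in the style of PyTorch's MultiStepLR (the milestone epochs below are hypothetical values for a 50-epoch run; the training script later uses milestones of 100 and 150):

```python
def step_lr(epoch, base_lr=0.1, milestones=(25, 40), gamma=0.1):
    """Step decay: multiply the base learning rate by gamma once for
    every milestone epoch that has already been passed."""
    factor = sum(1 for m in milestones if epoch >= m)
    return base_lr * (gamma ** factor)

print(step_lr(0))    # 0.1 at the start
print(step_lr(30))   # 0.01 after the first milestone
print(step_lr(45))   # 0.001 after both milestones
```

Starting high speeds up early training, and the later drops let the model settle into a finer minimum.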

CIFAR-10 was split into training and test sets at a 5:1 ratio. For comparison, I also downloaded pretrained ResNet models from the web and compared against the results reported in the paper. Test-set accuracies are shown in the table below.

Table 4: Test-set accuracy

Model      | Self-trained | Pretrained (web) | Paper
ResNet20   | 91.05%       | 91.73%           | 91.25%
ResNet32   | 92.49%       | 92.63%           | 92.49%
ResNet44   | 92.57%       | 93.10%           | 92.83%
ResNet56   | 92.76%       | 93.39%           | 93.03%
ResNet110  | -            | 93.68%           | 93.57%
ResNet1202 | -            | 93.82%           | 92.07%

A few test-set images are shown below; they were used to probe the networks of different depths, with results in the following table.
[Figure 2: the sample test images used for the per-image comparison]

Table 5: Predictions on the sample test images

Model        | Predictions
Ground truth | cat ship ship plane frog frog fish frog
ResNet20     | cat ship car plane frog frog fish bird
ResNet32     | cat ship ship plane frog dog fish frog
ResNet44     | cat ship ship plane frog frog fish frog
ResNet56     | cat ship ship plane frog frog fish bird
ResNet110    | cat ship ship plane frog frog fish frog
ResNet1202   | cat ship ship ship frog frog fish frog

4 Analysis and Discussion

First, during training ResNet converged surprisingly well, whether initialized from a random normal distribution or fine-tuned. Given limited compute, I did not add many extra tricks to the original ResNet for comparison; the only change was learning-rate decay, to speed up early training. When I tried Adam as the optimizer, training slowed down dramatically, so I did not use it. In future work, the model could be further improved from the batch-normalization angle, and data augmentation could be applied to the training set.
Second, Table 4 shows accuracy rising steadily with model depth, demonstrating the depth advantage of residual networks. Whether self-trained, downloaded from the web, or taken from the paper, the models improve as the network deepens. For models built from residual blocks, depth is therefore positively correlated with accuracy, consistent with the residual principle described earlier.
Finally, Figure 2 and Table 5 show that shallow networks misclassify more often than deep ones. The deeper ResNets also make a different style of mistake on the test images than the shallow ones. Even ResNet1202, the deepest network, made an error, and on an image that every other model classified correctly. This suggests, on the one hand, that CIFAR images are too small to fully exercise a deep network's feature-extraction advantage; on the other, that the features deep ResNets extract are more complex and different in character from those of shallow ResNets.

5 Code (adapted from GitHub)

Model definition:

'''
Properly implemented ResNet-s for CIFAR10 as described in paper [1].

The implementation and structure of this file is hugely influenced by [2]
which is implemented for ImageNet and doesn't have option A for identity.
Moreover, most of the implementations on the web are copy-paste from
torchvision's resnet and have the wrong number of params.

Proper ResNet-s for CIFAR10 (for fair comparison, etc.) have the following
number of layers and parameters:

name      | layers | params
ResNet20  |    20  | 0.27M
ResNet32  |    32  | 0.46M
ResNet44  |    44  | 0.66M
ResNet56  |    56  | 0.85M
ResNet110 |   110  |  1.7M
ResNet1202|  1202  | 19.4M

which this implementation indeed has.

Reference:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Deep Residual Learning for Image Recognition. arXiv:1512.03385
[2] https://github.com/pytorch/vision/blob/master/torchvision/models/resnet.py

If you use this implementation in your work, please don't forget to mention the
author, Yerlan Idelbayev.
'''


import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init


__all__ = ['ResNet', 'resnet20', 'resnet32', 'resnet44', 'resnet56', 'resnet110', 'resnet1202']

def _weights_init(m):
    classname = m.__class__.__name__
    #print(classname)
    if isinstance(m, nn.Linear) or isinstance(m, nn.Conv2d):
        init.kaiming_normal_(m.weight)

class LambdaLayer(nn.Module):
    def __init__(self, lambd):
        super(LambdaLayer, self).__init__()
        self.lambd = lambd

    def forward(self, x):
        return self.lambd(x)


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, option='A'):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != planes:
            if option == 'A':
                """
                For CIFAR10 ResNet paper uses option A.
                """
                self.shortcut = LambdaLayer(lambda x:
                                            F.pad(x[:, :, ::2, ::2], (0, 0, 0, 0, planes//4, planes//4), "constant", 0))
            elif option == 'B':
                self.shortcut = nn.Sequential(
                     nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                     nn.BatchNorm2d(self.expansion * planes)
                )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)
        self.linear = nn.Linear(64, num_classes)

        self.apply(_weights_init)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, out.size()[3])
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


def resnet20():
    return ResNet(BasicBlock, [3, 3, 3])


def resnet32():
    return ResNet(BasicBlock, [5, 5, 5])


def resnet44():
    return ResNet(BasicBlock, [7, 7, 7])


def resnet56():
    return ResNet(BasicBlock, [9, 9, 9])


def resnet110():
    return ResNet(BasicBlock, [18, 18, 18])


def resnet1202():
    return ResNet(BasicBlock, [200, 200, 200])


def test(net):
    import numpy as np
    total_params = 0

    for x in filter(lambda p: p.requires_grad, net.parameters()):
        total_params += np.prod(x.data.numpy().shape)
    print("Total number of params", total_params)
    print("Total layers", len(list(filter(lambda p: p.requires_grad and len(p.data.size())>1, net.parameters()))))


if __name__ == "__main__":
    for net_name in __all__:
        if net_name.startswith('resnet'):
            print(net_name)
            test(globals()[net_name]())
            print()

資料訓練:

import argparse
import os
import shutil
import time
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim
import torch.utils.data
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision
import resnet

model_names = sorted(name for name in resnet.__dict__
    if name.islower() and not name.startswith("__")
                     and name.startswith("resnet")
                     and callable(resnet.__dict__[name]))

# print(model_names)

parser = argparse.ArgumentParser(description='Proper ResNets for CIFAR10 in PyTorch')
parser.add_argument('--arch', '-a', metavar='ARCH', default='resnet32',
                    choices=model_names,
                    help='model architecture: ' + ' | '.join(model_names) +
                    ' (default: resnet32)')
parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
                    help='number of data loading workers (default: 4)')
parser.add_argument('--epochs', default=200, type=int, metavar='N',
                    help='number of total epochs to run')
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
                    help='manual epoch number (useful on restarts)')
parser.add_argument('-b', '--batch-size', default=128, type=int,
                    metavar='N', help='mini-batch size (default: 128)')
parser.add_argument('--lr', '--learning-rate', default=0.1, type=float,
                    metavar='LR', help='initial learning rate')
parser.add_argument('--momentum', default=0.95, type=float, metavar='M',
                    help='momentum')
parser.add_argument('--weight-decay', '--wd', default=1e-4, type=float,
                    metavar='W', help='weight decay (default: 1e-4)')
parser.add_argument('--print-freq', '-p', default=40, type=int,
                    metavar='N', help='print frequency (default: 40)')
parser.add_argument('--resume', default='', type=str, metavar='PATH',
                    help='path to latest checkpoint (default: none)')
parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true',
                    help='evaluate model on validation set')
parser.add_argument('--pretrained', dest='pretrained', action='store_true',
                    help='use pre-trained model')
parser.add_argument('--lookit', dest='lookit', action='store_true',
                    help='look the performance of model in images, need resume first')
parser.add_argument('--half', dest='half', action='store_true',
                    help='use half-precision(16-bit) ')
parser.add_argument('--save-dir', dest='save_dir',
                    help='The directory used to save the trained models',
                    default='save_temp', type=str)
parser.add_argument('--save-every', dest='save_every',
                    help='Saves checkpoints at every specified number of epochs',
                    type=int, default=10)
best_prec1 = 0


def main():
    global args, best_prec1
    args = parser.parse_args()

    # Check the save_dir exists or not
    if not os.path.exists(args.save_dir):
        os.makedirs(args.save_dir)
    if args.pretrained:
        print("Using fine-tuned model")
        model = torch.nn.DataParallel(resnet.__dict__[args.arch]())
        model.load_state_dict(torch.load("finetuning/resnet32_finetuning.th")['state_dict'])

    else:
        model = torch.nn.DataParallel(resnet.__dict__[args.arch]())
    model.cuda()

    # optionally resume from a checkpoint
    if args.resume:
        print("Checkpoint path:", args.resume)
        if os.path.isfile(args.resume):
            print("=> loading checkpoint '{}'".format(args.resume))
            checkpoint = torch.load(args.resume)
            # args.start_epoch = checkpoint['epoch']
            best_prec1 = checkpoint['best_prec1']
            model.load_state_dict(checkpoint['state_dict'])
            # print("=> loaded checkpoint '{}' (epoch {})"
            #       .format(args.evaluate, checkpoint['epoch']))
        else:
            print("=> no checkpoint found at '{}'".format(args.resume))

    cudnn.benchmark = True

    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])

    train_loader = torch.utils.data.DataLoader(
        datasets.CIFAR10(root='./data', train=True, transform=transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomCrop(32, 4),
            transforms.ToTensor(),
            normalize,
        ]), download=True),
        batch_size=args.batch_size, shuffle=True,
        num_workers=args.workers, pin_memory=True)

    val_loader = torch.utils.data.DataLoader(
        datasets.CIFAR10(root='./data', train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            normalize,
        ])),
        batch_size=128, shuffle=False,
        num_workers=args.workers, pin_memory=True)



    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda()

    if args.half:
        model.half()
        criterion.half()

    optimizer = torch.optim.SGD(model.parameters(), args.lr,
                                momentum=args.momentum,
                                weight_decay=args.weight_decay)

    lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer,
                                                        milestones=[100, 150], last_epoch=args.start_epoch - 1)

    if args.arch in ['resnet1202', 'resnet110']:
        # for resnet1202 original paper uses lr=0.01 for first 400 minibatches for warm-up
        # then switch back. In this setup it will correspond for first epoch.
        for param_group in optimizer.param_groups:
            param_group['lr'] = args.lr*0.1

    # Inspect a trained model's behavior by visualizing individual images
    if args.lookit:
        test_loader = torch.utils.data.DataLoader(
            datasets.CIFAR10(root='./data', train=False, transform=transforms.Compose([
                transforms.ToTensor(),
                normalize,
            ])),
            batch_size=8, shuffle=False,
            num_workers=args.workers, pin_memory=True)
        dataiter = iter(test_loader)

        images, labels = next(dataiter)  # iterator .next() was removed in recent PyTorch
        images = images.cuda()
        outputs = model(images).cpu()
        print("out:",outputs)
        _, predicted = torch.max(outputs, 1)
        print("predicted:", predicted)
        print("labels", labels)
        # print images
        classes = ('plane', 'car', 'bird', 'cat',
                   'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
        images = images.cpu()
        imshow(torchvision.utils.make_grid(images), "true")
        print('GroundTruth: ', ' '.join('%5s' % classes[labels[j]] for j in range(8)))
        print('Predicted:   ', ' '.join('%5s' % classes[predicted[j]] for j in range(8)))

        return

    # Evaluate a trained model's accuracy on the test set
    if args.evaluate:
        # arguments: validation loader, model, and loss function
        validate(val_loader, model, criterion)
        return
    # main training loop
    for epoch in range(args.start_epoch, args.epochs):

        # train for one epoch
        print('current lr {:.5e}'.format(optimizer.param_groups[0]['lr']))
        train(train_loader, model, criterion, optimizer, epoch)
        lr_scheduler.step()

        # evaluate on validation set
        prec1 = validate(val_loader, model, criterion)

        # remember best prec@1 and save checkpoint
        is_best = prec1 > best_prec1
        best_prec1 = max(prec1, best_prec1)

        if epoch > 0 and epoch % args.save_every == 0:
            save_checkpoint({
                'epoch': epoch + 1,
                'state_dict': model.state_dict(),
                'best_prec1': best_prec1,
            }, is_best, filename=os.path.join(args.save_dir, 'checkpoint.th'))

        save_checkpoint({
            'state_dict': model.state_dict(),
            'best_prec1': best_prec1,
        }, is_best, filename=os.path.join(args.save_dir, 'model.th'))


def train(train_loader, model, criterion, optimizer, epoch):
    """
        Run one train epoch
    """
    batch_time = AverageMeter()
    data_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()

    # switch to train mode
    model.train()

    end = time.time()
    for i, (input, target) in enumerate(train_loader):

        # measure data loading time
        data_time.update(time.time() - end)

        target = target.cuda()
        input_var = input.cuda()
        target_var = target
        if args.half:
            input_var = input_var.half()

        # compute output
        output = model(input_var)
        loss = criterion(output, target_var)

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        output = output.float()
        loss = loss.float()
        # measure accuracy and record loss
        prec1 = accuracy(output.data, target)[0]
        losses.update(loss.item(), input.size(0))
        top1.update(prec1.item(), input.size(0))

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % args.print_freq == 0:
            print('Epoch: [{0}][{1}/{2}]\t'
                  'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                  'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
                  'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                  'Prec@1 {top1.val:.3f} ({top1.avg:.3f})'.format(
                      epoch, i, len(train_loader), batch_time=batch_time,
                      data_time=data_time, loss=losses, top1=top1))


def validate(val_loader, model, criterion):
    """
    Run evaluation
    """
    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()

    # switch to evaluate mode
    model.eval()

    end = time.time()
    with torch.no_grad():
        # i enumerates the validation batches
        for i, (input, target) in enumerate(val_loader):
            target = target.cuda()
            input_var = input.cuda()
            target_var = target.cuda()

            if args.half:
                input_var = input_var.half()

            # compute output
            output = model(input_var)
            loss = criterion(output, target_var)

            output = output.float()
            loss = loss.float()

            # measure accuracy and record loss
            prec1 = accuracy(output.data, target)[0]
            losses.update(loss.item(), input.size(0))
            top1.update(prec1.item(), input.size(0))

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            if i % args.print_freq == 0:
                print('Test: [{0}/{1}]\t'
                      'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                      'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                      'Prec@1 {top1.val:.3f} ({top1.avg:.3f})'.format(
                          i, len(val_loader), batch_time=batch_time, loss=losses,
                          top1=top1))

    print('Full test-set accuracy: {top1.avg:.3f}'
          .format(top1=top1))

    return top1.avg

def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
    """
    Save the training model
    """
    torch.save(state, filename)

class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count


def accuracy(output, target, topk=(1,)):
    """Computes the precision@k for the specified values of k"""
    maxk = max(topk)
    batch_size = target.size(0)

    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        correct_k = correct[:k].reshape(-1).float().sum(0)  # reshape, not view: correct is non-contiguous after t()
        res.append(correct_k.mul_(100.0 / batch_size))
    return res


def imshow(img, name):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.savefig(".\\plotfigure\\" + str(name) + ".png", dpi=300)




if __name__ == '__main__':
    main()

Test-set evaluation script:


import os

# Walk a directory tree and return a list of the file paths inside it, similar to os.walk
def DFS_file_search(dict_name):
    # list.pop() and list.append() are all we need to maintain a stack
    stack = []
    result_txt = []
    stack.append(dict_name)
    while len(stack) != 0:  # an empty stack means every directory has been visited
        temp_name = stack.pop()
        try:
            temp_name2 = os.listdir(temp_name)  # list of entry names in this directory
            for eve in temp_name2:
                stack.append(temp_name + "\\" + eve)  # keep full paths (Windows separators)
        except NotADirectoryError:
            result_txt.append(temp_name)
    return result_txt

paths = DFS_file_search("pretrained_models")
for path in paths:
    model_selected = path.split("\\")[-1].split(".")[0]
    os.system('python trainer.py --resume={} -e --arch={}'.format(path, model_selected))